数据库同步 Elasticsearch 后数据不一致,怎么办?

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 数据库同步 Elasticsearch 后数据不一致,怎么办?

1、实战线上问题

  • Q1:Logstash 同步 postgreSQL 到 Elasticsearch 数据不一致。

在使用 Logstash 从 pg 库中将一张表导入到 ES 中时,发现 ES 中的数据量和 PG 库中的这张表的数据量存在较大差距。如何快速比对哪些数据没有插入?导入过程中,Logstash 日志没有异常。PG 中这张表有 7600W。

  • Q2:mq 异步双写数据库、es 的方案中,如何保证数据库数据和 es 数据的一致性?

2、推荐解决方案之一——ID 比较法

如下示例,仅拿问题1举例验证,问题2原理一致。

2.1 方案探讨

要找出哪些数据没有插入到 Elasticsearch 中,可以采用以下方法:

  • 确保 Logstash 配置文件中的 input 插件的 JDBC 驱动程序正确配置,以便从 PostgreSQL 数据库中提取所有数据。注意 statement 参数,确保它选择了所有需要的数据。
  • 检查 Logstash 配置文件的 output 插件,确保正确配置了 Elasticsearch 的连接参数。同时,检查是否有过滤器在导入过程中过滤掉了部分数据。
  • 在 Logstash 配置文件中添加一个 stdout 插件,将从 PostgreSQL 数据库中读取的数据记录到文件中。

例如,可以添加以下内容:

output {
  elasticsearch {
    ...Elasticsearch 配置...
  }
  stdout {
    codec => json_lines
    path => "/path/to/logstash_output.log"
  }
}

将 Logstash 输出文件与 PostgreSQL 数据库中的原始数据进行比较,以找出未导入的数据。可以使用 Python、Shell 脚本或其他编程语言编写一个简单的脚本来执行此操作。

如果 Logstash 输出文件中的记录数与 PostgreSQL 数据库中的记录数一致,但 Elasticsearch 中的记录数不一致,请检查 Elasticsearch 集群的健康状况和日志。确认集群是否在接收和索引数据时遇到问题。

如果问题仍然存在,尝试将批量操作的大小减小,以减轻 Elasticsearch 和 Logstash 的负担。可以通过在 Logstash 配置文件的 output 插件中设置 flush_size 和 idle_flush_time 参数来实现。

处理大量数据时,可能需要调整 Logstash 和 Elasticsearch 的性能和资源配置。根据硬件和网络条件,可能需要优化批量操作、JVM 设置、线程池大小等方面的设置。

2.2 比较脚本的实现

以下是一个简单的 Shell 脚本示例,用于比较 Logstash 输出文件(JSON 格式)和 PostgreSQL 数据库中的数据。该脚本将比较特定字段(如 id)以确定哪些数据可能未导入到 Elasticsearch。

首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH

接下来,创建一个名为 compare.sh 的 Shell 脚本:

#!/bin/bash
# 将 JSON 文件中的 ID 提取到一个文件中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt
 
# 删除 JSON 中的双引号
sed -i 's/"//g' logstash_ids.txt
 
# 对 Logstash 和 PostgreSQL 的 ID 文件进行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt
 
# 使用 comm 比较两个已排序的 ID 文件
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt
 
# 输出结果
echo "以下 ID 在 Logstash 输出文件中未找到:"
cat missing_ids.txt

为脚本添加可执行权限并运行:

chmod +x compare.sh
 
./compare.sh

此脚本会比较 logstash_output.log 和 postgres_data.csv 文件中的 ID。如果发现缺失的 ID,它们将被保存在 missing_ids.txt 文件中,并输出到控制台。请注意,该脚本假设已经安装了 jq(一个命令行 JSON 处理器)。如果没有,请先安装 jq

3、推荐方案二——Redis 加速对比

在这种情况下,可以使用 Redis 的集合数据类型来存储 PostgreSQL 数据库和 Logstash 输出文件中的 ID。接下来,可以使用 Redis 提供的集合操作来找到缺失的 ID。

以下是一个使用 Redis 实现加速比对的示例:

首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;

安装并启动 Redis。

使用 Python 脚本将 ID 数据加载到 Redis:

import redis
import csv
 
# 连接到 Redis
 
r = redis.StrictRedis(host='localhost', port=6379, db=0)
 
# 从 PostgreSQL 导出的 CSV 文件中加载数据
with open('/path/to/postgres_data.csv', newline='') as csvfile:
    csv_reader = csv.reader(csvfile)
    next(csv_reader)  # 跳过表头
    for row in csv_reader:
        r.sadd('postgres_ids', row[0])
 
# 从 Logstash 输出文件中加载数据
with open('/path/to/logstash_output.log', newline='') as logstash_file:
    for line in logstash_file:
        id = line.split('"id":')[1].split(',')[0].strip()
        r.sadd('logstash_ids', id)
 
# 计算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')
 
# 输出缺失的 ID
print("以下 ID 在 Logstash 输出文件中未找到:")
for missing_id in missing_ids:
    print(missing_id)

这个 Python 脚本使用 Redis 集合数据类型存储 ID,然后计算它们之间的差集以找到缺失的 ID。需要先安装 Python 的 Redis 库。可以使用以下命令安装:

pip install redis

这个脚本是一个基本示例,可以根据需要修改和扩展它。使用 Redis 的优点是它能在内存中快速处理大量数据,而不需要在磁盘上读取和写入临时文件。

4、小结

方案一:使用 Shell 脚本和 grep 命令

  • 优点:

(1)简单,易于实现。

(2)不需要额外的库或工具。

  • 缺点:

(1)速度较慢,因为它需要在磁盘上读写临时文件。

(2)对于大数据量的情况,可能会导致较高的磁盘 I/O 和内存消耗。

方案二:使用 Redis 实现加速比对

  • 优点:

(1)速度更快,因为 Redis 是基于内存的数据结构存储。

(2)可扩展性较好,可以处理大量数据。

  • 缺点:

(1)实现相对复杂,需要编写额外的脚本。

(2)需要安装和运行 Redis 服务器。

根据需求和数据量,可以选择合适的方案。如果处理的数据量较小,且对速度要求不高,可以选择方案一,使用 Shell 脚本和 grep 命令。这种方法简单易用,但可能在大数据量下表现不佳。

如果需要处理大量数据,建议选择方案二,使用 Redis 实现加速比对。这种方法速度更快,能够有效地处理大数据量。然而,这种方法需要额外的设置和配置,例如安装 Redis 服务器和编写 Python 脚本。

在实际应用中,可能需要根据具体需求进行权衡,以选择最适合的解决方案。

推荐阅读


更短时间更快习得更多干货!

和全球 近2000+ Elastic 爱好者一起精进!

比同事抢先一步学习进阶干货!


相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
3月前
|
存储 JSON 关系型数据库
【干货满满】解密 API 数据解析:从 JSON 到数据库存储的完整流程
本文详解电商API开发中JSON数据解析与数据库存储的全流程,涵盖数据提取、清洗、转换及优化策略,结合Python实战代码与主流数据库方案,助开发者构建高效、可靠的数据处理管道。
|
8天前
|
数据采集 数据可视化 数据挖掘
阿里云瑶池数据库 Data Agent,数据安全,分析准确,让数据更有价值!
Data Agent 是阿里云瑶池数据库推出的智能数据体产品,融合 Data+AI 与 Agentic AI 技术,覆盖数据全生命周期。支持多源数据接入,可自主规划分析任务、生成代码并输出可视化洞察报告,让业务人员零门槛获取专业级分析结果,助力企业高效实现数据驱动决策。
|
22天前
|
人工智能 Java 关系型数据库
使用数据连接池进行数据库操作
使用数据连接池进行数据库操作
66 11
|
2月前
|
存储 数据管理 数据库
数据字典是什么?和数据库、数据仓库有什么关系?
在数据处理中,你是否常困惑于字段含义、指标计算或数据来源?数据字典正是解答这些问题的关键工具,它清晰定义数据的名称、类型、来源、计算方式等,服务于开发者、分析师和数据管理者。本文详解数据字典的定义、组成及其与数据库、数据仓库的关系,助你夯实数据基础。
数据字典是什么?和数据库、数据仓库有什么关系?
|
6月前
|
存储 缓存 数据库
数据库数据删除策略:硬删除vs软删除的最佳实践指南
在项目开发中,“删除”操作常见但方式多样,主要分为硬删除与软删除。硬删除直接从数据库移除数据,操作简单、高效,但不可恢复;适用于临时或敏感数据。软删除通过标记字段保留数据,支持恢复和审计,但增加查询复杂度与数据量;适合需追踪历史或可恢复的场景。两者各有优劣,实际开发中常结合使用以满足不同需求。
432 4
|
2月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL数据库的WAL日志与数据写入的过程
PostgreSQL中的WAL(预写日志)是保证数据完整性的关键技术。在数据修改前,系统会先将日志写入WAL,确保宕机时可通过日志恢复数据。它减少了磁盘I/O,提升了性能,并支持手动切换日志文件。WAL文件默认存储在pg_wal目录下,采用16进制命名规则。此外,PostgreSQL提供pg_waldump工具解析日志内容。
163 0
|
4月前
|
存储 SQL Java
数据存储使用文件还是数据库,哪个更合适?
数据库和文件系统各有优劣:数据库读写性能较低、结构 rigid,但具备计算能力和数据一致性保障;文件系统灵活易管理、读写高效,但缺乏计算能力且无法保证一致性。针对仅需高效存储与灵活管理的场景,文件系统更优,但其计算短板可通过开源工具 SPL(Structured Process Language)弥补。SPL 提供独立计算语法及高性能文件格式(如集文件、组表),支持复杂计算与多源混合查询,甚至可替代数据仓库。此外,SPL 易集成、支持热切换,大幅提升开发运维效率,是后数据库时代文件存储的理想补充方案。
|
7月前
|
数据库 Python
【YashanDB知识库】python驱动查询gbk字符集崖山数据库CLOB字段,数据被驱动截断
【YashanDB知识库】python驱动查询gbk字符集崖山数据库CLOB字段,数据被驱动截断
|
6月前
|
人工智能 关系型数据库 分布式数据库
让数据与AI贴得更近,阿里云瑶池数据库系列产品焕新升级
4月9日阿里云AI势能大会上,阿里云瑶池数据库发布重磅新品及一系列产品能力升级。「推理加速服务」Tair KVCache全新上线,实现KVCache动态分层存储,显著提高内存资源利用率,为大模型推理降本提速。

热门文章

最新文章