数据库同步 Elasticsearch 后数据不一致,怎么办?

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 数据库同步 Elasticsearch 后数据不一致,怎么办?

1、实战线上问题

  • Q1:Logstash 同步 postgreSQL 到 Elasticsearch 数据不一致。

在使用 Logstash 从 pg 库中将一张表导入到 ES 中时,发现 ES 中的数据量和 PG 库中的这张表的数据量存在较大差距。如何快速比对哪些数据没有插入?导入过程中,Logstash 日志没有异常。PG 中这张表有 7600W。

  • Q2:mq 异步双写数据库、es 的方案中,如何保证数据库数据和 es 数据的一致性?

2、推荐解决方案之一——ID 比较法

如下示例,仅拿问题1举例验证,问题2原理一致。

2.1 方案探讨

要找出哪些数据没有插入到 Elasticsearch 中,可以采用以下方法:

  • 确保 Logstash 配置文件中的 input 插件的 JDBC 驱动程序正确配置,以便从 PostgreSQL 数据库中提取所有数据。注意 statement 参数,确保它选择了所有需要的数据。
  • 检查 Logstash 配置文件的 output 插件,确保正确配置了 Elasticsearch 的连接参数。同时,检查是否有过滤器在导入过程中过滤掉了部分数据。
  • 在 Logstash 配置文件中添加一个 stdout 插件,将从 PostgreSQL 数据库中读取的数据记录到文件中。

例如,可以添加以下内容:

output {
  elasticsearch {
    ...Elasticsearch 配置...
  }
  stdout {
    codec => json_lines
    path => "/path/to/logstash_output.log"
  }
}

将 Logstash 输出文件与 PostgreSQL 数据库中的原始数据进行比较,以找出未导入的数据。可以使用 Python、Shell 脚本或其他编程语言编写一个简单的脚本来执行此操作。

如果 Logstash 输出文件中的记录数与 PostgreSQL 数据库中的记录数一致,但 Elasticsearch 中的记录数不一致,请检查 Elasticsearch 集群的健康状况和日志。确认集群是否在接收和索引数据时遇到问题。

如果问题仍然存在,尝试将批量操作的大小减小,以减轻 Elasticsearch 和 Logstash 的负担。可以通过在 Logstash 配置文件的 output 插件中设置 flush_size 和 idle_flush_time 参数来实现。

处理大量数据时,可能需要调整 Logstash 和 Elasticsearch 的性能和资源配置。根据硬件和网络条件,可能需要优化批量操作、JVM 设置、线程池大小等方面的设置。

2.2 比较脚本的实现

以下是一个简单的 Shell 脚本示例,用于比较 Logstash 输出文件(JSON 格式)和 PostgreSQL 数据库中的数据。该脚本将比较特定字段(如 id)以确定哪些数据可能未导入到 Elasticsearch。

首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH

接下来,创建一个名为 compare.sh 的 Shell 脚本:

#!/bin/bash
# 将 JSON 文件中的 ID 提取到一个文件中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt
 
# 删除 JSON 中的双引号
sed -i 's/"//g' logstash_ids.txt
 
# 对 Logstash 和 PostgreSQL 的 ID 文件进行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt
 
# 使用 comm 比较两个已排序的 ID 文件
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt
 
# 输出结果
echo "以下 ID 在 Logstash 输出文件中未找到:"
cat missing_ids.txt

为脚本添加可执行权限并运行:

chmod +x compare.sh
 
./compare.sh

此脚本会比较 logstash_output.log 和 postgres_data.csv 文件中的 ID。如果发现缺失的 ID,它们将被保存在 missing_ids.txt 文件中,并输出到控制台。请注意,该脚本假设已经安装了 jq(一个命令行 JSON 处理器)。如果没有,请先安装 jq

3、推荐方案二——Redis 加速对比

在这种情况下,可以使用 Redis 的集合数据类型来存储 PostgreSQL 数据库和 Logstash 输出文件中的 ID。接下来,可以使用 Redis 提供的集合操作来找到缺失的 ID。

以下是一个使用 Redis 实现加速比对的示例:

首先,从 PostgreSQL 数据库中导出数据,将其保存为 CSV 文件:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;

安装并启动 Redis。

使用 Python 脚本将 ID 数据加载到 Redis:

import redis
import csv
 
# 连接到 Redis
 
r = redis.StrictRedis(host='localhost', port=6379, db=0)
 
# 从 PostgreSQL 导出的 CSV 文件中加载数据
with open('/path/to/postgres_data.csv', newline='') as csvfile:
    csv_reader = csv.reader(csvfile)
    next(csv_reader)  # 跳过表头
    for row in csv_reader:
        r.sadd('postgres_ids', row[0])
 
# 从 Logstash 输出文件中加载数据
with open('/path/to/logstash_output.log', newline='') as logstash_file:
    for line in logstash_file:
        id = line.split('"id":')[1].split(',')[0].strip()
        r.sadd('logstash_ids', id)
 
# 计算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')
 
# 输出缺失的 ID
print("以下 ID 在 Logstash 输出文件中未找到:")
for missing_id in missing_ids:
    print(missing_id)

这个 Python 脚本使用 Redis 集合数据类型存储 ID,然后计算它们之间的差集以找到缺失的 ID。需要先安装 Python 的 Redis 库。可以使用以下命令安装:

pip install redis

这个脚本是一个基本示例,可以根据需要修改和扩展它。使用 Redis 的优点是它能在内存中快速处理大量数据,而不需要在磁盘上读取和写入临时文件。

4、小结

方案一:使用 Shell 脚本和 grep 命令

  • 优点:

(1)简单,易于实现。

(2)不需要额外的库或工具。

  • 缺点:

(1)速度较慢,因为它需要在磁盘上读写临时文件。

(2)对于大数据量的情况,可能会导致较高的磁盘 I/O 和内存消耗。

方案二:使用 Redis 实现加速比对

  • 优点:

(1)速度更快,因为 Redis 是基于内存的数据结构存储。

(2)可扩展性较好,可以处理大量数据。

  • 缺点:

(1)实现相对复杂,需要编写额外的脚本。

(2)需要安装和运行 Redis 服务器。

根据需求和数据量,可以选择合适的方案。如果处理的数据量较小,且对速度要求不高,可以选择方案一,使用 Shell 脚本和 grep 命令。这种方法简单易用,但可能在大数据量下表现不佳。

如果需要处理大量数据,建议选择方案二,使用 Redis 实现加速比对。这种方法速度更快,能够有效地处理大数据量。然而,这种方法需要额外的设置和配置,例如安装 Redis 服务器和编写 Python 脚本。

在实际应用中,可能需要根据具体需求进行权衡,以选择最适合的解决方案。

推荐阅读


更短时间更快习得更多干货!

和全球 近2000+ Elastic 爱好者一起精进!

比同事抢先一步学习进阶干货!


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
1月前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
在9月20日2024云栖大会上,阿里云智能集团副总裁,数据库产品事业部负责人,ACM、CCF、IEEE会士(Fellow)李飞飞发表《从数据到智能:Data+AI驱动的云原生数据库》主题演讲。他表示,数据是生成式AI的核心资产,大模型时代的数据管理系统需具备多模处理和实时分析能力。阿里云瑶池将数据+AI全面融合,构建一站式多模数据管理平台,以数据驱动决策与创新,为用户提供像“搭积木”一样易用、好用、高可用的使用体验。
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
|
17天前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
141 61
|
1月前
|
缓存 关系型数据库 MySQL
高并发架构系列:数据库主从同步的 3 种方案
本文详解高并发场景下数据库主从同步的三种解决方案:数据主从同步、数据库半同步复制、数据库中间件同步和缓存记录写key同步,旨在帮助解决数据一致性问题。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
高并发架构系列:数据库主从同步的 3 种方案
|
1月前
|
SQL 关系型数据库 数据库
国产数据实战之docker部署MyWebSQL数据库管理工具
【10月更文挑战第23天】国产数据实战之docker部署MyWebSQL数据库管理工具
110 4
国产数据实战之docker部署MyWebSQL数据库管理工具
|
1月前
|
关系型数据库 分布式数据库 数据库
云栖大会|从数据到决策:AI时代数据库如何实现高效数据管理?
在2024云栖大会「海量数据的高效存储与管理」专场,阿里云瑶池讲师团携手AMD、FunPlus、太美医疗科技、中石化、平安科技以及小赢科技、迅雷集团的资深技术专家深入分享了阿里云在OLTP方向的最新技术进展和行业最佳实践。
|
2月前
|
人工智能 Cloud Native 容灾
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
|
2月前
|
算法 大数据 数据库
云计算与大数据平台的数据库迁移与同步
本文详细介绍了云计算与大数据平台的数据库迁移与同步的核心概念、算法原理、具体操作步骤、数学模型公式、代码实例及未来发展趋势与挑战。涵盖全量与增量迁移、一致性与异步复制等内容,旨在帮助读者全面了解并应对相关技术挑战。
44 3
|
2月前
|
SQL 存储 关系型数据库
数据储存数据库管理系统(DBMS)
【10月更文挑战第11天】
121 3
|
2月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
2月前
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
217 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。