开源数据集成平台SeaTunnel:MySQL实时同步到es

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 免费支持 MySQL 实时同步到 ElasticSearch 的工具很少,Apache SeaTunnel 是一个高性能开源大数据集成工具,提供灵活易用、易扩展并支持千亿级数据集成的解决方案,已经在B站、腾讯云、字节等数百家公司使用。

一、前言

  • 最近,项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。
  • 目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步任务每月 500多元,有点小贵。
  • 其他环境:MySQL同步到ES,用的是 CloudCanal,不支持 数据转换,添加同步字段比较麻烦,社区版限制5个任务,不够用;MySQL同步到MySQL,用的是 debezium,不支持写入 ES。
  • 恰好3年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 系统为例

二、[开源数据集成平台SeaTunnel]

1. 简介

  • SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵活易用、易扩展并支持千亿级数据集成的解决方案。
  • Seaunnel 为实时(CDC)和批量数据提供高性能数据同步能力,支持十种以上数据源,已经在B站、腾讯云、字节等数百家公司使用。
  • 可以选择 SeaTunnel Zeta 引擎上运行,也可以在 Apache Flink 或 Spark 引擎上运行。

seatunnel-architecture.png

2. 安装

Caused by: java.sql.SQLException: No suitable driver
        at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
        at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
        ... 20 more

        ... 11 more

        at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)

3. 安装 connectors 插件

  • 执行 bash bin/install-plugin.sh,国内建议先配置 maven 镜像,不然容易失败 或者 慢
  • 官方文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了PR

seatunnel-install-connectors-error.png

4. 编写配置文件

  • config 目录下,新建配置文件:如 mysql-es-test.conf
  • [添加 env 配置]

因为是 实时同步,这里 job.mode = "STREAMING",execution.parallelism 是 并发数

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

result_table_name 取个 临时表名,便于后续使用。table-names 必须是 数据库.表名,base-url 必须指定 数据库。
[startup.mode 默认是 INITIAL,先同步历史数据,后增量同步,详情点击]

source {
  MySQL-CDC {
    result_table_name = "t1"
    server-id = 5656
    username = "root"
    password = "pwd"
    table-names = ["db.t1"]
    base-url = "jdbc:mysql://host:3306/db"
  }
}

函数列表请点击

transform {
  Sql {
    source_table_name = "t1"
    query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
  }
}

CDC 实时同步 es,必须配置 primary_keys

sink {
    Elasticsearch {
        hosts = ["host:9200"]
        username = "elastic"
        password = "pwd"

        index = "index_t1"
        # cdc required options
        primary_keys = ["id"]
    }
}
  • 最终配置截图

seatunnel-mysql-cdc-es.png

5. 启动任务

这里以 本地模式为例,另有 集群、spark、flink 模式。

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf

三、总结

本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明,否则将追究法律责任。
本文首先发布于 https://www.890808.xyz/ ,其他平台需要审核更新慢一些。

相关文章
|
2月前
|
存储 关系型数据库 MySQL
ES的全文索引和MySQL的全文索引有什么区别?如何选择?
【8月更文挑战第26天】ES的全文索引和MySQL的全文索引有什么区别?如何选择?
184 5
|
1月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
32 2
zabbix agent集成percona监控MySQL的插件实战案例
|
2天前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
15 0
|
2月前
|
NoSQL 关系型数据库 MySQL
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
100 2
|
2月前
|
Java 关系型数据库 MySQL
SpringBoot 集成 Quartz + MySQL
SpringBoot 集成 Quartz + MySQL
77 1
|
2月前
|
搜索推荐 关系型数据库 MySQL
不引入ES,如何利用MySQL实现模糊匹配?
【8月更文挑战第23天】在数据处理和查询优化的日常工作中,我们常常面临需要执行模糊匹配的场景,比如搜索用户姓名、商品标题等。虽然Elasticsearch(ES)等搜索引擎提供了高效且强大的文本搜索能力,但在某些轻量级或资源受限的环境中,直接利用MySQL数据库实现模糊匹配也是一个经济且可行的选择。下面,我将分享几种在MySQL中实现模糊匹配的技术方法。
47 0
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之MySQL到MySOL的批量实时同步该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
Java 关系型数据库 MySQL
如何实现Springboot+camunda+mysql的集成
【7月更文挑战第2天】集成Spring Boot、Camunda和MySQL的简要步骤: 1. 初始化Spring Boot项目,添加Camunda和MySQL驱动依赖。 2. 配置`application.properties`,包括数据库URL、用户名和密码。 3. 设置Camunda引擎属性,指定数据源。 4. 引入流程定义文件(如`.bpmn`)。 5. 创建服务处理流程操作,创建控制器接收请求。 6. Camunda自动在数据库创建表结构。 7. 启动应用,测试流程启动,如通过服务和控制器开始流程实例。 示例代码包括服务类启动流程实例及控制器接口。实际集成需按业务需求调整。
229 4
|
3月前
|
分布式计算 关系型数据库 MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。
|
3月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之任务无法实时同步MySQL到StarRocks中修改的数据,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章