使用Flume实现MySQL与Kafka实时同步

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 使用Flume实现MySQL与Kafka实时同步

使用Flume实现MySQL与Kafka实时同步

一、Kafka配置

1.创建Topic

./kafka-topics.sh --zookeeper localhost:2181  --topic test1 

2.创建Producer

./kafka-console-producer.sh --broker-list localhost:9092  --topic test1

3.创建Consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test > ../result 2>&1

二、Flume配置

1.下载

http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

https://github.com/keedio/flume-ng-sql-source/archive/v1.5.2.tar.gz

2.解压

tar -xivf  apache-flume-1.9.0-bin.tar.gz
tar -xivf flume-ng-sql-source-1.5.2.tar.gz

3.编译flume-ng-sql-source jar包

mvn package

将编译好的jar包复制到flume的lib目录下

这边给个编译好的:https://share.weiyun.com/5TKVe54

4.配置文件

conf文件夹下,自己新建一个文件,名字随便起,启动的时候指定该配置文件就行了

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# mysql地址
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://192.168.11.38:13306/ccb_yiqian
# Hibernate Database connection properties
#数据库账号
a1.sources.src-1.hibernate.connection.user = root
#数据库密码
a1.sources.src-1.hibernate.connection.password = jinbill
#是否自动提交
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#查询间隔
a1.sources.src-1.run.query.delay=100000000
#输出路径
a1.sources.src-1.status.file.path = /home/mysql/flume/apache-flume-1.9.0-bin
#输出文件名称
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
#从哪里开始读取数据传输
a1.sources.src-1.start.from = 0
#SQL--传什么写什么
a1.sources.src-1.custom.query = SELECT * from  ticket_back_assign
#批量发送数据量 应该是source 发送到 channel 
a1.sources.src-1.batch.size = 1000
#最大查询行数
a1.sources.src-1.max.rows = 100000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
#分割符
a1.sources.sqlSource.delimiter.entry = |
################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 1000000
a1.channels.ch-1.transactionCapacity = 1000000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
#a1.channels.ch-1.byteCapacity = 1000000
################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#要传输的topic
a1.sinks.k1.topic = test1
#broker地址
a1.sinks.k1.brokerList = 192.168.11.38:19092
#ack模式选择 -1.0,1
a1.sinks.k1.requiredAcks = 1
#批量发送数据量 应该是sink发送到 kafka 
a1.sinks.k1.batchSize = 200
a1.sinks.k1.channel = c1
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
~                               

启动

bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

注意事项

1.kafka producer 报错内存不够

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic test1  --config max.message.bytes=4096000 replica.fetch.max.bytes=4096000

2.flume 报错内存不够

注意这几个参数

a1.channels.ch-1.capacity = 1000000
a1.channels.ch-1.transactionCapacity = 1000000
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 100000
相关文章
|
3月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
4月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
376 0
|
4月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
307 0
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之MySQL到MySOL的批量实时同步该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
88 1
|
7月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之任务无法实时同步MySQL到StarRocks中修改的数据,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之如何进行mysql的实时同步
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。z
200 0
|
8月前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何实现MySQL数据库之间的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现OSS数据到Kafka的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
119 0