使用Flume实现MySQL与Kafka实时同步

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 使用Flume实现MySQL与Kafka实时同步

使用Flume实现MySQL与Kafka实时同步

一、Kafka配置

1.创建Topic

./kafka-topics.sh --zookeeper localhost:2181  --topic test1 

2.创建Producer

./kafka-console-producer.sh --broker-list localhost:9092  --topic test1

3.创建Consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test > ../result 2>&1

二、Flume配置

1.下载

http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

https://github.com/keedio/flume-ng-sql-source/archive/v1.5.2.tar.gz

2.解压

tar -xivf  apache-flume-1.9.0-bin.tar.gz
tar -xivf flume-ng-sql-source-1.5.2.tar.gz

3.编译flume-ng-sql-source jar包

mvn package

将编译好的jar包复制到flume的lib目录下

这边给个编译好的:https://share.weiyun.com/5TKVe54

4.配置文件

conf文件夹下,自己新建一个文件,名字随便起,启动的时候指定该配置文件就行了

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# mysql地址
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://192.168.11.38:13306/ccb_yiqian
# Hibernate Database connection properties
#数据库账号
a1.sources.src-1.hibernate.connection.user = root
#数据库密码
a1.sources.src-1.hibernate.connection.password = jinbill
#是否自动提交
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
#查询间隔
a1.sources.src-1.run.query.delay=100000000
#输出路径
a1.sources.src-1.status.file.path = /home/mysql/flume/apache-flume-1.9.0-bin
#输出文件名称
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
#从哪里开始读取数据传输
a1.sources.src-1.start.from = 0
#SQL--传什么写什么
a1.sources.src-1.custom.query = SELECT * from  ticket_back_assign
#批量发送数据量 应该是source 发送到 channel 
a1.sources.src-1.batch.size = 1000
#最大查询行数
a1.sources.src-1.max.rows = 100000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
#分割符
a1.sources.sqlSource.delimiter.entry = |
################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 1000000
a1.channels.ch-1.transactionCapacity = 1000000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
#a1.channels.ch-1.byteCapacity = 1000000
################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#要传输的topic
a1.sinks.k1.topic = test1
#broker地址
a1.sinks.k1.brokerList = 192.168.11.38:19092
#ack模式选择 -1.0,1
a1.sinks.k1.requiredAcks = 1
#批量发送数据量 应该是sink发送到 kafka 
a1.sinks.k1.batchSize = 200
a1.sinks.k1.channel = c1
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
~                               

启动

bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

注意事项

1.kafka producer 报错内存不够

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic test1  --config max.message.bytes=4096000 replica.fetch.max.bytes=4096000

2.flume 报错内存不够

注意这几个参数

a1.channels.ch-1.capacity = 1000000
a1.channels.ch-1.transactionCapacity = 1000000
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 100000
相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink问题子实现Kafka到Mysql如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
379 2
|
1月前
|
canal SQL 关系型数据库
MySQL数据直接实时同步到ES
MySQL数据直接实时同步到ES
33 0
|
3月前
|
canal 缓存 SpringCloudAlibaba
Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch
本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客 基础上使用canal将mysql数据实时同步到Elasticsearch。
|
3月前
|
canal 缓存 关系型数据库
Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性
canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
|
3月前
|
NoSQL Java 关系型数据库
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
43 0
|
3月前
|
DataWorks 关系型数据库 MySQL
DataWorks的数据集成实时同步mysql数据吗?
DataWorks的数据集成实时同步mysql数据吗?
120 0
|
4月前
|
消息中间件 关系型数据库 MySQL
在kafka connect 同步 mysql 主从数据库
在kafka connect 同步 mysql 主从数据库
46 0
|
4月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
44 0
|
4月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
77 0
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
63 0