Real-Time MySQL-to-Kafka Sync with Flume
I. Kafka Configuration
1. Create a Topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
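To confirm the topic actually exists before wiring up Flume, you can describe it:
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1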
2. Create a Producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
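For a quick smoke test you can also pipe a message in instead of typing interactively:
echo "hello kafka" | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test1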
3. Create a Consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 > ../result 2>&1
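If you also want messages that were produced before the consumer started, add --from-beginning:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning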
II. Flume Configuration
1. Download
http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
https://github.com/keedio/flume-ng-sql-source/archive/v1.5.2.tar.gz
2. Extract
tar -zxvf apache-flume-1.9.0-bin.tar.gz
tar -zxvf flume-ng-sql-source-1.5.2.tar.gz
3. Build the flume-ng-sql-source jar
mvn package
Copy the built jar into Flume's lib directory, as sketched below.
A pre-built copy is available here: https://share.weiyun.com/5TKVe54
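Assuming the default Maven layout, the copy looks roughly like this (jar names and paths may differ on your machine). Note that the MySQL JDBC driver jar also needs to be in lib, since the config below references com.mysql.jdbc.Driver; the driver version shown is only an example:
cp flume-ng-sql-source-1.5.2/target/flume-ng-sql-source-1.5.2.jar apache-flume-1.9.0-bin/lib/
cp mysql-connector-java-5.1.48.jar apache-flume-1.9.0-bin/lib/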
4. Configuration File
Create a new file under the conf directory. The name doesn't matter; just point to it when starting the agent.
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1

########### sql source #################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# MySQL connection URL
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://192.168.11.38:13306/ccb_yiqian
# Hibernate database connection properties
# database user
a1.sources.src-1.hibernate.connection.user = root
# database password
a1.sources.src-1.hibernate.connection.password = jinbill
# autocommit
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# query interval (ms)
a1.sources.src-1.run.query.delay = 100000000
# status file path
a1.sources.src-1.status.file.path = /home/mysql/flume/apache-flume-1.9.0-bin
# status file name
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
# row index to start reading from
a1.sources.src-1.start.from = 0
# the SQL to run; whatever it selects is what gets shipped
a1.sources.src-1.custom.query = SELECT * FROM ticket_back_assign
# batch size (source -> channel)
a1.sources.src-1.batch.size = 1000
# max rows per query
a1.sources.src-1.max.rows = 100000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size = 1
a1.sources.src-1.hibernate.c3p0.max_size = 10
# field delimiter
a1.sources.src-1.delimiter.entry = |

################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 1000000
a1.channels.ch-1.transactionCapacity = 1000000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
#a1.channels.ch-1.byteCapacity = 1000000

################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# target topic
a1.sinks.k1.topic = test1
# broker list
a1.sinks.k1.brokerList = 192.168.11.38:19092
# ack mode: -1, 0, or 1
a1.sinks.k1.requiredAcks = 1
# batch size (sink -> Kafka)
a1.sinks.k1.batchSize = 200
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels = ch-1
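With delimiter.entry = |, each selected row reaches Kafka as a single delimited line. For example, a row (1, zhangsan, 2020-01-01) would arrive roughly as:
1|zhangsan|2020-01-01
This is a sketch: the source writes CSV-style output, so the exact quoting may differ.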
III. Startup
bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console
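Once the agent is up, the rows of ticket_back_assign should start showing up on the consumer side. Since the consumer above redirects its output to ../result, you can watch the data land with:
tail -f ../result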
IV. Notes
1. The Kafka producer errors out with insufficient memory (usually because individual messages are too large); raise the topic's max message size:
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic test1 --config max.message.bytes=4096000
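Note that replica.fetch.max.bytes is a broker-level setting rather than a topic config, so to my understanding it belongs in config/server.properties on each broker (values here mirror the topic setting above and are only an example):
# config/server.properties
message.max.bytes=4096000
replica.fetch.max.bytes=4096000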
2. Flume reports insufficient memory
Pay attention to these parameters (lower the batch/row sizes or reduce the channel capacity):
a1.channels.ch-1.capacity = 1000000
a1.channels.ch-1.transactionCapacity = 1000000
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 100000
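If tuning these doesn't help, the agent's JVM heap itself may be too small; it can be raised in conf/flume-env.sh (heap sizes below are just an example, size them to your data volume):
# conf/flume-env.sh
export JAVA_OPTS="-Xms1g -Xmx2g"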