各位大佬,请问一下按照一下配置,FLUME集群将MYSQL数据写入到kafka集群(MYSQL-->FLUME集群-->KAFKA集群,数据可以进入kafka的TOPIC,但是出现了5条重复数据。
FLUME集群设计,AGENT1/AGENT2/AGENG3对应HOST1/HOST2/HOST3,collector1/collector2对应HOST1/HOST2,配置文件如下:
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2
agent1.sources.r1.type = org.keedio.flume.source.SQLSource
agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
agent1.sources.r1.hibernate.connection.user = root
agent1.sources.r1.hibernate.connection.password = 07710714
agent1.sources.r1.table = flink_kafka_table
agent1.sources.r1.hibernate.connection.autocommit = true
agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
agent1.sources.r1.status.file.name = sqlSource.status
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = flumekafkacluster
agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 20
agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.topic = flumekafkacluster
agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k2.requiredAcks = 1
agent1.sinks.k2.batchSize = 20
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1
collector1.sources.r1.type = org.keedio.flume.source.SQLSource
collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
collector1.sources.r1.hibernate.connection.user = root
collector1.sources.r1.hibernate.connection.password = 07710714
collector1.sources.r1.table = flink_kafka_table
collector1.sources.r1.hibernate.connection.autocommit = true
collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
collector1.sources.r1.status.file.name = sqlSource.status
collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k1.topic = flumekafkacluster
collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
collector1.sinks.k1.requiredAcks = 1
collector1.sinks.k1.batchSize = 20
collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100
collector1.sources.r1.channels = c1
collector1.sinks.k1.channel = c1
然后在HOST1/HOST2/HOST3执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/client.conf \
--name agent1
在HOST1/HOST2执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/colletor.conf \
--name collector1
执行完成后在MySQL写入数据作为测试,发现在TOPIC的flumekafkacluster上出现了5条重复数据,
请各位大佬解惑,配置文件是否写错了,应该如何写?
出现这个现象好像是我启动了3个agent1,2个collector1,他们都独立采集数据了
在FLUME集群中,AGENT1、AGENT2和AGENG3分别运行在HOST1、Host2和Host3上,COLLECTOR1和COLLECTOR2分别运行在Host1和Host2上。以下是可能引起重复数据问题的几种情况和解决方法:
数据源问题:可能是由于MySQL数据源本身存在重复数据。可以在MySQL数据源端添加去重逻辑,确保传入FLUME的数据没有重复。
FLUME Agent问题:如果每个Agent都从同一个MySQL数据源获取数据,并且使用相同的参数配置,那么可能会出现重复数据。解决方法是修改每个Agent的参数配置,使其具有不同的源和/或通道配置。
KAFKA Sink问题:如果在KAFKA Sink中未正确配置分区器(Partitioner)或消息的键(Key)具有重复值,则可能会导致相同消息被写入同一分区中的多个分区。检查KAFKA Sink的配置,确保正确配置了分区器或键。
网络问题:如果网络存在重复数据,例如在传输过程中发生了数据包复制或网络延迟,则也可能导致重复数据。检查网络连接和传输设置,确保数据传输过程中没有重复数据。
您可以根据上述建议检查并排除可能的问题,从而解决重复数据的问题。
在使用Flume集群采集MySQL数据到Kafka集群时出现重复数据的问题可能有多种原因。以下是一些常见的排查步骤和解决方法:
查看Flume配置:检查Flume配置文件中是否存在重复的source或sink节点。确保每个Flume实例只有一个source和一个sink。
检查Flume拦截器配置:如果在Flume配置中启用了拦截器,确保拦截器不会导致重复数据。拦截器可能会影响事件的唯一标识符或消息内容。
检查Kafka的分区设置:确认Kafka主题的分区数是否正确设置。如果分区数过少,可能会导致数据写入同一分区而造成重复数据。
检查Flume事务设置:在Flume配置中,如果开启了事务(transactional)模式,确保事务的设置正确。如果设置不当,可能会导致数据重复写入Kafka。
检查MySQL的主键或唯一索引:确保MySQL表中定义了适当的主键或唯一索引。这可以帮助避免重复数据被写入到Kafka中。
调整Flume和Kafka的批处理设置:尝试调整Flume和Kafka的批处理相关的参数,例如每次提交的事件数量、等待时间等。适当的批处理设置可能有助于减少重复数据的发生。
检查MySQL binlog日志:如果采用了MySQL的binlog来实现实时数据同步,可以检查binlog日志是否有异常。例如,可能存在因为网络问题或配置错误导致的重复写入binlog的情况。
FLUME Source配置有问题
最普遍的问题是没有正确指定唯一主键来避免重复。
比如MySQL Source配置应该是:
properties
Copy
selector.columns= ID
selector.type=column
但这部分却缺少或配置错误,导致无法正确识别重复数据。
FLUME Agent配置时间重叠
若多个FLUME Agent配置了相同的采集时间范围,就会造成重复。
应确保每个Agent配置不同的开始时间,避免重叠。
Kafka Sink配置不当
Kafka Sink中没有指定唯一主键(requied.acks),导致前面的去重无效。
应配置:
properties
Copy
kafka.required.acks = 1
FLUME配置不一致
当使用多台FLUME Agent时,各个agent的配置必须完全一致。否者也会产生重复。
Kafka端幂等性设置不当
部分情况下需要在Kafka端进行幂等处理,确保不产生重复消息。
以上这些都可能造成FLUME到KAFKA中出现重复数据。
解决方案主要就是:
Source/Sink端正确指定主键,并去重
不同Agent采集不同时间范围的数据
保持所有FLUME Agent配置完全一致
在Kafka端开启幂等性配置(需要kafka 0.11以上版本)
根据您提供的配置文件,您的 FLUME 集群是将 MYSQL 数据写入到 Kafka 集群中的。但是您提到在数据写入 Kafka 的 TOPIC(flumekafkacluster)时出现了 5 条重复数据。根据您的描述,您启动了 3 个 agent1 和 2 个 collector1,这可能导致了数据的重复。
在您的配置文件中,您的 agent1 和 collector1 使用了相同的 source(r1)和 sink(k1),这会导致数据被复制到多个 sink 中,从而引起重复数据。您可以考虑对配置进行调整,确保每个 agent1 和 collector1 使用独立的 source 和 sink。
另外,您的配置中使用了 sinkgroups(g1),但是您没有在 collector1 的配置文件中定义 sinkgroups,这可能会导致配置不一致。建议您在 collector1 的配置文件中添加相应的 sinkgroups 配置,确保与 agent1 的配置保持一致。
此外,还要注意确保每个 agent1 和 collector1 运行在不同的主机上,以避免重复数据。
最后,您可能还需要检查 MySQL 数据库中是否存在数据冗余或重复插入的情况。您可以通过查看 MySQL 数据库的日志或使用 SQL 查询来确认数据是否重复插入。
总结来说,解决重复数据问题的步骤如下:
希望这些步骤能帮助您解决重复数据的问题!如果还有其他疑问,请随时提问。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。