各位大佬,请问一下按照一下配置,FLUME集群将MYSQL数据写入到kafka集群(MYSQL-->FLUME集群-->KAFKA集群,数据可以进入kafka的TOPIC,但是出现了5条重复数据。
FLUME集群设计,AGENT1/AGENT2/AGENG3对应HOST1/HOST2/HOST3,collector1/collector2对应HOST1/HOST2,配置文件如下:
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2
agent1.sources.r1.type = org.keedio.flume.source.SQLSource
agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
agent1.sources.r1.hibernate.connection.user = root
agent1.sources.r1.hibernate.connection.password = 07710714
agent1.sources.r1.table = flink_kafka_table
agent1.sources.r1.hibernate.connection.autocommit = true
agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
agent1.sources.r1.status.file.name = sqlSource.status
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = flumekafkacluster
agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 20
agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.topic = flumekafkacluster
agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k2.requiredAcks = 1
agent1.sinks.k2.batchSize = 20
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1
collector1.sources.r1.type = org.keedio.flume.source.SQLSource
collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
collector1.sources.r1.hibernate.connection.user = root
collector1.sources.r1.hibernate.connection.password = 07710714
collector1.sources.r1.table = flink_kafka_table
collector1.sources.r1.hibernate.connection.autocommit = true
collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
collector1.sources.r1.status.file.name = sqlSource.status
collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k1.topic = flumekafkacluster
collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
collector1.sinks.k1.requiredAcks = 1
collector1.sinks.k1.batchSize = 20
collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100
collector1.sources.r1.channels = c1
collector1.sinks.k1.channel = c1
然后在HOST1/HOST2/HOST3执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/client.conf \
--name agent1
在HOST1/HOST2执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/colletor.conf \
--name collector1
执行完成后在MySQL写入数据作为测试,发现在TOPIC的flumekafkacluster上出现了5条重复数据,
请各位大佬解惑,配置文件是否写错了,应该如何写?
出现数据重复的原因可能是多方面的,需要对系统进行逐步排查和调试。以下是一些可能导致数据重复的原因和排查方法,供您参考:
数据源导致的重复:首先需要确认数据源是否存在重复的数据,可以使用SQL查询或者其他方式对数据源进行检查和去重,以确保数据的唯一性。
FLUME配置错误:可能是因为FLUME的配置不正确导致了数据重复,可以对FLUME的配置文件进行检查和调整,尤其是source和channel的配置,确保数据的唯一性和一致性。
KAFKA配置错误:可能是因为KAFKA的配置不正确导致了数据重复,可以对KAFKA的配置文件进行检查和调整,尤其是partition和replication的配置,确保数据的分布和复制策略正确。
网络延迟或故障:可能是因为网络延迟或故障导致了数据重复,在此情况下,可以对网络进行检查和优化,尤其是对于跨机房或跨地域的网络传输,需要进行特别关注和调整。
数据处理逻辑错误:可能是因为数据处理逻辑存在错误或漏洞导致了数据重复,可以对数据处理逻辑进行检查和优化,尤其是对于数据的去重、聚合、分组等操作,需要进行特别关注和调试
出现重复数据的问题在使用Flume 1.9集群采集MySQL到Kafka时可能有多个原因,以下是一些常见的排查步骤和可能的解决方案:
增加事务支持:确保Flume的Kafka Sink配置中启用了事务支持(transactional.mode = transactional),并配合正确的Kafka版本。这可以帮助确保数据在发送到Kafka时具有幂等性,避免重复写入。
检查Flume Agent配置:确认Flume Agent配置中是否存在多个相同的数据流从MySQL读取数据,并将其发送到Kafka。请确保只有一个Agent配置来处理MySQL数据源并将其发送到Kafka主题。
检查Flume Channel配置:在Flume Agent的Channel配置中,验证是否使用可靠的存储机制(例如使用文件或内存持久化通道)。这可以帮助确保数据在发生故障或重新启动时不会丢失。
检查Kafka配置:检查Kafka的配置,确保分区设置适当,并且没有其他消费者在相同的消费组中读取相同的主题。如果使用相同的消费组进行数据消费,可能会导致重复消费相同的消息。
数据去重处理:如果以上步骤无法解决重复数据问题,你可以考虑在数据处理过程中添加去重逻辑。例如,在Flume的自定义拦截器中记录已处理的数据ID或使用唯一键进行判断,以防止重复写入。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。