开发者社区> 问答> 正文

FLUME集群采集mysql到kafka集群出现重复数据

各位大佬,请问一下按照一下配置,FLUME集群将MYSQL数据写入到kafka集群(MYSQL-->FLUME集群-->KAFKA集群,数据可以进入kafka的TOPIC,但是出现了5条重复数据。
FLUME集群设计,AGENT1/AGENT2/AGENG3对应HOST1/HOST2/HOST3,collector1/collector2对应HOST1/HOST2,配置文件如下:

client.conf

FLUME CLUSTER CONFIG

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2

Describe/configure the source MYSQL

agent1.sources.r1.type = org.keedio.flume.source.SQLSource
agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
agent1.sources.r1.hibernate.connection.user = root
agent1.sources.r1.hibernate.connection.password = 07710714
agent1.sources.r1.table = flink_kafka_table
agent1.sources.r1.hibernate.connection.autocommit = true
agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
agent1.sources.r1.status.file.name = sqlSource.status

Describe the sink1

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = flumekafkacluster
agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 20

Describe the sink2

agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.topic = flumekafkacluster
agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k2.requiredAcks = 1
agent1.sinks.k2.batchSize = 20

创建sink groups,将多个sinks绑定为一个组

agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

failover模式,只有collector1工作。仅当collector1挂了后,collector2才能启动服务。

agent1.sinkgroups.g1.processor.type = failover

值越大,优先级越高,collector1优先级最高

agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1

发生异常的sink最大故障转移时间(毫秒),这里设为10秒

agent1.sinkgroups.g1.processor.maxpenalty = 10000

使用一个内存缓冲事件的通道

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

以上配置了三个组件,下面进行连接

将源和 sink 绑定到通道

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

###colletor.conf

可以有多个sources、sinks、channels

collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1

Describe/configure the source MYSQL

collector1.sources.r1.type = org.keedio.flume.source.SQLSource
collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
collector1.sources.r1.hibernate.connection.user = root
collector1.sources.r1.hibernate.connection.password = 07710714
collector1.sources.r1.table = flink_kafka_table
collector1.sources.r1.hibernate.connection.autocommit = true
collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
collector1.sources.r1.status.file.name = sqlSource.status

Describe the sink

collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k1.topic = flumekafkacluster
collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
collector1.sinks.k1.requiredAcks = 1
collector1.sinks.k1.batchSize = 20

Use a channel which buffers events in memory

collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100

以上配置了三个组件,下面进行连接

将源和 sink 绑定到通道

collector1.sources.r1.channels = c1
collector1.sinks.k1.channel = c1

然后在HOST1/HOST2/HOST3执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/client.conf \
--name agent1

在HOST1/HOST2执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/colletor.conf \
--name collector1

执行完成后在MySQL写入数据作为测试,发现在TOPIC的flumekafkacluster上出现了5条重复数据,
请各位大佬解惑,配置文件是否写错了,应该如何写?
出现这个现象好像是我启动了3个agent1,2个collector1,他们都独立采集数据了

展开
收起
侠客张 2023-07-27 18:04:06 229 0
4 条回答
写回答
取消 提交回答
  • 在FLUME集群中,AGENT1、AGENT2和AGENG3分别运行在HOST1、Host2和Host3上,COLLECTOR1和COLLECTOR2分别运行在Host1和Host2上。以下是可能引起重复数据问题的几种情况和解决方法:

    数据源问题:可能是由于MySQL数据源本身存在重复数据。可以在MySQL数据源端添加去重逻辑,确保传入FLUME的数据没有重复。

    FLUME Agent问题:如果每个Agent都从同一个MySQL数据源获取数据,并且使用相同的参数配置,那么可能会出现重复数据。解决方法是修改每个Agent的参数配置,使其具有不同的源和/或通道配置。

    KAFKA Sink问题:如果在KAFKA Sink中未正确配置分区器(Partitioner)或消息的键(Key)具有重复值,则可能会导致相同消息被写入同一分区中的多个分区。检查KAFKA Sink的配置,确保正确配置了分区器或键。

    网络问题:如果网络存在重复数据,例如在传输过程中发生了数据包复制或网络延迟,则也可能导致重复数据。检查网络连接和传输设置,确保数据传输过程中没有重复数据。

    您可以根据上述建议检查并排除可能的问题,从而解决重复数据的问题。

    2023-07-28 08:03:44
    赞同 展开评论 打赏
  • 在使用Flume集群采集MySQL数据到Kafka集群时出现重复数据的问题可能有多种原因。以下是一些常见的排查步骤和解决方法:

    1. 查看Flume配置:检查Flume配置文件中是否存在重复的source或sink节点。确保每个Flume实例只有一个source和一个sink。

    2. 检查Flume拦截器配置:如果在Flume配置中启用了拦截器,确保拦截器不会导致重复数据。拦截器可能会影响事件的唯一标识符或消息内容。

    3. 检查Kafka的分区设置:确认Kafka主题的分区数是否正确设置。如果分区数过少,可能会导致数据写入同一分区而造成重复数据。

    4. 检查Flume事务设置:在Flume配置中,如果开启了事务(transactional)模式,确保事务的设置正确。如果设置不当,可能会导致数据重复写入Kafka。

    5. 检查MySQL的主键或唯一索引:确保MySQL表中定义了适当的主键或唯一索引。这可以帮助避免重复数据被写入到Kafka中。

    6. 调整Flume和Kafka的批处理设置:尝试调整Flume和Kafka的批处理相关的参数,例如每次提交的事件数量、等待时间等。适当的批处理设置可能有助于减少重复数据的发生。

    7. 检查MySQL binlog日志:如果采用了MySQL的binlog来实现实时数据同步,可以检查binlog日志是否有异常。例如,可能存在因为网络问题或配置错误导致的重复写入binlog的情况。

    2023-07-27 20:39:15
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    FLUME Source配置有问题
    最普遍的问题是没有正确指定唯一主键来避免重复。

    比如MySQL Source配置应该是:

    properties
    Copy

    指定主键避免重复

    selector.columns= ID
    selector.type=column
    但这部分却缺少或配置错误,导致无法正确识别重复数据。

    FLUME Agent配置时间重叠
    若多个FLUME Agent配置了相同的采集时间范围,就会造成重复。

    应确保每个Agent配置不同的开始时间,避免重叠。

    Kafka Sink配置不当
    Kafka Sink中没有指定唯一主键(requied.acks),导致前面的去重无效。

    应配置:

    properties
    Copy

    提交至Kafka前,先做一次去重

    kafka.required.acks = 1
    FLUME配置不一致
    当使用多台FLUME Agent时,各个agent的配置必须完全一致。否者也会产生重复。

    Kafka端幂等性设置不当
    部分情况下需要在Kafka端进行幂等处理,确保不产生重复消息。

    以上这些都可能造成FLUME到KAFKA中出现重复数据。

    解决方案主要就是:

    Source/Sink端正确指定主键,并去重
    不同Agent采集不同时间范围的数据
    保持所有FLUME Agent配置完全一致
    在Kafka端开启幂等性配置(需要kafka 0.11以上版本)

    2023-07-27 19:23:20
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    根据您提供的配置文件,您的 FLUME 集群是将 MYSQL 数据写入到 Kafka 集群中的。但是您提到在数据写入 Kafka 的 TOPIC(flumekafkacluster)时出现了 5 条重复数据。根据您的描述,您启动了 3 个 agent1 和 2 个 collector1,这可能导致了数据的重复。

    在您的配置文件中,您的 agent1 和 collector1 使用了相同的 source(r1)和 sink(k1),这会导致数据被复制到多个 sink 中,从而引起重复数据。您可以考虑对配置进行调整,确保每个 agent1 和 collector1 使用独立的 source 和 sink。

    另外,您的配置中使用了 sinkgroups(g1),但是您没有在 collector1 的配置文件中定义 sinkgroups,这可能会导致配置不一致。建议您在 collector1 的配置文件中添加相应的 sinkgroups 配置,确保与 agent1 的配置保持一致。

    此外,还要注意确保每个 agent1 和 collector1 运行在不同的主机上,以避免重复数据。

    最后,您可能还需要检查 MySQL 数据库中是否存在数据冗余或重复插入的情况。您可以通过查看 MySQL 数据库的日志或使用 SQL 查询来确认数据是否重复插入。

    总结来说,解决重复数据问题的步骤如下:

    1. 确保每个 agent1 和 collector1 使用独立的 source 和 sink。
    2. 在 collector1 的配置文件中添加相应的 sinkgroups 配置,保持与 agent1 的配置一致。
    3. 确保每个 agent1 和 collector1 运行在不同的主机上,避免重复数据。
    4. 检查 MySQL 数据库中是否存在数据冗余或重复插入的情况。

    希望这些步骤能帮助您解决重复数据的问题!如果还有其他疑问,请随时提问。

    2023-07-27 19:07:31
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像