开发者社区> 问答> 正文

FLUME1.9集群采集mysql到kafka出现重复数据

各位大佬,请问一下按照一下配置,FLUME集群将MYSQL数据写入到kafka集群(MYSQL-->FLUME集群-->KAFKA集群,数据可以进入kafka的TOPIC,但是出现了5条重复数据。
FLUME集群设计,AGENT1/AGENT2/AGENG3对应HOST1/HOST2/HOST3,collector1/collector2对应HOST1/HOST2,配置文件如下:

client.conf

FLUME CLUSTER CONFIG

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2

Describe/configure the source MYSQL

agent1.sources.r1.type = org.keedio.flume.source.SQLSource
agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
agent1.sources.r1.hibernate.connection.user = root
agent1.sources.r1.hibernate.connection.password = 07710714
agent1.sources.r1.table = flink_kafka_table
agent1.sources.r1.hibernate.connection.autocommit = true
agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
agent1.sources.r1.status.file.name = sqlSource.status

Describe the sink1

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = flumekafkacluster
agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 20

Describe the sink2

agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.topic = flumekafkacluster
agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k2.requiredAcks = 1
agent1.sinks.k2.batchSize = 20

创建sink groups,将多个sinks绑定为一个组

agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

failover模式,只有collector1工作。仅当collector1挂了后,collector2才能启动服务。

agent1.sinkgroups.g1.processor.type = failover

值越大,优先级越高,collector1优先级最高

agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1

发生异常的sink最大故障转移时间(毫秒),这里设为10秒

agent1.sinkgroups.g1.processor.maxpenalty = 10000

使用一个内存缓冲事件的通道

agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

以上配置了三个组件,下面进行连接

将源和 sink 绑定到通道

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

###colletor.conf

可以有多个sources、sinks、channels

collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1

Describe/configure the source MYSQL

collector1.sources.r1.type = org.keedio.flume.source.SQLSource
collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
collector1.sources.r1.hibernate.connection.user = root
collector1.sources.r1.hibernate.connection.password = 07710714
collector1.sources.r1.table = flink_kafka_table
collector1.sources.r1.hibernate.connection.autocommit = true
collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
collector1.sources.r1.status.file.name = sqlSource.status

Describe the sink

collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k1.topic = flumekafkacluster
collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
collector1.sinks.k1.requiredAcks = 1
collector1.sinks.k1.batchSize = 20

Use a channel which buffers events in memory

collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100

以上配置了三个组件,下面进行连接

将源和 sink 绑定到通道

collector1.sources.r1.channels = c1
collector1.sinks.k1.channel = c1

然后在HOST1/HOST2/HOST3执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/client.conf \
--name agent1

在HOST1/HOST2执行
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/config/colletor.conf \
--name collector1

执行完成后在MySQL写入数据作为测试,发现在TOPIC的flumekafkacluster上出现了5条重复数据,
请各位大佬解惑,配置文件是否写错了,应该如何写?

展开
收起
侠客张 2023-07-23 17:38:25 139 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    出现数据重复的原因可能是多方面的,需要对系统进行逐步排查和调试。以下是一些可能导致数据重复的原因和排查方法,供您参考:

    数据源导致的重复:首先需要确认数据源是否存在重复的数据,可以使用SQL查询或者其他方式对数据源进行检查和去重,以确保数据的唯一性。

    FLUME配置错误:可能是因为FLUME的配置不正确导致了数据重复,可以对FLUME的配置文件进行检查和调整,尤其是source和channel的配置,确保数据的唯一性和一致性。

    KAFKA配置错误:可能是因为KAFKA的配置不正确导致了数据重复,可以对KAFKA的配置文件进行检查和调整,尤其是partition和replication的配置,确保数据的分布和复制策略正确。

    网络延迟或故障:可能是因为网络延迟或故障导致了数据重复,在此情况下,可以对网络进行检查和优化,尤其是对于跨机房或跨地域的网络传输,需要进行特别关注和调整。

    数据处理逻辑错误:可能是因为数据处理逻辑存在错误或漏洞导致了数据重复,可以对数据处理逻辑进行检查和优化,尤其是对于数据的去重、聚合、分组等操作,需要进行特别关注和调试

    2023-07-27 20:25:17
    赞同 展开评论 打赏
  • 出现重复数据的问题在使用Flume 1.9集群采集MySQL到Kafka时可能有多个原因,以下是一些常见的排查步骤和可能的解决方案:

    1. 增加事务支持:确保Flume的Kafka Sink配置中启用了事务支持(transactional.mode = transactional),并配合正确的Kafka版本。这可以帮助确保数据在发送到Kafka时具有幂等性,避免重复写入。

    2. 检查Flume Agent配置:确认Flume Agent配置中是否存在多个相同的数据流从MySQL读取数据,并将其发送到Kafka。请确保只有一个Agent配置来处理MySQL数据源并将其发送到Kafka主题。

    3. 检查Flume Channel配置:在Flume Agent的Channel配置中,验证是否使用可靠的存储机制(例如使用文件或内存持久化通道)。这可以帮助确保数据在发生故障或重新启动时不会丢失。

    4. 检查Kafka配置:检查Kafka的配置,确保分区设置适当,并且没有其他消费者在相同的消费组中读取相同的主题。如果使用相同的消费组进行数据消费,可能会导致重复消费相同的消息。

    5. 数据去重处理:如果以上步骤无法解决重复数据问题,你可以考虑在数据处理过程中添加去重逻辑。例如,在Flume的自定义拦截器中记录已处理的数据ID或使用唯一键进行判断,以防止重复写入。

    2023-07-23 21:14:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像