开发者社区> 问答> 正文

如何配置FLUME集群将MYSQL数据写入到kafka集群?

已解决

各位大佬好!请教如何配置FLUME(1.9)集群将MYSQL数据写入到kafka(2.12-2.5)集群(MYSQL-->FLUME集群(未配置完成)-->KAFKA集群(已经配置完成)),以下是找到的配置资料,还不完整,请帮完成。
FLUME集群设计,AGENT1/AGENT2/AGENG3对应HOST1/HOST2/HOST3
collector1/collector2对应HOST1/HOST2

#########client.conf

FLUME CLUSTER CONFIG

Name the components on this agent,a1为agent名字

列出agent1的组件,sinks有两个,分别去到collector1和collector2

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2

Describe/configure the source MYSQL

a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 07710714
a1.sources.r1.table = flink_kafka_table
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
a1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
a1.sources.r1.status.file.name = sqlSource.status

Describe the sink1

如何配置???

Describe the sink2

如何配置???

创建sink groups,将多个sinks绑定为一个组

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

failover模式,只有collector1工作。仅当collector1挂了后,collector2才能启动服务。

a1.sinkgroups.g1.processor.type = failover

值越大,优先级越高,collector1优先级最高

a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1

发生异常的sink最大故障转移时间(毫秒),这里设为10秒

a1.sinkgroups.g1.processor.maxpenalty = 10000

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

以上配置了三个组件,下面进行连接

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#########colletor.conf

Name the components on this agent,collector1为agent名字

可以有多个sources、sinks、channels

collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1

Describe/configure the source MYSQL

如何配置???

Describe the sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumekafkacluster
a1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

以上配置了三个组件,下面进行连接

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

其中client.conf在三台服务器 colletor.conf在二台服务器
网上资料在client.conf的sink和 colletor.conf的source用avro /hadoop这个在我当前场景好像用不到,请教各位大佬,应该如何配置

展开
收起
侠客张 2023-07-10 20:02:13 96 0
4 条回答
写回答
取消 提交回答
  • 值得去的地方都没有捷径
    采纳回答

    你好!根据你提供的配置信息,以下是将 MySQL 数据写入 Kafka 的 FLUME 集群配置的完整示例。

    首先,是 client.conf 的配置:

    # FLUME CLUSTER CONFIG
    # Name the components on this agent
    # 列出 agent1 的组件,sinks 有两个,分别去到 collector1 和 collector2
    agent1.sources = r1
    agent1.channels = c1
    agent1.sinks = k1 k2
    
    # Describe/configure the source MYSQL
    agent1.sources.r1.type = org.keedio.flume.source.SQLSource
    agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
    agent1.sources.r1.hibernate.connection.user = root
    agent1.sources.r1.hibernate.connection.password = 07710714
    agent1.sources.r1.table = flink_kafka_table
    agent1.sources.r1.hibernate.connection.autocommit = true
    agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
    agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
    agent1.sources.r1.status.file.name = sqlSource.status
    
    # Describe the sink1
    agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.k1.topic = flumekafkacluster
    agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
    agent1.sinks.k1.requiredAcks = 1
    agent1.sinks.k1.batchSize = 20
    
    # Describe the sink2
    agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.k2.topic = flumekafkacluster
    agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
    agent1.sinks.k2.requiredAcks = 1
    agent1.sinks.k2.batchSize = 20
    
    # Create sink groups, 将多个 sinks 绑定为一个组
    agent1.sinkgroups = g1
    agent1.sinkgroups.g1.sinks = k1 k2
    agent1.sinkgroups.g1.processor.type = load_balance
    agent1.sinkgroups.g1.processor.backoff = true
    agent1.sinkgroups.g1.processor.selector = round_robin
    agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
    
    # Failover 模式,只有 collector1 工作。仅当 collector1 挂了后,collector2 才能启动服务。
    agent1.sinkgroups.g1.processor.type = failover
    
    # 值越大,优先级越高,collector1 优先级最高
    agent1.sinkgroups.g1.processor.priority.k1 = 10
    agent1.sinkgroups.g1.processor.priority.k2 = 1
    
    # 发生异常的 sink 最大故障转移时间(毫秒),这里设为10秒
    agent1.sinkgroups.g1.processor.maxpenalty = 10000
    
    # Use a channel which buffers events in memory
    agent1.channels.c1.type = memory
    agent1.channels.c1.capacity = 1000
    agent1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    agent1.sources.r1.channels = c1
    agent1.sinks.k1.channel = c1
    agent1.sinks.k2.channel = c1
    

    接下来是 collector.conf 的配置:

    # Name the components on this agent, collector1 为 agent 名字
    # 可以有多个 sources、sinks、channels
    collector1.sources = r1
    collector1.sinks = k1
    collector1.channels = c1
    
    # Describe/configure the source MYSQL
    collector1.sources.r1.type = org.keedio.flume.source.SQLSource
    collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
    collector1.sources.r1.hibernate.connection.user = root
    collector1.sources.r1.hibernate.connection.password = 07710714
    collector1.sources.r1.table = flink_kafka_table
    collector1.sources.r1.hibernate.connection.autocommit = true
    collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
    collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
    collector1.sources.r1.status.file.name = sqlSource.status
    
    # Describe the sink
    collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    collector1.sinks.k1.topic = flumekafkacluster
    collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
    collector1.sinks.k1.requiredAcks = 1
    collector1.sinks.k1.batchSize = 20
    
    # Use a channel which buffers events in memory
    collector1.channels.c1.type = memory
    collector1.channels.c1.capacity = 1000
    collector1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    collector1.sources.r1.channels = c1
    collector1.sinks.k1.channel = c1
    

    以上配置可以实现将 MySQL 数据写入 Kafka,其中 client.conf 在三台服务器上配置,collector.conf 在两台服务器上配置。根据你提供的信息,已经完成了相应的配置,如果有需要,可以根据实际情况进行修改。希望这对你有所帮助!如果还有其他问题,请随时提问。

    2023-07-10 21:18:21
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    在MYSQL数据库中创建一个表,并确保您拥有读取该表的权限。

    在每个Flume Agent所在的主机上安装Flume,并创建一个Flume配置文件。配置文件应包含以下信息:

    Source:使用JDBC Source,配置JDBC连接信息、查询语句、数据类型和字段映射等。

    Channel:使用Memory Channel,配置Channel容量和事务等。

    Sink:使用Kafka Sink,配置Kafka连接信息、Topic和数据序列化等。在Sink中,您可以使用Kafka的Producer API将数据写入到Kafka集群中。

    在每个Flume Agent所在的主机上启动Flume Agent,并指定相应的配置文件。例如,在HOST1上启动Agent1:

    Copy
    $ flume-ng agent -n Agent1 -c conf -f /path/to/flume.conf -Dflume.root.logger=INFO,console
    ```

    在Kafka集群中创建一个Topic,并确保您拥有写入该Topic的权限。

    在每个Collector所在的主机上安装Kafka,并创建一个Kafka配置文件。配置文件应包含以下信息:

    Producer:使用Kafka Producer,配置Kafka连接信息和Topic等。
    在每个Collector所在的主机上启动Kafka Producer,并指定相应的配置文件。例如,在HOST1上启动Collector1:

    $ kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic mytopic
    启动MySQL数据库,并向该表中插入一些数据。

    检查Kafka集群中的Topic是否接收到了相应的数据。

    2023-07-11 07:51:36
    赞同 展开评论 打赏
  • 在你提供的配置中,缺少关于源和第二个 sink 的具体配置。下面是完整配置的示例:

    ###client.conf(放置在三台服务器上)

    FLUME CLUSTER CONFIG
    agent1.sources = r1
    agent1.channels = c1
    agent1.sinks = k1 k2

    Describe/configure the source MYSQL
    agent1.sources.r1.type = org.keedio.flume.source.SQLSource
    agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
    agent1.sources.r1.hibernate.connection.user = root
    agent1.sources.r1.hibernate.connection.password = 07710714
    agent1.sources.r1.table = flink_kafka_table
    agent1.sources.r1.hibernate.connection.autocommit = true
    agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
    agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
    agent1.sources.r1.status.file.name = sqlSource.status

    Describe the sink1
    agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.k1.topic = flumekafkacluster
    agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
    agent1.sinks.k1.requiredAcks = 1
    agent1.sinks.k1.batchSize = 20

    Describe the sink2
    agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.k2.topic = flumekafkacluster
    agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
    agent1.sinks.k2.requiredAcks = 1
    agent1.sinks.k2.batchSize = 20

    创建sink groups,将多个sinks绑定为一个组
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = round_robin
    a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

    failover模式,只有collector1工作。仅当collector1挂了后,collector2才能启动服务。
    a1.sinkgroups.g1.processor.type = failover

    值越大,优先级越高,collector1优先级最高
    a1.sinkgroups.g1.processor.priority.k1 = 10
    a1.sinkgroups.g1.processor.priority.k2 = 1

    发生异常的sink最大故障转移时间(毫秒),这里设为10秒
    a1.sinkgroups.g1.processor.maxpenalty = 10000

    使用一个内存缓冲事件的通道
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    以上配置了三个组件,下面进行连接
    将源和 sink 绑定到通道
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

    ###colletor.conf(放置在两台服务器上)

    Name the components on this agent,collector1为agent名字
    可以有多个sources、sinks、channels
    collector1.sources = r1
    collector1.sinks = k1
    collector1.channels = c1

    Describe/configure the source MYSQL
    collector1.sources.r1.type = org.keedio.flume.source.SQLSource
    collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
    collector1.sources.r1.hibernate.connection.user = root
    collector1.sources.r1.hibernate.connection.password = 07710714
    collector1.sources.r1.table = flink_kafka_table
    collector1.sources.r1.hibernate.connection.autocommit = true
    collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
    collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
    collector1.sources.r1.status.file.name = sqlSource.status

    Describe the sink
    collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    collector1.sinks.k1.topic = flumekafkacluster
    collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
    collector1.sinks.k1.requiredAcks = 1
    collector1.sinks.k1.batchSize = 20

    Use a channel which buffers events in memory
    collector1.channels.c1.type = memory
    collector1.channels.c1.capacity = 1000
    collector1.channels.c1.transactionCapacity = 100

    以上配置了三个组件,下面进行连接
    将源和 sink 绑定到通道
    collector1.sources.r1.channels = c1
    collector1.sinks.k1.channel = c1

    请使用上述配置完成你的 FLUME 集群,将 MySQL 数据写入到 Kafka 集群中。记得在每台服务器上根据对应的主机名进行配置。

    2023-07-10 22:19:08
    赞同 展开评论 打赏
  • 要配置一个FLUME集群将MySQL数据写入到Kafka集群,您可以按照以下步骤进行操作:

    1. 安装和配置Kafka集群:首先,确保您已经安装和配置了可用的Kafka集群。请参考Kafka的官方文档或相关教程来完成该步骤。

    2. 安装和配置FLUME:在每个要运行FLUME代理的节点上进行安装和配置。您可以从Apache FLUME的官方网站下载FLUME,并按照文档中提供的指南进行安装和基本配置。

    3. 创建FLUME配置文件:创建FLUME的配置文件(例如flume.conf),并配置数据源、通道和目标。在该配置文件中,您需要为数据源指定MySQL的连接信息(主机、端口、数据库名等),并配置Kafka的连接信息(Bootstrap Servers、Topic等)。

      以下是一个示例配置文件的部分内容:

      # source: MySQL
      agent.sources = jdbc_source
      agent.sources.jdbc_source.type = org.apache.flume.source.jdbc.JdbcSource
      agent.sources.jdbc_source.driver = com.mysql.jdbc.Driver
      agent.sources.jdbc_source.url = jdbc:mysql://mysql_host:3306/db_name
      agent.sources.jdbc_source.user = mysql_user
      agent.sources.jdbc_source.password = mysql_password
      agent.sources.jdbc_source.query = SELECT * FROM table_name
      
      # channel: memory
      agent.channels = memory_channel
      agent.channels.memory_channel.type = memory
      
      # sink: Kafka
      agent.sinks = kafka_sink
      agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
      agent.sinks.kafka_sink.bootstrap.servers = kafka_host1:9092,kafka_host2:9092
      agent.sinks.kafka_sink.topic = kafka_topic
      
      # bind source, channel, and sink together
      agent.sources.jdbc_source.channels = memory_channel
      agent.sinks.kafka_sink.channel = memory_channel
      
    4. 启动FLUME代理:在每个节点上启动FLUME代理。您可以使用以下命令来启动FLUME:

      flume-ng agent --conf /path/to/flume/conf --conf-file /path/to/flume.conf --name agent_name -Dflume.root.logger=INFO,console
      

      /path/to/flume/conf 替换为FLUME的配置文件目录,将 /path/to/flume.conf 替换为您创建的FLUME配置文件路径,将 agent_name 替换为代理的名称。

    5. 监测和管理FLUME集群:一旦FLUME代理启动,您可以监测和管理它们以确保数据正确地从MySQL写入到Kafka中。这可能涉及查看日志、指标和性能统计信息,以及调整配置文件或添加更多的FLUME代理节点等。

    2023-07-10 20:57:43
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像