开发者社区 问答 正文

如何让数据库和topic一一对应

我想让每个数据库都投递到和他名字一样的topic里,该如何设置,wiki我看了,不是很理解, 也做过很多尝试性的调整,都没有用,能否告知如何设置,谢谢 比如我数据库名字是A,B,C,topic名字也创建了A,B,C,如何配置

原提问者GitHub用户ZSH9053

展开
收起
古拉古拉 2023-05-08 12:19:34 100 分享 版权
2 条回答
写回答
取消 提交回答
  • 在instance.properties里面配置 同步库配置 canal.instance.filter.regex=A\..,B\..,C\..*

    消息队列配置 使用正则路由schema和table 格式topic:schema.table,topic:schema.table,topic:schema.table canal.mq.dynamicTopic=topic_A:A,topic_B:B,topic_C:C 最好配置一个canal.mq.topic作为默认的topic,在实践中发现,有一些mysql schema的binlog也会读进来(建表语句,grant语句等),如果没有这个默认的topic,会报找不到分区的错误,从而导致canal停止写入。 canal.mq.topic=a_default_topic

    分区设置

    如果每个topic只有一个分区设置如下 canal.mq.partition=0 如果每个topic有多个分区,且希望按照表名做hash进行如下配置

    hash partition config

    canal.mq.partitionsNum=3 canal.mq.partitionHash=.\..

    原回答者GitHub用户chenxyz707

    2023-05-09 17:36:49
    赞同 展开评论
  • 你可以在 Canal 的配置文件 canal.properties 中设置实例对应的数据库和topic映射关系。

    可以在 canal.instance.instances 配置项中增加实例,例如:

    canal.instance.instances = instance1,instance2
    

    然后在 canal.instance.instance1 和 canal.instance.instance2 分别配置对应的实例信息,例如:

    ## instance1
    canal.instance.instance1.alter.table = true
    canal.instance.instance1.enable.druid = false
    canal.instance.instance1.enable.gtid = false
    canal.instance.instance1.filter.regex = .*\\..*
    canal.instance.instance1.filter.black.regex = 
    canal.instance.instance1.master.address = 127.0.0.1:3306
    canal.instance.instance1.master.gtid = 
    canal.instance.instance1.master.journal.name = 
    canal.instance.instance1.master.position = 
    canal.instance.instance1.master.timestamp = 
    canal.instance.instance1.position.tsdb.jdbc.driver = com.mysql.jdbc.Driver
    canal.instance.instance1.position.tsdb.jdbc.password = 123456
    canal.instance.instance1.position.tsdb.jdbc.url = jdbc:mysql://127.0.0.1:3306/canal_position?useUnicode=true
    canal.instance.instance1.position.tsdb.jdbc.username = canal
    canal.instance.instance1.position.meta.mode = mixed
    canal.instance.instance1.position.meta.store = mysql
    canal.instance.instance1.server.id = 1234
    canal.instance.instance1.slave.address = 
    canal.instance.instance1.storage.batch.mode = item
    canal.instance.instance1.storage.buffer.size = 256
    canal.instance.instance1.storage.dbBatchSize = 1000
    canal.instance.instance1.storage.es.batchSize = 1000
    canal.instance.instance1.storage.es.cluster.name = elasticsearch
    canal.instance.instance1.storage.es.index = canal
    canal.instance.instance1.storage.es.mapping = canal
    canal.instance.instance1.storage.es.mode = op
    canal.instance.instance1.storage.es.type = canal
    canal.instance.instance1.storage.mode = kafka
    canal.instance.instance1.storage.kafka.bootstrap.servers = 127.0.0.1:9092
    canal.instance.instance1.storage.kafka.retries = 0
    canal.instance.instance1.storage.kafka.topic = A
    canal.instance.instance1.zk.address = 127.0.0.1:2181
    canal.instance.instance1.zk.password = 
    canal.instance.instance1.zk.session.timeout = 6000
    canal.instance.instance1.zk.username = 
    

    可以看到,配置项 canal.instance.instance1.storage.kafka.topic 就是配置实例对应的topic名称,你只需要将对应的数据库名和topic名称保持一致即可。

    2023-05-08 16:06:36
    赞同 展开评论
问答分类:
问答地址: