我想让每个数据库都投递到和他名字一样的topic里,该如何设置,wiki我看了,不是很理解, 也做过很多尝试性的调整,都没有用,能否告知如何设置,谢谢 比如我数据库名字是A,B,C,topic名字也创建了A,B,C,如何配置
原提问者GitHub用户ZSH9053
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在instance.properties里面配置 同步库配置 canal.instance.filter.regex=A\..,B\..,C\..*
消息队列配置 使用正则路由schema和table 格式topic:schema.table,topic:schema.table,topic:schema.table canal.mq.dynamicTopic=topic_A:A,topic_B:B,topic_C:C 最好配置一个canal.mq.topic作为默认的topic,在实践中发现,有一些mysql schema的binlog也会读进来(建表语句,grant语句等),如果没有这个默认的topic,会报找不到分区的错误,从而导致canal停止写入。 canal.mq.topic=a_default_topic
分区设置
如果每个topic只有一个分区设置如下 canal.mq.partition=0 如果每个topic有多个分区,且希望按照表名做hash进行如下配置
canal.mq.partitionsNum=3 canal.mq.partitionHash=.\..
原回答者GitHub用户chenxyz707
你可以在 Canal 的配置文件 canal.properties 中设置实例对应的数据库和topic映射关系。
可以在 canal.instance.instances 配置项中增加实例,例如:
canal.instance.instances = instance1,instance2
然后在 canal.instance.instance1 和 canal.instance.instance2 分别配置对应的实例信息,例如:
## instance1
canal.instance.instance1.alter.table = true
canal.instance.instance1.enable.druid = false
canal.instance.instance1.enable.gtid = false
canal.instance.instance1.filter.regex = .*\\..*
canal.instance.instance1.filter.black.regex =
canal.instance.instance1.master.address = 127.0.0.1:3306
canal.instance.instance1.master.gtid =
canal.instance.instance1.master.journal.name =
canal.instance.instance1.master.position =
canal.instance.instance1.master.timestamp =
canal.instance.instance1.position.tsdb.jdbc.driver = com.mysql.jdbc.Driver
canal.instance.instance1.position.tsdb.jdbc.password = 123456
canal.instance.instance1.position.tsdb.jdbc.url = jdbc:mysql://127.0.0.1:3306/canal_position?useUnicode=true
canal.instance.instance1.position.tsdb.jdbc.username = canal
canal.instance.instance1.position.meta.mode = mixed
canal.instance.instance1.position.meta.store = mysql
canal.instance.instance1.server.id = 1234
canal.instance.instance1.slave.address =
canal.instance.instance1.storage.batch.mode = item
canal.instance.instance1.storage.buffer.size = 256
canal.instance.instance1.storage.dbBatchSize = 1000
canal.instance.instance1.storage.es.batchSize = 1000
canal.instance.instance1.storage.es.cluster.name = elasticsearch
canal.instance.instance1.storage.es.index = canal
canal.instance.instance1.storage.es.mapping = canal
canal.instance.instance1.storage.es.mode = op
canal.instance.instance1.storage.es.type = canal
canal.instance.instance1.storage.mode = kafka
canal.instance.instance1.storage.kafka.bootstrap.servers = 127.0.0.1:9092
canal.instance.instance1.storage.kafka.retries = 0
canal.instance.instance1.storage.kafka.topic = A
canal.instance.instance1.zk.address = 127.0.0.1:2181
canal.instance.instance1.zk.password =
canal.instance.instance1.zk.session.timeout = 6000
canal.instance.instance1.zk.username =
可以看到,配置项 canal.instance.instance1.storage.kafka.topic 就是配置实例对应的topic名称,你只需要将对应的数据库名和topic名称保持一致即可。