
In ZK HA mode, a standby canal cannot send to Kafka after switching to the running state

Environment

canal version 1.1.2, mysql version 5.7.11

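Problem description

We use ZK HA mode with one node RUNNING and one STANDBY. Binlog events are read and delivered to Kafka.

After manually stopping the RUNNING node, messages no longer reach Kafka, although the ZK node shows that the instance has automatically switched over to the other server.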

ZK node information

[zk: 127.0.0.1:2181(CONNECTED) 30] get /otter/canal/destinations/example/running
Node does not exist: /otter/canal/destinations/example/running
[zk: 127.0.0.1:2181(CONNECTED) 31] get /otter/canal/destinations/example/running
{"active":true,"address":"10.166.8.25:12111","cid":2}
cZxid = 0x100000432
ctime = Mon Mar 11 18:25:59 CST 2019
mZxid = 0x100000432
mtime = Mon Mar 11 18:25:59 CST 2019
pZxid = 0x100000432
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1013d8b624e0010
dataLength = 53
numChildren = 0
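The second `get` returns the running node's payload, a small JSON document naming the server that currently holds the instance. A minimal Python sketch for reading it, assuming the payload has been copied out of the ZooKeeper CLI as a string (the helper name `parse_running_node` is hypothetical and not part of canal):

```python
import json

# Payload copied verbatim from the ZooKeeper CLI output in the question.
RUNNING_PAYLOAD = '{"active":true,"address":"10.166.8.25:12111","cid":2}'


def parse_running_node(payload: str) -> dict:
    """Split the running node's JSON into host, port, active flag and cid."""
    data = json.loads(payload)
    # The address has the form "host:port"; split on the last colon.
    host, _, port = data["address"].rpartition(":")
    return {
        "active": data["active"],
        "host": host,
        "port": int(port),
        "cid": data["cid"],
    }


info = parse_running_node(RUNNING_PAYLOAD)
print(info)
# Byte length of the payload, for comparison with dataLength in the CLI output.
print(len(RUNNING_PAYLOAD.encode("utf-8")))
```

The host in the payload, 10.166.8.25, is the same address that appears as `reportHost` in the `Register slave` log line, which is consistent with the former standby having taken over the instance.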

Log of the node that switched from STANDBY to RUNNING (canal.log)

    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-03-11 18:30:49.976 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = all
    batch.size = 16384
    bootstrap.servers = [10.166.15.149:9092, 10.166.15.149:9093, 10.166.15.149:9094]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-03-11 18:30:50.026 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.1
2019-03-11 18:30:50.026 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.1
2019-03-11 18:30:50.027 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 98b6346a977495f6
2019-03-11 18:30:50.027 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 98b6346a977495f6
2019-03-11 18:30:50.028 [main] INFO com.alibaba.otter.canal.server.CanalMQStarter - ## start the MQ workers.
2019-03-11 18:30:50.028 [main] INFO com.alibaba.otter.canal.server.CanalMQStarter - ## start the MQ workers.
2019-03-11 18:30:50.036 [main] INFO com.alibaba.otter.canal.server.CanalMQStarter - ## the MQ workers is running now ......
2019-03-11 18:30:50.036 [main] INFO com.alibaba.otter.canal.server.CanalMQStarter - ## the MQ workers is running now ......
2019-03-11 18:35:50.134 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: Q7I_3ZBYQiOCPAwyR8kd_A
2019-03-11 18:35:50.134 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: Q7I_3ZBYQiOCPAwyR8kd_A

2019-03-11 18:39:17.671 [pool-2-thread-1] INFO o.s.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@32448ce2: startup date [Mon Mar 11 18:39:17 CST 2019]; root of context hierarchy
2019-03-11 18:39:17.710 [pool-2-thread-1] INFO o.s.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [spring/default-instance.xml]
2019-03-11 18:39:17.820 [pool-2-thread-1] INFO o.s.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [spring/base-instance.xml]
2019-03-11 18:39:17.957 [pool-2-thread-1] INFO o.s.beans.factory.support.DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7a17d6d3: defining beans [com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer#0,socketAddressEditor,org.springframework.beans.factory.config.CustomEditorConfigurer#0,baseEventParser,instance,alarmHandler,zkClientx,metaManager,eventStore,eventSink,eventParser,mqConfig]; root of factory hierarchy
2019-03-11 18:39:18.100 [pool-2-thread-1] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]
2019-03-11 18:39:18.102 [pool-2-thread-1] INFO o.s.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@5efcad46: startup date [Mon Mar 11 18:39:18 CST 2019]; root of context hierarchy
2019-03-11 18:39:18.102 [pool-2-thread-1] INFO o.s.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [spring/tsdb/h2-tsdb.xml]
2019-03-11 18:39:18.134 [pool-2-thread-1] INFO o.s.beans.factory.support.DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@735f3e7: defining beans [com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer#0,tableMetaTSDB,dataSource,sqlMapClient,metaHistoryDAO,metaSnapshotDAO]; root of factory hierarchy
2019-03-11 18:39:18.288 [pool-2-thread-1] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-03-11 18:39:18.297 [pool-2-thread-1] INFO com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
2019-03-11 18:39:18.542 [pool-2-thread-1] INFO c.a.o.c.p.inbound.mysql.tsdb.DefaultTableMetaTSDBFactory - example init TableMetaTSDB with classpath:spring/tsdb/h2-tsdb.xml
2019-03-11 18:39:18.548 [pool-2-thread-1] INFO com.alibaba.otter.canal.prometheus.CanalInstanceExports - Successfully register metrics for instance example.
2019-03-11 18:39:18.548 [pool-2-thread-1] INFO com.alibaba.otter.canal.prometheus.PrometheusService - Register metrics for destination example.
2019-03-11 18:39:18.563 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - start heart beat....
2019-03-11 18:39:18.573 [pool-2-thread-1] INFO c.a.otter.canal.server.embedded.CanalServerWithEmbedded - start CanalInstances[example] successfully
2019-03-11 18:39:18.577 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - connect MysqlConnection to /10.166.8.32:3306...
2019-03-11 18:39:18.579 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - handshake initialization packet received, prepare the client authentication packet to send
2019-03-11 18:39:18.586 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - client authentication packet is sent out.
2019-03-11 18:39:18.634 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - connect MysqlConnection to /10.166.8.32:3306...
2019-03-11 18:39:18.634 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - handshake initialization packet received, prepare the client authentication packet to send
2019-03-11 18:39:18.635 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - client authentication packet is sent out.
2019-03-11 18:39:18.639 [destination = example , address = /10.166.8.32:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-03-11 18:39:18.671 [destination = example , address = /10.166.8.32:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"10.166.8.32","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000046","position":16863384,"serverId":1,"timestamp":1552300750000}}
2019-03-11 18:39:19.293 [destination = example , address = /10.166.8.32:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000046,position=16863384,serverId=1,gtid=,timestamp=1552300750000] cost : 645ms , the next step is binlog dump
2019-03-11 18:39:19.293 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - disConnect MysqlConnection to /10.166.8.32:3306...
2019-03-11 18:39:19.294 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - connect MysqlConnection to /10.166.8.32:3306...
2019-03-11 18:39:19.294 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - handshake initialization packet received, prepare the client authentication packet to send
2019-03-11 18:39:19.294 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.alibaba.otter.canal.parse.driver.mysql.MysqlConnector - client authentication packet is sent out.
2019-03-11 18:39:19.350 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.a.otter.canal.parse.inbound.mysql.MysqlConnection - Register slave RegisterSlaveCommandPacket[reportHost=10.166.8.25,reportPort=20991,reportUser=,reportPasswd= ,serverId=78900,command=21]
2019-03-11 18:39:19.352 [destination = example , address = /10.166.8.32:3306 , EventParser] INFO c.a.otter.canal.parse.inbound.mysql.MysqlConnection - COM_BINLOG_DUMP with position:BinlogDumpCommandPacket[binlogPosition=16863384,slaveServerId=78900,binlogFileName=mysql-bin.000046,command=18]
2019-03-11 18:39:19.359 [MultiStageCoprocessor-other-example-0] INFO com.taobao.tddl.dbsync.binlog.LogEvent - common_header_len= 19, number_of_event_types= 42
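The parser logs the position it resumes from as a JSON blob with an epoch-millisecond timestamp. A small Python sketch for decoding it, assuming the JSON is copied from the `prepare to find start position` line (the key is spelled `postion` in the log itself, and `decode_position` is a hypothetical helper name):

```python
import json
from datetime import datetime, timedelta, timezone

# JSON copied verbatim from the "prepare to find start position" log line.
POSITION_JSON = (
    '{"identity":{"slaveId":-1,"sourceAddress":{"address":"10.166.8.32","port":3306}},'
    '"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000046",'
    '"position":16863384,"serverId":1,"timestamp":1552300750000}}'
)

# The log timestamps are in CST (China Standard Time), i.e. UTC+8.
CST = timezone(timedelta(hours=8))


def decode_position(raw: str) -> tuple:
    """Return (binlog file, offset, event time in CST) from the logged position."""
    pos = json.loads(raw)["postion"]  # key is misspelled in the log output
    when = datetime.fromtimestamp(pos["timestamp"] / 1000, tz=CST)
    return pos["journalName"], pos["position"], when


journal, offset, when = decode_position(POSITION_JSON)
print(journal, offset, when.isoformat())
```

The decoded time is 18:39:10 CST, a few seconds before the log reports `start CanalInstances[example] successfully` at 18:39:18, which suggests the new RUNNING node picked up a recently stored position rather than starting from scratch.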

Original asker: GitHub user marskbt

Posted by 古拉古拉, 2023-05-08 13:48:58
2 answers
  • This is a problem in the 1.1.2 release; after switching to 1.1.3-alpha-2 it works normally.

    Original answerer: GitHub user marskbt

    2023-05-09 17:48:44
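  • Sharing freely; friendly discussion is welcome :)

    Based on the information you provided, when using Canal's ZK HA mode, data cannot be sent to Kafka after the Standby node switches to the Running state. This may be caused by a Kafka misconfiguration or by incorrect Canal Server startup parameters.

    First, check that the Kafka cluster is configured correctly, and make sure the Canal Server startup parameters specify the correct Kafka server address and port. You can specify the Kafka settings by adding parameters such as the following to the Canal Server startup script: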

    ```
    -DCanalKafkaBootstrapServers=your.kafka.server:9092 \
    -DCanalKafkaBatchSize=4096 \
    -DCanalKafkaLingerMs=1 \
    -DCanalKafkaBufferMemory=33554432 \
    -DCanalKafkaRetries=0 \
    -DCanalKafkaMaxInFlightRequestsPerConnection=1 \
    -DCanalKafkaCompressionType=none
    ```

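    In the parameters above, -DCanalKafkaBootstrapServers specifies the Kafka address and port, -DCanalKafkaBatchSize the maximum size of each batch (the Kafka producer measures `batch.size` in bytes, not in messages), -DCanalKafkaLingerMs the maximum time to wait before a batch is sent, -DCanalKafkaBufferMemory the maximum memory the Kafka producer may use for buffering, -DCanalKafkaRetries the maximum number of retries after a failed send, -DCanalKafkaMaxInFlightRequestsPerConnection the maximum number of unacknowledged requests the producer may have outstanding on a connection, and -DCanalKafkaCompressionType the message compression algorithm.

    In addition, check the Canal Server logs for errors or exceptions. The startup and runtime logs show whether Canal Server has connected to the Kafka cluster and started sending data to it.

    Finally, you can try restarting Canal Server, or use the Canal Admin tool to switch the Canal Server state manually, and check whether data then reaches Kafka. If the problem persists, consider contacting the Canal development team or the Kafka support team for more detailed technical support.

    I hope this information helps you solve the problem.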

    2023-05-08 14:18:46
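The second answer's two checks, confirming the Kafka settings and looking through the Canal Server log for problems, can be sketched in Python as below. This is only an illustration under stated assumptions: it relies on the log layout seen in the question (`date time [thread] LEVEL logger - message`) and on the `key = value` form of the ProducerConfig dump, the helper names `parse_producer_config` and `find_problem_lines` are hypothetical, and the sample lines are copied from the log in the question.

```python
import re

# Layout seen in canal.log: "date time [thread] LEVEL logger - message".
LOG_LINE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[([^\]]*)\] ([A-Z]+)\s+(\S+) - (.*)$"
)


def parse_producer_config(lines):
    """Collect the 'key = value' entries of a ProducerConfig dump into a dict."""
    config = {}
    for line in lines:
        if LOG_LINE.match(line):
            continue  # skip timestamped log lines such as the dump header
        key, sep, value = line.strip().partition(" =")
        if sep:
            config[key] = value.strip()
    return config


def find_problem_lines(lines):
    """Return (timestamp, level, logger, message) for every WARN or ERROR line."""
    problems = []
    for line in lines:
        match = LOG_LINE.match(line)
        if match and match.group(3) in ("WARN", "ERROR"):
            ts, _thread, level, logger, message = match.groups()
            problems.append((ts, level, logger, message))
    return problems


# Sample lines copied from the log posted in the question.
CONFIG_DUMP = [
    "2019-03-11 18:30:49.976 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:",
    "    acks = all",
    "    bootstrap.servers = [10.166.15.149:9092, 10.166.15.149:9093, 10.166.15.149:9094]",
    "    client.id =",
    "    retries = 0",
]
LOG_SAMPLE = [
    "2019-03-11 18:30:50.036 [main] INFO com.alibaba.otter.canal.server.CanalMQStarter - ## the MQ workers is running now ......",
    "2019-03-11 18:39:18.288 [pool-2-thread-1] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set",
    "2019-03-11 18:39:18.573 [pool-2-thread-1] INFO c.a.otter.canal.server.embedded.CanalServerWithEmbedded - start CanalInstances[example] successfully",
    "2019-03-11 18:39:18.639 [destination = example , address = /10.166.8.32:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position",
]

config = parse_producer_config(CONFIG_DUMP)
problems = find_problem_lines(LOG_SAMPLE)
print(config["bootstrap.servers"])
for ts, level, logger, message in problems:
    print(ts, level, logger)
```

In the excerpt posted with the question, none of the WARN or ERROR lines come from a Kafka logger, so a scan like this narrows the search but does not by itself explain why messages stop arriving.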