我在启动canal执行 sh ./bin/startup.sh后,在./log/canal/canal.log里看到大量下面的错误(Connection to node -1 could not be established. Broker may not be available.),然后binlog也没有输出到kafka,麻烦各位大侠看一下,谢谢。 2018-11-14 17:07:09.669 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.155:11111] 2018-11-14 17:07:10.601 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)] 2018-11-14 17:07:10.719 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ...... 2018-11-14 17:07:10.800 [destination = example , address = /192.168.175.140:3406 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.175.140","port":3406}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000013","position":6642707,"serverId":1001,"timestamp":1541529191000}} 2018-11-14 17:07:10.826 [destination = example , address = /192.168.175.140:3406 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - find start position : EntryPosition[included=false,journalName=mysql-bin.000013,position=6642707,serverId=1001,gtid=,timestamp=1541529191000] 2018-11-14 17:07:11.639 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available. 2018-11-14 17:07:11.639 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available. 2018-11-14 17:07:11.690 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available. 2018-11-14 17:07:11.690 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available. 2018-11-14 17:07:11.792 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available. 2018-11-14 17:07:11.792 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
这个问题,我试过这个办法也不行 http://www.voidcn.com/article/p-yorufhta-brs.html
手动推消息给kafka是ok的, echo "delete db002.table002 where id=1;" | /data/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hihihi
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hihihi --from-beginning
下面是kafka.yml配置: cat >conf/kafka.yml<<EOF servers: 127.0.0.1:9092 retries: 0 batchSize: 16384 lingerMs: 1 bufferMemory: 33554432 canalBatchSize: 50 canalGetTimeout: 100 flatMessage: true canalDestinations:
canalDestination: example topic: hihihi partition: 1 EOF 下面是instance.properties配置: cat >conf/example/instance.properties<<EOF canal.instance.mysql.slaveId=175140 canal.instance.gtidon=false canal.instance.master.address=192.168.175.140:3406 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal canal.instance.dbPassword=canalpwd canal.instance.connectionCharset=utf-8 canal.instance.defaultDatabaseName=db002 canal.instance.enableDruid=false canal.instance.filter.regex=db002.table002; canal.instance.filter.black.regex= EOF
下面是canal.properties配置: cp conf/canal.properties conf/canal.properties.$(date +%Y%m%d%H%M%S) sed -i '/canal.id/c canal.id=1' conf/canal.properties sed -i '/canal.ip/c canal.ip=127.0.0.1' conf/canal.properties sed -i '/canal.port/c canal.port=11111' conf/canal.properties sed -i '/canal.metrics.pull.port/c canal.metrics.pull.port=11112' conf/canal.properties sed -i '/canal.zkServers/c canal.zkServers=127.0.0.1:2181' conf/canal.properties sed -i '/canal.withoutNetty/c canal.withoutNetty=true' conf/canal.properties sed -i '/canal.serverMode/c canal.serverMode=kafka' conf/canal.properties sed -i '/canal.destinations/c canal.destinations=example' conf/canal.properties sed -i '/canal.instance.tsdb.spring.xml/c #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml' conf/canal.properties
下面是kafka配置 cat >/data/kafka/config/server.properties<<EOF broker.id=0 listeners=PLAINTEXT://:9092 num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/data/kafka/data num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=30000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=30000 group.initial.rebalance.delay.ms=0 EOF
原提问者GitHub用户xiukaiyu
应该是kafka的连接参数不对, partition: 设置为0 或者 不填试试
看一下kafka的 listeners=PLAINTEXT:// ip是什么, servers: 用这个ip+端口, 不要用127.0.0.1
kafka 的yml servers: 192.168.0.210:9092 这样配置试一下
原回答者GitHub用户rewerma
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。