Environment
canal version: 1.1.3; MySQL version: 5.7
Problem description
With the Kafka integration enabled, after canal pulls the binlog and performs dynamic topic matching, two topic names containing the same letters but differing in case trigger a Kafka log recovery and resize, which fails with the error below (a topic named redis_cachecloud_QRTZ_FIRED_TRIGGERS already existed):

ERROR Error while creating log for redis_cachecloud_qrtz_fired_triggers-0 in dir E:\kafka\logs\kafka2 (kafka.server.LogDirFailureChannel)
java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
    at java.io.RandomAccessFile.setLength(Native Method)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply$mcZ$sp(AbstractIndex.scala:186)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:173)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:173)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.log.AbstractIndex.resize(AbstractIndex.scala:173)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcZ$sp(AbstractIndex.scala:242)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:242)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:242)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
    at kafka.log.LogSegment.recover(LogSegment.scala:377)
    at kafka.log.Log.kafka$log$Log$$recoverSegment(Log.scala:467)
    at kafka.log.Log.kafka$log$Log$$recoverLog(Log.scala:581)
    at kafka.log.Log$$anonfun$2.apply$mcJ$sp(Log.scala:552)
    at kafka.log.Log$$anonfun$2.apply(Log.scala:552)
    at kafka.log.Log$$anonfun$2.apply(Log.scala:552)
    at kafka.log.Log.retryOnOffsetOverflow(Log.scala:1938)
    at kafka.log.Log.loadSegments(Log.scala:551)
    at kafka.log.Log.<init>(Log.scala:276)
    at kafka.log.Log$.apply(Log.scala:2071)
    at kafka.log.LogManager$$anonfun$getOrCreateLog$1.apply(LogManager.scala:691)
    at kafka.log.LogManager$$anonfun$getOrCreateLog$1.apply(LogManager.scala:659)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.log.LogManager.getOrCreateLog(LogManager.scala:659)
    at kafka.cluster.Partition$$anonfun$getOrCreateReplica$1.apply(Partition.scala:199)
    at kafka.cluster.Partition$$anonfun$getOrCreateReplica$1.apply(Partition.scala:195)
    at kafka.utils.Pool$$anon$2.apply(Pool.scala:61)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at kafka.utils.Pool.getAndMaybePut(Pool.scala:60)
    at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:194)
    at kafka.cluster.Partition$$anonfun$makeFollower$1$$anonfun$apply$mcZ$sp$3.apply(Partition.scala:439)
    at kafka.cluster.Partition$$anonfun$makeFollower$1$$anonfun$apply$mcZ$sp$3.apply(Partition.scala:439)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.cluster.Partition$$anonfun$makeFollower$1.apply$mcZ$sp(Partition.scala:439)
    at kafka.cluster.Partition$$anonfun$makeFollower$1.apply(Partition.scala:431)
    at kafka.cluster.Partition$$anonfun$makeFollower$1.apply(Partition.scala:431)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.cluster.Partition.makeFollower(Partition.scala:431)
    at kafka.server.ReplicaManager$$anonfun$makeFollowers$3.apply(ReplicaManager.scala:1244)
    at kafka.server.ReplicaManager$$anonfun$makeFollowers$3.apply(ReplicaManager.scala:1238)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1238)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1076)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:748)
Steps to reproduce
Since the database is case-insensitive, why does dynamic topic matching produce two topics for the same table? Can this be changed?
Expected result
Case variants of the same table name match a single topic.
Actual behavior
The broker fails with the error above.
I checked the binlog: the records do refer to the same table, but their case is inconsistent, so canal treats them as changes to two different tables, and the error appears once the messages reach Kafka. As a workaround I modified the dynamic topic matching in canal's MQMessageUtils class to convert every topic name to lower case via String's toLowerCase(), cleared the Kafka logs, and after restarting the broker everything ran normally.
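The workaround described above can be sketched as follows. This is a minimal, hypothetical helper, not canal's actual MQMessageUtils code; it only illustrates that folding schema and table names to lower case makes all case variants of a table resolve to one topic:

```java
// Hypothetical sketch of the workaround: fold dynamically built topic
// names to lower case so that binlog records whose table names differ
// only in case all map to a single Kafka topic.
public class TopicNormalizer {

    // schema and table come from the binlog; on a case-insensitive
    // MySQL server they may arrive with inconsistent casing.
    public static String normalizeTopic(String schema, String table) {
        return (schema + "_" + table).toLowerCase();
    }

    public static void main(String[] args) {
        String a = normalizeTopic("redis_cachecloud", "QRTZ_FIRED_TRIGGERS");
        String b = normalizeTopic("redis_cachecloud", "qrtz_fired_triggers");
        // Both case variants now resolve to the same topic name.
        System.out.println(a.equals(b)); // prints "true"
    }
}
```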
Originally asked by GitHub user finefuture
From what I can see, MQMessageUtils already ignores case when matching by default; both the filter matching and the simpleName matching do.
Originally answered by GitHub user agapple
This problem is likely caused by the case-insensitive file names of the Windows file system. On Windows, redis_cachecloud_QRTZ_FIRED_TRIGGERS and redis_cachecloud_qrtz_fired_triggers are treated as the same file name. In Kafka, however, each topic gets its own data directory, and two topic names that differ only in case are treated as two distinct directories, which makes Kafka fail when writing data. The recommendation is therefore to use a consistent case convention when creating topics, so that no two topics share the same name up to case; that avoids this problem.