直接上SessionHandler的代码段 `case SUBSCRIPTION: Sub sub = Sub.parseFrom(packet.getBody()); if (StringUtils.isNotEmpty(sub.getDestination()) && StringUtils.isNotEmpty(sub.getClientId())) { clientIdentity = new ClientIdentity(sub.getDestination(), Short.valueOf(sub.getClientId()), sub.getFilter()); MDC.put("destination", clientIdentity.getDestination()); embeddedServer.subscribe(clientIdentity);
// 尝试启动,如果已经启动,忽略
if (!embeddedServer.isStart(clientIdentity.getDestination())) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
ctx.setAttachment(clientIdentity);// 设置状态数据
NettyUtils.ack(ctx.getChannel(), null);
} else {
NettyUtils.error(401,
MessageFormatter.format("destination or clientId is null", sub.toString()).getMessage(),
ctx.getChannel(),
null);
}
break;
问题代码为:embeddedServer.subscribe(clientIdentity); instance还没有启动就可以执行subscribe??? 个人认为正确的逻辑应该是:先启动runningMonitor,再判断instance是否启动,最后再执行subscribe。
按目前的代码逻辑,会产生如下的问题:
1.两台canalserver和两台canalclient正在运行,其中有个instance名称为xxx
2.将xxx的Active置为false,xxx会被release掉,然后触发重新抢占
3.在抢占的过程中,zk上xxx/cluster节点和xxx/running节点的注册是有时间间隔的
4.因为有时间间隔,所以,按照ClusterNodeAccessStrategy的逻辑,canalclient在进行connect时拿到的ip不一定是以后抢占到running节点的那个canalserver的ip
5.即使ip有问题,仍然可以进行connect和subscribe操作,但在get的时候会报错,然后重连
6.虽然重连了,但是subscribe时产生了脏数据,position信息被缓存到metaManager了
7.过了几天xxx处于running状态的那台canal关了,此时slave-canal接管,会启动instance,但是metaManager已经启动过了,postion定位到了几天前
8.悲剧了
备注:
1.此处判断runningMonitor的目的是想触发lazy-start,但是lazy-start在HA模式下不成立
2.强烈建议改造一下ClusterNodeAccessStrategy,HA模式下取Address的时候只认running节点,就甭支持lazy了,按现在的设计为了支持lazy徒增了复杂度,导致客户端的高可用切换的时间有时非常长
原提问者GitHub用户lulu2panpan
我看了下代码,embeddedServer.subscribe(clientIdentity);订阅时会提前启动metaManager来记录订阅的信息,后启动CanalInstance,在拿数据的时候才会去校验是否拿到了锁启动了服务
原回答者GitHub用户agapple
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。