Symptoms
We run two HBase clusters in production, A and B, configured with bidirectional replication in a single-active fashion: at any given moment the application layer only accesses one of them.
Recently the regionserver logs on cluster A reported a large number of exceptions, yet the monitoring pages looked normal and no functionality was affected.
The HBase version is 2.0.0.
2019-09-04 02:44:58,115 WARN org.apache.zookeeper.ClientCnxn: Session 0x36abf38cec5531d for server host-15/ip-15:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:117)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1073)
Diagnosis
1. Looking at the surrounding log lines, the exception is clearly related to replication:
2019-09-04 02:44:56,960 INFO org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl: Atomically moving host-17,16020,1561539485645/1's WALs to my queue
2019-09-04 02:44:58,115 WARN org.apache.zookeeper.ClientCnxn: Session 0x36abf38cec5531d for server host-15/ip-15:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:117)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1073)
...
(many more Broken pipe exceptions in between)
...
2019-09-04 02:45:45,544 ERROR org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper: ZooKeeper multi failed after 4 attempts
2019-09-04 02:45:45,544 WARN org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl: Got exception in copyQueuesFromRSUsingMulti:
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:944)
at org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:924)
at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.multi(RecoverableZooKeeper.java:663)
at org.apache.hadoop.hbase.zookeeper.ZKUtil.multiOrSequential(ZKUtil.java:1670)
at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.moveQueueUsingMulti(ReplicationQueuesZKImpl.java:318)
at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.claimQueue(ReplicationQueuesZKImpl.java:210)
at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager$NodeFailoverWorker.run(ReplicationSourceManager.java:686)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2. Why does this operation happen?
In HBase's replication design, each regionserver is responsible for shipping the WAL files it produces. If a regionserver exits, the surviving regionservers claim its unfinished WAL replication queues; the log lines above are produced by that claim operation.
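As context for the claim operation, each regionserver's replication queues live in ZooKeeper under /hbase/replication/rs, with one subtree per regionserver, one child per peer, and one child znode per pending WAL. A minimal sketch of the path layout (the join helper is a stand-in for ZNodePaths.joinZNode; the server name is taken from the logs above, the peer id is illustrative):

```java
class ReplicationZnodeLayout {
    // Stand-in for ZNodePaths.joinZNode: join parent and child with "/".
    static String joinZNode(String parent, String child) {
        return parent + "/" + child;
    }

    public static void main(String[] args) {
        String queuesZnode = "/hbase/replication/rs";
        // Regionserver name as it appears in the logs; peer id "1" is illustrative.
        String rs = "host-17,16020,1561539485645";
        String peerId = "1";
        // One child znode per pending WAL lives under this queue path;
        // each such znode's data holds the replication offset within that WAL.
        String queue = joinZNode(joinZNode(queuesZnode, rs), peerId);
        System.out.println(queue);
        // -> /hbase/replication/rs/host-17,16020,1561539485645/1
    }
}
```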
3. Why did a regionserver exit?
The command history in CDH shows the most recent restart was on August 16, presumably to apply a cluster configuration change;
the earliest occurrence of the exception in the logs is also August 16.
4. Why did the claim fail?
The ZooKeeper logs contain a large number of warnings like this:
2019-09-04 02:45:44,270 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x36abf38cec5531a due to java.io.IOException: Len error 4448969
This error is a known ZooKeeper bug, triggered when a single request carries too much data (the server rejects any packet larger than jute.maxbuffer, which defaults to roughly 1 MB, hence the "Len error 4448969"). It is fixed in 3.4.7, 3.5.2 and 3.6.0; our production version is 3.4.5. The related issue:
https://issues.apache.org/jira/browse/ZOOKEEPER-706
But this is only the proximate cause. Even with the bug fixed, the problem would merely be masked; the root cause still needs to be found.
5. Why does the claim involve so much data?
When claiming a queue, the regionserver generates a ZooKeeper Op object for every WAL file, collects them all into a single List, and submits them in one multi call:
  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
    try {
      // hbase/replication/rs/deadrs
      String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
      String newPeerId = peerId + "-" + znode;
      String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
      // check the logs queue for the old peer cluster
      String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
      if (!peerExists(replicationQueueInfo.getPeerId())) {
        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
          " didn't exist, will move its queue to avoid the failure of multi op");
        for (String wal : wals) {
          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
        }
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
        return null;
      }
      SortedSet<String> logQueue = new TreeSet<>();
      if (wals == null || wals.isEmpty()) {
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
      } else {
        // create the new cluster znode
        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
        listOfOps.add(op);
        // get the offset of the logs and set it to new znodes
        for (String wal : wals) {
          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
          LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
          String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
          logQueue.add(wal);
        }
        // add delete op for peer
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
        if (LOG.isTraceEnabled())
          LOG.trace(" The multi list size is: " + listOfOps.size());
      }
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
      return new Pair<>(newPeerId, logQueue);
    } catch (KeeperException e) {
      // Multi call failed; it looks like some other regionserver took away the logs.
      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
    } catch (InterruptedException e) {
      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
      Thread.currentThread().interrupt();
    }
    return null;
  }
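In the non-empty branch above, every claimed WAL contributes two ops (a create in the new queue plus a delete of the old znode), so the multi request grows linearly with the backlog and can exceed the server-side packet limit. A hypothetical mitigation, not what HBase actually does, would be to cap the size of each multi call, at the cost of giving up atomicity:

```java
import java.util.ArrayList;
import java.util.List;

class MultiChunker {
    // Split a list of ops into chunks of at most maxOpsPerMulti entries each, so
    // no single multi request grows unboundedly. Caveat: the claim is then no
    // longer atomic, and a crash mid-way can leave a partially moved queue.
    static <T> List<List<T>> chunk(List<T> ops, int maxOpsPerMulti) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < ops.size(); i += maxOpsPerMulti) {
            chunks.add(new ArrayList<>(ops.subList(i, Math.min(i + maxOpsPerMulti, ops.size()))));
        }
        return chunks;
    }
}
```

Each chunk would then go through its own ZKUtil.multiOrSequential call, keeping every request safely under the packet limit.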
This means the dead regionserver left behind a huge number of pending WAL replication tasks. Listing that regionserver's queue in ZooKeeper, under /hbase/replication/rs/#rs-instance-name#/#peerId#,
printed so many child nodes that the output still hadn't finished after ten seconds.
The exact count is hard to pin down, but CDH shows this cluster's ZooKeeper holding over 300,000 znodes; for comparison, the Majuqiao cluster's ZooKeeper has only about 12,000.
6. Why are there so many znodes?
Reading the replication module's source, the core flow is as follows:
The walReader and walShipper, implemented by ReplicationSourceWALReader and ReplicationSourceShipper respectively, each run in their own thread and communicate in producer-consumer fashion through a queue.
Deleting WAL znodes is the shipper's job; the actual logic lives in the shipEdits method and runs after a batch of data has been shipped, which guarantees no data loss. When a WAL's znode is deleted, the znodes of all older WALs are deleted along with it.
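The reader/shipper handoff is a standard blocking-queue pairing. A stripped-down sketch (the queue and branch mirror the real classes only loosely) shows the failure mode: if the producer never enqueues a batch, the consumer parks forever:

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReaderShipperSketch {
    // Simplified stand-in for the queue between ReplicationSourceWALReader
    // and ReplicationSourceShipper.
    static final LinkedBlockingQueue<List<String>> entryBatchQueue = new LinkedBlockingQueue<>(1);

    public static void main(String[] args) throws InterruptedException {
        // Reader side: an empty batch is never enqueued (mirrors the buggy branch).
        List<String> batch = List.of();
        if (!batch.isEmpty()) {
            entryBatchQueue.put(batch);
        }
        // Shipper side: with nothing enqueued, a bounded poll comes back null;
        // the real shipper uses take() and therefore parks indefinitely.
        List<String> got = entryBatchQueue.poll(100, TimeUnit.MILLISECONDS);
        System.out.println(got == null ? "shipper starved" : "shipped " + got.size());
        // prints "shipper starved"
    }
}
```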
Given the znode backlog, the suspicion was that shipEdits had not been called for a long time. The stacks of the two threads:
"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2.replicationSource.wal-reader.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157238 daemon prio=5 os_prio=0 tid=0x00007f7634be8800 nid=0x377ef waiting on condition [0x00007f6114c0e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.handleEmptyWALEntryBatch(ReplicationSourceWALReader.java:192)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.run(ReplicationSourceWALReader.java:142)
"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157237 daemon prio=5 os_prio=0 tid=0x00007f76350b0000 nid=0x377ee waiting on condition [0x00007f6108173000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00007f6f99bb6718> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.take(ReplicationSourceWALReader.java:248)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper.run(ReplicationSourceShipper.java:108)
The stacks confirm the shipper is indeed blocked, waiting on the queue.
In the walReader, the part that feeds the queue looks like this:
      WALEntryBatch batch = readWALEntries(entryStream);
      if (batch != null && batch.getNbEntries() > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(String.format("Read %s WAL entries eligible for replication",
            batch.getNbEntries()));
        }
        entryBatchQueue.put(batch);
        sleepMultiplier = 1;
      } else { // got no entries and didn't advance position in WAL
        handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
      }
Presumably execution keeps falling into the else branch. handleEmptyWALEntryBatch is:
  protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
      throws InterruptedException {
    LOG.trace("Didn't read any new entries from WAL");
    Thread.sleep(sleepForRetries);
  }
With trace-level logging enabled for this class, the following line is printed nonstop:
2019-09-10 17:09:34,093 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:35,096 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:36,099 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:37,102 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:38,105 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:39,108 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
Meanwhile, no "WAL entries eligible for replication" messages appear at all.
This confirms that walReader keeps returning empty batch objects.
The WALs do contain data; the batches come back empty because every entry is filtered out. readWALEntries is:
  private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
    WALEntryBatch batch = null;
    while (entryStream.hasNext()) {
      if (batch == null) {
        batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
      }
      Entry entry = entryStream.next();
      entry = filterEntry(entry);
      if (entry != null) {
        WALEdit edit = entry.getEdit();
        if (edit != null && !edit.isEmpty()) {
          long entrySize = getEntrySize(entry);
          batch.addEntry(entry);
          updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
          // Stop if too many entries or too big
          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
              || batch.getNbEntries() >= replicationBatchCountCapacity) {
            break;
          }
        }
      }
    }
    return batch;
  }
The filter doing the work here is ClusterMarkingEntryFilter. To avoid replication loops in master-master setups, HBase stamps each replicated WAL entry with the source cluster's clusterId; if an entry being processed already carries the target cluster's clusterId, it was replicated from that cluster in the first place and must not be shipped back. The code:
  public Entry filter(Entry entry) {
    // don't replicate if the log entries have already been consumed by the cluster
    if (replicationEndpoint.canReplicateToSameCluster()
        || !entry.getKey().getClusterIds().contains(peerClusterId)) {
      WALEdit edit = entry.getEdit();
      WALKeyImpl logKey = (WALKeyImpl)entry.getKey();
      if (edit != null && !edit.isEmpty()) {
        // Mark that the current cluster has the change
        logKey.addClusterId(clusterId);
        // We need to set the CC to null else it will be compressed when sent to the sink
        entry.setCompressionContext(null);
        return entry;
      }
    }
    return null;
  }
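The filter's effect on the backup cluster can be reproduced with plain UUID lists (a simplified model, not HBase's actual classes): every entry in the backup's WALs already carries the active cluster's id, which from the backup's point of view is the peer id, so nothing ever survives the filter:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ClusterMarkingSketch {
    static UUID localClusterId = UUID.randomUUID();   // the backup cluster
    static UUID peerClusterId  = UUID.randomUUID();   // the active cluster

    // Returns the entry's cluster-id list after marking, or null if filtered out.
    static List<UUID> filter(List<UUID> clusterIds) {
        if (!clusterIds.contains(peerClusterId)) {
            List<UUID> marked = new ArrayList<>(clusterIds);
            marked.add(localClusterId); // mark that this cluster has the change
            return marked;
        }
        return null; // came from the peer cluster: do not ship it back
    }

    public static void main(String[] args) {
        // An entry replicated from the active cluster already carries its id,
        // so on the backup it is always filtered out.
        System.out.println(filter(List.of(peerClusterId)));      // prints "null"
        // A locally written entry would pass, but the backup never gets any.
        System.out.println(filter(List.of()) != null);           // prints "true"
    }
}
```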
At this point the cause is essentially clear: the two production HBase clusters are configured for master-master bidirectional replication, but only one is active and takes writes. The backup cluster produces WALs while applying the replicated data, yet since it receives no direct writes of its own, every WAL entry is filtered out, the shipper's deletion logic is never triggered, and the WAL znodes pile up.
Solution
When the batch read is empty, still update its lastWalPosition and put it onto the queue, so that the shipper's cleanup logic gets triggered.
This bug exists on the 2.0 branch; from 2.1 onward this code was reworked as part of the serial replication implementation and the problem no longer exists.
For the 2.0 branch an issue was filed with the community (https://issues.apache.org/jira/browse/HBASE-23008), but since the branch had already stopped being maintained by the time the PR was submitted, it was never merged.
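The idea of the fix can be sketched in a stripped-down model (the class and field names mimic, but are not, the real WALEntryBatch API): on an empty read the reader still emits a zero-entry batch carrying the advanced WAL position, so the shipper wakes up, records the position, and can clean up the older WAL znodes:

```java
import java.util.concurrent.LinkedBlockingQueue;

class EmptyBatchFix {
    // Minimal stand-in for WALEntryBatch: just an entry count and a position.
    static class WALEntryBatch {
        final int nbEntries;
        final long lastWalPosition;
        WALEntryBatch(int nbEntries, long lastWalPosition) {
            this.nbEntries = nbEntries;
            this.lastWalPosition = lastWalPosition;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<WALEntryBatch> queue = new LinkedBlockingQueue<>();
        long currentPosition = 4096; // position after reading the filtered-out entries
        // Fixed reader: enqueue an empty batch that still advances the position.
        queue.put(new WALEntryBatch(0, currentPosition));
        // Shipper: even with nothing to ship, it now unblocks, records the
        // position, and can trigger the cleanup of older WAL znodes.
        WALEntryBatch batch = queue.take();
        System.out.println("entries=" + batch.nbEntries + " pos=" + batch.lastWalPosition);
        // prints "entries=0 pos=4096"
    }
}
```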