最低水位线(Low-Water Mark)
最低水位线是指在 WAL(Write Ahead Log)预写日志这种设计模式中,标记在这个位置之前的日志可以被丢弃。
问题背景
WAL(Write Ahead Log)预写日志维护了对于存储的每次更新,随着时间不断增长,这个日志文件会变得无限大。Segmented Log 分割日志这种设计模式可以让我们每次只处理一个更小的文件,但是日志如果不清理,会无休止增长以至于硬盘被占满。
解决方案
最低水位线这种设计模式会告诉系统哪一部分的日志可以被删除了,即在最低水位线之前的所有日志可以被清理掉。一般的方式是,程序内有一个线程运行一个定时任务,不断地检查哪一部分的日志可以被清理并且删除这些日志文件。
this.logCleaner = newLogCleaner(config); this.logCleaner.startup();
这里的 LogCleaner 可以用定时任务实现:
public void startup() { scheduleLogCleaning(); } private void scheduleLogCleaning() { singleThreadedExecutor.schedule(() -> { cleanLogs(); }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS); }
基于快照的最低水位线实现以及示例
大部分的分布式一致性系统(例如 Zookeeper(ZAB 简化 paxos协议),etcd(raft协议)),都实现了快照机制。在这种机制下,他们的存储引擎会定时的进行全量快照,并且记录下快照对应的日志位置,将这个位置作为最低水位线。
//进行快照 public SnapShot takeSnapshot() { //获取最近的日志id Long snapShotTakenAtLogIndex = wal.getLastLogEntryId(); //利用这个日志 id 作为标识,生成快照 return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex); }
当生成了快照并成功存储到了磁盘上,对应的最低水位线将用来清理老的日志:
//根据位置获取这个位置之前的所有日志文件 List<WALSegment> getSegmentsBefore(Long snapshotIndex) { List<WALSegment> markedForDeletion = new ArrayList<>(); List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments; for (WALSegment sortedSavedSegment : sortedSavedSegments) { //如果这个日志文件的最新log id 小于快照位置,证明可以被清理掉 if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) { markedForDeletion.add(sortedSavedSegment); } } return markedForDeletion; }
zookeeper 中的最低水位线实现
定时任务位于DatadirCleanupManager
的start
方法:
public void start() { //只启动一次 if (PurgeTaskStatus.STARTED == purgeTaskStatus) { LOG.warn("Purge task is already running."); return; } //检查定时间隔有效性 if (purgeInterval <= 0) { LOG.info("Purge task is not scheduled."); return; } //启动定时任务 timer = new Timer("PurgeTask", true); TimerTask task = new PurgeTask(dataLogDir, snapDir,snapRetainCount); timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval)); purgeTaskStatus = PurgeTaskStatus.STARTED; }
核心方法为PurgeTxnLog
的purge
方法:
public static void purge(File dataDir, File snapDir, int num) throws IOException { //保留的snapshot数量不能超过3 if (num < 3) { throw new IllegalArgumentException(COUNT_ERR_MSG); } FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); //统计文件数量 List<File> snaps = txnLog.findNValidSnapshots(num); int numSnaps = snaps.size(); if (numSnaps > 0) { //利用上一个文件的日志偏移,清理log文件和snapshot文件 purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1)); } } static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) { //名字包括开头的zxid,就是代表了日志位置 final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT); final Set<File> retainedTxnLogs = new HashSet<File>(); retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain))); class MyFileFilter implements FileFilter { private final String prefix; MyFileFilter(String prefix) { this.prefix = prefix; } public boolean accept(File f) { if (!f.getName().startsWith(prefix + ".")) { return false; } if (retainedTxnLogs.contains(f)) { return false; } long fZxid = Util.getZxidFromName(f.getName(), prefix); //根据文件名称代表的zxid,过滤出要删除的文件 return fZxid < leastZxidToBeRetain; } } //筛选出符合条件的 log 文件和 snapshot 文件 File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG)); List<File> files = new ArrayList<>(); if (logs != null) { files.addAll(Arrays.asList(logs)); } File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT)); if (snapshots != null) { files.addAll(Arrays.asList(snapshots)); } //进行删除 for (File f : files) { final String msg = String.format( "Removing file: %s\t%s", DateFormat.getDateTimeInstance().format(f.lastModified()), f.getPath()); LOG.info(msg); System.out.println(msg); if (!f.delete()) { System.err.println("Failed to remove " + f.getPath()); } } }
那么是什么时候 snapshot 呢?查看SyncRequestProcessor
的run
方法,这个方法时处理请求,处理请求的时候记录操作日志到 log 文件,同时在有需要进行 snapshot 的时候进行 snapshot:
public void run() { try { //避免所有的server都同时进行snapshot resetSnapshotStats(); lastFlushTime = Time.currentElapsedTime(); while (true) { //获取请求代码省略 // 请求操作纪录成功 if (!si.isThrottled() && zks.getZKDatabase().append(si)) { //是否需要snapshot if (shouldSnapshot()) { //重置是否需要snapshot判断相关的统计 resetSnapshotStats(); //另起新文件 zks.getZKDatabase().rollLog(); //进行snapshot,先获取锁,保证只有一个进行中的snapshot if (!snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping"); } else { //异步snapshot new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { //释放锁 snapThreadMutex.release(); } } }.start(); } } } //省略其他 } } catch (Throwable t) { handleException(this.getName(), t); } }
resetSnapshotStats()
设置随机起始位,避免集群内所有实例同时进行 snapshot:
private void resetSnapshotStats() { //生成随机roll,snapCount(默认100000) randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); //生成随机size,snapSizeInBytes(默认4GB) randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); }
shouldSnapshot()
根据启动时设置的随机起始位以及配置,判断是否需要 snapshot
private boolean shouldSnapshot() { //获取日志计数 int logCount = zks.getZKDatabase().getTxnCount(); //获取大小 long logSize = zks.getZKDatabase().getTxnSize(); //当日志个数大于snapCount(默认100000)/2 + 随机roll,或者日志大小大于snapSizeInBytes(默认4GB)/2+随机size return (logCount > (snapCount / 2 + randRoll)) || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); }
``
基于时间的最低水位线实现与示例
在某些系统中,日志不是用来更新系统的状态,可以在一段时间之后删除,并且不用考虑任何子系统这个最低水位线之前的是否可以删除。例如,kafka 默认保留 7 天的 log,RocketMQ 默认保留 3 天的 commit log。
RocketMQ中最低水位线实现
在 DefaultMeesageStore
的addScheduleTask()
方法中,定义了清理的定时任务:
private void addScheduleTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); //忽略其他定时任务 } private void cleanFilesPeriodically() { //清理消息存储文件 this.cleanCommitLogService.run(); //清理消费队列文件 this.cleanConsumeQueueService.run(); }
我们这里只关心清理消息存储文件,即DefaultMessageStore
的deleteExpiredFiles
方法:
private void deleteExpiredFiles() { int deleteCount = 0; //文件保留时间,就是文件最后一次更新时间到现在的时间间隔,如果超过了这个时间间隔,就认为可以被清理掉了 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); //删除文件的间隔,每次清理可能不止删除一个文件,这个配置指定两个文件删除之间的最小间隔 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); //清理文件时,可能文件被其他线程占用,例如读取消息,这时不能轻易删除 //在第一次触发时,记录一个当前时间戳,当与当前时间间隔超过这个配置之后,强制删除 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); //判断是否要删除的时间到了 boolean timeup = this.isTimeToDelete(); //判断磁盘空间是否还充足 boolean spacefull = this.isSpaceToDelete(); //是否是手工触发 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; //满足其一,就执行清理 if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; fileReservedTime *= 60 * 60 * 1000; //清理文件 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } }
清理文件的代码MappedFile
的deleteExpiredFileByTime
方法:
public int deleteExpiredFileByTime(final long expiredTime, final int deleteFilesInterval, final long intervalForcibly, final boolean cleanImmediately) { Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return 0; //刨除最新的那个文件 int mfsLength = mfs.length - 1; int deleteCount = 0; List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; //如果超过了过期时间,或者需要立即清理 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { //关闭,清理并删除文件 if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } //如果配置了删除文件时间间隔,则需要等待 if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } } //从文件列表里面里将本次删除的文件剔除 deleteExpiredFile(files); return deleteCount; }