Write-Ahead log 预写日志
预写日志(WAL,Write-Ahead Log)将每次状态更新抽象为一个命令并追加写入一个日志中,这个日志只追加写入,也就是顺序写入,所以 IO 会很快。相比于更新存储的数据结构并且更新落盘这个随机 IO 操作,写入速度更快了,并且也提供了一定的持久性,也就是数据不会丢失,可以根据这个日志恢复数据。
背景介绍
如果遇到了服务器存储数据失败,例如已经确认客户端的请求,但是存储过程中,重启进程导致真正存储的数据没有落盘,在重启后,也需要保证已经答应客户端的请求数据更新真正落盘成功。
解决方案
将每一个更新,抽象为一个指令,并将这些指令存储在一个文件中。每个进程顺序追加写各自独立的一个文件,简化了重启后日志的处理,以及后续的在线更新操作。每个日志记录有一个独立 id,这个 id 可以用来实现分段日志(Segmented Log)或者最低水位线(Low-Water Mark)清理老的日志。日志更新可以使用单一更新队列(Singular Update Queue)这种设计模式。
日志记录的结构类似于:
class WALEntry { //日志id private final Long entryId; //日志内容 private final byte[] data; //类型 private final EntryType entryType; //时间 private long timeStamp; }
在每次重新启动时读取日志文件,回放所有日志条目来恢复当前数据状态。
假设有一内存键值对数据库:
class KVStore { private Map<String, String> kv = new HashMap<>(); public String get(String key) { return kv.get(key); } public void put(String key, String value) { appendLog(key, value); kv.put(key, value); } private Long appendLog(String key, String value) { return wal.writeEntry(new SetValueCommand(key, value).serialize()); } }
put 操作被抽象为 SetValueCommand,在更新内存 hashmap 之前将其序列化并存储在日志中。SetValueCommand 可以序列化和反序列化。
class SetValueCommand { final String key; final String value; public SetValueCommand(String key, String value) { this.key = key; this.value = value; } @Override public byte[] serialize() { try { //序列化 var baos = new ByteArrayOutputStream(); var dataInputStream = new DataOutputStream(baos); dataInputStream.writeInt(Command.SetValueType); dataInputStream.writeUTF(key); dataInputStream.writeUTF(value); return baos.toByteArray(); } catch (IOException e) { throw new RuntimeException(e); } } public static SetValueCommand deserialize(InputStream is) { try { //反序列化 DataInputStream dataInputStream = new DataInputStream(is); return new SetValueCommand(dataInputStream.readUTF(), dataInputStream.readUTF()); } catch (IOException e) { throw new RuntimeException(e); } } }
这可以确保即使进程重启,这个 hashmap 也可以通过在启动时读取日志文件来恢复。
class KVStore { public KVStore(Config config) { this.config = config; this.wal = WriteAheadLog.openWAL(config); this.applyLog(); } public void applyLog() { List<WALEntry> walEntries = wal.readAll(); applyEntries(walEntries); } private void applyEntries(List<WALEntry> walEntries) { for (WALEntry walEntry : walEntries) { Command command = deserialize(walEntry); if (command instanceof SetValueCommand) { SetValueCommand setValueCommand = (SetValueCommand)command; kv.put(setValueCommand.key, setValueCommand.value); } } } public void initialiseFromSnapshot(SnapShot snapShot) { kv.putAll(snapShot.deserializeState()); } }
实现考虑
首先是保证 WAL 日志真的写入了磁盘。所有编程语言提供的文件处理库提供了一种机制,强制操作系统将文件更改flush
落盘。在flush
时,需要考虑的是一种权衡。对于日志的每一条记录都flush
一次,保证了强持久性,但是严重影响了性能并且很快会成为性能瓶颈。如果是异步flush
,性能会提高,但是如果在flush
前程序崩溃,则有可能造成日志丢失。大部分的实现都采用批处理,减少flush
带来的性能影响,同时也尽量少丢数据。
另外,我们还需要保证日志文件没有损坏。为了处理这个问题,日志条目通常伴随 CRC 记录写入,然后在读取文件时进行验证。
同时,采用单个日志文件可能变得很难管理(很难清理老日志,重启时读取文件过大)。为了解决这个问题,通常采用之前提到的分段日志(Segmented Log)或者最低水位线(Low-Water Mark)来减少程序启动时读取的文件大小以及清理老的日志。
最后,要考虑重试带来的重复问题,也就是幂等性。由于 WAL 日志仅附加,在发生客户端通信失败和重试时,日志可能包含重复的条目。当读取日志条目时,可能会需要确保重复项被忽略。但是如果存储类似于 HashMap,其中对同一键的更新是幂等的,则不需要排重,但是可能会存在 ABA 更新问题。一般都需要实现某种机制来标记每个请求的唯一标识符并检测重复请求。
举例
各种 MQ 中的类似于 CommitLog 的日志
MQ 中的消息存储,由于消息队列的特性导致消息存储和日志类似,所以一般用日志直接作为存储。这个消息存储一般就是 WAL 这种设计模式,以 RocketMQ 为例子:
RocketMQ:
RocketMQ 存储首先将消息存储在 Commitlog 文件之中,这个文件采用的是 mmap (文件映射内存)技术写入与保存。关于这个技术,请参考另一篇文章JDK核心JAVA源码解析(5) - JAVA File MMAP原理解析
当消息来时,写入文件的核心方法是MappedFile
的appendMessagesInner
方法:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; //获取当前写入位置 int currentPos = this.wrotePosition.get(); //如果当前写入位置小于文件大小则尝试写入 if (currentPos < this.fileSize) { //mappedByteBuffer是公用的,在这里不能修改其position影响读取 //mappedByteBuffer是文件映射内存抽象出来的文件的内存ByteBuffer //对这个buffer的写入,就相当于对文件的写入 //所以通过slice方法生成一个共享原有相同内存的新byteBuffer,设置position //如果writeBuffer不为空,则证明启用了TransientStorePool,使用其中缓存的内存写入 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; //分单条消息还有批量消息的情况 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } //增加写入大小 this.wrotePosition.addAndGet(result.getWroteBytes()); //更新最新消息保存时间 this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }