一、前言
前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。
二、持久化总体框架
持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。
· TxnLog,接口类型,读取事务性日志的接口。
· FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。
· Snapshot,接口类型,持久层快照接口。
· FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。
· FileTxnSnapLog,封装了TxnLog和SnapShot。
· Util,工具类,提供持久化所需的API。
下面先来分析TxnLog和FileTxnLog的源码。
三、TxnLog源码分析
TxnLog是接口,规定了对日志的响应操作。
public interface TxnLog {
/**
* roll the current
* log being appended to
* @throws IOException
*/
// 回滚日志
void rollLog() throws IOException;
/**
* Append a request to the transaction log
* @param hdr the transaction header
* @param r the transaction itself
* returns true iff something appended, otw false
* @throws IOException
*/
// 添加一个请求至事务性日志
boolean append(TxnHeader hdr, Record r) throws IOException;
/**
* Start reading the transaction logs
* from a given zxid
* @param zxid
* @return returns an iterator to read the
* next transaction in the logs.
* @throws IOException
*/
// 读取事务性日志
TxnIterator read(long zxid) throws IOException;
/**
* the last zxid of the logged transactions.
* @return the last zxid of the logged transactions.
* @throws IOException
*/
// 事务性操作的最新zxid
long getLastLoggedZxid() throws IOException;
/**
* truncate the log to get in sync with the
* leader.
* @param zxid the zxid to truncate at.
* @throws IOException
*/
// 清空日志,与Leader保持同步
boolean truncate(long zxid) throws IOException;
/**
* the dbid for this transaction log.
* @return the dbid for this transaction log.
* @throws IOException
*/
// 获取数据库的id
long getDbId() throws IOException;
/**
* commmit the trasaction and make sure
* they are persisted
* @throws IOException
*/
// 提交事务并进行确认
void commit() throws IOException;
/**
* close the transactions logs
*/
// 关闭事务性日志
void close() throws IOException;
/**
* an iterating interface for reading
* transaction logs.
*/
// 读取事务日志的迭代器接口
public interface TxnIterator {
/**
* return the transaction header.
* @return return the transaction header.
*/
// 获取事务头部
TxnHeader getHeader();
/**
* return the transaction record.
* @return return the transaction record.
*/
// 获取事务
Record getTxn();
/**
* go to the next transaction record.
* @throws IOException
*/
// 下个事务
boolean next() throws IOException;
/**
* close files and release the
* resources
* @throws IOException
*/
// 关闭文件释放资源
void close() throws IOException;
}
}
其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。
四、FileTxnLog源码分析
对于LogFile而言,其格式可分为如下三部分
LogFile:
FileHeader TxnList ZeroPad
FileHeader格式如下
FileHeader: {
magic 4bytes (ZKLG)
version 4bytes
dbid 8bytes
}
TxnList格式如下
TxnList:
Txn || Txn TxnList
Txn格式如下
Txn:
checksum Txnlen TxnHeader Record 0x42
Txnlen格式如下
Txnlen:
len 4bytes
TxnHeader格式如下
TxnHeader: {
sessionid 8bytes
cxid 4bytes
zxid 8bytes
time 8bytes
type 4bytes
}
ZeroPad格式如下
ZeroPad:
0 padded to EOF (filled during preallocation stage)
了解LogFile的格式对于理解源码会有很大的帮助。
4.1 属性
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
// 预分配大小 64M
static long preAllocSize = 65536 * 1024;
// 魔术数字,默认为1514884167
public final static int TXNLOG_MAGIC =
ByteBuffer.wrap("ZKLG".getBytes()).getInt();
// 版本号
public final static int VERSION = 2;
/** Maximum time we allow for elapsed fsync before WARNing */
// 进行同步时,发出warn之前所能等待的最长时间
private final static long fsyncWarningThresholdMS;
// 静态属性,确定Logger、预分配空间大小和最长时间
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class);
String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024;
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
}
}
fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
}
// 最大(新)的zxid
long lastZxidSeen;
// 存储数据相关的流
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
// log目录文件
File logDir;
// 是否强制同步
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
// 数据库id
long dbId;
// 流列表
private LinkedList<FileOutputStream> streamsToFlush = new LinkedList<FileOutputStream>();
// 当前大小
long currentSize;
// 写日志文件
File logFileWrite = null;
}
4.2. 核心函数
append函数
public synchronized boolean append(TxnHeader hdr, Record txn)throws IOException{
if (hdr != null) { // 事务头部不为空 if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid LOG.warn("Current zxid " + hdr.getZxid() + " is <= " + lastZxidSeen + " for " + hdr.getType()); } if (logStream==null) { // 日志流为空 if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: log." + Long.toHexString(hdr.getZxid())); } // logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid()))); fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); // FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); // 序列化 fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. // 刷新到磁盘 logStream.flush(); // 当前通道的大小 currentSize = fos.getChannel().position(); // 添加fos streamsToFlush.add(fos); } // 填充文件 padFile(fos); // Serializes transaction header and transaction data into a byte buffer. // 将事务头和事务数据序列化成Byte Buffer byte[] buf = Util.marshallTxnEntry(hdr, txn); if (buf == null || buf.length == 0) { // 为空,抛出异常 throw new IOException("Faulty serialization for header " + "and txn"); } // 生成一个验证算法 Checksum crc = makeChecksumAlgorithm(); // Updates the current checksum with the specified array of bytes // 使用Byte数组来更新当前的Checksum crc.update(buf, 0, buf.length); // 写long类型数据 oa.writeLong(crc.getValue(), "txnEntryCRC"); // Write the serialized transaction record to the output archive. // 将序列化的事务记录写入OutputArchive Util.writeTxnBytes(oa, buf); return true; } return false;}
说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下
① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false
② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤
③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④
④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤
⑤ 填充数据,填充0,进入⑥
⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦
⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧
⑧ 将更新的ByteBuffer写入磁盘文件,返回true
append间接调用了padLog函数,其源码如下
public static long padLogFile(FileOutputStream f,long currentSize,long preAllocSize) throws IOException{// 获取位置
long position = f.getChannel().position();
if (position + 4096 >= currentSize) { // 计算后是否大于当前大小// 重新设置当前大小,剩余部分填充0 currentSize = currentSize + preAllocSize; fill.position(0); f.getChannel().write(fill, currentSize-fill.remaining());}
return currentSize;
}
说明:padLog其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。- getLogFiles函数
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
// 按照zxid对文件进行排序
List files = Util.sortDataDir(logDirList, "log", true);
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) { // 遍历文件
}// 从文件中获取zxid long fzxid = Util.getZxidFromName(f.getName(), "log"); if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件 continue; } // the files // are sorted with zxid's if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid) logZxid = fzxid; }
// 文件列表
List v=new ArrayList(5);
for (File f : files) { // 再次遍历文件
}// 从文件中获取zxid long fzxid = Util.getZxidFromName(f.getName(), "log"); if (fzxid < logZxid) { // 跳过小于logZxid的文件 continue; } // 添加 v.add(f);
// 转化成File[] 类型后返回
return v.toArray(new File[0]);
}
说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。
① 对所有log文件按照zxid进行升序排序,进入②
② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③
③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④
④ 转化后返回
getLogFiles函数调用了sortDataDir,其源码如下
public static List sortDataDir(File[] files, String prefix, boolean ascending)
{
if(files==null)
return new ArrayList(0);
// 转化为列表
List filelist = Arrays.asList(files);
// 进行排序,Comparator是关键,根据zxid进行排序
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
return filelist;
}
说明:getLogFiles其用于排序log文件,可以选择根据zxid进行升序或降序。
getLogFiles函数间接调用了getZxidFromName,其源码如下:
// 从文件名中解析出zxid
public static long getZxidFromName(String name, String prefix) {
long zxid = -1;
// 对文件名进行分割
String nameParts[] = name.split("\.");
if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
try {
// 转化成长整形
zxid = Long.parseLong(nameParts[1], 16);
} catch (NumberFormatException e) {
}
}
return zxid;
}
说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。