Etcd源码分析: 存储

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 存储数据结构 Etcd存储在集群搭建和使用篇有简介,总结起来有如下特点: 采用kv型数据存储,一般情况下比关系型数据库快。 支持动态存储(内存)以及静态存储(磁盘)。 分布式存储,可集成为多节点集群。

存储数据结构

Etcd存储在集群搭建和使用篇有简介,总结起来有如下特点:

  • 采用kv型数据存储,一般情况下比关系型数据库快。
  • 支持动态存储(内存)以及静态存储(磁盘)。
  • 分布式存储,可集成为多节点集群。
  • 存储方式,采用类似目录结构。
    1、只有叶子节点才能真正存储数据,相当于文件。

2、叶子节点的父节点一定是目录,目录不能存储数据。

叶子节点数据结构位于 /store/store.go

type store struct {
    Root           *node
    WatcherHub     *watcherHub
    CurrentIndex   uint64
    Stats          *Stats
    CurrentVersion int
    ttlKeyHeap     *ttlKeyHeap  // need to recovery manually
    worldLock      sync.RWMutex // stop the world lock
    clock          clockwork.Clock
    readonlySet    types.Set
}

其父节点数据结构位于/store/node.go

type node struct {
    Path string

    CreatedIndex  uint64
    ModifiedIndex uint64

    Parent *node `json:"-"` // should not encode this field! avoid circular dependency.

    ExpireTime time.Time
    Value      string           // for key-value pair
    Children   map[string]*node // for directory

    // A reference to the store this node is attached to.
    store *store
}

其中Path即为key

WAL

内存中的数据格式

 +-------------------------------+
 | F |  Pad  |                   |
 +-----------+  Length           |
 |                               |
 +-------------------------------+
 |             type              |
 +-------------------------------+
 |              CRC              |
 +-------------------------------+
 |             Data              |
 +-------------------------------+
字段名称 含义 占用大小
文本1 文本2 文本3
F 是否存在补齐数据 0:表示没有补齐字段 1:表示存在补齐字段 1bit
Pad 表示补齐长度。在F为1时有效 7bit
Length 表示数据有效负载长度,不包括F、Pad自身长度、补齐字段。 56bit
Type 类型 int64,8字节,有符号
CRC 校验 uint32,4字节,无符号
Data 私有数据

WAL文件数据格式

当我们持久化到文件系统中,数据格式并不是上面介绍,而是grpc格式。

WAL文件以小端序方式存储

启动一个全新etcd,默认会在目录:/var/lib/etcd/default.etcd/member/wal/中生成一个.wal文件。

WAL的定义和创建

定义在wal.go中, WAL日志文件遵循一定的命名规则,由walName()实现,格式为"序号--raft日志索引.wal"。

// 根据seq和index产生wal文件名
func walName(seq, index uint64) string {
    return fmt.Sprintf("%016x-%016x.wal", seq, index)
}

WAL对外暴露的创建接口就是Create()函数

// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved(检索) with ReadAll.
func Create(dirpath string, metadata []byte) (*WAL, error) {
    if Exist(dirpath) {
        return nil, os.ErrExist
    }

    // 先在.tmp临时文件上做修改,修改完之后可以直接执行rename,这样起到了原子修改文件的效果
    tmpdirpath := filepath.Clean(dirpath) + ".tmp"
    if fileutil.Exist(tmpdirpath) {
        if err := os.RemoveAll(tmpdirpath); err != nil {
            return nil, err
        }
    }
    if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
        return nil, err
    }

    // dir/filename  ,filename从walName获取   seq-index.wal
    p := filepath.Join(tmpdirpath, walName(0, 0))

    // 对文件上互斥锁
    f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
    if err != nil {
        return nil, err
    }

    // 定位到文件末尾
    if _, err = f.Seek(0, io.SeekEnd); err != nil {
        return nil, err
    }

    // 预分配文件,大小为SegmentSizeBytes(64MB)
    if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
        return nil, err
    }

    // 新建WAL结构
    w := &WAL{
        dir:      dirpath,
        metadata: metadata,// metadata 可为nil
    }

    // 在这个wal文件上创建一个encoder
    w.encoder, err = newFileEncoder(f.File, 0)
    if err != nil {
        return nil, err
    }

    // 把这个上了互斥锁的文件加入到locks数组中
    w.locks = append(w.locks, f)
    if err = w.saveCrc(0); err != nil {
        return nil, err
    }

    // 将metadataType类型的record记录在wal的header处
    if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
        return nil, err
    }

    // 保存空的snapshot
    if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
        return nil, err
    }

    // 重命名,之前以.tmp结尾的文件,初始化完成之后重命名,类似原子操作
    if w, err = w.renameWal(tmpdirpath); err != nil {
        return nil, err
    }

    // directory was renamed; sync parent dir to persist rename
    pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
    if perr != nil {
        return nil, perr
    }

    // 将上述的所有文件操作进行同步
    if perr = fileutil.Fsync(pdir); perr != nil {
        return nil, perr
    }

    // 关闭目录
    if perr = pdir.Close(); err != nil {
        return nil, perr
    }

    return w, nil
}

其中, SaveSnapshot()是做walpb.Snapshot持久化的, 里面的内容略过, 不过里面有一行代码if err := w.encoder.encode(rec)表示一条Record需要先把序列化后才能持久化,这个是通过encode()函数完成的(encoder.go)

一个Record被序列化之后(这里为JOSN格式),会以一个Frame的格式持久化。Frame首先是一个长度字段(encodeFrameSize()完成,在encoder.go文件),64bit,其中MSB表示这个Frame是否有padding字节,接下来才是真正的序列化后的数据

WAL存储

当raft模块收到一个proposal时就会调用Save方法完成(定义在wal.go)持久化

func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
    w.mu.Lock()  // 上锁
    defer w.mu.Unlock()

    // short cut(捷径), do not call sync
    // IsEmptyHardState returns true if the given HardState is empty.
    if raft.IsEmptyHardState(st) && len(ents) == 0 {
        return nil
    }

    // 是否需要同步刷新磁盘
    mustSync := raft.MustSync(st, w.state, len(ents))

    // TODO(xiangli): no more reference operator
    // 保存所有日志项
    for i := range ents {
        if err := w.saveEntry(&ents[i]); err != nil {
            return err
        }
    }

    // 持久化HardState, HardState表示服务器当前状态,定义在raft.pb.go,主要包含Term、Vote、Commit
    if err := w.saveState(&st); err != nil {
        return err
    }

    // 获取最后一个LockedFile的大小(已经使用的)
    curOff, err := w.tail().Seek(0, io.SeekCurrent)
    if err != nil {
        return err
    }
    // 如果小于64MB
    if curOff < SegmentSizeBytes {
        if mustSync {
            // 如果需要sync,就执行sync
            return w.sync()
        }
        return nil
    }

    // 否则执行切割(也就是说明,WAL文件是可以超过64MB的)
    return w.cut()
}

snapshot

snapshot比wal大小要小5倍左右,只有CRCData两个字段

etcd中对raft snapshot的定义如下(在文件raft.pb.go):

type Snapshot struct {
    Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
    Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
    XXX_unrecognized []byte           `json:"-"`
}

Metadata则是snaoshot的元信息

// snapshot的元数据
type SnapshotMetadata struct {
      // 最后一次的配置状态
    ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"` 
       // 被快照取代的最后的条目在日志中的索引值(appliedIndex)
    Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
       // 该条目的任期号                     
    Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`                           
    XXX_unrecognized []byte    `json:"-"`
}

snapshot持久化使用func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot)

过程比较简单, 略去, 里面可以看到snapshot文件的命名规则

// 将raft snapshot序列化后持久化到磁盘
func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
    // 产生snapshot的时间
    start := time.Now()
    // snapshot的文件名Term-Index.snap
    fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)

动态存储

+--------------------+          +--------------------+
| etcdserver/raft.go |          |   raft/storge.go   |
|                    +<-------->+                    |
|   startNode()      |          | NewMemoryStorge()  |
+---------^----------+          +--------------------+
          |
          |
          |
+---------v----------+          +--------------------+
|     rafe/node.go   |          |     rafe/node.go   |
|                    +<-------->+                    |
|   StartNode()      |          |     newRaft()      |
+---------^----------+          +--------------------+
          |
          |
          |
+---------v----------+
|     raft/node.go   |
|                    |
|        run()       |
+--------------------+

首先调用NewMemoryStorage进行初始化,然后在newRaft()中生成raftLog对象并且调用InitialState()进行状态初始化,最后在node中run方法接收数据。

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
运维 安全 Cloud Native
Apsara Stack 技术百科 | 混合云全景智能化观测平台Sunfire
在企业数字化转型的浪潮中,核心业务的上云和迁云无疑是转型过程的重中之重,企业对于数字安全性及等保合规层面的需求也日益强烈,混合云成为诸多大型政府企业客户上云迁云的首选方案。随着企业云上业务的复杂化,云上云下技术栈的多样化,以及云上运维组织规模的扩大化,云上业务的稳定性和连续性面临着巨大的挑战。
3226 0
Apsara Stack 技术百科 | 混合云全景智能化观测平台Sunfire
|
9月前
|
机器学习/深度学习 人工智能 边缘计算
24/7全时守护:AI视频监控技术的深度实现与应用分享
本文深入解析了AI视频监控系统在车间安全领域的技术实现与应用,涵盖多源数据接入、边缘计算、深度学习驱动的智能分析及高效预警机制,通过具体案例展示了系统的实时性、高精度和易部署特性,为工业安全管理提供了新路径。
2181 7
|
12月前
|
开发工具 git
GIT:如何合并已commit的信息并进行push操作
通过上述步骤,您可以有效地合并已提交的信息,并保持项目的提交历史整洁。记得在执行这些操作之前备份当前工作状态,以防万一。这样的做法不仅有助于项目维护,也能提升团队协作的效率。
519 3
|
存储 算法 开发工具
学习分享|Etcd/Raft 原理篇
本文是根据近期对 Etcd-Raft 的学习把自己的理解做个简单整理和分享。
|
存储 Kubernetes 算法
在K8S中,etcd 及其特点?
在K8S中,etcd 及其特点?
|
Kubernetes 监控 容器
etcd源码分析 - 1.【打通核心流程】etcd server的启动流程
在第一阶段,我将从主流程出发,讲述一个`PUT`指令是怎么将数据更新到`etcd server`中的。今天,我们先来看看server是怎么启动的。
284 0
|
存储
etcd raft 处理流程图系列1-raftexample
etcd raft 处理流程图系列1-raftexample
104 2
|
存储 弹性计算 资源调度
K8S下一代设备管理机制:DRA
背景Kubernetes从1.8开始引入了Device Plugin机制,用于第三方设备厂商以插件化的方式将设备资源(GPU、RDMA、FPGA、InfiniBand等)接入Kubernetes集群中。用户无需修改Kubernetes代码,只需在集群中以DaemonSet方式部署设备厂商提供的插件,然后在Pod中申明使用该资源的使用量,容器在启动成功后,便可在容器中发现该设备。然而,随着Kuber
3578 2
K8S下一代设备管理机制:DRA
|
Kubernetes 测试技术 API
快速体验NVIDIA在k8s中下一代设备管理插件
背景Kubernetes 1.26开始引入DRA( Dynamic Resource Allocation,动态资源分配),用于解决Kubernetes现有Device Plugin机制的不足。相比于现有的Device Plugin机制,DRA更加开放和自主,能够满足一些复杂的使用场景。NVIDIA、Intel这些设备厂商也基于DRA开放自己下一代Device Plugin,以期满足更复杂的业务场
1602 0