Etcd源码分析: put流程

简介: put操作 put操作是etcd v3 client支持的命令,和v2的set用法差不多 但是需要注意的是,如果你在一个3节点的etcd集群中,A节点切换为v3 client版本,然后put进了一对key-value,在B节点,还是v2的client,这个时候你get不到数据的,如果在B节点切换到.

put操作

put操作是etcd v3 client支持的命令,和v2的set用法差不多

但是需要注意的是,如果你在一个3节点的etcd集群中,A节点切换为v3 client版本,然后put进了一对key-value,在B节点,还是v2的client,这个时候你get不到数据的,如果在B节点切换到了v3 client,这个时候才可以get到数据

简单的说,v2和v3 client,插入数据到同一个etcd集群中,数据不能互通

put流程分析

client端

put命令接受的入口,在/etcdctl/ctlv3/command/put_command.go中的NewPutCommand()函数中,采用了一个corba结构体接受命令参数,实际Run执行的命令是putCommandFunc()

func putCommandFunc(cmd *cobra.Command, args []string) {
    key, value, opts := getPutOp(cmd, args)  //解析参数

    ctx, cancel := commandCtx(cmd)
    resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
    cancel()
    if err != nil {
        ExitWithError(ExitError, err)
    }
    display.Put(*resp) //打印返回的结果
}

该函数的核心就是mustClientFromCmd(cmd).Put(ctx, key, value, opts...)

mustClientFromCmd()函数返回的是一个clientv3.Client结构体指针

相当于上面调用了clientv3.Client.Put()

/clientV3/client.go 中,可以看到Client 结构体的定义,

type Client struct {
    Cluster
    KV
    Lease

里面内嵌一个KV的interface,由下面的代码具体实现接口

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

继续往下,是到了func (c *kVClient) Put(......),该函数里面,通过grpc的调用发到了server端 grpc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, c.cc, opts...)

server端

/etcdserver/etcdserverpb/rpc.pb.go里面,可以看到上面定义的ServiceName和MethodName,可以找到对应的方法_KV_Put_Handler

var _KV_serviceDesc = grpc.ServiceDesc{
    ServiceName: "etcdserverpb.KV",
    HandlerType: (*KVServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Range",
            Handler:    _KV_Range_Handler,
        },
        {
            MethodName: "Put",
            Handler:    _KV_Put_Handler,
        },

往下追踪 srv.(KVServer).Put(ctx, in) -> (s *EtcdServer) Put() -> ··· ··· -> (s *EtcdServer) processInternalRaftRequestOnce(...)

在该函数里面有一句关键调用 s.r.Propose(cctx, data)

sEtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了

(n *node) Propose() -> step(), 该函数代码较短,来看看

func (n *node) step(ctx context.Context, m pb.Message) error {
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }

    select {
    case ch <- m:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    case <-n.done:
        return ErrStopped
    }
}

这段代码主要就是根据消息类型来把传进来的pb.Message赋值给channel n.recvc或者n.propc,上面的Propose()定义了pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}),所以就是赋值给了n.propc

raft协议里的流程

在一个raft集群启动完成以后, (n *node) run() 函数就是其运行的主函数, 里面是一个死循环, 循环中会根据channel来响应各种事件, 从而跳转状态

上节有说到channel propc里面被塞入了数据, run() 函数里面就会有对应的处理,代码如下:

        case m := <-propc:
            m.From = r.id
            r.Step(m)

对应的(r *raft) Step(m pb.Message)函数也是raft协议中的核心函数, 负责状态机的跳转, 里面主要有2个逻辑

  • 根据传进来的term和本身的term的大小, 决定要做的动作, 具体的逻辑原理可见raft协议原理;
  • 根据消息类型m.Type做不同的处理

注意其最后一段

    default:
        r.step(r, m)

step()是一个类似函数借口的东西,根据节点的类型不同而调用不同的函数,比如leader节点该函数就是raft.stepLeader()

在node run的死循环中,看看开头

             // readyc 和 advance 只有一个是有效值
        if advancec != nil {
            readyc = nil
        } else {
            rd = newReady(r, prevSoftSt, prevHardSt)
                   
                   // 如果raft.msgs中队列大小不为0 也会返回true 表示有数据发出
            if rd.containsUpdates() {
                readyc = n.readyc
            } else {
                readyc = nil
            }
        }

如果advancec是nil,说明刚commit了,可以创建ready channel来继续去把ready commit的commit了。

(未完待续)

相关文章
|
XML JSON Go
etcd源码分析 - 3.【打通核心流程】PUT键值对的执行链路
在上一讲,我们一起看了etcd server是怎么匹配到对应的处理函数的,如果忘记了请回顾一下。 今天,我们再进一步,看看`PUT`操作接下来是怎么执行的。
81 0
etcd源码分析 - 2.【打通核心流程】PUT键值对匹配处理函数
在阅读了etcd server的启动流程后,我们对很多关键性函数的入口都有了初步印象。 那么,接下来我们一起看看对键值对的修改,在etcd server内部是怎么流转的。
76 0
etcd源码分析 - 2.【打通核心流程】PUT键值对匹配处理函数
|
IDE Go 开发工具
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
在上一讲,我们梳理了`EtcdServer`的关键函数`processInternalRaftRequestOnce`里的四个细节。 其中,`wait.Wait`组件使用里,我们还遗留了一个细节实现,也就是请求的处理结果是怎么通过channel返回的。
109 0
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
broker ID是干什么的?底层原理是什么?
broker ID是干什么的?底层原理是什么?
249 0
|
5月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
96 0
|
5月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
68 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(2)
|
17天前
|
Java
Optional源码分析(涉及Objects源码和Stream源码)
本文分析了Java中Optional类的源码,包括其内部的Objects.requireNonNull方法、EMPTY定义、构造方法、ofNullable方法、isEmpty方法以及如何与Stream类交互,展示了Optional类如何避免空指针异常并提供流式操作。
30 0
Optional源码分析(涉及Objects源码和Stream源码)
|
消息中间件 安全 Go
动态订阅时 rocketmq-client-go 代码有map并发bug
动态订阅时 rocketmq-client-go 代码有map并发bug
62 2
|
5月前
|
存储 缓存 Java
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】
49 0
【Zookeeper】Apach Curator 框架源码分析:后台构造器和节点操作相关源码分析(二)【Ver 4.3.0】(1)
|
5月前
|
索引
HashMap的put方法的具体流程
HashMap的put方法的具体流程