etcd源码分析 - 2.【打通核心流程】PUT键值对匹配处理函数

简介: 在阅读了etcd server的启动流程后,我们对很多关键性函数的入口都有了初步印象。那么,接下来我们一起看看对键值对的修改,在etcd server内部是怎么流转的。

在阅读了etcd server的启动流程后,我们对很多关键性函数的入口都有了初步印象。

那么,接下来我们一起看看对键值对的修改,在etcd server内部是怎么流转的。

PUT键值对的HTTP请求

etcdctl这个指令,我们可以快速地用命令etcdctl put key value发送PUT键值对的请求。

etcdctl对请求做了封装,我们要了解原始的HTTP请求格式,才能方便地阅读相关代码。相关的途径有很多,比如抓包、读源码等,这里为了可阅读性,我给出一个curl请求。

curl -L http://localhost:2379/v3/kv/put -X POST -d '{"key":"mykey","value":"myvalue"}'

主要关注如下三点:

  1. Method - POST
  2. URL -/v3/kv/put
  3. Body - {"key":"mykey","value":"myvalue"}

这个请求是v3版本的,而v2版本的差异比较大,暂不细谈。

Mux的路由匹配

背景知识介绍

为了更好地介绍下面的内容,我先介绍mux下的2个概念。

  • pattern指的是一种URL的匹配模式,最常见的如全量匹配、前缀匹配、正则匹配。当一个请求进来时,它会有自己的一个URL,去匹配mux中预先定义的多个pattern,找到一个最合适的。这是一种URL路由规则的实现
  • 当请求匹配到一个pattern后,就会执行它预定义的handler,也就是一个处理函数,返回结果。

所以, pattern负责匹配,而handler负责执行。在不同语境下,它们的专业术语有所差异,大家自行对应即可。

http mux的创建

我们要找HTTP1.X的路由匹配逻辑,就回到了上一节最后看到的代码中:

// 创建路由匹配规则
httpmux := sctx.createMux(gwmux, handler)

// 新建http server对象
srvhttp := &http.Server{
   
   
  Handler:  createAccessController(sctx.lg, s, httpmux),
  ErrorLog: logger, // do not log user error
}
// 这个cumx.HTTP1是检查协议是否满足HTTP1
httpl := m.Match(cmux.HTTP1())
// 运行server
go func() {
   
    errHandler(srvhttp.Serve(httpl)) }()

(*serveCtx)createMux

本函数不长,但很容易让读源码的同学陷入误区,我们一起来看看。这块代码主要分为三段:

func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
   
   
    httpmux := http.NewServeMux()
  // 1.注册handler
    for path, h := range sctx.userHandlers {
   
   
        httpmux.Handle(path, h)
    }

  // 2.注册grpcGateway mux中的handler到/v3/路径下
    if gwmux != nil {
   
   
        httpmux.Handle(
            "/v3/",
            wsproxy.WebsocketProxy(
                gwmux,
                wsproxy.WithRequestMutator(
                    // Default to the POST method for streams
                    func(_ *http.Request, outgoing *http.Request) *http.Request {
   
   
                        outgoing.Method = "POST"
                        return outgoing
                    },
                ),
                wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
            ),
        )
    }
  // 3.注册根路径下的handler
    if handler != nil {
   
   
        httpmux.Handle("/", handler)
    }
    return httpmux
}

第一点,可以通过简单的代码阅读,看到是对pprofdebug这些通用功能的URL功能注册,也是一些用户自定义的handler注册,这就很好地对应到sctx.userHandlers这个变量的命名了。

第三点很快就能被排除,它注册的是对根路径下的handler。我们阅读代码,找到handler最原始的生成处,就能看到它是对version、metrcis这类handler的注册。

所以,我们的重点就放在了gwmux这个对象上。阅读它的创建过程,就得跳转到上层函数。

(*serveCtx)registerGateway

在函数中,我们可以看到它注册了一个类型为registerHandlerFunc的handlers列表,包括如下内容:

handlers := []registerHandlerFunc{
   
   
        etcdservergw.RegisterKVHandler, // KV键值对的处理
        etcdservergw.RegisterWatchHandler, // Watch监听
        etcdservergw.RegisterLeaseHandler, // Lease租约
        etcdservergw.RegisterClusterHandler, // 集群
        etcdservergw.RegisterMaintenanceHandler, // 维护相关
        etcdservergw.RegisterAuthHandler, // 认证
        v3lockgw.RegisterLockHandler, // 锁
        v3electiongw.RegisterElectionHandler, // 选举
}
for _, h := range handlers {
   
   
  if err := h(ctx, gwmux, conn); err != nil {
   
   
    return nil, err
  }
}

我们聚焦到PUT请求的处理,它自然走的是etcdservergw.RegisterKVHandler这个入口。

RegisterKVHandler

本函数位于etcd/etcdserver/etcdserverpb/gw/rpc.pb.gw.go。它其实是用protobuf自动生成的,其中用到了grpc-gateway这个关键性技术,它的作用是将HTTP1的请求转换成gRPC,实现一个server可以同时支持HTTP1与gRPC,并且只写一份gRPC处理的代码即可。

有兴趣地可以去看看 https://github.com/grpc-ecosystem/grpc-gateway 项目。

大致调用链路为: HTTP1 -> gRPC -> 自己实现的handler

RegisterKVHandlerClient

该函数是由proto文件生成的,这里我忽略了关于context的处理,提取关键性的内容:

mux.Handle("POST", pattern_KV_Put_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
   
   
  // 反序列化请求和序列化响应
  inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
  // 执行PUT请求
  resp, md, err := request_KV_Put_0(rctx, inboundMarshaler, client, req, pathParams)
  // 返回结果
  forward_KV_Put_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})

序列化与反序列化存在多种选择,我们暂不深入,先来看看处理这部分的工作:

首先是如何匹配请求,也就是http://localhost:2379/v3/kv/put,对应如下:

var pattern_KV_Put_0 = runtime.MustPattern(runtime.NewPattern(1, []int{
   
   2, 0, 2, 1, 2, 2}, []string{
   
   "v3", "kv", "put"}, ""))

而最核心的处理,也就是解析PUT请求的函数request_KV_Put_0与返回处理结果的函数forward_KV_Put_0,我们放到下一讲再来看。

小结

今天我们看了PUT请求在etcd server中通过mux的匹配逻辑,思路参考下图。

在阅读代码期间,我们接触到了grpc-gateway这个技术方案,有兴趣的朋友可以参考我的另一篇文章

目录
相关文章
|
3月前
|
监控 前端开发 Java
分布式链路监控系统问题之执行原方法时不能调用method.invoke的问题如何解决
分布式链路监控系统问题之执行原方法时不能调用method.invoke的问题如何解决
|
XML JSON Go
etcd源码分析 - 3.【打通核心流程】PUT键值对的执行链路
在上一讲,我们一起看了etcd server是怎么匹配到对应的处理函数的,如果忘记了请回顾一下。 今天,我们再进一步,看看`PUT`操作接下来是怎么执行的。
86 0
|
IDE Go 开发工具
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
在上一讲,我们梳理了`EtcdServer`的关键函数`processInternalRaftRequestOnce`里的四个细节。 其中,`wait.Wait`组件使用里,我们还遗留了一个细节实现,也就是请求的处理结果是怎么通过channel返回的。
113 0
etcd源码分析 - 5.【打通核心流程】EtcdServer消息的处理函数
|
缓存 运维 NoSQL
分布式ID生成方法的超详细分析(全)
目录前言1. UUID2. 数据库自增3. 数据库集群4. 数据库号段5. redis模式6. 雪花算法7. 其他总结 前言 关于什么是分布式ID 数据量不是很多的时候,单一个数据库表可以支撑其业务,即使数据在大也可以主从复制 到一定量的数据时,实现分库分表的时候,就需要一个全局唯一的ID,订单的编号就是分布式ID 关于上面牵扯到的主从复制 可看我之前的文章进行查缺补漏 关于主从复制的超详细解析(全) 关于数据库的分布式ID可看我之前在Mycat种提及到 具体都有如下: 在实现分库分表的情况下,数据库自增主
318 0
分布式ID生成方法的超详细分析(全)
|
1月前
|
Python
字典是怎么创建的,支持的操作又是如何实现的?
字典是怎么创建的,支持的操作又是如何实现的?
49 8
|
1月前
|
索引 Python
列表作为序列型对象都支持哪些操作,它们在底层是怎么实现的?
列表作为序列型对象都支持哪些操作,它们在底层是怎么实现的?
44 4
|
3月前
|
JSON 前端开发 API
构建前端防腐策略问题之更新getMemoryUsagePercent函数以适应新的API返回格式的问题如何解决
构建前端防腐策略问题之更新getMemoryUsagePercent函数以适应新的API返回格式的问题如何解决
构建前端防腐策略问题之更新getMemoryUsagePercent函数以适应新的API返回格式的问题如何解决
|
2月前
|
存储 前端开发 JavaScript
node中循环异步的问题[‘解决方案‘]_源于map循环和for循环对异步事件配合async、await的支持
本文探讨了在Node.js中处理循环异步操作的问题,比较了使用map和for循环结合async/await处理异步事件的差异,并提供了解决方案。
35 0
|
6月前
|
存储 Java API
Flink中的状态管理是什么?请解释其作用和常用方法。
Flink中的状态管理是什么?请解释其作用和常用方法。
79 0
|
存储 算法 NoSQL
​浅谈分布式唯一Id生成器之最佳实践
​浅谈分布式唯一Id生成器之最佳实践
314 1