简介
微服务体系
微服务的水挺深的,准确的说,不仅深还特别广。微服务涉及的内容特别多,而且每一块都可以深入研究,成为这方面的专家。
在《微服务设计》这本书里,给微服务下的定义为:微服务就是一些协同工作的小而自治的服务。
这个定义不是特别好,总感觉是把微服务的范围缩小了。
另外阅历不同对这句话的理解上差距还是蛮大的。记得以前我有一个评论系统,评论服务、评论后台、DB、缓存等都是独立部署的,我当时觉得这个评论系统就是微服务。这么说不能算百分之百的错,但肯定也不是正确的。
因为微服务阐述的是一整套体系,单单一个独立的服务,只占微服务很小的一部分。
微服务主要由6部分构成
- 服务描述
类似服务的说明文档,简单但不可或缺。比如,服务调用首先要解决的问题就是服务如何对外描述。比如,你对外提供了一个服务,那么这个服务的服务名叫什么?调用这个服务需要提供哪些信息?调用这个服务返回的结果是什么格式的?该如何解析?这些就是服务描述要解决的问题。
- 注册中心
有了服务的接口描述,下一步要解决的问题就是服务的发布和订阅,就是说你提供了一个服务(Provider),如何让外部(Consumer)想调用你的服务的人知道。这个时候就需要一个类似注册中心(Registry)的角色,服务提供者将自己提供的服务以及地址登记到注册中心,服务消费者则从注册中心查询所需要调用的服务的地址,然后发起请求。
- 服务框架
通过注册中心,服务消费者就可以获取到服务提供者的地址,有了地址后就可以发起调用。但在发起调用之前你还需要解决以下几个问题。服务通信采用什么协议?是RESTful API还是gRPC?数据传输采用什么方式数据压缩采用什么格式?这些活通常集成到了我们的服务框架里面,市面上有很多这样的开源框架,相对都比较成熟,接下来考验你的是快速上手的能力。
- 服务监控
一旦服务消费者与服务提供者之间能够正常发起服务调用,你就需要对调用情况进行监控,以了解服务是否正常。通常来讲,服务监控主要包括三个流程,指标收集,数据处理,数据展示。监控是为了发现问题和异常,如果要进一步跟踪和定位问题,则需要进一步了解服务追踪。
- 服务追踪
除了需要对服务调用情况进行监控之外,你还需要记录服务调用经过的每一层链路,以便进行问题追踪和故障定位,最后达到接近问题的目的。服务监控和追踪可以合并起来,但是要明确各自的职责是不一样的。
- 服务治理
服务监控能够发现问题,服务追踪能够定位问题所在,而解决问题就得靠服务治理了。服务治理就是通过一系列的手段来保证在各种意外情况下,服务调用仍然能够正常进行。就目前开源的服务框架,大部分都不包括服务治理的内容,所以有可能这块是需要你和你的团队进行定制化开发,就看你做到什么程度了,就好比你有数据库但是你没有ER图描述,并不影响你用微服务,当然如果有就是锦上添花的东西了。
这6部分组合起来才称之为微服务。下面的链接是我做的一个思维导图,导图里面的有些内容我还没有完全学会,后期会做进一步的整理,如果大家喜欢的话,可以先记一下这个链接。
https://www.processon.com/view/link/5f3952a17d9c0806d41a90a9
微服务体系搭建
创建微服务
创建一个微服务,需要考虑服务的如下三个方面:
通信框架。它主要解决客户端和服务端如何建立连接、管理连接以及服务端如何处理请求的问题。采用同步还是异步,是在单连接上传输,还是多路复用?
通信协议。它主要解决客户端和服务端采用哪种数据传输协议的问题。采用四层 TCP、UDP 协议,还是采用七层 HTTP 协议,还是采用其他协议?
序列化和反序列化。它主要解决客户端和服务端采用哪种数据编解码的问题。 采用JSON 序列化、Java 对象序列化还是Protobuf 序列化等?
推荐使用开源框架搭建微服务,如果自己写,会重复造轮子,而且性能与安全很难保证。与语言无关且比较热门的开源框架有google的grpc和twitter的thrift。本文使用gRPC作为样例进行讲解。
gRPC简介
gRPC有如下几个优点:
- 通信协议采用了 HTTP/2,因为 HTTP/2 提供了连接复用、双向流、服务器推送、请求优先级、首部压缩等机制,所以在通信过程中可以节省带宽、降低 TCP 连接次数、节省 CPU,尤其对于移动端应用来说,可以帮助延长电池寿命。
- IDL 使用了ProtoBuf,ProtoBuf 是由 Google 开发的一种数据序列化协议,它的压缩和传输效率极高,语法也简单,所以被广泛应用在数据存储和通信协议上。
- 多语言支持,能够基于多种语言自动生成对应语言的客户端和服务端的代码。
选用gRPC顺便完成了服务描述、服务发布和引用
搭建gRPC服务端和客户端
定义服务
helloworld.proto
syntax = "proto3";
option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
生成 gRPC 代码
为了生成客户端和服务端接口,运行 protocol buffer 编译器:
protoc -I . helloworld.proto --go_out=plugins=grpc:helloworld
这生成了 helloworld.pb.go
,包含了我们生成的客户端和服务端类,此外还有用于填充、序列化、提取 HelloRequest
和 HelloResponse
消息类型的类。
写一个服务器
- 使用gomod
- protoc使用正确版本,proto-gen-go也需要用正确版本。如果报helloworld/helloworld.pb.go:103:4: cannot use _Greeter_SayHello_Handler (type func(interface {}, "context".Context, func(interface {}) error) (interface {}, error)) as type grpc.methodHandler in field value
protoc位置 /usr/local/Cellar/protobuf/3.11.4_1/bin/protoc
protoc-gen-go位置 /Users/pangzhiqiang/data/code/golang/myproject/bin/protoc-gen-go ,使用1.3.1版本
google.golang.org/grpc v1.26.0 grpc需要使用1.26版本
版本出问题后,需要不断从包里查找
// Package main implements a server for Greeter service. package main import ( "context" "log" "net" "google.golang.org/grpc" pb "grpcservice/helloworld" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct { pb.UnimplementedGreeterServer } // SayHello implements helloworld.GreeterServer func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
- 代码写入到项目grpcservice中
写一个客户端
- 将生成的go文件拷贝到asap项目,controller中创建grpcclient
package grpcclient import ( "github.com/gin-gonic/gin" "google.golang.org/grpc" "net/http" "os" "time" pb "asap/lib/helloworld" "log" "context" ) const ( address = "localhost:50051" defaultName = "world" ) func Hello(contextGin *gin.Context) { // Set up a connection to the server. conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the server and print out its response. name := defaultName if len(os.Args) > 1 { name = os.Args[1] } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage()) contextGin.String(http.StatusOK, r.GetMessage()) }
调用RPC
注册中心
上一节搭建出了微服务的客户端和服务端,现在有一个至关重要的问题要解决:
服务端的IP是哪些?
为什么这个问题至关重要,因为上一节中之所以客户端能够调用服务端,是因为我知道服务端的ip,在代码中写死了该地址。但是微服务需要满足能够随时上线、随时下线、随时扩容,如果没有方案能够解决这个问题,会导致微服务无法达到理想的状态。解决这个问题的方案是注册中心。
注册中心原理
在微服务架构下,主要有三种角色:服务提供者(RPC Server)、服务消费者(RPC Client)和服务注册中心(Registry),三者的交互关系请看下面这张图,我来简单解释一 下。
RPC Server 提供服务,在启动时,根据服务发布文件 server.xml 中的配置的信息,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
RPC Client 调用服务,在启动时,根据服务引用文件 client.xml 中配置的信息,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。
RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起 调用。
根据注册中心原理的描述,注册中心必须提供以下最基本的 API,例如:
服务注册接口:服务提供者通过调用服务注册接口来完成服务注册。- 一般使用租约或者TTL,PUT设置值
服务反注册接口:服务提供者通过调用服务反注册接口来完成服务注销。 - 一般是delete
心跳汇报接口:服务提供者通过调用心跳汇报接口完成节点存活状态上报。 - 一般使用租约来续租
服务订阅接口:服务消费者通过调用服务订阅接口完成服务订阅,获取可用的服务提供者节点列表。 - 一般是watch接口
服务变更查询接口:服务消费者通过调用服务变更查询接口,获取最新的可用服务节点列表。- 一般是get
除此之外,为了便于管理,注册中心还必须提供一些后台管理的 API,例如:
服务查询接口:查询注册中心当前注册了哪些服务信息。
服务修改接口:修改注册中心中某一服务的信息。
ETCD简介
搭建注册中心的方法有很多,如ETCD、Zookeeper、Consul等,因为对ETCD相对熟悉一些,所以本文选择使用ETCD来构建注册中心。
安装
如果开发联系使用,可以使用单机ETCD,如果生产环境使用,部署ETCD的机器至少需要3台。本文章只做练习使用,所以我们简单一点,部署单台ETCD
- https://github.com/etcd-io/etcd/releases 下载对应文件
- 进到文件,执行./etcd开启etcd服务,只不过不是集群的
使用
export ETCDCTL_AP=3 设置etcd api版本为3,执行etcdctl与etcd交互
- ./etcdctl put hello etcdv3
- ./etcdctl get hello
- ./etcdctl watch hello
ETCD实现注册中心
本文只做简单演示,所以代码只以简单实现功能为主,可能有部分bug。另外ETCD实现注册中心其实还有大量配套功能,如各种后台、监控等,本文也不做介绍。
服务端SDK
服务端主要做三件事情
- 服务启动的时候使用EtcdPut,将地址注册到etcd
- 按时发送心跳给ETCD,告诉注册中心服务仍然存活
- 服务下线的时候使用EtcdDelete,将地址删除
package lib
import (
"github.com/coreos/etcd/clientv3"
"time"
"fmt"
"context"
)
const (
GROUP = "b2c"
TEAM = "i18n"
)
var (
config clientv3.Config
err error
client *clientv3.Client
kv clientv3.KV
putResp *clientv3.PutResponse
)
func init(){
//配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second * 5,
}
//连接 创建一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
}
func EtcdPut(port string) {
if client == nil {
return
}
//获取ip
ip, err := ExternalIP()
if err != nil {
fmt.Println(err)
return
}
address := ip.String() + port
fmt.Println(address)
//租约
//创建租约
lease := clientv3.NewLease(client)
var leaseId clientv3.LeaseID
//设置10秒租约(过期时间为10秒)
if leaseRes,err := lease.Grant(context.TODO(),5);err != nil {
fmt.Println(err)
return
} else {
//得到租约id
leaseId = leaseRes.ID
}
lease.KeepAlive(context.TODO(), leaseId)
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
ticker := time.NewTicker(time.Second * 3)
go func() {
for range ticker.C {
putResp, err = kv.Put(context.TODO(), "/"+GROUP+ "/" + TEAM + "/" + address, address, clientv3.WithLease(leaseId))
if err != nil {
fmt.Println(err)
} else {
//获取版本信息
fmt.Println("Revision:", putResp.Header.Revision)
if putResp.PrevKv != nil {
fmt.Println("key:", string(putResp.PrevKv.Key))
fmt.Println("Value:", string(putResp.PrevKv.Value))
fmt.Println("Version:", string(putResp.PrevKv.Version))
}
}
}
}()
}
func EtcdDelete(port string){
fmt.Println("etcddelete")
if client == nil {
return
}
//获取ip
ip, err := ExternalIP()
if err != nil {
fmt.Println(err)
return
}
address := ip.String() + port
fmt.Println(address)
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
delResp,err := kv.Delete(context.TODO(),"/"+GROUP+ "/" + TEAM + "/" + address,clientv3.WithPrevKV())
if err != nil{
fmt.Println(err)
return
}else{
if len(delResp.PrevKvs) > 0 {
for idx,kvpair := range delResp.PrevKvs{
idx = idx
fmt.Println("删除了",string(kvpair.Key),string(kvpair.Value))
}
}
}
}
客户端SDK
客户端SDK主要做两件事情
- 从ETCD获取服务端ip地址
- 从ETCD订阅该服务内容,如果服务端有变更,能够获取到变更
package global
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"strings"
"time"
"fmt"
"context"
)
var (
config clientv3.Config
err error
client *clientv3.Client
kv clientv3.KV
getResp *clientv3.GetResponse
)
var (
//
globalService map[string](map[string]string)
)
func init() {
globalService = make(map[string](map[string]string))
//配置
config = clientv3.Config{
Endpoints:[]string{"127.0.0.1:2379"},
DialTimeout:time.Second*5,
}
//连接 床见一个客户端
if client,err = clientv3.New(config);err != nil{
fmt.Println(err)
return
}
}
func SetService(serviceName string, address string) {
if _, ok := globalService[serviceName];!ok {
globalService[serviceName] = make(map[string]string)
}
globalService[serviceName][address] = address
}
func DelService(serviceName string, address string) bool{
if _,ok:= globalService[serviceName];ok{
if _,ok2 := globalService[serviceName][address];ok2{
delete(globalService[serviceName],address)
return true
}
}
return false
}
func GetService(serviceName string) (map[string]string) {
return globalService[serviceName]
}
func GetServiceArr() map[string](map[string]string) {
return globalService
}
func GetServiceFromEtcd(serviceName string){
if client == nil{
return
}
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
getResp,err = kv.Get(context.TODO(),serviceName,clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, v := range getResp.Kvs{
fmt.Println(string(v.Value))
SetService(serviceName,string(v.Value))
}
fmt.Println(GetServiceArr())
fmt.Println(getResp.Kvs)
}
func WatchServiceFromEtcd(serviceName string){
if client == nil{
return
}
/*ticker := time.NewTicker(time.Second * 20)
go func() {
for range ticker.C {
}
}*/
// 创建一个watcher
watcher := clientv3.NewWatcher(client)
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(500000 * time.Second, func() {
cancelFunc()
})
watchRespChan := watcher.Watch(ctx, serviceName, clientv3.WithPrefix())
// 处理kv变化事件
for watchResp := range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
SetService(serviceName,string(event.Kv.Value))
fmt.Println("now service ip", GetService(serviceName))
case mvccpb.DELETE:
fmt.Println("删除了" + strings.TrimPrefix(string(event.Kv.Key),serviceName), "Revision:", event.Kv.ModRevision)
DelService(serviceName,strings.TrimPrefix(string(event.Kv.Key),serviceName))
fmt.Println("now service ip", GetService(serviceName))
}
}
}
}
演示
- 启动调用端服务,因为被调用服务没有启动,所以从注册中心获取不到被调用服务信息
- 启动一个被调用服务,端口号50051,该服务的ip和端口号会注册到ETCD,调用端通过watch也能监听到增加的服务
GRPC服务端
ETCD显示
客户端显示
- 通过GRPC客户端调用服务,可以发现请求会请求到50051端口的服务上
- 启动另一个被调用服务,端口号为50052,该服务也会被注册到ETCD上,调用者通过watch能发现两个服务
- 调用服务,可以发现,请求会分别发送到端口50051和50052上
- 将50051服务下线,服务会调用delete从ETCD上取消注册,调用者也不会再调用该服务
ETCD上的显示
- 将50052服务下线
ETCD显示
调用方会发现没有任何可用服务了
再请求就报错了
上面的演示就是ETCD作为注册中心,是怎样实现自己的服务发现的。当然注册中心还有很多其他的辅助接口,这里就不具体展示了,上面阐述的接口已经是比较核心的接口了。
实战
公司用ETCD作为注册中心,最近研究了一下客户端和服务端的包源码,代码无法直接提供,不过做了一份脑图,https://www.processon.com/view/link/5f6ed492f346fb166d0d3e24,大家感兴趣可以看一下
代码的主逻辑和一般的ETCD作为注册中心的逻辑是一致的,和我前面给出的图一致,不过有几点需要说明一下
服务端注册方案为:定时执行register,register实际为调用etcd的put函数-c.put(key, value, ttl, nil),通过过期时间,也实现了心跳功能
func (p *XEtcdRegister) run() { timer := time.NewTicker(time.Duration(p.heartBeat-1) * time.Second) defer timer.Stop() exit := false for { if exit { break } select { case <-timer.C: p.register() case <-p.exitChan: exit = true } } }
客户端获取服务配置:设置定时器,每秒从ETCD获取,然后写入cache中。当客户端调用服务时,从cache中获取,不需要请求ETCD,可以节省资源消耗。
func (p *XEtcdRpcConfig) SyncConfig() { timer := time.NewTicker(time.Second * time.Duration(p.interval)) defer timer.Stop() for { if p.exit { break } select { case <-timer.C: p.syncConfig() } } p.exitChan <- true }
这个代码有个问题在于,如果服务端因为各种原因无法提供服务,客户端最多有1s的延时才能发觉该服务无法访问了。
总结
这篇文章给大家简单介绍了服务框架和注册中心,服务描述大家可以看我的关于swagger的文章-https://shidawuhen.github.io/2020/01/30/Gin%E6%A1%86%E6%9E%B6%E9%9B%86%E6%88%90swagger%E8%BF%87%E7%A8%8B/。至于服务监控、服务追踪、服务治理会在后期的文章中给大家展示。
之所以这篇文章,是因为想起了当年看《深入浅出MFC》的时候,那里面有一章叫做-MFC 六大关键技术之仿真,令我印象深刻,这种仿真能够帮助我很好的理解整个框架。所以我也尝试将我认为最核心的内容写出来,也算是帮自己梳理知识。
代码位置:
客户端:https://github.com/shidawuhen/asap
服务端:https://github.com/shidawuhen/grpcservice
资料
- Http、Socket、WebSocket之间联系与区别
- HTTP 和 SOCKET 的区别
- 一图说明http和socket关系
- gRPC官方文档中文版
- https://github.com/grpc/grpc
- https://ask.csdn.net/questions/1020982
- https://studygolang.com/articles/26652?fr=sidebar golang go mod 替换指定版本
- https://www.jianshu.com/p/1971a27096b9 golang:如何在go-mod中指定包的版本号
- https://studygolang.com/articles/23761?fr=sidebar 用Golang构建gRPC服务
- https://xueyuanjun.com/post/21218 注册中心篇(五):Etcd 简介和使用入门
- https://github.com/etcd-io/etcd/releases etcd下载
- golang中使用etcd
- golang etcd简明教程
- https://studygolang.com/articles/30078?fr=sidebar 【golang】解决etcd安装出现的问题
- https://studygolang.com/articles/26652?fr=sidebar golang go mod 替换指定版本
- https://www.jianshu.com/p/ae15f1296cad protobuf2.6.1及protoc-gen-go v1.2.0安装
- https://blog.csdn.net/u010918487/article/details/89003747 安装go ------protobuf
最后
大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)
我的个人博客为:https://shidawuhen.github.io/
往期文章回顾:
算法
技术
- 微服务之服务框架和注册中心
- Beego框架使用
- 浅谈微服务
- TCP性能优化
- 限流实现1
- Redis实现分布式锁
- Golang源码BUG追查
- 事务原子性、一致性、持久性的实现原理
- CDN请求过程详解
- 记博客服务被压垮的历程
- 常用缓存技巧
- 如何高效对接第三方支付
- Gin框架简洁版
- InnoDB锁与事务简析
读书笔记
思考