接上一篇文章,按照”终端出厂实现自动化运维方案",https://blog.csdn.net/yyz_1987/article/details/118358038
以终端状态上保监控服务和远程采集日志指令下发为例,记录下go-zero微服务的简单使用。最终实现一个低成本的后台监控云服务,监控所有出厂终端设备的状态和后续的报警推送服务。
这个方案说简单也简单,说难也确实不容易。难在而如何能否支撑全国各地上万个设备,每间隔十分钟一次的高并发。终端数量按10万计算,不像其他系统是读多写少。这套监控的场景反倒是写数据的多,读数据的少。单个mysql数据库能否撑得住同一时刻10万条记录的写入?
涉及API网关的负载均衡、同一微服务节点的多个部署。数据记录先入持久化缓存队列,空闲再写入mysql.这些肯定是少不了的。
Golang群里大神建议上MQ如Kafka,,这样减轻数据库的写入压力。但是kafka有点儿重量级了,先不考虑。还有人建议上硬件的F5负载均衡或者用keepalive 或者lvs 的方案。但是得借助域名解析,也先暂不考虑,等真达到量级或有必要解决了,总会有办法的。
这里先介绍下初步实现吧:
新建一个Golang服务后台项目代码的目录,取名monitor。
环境准备
电脑或服务器安装有mysql,redis,etcd
下载好一些插件工具有:goctl,protoc.exe,proto-gen-go.exe
API网关层实现
按照goctl这一代码生成神器的使用方式,首先定义一下终端需要上送的接口字段信息:
statusUpload.api
type ( //终端状态上报内容 StatusUploadReq { Sn string `json:"sn"` //设备唯一号 Pos string `json:"pos"` //终端编号 City string `json:"city"` //城市代码 Id string `json:"id"` //终端类型 Unum1 uint `json:"unnum"` //未传记录数量--公交 Unum2 uint `json:"unnum"` //未传记录数量--三方 Ndate string `json:"ndate"` //当前日期 Ntime string `json:"ntime"` //当前时间 Amount uint `json:"amount"` //当班总额 Count uint `json:"count"` //当班人数 Line uint `json:"line"` //线路号 Stime string `json:"stime"` //开机时间 Ctime string `json:"ctime"` //关机时间 Tenant uint `json:"tenant"` //租户ID } //应答内容 StatusUploadResp { Code int `json:"code"` Msg string `json:"msg"` Cmd int `json:"cmd"` //控制终端命令字 } ) service open-api { @doc( summary: 公开的api函数 desc: >statusUpload 终端状态上报 ) @server( handler: statusUploadHandler folder: open ) post /open/statusUpload(StatusUploadReq) returns(StatusUploadResp) }
接下来借助goctl神器的威力,直接生成网关层代码啦:
goctl api go -api statusUpload.api -dir .
生成代码目录结构如下:
接下来跑起来试试:
go run open.go
网关层启动成功,侦听端口8888,在etc文件夹内的 open-api.yaml文件中有配置
分别用postman和curl工具测下:
curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json
status.json文件内容:
{ "sn": "1C2EB08D", "pos": "12345678", "city": "0371", "id": "B503", "unum1": 0, "unum2": 0, "ndate": "2021-08-07", "ntime": "18:30:30", "amount": 0, "count": 0, "line": 101, "stime": "05:01:01", "ctime": "18:30:20", "tenant": 0 }
RPC服务端实现
接下来,把它改造成微服务的形式,通过rpc调用服务提供的接口。大体结构如下:
需要提前安装就绪etcd环境,且需要安装一些插件工具,如proto.exe 和proto-gen-go.exe工具,放到go或gopath的bin目录下。
在项目代码跟目录下创建rpc文件夹,建个微服端的代码目录,这里取名为status。
定义proto文件,status.proto如下:
syntax = "proto3"; package status; message statusUploadReq { string sn = 1; string pos = 2; string city = 3; string id = 4; uint32 unum1 = 5; uint32 unum2 = 6; string ndate = 7; string ntime = 8; uint32 amount = 9; uint32 count = 10; uint32 line = 11; string stime = 12; string ctime = 13; uint32 tenant = 14; } message statusUploadResp { int32 code = 1; string msg = 2; int32 cmd = 3; } service statusUploader { rpc statusUpload(statusUploadReq) returns(statusUploadResp); }
然后又是goctl神器发威了,自动生成代码,厉害不?
goctl rpc proto -src=status.proto -dir .
自动生成的文件目录如下:
自动生成的并不包含上图的那个client文件夹。client文件夹是为了单独测试rpc服务自己创建的,做个client端的demo调用一下rpc服务。model文件夹也是手工创建的,里面放数据库的操作接口。
自动生成的rpc服务端status.go入口文件内容:
package main import ( "flag" "fmt" "monitor/rpc/status/internal/config" "monitor/rpc/status/internal/server" "monitor/rpc/status/internal/svc" "monitor/rpc/status/status" "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/zrpc" "google.golang.org/grpc" ) var configFile = flag.String("f", "etc/status.yaml", "the config file") func main() { flag.Parse() var c config.Config conf.MustLoad(*configFile, &c) ctx := svc.NewServiceContext(c) srv := server.NewStatusUploaderServer(ctx) s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { status.RegisterStatusUploaderServer(grpcServer, srv) }) defer s.Stop() fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) s.Start() }
这时候如果启动了etcd,那么直接go run status.go,服务端就启动成功啦。
RPC客户端测试
为了验证下rpc服务端是工作正常的,在client文件夹中实现个zrpc的客户端测试一下:
client.go文件如下:
package main import ( "context" "fmt" "github.com/tal-tech/go-zero/core/discov" "github.com/tal-tech/go-zero/zrpc" "log" pb "monitor/rpc/status/status" ) func main() { client := zrpc.MustNewClient(zrpc.RpcClientConf{ Etcd: discov.EtcdConf{ Hosts: []string{"127.0.0.1:2379"}, Key: "status.rpc", }, }) sclient := pb.NewStatusUploaderClient(client.Conn()) reply, err := sclient.StatusUpload(context.Background(), &pb.StatusUploadReq{Sn: "test rpc", Pos: "go-zero"}) if err != nil { log.Fatal(err) } fmt.Println(reply.Msg) }
如果服务正常,会收到服务端接口的响应。
网关层调用改为微服务方式调用
可以把网关层改造下,改为微服务的调用方式。改动点并不大,如下:
第一步:
api目录api\internal\config路径下的config文件和api\etc下的open-api.yaml文件改动:
open-api.yaml增加etcd的相关配置,用于连接到etcd服务中心,查找对应的服务方法。
注意,Config结构里的Status名字和那个配置文件中的是一一对应的,不能错。如果有多个微服务,这里 可以依次写上,如这种:
Status: Etcd: Hosts: - localhost:2379 Key: status.rpc Expander: Etcd: Hosts: - localhost:2379 Key: expand.rpc
type Config struct { rest.RestConf Status zrpc.RpcClientConf // 手动代码 Expander zrpc.RpcClientConf // 手动代码 }
第二步:
api目录api\internal\svc路径下servicecontext.go文件改动:
第三步:
api\internal\logic目录下statusuploadlogic.go文件改动,
至此api网关层改造完成。可以模拟访问网关接口地址试试啦
curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json
定义数据库表结构,并生成 CRUD+cache 代码
- monitor项目根路径下创建 rpc/model 目录:
mkdir -p rpc/model
- 在 rpc/model 目录下编写创建 tb_status表的 sql 文件
status.sql
,如下:
CREATE TABLE `tb_status` ( `id` INT UNSIGNED AUTO_INCREMENT, `sn` VARCHAR(32) NOT NULL COMMENT '设备唯一号', `posno` VARCHAR(32) COMMENT '终端编号', `city` VARCHAR(16) COMMENT '城市代码', `tyid` VARCHAR(16) COMMENT '设备类型', `unum1` INT COMMENT '未传记录数--公交', `unum2` INT COMMENT '未传记录数--三方', `ndate` DATE COMMENT '当前日期', `ntime` TIME COMMENT '当前时间', `amount` INT COMMENT '当班总额', `count` INT COMMENT '当班人数', `line` INT COMMENT '线路编号', `stime` TIME COMMENT '开机时间 ', `ctime` TIME COMMENT '关机时间 ', `tenant` INT COMMENT '租户号 ', PRIMARY KEY(`id`) ) ENGINE=INNODB DEFAULT CHARSET=utf8mb4;
- 创建 DB,取名叫monitor, 和 table
create database monitor; source status.sql;
- 在
rpc/model
目录下执行如下命令生成 CRUD+cache 代码,-c
表示使用redis cache
goctl model mysql ddl -c -src status.sql -dir .
也可以用datasource
命令代替ddl
来指定数据库链接直接从 schema 生成
生成后的文件结构如下:
rpc/model ├── status.sql ├── tbstatusmodel.go // CRUD+cache代码 └── vars.go // 定义常量和变量
自动生成的tbstatusmodel.go 文件内容:
package model import ( "database/sql" "fmt" "strings" "github.com/tal-tech/go-zero/core/stores/cache" "github.com/tal-tech/go-zero/core/stores/sqlc" "github.com/tal-tech/go-zero/core/stores/sqlx" "github.com/tal-tech/go-zero/core/stringx" "github.com/tal-tech/go-zero/tools/goctl/model/sql/builderx" ) var ( tbStatusFieldNames = builderx.RawFieldNames(&TbStatus{}) tbStatusRows = strings.Join(tbStatusFieldNames, ",") tbStatusRowsExpectAutoSet = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), ",") tbStatusRowsWithPlaceHolder = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), "=?,") + "=?" cacheTbStatusIdPrefix = "cache::tbStatus:id:" ) type ( TbStatusModel interface { Insert(data TbStatus) (sql.Result, error) FindOne(id int64) (*TbStatus, error) Update(data TbStatus) error Delete(id int64) error } defaultTbStatusModel struct { sqlc.CachedConn table string } TbStatus struct { Id int64 `db:"id"` Sn sql.NullString `db:"sn"` // 设备唯一号 Posno sql.NullString `db:"posno"` // 终端编号 City sql.NullString `db:"city"` // 城市代码 Tyid sql.NullString `db:"tyid"` // 设备类型 Unum1 sql.NullInt64 `db:"unum1"` // 未传记录数--公交 Unum2 sql.NullInt64 `db:"unum2"` // 未传记录数--三方 Ndate sql.NullTime `db:"ndate"` // 当前日期 Ntime sql.NullString `db:"ntime"` // 当前时间 Amount sql.NullInt64 `db:"amount"` // 当班总额 Count sql.NullInt64 `db:"count"` // 当班人数 Line sql.NullInt64 `db:"line"` // 线路编号 Stime sql.NullString `db:"stime"` // 开机时间 Ctime sql.NullString `db:"ctime"` // 关机时间 Tenant sql.NullInt64 `db:"tenant"` // 租户号 } ) func NewTbStatusModel(conn sqlx.SqlConn, c cache.CacheConf) TbStatusModel { return &defaultTbStatusModel{ CachedConn: sqlc.NewConn(conn, c), table: "`tb_status`", } } func (m *defaultTbStatusModel) Insert(data TbStatus) (sql.Result, error) { query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tbStatusRowsExpectAutoSet) ret, err := m.ExecNoCache(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant) return ret, err } func (m *defaultTbStatusModel) FindOne(id int64) (*TbStatus, error) { tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id) var resp TbStatus err := m.QueryRow(&resp, tbStatusIdKey, func(conn sqlx.SqlConn, v interface{}) error { query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table) return conn.QueryRow(v, query, id) }) switch err { case nil: return &resp, nil case sqlc.ErrNotFound: return nil, ErrNotFound default: return nil, err } } func (m *defaultTbStatusModel) Update(data TbStatus) error { tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, data.Id) _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, tbStatusRowsWithPlaceHolder) return conn.Exec(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant, data.Id) }, tbStatusIdKey) return err } func (m *defaultTbStatusModel) Delete(id int64) error { tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id) _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { query := fmt.Sprintf("delete from %s where `id` = ?", m.table) return conn.Exec(query, id) }, tbStatusIdKey) return err } func (m *defaultTbStatusModel) formatPrimary(primary interface{}) string { return fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, primary) } func (m *defaultTbStatusModel) queryPrimary(conn sqlx.SqlConn, v, primary interface{}) error { query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table) return conn.QueryRow(v, query, primary) }
修改 monitor/rpc/status rpc 代码调用 crud+cache 代码
- 修改
rpc/status/etc/status.yaml
,增加如下内容:
- 修改
rpc/status/internal/config.go
,如下:
package config import "github.com/tal-tech/go-zero/zrpc" //手动代码 import "github.com/tal-tech/go-zero/core/stores/cache" type Config struct { zrpc.RpcServerConf DataSource string // 手动代码 Cache cache.CacheConf // 手动代码 }
增加了 mysql 和 redis cache 配置
- 修改
rpc/status/internal/svc/servicecontext.go
,如下:
package svc import "monitor/rpc/status/internal/config" //手动代码 import "monitor/rpc/status/model" type ServiceContext struct { Config config.Config Model model.TbStatusModel // 手动代码 } func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, Model: model.NewTbStatusModel(sqlx.NewMysql(c.DataSource), c.Cache), // 手动代码 } }
- 修改
rpc/status/internal/logic/statusuploadlogic.go
,如下:
package logic import ( "context" "monitor/rpc/status/internal/svc" "monitor/rpc/status/status" "github.com/tal-tech/go-zero/core/logx" //手动代码 "database/sql" "monitor/rpc/status/model" "time" ) type StatusUploadLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger model model.TbStatusModel // 手动代码 } func NewStatusUploadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StatusUploadLogic { return &StatusUploadLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), model: svcCtx.Model, // 手动代码 } } func (l *StatusUploadLogic) StatusUpload(in *status.StatusUploadReq) (*status.StatusUploadResp, error) { // todo: add your logic here and delete this line // 手动代码开始,插入记录到数据库 t, _ := time.Parse("2006-01-02", in.Ndate) _, err := l.model.Insert(model.TbStatus{ Sn: sql.NullString{in.Sn, true}, Posno: sql.NullString{in.Pos, true}, City: sql.NullString{in.City, true}, Tyid: sql.NullString{in.Id, true}, Ndate: sql.NullTime{t, true}, Ntime: sql.NullString{in.Ntime, true}, }) if err != nil { return nil, err } return &status.StatusUploadResp{Code: 0, Msg: "server resp,insert data ok", Cmd: 1}, nil }
注意这里的sql.NullString和NullTime的写法,如果后面的第二个参数值为false,则插入到库中的值为空。
最后测试下,发现数据已经可以成功入库啦。