这是逐步实现该方案的第二篇文章。
后台操控指令远程下发的实现
这里记录下如何实现后台命令下发到终端设备上,触发终端上送日志文件,实现运维人员远程可以足不出户,采集设备日志的功能。
实现原理:
终端设备上运行的监控服务会定时10分钟上送一次设备状态。在终端设备请求上送状态时,把后台设置的操作命令响应给它。虽然这样后台的操作指令,只能10分钟后被下发,实时性不够高,然而对采集日志而言,这也不影响什么。
实现方法:
使用redis,封装提供微服务接口,传参数设备号和操控命令字和过期时间。终端设备每10分钟状态上报时,先查询redis缓存是否存在有该设备的控制命令,有则在响应报文中应答给终端。
type CmdSetReq struct { Sn string `json:"sn"` //设备唯一号 Cmd int `json:"cmd"` //控制命令字 Expire int `json:"expire"` //失效时间(分钟) } type CmdSetResp struct { Code int `json:"code"` Msg string `json:"msg"` }
func (l *CmdSetLogic) CmdSet(in *status.CmdSetReq) (*status.CmdSetResp, error) { // todo: add your logic here and delete this line err := l.svcCtx.Cache.SetWithExpire(shared.CacheSnPrefix+in.Sn, in.Cmd, 60*time.Duration(in.Expire)*time.Second) if err != nil && err == shared.ErrNotFound { return &status.CmdSetResp{Code: 1, Msg: "cmd set Cache error"}, nil } return &status.CmdSetResp{Code: 0, Msg: "ok"}, nil }
接上篇文章,在monitor项目代码根目录下,创建shared目录,里面放置公共的配置信息,如缓存KEY的前缀和错误码定义信息:
package shared import "errors" var ErrNotFound = errors.New("cache not found") var CacheSnPrefix = "cache::cmd:sn:"
gozero中使用缓存Cache模块。使用方法:
monitor\rpc\status\internal\svc路径下的servicecontext.go中,增加Cache字段,并在NewServiceContext中完成初始化:
package svc import "monitor/shared" import "monitor/rpc/status/internal/config" //手动代码 import "monitor/rpc/status/model" import "github.com/tal-tech/go-zero/core/stores/sqlx" import "github.com/tal-tech/go-zero/core/stores/cache" import "github.com/tal-tech/go-zero/core/syncx" type ServiceContext struct { Config config.Config Model model.TbStatusModel // 手动代码 Cache cache.Cache //增加使用redis缓存 } func NewServiceContext(c config.Config) *ServiceContext { //缓存 ca := cache.New(c.Cache, syncx.NewSharedCalls(), cache.NewStat("cmdset"), shared.ErrNotFound) return &ServiceContext{ Config: c, Model: model.NewTbStatusModel(sqlx.NewMysql(c.DataSource), c.Cache), // 手动代码 Cache: ca, } }
在monitor\rpc\status\internal\logic的cmdsetlogic.go中,增加对缓存的设置。
package logic import ( "context" "monitor/rpc/status/internal/svc" "monitor/rpc/status/status" "monitor/shared" "time" "github.com/tal-tech/go-zero/core/logx" ) type CmdSetLogic struct { ctx context.Context svcCtx *svc.ServiceContext logx.Logger } func NewCmdSetLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CmdSetLogic { return &CmdSetLogic{ ctx: ctx, svcCtx: svcCtx, Logger: logx.WithContext(ctx), } } func (l *CmdSetLogic) CmdSet(in *status.CmdSetReq) (*status.CmdSetResp, error) { // todo: add your logic here and delete this line err := l.svcCtx.Cache.SetWithExpire(shared.CacheSnPrefix+in.Sn, in.Cmd, 60*time.Duration(in.Expire)*time.Second) if err != nil && err == shared.ErrNotFound { return &status.CmdSetResp{Code: 1, Msg: "cmd set Cache error"}, nil } return &status.CmdSetResp{Code: 0, Msg: "ok"}, nil }
然后在接收到终端上送上来的状态时,先读取一下缓存,看是否有该终端的控制命令。(读redis很快,耗时可忽略)
func (l *StatusUploadLogic) StatusUpload(in *status.StatusUploadReq) (*status.StatusUploadResp, error) { // todo: add your logic here and delete this line //检查 缓存中是否有值 var cmd int32 = 0 err := l.svcCtx.Cache.Get(shared.CacheSnPrefix+in.Sn, &cmd) if err != nil && err == shared.ErrNotFound { fmt.Println(err) } else { fmt.Println("GetCache ok:", cmd) l.svcCtx.Cache.Del(shared.CacheSnPrefix + in.Sn) } // 手动代码开始,插入记录到数据库 t, _ := time.Parse("2006-01-02", in.Ndate) _, err = l.model.Insert(model.TbStatus{ Sn: sql.NullString{in.Sn, true}, Posno: sql.NullString{in.Posno, true}, City: sql.NullString{in.City, true}, Tyid: sql.NullString{in.Tyid, true}, Ndate: sql.NullTime{t, true}, Ntime: sql.NullString{in.Ntime, true}, Cmd: sql.NullInt64{int64(cmd), true}, }) if err != nil { return nil, err } return &status.StatusUploadResp{Code: 0, Msg: "server resp,insert record ok", Cmd: cmd}, nil }
Over,就这么简单,实现了指令的下发。
注意本机测试时,调整下rest和rpc服务的接口超时时间。默认的网关rest接口超时是3秒,默认rpc服务接口超时时间是2秒。有点儿短,在etc下的yaml文件中调整下即可。
增加timeout:6000的配置改为6秒超时。