使用go-zero微服务框架实现云监控后台(一.后台微服务搭建)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云监控,每月短信1000条
简介: 使用go-zero微服务框架实现云监控后台(一.后台微服务搭建)

接上一篇文章,按照”终端出厂实现自动化运维方案",https://blog.csdn.net/yyz_1987/article/details/118358038


以终端状态上保监控服务和远程采集日志指令下发为例,记录下go-zero微服务的简单使用。最终实现一个低成本的后台监控云服务,监控所有出厂终端设备的状态和后续的报警推送服务。


这个方案说简单也简单,说难也确实不容易。难在而如何能否支撑全国各地上万个设备,每间隔十分钟一次的高并发。终端数量按10万计算,不像其他系统是读多写少。这套监控的场景反倒是写数据的多,读数据的少。单个mysql数据库能否撑得住同一时刻10万条记录的写入?


涉及API网关的负载均衡、同一微服务节点的多个部署。数据记录先入持久化缓存队列,空闲再写入mysql.这些肯定是少不了的。


Golang群里大神建议上MQ如Kafka,,这样减轻数据库的写入压力。但是kafka有点儿重量级了,先不考虑。还有人建议上硬件的F5负载均衡或者用keepalive 或者lvs 的方案。但是得借助域名解析,也先暂不考虑,等真达到量级或有必要解决了,总会有办法的。


这里先介绍下初步实现吧:


新建一个Golang服务后台项目代码的目录,取名monitor。


环境准备


电脑或服务器安装有mysql,redis,etcd


下载好一些插件工具有:goctl,protoc.exe,proto-gen-go.exe


API网关层实现


按照goctl这一代码生成神器的使用方式,首先定义一下终端需要上送的接口字段信息:


statusUpload.api


type (
  //终端状态上报内容
  StatusUploadReq {
    Sn     string `json:"sn"`     //设备唯一号
    Pos    string `json:"pos"`    //终端编号
    City   string `json:"city"`   //城市代码
    Id     string `json:"id"`     //终端类型
    Unum1  uint   `json:"unnum"`  //未传记录数量--公交
    Unum2  uint   `json:"unnum"`  //未传记录数量--三方
    Ndate  string `json:"ndate"`  //当前日期
    Ntime  string `json:"ntime"`  //当前时间
    Amount uint   `json:"amount"` //当班总额
    Count  uint   `json:"count"`  //当班人数
    Line   uint   `json:"line"`   //线路号
    Stime string `json:"stime"` //开机时间
    Ctime string `json:"ctime"` //关机时间
    Tenant uint `json:"tenant"` //租户ID
  }
  //应答内容
  StatusUploadResp {
    Code int    `json:"code"`
    Msg  string `json:"msg"`
    Cmd  int    `json:"cmd"` //控制终端命令字
  }
)
service open-api {
  @doc(
    summary: 公开的api函数
    desc: >statusUpload 终端状态上报
  )
  @server(
    handler: statusUploadHandler
    folder: open
  )
  post /open/statusUpload(StatusUploadReq) returns(StatusUploadResp)
}


接下来借助goctl神器的威力,直接生成网关层代码啦:


goctl api    go    -api       statusUpload.api   -dir    .


生成代码目录结构如下:




接下来跑起来试试:


go run open.go


网关层启动成功,侦听端口8888,在etc文件夹内的 open-api.yaml文件中有配置



分别用postman和curl工具测下:



curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json


status.json文件内容:


{
    "sn": "1C2EB08D",
    "pos": "12345678",
    "city": "0371",
    "id": "B503",
    "unum1": 0,
    "unum2": 0,
    "ndate": "2021-08-07",
    "ntime": "18:30:30",
    "amount": 0,
    "count": 0,
    "line": 101,
    "stime": "05:01:01",
    "ctime": "18:30:20",
    "tenant": 0
}


RPC服务端实现


接下来,把它改造成微服务的形式,通过rpc调用服务提供的接口。大体结构如下:




需要提前安装就绪etcd环境,且需要安装一些插件工具,如proto.exe 和proto-gen-go.exe工具,放到go或gopath的bin目录下。


在项目代码跟目录下创建rpc文件夹,建个微服端的代码目录,这里取名为status。


定义proto文件,status.proto如下:


syntax = "proto3";
package status;
message statusUploadReq {
    string sn = 1;
    string pos = 2;
  string city  = 3;
  string id = 4;  
  uint32 unum1 = 5;
  uint32 unum2 = 6;
  string ndate = 7;
  string ntime = 8;
  uint32 amount = 9;
  uint32 count = 10;
  uint32 line  = 11;
  string stime = 12;
  string ctime = 13;
  uint32 tenant = 14;
}
message statusUploadResp {
    int32 code = 1;
    string msg = 2;
    int32 cmd = 3;
}
service statusUploader {
    rpc statusUpload(statusUploadReq) returns(statusUploadResp);
}


然后又是goctl神器发威了,自动生成代码,厉害不?


goctl rpc proto -src=status.proto  -dir    .


自动生成的文件目录如下:




自动生成的并不包含上图的那个client文件夹。client文件夹是为了单独测试rpc服务自己创建的,做个client端的demo调用一下rpc服务。model文件夹也是手工创建的,里面放数据库的操作接口。


自动生成的rpc服务端status.go入口文件内容:


package main
import (
  "flag"
  "fmt"
  "monitor/rpc/status/internal/config"
  "monitor/rpc/status/internal/server"
  "monitor/rpc/status/internal/svc"
  "monitor/rpc/status/status"
  "github.com/tal-tech/go-zero/core/conf"
  "github.com/tal-tech/go-zero/zrpc"
  "google.golang.org/grpc"
)
var configFile = flag.String("f", "etc/status.yaml", "the config file")
func main() {
  flag.Parse()
  var c config.Config
  conf.MustLoad(*configFile, &c)
  ctx := svc.NewServiceContext(c)
  srv := server.NewStatusUploaderServer(ctx)
  s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
    status.RegisterStatusUploaderServer(grpcServer, srv)
  })
  defer s.Stop()
  fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
  s.Start()
}


这时候如果启动了etcd,那么直接go run status.go,服务端就启动成功啦。


RPC客户端测试


为了验证下rpc服务端是工作正常的,在client文件夹中实现个zrpc的客户端测试一下:


client.go文件如下:


package main
import (
  "context"
  "fmt"
  "github.com/tal-tech/go-zero/core/discov"
  "github.com/tal-tech/go-zero/zrpc"
  "log"
  pb "monitor/rpc/status/status"
)
func main() {
  client := zrpc.MustNewClient(zrpc.RpcClientConf{
    Etcd: discov.EtcdConf{
      Hosts: []string{"127.0.0.1:2379"},
      Key:   "status.rpc",
    },
  })
  sclient := pb.NewStatusUploaderClient(client.Conn())
  reply, err := sclient.StatusUpload(context.Background(), &pb.StatusUploadReq{Sn: "test rpc", Pos: "go-zero"})
  if err != nil {
    log.Fatal(err)
  }
  fmt.Println(reply.Msg)
}


如果服务正常,会收到服务端接口的响应。


网关层调用改为微服务方式调用


可以把网关层改造下,改为微服务的调用方式。改动点并不大,如下:


第一步:


api目录api\internal\config路径下的config文件和api\etc下的open-api.yaml文件改动:


open-api.yaml增加etcd的相关配置,用于连接到etcd服务中心,查找对应的服务方法。




注意,Config结构里的Status名字和那个配置文件中的是一一对应的,不能错。如果有多个微服务,这里 可以依次写上,如这种:


Status:
  Etcd:
    Hosts:
      - localhost:2379
    Key: status.rpc
Expander:
  Etcd:
    Hosts:
      - localhost:2379
    Key: expand.rpc


type Config struct {
  rest.RestConf
  Status    zrpc.RpcClientConf     // 手动代码
  Expander  zrpc.RpcClientConf     // 手动代码
}


第二步:


api目录api\internal\svc路径下servicecontext.go文件改动:



第三步:


api\internal\logic目录下statusuploadlogic.go文件改动,



至此api网关层改造完成。可以模拟访问网关接口地址试试啦


curl http://127.0.0.1:8888/open/statusUpload -X POST -H "Content-Type: application/json" -d @status.json


定义数据库表结构,并生成 CRUD+cache 代码


  • monitor项目根路径下创建 rpc/model 目录:mkdir -p rpc/model


  • 在 rpc/model 目录下编写创建 tb_status表的 sql 文件status.sql,如下:


CREATE TABLE `tb_status`
(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `sn` VARCHAR(32) NOT NULL COMMENT '设备唯一号',
   `posno` VARCHAR(32)  COMMENT '终端编号',
   `city` VARCHAR(16)   COMMENT '城市代码',
   `tyid` VARCHAR(16)   COMMENT '设备类型',
   `unum1`  INT   COMMENT '未传记录数--公交',
   `unum2`  INT   COMMENT '未传记录数--三方',
   `ndate`  DATE  COMMENT '当前日期',
   `ntime`  TIME  COMMENT '当前时间',
   `amount` INT   COMMENT '当班总额',
   `count`  INT   COMMENT '当班人数',
   `line`   INT   COMMENT '线路编号',
   `stime`  TIME  COMMENT '开机时间 ',
   `ctime`  TIME  COMMENT '关机时间 ',
   `tenant` INT   COMMENT '租户号 ',
  PRIMARY KEY(`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4;


  • 创建 DB,取名叫monitor, 和 table


create database monitor;
source status.sql;


  • rpc/model目录下执行如下命令生成 CRUD+cache 代码,-c表示使用redis cache


goctl model mysql ddl -c -src status.sql -dir .


也可以用datasource命令代替ddl来指定数据库链接直接从 schema 生成


生成后的文件结构如下:


rpc/model
├── status.sql
├── tbstatusmodel.go              // CRUD+cache代码
└── vars.go                       // 定义常量和变量


自动生成的tbstatusmodel.go 文件内容:


package model
import (
  "database/sql"
  "fmt"
  "strings"
  "github.com/tal-tech/go-zero/core/stores/cache"
  "github.com/tal-tech/go-zero/core/stores/sqlc"
  "github.com/tal-tech/go-zero/core/stores/sqlx"
  "github.com/tal-tech/go-zero/core/stringx"
  "github.com/tal-tech/go-zero/tools/goctl/model/sql/builderx"
)
var (
  tbStatusFieldNames          = builderx.RawFieldNames(&TbStatus{})
  tbStatusRows                = strings.Join(tbStatusFieldNames, ",")
  tbStatusRowsExpectAutoSet   = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), ",")
  tbStatusRowsWithPlaceHolder = strings.Join(stringx.Remove(tbStatusFieldNames, "`id`", "`create_time`", "`update_time`"), "=?,") + "=?"
  cacheTbStatusIdPrefix = "cache::tbStatus:id:"
)
type (
  TbStatusModel interface {
    Insert(data TbStatus) (sql.Result, error)
    FindOne(id int64) (*TbStatus, error)
    Update(data TbStatus) error
    Delete(id int64) error
  }
  defaultTbStatusModel struct {
    sqlc.CachedConn
    table string
  }
  TbStatus struct {
    Id     int64          `db:"id"`
    Sn     sql.NullString `db:"sn"`     // 设备唯一号
    Posno  sql.NullString `db:"posno"`  // 终端编号
    City   sql.NullString `db:"city"`   // 城市代码
    Tyid   sql.NullString `db:"tyid"`   // 设备类型
    Unum1  sql.NullInt64  `db:"unum1"`  // 未传记录数--公交
    Unum2  sql.NullInt64  `db:"unum2"`  // 未传记录数--三方
    Ndate  sql.NullTime   `db:"ndate"`  // 当前日期
    Ntime  sql.NullString `db:"ntime"`  // 当前时间
    Amount sql.NullInt64  `db:"amount"` // 当班总额
    Count  sql.NullInt64  `db:"count"`  // 当班人数
    Line   sql.NullInt64  `db:"line"`   // 线路编号
    Stime  sql.NullString `db:"stime"`  // 开机时间
    Ctime  sql.NullString `db:"ctime"`  // 关机时间
    Tenant sql.NullInt64  `db:"tenant"` // 租户号
  }
)
func NewTbStatusModel(conn sqlx.SqlConn, c cache.CacheConf) TbStatusModel {
  return &defaultTbStatusModel{
    CachedConn: sqlc.NewConn(conn, c),
    table:      "`tb_status`",
  }
}
func (m *defaultTbStatusModel) Insert(data TbStatus) (sql.Result, error) {
  query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tbStatusRowsExpectAutoSet)
  ret, err := m.ExecNoCache(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant)
  return ret, err
}
func (m *defaultTbStatusModel) FindOne(id int64) (*TbStatus, error) {
  tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id)
  var resp TbStatus
  err := m.QueryRow(&resp, tbStatusIdKey, func(conn sqlx.SqlConn, v interface{}) error {
    query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table)
    return conn.QueryRow(v, query, id)
  })
  switch err {
  case nil:
    return &resp, nil
  case sqlc.ErrNotFound:
    return nil, ErrNotFound
  default:
    return nil, err
  }
}
func (m *defaultTbStatusModel) Update(data TbStatus) error {
  tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, data.Id)
  _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
    query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, tbStatusRowsWithPlaceHolder)
    return conn.Exec(query, data.Sn, data.Posno, data.City, data.Tyid, data.Unum1, data.Unum2, data.Ndate, data.Ntime, data.Amount, data.Count, data.Line, data.Stime, data.Ctime, data.Tenant, data.Id)
  }, tbStatusIdKey)
  return err
}
func (m *defaultTbStatusModel) Delete(id int64) error {
  tbStatusIdKey := fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, id)
  _, err := m.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) {
    query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
    return conn.Exec(query, id)
  }, tbStatusIdKey)
  return err
}
func (m *defaultTbStatusModel) formatPrimary(primary interface{}) string {
  return fmt.Sprintf("%s%v", cacheTbStatusIdPrefix, primary)
}
func (m *defaultTbStatusModel) queryPrimary(conn sqlx.SqlConn, v, primary interface{}) error {
  query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", tbStatusRows, m.table)
  return conn.QueryRow(v, query, primary)
}


修改 monitor/rpc/status rpc 代码调用 crud+cache 代码


  • 修改rpc/status/etc/status.yaml,增加如下内容:



  • 修改rpc/status/internal/config.go,如下:


package config
import "github.com/tal-tech/go-zero/zrpc"
//手动代码
import "github.com/tal-tech/go-zero/core/stores/cache"
type Config struct {
  zrpc.RpcServerConf
  DataSource string          // 手动代码
  Cache      cache.CacheConf // 手动代码
}


增加了 mysql 和 redis cache 配置


  • 修改rpc/status/internal/svc/servicecontext.go,如下:


package svc
import "monitor/rpc/status/internal/config"
//手动代码
import "monitor/rpc/status/model"
type ServiceContext struct {
  Config config.Config
  Model  model.TbStatusModel // 手动代码
}
func NewServiceContext(c config.Config) *ServiceContext {
  return &ServiceContext{
    Config: c,
    Model:  model.NewTbStatusModel(sqlx.NewMysql(c.DataSource), c.Cache), // 手动代码
  }
}


  • 修改rpc/status/internal/logic/statusuploadlogic.go,如下:


package logic
import (
  "context"
  "monitor/rpc/status/internal/svc"
  "monitor/rpc/status/status"
  "github.com/tal-tech/go-zero/core/logx"
  //手动代码
  "database/sql"
  "monitor/rpc/status/model"
  "time"
)
type StatusUploadLogic struct {
  ctx    context.Context
  svcCtx *svc.ServiceContext
  logx.Logger
  model model.TbStatusModel // 手动代码
}
func NewStatusUploadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StatusUploadLogic {
  return &StatusUploadLogic{
    ctx:    ctx,
    svcCtx: svcCtx,
    Logger: logx.WithContext(ctx),
    model:  svcCtx.Model, // 手动代码
  }
}
func (l *StatusUploadLogic) StatusUpload(in *status.StatusUploadReq) (*status.StatusUploadResp, error) {
  // todo: add your logic here and delete this line
  // 手动代码开始,插入记录到数据库
  t, _ := time.Parse("2006-01-02", in.Ndate)
  _, err := l.model.Insert(model.TbStatus{
    Sn:    sql.NullString{in.Sn, true},
    Posno: sql.NullString{in.Pos, true},
    City:  sql.NullString{in.City, true},
    Tyid:  sql.NullString{in.Id, true},
    Ndate: sql.NullTime{t, true},
    Ntime: sql.NullString{in.Ntime, true},
  })
  if err != nil {
    return nil, err
  }
  return &status.StatusUploadResp{Code: 0, Msg: "server resp,insert data ok", Cmd: 1}, nil
}


注意这里的sql.NullString和NullTime的写法,如果后面的第二个参数值为false,则插入到库中的值为空。


最后测试下,发现数据已经可以成功入库啦。


相关实践学习
基于云监控实现的监控系统
通过阿里云云监控功能给非阿里云主机安装监控插件,从而实现对非阿里云主机的各项指标进行监控和管理,在配置报警规则和报警人的情况下,能对特定的场景做出报警反应通知到报警人的手机上。
相关文章
|
1月前
|
Dubbo Java 应用服务中间件
微服务框架Dubbo环境部署实战
微服务框架Dubbo环境部署的实战指南,涵盖了Dubbo的概述、服务部署、以及Dubbo web管理页面的部署,旨在指导读者如何搭建和使用Dubbo框架。
100 17
微服务框架Dubbo环境部署实战
|
12天前
|
JSON Go API
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
|
20天前
|
Kubernetes Java Android开发
用 Quarkus 框架优化 Java 微服务架构的设计与实现
Quarkus 是专为 GraalVM 和 OpenJDK HotSpot 设计的 Kubernetes Native Java 框架,提供快速启动、低内存占用及高效开发体验,显著优化了 Java 在微服务架构中的表现。它采用提前编译和懒加载技术实现毫秒级启动,通过优化类加载机制降低内存消耗,并支持多种技术和框架集成,如 Kubernetes、Docker 及 Eclipse MicroProfile,助力开发者轻松构建强大微服务应用。例如,在电商场景中,可利用 Quarkus 快速搭建商品管理和订单管理等微服务,提升系统响应速度与稳定性。
34 5
|
1月前
|
存储 Java Maven
从零到微服务专家:用Micronaut框架轻松构建未来架构
【9月更文挑战第5天】在现代软件开发中,微服务架构因提升应用的可伸缩性和灵活性而广受欢迎。Micronaut 是一个轻量级的 Java 框架,适合构建微服务。本文介绍如何从零开始使用 Micronaut 搭建微服务架构,包括设置开发环境、创建 Maven 项目并添加 Micronaut 依赖,编写主类启动应用,以及添加控制器处理 HTTP 请求。通过示例代码展示如何实现简单的 “Hello, World!” 功能,并介绍如何通过添加更多依赖来扩展应用功能,如数据访问、验证和安全性等。Micronaut 的强大和灵活性使你能够快速构建复杂的微服务系统。
64 5
|
1月前
|
缓存 Java 应用服务中间件
随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架
【9月更文挑战第6天】随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架。Nginx作为高性能的HTTP反向代理服务器,常用于前端负载均衡,提升应用的可用性和响应速度。本文详细介绍如何通过合理配置实现Spring Boot与Nginx的高效协同工作,包括负载均衡策略、静态资源缓存、数据压缩传输及Spring Boot内部优化(如线程池配置、缓存策略等)。通过这些方法,开发者可以显著提升系统的整体性能,打造高性能、高可用的Web应用。
59 2
|
1月前
|
Cloud Native 安全 Java
Micronaut对决Spring Boot:谁是微服务领域的王者?揭秘两者优劣,选对框架至关重要!
【9月更文挑战第5天】近年来,微服务架构备受关注,Micronaut和Spring Boot成为热门选择。Micronaut由OCI开发,基于注解的依赖注入,内置多种特性,轻量级且启动迅速;Spring Boot则简化了Spring应用开发,拥有丰富的生态支持。选择框架需考虑项目需求、团队经验、性能要求及社区支持等因素。希望本文能帮助您选择合适的微服务框架,助力您的软件开发项目取得成功!
105 2
|
1月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
|
15天前
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
15天前
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
2月前
|
Kubernetes Cloud Native Docker
云原生之旅:从容器到微服务的架构演变
【8月更文挑战第29天】在数字化时代的浪潮下,云原生技术以其灵活性、可扩展性和弹性管理成为企业数字化转型的关键。本文将通过浅显易懂的语言和生动的比喻,带领读者了解云原生的基本概念,探索容器化技术的奥秘,并深入微服务架构的世界。我们将一起见证代码如何转化为现实中的服务,实现快速迭代和高效部署。无论你是初学者还是有经验的开发者,这篇文章都会为你打开一扇通往云原生世界的大门。