Golang 从0到1之任务提醒(二)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: Golang 从0到1之任务提醒(二)

定时器


如何定时执行提醒推送呢?

我们可以开启一个定时器,每小时去获取一次下一个小时内需要推送的任务。

package handlers
import (
  "fmt"
  "go-remind/logic"
  "go-remind/models"
  "go-remind/server"
  "go-remind/tools"
  "time"
)
var isFirst bool = true
func Scheduler() {
  var job logic.JobLogic
  for {
    if !isFirst {
      timer := time.NewTicker(1 * time.Hour)
      <-timer.C
    }
    isFirst = false
    // 获取接下来一小时内需要发送的任务列表
    now := tools.GetCurrTime()
    h, _ := time.ParseDuration("1h")
    jobs, err := job.GetJobsByTime(tools.TimeString(now), tools.TimeString(now.Add(1*h)))
    if err != nil {
      fmt.Printf("出错了:%v", err)
      return
    }
    // 任务通道
    ch := make(chan models.Job, 100)
    jobFunc := func(ch <-chan models.Job) {
      for item := range ch {
        // 发送通知
        go HandleJob(item)
      }
    }
    // 处理任务
    go jobFunc(ch)
    // 投递任务
    for _, job := range jobs {
      ch <- job
    }
  }
}

isFirst 默认值是 true。它的作用是,如果是第一次项目启动的时候,那么作为初始化先去拉取一遍小时内全量任务。我们看 Scheduler 逻辑,前半部分的逻辑就是通过定时器固定周期去拉取要执行的任务,然后创建一个缓冲的通道,

ch := make(chan models.Job, 100)

把从数据库中拉取的任务都丢进通道中,另开一个 G 用来从通道中接收并执行提醒任务。这里每个提醒任务都单独开启一个 G 执行。

jobFunc := func(ch <-chan models.Job) {
      for item := range ch {
        // 发送通知
        go HandleJob(item)
      }
    }
    // 处理任务
    go jobFunc(ch)
    // 投递任务
    for _, job := range jobs {
      ch <- job
    }

主要看HandleJob(item) 操作,

func HandleJob(job models.Job) {
  now := tools.GetCurrTime()
  noticeTime, _ := time.ParseInLocation(tools.TimeFormat,
    job.NoticeTime.Format(tools.TimeFormat), time.Local)
  diff := noticeTime.Sub(now)
  timer := time.NewTimer(diff)
  <-timer.C
  var sendTool server.Message
  sendTool = &server.SmsMsg{Job: job}
  if job.Phone == "" {
    sendTool = &server.EmailMsg{Job: job}
  }
  err := server.Notice(sendTool)
  //成功与否
  isOk := models.JobSuccess
  jobLogic := logic.JobLogic{}
  if err != nil {
    isOk = models.JobFail
    fmt.Printf("通知失败:%v", err)
  }
  _ = jobLogic.UpdateStatusById(job.Id, isOk)
}

前半部分就是计算当前时间距离任务执行间隔时间,对应单独设置一个定时器,任务的真正执行时间取决于用户自己设定的提醒时间。

这里实现并不是很好,想象我们当前一小时内有 10w 个任务,那么必然会开启 10w 个 G 来执行,每个 G 都有自己的定时器,在更极端的情况下,所有任务都集中在五十几分钟,你已经可以想象了。

我们可以分批去取,比如按照时间顺序每次获取 100 个任务 A,这样的话后100 个任务 B,他们在任务执行时间的关系一定是:B>=A。

然后通过 go 开箱即用的并发控制技术 sync.WaitGroup 来等待控制。这算是一种优化方向。


接下来看,


var sendTool server.Message

Message 是一个接口类型,代表发送任务通知的动作。


type Message interface {
  SendMessage() error
}

底下的 EmailMsg 和 PhoneMsg 都隐式实现了此接口,完成具体的发送操作洗细节。(ps:短信懒的接第三方了)


type EmailMsg struct {
  Job models.Job
}
func (email *EmailMsg) SendMessage() error {
  fmt.Printf("成功给%s发送邮件\n", email.Job.Email)
  sendMail := gomail.NewMessage()
  sendMail.SetHeader(`From`, ConfAll.Email.User)
  sendMail.SetHeader(`To`, email.Job.Email)
  sendMail.SetHeader(`Subject`, "来自吴亲库里的温馨提醒")
  sendMail.SetBody(`text/html`, email.Job.Content)
  err := gomail.NewDialer(
    ConfAll.Email.Host, ConfAll.Email.Port, ConfAll.Email.User,
    ConfAll.Email.Pass).DialAndSend(sendMail)
  if err != nil {
    return err
  }
  return nil
}
type SmsMsg struct {
  Job models.Job
}
func (email *SmsMsg) SendMessage() error {
  fmt.Printf("成功给%s发送短信\n", email.Job.Phone)
  return nil
}


回头看HandleJob,如果填了手机号,优先选择手机号,否则就发邮箱通知。

看这行代码,

err := server.Notice(sendTool)


func Notice(msg Message) error {
  return try.Do(func(attempt int) (retry bool, err error) {
    err = msg.SendMessage()
    if err != nil {
      return attempt < try.MaxRetries, err
    }
    return true, nil
  })

msg.SendMessage 就是实际的执行发送操作,再看看 try.Do 是干嘛的。

// 最大允许重试次数
var MaxRetries = 3
var errMaxRetriesReached = errors.New("exceeded retry limit")
type Func func(attempt int) (retry bool, err error)
func Do(fn Func) error {
  var err error
  var cont bool
  attempt := 1
  for {
    if attempt > 1 {
      time.Sleep(2 * time.Second)
    }
    cont, err = fn(attempt)
    if !cont || err == nil {
      break
    }
    attempt++
    if attempt > MaxRetries {
      return errMaxRetriesReached
    }
  }
  return err
}

其实就是一个重试的操作,我稍微解释一下这段代码。

首先最外面是个死循环,如果 attempt 等于1,说明是第一次执行,运行自己传递的闭包函数,也就是执行任务。否则的话,代表此次是重试,先等待一段时间。

cont 表示是否超过最大重试次数,是个 bool 值。err 代码运行有无错误。那么底下这句代码就是判断:如果超过最大重试次数或者任务处理是正常的,两者有一个满足就可以退出这段程序了。

if !cont || err == nil {      break    }

这个函数最终返回一个 err,如果不为空,说明最终任务经历过重试还是执行失败,那么就真的失败了,标记任务失败。

err := server.Notice(sendTool)
  //成功与否
  isOk := models.JobSuccess
  jobLogic := logic.JobLogic{}
  if err != nil {
    isOk = models.JobFail
    fmt.Printf("通知失败:%v", err)
  }
  _ = jobLogic.UpdateStatusById(job.Id, isOk)

为什么需要重试?

简单的说,与外部有依赖的操作本身就是不可控的,尤其是网络波动。出现这种情况,就需要通过重试机制去保障服务的正常。

但是重试也是有限制的,不可能短期内无休止的去重试一个服务。抛开网络波动,在一定时间内重试失败大概率不要指望短时间能恢复服务了,这样造成的数据损失,在事后恢复,经常被人高大上称之为最终一致性,很多时候,往往就是人肉罢了。

这里很骚的一个操作,重试时间直接被我定住了,真实。

time.Sleep(2 * time.Second)



解析微信消息

接下来看看处理微信消息。上篇文章已经把主要微信接收的代码写上了,现在我们主要是解析发送的信息,入库而已。没什么黑科技,靠的就是正则匹配。

//时间匹配
var TimeMatch = contentRegexp{regexp.MustCompile(
  `(今天|明天|后天|大后天|[\d]{4}-[\d]{2}-[\d]{2}\s[\d]{2}:[\d]{2}|[\d]{8}\s[\d]{1,2}:[\d]{1,2}|[[\d]{1,2}:[\d]{1,2}|[\d]{1,2}(个月|小时|点|分钟|分|秒|周|天))`,
)}
//手机号匹配
var PhoneMatch = contentRegexp{regexp.MustCompile(
  `(1[356789]\d)(\d{4})(\d{4})`,
)}
//邮箱匹配
var EmailMatch = contentRegexp{regexp.MustCompile(
  `(\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*)`,
)}


func HandleMessage(content string) (string, error) {
  phone := tools.PhoneMatch.FindStringSubmatch(content)
  email := tools.EmailMatch.FindStringSubmatch(content)
  if phone == nil || email == nil {
    return "不留下联系方式我咋么联系上你", RequestFormatErr
  }
  mmp := tools.TimeMatch.FindAllStringSubmatch(content, -1)
  if mmp == nil {
    return "我得再升升级才能满足你的时间格式", RequestFormatErr
  }
  // 最大匹配到分
  if len(mmp) > 3 {
    mmp = mmp[:3]
  }
  var sendDate string
  //....
  //省略一万行解析代码
  //...
  sendTimer := tools.StringToTimer(sendDate + ":00")
  diff := sendTimer.Sub(tools.GetCurrTime())
  if diff < 0 {
    return "过期的时间就别让我通知了", RequestFormatErr
  }
  jobLogic := &logic.JobLogic{}
  job := logic.NewJob(content, sendTimer, phone[0], email[0])
  err := jobLogic.Insert(job)
  if err != nil {
    return "请检查输入内容", RequestFormatErr
  }
  if diff.Minutes() < 0 {
    return fmt.Sprintf("%s秒后短信提醒内容:%s", tools.Decimal(diff.Seconds()), content), nil
  }
  if diff.Hours() < 1 {
    //小于1个小时直接加入到定时器
    go func() {
      HandleJob(job)
    }()
    return fmt.Sprintf("%s分钟后短信提醒内容:%s", tools.Decimal(diff.Minutes()), content), nil
  }
  return fmt.Sprintf("%s小时后短信提醒内容:%s", tools.Decimal(diff.Hours()), content), nil
}

上面一部分主要是去解析数据,比如通知时间、手机号、邮箱。

后面还有一个操作,之前我们是以 1 小时为单位的定时器,如果这个新增任务的时间属于当前小时内的,那么我们在入库的同时丢给刚才的 HandleJob 。

然后在主函数中专门开个 g 去运行这个定时任务,这样整体的流程就连接起来了。

r := gin.Default()
  go func() {
    handlers.Scheduler()
  }()
  r.POST("/msg", handlers.Message)
  _ = r.Run(":8099")

到这里,我们就已经把最小可行性的项目运行起来了。

但是还是有很多可以优化的点,这个后续我随着优化。感兴趣的可以给我提 PR。

哦对了,单元测试都没写。咋么能不写单元测试呢,致命打击。

项目放在:https://github.com/wuqinqiang/go-remind 感兴趣可以看看。

相关文章
|
4月前
|
存储 Go 调度
go-zero 如何应对海量定时/延迟任务?
go-zero 如何应对海量定时/延迟任务?
|
6月前
|
并行计算 Go 数据处理
掌握Go语言:Go 并发编程,轻松应对大规模任务处理和高并发请求(34)
掌握Go语言:Go 并发编程,轻松应对大规模任务处理和高并发请求(34)
|
6月前
|
Go
go之channel任意任务完成、全部任务完成退出
go之channel任意任务完成、全部任务完成退出
|
7月前
|
NoSQL Go Redis
Go异步任务处理解决方案:Asynq
Go异步任务处理解决方案:Asynq
311 1
Go异步任务处理解决方案:Asynq
|
消息中间件 运维 Kubernetes
工作中用Go: Go中异步任务怎么写
工作中用Go: Go中异步任务怎么写
3089 0
工作中用Go: Go中异步任务怎么写
|
监控 NoSQL 数据可视化
一文带您了解Go异步任务处理解决方案:Asynq
一文带您了解Go异步任务处理解决方案:Asynq
558 0
|
运维 Prometheus Kubernetes
工作用Go: 异步任务怎么写6 | Asynq: 专业异步任务框架
工作用Go: 异步任务怎么写6 | Asynq: 专业异步任务框架
1936 0
工作用Go: 异步任务怎么写6 | Asynq: 专业异步任务框架
|
Go
工作用Go: 异步任务怎么写5 | 异步任务: 能否更优雅点
工作用Go: 异步任务怎么写5 | 异步任务: 能否更优雅点
286 0
工作用Go: 异步任务怎么写5 | 异步任务: 能否更优雅点
|
消息中间件 监控 NoSQL
工作用Go: 异步任务怎么写4 | Trace: 异步任务还能进行链路追踪么?
工作用Go: 异步任务怎么写4 | Trace: 异步任务还能进行链路追踪么?
598 0
工作用Go: 异步任务怎么写4 | Trace: 异步任务还能进行链路追踪么?
|
Go
工作用Go: 异步任务怎么写3 | 避坑: 野生 Goroutine
工作用Go: 异步任务怎么写3 | 避坑: 野生 Goroutine
349 0
工作用Go: 异步任务怎么写3 | 避坑: 野生 Goroutine