golang实现Pub/Sub模式
一、什么是Pub/Sub模式
Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。
Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。
Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。
通过 Pub/Sub,可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。
发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。
二、用Go实现简单的Pub/Sub服务
hub.go 实现Pub/Sub 服务,类似于中心枢纽,用于接受和分发消息
package main import ( "context" "sync" ) type PubSubHub struct { sync.Mutex subs map[*subscriber]struct{} } func (h *PubSubHub) publish(ctx context.Context, msg *message) { for s := range h.subs { s.publish(ctx, msg) } } func (h *PubSubHub) subscribe(ctx context.Context, s *subscriber) { h.subs[s] = struct{}{} go func() { select { case <-s.quit: case <-ctx.Done(): delete(h.subs, s) } }() go s.run(ctx) } func (h *PubSubHub) unsubscribe(ctx context.Context, s *subscriber) { delete(h.subs, s) close(s.quit) } func (h *PubSubHub) subscribers() int { return len(h.subs) } func newPubSubHub() *PubSubHub { return &PubSubHub{ subs: map[*subscriber]struct{}{}, } }
subscriber.go 实现消息订阅和消费
package main import ( "context" "fmt" "sync" ) type message struct { data []byte } type subscriber struct { sync.Mutex name string handler chan *message quit chan struct{} } func (s *subscriber) run(ctx context.Context) { for { select { case msg:=<-s.handler: fmt.Println(s.name, string(msg.data)) case <-s.quit: return case <-ctx.Done(): return } } } func (s *subscriber) publish(ctx context.Context, msg *message) { select { case <-ctx.Done(): return case s.handler <- msg: default: } } func newSubscriber(name string) *subscriber { return &subscriber{ name: name, handler: make(chan *message, 100), quit: make(chan struct{}), } }
三、验证
main.go
package main import ( "context" "time" ) func main() { ctx := context.Background() h := newPubSubHub() sub01 := newSubscriber("sub01") sub02 := newSubscriber("sub02") sub03 := newSubscriber("sub03") h.subscribe(ctx, sub01) h.subscribe(ctx, sub02) h.subscribe(ctx, sub03) h.publish(ctx, &message{ data: []byte("test01"), }) time.Sleep(1*time.Second) h.unsubscribe(ctx, sub03) h.publish(ctx, &message{ data: []byte("test02"), }) time.Sleep(1*time.Second) }
运行main.go并且打印结果:
sub03 test01 sub01 test01 sub02 test01 sub02 test02 sub01 test02
打印的结果是对的,sub1-3都订阅了Pub/Sub服务,发送消息test01,三者都收到了。紧接着sub03取消订阅,然后再次发送消息test02,只有sub1-2收到了。
四、流程图
1、Subscriber 订阅消息
2、Publisher 发送消息
3、Unsubscribe 取消订阅