golang实现Pub/Sub模式

简介: golang实现Pub/Sub模式

golang实现Pub/Sub模式



一、什么是Pub/Sub模式


Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。


Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。


Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。


通过 Pub/Sub,可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。


发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。


二、用Go实现简单的Pub/Sub服务


hub.go 实现Pub/Sub 服务,类似于中心枢纽,用于接受和分发消息


package main
import (
 "context"
 "sync"
)
type PubSubHub struct {
 sync.Mutex
 subs map[*subscriber]struct{}
}
func (h *PubSubHub) publish(ctx context.Context, msg *message) {
 for s := range h.subs {
  s.publish(ctx, msg)
 }
}
func (h *PubSubHub) subscribe(ctx context.Context, s *subscriber) {
 h.subs[s] = struct{}{}
 go func() {
  select {
  case <-s.quit:
  case <-ctx.Done():
   delete(h.subs, s)
  }
 }()
 go s.run(ctx)
}
func (h *PubSubHub) unsubscribe(ctx context.Context, s *subscriber) {
 delete(h.subs, s)
 close(s.quit)
}
func (h *PubSubHub) subscribers() int {
 return len(h.subs)
}
func newPubSubHub() *PubSubHub {
 return &PubSubHub{
  subs: map[*subscriber]struct{}{},
 }
}


subscriber.go 实现消息订阅和消费


package main
import (
 "context"
 "fmt"
 "sync"
)
type message struct {
 data []byte
}
type subscriber struct {
 sync.Mutex
 name string
 handler chan *message
 quit chan struct{}
}
func (s *subscriber) run(ctx context.Context) {
 for {
  select {
  case msg:=<-s.handler:
   fmt.Println(s.name, string(msg.data))
  case <-s.quit:
   return
  case <-ctx.Done():
   return
  }
 }
}
func (s *subscriber) publish(ctx context.Context, msg *message) {
 select {
 case <-ctx.Done():
  return
 case s.handler <- msg:
 default:
 }
}
func newSubscriber(name string) *subscriber {
 return &subscriber{
  name: name,
  handler: make(chan *message, 100),
  quit: make(chan struct{}),
 }
}


三、验证


main.go


package main
import (
 "context"
 "time"
)
func main() {
 ctx := context.Background()
 h := newPubSubHub()
 sub01 := newSubscriber("sub01")
 sub02 := newSubscriber("sub02")
 sub03 := newSubscriber("sub03")
 h.subscribe(ctx, sub01)
 h.subscribe(ctx, sub02)
 h.subscribe(ctx, sub03)
 h.publish(ctx, &message{
  data: []byte("test01"),
 })
 time.Sleep(1*time.Second)
 h.unsubscribe(ctx, sub03)
 h.publish(ctx, &message{
  data: []byte("test02"),
 })
 time.Sleep(1*time.Second)
}


运行main.go并且打印结果:


sub03 test01
sub01 test01
sub02 test01
sub02 test02
sub01 test02

打印的结果是对的,sub1-3都订阅了Pub/Sub服务,发送消息test01,三者都收到了。紧接着sub03取消订阅,然后再次发送消息test02,只有sub1-2收到了。


四、流程图


1、Subscriber 订阅消息


2、Publisher 发送消息

640.png


3、Unsubscribe 取消订阅



640.png

相关文章
|
4月前
|
设计模式 Go 开发工具
Golang设计模式——12中介模式
Golang设计模式——12中介模式
26 0
|
4月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
123 1
|
9天前
|
前端开发 Go
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
【5月更文挑战第3天】Go语言通过goroutines和channels实现异步编程,虽无内置Future/Promise,但可借助其特性模拟。本文探讨了如何使用channel实现Future模式,提供了异步获取URL内容长度的示例,并警示了Channel泄漏、错误处理和并发控制等常见问题。为避免这些问题,建议显式关闭channel、使用context.Context、并发控制机制及有效传播错误。理解并应用这些技巧能提升Go语言异步编程的效率和健壮性。
27 5
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
|
10天前
|
设计模式 Go 调度
Golang深入浅出之-Go语言中的并发模式:Pipeline、Worker Pool等
【5月更文挑战第1天】Go语言并发模拟能力强大,Pipeline和Worker Pool是常用设计模式。Pipeline通过多阶段处理实现高效并行,常见问题包括数据竞争和死锁,可借助通道和`select`避免。Worker Pool控制并发数,防止资源消耗,需注意任务分配不均和goroutine泄露,使用缓冲通道和`sync.WaitGroup`解决。理解和实践这些模式是提升Go并发性能的关键。
28 2
|
4月前
|
设计模式 Go 开发工具
Golang设计模式——01工厂方法模式
Golang设计模式——01工厂方法模式
23 0
|
4月前
|
设计模式 算法 Go
Golang设计模式——08模板模式
Golang设计模式——08模板模式
26 0
|
4月前
|
设计模式 Go 开发工具
Golang设计模式——00简单工厂模式
Golang设计模式——00简单工厂模式
22 0
|
8月前
|
设计模式 安全 Java
深入剖析Golang中单例模式
深入剖析Golang中单例模式
|
10月前
|
Go
golang的Fan模式在项目中实战,我后悔了
golang的Fan模式在项目中实战,我后悔了
|
10月前
|
Go
golang构造函数的最佳实践-FOP模式
golang构造函数的最佳实践-FOP模式