golang实现Pub/Sub模式

简介: golang实现Pub/Sub模式

golang实现Pub/Sub模式



一、什么是Pub/Sub模式


Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。


Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。


Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。


通过 Pub/Sub,可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。


发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。


二、用Go实现简单的Pub/Sub服务


hub.go 实现Pub/Sub 服务,类似于中心枢纽,用于接受和分发消息


package main
import (
 "context"
 "sync"
)
type PubSubHub struct {
 sync.Mutex
 subs map[*subscriber]struct{}
}
func (h *PubSubHub) publish(ctx context.Context, msg *message) {
 for s := range h.subs {
  s.publish(ctx, msg)
 }
}
func (h *PubSubHub) subscribe(ctx context.Context, s *subscriber) {
 h.subs[s] = struct{}{}
 go func() {
  select {
  case <-s.quit:
  case <-ctx.Done():
   delete(h.subs, s)
  }
 }()
 go s.run(ctx)
}
func (h *PubSubHub) unsubscribe(ctx context.Context, s *subscriber) {
 delete(h.subs, s)
 close(s.quit)
}
func (h *PubSubHub) subscribers() int {
 return len(h.subs)
}
func newPubSubHub() *PubSubHub {
 return &PubSubHub{
  subs: map[*subscriber]struct{}{},
 }
}


subscriber.go 实现消息订阅和消费


package main
import (
 "context"
 "fmt"
 "sync"
)
type message struct {
 data []byte
}
type subscriber struct {
 sync.Mutex
 name string
 handler chan *message
 quit chan struct{}
}
func (s *subscriber) run(ctx context.Context) {
 for {
  select {
  case msg:=<-s.handler:
   fmt.Println(s.name, string(msg.data))
  case <-s.quit:
   return
  case <-ctx.Done():
   return
  }
 }
}
func (s *subscriber) publish(ctx context.Context, msg *message) {
 select {
 case <-ctx.Done():
  return
 case s.handler <- msg:
 default:
 }
}
func newSubscriber(name string) *subscriber {
 return &subscriber{
  name: name,
  handler: make(chan *message, 100),
  quit: make(chan struct{}),
 }
}


三、验证


main.go


package main
import (
 "context"
 "time"
)
func main() {
 ctx := context.Background()
 h := newPubSubHub()
 sub01 := newSubscriber("sub01")
 sub02 := newSubscriber("sub02")
 sub03 := newSubscriber("sub03")
 h.subscribe(ctx, sub01)
 h.subscribe(ctx, sub02)
 h.subscribe(ctx, sub03)
 h.publish(ctx, &message{
  data: []byte("test01"),
 })
 time.Sleep(1*time.Second)
 h.unsubscribe(ctx, sub03)
 h.publish(ctx, &message{
  data: []byte("test02"),
 })
 time.Sleep(1*time.Second)
}


运行main.go并且打印结果:


sub03 test01
sub01 test01
sub02 test01
sub02 test02
sub01 test02

打印的结果是对的,sub1-3都订阅了Pub/Sub服务,发送消息test01,三者都收到了。紧接着sub03取消订阅,然后再次发送消息test02,只有sub1-2收到了。


四、流程图


1、Subscriber 订阅消息


2、Publisher 发送消息

640.png


3、Unsubscribe 取消订阅



640.png

相关文章
|
8月前
|
设计模式 Go 开发工具
Golang设计模式——12中介模式
Golang设计模式——12中介模式
51 0
|
8月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
531 1
|
10天前
|
Go 数据安全/隐私保护
Golang 里的 AES、DES、3DES 加解密,支持 ECB、CBC 等多种模式组合
Openssl encryption 是 OpenSSL 库的功能包装,支持对称加密算法(AES、DES、3DES)的 ECB 和 CBC 模式。提供简便的 Go 语言接口,用于加密和解密操作。安装命令:`go get -u github.com/forgoer/openssl`。示例代码展示了 AES-ECB、AES-CBC 等模式的使用方法,支持 PKCS7 填充。
41 9
|
6月前
|
测试技术 Go
golang 的重试弹性模式
Golang 中的重试机制实现了一个名为 `Retrier` 的结构体,用于实现弹性模式。`Retrier` 创建时需要指定重试间隔(如常量间隔或指数递增间隔)和错误分类器。分类器决定了哪些错误应被重试。默认情况下,如果未提供分类器,则使用默认分类器,它简单地将非 nil 错误标记为应重试。提供了三种分类器:默认、白名单和黑名单。`Run` 和 `RunCtx` 是执行重试的函数,后者接受上下文以便处理超时。通过 `calcSleep` 计算带有随机抖动的休眠时间,增加重试的不可预测性,减少并发冲突。如果达到最大重试次数或上下文超时,重试将停止。
|
6月前
|
测试技术 Go
golang 的重试弹性模式怎么设计?
Golang的可重构弹性模式通过`Retrier`实现了重试逻辑。创建`Retrier`需指定重试间隔(隐含重试次数)及错误分类器,决定哪些错误需重试。示例代码展示了如何创建一个重试器并执行带有重试逻辑的工作函数。`Retrier`结构体包含重试间隔、分类器等字段。
|
8月前
|
前端开发 Go
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
【5月更文挑战第3天】Go语言通过goroutines和channels实现异步编程,虽无内置Future/Promise,但可借助其特性模拟。本文探讨了如何使用channel实现Future模式,提供了异步获取URL内容长度的示例,并警示了Channel泄漏、错误处理和并发控制等常见问题。为避免这些问题,建议显式关闭channel、使用context.Context、并发控制机制及有效传播错误。理解并应用这些技巧能提升Go语言异步编程的效率和健壮性。
407 5
Golang深入浅出之-Go语言中的异步编程与Future/Promise模式
|
8月前
|
设计模式 Go 调度
Golang深入浅出之-Go语言中的并发模式:Pipeline、Worker Pool等
【5月更文挑战第1天】Go语言并发模拟能力强大,Pipeline和Worker Pool是常用设计模式。Pipeline通过多阶段处理实现高效并行,常见问题包括数据竞争和死锁,可借助通道和`select`避免。Worker Pool控制并发数,防止资源消耗,需注意任务分配不均和goroutine泄露,使用缓冲通道和`sync.WaitGroup`解决。理解和实践这些模式是提升Go并发性能的关键。
96 2
|
8月前
|
设计模式 Go 开发工具
Golang设计模式——01工厂方法模式
Golang设计模式——01工厂方法模式
51 0
|
8月前
|
设计模式 算法 Go
Golang设计模式——08模板模式
Golang设计模式——08模板模式
62 0
|
8月前
|
设计模式 Go 开发工具
Golang设计模式——00简单工厂模式
Golang设计模式——00简单工厂模式
50 0