Watermill(Golang 事件驱动库)Message Router 解析

简介: Watermill(Golang 事件驱动库)Message Router 解析

Configuration


// ...
type RouterConfig struct {
   // CloseTimeout 确定路由在关闭时应为处理程序工作多长时间。
   CloseTimeout time.Duration
}
func (c *RouterConfig) setDefaults() {
    if c.CloseTimeout == 0 {
        c.CloseTimeout = time.Second * 30
    }
}
func (c RouterConfig) Validate() error {
    return nil
}
// ...


Handler


首先,您需要实现 HandlerFunc:


// ...
// HandlerFunc 是在收到消息时调用的函数。
//
// msg.Ack() 会在 HandlerFunc 没有返回错误时自动调用。
// 当 HandlerFunc 返回错误时,msg.Nack() 被调用。
// 当 msg.Ack() 在 handler 中被调用并且 HandlerFunc 返回错误时,
// msg.Nack() 将不会被发送,因为 Ack 已经发送了。
//
// HandlerFunc 在接收到多条消息时并行执行
// (因为 msg.Ack() 是在 HandlerFunc 中发送的,或者订阅者支持多个消费者)
type HandlerFunc func(msg *Message) ([]*Message, error)
// ...


接下来,您必须使用 Router.AddHandler 添加新的处理程序:


// ...
// AddHandler 添加一个新的处理程序。
//
// handlerName 必须唯一。目前,它仅用于调试。
//
// subscribeTopic 是一个处理程序将从其中接收消息的 topic。
//
// publishTopic 是一个 router 将生成 handlerFunc 返回的消息的 topic。
// 
// 当处理程序需要发布到多个主题时,
// 建议仅向处理程序注入 Publisher 或实现中间件,
// 该中间件将捕获消息并基于例如元数据发布到主题。
func (r *Router) AddHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    publishTopic string,
    publisher Publisher,
    handlerFunc HandlerFunc,
) *Handler {
    r.logger.Info("Adding handler", watermill.LogFields{
        "handler_name": handlerName,
        "topic":        subscribeTopic,
    })
    if _, ok := r.handlers[handlerName]; ok {
        panic(DuplicateHandlerNameError{handlerName})
    }
    publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber)
    newHandler := &handler{
        name:   handlerName,
        logger: r.logger,
        subscriber:     subscriber,
        subscribeTopic: subscribeTopic,
        subscriberName: subscriberName,
        publisher:     publisher,
        publishTopic:  publishTopic,
        publisherName: publisherName,
        handlerFunc:       handlerFunc,
        runningHandlersWg: r.runningHandlersWg,
        messagesCh:        nil,
        closeCh:           r.closeCh,
    }
    r.handlers[handlerName] = newHandler
    return &Handler{
        router:  r,
        handler: newHandler,
    }
}
// AddNoPublisherHandler 添加一个新的 handler。
// 该 handler 无法返回消息。
// 当消息返回时,它将发生一个错误,Nack 将被发送。
//
// handlerName 必须唯一。目前,它仅用于调试。
//
// subscribeTopic 是一个 handler 将从其中接收消息的 topic。
//
// subscriber 是将从其消费消息的 Subscriber。
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc NoPublishHandlerFunc,
) {
// ...


参见入门的示例用法:


// ...
   handler := router.AddHandler(
        "struct_handler",          // handler 名称,必须是唯一的
       "incoming_messages_topic", // 我们将从中读取事件的 topic
       pubSub,
        "outgoing_messages_topic", // 我们将向其发布事件的 topic
       pubSub,
        structHandler{}.Handler,
    )
// ...


No publisher handler


并非每个处理程序都会产生新消息。您可以使用 Router.AddNoPublisherHandler 添加此类处理程序:


// ...
// AddNoPublisherHandler 添加一个新的 handler。
// 该 handler 无法返回消息。
// 当消息返回时,它将发生一个错误,Nack 将被发送。
//
// handlerName 必须唯一。目前,它仅用于调试。
//
// subscribeTopic 是一个 handler 将从其中接收消息的 topic。
//
// subscriber 是将从其消费消息的 Subscriber。
func (r *Router) AddNoPublisherHandler(
    handlerName string,
    subscribeTopic string,
    subscriber Subscriber,
    handlerFunc NoPublishHandlerFunc,
) {
// ...


Ack


默认情况下,当 handfunc 没有返回错误时,会调用 msg.Ack()。如果返回一个错误,msg.Nack() 将被调用。因此,您不必在处理消息后调用 msg.Ack() 或 msg.Nack() (当然,如果您愿意,也可以这样做)。


Producing messages


从处理程序返回多条消息时,请注意,大多数 Publisher 实现都不支持消息的原子发布。如果代理或存储不可用,它可能最终仅产生一些消息并发送 msg.Nack()。

如果这是一个问题,考虑使用每个处理程序只发布一条消息。


Running the Router


要运行 Router,你需要调用 run()。


// ...
// Run 运行所有插件和处理程序,并开始订阅所提供的 topic。
// 当 router 正在运行时,此调用被阻塞。
//
// 当所有处理程序都停止时(例如,因为订阅已关闭),router 也将停止。
//
// 要停止 Run(),你应该在路由器上调用 Close()。
//
// ctx 将传播给所有订阅者。
//
// 当所有处理程序都停止时(例如:因为关闭连接),Run() 也将停止。
func (r *Router) Run(ctx context.Context) (err error) {
// ...


Ensuring that the Router is running


知道 router 是否在运行是很有用的。对此,您可以使用 Running() 方法。


// ...
// Running is closed when router is running.
// In other words: you can wait till router is running using
//        fmt.Println("Starting router")
//        go r.Run(ctx)
//        <- r.Running()
//        fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
// ...


Execution models


订阅者可以一次使用一条消息,也可以并行使用多条消息。

  • 单消息流是最简单的方法,这意味着在调用msg.Ack()之前,订阅者不会收到任何新消息。
  • 只有某些订阅者支持多个消息流。通过一次订阅多个主题分区,多个消息可以同时被使用,即使是之前没有被ack的消息(例如,Kafka订阅器是这样工作的)。路由器通过运行并发处理函数来处理这个模型,每个分区一个处理函数。

请参阅所选的 Pub/Sub 文档以获取受支持的执行模型。


Middleware


// ...
// HandlerMiddleware 允许我们编写类似 HandlerFunc 的装饰器。
// 它可以在处理程序之前执行某些事情(例如:修改已消费的消息)
// 或之后(修改产生的消息,对被消费的消息进行 ack/nack,处理错误,记录日志,等等)执行一些事情。
//
// 它可以通过 `AddMiddleware` 方法附加到路由器上。
//
// Example:
//        func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
//            return func(message *message.Message) ([]*message.Message, error) {
//                fmt.Println("executed before handler")
//                producedMessages, err := h(message)
//                fmt.Println("executed after handler")
//
//                return producedMessages, err
//            }
//        }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
// ...


在 Middlewares 中可以找到标准中间件的完整列表。


Plugin


// ...
// RouterPlugin 是一个函数,在 Router 启动时执行。
type RouterPlugin func(*Router) error
// ...


完整的标准插件列表可以在 message/router/plugin 中找到。


Context


handler 接收到的每条消息在上下文中都保存着一些有用的值:


// ...
// HandlerNameFromCtx 返回使用该消息的路由中的消息处理程序的名称。
func HandlerNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, handlerNameKey)
}
// PublisherNameFromCtx 返回在路由中发布消息的消息发布者类型的名称。
// For example, for Kafka it will be `kafka.Publisher`.
func PublisherNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, publisherNameKey)
}
// SubscriberNameFromCtx 返回在路由中订阅该消息的消息订阅者类型的名称。
// For example, for Kafka it will be `kafka.Subscriber`.
func SubscriberNameFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, subscriberNameKey)
}
// SubscribeTopicFromCtx 返回从路由接收到消息的主题。
func SubscribeTopicFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, subscribeTopicKey)
}
// PublishTopicFromCtx 返回路由将向其发布消息的主题。
func PublishTopicFromCtx(ctx context.Context) string {
    return valFromCtx(ctx, publishTopicKey)
}
// ...


相关文章
|
1月前
|
Cloud Native 安全 Java
Go语言深度解析:从入门到精通的完整指南
🌟蒋星熠Jaxonic,Go语言探索者。深耕云计算、微服务与并发编程,以代码为笔,在二进制星河中书写极客诗篇。分享Go核心原理、性能优化与实战架构,助力开发者掌握云原生时代利器。#Go语言 #并发编程 #性能优化
372 43
Go语言深度解析:从入门到精通的完整指南
|
3月前
|
数据采集 数据挖掘 测试技术
Go与Python爬虫实战对比:从开发效率到性能瓶颈的深度解析
本文对比了Python与Go在爬虫开发中的特点。Python凭借Scrapy等框架在开发效率和易用性上占优,适合快速开发与中小型项目;而Go凭借高并发和高性能优势,适用于大规模、长期运行的爬虫服务。文章通过代码示例和性能测试,分析了两者在并发能力、错误处理、部署维护等方面的差异,并探讨了未来融合发展的趋势。
317 0
|
2月前
|
Cloud Native 安全 Java
Go语言深度解析:从入门到精通的完整指南
🌟 蒋星熠Jaxonic,执着的星际旅人,用Go语言编写代码诗篇。🚀 Go语言以简洁、高效、并发为核心,助力云计算与微服务革新。📚 本文详解Go语法、并发模型、性能优化与实战案例,助你掌握现代编程精髓。🌌 从goroutine到channel,从内存优化到高并发架构,全面解析Go的强大力量。🔧 实战构建高性能Web服务,展现Go在云原生时代的无限可能。✨ 附技术对比、最佳实践与生态全景,带你踏上Go语言的星辰征途。#Go语言 #并发编程 #云原生 #性能优化
|
7月前
|
算法 Go 索引
【LeetCode 热题100】45:跳跃游戏 II(详细解析)(Go语言版)
本文详细解析了力扣第45题“跳跃游戏II”的三种解法:贪心算法、动态规划和反向贪心。贪心算法通过选择每一步能跳到的最远位置,实现O(n)时间复杂度与O(1)空间复杂度,是面试首选;动态规划以自底向上的方式构建状态转移方程,适合初学者理解但效率较低;反向贪心从终点逆向寻找最优跳点,逻辑清晰但性能欠佳。文章对比了各方法的优劣,并提供了Go语言代码实现,助你掌握最小跳跃次数问题的核心技巧。
320 15
|
7月前
|
机器学习/深度学习 存储 算法
【LeetCode 热题100】347:前 K 个高频元素(详细解析)(Go语言版)
这篇文章详细解析了力扣热题 347——前 K 个高频元素的三种解法:哈希表+小顶堆、哈希表+快速排序和哈希表+桶排序。每种方法都附有清晰的思路讲解和 Go 语言代码实现。小顶堆方法时间复杂度为 O(n log k),适合处理大规模数据;快速排序方法时间复杂度为 O(n log n),适用于数据量较小的场景;桶排序方法在特定条件下能达到线性时间复杂度 O(n)。文章通过对比分析,帮助读者根据实际需求选择最优解法,并提供了完整的代码示例,是一篇非常实用的算法学习资料。
471 90
|
3月前
|
缓存 监控 安全
告别缓存击穿!Go 语言中的防并发神器:singleflight 包深度解析
在高并发场景中,多个请求同时访问同一资源易导致缓存击穿、数据库压力过大。Go 语言提供的 `singleflight` 包可将相同 key 的请求合并,仅执行一次实际操作,其余请求共享结果,有效降低系统负载。本文详解其原理、实现及典型应用场景,并附示例代码,助你掌握高并发优化技巧。
294 0
|
3月前
|
数据采集 JSON Go
Go语言实战案例:实现HTTP客户端请求并解析响应
本文是 Go 网络与并发实战系列的第 2 篇,详细介绍如何使用 Go 构建 HTTP 客户端,涵盖请求发送、响应解析、错误处理、Header 与 Body 提取等流程,并通过实战代码演示如何并发请求多个 URL,适合希望掌握 Go 网络编程基础的开发者。
|
5月前
|
存储 设计模式 安全
Go 语言单例模式全解析:从青铜到王者段位的实现方案
单例模式确保一个类只有一个实例,并提供全局访问点,适用于日志、配置管理、数据库连接池等场景。在 Go 中,常用实现方式包括懒汉模式、饿汉模式、双重检查锁定,最佳实践是使用 `sync.Once`,它并发安全、简洁高效。本文详解各种实现方式的优缺点,并提供代码示例与最佳应用建议。
178 5
|
6月前
|
存储 算法 Go
【LeetCode 热题100】17:电话号码的字母组合(详细解析)(Go语言版)
LeetCode 17题解题思路采用回溯算法,通过递归构建所有可能的组合。关键点包括:每位数字对应多个字母,依次尝试;递归构建下一个字符;递归出口为组合长度等于输入数字长度。Go语言实现中,使用map存储数字到字母的映射,通过回溯函数递归生成组合。时间复杂度为O(3^n * 4^m),空间复杂度为O(n)。类似题目包括括号生成、组合、全排列等。掌握回溯法的核心思想,能够解决多种排列组合问题。
250 11
|
6月前
|
Go
【LeetCode 热题100】155:最小栈(详细解析)(Go语言版)
本文详细解析了力扣热题155:最小栈的解题思路与实现方法。题目要求设计一个支持 push、核心思路是使用辅助栈法,通过两个栈(主栈和辅助栈)来维护当前栈中的最小值。具体操作包括:push 时同步更新辅助栈,pop 时检查是否需要弹出辅助栈的栈顶,getMin 时直接返回辅助栈的栈顶。文章还提供了 Go 语言的实现代码,并对复杂度进行了分析。此外,还介绍了单栈 + 差值记录法的进阶思路,并总结了常见易错点,如 pop 操作时忘记同步弹出辅助栈等。
218 6

推荐镜像

更多
下一篇
oss云网关配置