Configuration
// ... type RouterConfig struct { // CloseTimeout 确定路由在关闭时应为处理程序工作多长时间。 CloseTimeout time.Duration } func (c *RouterConfig) setDefaults() { if c.CloseTimeout == 0 { c.CloseTimeout = time.Second * 30 } } func (c RouterConfig) Validate() error { return nil } // ...
Handler
首先,您需要实现 HandlerFunc:
// ... // HandlerFunc 是在收到消息时调用的函数。 // // msg.Ack() 会在 HandlerFunc 没有返回错误时自动调用。 // 当 HandlerFunc 返回错误时,msg.Nack() 被调用。 // 当 msg.Ack() 在 handler 中被调用并且 HandlerFunc 返回错误时, // msg.Nack() 将不会被发送,因为 Ack 已经发送了。 // // HandlerFunc 在接收到多条消息时并行执行 // (因为 msg.Ack() 是在 HandlerFunc 中发送的,或者订阅者支持多个消费者) type HandlerFunc func(msg *Message) ([]*Message, error) // ...
接下来,您必须使用 Router.AddHandler
添加新的处理程序:
// ... // AddHandler 添加一个新的处理程序。 // // handlerName 必须唯一。目前,它仅用于调试。 // // subscribeTopic 是一个处理程序将从其中接收消息的 topic。 // // publishTopic 是一个 router 将生成 handlerFunc 返回的消息的 topic。 // // 当处理程序需要发布到多个主题时, // 建议仅向处理程序注入 Publisher 或实现中间件, // 该中间件将捕获消息并基于例如元数据发布到主题。 func (r *Router) AddHandler( handlerName string, subscribeTopic string, subscriber Subscriber, publishTopic string, publisher Publisher, handlerFunc HandlerFunc, ) *Handler { r.logger.Info("Adding handler", watermill.LogFields{ "handler_name": handlerName, "topic": subscribeTopic, }) if _, ok := r.handlers[handlerName]; ok { panic(DuplicateHandlerNameError{handlerName}) } publisherName, subscriberName := internal.StructName(publisher), internal.StructName(subscriber) newHandler := &handler{ name: handlerName, logger: r.logger, subscriber: subscriber, subscribeTopic: subscribeTopic, subscriberName: subscriberName, publisher: publisher, publishTopic: publishTopic, publisherName: publisherName, handlerFunc: handlerFunc, runningHandlersWg: r.runningHandlersWg, messagesCh: nil, closeCh: r.closeCh, } r.handlers[handlerName] = newHandler return &Handler{ router: r, handler: newHandler, } } // AddNoPublisherHandler 添加一个新的 handler。 // 该 handler 无法返回消息。 // 当消息返回时,它将发生一个错误,Nack 将被发送。 // // handlerName 必须唯一。目前,它仅用于调试。 // // subscribeTopic 是一个 handler 将从其中接收消息的 topic。 // // subscriber 是将从其消费消息的 Subscriber。 func (r *Router) AddNoPublisherHandler( handlerName string, subscribeTopic string, subscriber Subscriber, handlerFunc NoPublishHandlerFunc, ) { // ...
参见入门的示例用法:
// ... handler := router.AddHandler( "struct_handler", // handler 名称,必须是唯一的 "incoming_messages_topic", // 我们将从中读取事件的 topic pubSub, "outgoing_messages_topic", // 我们将向其发布事件的 topic pubSub, structHandler{}.Handler, ) // ...
No publisher handler
并非每个处理程序都会产生新消息。您可以使用 Router.AddNoPublisherHandler
添加此类处理程序:
// ... // AddNoPublisherHandler 添加一个新的 handler。 // 该 handler 无法返回消息。 // 当消息返回时,它将发生一个错误,Nack 将被发送。 // // handlerName 必须唯一。目前,它仅用于调试。 // // subscribeTopic 是一个 handler 将从其中接收消息的 topic。 // // subscriber 是将从其消费消息的 Subscriber。 func (r *Router) AddNoPublisherHandler( handlerName string, subscribeTopic string, subscriber Subscriber, handlerFunc NoPublishHandlerFunc, ) { // ...
Ack
默认情况下,当 handfunc 没有返回错误时,会调用 msg.Ack()。如果返回一个错误,msg.Nack() 将被调用。因此,您不必在处理消息后调用 msg.Ack() 或 msg.Nack() (当然,如果您愿意,也可以这样做)。
Producing messages
从处理程序返回多条消息时,请注意,大多数 Publisher 实现都不支持消息的原子发布。如果代理或存储不可用,它可能最终仅产生一些消息并发送 msg.Nack()。
如果这是一个问题,考虑使用每个处理程序只发布一条消息。
Running the Router
要运行 Router,你需要调用 run()。
// ... // Run 运行所有插件和处理程序,并开始订阅所提供的 topic。 // 当 router 正在运行时,此调用被阻塞。 // // 当所有处理程序都停止时(例如,因为订阅已关闭),router 也将停止。 // // 要停止 Run(),你应该在路由器上调用 Close()。 // // ctx 将传播给所有订阅者。 // // 当所有处理程序都停止时(例如:因为关闭连接),Run() 也将停止。 func (r *Router) Run(ctx context.Context) (err error) { // ...
Ensuring that the Router is running
知道 router 是否在运行是很有用的。对此,您可以使用 Running()
方法。
// ... // Running is closed when router is running. // In other words: you can wait till router is running using // fmt.Println("Starting router") // go r.Run(ctx) // <- r.Running() // fmt.Println("Router is running") func (r *Router) Running() chan struct{} { // ...
Execution models
订阅者可以一次使用一条消息,也可以并行使用多条消息。
- 单消息流是最简单的方法,这意味着在调用msg.Ack()之前,订阅者不会收到任何新消息。
- 只有某些订阅者支持多个消息流。通过一次订阅多个主题分区,多个消息可以同时被使用,即使是之前没有被ack的消息(例如,Kafka订阅器是这样工作的)。路由器通过运行并发处理函数来处理这个模型,每个分区一个处理函数。
请参阅所选的 Pub/Sub 文档以获取受支持的执行模型。
Middleware
// ... // HandlerMiddleware 允许我们编写类似 HandlerFunc 的装饰器。 // 它可以在处理程序之前执行某些事情(例如:修改已消费的消息) // 或之后(修改产生的消息,对被消费的消息进行 ack/nack,处理错误,记录日志,等等)执行一些事情。 // // 它可以通过 `AddMiddleware` 方法附加到路由器上。 // // Example: // func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc { // return func(message *message.Message) ([]*message.Message, error) { // fmt.Println("executed before handler") // producedMessages, err := h(message) // fmt.Println("executed after handler") // // return producedMessages, err // } // } type HandlerMiddleware func(h HandlerFunc) HandlerFunc // ...
在 Middlewares 中可以找到标准中间件的完整列表。
Plugin
// ... // RouterPlugin 是一个函数,在 Router 启动时执行。 type RouterPlugin func(*Router) error // ...
完整的标准插件列表可以在 message/router/plugin 中找到。
Context
handler 接收到的每条消息在上下文中都保存着一些有用的值:
// ... // HandlerNameFromCtx 返回使用该消息的路由中的消息处理程序的名称。 func HandlerNameFromCtx(ctx context.Context) string { return valFromCtx(ctx, handlerNameKey) } // PublisherNameFromCtx 返回在路由中发布消息的消息发布者类型的名称。 // For example, for Kafka it will be `kafka.Publisher`. func PublisherNameFromCtx(ctx context.Context) string { return valFromCtx(ctx, publisherNameKey) } // SubscriberNameFromCtx 返回在路由中订阅该消息的消息订阅者类型的名称。 // For example, for Kafka it will be `kafka.Subscriber`. func SubscriberNameFromCtx(ctx context.Context) string { return valFromCtx(ctx, subscriberNameKey) } // SubscribeTopicFromCtx 返回从路由接收到消息的主题。 func SubscribeTopicFromCtx(ctx context.Context) string { return valFromCtx(ctx, subscribeTopicKey) } // PublishTopicFromCtx 返回路由将向其发布消息的主题。 func PublishTopicFromCtx(ctx context.Context) string { return valFromCtx(ctx, publishTopicKey) } // ...