我正在尝试使用Amqp alpakka连接器作为源和接收器。
Source<CommittableReadResult, NotUsed> AMQP_SOURCE
-> processAndGetResponse
-> Sink<ByteString, CompletionStage<Done>> AMQP_SINK
在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样:
amqp_source(committableReadResult)
-> processAndGetResponse
-> amqp_sink
-> IfSinkOperationSuccess.Then(committableReadResult.ack())
我该如何实现?我基本上只想在接收器操作成功之后才将消息标记为确认。
问题来源:Stack Overflow
通常,如果您有一个Sink具体化为的Future[Done](Scala API,我相信Java等效于CompletionStage[Done]),则可以Sink在一个mapAsync阶段中运行带有该子流的子流。数据仅在成功时通过。
通过快速浏览Alpakka AMQP API,Sink所有方法都可以实现,因此这种方法将行得通。
假设您的doSomethingWithMessage函数同时产生CommittableReadResult和WriteMessage,则Scala中类似以下内容的代码将起作用:
def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
amqpSource
.map(doSomethingWithMessage)
.mapAsync(parallelism) { tup =>
val (crr, wm) = tup
val fut = Source.single(crr, wm).runWith(amqpSink)
fut.flatMap(_ => crr.ack().map(_ => crr.message))
}.runWith(Sink.ignore)
回答来源:Stack Overflow
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。