开发者社区> 问答> 正文

获取Akka Streams Sink之后的原始元素参考?

我正在尝试使用Amqp alpakka连接器作为源和接收器。

Source<CommittableReadResult, NotUsed> AMQP_SOURCE  
      -> processAndGetResponse 
      -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 

在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样:

amqp_source(committableReadResult) 
    -> processAndGetResponse 
    -> amqp_sink 
    -> IfSinkOperationSuccess.Then(committableReadResult.ack())

我该如何实现?我基本上只想在接收器操作成功之后才将消息标记为确认。

问题来源:Stack Overflow

展开
收起
montos 2020-03-22 13:54:41 672 0
1 条回答
写回答
取消 提交回答
  • 通常,如果您有一个Sink具体化为的Future[Done](Scala API,我相信Java等效于CompletionStage[Done]),则可以Sink在一个mapAsync阶段中运行带有该子流的子流。数据仅在成功时通过。

    通过快速浏览Alpakka AMQP API,Sink所有方法都可以实现,因此这种方法将行得通。

    假设您的doSomethingWithMessage函数同时产生CommittableReadResult和WriteMessage,则Scala中类似以下内容的代码将起作用:

    def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
    
    amqpSource
      .map(doSomethingWithMessage)
      .mapAsync(parallelism) { tup =>
        val (crr, wm) = tup
        val fut = Source.single(crr, wm).runWith(amqpSink)
        fut.flatMap(_ => crr.ack().map(_ => crr.message))
      }.runWith(Sink.ignore)
    

    回答来源:Stack Overflow

    2020-03-22 13:55:11
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 流式应用中状态的数据结构定义升级 立即下载
《Apache Flink-重新定义计算》PDF下载 立即下载
HBase2.0重新定义小对象实时存取 立即下载