开发者社区> 问答> 正文

如何从Akka Streams Sink中抛出的异常中恢复?

如何从Akka流接收器中引发的异常中恢复?

简单的例子:

Source<Integer, NotUsed> integerSource = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));

    integerSource.runWith(Sink.foreach(x -> {
      if (x == 4) {
        throw new Exception("Error Occurred");
      }
      System.out.println("Sink: " + x);
    }), system);

输出:

Sink: 1
Sink: 2
Sink: 3

如何处理异常并从源代码移至下一个元素?(aka 5,6,7,8,9)

问题来源:Stack Overflow

展开
收起
montos 2020-03-22 14:55:44 617 0
1 条回答
写回答
取消 提交回答
  • 默认情况下,监视策略在引发异常时停止流。要更改监视策略以删除引起异常的消息并继续下一条消息,请使用“恢复”策略。例如:

    final Function<Throwable, Supervision.Directive> decider =
      exc -> {
        return Supervision.resume();
      };
    
    final Sink<Integer, CompletionStage<Done>> printSink =
      Sink.foreach(x -> {
        if (x == 4) {
          throw new Exception("Error Occurred");
        }
        System.out.println("Sink: " + x);
      });
    
    final RunnableGraph<CompletionStage<Done>> runnableGraph =
      integerSource.toMat(printSink, Keep.right());
    
    final RunnableGraph<CompletionStage<Done>> withResumingSupervision =
      runnableGraph.withAttributes(ActorAttributes.withSupervisionStrategy(decider));
    
    final CompletionStage<Done> result = withResumingSupervision.run(system);
    

    您还可以为不同种类的异常定义不同的监管策略:

    final Function<Throwable, Supervision.Directive> decider =
      exc -> {
        if (exc instanceof MySpecificException) return Supervision.resume();
        else return Supervision.stop();
      };
    

    回答来源:Stack Overflow

    2020-03-22 14:56:47
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 流式应用中状态的数据结构定义升级 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载