【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常

简介: 【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常

问题描述

开发Java Spring Cloud应用,需要发送消息到Azure Event Hub中。使用 Spring Cloud Stream Event Hubs Binder 依赖,应用执行一会就会遇见报错:reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.

 

问题解答

从错误来看,这明显是多线程并发处理时,多个线程同时触发了onSubscribe 或 onNext 或 onError 或 onComplete 事件,而这些事件在与 Subscriber处理时只能一个一个串行处理。

因为SpringCloud的 Controller 并发请求时,会分配多个线程同时调用many.emitNext(),这时如果之前请求线程处理还未结束,新请求的线程会直接这样的报错。

异常产生的代码为:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitFailureHandler.FAIL_FAST);

Sinks 对 FAIL_FAST 和 FAIL_NON_SERIALIZED的枚举值说明参考:reactor-core/Sinks.java at main · reactor/reactor-core · GitHub

  • FAIL_FAST:表示对失败不会进行任何重试,会马上触发异常处理机制,这里就是抛出EmissionException异常。( A pre-made handler that will not instruct to retry any failure and trigger the failure handling immediately.)
  • FAIL_NON_SERIALIZED:表示会持续重试,直至成功。

如果发送到Event Hub的消息允许丢失,可以通过Try Catch捕获异常后记录日志即可。

但是,如果发送的消息不能丢失,必须成功传递到Event Hub中,就可以使用 FAIL_NON_SERIALIZED 模式。

修改为:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);

 

 

参考资料

ReactorDispatcher: Making sure spec 1.3 is not violated and under race, signals are not lost upon concurrent ClosedChannelException : https://github.com/Azure/azure-sdk-for-java/issues/27320

FAIL NON SERIALIZED :

https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java#L89

https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/Sinks.java#L118

/**

* Has successfully emitted the signal

*/

OK,

/**

* Has failed to emit the signal because the sink was previously terminated successfully or with an error

*/

FAIL_TERMINATED,

/**

* Has failed to emit the signal because the sink does not have buffering capacity left

*/

FAIL_OVERFLOW,

/**

* Has failed to emit the signal because the sink was previously interrupted by its consumer

*/

FAIL_CANCELLED,

/**

* Has failed to emit the signal because the access was not serialized

*/

FAIL_NON_SERIALIZED,

/**

* Has failed to emit the signal because the sink has never been subscribed to has no capacity

* to buffer the signal.

*/

FAIL_ZERO_SUBSCRIBER;

相关文章
|
5天前
|
监控 Java C#
Spring Event 的介绍
Spring Event 是 Spring 框架中的事件驱动机制,允许组件间进行同步或异步消息传递,无需直接依赖。它包括事件(Event)、事件发布者(Publisher)和事件监听器(Listener),通过 `ApplicationEventPublisher` 广播事件,实现松耦合通信,增强模块化和可维护性。Spring 还提供了多种内置事件,如 `ContextRefreshedEvent` 和 `ContextClosedEvent`,支持同步及异步处理,并具备良好的扩展性。
|
20天前
|
缓存 NoSQL Java
【Azure Redis 缓存】示例使用 redisson-spring-boot-starter 连接/使用 Azure Redis 服务
【Azure Redis 缓存】示例使用 redisson-spring-boot-starter 连接/使用 Azure Redis 服务
|
15天前
|
前端开发 小程序 Java
【规范】SpringBoot接口返回结果及异常统一处理,这样封装才优雅
本文详细介绍了如何在SpringBoot项目中统一处理接口返回结果及全局异常。首先,通过封装`ResponseResult`类,实现了接口返回结果的规范化,包括状态码、状态信息、返回信息和数据等字段,提供了多种成功和失败的返回方法。其次,利用`@RestControllerAdvice`和`@ExceptionHandler`注解配置全局异常处理,捕获并友好地处理各种异常信息。
111 0
【规范】SpringBoot接口返回结果及异常统一处理,这样封装才优雅
|
20天前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
20天前
|
Java 应用服务中间件 nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
|
20天前
|
Java Spring
【Azure Service Bus】使用Spring Cloud integration示例代码,为多个 Service Bus的连接使用 ConnectionString 方式
【Azure Service Bus】使用Spring Cloud integration示例代码,为多个 Service Bus的连接使用 ConnectionString 方式
|
20天前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
4月前
|
安全 Java 定位技术
Android 浅度解析:AIDL & Binder (1)
Android 浅度解析:AIDL & Binder (1)
135 0
|
2月前
|
缓存 安全 Java
Android深入Binder拦截问题分析
【7月更文挑战第1天】Android Binder 拦截可实现虚拟化、测试、SDK检测、逆向分析及ROM扩展。通过Java层aidl代理,利用IBinder接口规范来拦截通信。拦截步骤包括:替换Binder服务缓存对象,如ActivityManagerService;代理ServiceManager以控制服务获取。此操作需系统权限,可能涉及安全风险和版本差异,非必要时应谨慎。
|
Java API Android开发
Android中Binder在项目中的具体使用详解
Android中Binder在项目中的具体使用详解
159 0