【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错

本文涉及的产品
函数计算FC,每月15万CU 3个月
应用实时监控服务-应用监控,每月50GB免费额度
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.

问题描述

根据 Spring Cloud Stream Binder Kafka 发送消息/接收消息的代码示例,只需要配置好 application.yaml 中的 event hub connection string, Event Hub Name就可以正常运行并查看执行结果。

但是,当使用JMeter作并发测试时候,就会遇见异常,  主要的异常信息为:reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially. 这是Sinks中发送者发送消息的时候,调用 onSubscribe、onNext、onError 和 onComplete 必须顺序执行,不能并发操作。


完整的异常日志:

2025-02-18T13:08:34.399+08:00 ERROR 35416 --- [io-8080-exec-41] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : 
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: reactor.core.publisher.Sinks$EmissionException: 
Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.] with root cause
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
 at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56) ~[reactor-core-3.7.2.jar:3.7.2]
 at com.azure.spring.sample.eventhubs.SourceExample.sendMessage(SourceExample.java:22) ~[classes/:na]
 at jdk.internal.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[na:na]
 at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
 at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
 at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:257) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:190) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:986) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:891) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1088) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:978) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.34.jar:6.0]
 at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.34.jar:6.0]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:195) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.2.jar:6.2.2]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.2.jar:6.2.2]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.2.jar:6.2.2]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:483) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:344) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:397) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:905) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1190) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:63) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]


 

问题解答

错误的原因主要是在发送消息的方法中使用了 Sinks.EmitFailureHandler.FAIL_FAST参数,这个参数表示只要发送消息失败,不进行任何重试,直接返回异常。

在Sinks类中,因无法使用 EmitResult 中的其它参数来代替EmitFailureHandler.FAIL_FAST, 所以只能使用 tryEmitNext 方法来缓解此问题。

 


参考资料


Sending and Receiving Message by Azure Event Hubs and Spring Cloud Stream Binder Kafka in Spring Boot Application : https://github.com/Azure-Samples/azure-spring-boot-samples/tree/main/eventhubs/spring-cloud-azure-starter/spring-cloud-azure-sample-eventhubs-kafka


【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常 :https://www.cnblogs.com/lulight/p/17357267.html

 

相关文章
|
2月前
|
前端开发 Java API
Spring Cloud Gateway Server Web MVC报错“Unsupported transfer encoding: chunked”解决
本文解析了Spring Cloud Gateway中出现“Unsupported transfer encoding: chunked”错误的原因,指出该问题源于Feign依赖的HTTP客户端与服务端的`chunked`传输编码不兼容,并提供了具体的解决方案。通过规范Feign客户端接口的返回类型,可有效避免该异常,提升系统兼容性与稳定性。
180 0
|
5月前
|
Java 开发工具 Spring
【Azure Application Insights】为Spring Boot应用集成Application Insight SDK
本文以Java Spring Boot项目为例,详细说明如何集成Azure Application Insights SDK以收集和展示日志。内容包括三步配置:1) 在`pom.xml`中添加依赖项`applicationinsights-runtime-attach`和`applicationinsights-core`;2) 在main函数中调用`ApplicationInsights.attach()`;3) 配置`applicationinsights.json`文件。同时提供问题排查建议及自定义日志方法示例,帮助用户顺利集成并使用Application Insights服务。
128 8
|
8月前
|
Java Spring 容器
springcloud-config客户端启用服务发现报错找不到bean EurekaHttpClient
解决 Spring Cloud Config 客户端启用服务发现时报错找不到 bean `EurekaHttpClient` 的问题,主要涉及版本兼容性、依赖配置和正确的配置文件设置。通过检查依赖版本、添加必要的依赖项、配置文件的正确性以及启用服务发现注解,可以有效解决此问题。确保日志中没有其他错误信息也是关键步骤之一。通过这些方法,可以确保 Spring Cloud Config 与 Eureka 客户端正常工作。
154 6
|
Java 应用服务中间件 nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
【Azure Spring Apps】Spring App部署上云遇见 502 Bad Gateway nginx
137 0
|
Java Spring
【Azure Service Bus】使用Spring Cloud integration示例代码,为多个 Service Bus的连接使用 ConnectionString 方式
【Azure Service Bus】使用Spring Cloud integration示例代码,为多个 Service Bus的连接使用 ConnectionString 方式
125 0
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
132 0
|
8月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
11月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
377 1
|
11月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
262 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
899 9

相关产品

  • 云消息队列 Kafka 版