The following exception is thrown:
E:\Java\JDK\jdk17.0.4.1\bin\java.exe "-javaagent:E:\idea\IntelliJ IDEA 2023.1.4\lib\idea_rt.jar=22912:E:\idea\IntelliJ IDEA 2023.1.4\bin" -Dfile.encoding=UTF-8 -classpath I:\rocketmq\quickstart-rocketmq-01\target\classes;E:\maven\repository\org\apache\rocketmq\rocketmq-client-java\5.0.5\rocketmq-client-java-5.0.5.jar;E:\maven\repository\org\apache\tomcat\annotations-api\6.0.53\annotations-api-6.0.53.jar com.hff.rocketmq.RocketmqConsumer
Exception in thread "main" java.lang.IllegalStateException: Expected the service PushConsumerImpl-0 [FAILED] to be RUNNING, but the service has FAILED
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165)
at org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl.build(PushConsumerBuilderImpl.java:128)
at com.hff.rocketmq.RocketmqConsumer.main(RocketmqConsumer.java:64)
Caused by: java.util.concurrent.CancellationException: Task was cancelled.
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1543)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:586)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
at org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl.startUp(PushConsumerImpl.java:161)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103)
at java.base/java.lang.Thread.run(Thread.java:833)
Process finished with exit code 1
My docker-compose file:
version: "3"
# networks:
#   rocketmq-net: rocketmq-net
services:
  rocketmq-nameserver:
    image: apache/rocketmq:5.1.3
    container_name: rocketmq-nameserver
    environment:
      - NAMESRV_ADDR=rocketmq-nameserver:9876
    ports:
      - "9876:9876"
    networks:
      - rocketmq-net
    command: bash -c "./mqnamesrv;sh ./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster"
  mqborker-proxy:
    image: apache/rocketmq:5.1.3
    container_name: mqborker-proxy
    environment:
      - NAMESRV_ADDR=rocketmq-nameserver:9876
    ports:
      - "8081:8081"
      - "10909:10909"
    networks:
      - rocketmq-net
    command: ./mqbroker -n rocketmq-nameserver:9876 --enable-proxy
networks:
  rocketmq-net:
    name: rocketmq-net
    external: true
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Endpoint address: must be set to the Proxy's address and port list, typically xxx:8081;xxx:8081.
String endpoints = "192.168.200.131:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build();
SimpleConsumerBuilderImpl builder = new SimpleConsumerBuilderImpl();
// Filter rule for the subscription; "*" subscribes to messages with any tag.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Consumer group this consumer belongs to; the group must be created in advance.
String consumerGroup = "YourConsumerGroup";
// Target topic to subscribe to; the topic must be created in advance.
String topic = "TestTopic";
// Initialize the PushConsumer, binding the consumer group, communication parameters, and subscription.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group.
        .setConsumerGroup(consumerGroup)
        // Set the pre-bound subscription expressions.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        // Set the message listener.
        .setMessageListener(new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView.toString());
                ByteBuffer body = messageView.getBody();
                int capacity = body.capacity();
                byte[] bytes = new byte[capacity];
                for (int i = 0; i < capacity; i++) {
                    byte b = body.get(i);
                    bytes[i] = b;
                }
                String strBody = new String(bytes, StandardCharsets.UTF_8);
                System.out.println(strBody);
                return ConsumeResult.SUCCESS;
                //return null;
            }
        })
        .build();
//Thread.sleep(Long.MAX_VALUE);
// If the PushConsumer is no longer needed, close the instance.
// pushConsumer.close();
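Everything works when I do not deploy with Docker, but as soon as I deploy with Docker I get this error. Is my Docker setup missing some configuration? Any help would be much appreciated. T^T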
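As a first diagnostic step (a sketch only, not a confirmed fix), it may help to check from the client machine whether the ports published in the docker-compose file accept TCP connections at all. The class name `EndpointCheck` and the helper `isReachable` are hypothetical names introduced for illustration; the host and ports are taken from the configuration above. A successful connection only shows that the port is open. It does not prove that the Proxy is serving gRPC on 8081, or that the topic and consumer group exist.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class EndpointCheck {

    /**
     * Returns true if a plain TCP connection to host:port succeeds within
     * timeoutMillis. Hypothetical helper, not part of the RocketMQ client.
     */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host from the endpoints string above; 8081 and 9876 are the
        // ports published in the docker-compose file.
        String host = "192.168.200.131";
        int[] ports = {8081, 9876};
        for (int port : ports) {
            System.out.println(host + ":" + port
                    + " reachable = " + isReachable(host, port, 3000));
        }
    }
}
```

If 8081 turns out to be unreachable, the problem is probably on the Docker or network side (port mapping, firewall, the container not running). If it is reachable, the container logs and whether `TestTopic` was actually created are the next things to look at.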