开发者社区 > 云原生 > 消息队列 > 正文

使用docker部署 rocketmq的时候 java客户端报错Task was cancelle

以下是抛出的异常

E:\Java\JDK\jdk17.0.4.1\bin\java.exe "-javaagent:E:\idea\IntelliJ IDEA 2023.1.4\lib\idea_rt.jar=22912:E:\idea\IntelliJ IDEA 2023.1.4\bin" -Dfile.encoding=UTF-8 -classpath I:\rocketmq\quickstart-rocketmq-01\target\classes;E:\maven\repository\org\apache\rocketmq\rocketmq-client-java\5.0.5\rocketmq-client-java-5.0.5.jar;E:\maven\repository\org\apache\tomcat\annotations-api\6.0.53\annotations-api-6.0.53.jar com.hff.rocketmq.RocketmqConsumer
Exception in thread "main" java.lang.IllegalStateException: Expected the service PushConsumerImpl-0 [FAILED] to be RUNNING, but the service has FAILED
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165)
at org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl.build(PushConsumerBuilderImpl.java:128)
at com.hff.rocketmq.RocketmqConsumer.main(RocketmqConsumer.java:64)
Caused by: java.util.concurrent.CancellationException: Task was cancelled.
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1543)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:586)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
at org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl.startUp(PushConsumerImpl.java:161)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103)
at java.base/java.lang.Thread.run(Thread.java:833)

Process finished with exit code 1

我的docker-compose

version: "3"

# networks:
#   rocketmq-net: rocketmq-net

services:
  rocketmq-nameserver:
    image: apache/rocketmq:5.1.3
    container_name: rocketmq-nameserver
    environment:
      - NAMESRV_ADDR=rocketmq-nameserver:9876
    ports:
      - "9876:9876"
    networks:
      - rocketmq-net
    command: bash -c "./mqnamesrv;sh ./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster"

  mqborker-proxy:
    image: apache/rocketmq:5.1.3
    container_name: mqborker-proxy
    environment:
      - NAMESRV_ADDR=rocketmq-nameserver:9876
    ports:
      - "8081:8081"
      - "10909:10909"
    networks:
      - rocketmq-net
    command: ./mqbroker -n rocketmq-nameserver:9876 --enable-proxy

networks:
  rocketmq-net:
    name: rocketmq-net
    external: true
final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "192.168.200.131:8081";

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        SimpleConsumerBuilderImpl builder = new SimpleConsumerBuilderImpl();


        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "YourConsumerGroup";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "TestTopic";

        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(new MessageListener() {
                    @Override
                    public ConsumeResult consume(MessageView messageView) {
                        System.out.println(messageView.toString());
                        ByteBuffer body = messageView.getBody();
                        int capacity = body.capacity();
                        byte [] bytes=new byte[capacity];
                        for (int i = 0; i < capacity; i++) {
                            byte b = body.get(i);
                            bytes[i]=b;
                        }
                        String strBody =new String(bytes, StandardCharsets.UTF_8);
                        System.out.println(strBody);
                        return ConsumeResult.SUCCESS;
                        //return null;
                    }
                })
                .build();
        //Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();

我不适用docker部署就可以 一使用docker 部署就出这个错误, 是我docker少了哪些配置嘛, 大佬们求解 T^T

展开
收起
7oqjggubu25ao 2023-08-11 20:13:05 959 0
3 条回答
写回答
取消 提交回答
  • hxd,解决这个问题了吗

    2024-01-30 10:52:09
    赞同 展开评论 打赏
  • op解决了吗?我现在也遇到这个问题了

    2023-11-06 11:14:09
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    使用docker部署 rocketmq的时候 java客户端报错Task was cancelle
    以下是抛出的异常

    E:\Java\JDK\jdk17.0.4.1\bin\java.exe "-javaagent:E:\idea\IntelliJ IDEA 2023.1.4\lib\idea_rt.jar=22912:E:\idea\IntelliJ IDEA 2023.1.4\bin" -Dfile.encoding=UTF-8 -classpath I:\rocketmq\quickstart-rocketmq-01\target\classes;E:\maven\repository\org\apache\rocketmq\rocketmq-client-java\5.0.5\roc

    2023-08-12 08:08:55
    赞同 展开评论 打赏

多个子产品线联合打造金融级高可用消息服务以及对物联网的原生支持,覆盖多行业。

相关产品

  • 云消息队列 MQ
  • 热门讨论

    热门文章

    相关电子书

    更多
    Spring Cloud Alibaba - 重新定义 Java Cloud-Native 立即下载
    The Reactive Cloud Native Arch 立即下载
    JAVA开发手册1.5.0 立即下载