
Java client fails with "Task was cancelled" when RocketMQ is deployed with Docker

Here is the exception that is thrown:

E:\Java\JDK\jdk17.0.4.1\bin\java.exe "-javaagent:E:\idea\IntelliJ IDEA 2023.1.4\lib\idea_rt.jar=22912:E:\idea\IntelliJ IDEA 2023.1.4\bin" -Dfile.encoding=UTF-8 -classpath I:\rocketmq\quickstart-rocketmq-01\target\classes;E:\maven\repository\org\apache\rocketmq\rocketmq-client-java\5.0.5\rocketmq-client-java-5.0.5.jar;E:\maven\repository\org\apache\tomcat\annotations-api\6.0.53\annotations-api-6.0.53.jar com.hff.rocketmq.RocketmqConsumer
Exception in thread "main" java.lang.IllegalStateException: Expected the service PushConsumerImpl-0 [FAILED] to be RUNNING, but the service has FAILED
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165)
at org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl.build(PushConsumerBuilderImpl.java:128)
at com.hff.rocketmq.RocketmqConsumer.main(RocketmqConsumer.java:64)
Caused by: java.util.concurrent.CancellationException: Task was cancelled.
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1543)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:586)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
at org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl.startUp(PushConsumerImpl.java:161)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103)
at java.base/java.lang.Thread.run(Thread.java:833)

Process finished with exit code 1

My docker-compose file:

version: "3"

# networks:
#   rocketmq-net: rocketmq-net

services:
  rocketmq-nameserver:
    image: apache/rocketmq:5.1.3
    container_name: rocketmq-nameserver
    environment:
      - NAMESRV_ADDR=rocketmq-nameserver:9876
    ports:
      - "9876:9876"
    networks:
      - rocketmq-net
    command: bash -c "./mqnamesrv;sh ./mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster"

  mqborker-proxy:
    image: apache/rocketmq:5.1.3
    container_name: mqborker-proxy
    environment:
      - NAMESRV_ADDR=rocketmq-nameserver:9876
    ports:
      - "8081:8081"
      - "10909:10909"
    networks:
      - rocketmq-net
    command: ./mqbroker -n rocketmq-nameserver:9876 --enable-proxy

networks:
  rocketmq-net:
    name: rocketmq-net
    external: true

My Java consumer code:

final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Endpoint address: must be set to the Proxy's address-and-port list, typically xxx:8081;xxx:8081.
String endpoints = "192.168.200.131:8081";

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build();
SimpleConsumerBuilderImpl builder = new SimpleConsumerBuilderImpl();

// Filter rule for the subscription; "*" subscribes to messages of all tags.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Consumer group this consumer belongs to; the group must be created in advance.
String consumerGroup = "YourConsumerGroup";
// Target topic to subscribe to; the topic must be created in advance.
String topic = "TestTopic";

// Initialize the PushConsumer; it needs the consumer group, the client configuration, and the subscription.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group.
        .setConsumerGroup(consumerGroup)
        // Set the pre-bound subscription expressions.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        // Set the message listener.
        .setMessageListener(new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView.toString());
                // Copy the message body into a byte array and decode it as UTF-8.
                ByteBuffer body = messageView.getBody();
                int capacity = body.capacity();
                byte[] bytes = new byte[capacity];
                for (int i = 0; i < capacity; i++) {
                    bytes[i] = body.get(i);
                }
                String strBody = new String(bytes, StandardCharsets.UTF_8);
                System.out.println(strBody);
                return ConsumeResult.SUCCESS;
                //return null;
            }
        })
        .build();
//Thread.sleep(Long.MAX_VALUE);
// Close the PushConsumer when it is no longer needed.
// pushConsumer.close();
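
As a side note unrelated to the startup failure: turning the message body into a string inside the listener can also be done with the JDK's charset decoder. This is only a sketch, assuming the body holds UTF-8 text; `BodyDecoder` and `decodeUtf8` are hypothetical names, not part of the RocketMQ client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the RocketMQ client API. */
class BodyDecoder {

    /** Decodes the remaining bytes of the buffer as UTF-8 text. */
    static String decodeUtf8(ByteBuffer body) {
        // duplicate() shares the bytes but has its own position and limit,
        // so the caller's buffer is not consumed by decoding.
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        // Stand-in for messageView.getBody(): a read-only buffer of UTF-8 bytes.
        ByteBuffer body = ByteBuffer
                .wrap("hello RocketMQ".getBytes(StandardCharsets.UTF_8))
                .asReadOnlyBuffer();
        System.out.println(decodeUtf8(body));
    }
}
```

Inside the listener this would be called as `BodyDecoder.decodeUtf8(messageView.getBody())`.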

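Everything works when I don't deploy with Docker, but as soon as I deploy with Docker I get this error. Is my Docker setup missing some configuration? Any help would be much appreciated T^T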
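
One way to narrow this down is to check from the client machine whether the Proxy endpoint used in the code (`192.168.200.131:8081`) accepts TCP connections at all. That the failure is a connectivity problem is an assumption, not something confirmed in this thread. The sketch below uses only the JDK; `EndpointCheck`, `parseEndpoints`, and `isReachable` are hypothetical names, not part of the RocketMQ client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper; not part of the RocketMQ client API. */
class EndpointCheck {

    /** Splits an endpoint string such as "host:8081;host2:8081" into host/port pairs. */
    static List<InetSocketAddress> parseEndpoints(String endpoints) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String part : endpoints.split(";")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Missing host or port in endpoint: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // Unresolved: parsing should not trigger a DNS lookup.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Default taken from the consumer code in the question.
        String endpoints = args.length > 0 ? args[0] : "192.168.200.131:8081";
        for (InetSocketAddress address : parseEndpoints(endpoints)) {
            boolean ok = isReachable(address.getHostString(), address.getPort(), 3000);
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + (ok ? " is reachable" : " is NOT reachable"));
        }
    }
}
```

A successful connection only shows that the port is open; it does not prove that the Proxy is serving gRPC correctly or that `TestTopic` and the consumer group exist. If the port is not reachable, the port mapping, the firewall on the Docker host, and whether the container is actually running are the first things to look at.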

Asked by 7oqjggubu25ao on 2023-08-11 20:13:05
3 answers
  • Hey, did you ever solve this problem?

    2024-01-30 10:52:09
  • OP, did you solve it? I'm running into the same problem now.

    2023-11-06 11:14:09
  • 北京阿里云ACE会长

    2023-08-12 08:08:55
