开发者社区 > 云原生 > 云消息队列 > 正文

RocketMQ NettyClientPublicExecutor_ 这个线程是做什么事情的?

RocketMQ NettyClientPublicExecutor_ 这个线程是做什么事情的?

展开
收起
嘟嘟嘟嘟嘟嘟 2024-08-01 08:59:12 63 0
1 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    处理客户端请求:接收客户端发来的请求,如获取Topic的路由信息、发送心跳等。
    执行非阻塞操作:执行一些不会阻塞主业务流程的操作,例如异步的网络请求处理。

    参考示例

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class RocketMQClient {
    
        private final String host;
        private final int port;
        private final ExecutorService publicExecutor;
    
        public RocketMQClient(String host, int port) {
            this.host = host;
            this.port = port;
            // 创建一个固定大小的线程池,用于处理公共任务
            this.publicExecutor = Executors.newFixedThreadPool(10);
        }
    
        public void start() throws Exception {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(workerGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.SO_KEEPALIVE, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) {
                         ch.pipeline().addLast(new StringDecoder(), new StringEncoder(),
                                 new RocketMQClientHandler(publicExecutor));
                     }
                 });
    
                Channel ch = b.connect(host, port).sync().channel();
    
                // 这里可以添加代码来发送和接收消息
                // ...
    
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            RocketMQClient client = new RocketMQClient("localhost", 9876);
            client.start();
        }
    }
    
    2024-08-01 22:40:27
    赞同 5 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载