RocketMQ NettyClientPublicExecutor_ 这个线程是做什么事情的?
处理客户端请求:接收客户端发来的请求,如获取Topic的路由信息、发送心跳等。
执行非阻塞操作:执行一些不会阻塞主业务流程的操作,例如异步的网络请求处理。
参考示例
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RocketMQClient {
private final String host;
private final int port;
private final ExecutorService publicExecutor;
public RocketMQClient(String host, int port) {
this.host = host;
this.port = port;
// 创建一个固定大小的线程池,用于处理公共任务
this.publicExecutor = Executors.newFixedThreadPool(10);
}
public void start() throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(),
new RocketMQClientHandler(publicExecutor));
}
});
Channel ch = b.connect(host, port).sync().channel();
// 这里可以添加代码来发送和接收消息
// ...
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
RocketMQClient client = new RocketMQClient("localhost", 9876);
client.start();
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/