前言:
在前面有了对NIO、BIO知识的学习,以及对netty结构组的基本了解,接下来将学习一下如何使用netty去实现一个群聊功能,读者可自行去对比基于NIO、BIO、Netty实现群聊功能的不同方式,以更深刻的理解IO网络编程。
学习内容:
整体思路:
1) 编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
2) 实现多人聊天
3) 服务器端:可以监测用户上线,离线,并实现消息转发
4) 客户端: 通过channel可以无阻塞发送消息给其他所有用户,同时可接收到其他用户发送的消息)
5)目的:进一步理解Netty非阻塞网络编程机制
具体实现
服务端代码
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Scanner; /** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:05 * @description: 群聊服务端 * @Version: 1.0 */ public class NettyChartServer { /** * 服务端口号 */ private int port; public NettyChartServer(int port) { this.port = port; } /** * 服务执行:用于处理客户端请求 */ public void run() { //服务线程组 EventLoopGroup bossgroup = new NioEventLoopGroup(1); //工作组线程池,默认为CPU核数*2 EventLoopGroup workgroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(bossgroup, workgroup) .channel(NioServerSocketChannel.class) //可以监听新进来的TCP连接的通道 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态 .handler(new LoggingHandler(LogLevel.INFO))//心跳检测日志 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //获取到piepline管道 ChannelPipeline pipeline = socketChannel.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //心跳检测handle pipeline.addLast("idcheck",new IdleStateHandler(3,5,8, TimeUnit.SECONDS)); //加入自己的业务处理handler pipeline.addLast("handle",new MyNettyServerHandler()); } }); ChannelFuture ch = bootstrap.bind(port).sync(); System.out.println("服务器 is ready-------"); //异步监听端口 ch.addListeners(e->{ if(e.isSuccess()){ System.out.println("监听端口 "+port+" 成功"); }else { System.out.println("监听端口 "+port+" 失败"); } }); //异步关闭通道 ch.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //优雅退出线程池 bossgroup.shutdownGracefully(); workgroup.shutdownGracefully(); } } public static void main(String[] args) { NettyChartServer server=new NettyChartServer(8888); server.run(); } }
服务端主要作用是对8888端口进行监听,等待客户端的请求。使用了主从Reactor模式,去实现接收处理多个客户端请求。利用自定义业务处理handler 完成对信息的获取,以及客户端直接信息转发。
服务端业务处理器
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.HashMap; /** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:43 * @description: 业务自定义的逻辑处理类:它是入站 ChannelInboundHandler 类型的处理器,负责接收解码后的 HTTP 请求数据,并将请求处理结果写回客户端。 * @Version: 1.0 */ public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> { //定义一个channerl集合,类似于List<Channel> ,用于存储不同的channel通道转发信息 private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //設計一個集合可存儲通道通道对应的用户,未开发改功能 private static HashMap<String,Channel> channelHashMap=new HashMap<>(); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 该channel建立连接后: 生命周期 handlerAdded》 channelRegistered-》channelActive, * 表示连接建立,一旦连接,第一个执行 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("------------handlerAdded------"); Channel channel = ctx.channel(); channelGroup.add(channel); channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"加入聊天"); } /** * 连接中断 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("------------handlerRemoved------"); Channel channel=ctx.channel(); //服务下线,移除该channel channelGroup.remove(channel); //并通知,已离线 channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"离开了~"); } /** * 表示该channel处于活动状态 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("------------channelActive------"); System.out.println(ctx.channel().remoteAddress() + " 上线了~"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("------------channelInactive------"); System.out.println(ctx.channel().remoteAddress() + " 下线了~"); } /** * 读取消息 * @param channelHandlerContext * @param s * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println("------------channelRead0------"); Channel channel = channelHandlerContext.channel(); //将消息转发给其他客户端,并且排除自己 channelGroup.forEach(e->{ if (channel!=e){ // e.writeAndFlush("【客戶端】"+channel.remoteAddress()+"说:"+msg); }else { e.writeAndFlush("【自己】"+"说:"+msg); } }); } /** * 消息读取后 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("------------channelReadComplete------"); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("------------channelRegistered------"); } /** * 离线生命: * @param ctx exceptionCaught-》channelInactive》channelUnregistered》handlerRemoved * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("------------channelUnregistered------"); } /** * 处理心跳检测 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // System.out.println("------------userEventTriggered------"); if (evt instanceof IdleStateEvent){ //向下转型 IdleStateEvent event=(IdleStateEvent)evt; String eventType=null; switch (event.state()){ case READER_IDLE: eventType="读空闲"; break; case WRITER_IDLE: eventType="写空闲"; break; case ALL_IDLE: eventType="读写空闲"; break; default:break; } System.out.println(ctx.channel().remoteAddress()+"---超时时间---"+eventType); System.out.println("服务器做相应操作"); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("------------exceptionCaught------"); } }
业务自定义的逻辑处理类:它是入站 ChannelInboundHandler 类型的处理器,负责接收解码后的 HTTP 请求数据,并将请求处理结果写回客户端。当channle通道建立以后,便开启了生命周期。其中 channelRegistered 是用于channnel通道注册,channelActive用于查看通道是否活跃 、channelRead0用于读取客户端信息、exceptionCaught用于异常处理
客户端代码
客户端启动类
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Scanner; /** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:06 * @description: 客户端 * @Version: 1.0 */ public class NettyChartClient { private String ip; private int port; public NettyChartClient(String ip, int port) { this.ip = ip; this.port = port; } /** * 客户端启动 */ public void run() { //创建工作线程池 CPU核数*2 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //得到Pipeline ChannelPipeline pipeline = socketChannel.pipeline(); //加入相关handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", (ChannelHandler) new MyNettyClientHandler()); } }); System.out.println("---客户端启动了----"); //连接服务端 ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); Scanner can=new Scanner(System.in); while (can.hasNext()){ String str=can.nextLine(); channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(""+str, CharsetUtil.UTF_8)); } //异步关闭通道 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) { NettyChartClient chartClient=new NettyChartClient("127.0.0.1",8888); chartClient.run(); } }
客户端处理器
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 11:16 * @description: 客户端处理器 * @Version: 1.0 */ public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(""+msg); } }
客户端只需要获取服务端传来的消息,并且可以自己编写消息发送即可。
运行测试
接下来,先将服务端启动类启动,然后再运行多个客户端。查看控制台,并发送消息,查看channel的生命周期
分别在各自的客户端控制台输入信息,测试。基于Netty实现的群聊功能就这样实现了
有兴趣的老爷,可以关注我的公众号【一起收破烂】,回复【006】获取2021最新java面试资料以及简历模型120套哦~