Client Channel Handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端连接服务器完成就会触发该方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8); ctx.writeAndFlush(buf); } //当通道有读取事件时会触发,即服务端发送数据给客户端 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务端的地址: " + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Netty 聊天室 Demo
引入依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.35.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.70</version> </dependency>
WrapMessage(用于封装消息)
public class WrapMessage implements Serializable { private static final long serialVersionUID = 3165017226845753050L; /** * 1.注册 2.群发 3.私聊 */ private int type; private String username; private String message; public WrapMessage(){} public int getType() { return type; } public void setType(int type) { this.type = type; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } @Override public String toString() { return "WrapMessage{" + "type=" + type + ", message='" + message + '\'' + '}'; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
Chat Server
public class ChatServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new ChatServerHandler()); } }); System.out.println("netty server start。。"); //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况 //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕 ChannelFuture cf = bootstrap.bind(9000).sync(); //给cf注册监听器,监听我们关心的事件 /*cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口9000成功"); } else { System.out.println("监听端口9000失败"); } } });*/ //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭 // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成 cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
Chat Server Channel Handler
public class ChatServerHandler extends SimpleChannelInboundHandler<String> { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static Map<String, String> userMap = new ConcurrentHashMap<>(); private static Map<String, Channel> channelMap = new ConcurrentHashMap<>(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // String message = "[客户端]" + channel.remoteAddress() + " 上线了"; // System.out.println(message); // channelGroup.writeAndFlush(message); channelGroup.add(channel); } private void userLogin(String username, Channel channel) { String address = channel.remoteAddress().toString(); userMap.put(address, username); channelMap.put(username, channel); channelGroup.writeAndFlush(username + " 进入了聊天室"); } private void sendMessageToAll(String message, Channel channel) { String address = channel.remoteAddress().toString(); String username = userMap.get(address); channelGroup.writeAndFlush(username + ":" + message); } private void sendMessageToOne(String username, String message, Channel channel) { String fromUser = userMap.get(channel.remoteAddress().toString()); Channel toChannel = channelMap.get(username); String finalMessage = "用户" + fromUser + "向您发送了一条消息:" + message; toChannel.writeAndFlush(finalMessage); channel.writeAndFlush("您向" + username + "发送了一条消息:" + message); } @Override protected void channelRead0(ChannelHandlerContext ctx, String messageJson) throws Exception { Channel channel = ctx.channel(); WrapMessage wrapMessage = JSON.parseObject(messageJson, WrapMessage.class); System.out.println("接收到服务端消息:" + wrapMessage); int type = wrapMessage.getType(); switch (type) { case 1: userLogin(wrapMessage.getUsername(), channel); break; case 2: sendMessageToAll(wrapMessage.getMessage(), channel); break; case 3: sendMessageToOne(wrapMessage.getUsername(), wrapMessage.getMessage(), channel); break; } // // String clientMessage = "[客户端]" + channel.remoteAddress() + " :" + message; // channelGroup.forEach(ch -> { // if (ch != channel) { // ch.writeAndFlush(clientMessage); // } else { // channel.writeAndFlush("[自己]:" + message); // } // }); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String address = channel.remoteAddress().toString(); String username = userMap.get(address); String message = username + " 下线了"; userMap.remove(address); channelMap.remove(username); channelGroup.writeAndFlush(message); System.out.println("channelGroup size=" + channelGroup.size()); } }
Chat Client
public class ChatClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); pipeline.addLast(new ChatClientHandler()); } }); System.out.println("netty client start"); //启动客户端去连接服务器端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync(); //--------以下为业务逻辑代码 Channel channel = channelFuture.channel(); Scanner scanner = new Scanner(System.in); System.out.print("您的用户名为:"); String username = scanner.nextLine(); WrapMessage wrapMessage = new WrapMessage(); wrapMessage.setType(1); wrapMessage.setUsername(username); ByteBuf buf = Unpooled.copiedBuffer(JSON.toJSONString(wrapMessage), CharsetUtil.UTF_8); channel.writeAndFlush(buf); System.err.println("按任意键开始发送消息,按[q]退出"); String s = scanner.nextLine(); if("q".equals(s)){ return; } System.out.print("您的消息类型(2.群发消息 3.私聊)为:"); while (scanner.hasNextLine()){ String type = scanner.nextLine(); int messageType = Integer.parseInt(type); wrapMessage = new WrapMessage(); wrapMessage.setType(messageType); if(messageType == 2){ System.out.print("您想发送的消息:"); }else if(messageType == 3){ System.out.print("您想发送的用户:"); username = scanner.nextLine(); wrapMessage.setUsername(username); System.out.print("您想发送的消息:"); } String message = scanner.nextLine(); wrapMessage.setMessage(message); buf = Unpooled.copiedBuffer(JSON.toJSONString(wrapMessage), CharsetUtil.UTF_8); channel.writeAndFlush(buf); s = scanner.nextLine(); if("q".equals(s)){ return; } System.out.print("您的消息类型(1.注册 2.群发消息 3.私聊)为:"); } //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
Chat Client Channel Handler
public class ChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String message) throws Exception { System.out.println(message); } }