创建一个 Maven 项目,并在 pom.xml 中添加下面的依赖:
<dependencies>
    <!-- Logging dependencies: the SLF4J API plus Logback -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.32</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
    <!-- Lombok: supplies the @Slf4j annotation used by the demo classes; declared optional -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <optional>true</optional>
    </dependency>
    <!-- Jackson 1.x (org.codehaus): provides the ObjectMapper used by TcpServer and TcpClient -->
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.13</version>
    </dependency>
    <!-- NOTE(review): none of the demo classes in this tutorial import fastjson; confirm it is needed -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.78</version>
    </dependency>
    <!-- Netty: bootstrap, channel and codec classes used by the server, client and codecs -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.84.Final</version>
    </dependency>
</dependencies>
编码解码器
package com.example.nettydemo.coder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;

/**
 * Encodes an outbound {@link String} as a length-prefixed frame:
 * a 4-byte big-endian length followed by the UTF-8 bytes of the message.
 */
public class NettyEncoder extends MessageToByteEncoder<String> {

    public NettyEncoder() {
    }

    /**
     * Writes the frame for {@code msg} into {@code out}.
     *
     * @param ctx the handler context (unused)
     * @param msg the text to send
     * @param out the buffer that receives the length prefix and the payload
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        // Write straight into the supplied buffer: no temporary pooled buffer to
        // allocate, copy from, or leak if a write fails before release().
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }
}
package com.example.nettydemo.coder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j; import java.nio.charset.StandardCharsets; import java.util.List; @Slf4j public class NettyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int beginReader = in.readerIndex(); int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.readerIndex(beginReader); } else { byte[] data = new byte[dataLength]; in.readBytes(data); String str = new String(data, 0, dataLength, StandardCharsets.UTF_8); out.add(str); } } }
服务端
package com.example.nettydemo.server; import com.example.nettydemo.coder.NettyDecoder; import com.example.nettydemo.coder.NettyEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.Map; @Slf4j public class TcpServer { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap server; private ChannelFuture channelFuture; private Integer port; public TcpServer(Integer port) { this.port = port; // nio连接处理池 this.bossGroup = new NioEventLoopGroup(); // 处理事件池 this.workerGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 自定义处理类 ch.pipeline().addLast(new NettyDecoder()); ch.pipeline().addLast(new NettyEncoder()); ch.pipeline().addLast(new TcpServerHandler()); } }); server.option(ChannelOption.SO_BACKLOG, 128); server.childOption(ChannelOption.SO_KEEPALIVE, true); } public synchronized void startListen() { try { // 绑定到指定端口 channelFuture = server.bind(port).sync(); log.info("netty服务器在[{}]端口启动监听", port); } catch (Exception e) { log.error("netty服务器在[{}]端口启动监听失败", port); e.printStackTrace(); } } public void sendMessageToClient(String clientIp, Object msg) { Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port); Channel channel = channelMap.get(clientIp); String sendStr; try { sendStr = OBJECT_MAPPER.writeValueAsString(msg); } catch (JsonGenerationException e) { throw new 
RuntimeException(e); } catch (IOException e) { throw new RuntimeException(e); } try { log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr); channel.writeAndFlush(sendStr); } catch (Exception var4) { log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr); throw new RuntimeException(var4); } } public void pushMessageToClients(Object msg) { Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port); if (channelMap != null && !channelMap.isEmpty()) { channelMap.forEach((k, v) -> sendMessageToClient(k, msg)); } } }
package com.example.nettydemo.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; @Slf4j public class TcpServerHandler extends SimpleChannelInboundHandler<String> { /** * 用跳表存储连接channel */ public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>(); @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("应用程序的监听通道异常!"); cause.printStackTrace(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); // 获取每个用户端连接的ip InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress(); String clientIp = ipSocket.getAddress().getHostAddress(); InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress(); // 本地端口做键 int localPort = localSocket.getPort(); Map<String, Channel> channelMap = channelSkipMap.get(localPort); if (channelMap == null || channelMap.isEmpty()) { channelMap = new HashMap<>(4); } channelMap.put(clientIp, channel); channelSkipMap.put(localPort, channelMap); log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // 获取每个用户端连接的ip Channel channel = ctx.channel(); InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress(); int localPort = localSocket.getPort(); InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress(); String clientIp = ipSocket.getAddress().getHostAddress(); Map<String, Channel> channelMap = channelSkipMap.get(localPort); channelMap.remove(clientIp); log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp); 
} @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { Channel channel = channelHandlerContext.channel(); // 获取每个用户端连接的ip InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress(); log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg); } }
package com.example.nettydemo.server; public class ServerTest { public static void main(String[] args) { TcpServer tcpServer = new TcpServer(40001); tcpServer.startListen(); while (true) { try { // 每5秒向客户端发送一次 "test-朱上林123" Thread.sleep(5000); tcpServer.pushMessageToClients("test-朱上林123"); } catch (Exception e) { e.printStackTrace(); } } } }
客户端
package com.example.nettydemo.client; import com.example.nettydemo.coder.NettyDecoder; import com.example.nettydemo.coder.NettyEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; @Slf4j public class TcpClient { private EventLoopGroup group; private ChannelFuture channelFuture; private final String ip; private final Integer port; private final ObjectMapper objectMapper = new ObjectMapper(); public TcpClient(String ip, Integer port) { this.ip = ip; this.port = port; } /** * 建立连接 * */ public synchronized void connectServer() { log.info("开始建立连接,ip:{}, port:{}", ip, port); // 生命nio连接池 this.group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // 配置解码器以及消息处理类 b.group(this.group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new NettyEncoder()); pipeline.addLast(new NettyDecoder()); pipeline.addLast(new TcpClientHandler()); } }); // 开始连接 this.channelFuture = b.connect(ip, port).sync(); } catch (Exception var4) { log.error("连接建立失败,ip:{}, port:{}", ip, port); this.group.shutdownGracefully(); var4.printStackTrace(); } } /** * 关闭连接 */ public void close() { this.group.shutdownGracefully(); } /** * 发送消息 * * @param msg */ public synchronized void sendCommonMsg(Object msg) { String sendStr; if (!getConnectStatus()) { connectServer(); } try { sendStr = objectMapper.writeValueAsString(msg); } catch (JsonMappingException e) { throw new RuntimeException(e); } catch (JsonGenerationException e) { throw 
new RuntimeException(e); } catch (IOException e) { throw new RuntimeException(e); } try { log.info("发送消息内容:{}", sendStr); this.channelFuture.channel().writeAndFlush(sendStr); } catch (Exception var4) { log.error("发送消息失败,消息内容:{}", sendStr); throw new RuntimeException(var4); } } /** * 获取当前连接状态 */ public Boolean getConnectStatus() { return group != null && !group.isShutdown() && !group.isShuttingDown(); } }
package com.example.nettydemo.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class TcpClientHandler extends SimpleChannelInboundHandler<String> { /** * 读取事件 * * @param channelHandlerContext * @param msg */ @Override public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) { log.info("服务返回消息 :{}", msg); } /** * 发生异常 * * @param channelHandlerContext * @param throwable */ @Override public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) { log.error("通信发生异常:" + throwable.getMessage()); channelHandlerContext.close(); } }
package com.example.nettydemo.client; public class TcpClientTest { public static void main(String[] args) { TcpClient tcpClient = new TcpClient("127.0.0.1", 40001); // 客户端连接到服务器后,向服务器发送一条消息: tcpClient.connectServer(); tcpClient.sendCommonMsg("我是Client,刚刚是我连接到你的!"); } }
先启动服务端(ServerTest),再启动客户端(TcpClientTest),即可实现双向通信。
下课!