基于Netty实现TCP通信

简介: 基于Netty实现TCP通信

创建一个 Maven 项目,并在 pom.xml 中添加下面的依赖:

<dependencies>
        <!-- Logging: SLF4J API with Logback as the implementation -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <!-- Lombok: provides the @Slf4j annotation used by the classes below -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <optional>true</optional>
        </dependency>
        <!-- Jackson 1.x (org.codehaus): provides the ObjectMapper used by TcpServer and TcpClient -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <!-- NOTE(review): fastjson is not referenced by any code shown here; confirm it is needed -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <!-- Netty: networking framework used for the TCP server and client -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.84.Final</version>
        </dependency>
    </dependencies>

编码解码器

package com.example.nettydemo.coder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
/**
 * Outbound encoder that frames every {@link String} as a 4-byte length prefix
 * followed by the string's UTF-8 bytes.
 *
 * <p>The prefix holds the number of payload bytes (not characters) and is written
 * with {@link ByteBuf#writeInt(int)}.
 */
public class NettyEncoder extends MessageToByteEncoder<String> {
    public NettyEncoder() {
    }

    /**
     * Writes one length-prefixed frame for {@code msg} into {@code out}.
     *
     * @param ctx the handler context (unused)
     * @param msg the text to send
     * @param out the buffer supplied by {@link MessageToByteEncoder} that receives the frame
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        // Write straight into the buffer we were given. Staging the frame in a separately
        // allocated pooled buffer costs an extra copy and leaks that buffer if the copy
        // into `out` throws before release() is reached.
        out.ensureWritable(Integer.BYTES + payload.length);
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }
}
package com.example.nettydemo.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int beginReader = in.readerIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.readerIndex(beginReader);
        } else {
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            String str = new String(data, 0, dataLength, StandardCharsets.UTF_8);
            out.add(str);
        }
    }
}

服务端

package com.example.nettydemo.server;
import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
@Slf4j
public class TcpServer {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap server;
    private ChannelFuture channelFuture;
    private Integer port;
    public TcpServer(Integer port) {
        this.port = port;
        // nio连接处理池
        this.bossGroup = new NioEventLoopGroup();
        // 处理事件池
        this.workerGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 自定义处理类
                        ch.pipeline().addLast(new NettyDecoder());
                        ch.pipeline().addLast(new NettyEncoder());
                        ch.pipeline().addLast(new TcpServerHandler());
                    }
                });
        server.option(ChannelOption.SO_BACKLOG, 128);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);
    }
    public synchronized void startListen() {
        try {
            // 绑定到指定端口
            channelFuture = server.bind(port).sync();
            log.info("netty服务器在[{}]端口启动监听", port);
        } catch (Exception e) {
            log.error("netty服务器在[{}]端口启动监听失败", port);
            e.printStackTrace();
        }
    }
    public void sendMessageToClient(String clientIp, Object msg) {
        Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
        Channel channel = channelMap.get(clientIp);
        String sendStr;
        try {
            sendStr = OBJECT_MAPPER.writeValueAsString(msg);
        } catch (JsonGenerationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);
            channel.writeAndFlush(sendStr);
        } catch (Exception var4) {
            log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);
            throw new RuntimeException(var4);
        }
    }
    public void pushMessageToClients(Object msg) {
        Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
        if (channelMap != null && !channelMap.isEmpty()) {
            channelMap.forEach((k, v) -> sendMessageToClient(k, msg));
        }
    }
}
package com.example.nettydemo.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 用跳表存储连接channel
     */
    public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>();
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("应用程序的监听通道异常!");
        cause.printStackTrace();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 获取每个用户端连接的ip
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
        // 本地端口做键
        int localPort = localSocket.getPort();
        Map<String, Channel> channelMap = channelSkipMap.get(localPort);
        if (channelMap == null || channelMap.isEmpty()) {
            channelMap = new HashMap<>(4);
        }
        channelMap.put(clientIp, channel);
        channelSkipMap.put(localPort, channelMap);
        log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 获取每个用户端连接的ip
        Channel channel = ctx.channel();
        InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
        int localPort = localSocket.getPort();
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        Map<String, Channel> channelMap = channelSkipMap.get(localPort);
        channelMap.remove(clientIp);
        log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel channel = channelHandlerContext.channel();
        // 获取每个用户端连接的ip
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg);
    }
}
package com.example.nettydemo.server;
/**
 * Demo entry point: starts a {@link TcpServer} on port 40001 and pushes a test
 * message to the connected clients every five seconds.
 */
public class ServerTest {
    public static void main(String[] args) {
        TcpServer tcpServer = new TcpServer(40001);
        tcpServer.startListen();
        // Loop until the thread is interrupted; a bare while(true) that catches
        // Exception would swallow the interrupt and could never be stopped.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Push "test-朱上林123" to the clients once every 5 seconds.
                Thread.sleep(5000);
                tcpServer.pushMessageToClients("test-朱上林123");
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the loop.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

package com.example.nettydemo.client;
import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
@Slf4j
public class TcpClient {
    private EventLoopGroup group;
    private ChannelFuture channelFuture;
    private final String ip;
    private final Integer port;
    private final ObjectMapper objectMapper = new ObjectMapper();
    public TcpClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
    }
    /**
     * 建立连接
     *
     */
    public synchronized void connectServer() {
        log.info("开始建立连接,ip:{}, port:{}", ip, port);
        // 生命nio连接池
        this.group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            // 配置解码器以及消息处理类
            b.group(this.group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyEncoder());
                            pipeline.addLast(new NettyDecoder());
                            pipeline.addLast(new TcpClientHandler());
                        }
                    });
            // 开始连接
            this.channelFuture = b.connect(ip, port).sync();
        } catch (Exception var4) {
            log.error("连接建立失败,ip:{}, port:{}", ip, port);
            this.group.shutdownGracefully();
            var4.printStackTrace();
        }
    }
    /**
     * 关闭连接
     */
    public void close() {
        this.group.shutdownGracefully();
    }
    /**
     * 发送消息
     *
     * @param msg
     */
    public synchronized void sendCommonMsg(Object msg) {
        String sendStr;
        if (!getConnectStatus()) {
            connectServer();
        }
        try {
            sendStr = objectMapper.writeValueAsString(msg);
        } catch (JsonMappingException e) {
            throw new RuntimeException(e);
        } catch (JsonGenerationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            log.info("发送消息内容:{}", sendStr);
            this.channelFuture.channel().writeAndFlush(sendStr);
        } catch (Exception var4) {
            log.error("发送消息失败,消息内容:{}", sendStr);
            throw new RuntimeException(var4);
        }
    }
    /**
     * 获取当前连接状态
     */
    public Boolean getConnectStatus() {
        return group != null && !group.isShutdown() && !group.isShuttingDown();
    }
}
package com.example.nettydemo.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 读取事件
     *
     * @param channelHandlerContext
     * @param msg
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
        log.info("服务返回消息 :{}", msg);
    }
    /**
     * 发生异常
     *
     * @param channelHandlerContext
     * @param throwable
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {
        log.error("通信发生异常:" + throwable.getMessage());
        channelHandlerContext.close();
    }
}
package com.example.nettydemo.client;
/**
 * Demo entry point: connects a {@link TcpClient} to the local server on port 40001
 * and sends one greeting message.
 */
public class TcpClientTest {
    public static void main(String[] args) {
        final String host = "127.0.0.1";
        final int port = 40001;
        final TcpClient client = new TcpClient(host, port);

        // Connect first, then send a single message to the server.
        client.connectServer();
        final String greeting = "我是Client,刚刚是我连接到你的!";
        client.sendCommonMsg(greeting);
    }
}

先启动服务端(ServerTest),再启动客户端(TcpClientTest),即可实现通信。

下课!

目录
相关文章
|
3月前
|
网络协议
【Netty 网络通信】Socket 通信原理
【1月更文挑战第9天】【Netty 网络通信】Socket 通信原理
|
4月前
|
编解码 缓存 移动开发
TCP粘包/拆包与Netty解决方案
TCP粘包/拆包与Netty解决方案
45 0
|
2月前
|
网络协议
Netty实现TCP通信
Netty实现TCP通信
43 0
|
8月前
|
网络协议 安全 Java
Java 网络编程TCP协议之发送数据和接收数据的详解
Java 网络编程TCP协议之发送数据和接收数据的详解
170 0
|
8月前
|
存储 缓存 网络协议
Kill Anxiety-Netty-TCP粘包半包
Kill Anxiety-Netty-TCP粘包半包
43 0
|
11月前
|
前端开发 Java 网络性能优化
Netty网络编程(五):使用UDP协议
Netty网络编程(五):使用UDP协议
445 0
|
网络协议 安全 Java
java实现UDP及TCP通信
java实现UDP及TCP通信,UDP(User Datagram Protocol)用户数据报协议,TCP(Transmission Control Protocol) 传输控制协议,是传输层的两个重要协议。
170 0
|
前端开发 Java 网络性能优化
netty系列之:使用UDP协议
netty系列之:使用UDP协议
netty系列之:使用UDP协议
|
编解码 网络协议 Oracle
网络编程(TCP通信、Socket)入门详解(一)
网络编程(TCP通信、Socket)入门详解(一)
364 0
网络编程(TCP通信、Socket)入门详解(一)
|
网络协议
Netty之第一次 TCP 连接时发生了什么
Netty之第一次 TCP 连接时发生了什么
133 0