Netty(一)Netty核心功能与线程模型2

简介: Netty(一)Netty核心功能与线程模型

Client Channel Handler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }
    //当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty 聊天室 Demo

引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.35.Final</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.70</version>
</dependency>

WrapMessage(用于封装消息)

public class WrapMessage implements Serializable {
    private static final long serialVersionUID = 3165017226845753050L;
    /**
     * 1.注册 2.群发 3.私聊
     */
    private int type;
    private String username;
    private String message;
    public WrapMessage(){}
    public int getType() {
        return type;
    }
    public void setType(int type) {
        this.type = type;
    }
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
    @Override
    public String toString() {
        return "WrapMessage{" +
                "type=" + type +
                ", message='" + message + '\'' +
                '}';
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
}

Chat Server

public class ChatServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new ChatServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();
            //给cf注册监听器,监听我们关心的事件
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口9000成功");
                    } else {
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Chat Server Channel Handler

public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static Map<String, String> userMap = new ConcurrentHashMap<>();
    private static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
//        String message = "[客户端]" + channel.remoteAddress() + " 上线了";
//        System.out.println(message);
//        channelGroup.writeAndFlush(message);
        channelGroup.add(channel);
    }
    private void userLogin(String username, Channel channel) {
        String address = channel.remoteAddress().toString();
        userMap.put(address, username);
        channelMap.put(username, channel);
        channelGroup.writeAndFlush(username + " 进入了聊天室");
    }
    private void sendMessageToAll(String message, Channel channel) {
        String address = channel.remoteAddress().toString();
        String username = userMap.get(address);
        channelGroup.writeAndFlush(username + ":" + message);
    }
    private void sendMessageToOne(String username, String message, Channel channel) {
        String fromUser = userMap.get(channel.remoteAddress().toString());
        Channel toChannel = channelMap.get(username);
        String finalMessage = "用户" + fromUser + "向您发送了一条消息:" + message;
        toChannel.writeAndFlush(finalMessage);
        channel.writeAndFlush("您向" + username + "发送了一条消息:" + message);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String messageJson) throws Exception {
        Channel channel = ctx.channel();
        WrapMessage wrapMessage = JSON.parseObject(messageJson, WrapMessage.class);
        System.out.println("接收到服务端消息:" + wrapMessage);
        int type = wrapMessage.getType();
        switch (type) {
            case 1:
                userLogin(wrapMessage.getUsername(), channel);
                break;
            case 2:
                sendMessageToAll(wrapMessage.getMessage(), channel);
                break;
            case 3:
                sendMessageToOne(wrapMessage.getUsername(), wrapMessage.getMessage(), channel);
                break;
        }
//
//        String clientMessage = "[客户端]" + channel.remoteAddress() + " :" + message;
//        channelGroup.forEach(ch -> {
//            if (ch != channel) {
//                ch.writeAndFlush(clientMessage);
//            } else {
//                channel.writeAndFlush("[自己]:" + message);
//            }
//        });
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        String address = channel.remoteAddress().toString();
        String username = userMap.get(address);
        String message = username + " 下线了";
        userMap.remove(address);
        channelMap.remove(username);
        channelGroup.writeAndFlush(message);
        System.out.println("channelGroup size=" + channelGroup.size());
    }
}

Chat Client

public class ChatClient {
    public static void main(String[] args) throws Exception {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });
            System.out.println("netty client start");
            //启动客户端去连接服务器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            //--------以下为业务逻辑代码
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            System.out.print("您的用户名为:");
            String username = scanner.nextLine();
            WrapMessage wrapMessage = new WrapMessage();
            wrapMessage.setType(1);
            wrapMessage.setUsername(username);
            ByteBuf buf = Unpooled.copiedBuffer(JSON.toJSONString(wrapMessage), CharsetUtil.UTF_8);
            channel.writeAndFlush(buf);
            System.err.println("按任意键开始发送消息,按[q]退出");
            String s = scanner.nextLine();
            if("q".equals(s)){
                return;
            }
            System.out.print("您的消息类型(2.群发消息 3.私聊)为:");
            while (scanner.hasNextLine()){
                String type = scanner.nextLine();
                int messageType = Integer.parseInt(type);
                wrapMessage = new WrapMessage();
                wrapMessage.setType(messageType);
                if(messageType == 2){
                    System.out.print("您想发送的消息:");
                }else if(messageType == 3){
                    System.out.print("您想发送的用户:");
                    username = scanner.nextLine();
                    wrapMessage.setUsername(username);
                    System.out.print("您想发送的消息:");
                }
                String message = scanner.nextLine();
                wrapMessage.setMessage(message);
                buf = Unpooled.copiedBuffer(JSON.toJSONString(wrapMessage), CharsetUtil.UTF_8);
                channel.writeAndFlush(buf);
                s = scanner.nextLine();
                if("q".equals(s)){
                    return;
                }
                System.out.print("您的消息类型(1.注册 2.群发消息 3.私聊)为:");
            }
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

Chat Client Channel Handler

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String message) throws Exception {
        System.out.println(message);
    }
}
目录
相关文章
|
2月前
|
编解码 网络协议 API
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
Netty运行原理问题之Netty的主次Reactor多线程模型工作的问题如何解决
|
25天前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
68 20
剖析 Redis List 消息队列的三种消费线程模型
|
10天前
|
存储 机器人 Linux
Netty(二)-服务端网络编程常见网络IO模型讲解
Netty(二)-服务端网络编程常见网络IO模型讲解
|
1月前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
1月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
2月前
|
Java 开发者
Java多线程教程:使用ReentrantLock实现高级锁功能
Java多线程教程:使用ReentrantLock实现高级锁功能
34 1
|
3月前
|
缓存 编译器 Go
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
开发与运维线程问题之Go语言的goroutine基于线程模型实现如何解决
51 3
|
3月前
|
算法 调度 人工智能
人工智能线程问题之无锁化编程如何解决
人工智能线程问题之无锁化编程如何解决
43 2
|
2月前
|
存储 Kubernetes NoSQL
Tair的发展问题之Tair在适配不同的存储介质时对于线程模型该如何选择
Tair的发展问题之Tair在适配不同的存储介质时对于线程模型该如何选择
|
3月前
|
Java Linux
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
Java演进问题之1:1线程模型对于I/O密集型任务如何解决