技术笔记:Netty专题(六)

简介: 技术笔记:Netty专题(六)

netty群聊系统


实例要求


1) 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)


2) 实现多人群聊


3) 服务器端:可以监测用户上线,离线,并实现消息转发功能


4) 客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发


得到)


服务端


1、GroupChatServer


package com.atguigu.netty.groupchat;


import io.netty.bootstrap.ServerBootstrap;


import io.netty.channel.;


import io.netty.channel.nio.NioEventLoopGroup;


import io.netty.channel.socket.SocketChannel;


import io.netty.channel.socket.nio.NioServerSocketChannel;


import io.netty.handler.codec.string.StringDecoder;


import io.netty.handler.codec.string.StringEncoder;


public class GroupChatServer {


private int port; //监听端口


public GroupChatServer(int port) {


this.port = port;


}


//编写run方法,处理客户端的请求


public void run() throws Exception{


//创建两个线程组


EventLoopGroup bossGroup = new NioEventLoopGroup(1);


EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop


try {


ServerBootstrap b = new ServerBootstrap();


b.group(bossGroup, workerGroup)


//异步的服务器端 TCP Socket 连接


.channel(NioServerSocketChannel.class)


//用来给 ServerChannel 添加配置


//SO_BACKLOG对应TCP/IP协议listen函数中的backlog参数,用来初始化服务器可连接队列大小。


.option(ChannelOption.SO_BACKLOG, 128)


//用来给接收到的通道添加配置


//SO_KEEPALIVE,一直保持连接活动状态


.childOption(ChannelOption.SO_KEEPALIVE, true)


//该方法用来设置业务处理类(自定义的 handler)


.childHandler(new ChannelInitializer() {


@Override


protected void initChannel(SocketChannel ch) throws Exception {


//获取到pipeline


ChannelPipeline pipeline = ch.pipeline();


//向pipeline加入解码器


pipeline.addLast("decoder", new StringDecoder());


//向pipeline加入编码器


pipeline.addLast("encoder", new StringEncoder());


//加入自己的业务处理handler


pipeline.addLast(new GroupChatServerHandler());


}


});


System.out.println("netty 服务器启动");


//该方法用于服务器端,用来设置占用的端口号。等待异步操作执行完毕


ChannelFuture channelFuture = b.bind(port).sync();


//监听关闭


channelFuture.channel().closeFuture().sync();


}finally {


bossGroup.shutdownGracefully();


workerGroup.shutdownGracefully();


}


}


public static void main(String【】 args) throws Exception {


new GroupChatServer(7000).run();


}


}


2、GroupChatServerHandler


package com.atguigu.netty.groupchat;


import io.netty.channel.Channel;


import io.netty.channel.ChannelHandlerContext;


import io.netty.channel.SimpleChannelInboundHandler;


import io.netty.channel.group.ChannelGroup;


import io.netty.channel.group.DefaultChannelGroup;


import io.netty.util.concurrent.GlobalEventExecutor;


import java.text.SimpleDateFormat;


import java.util.ArrayList;


import java.util.HashMap;


import java.util.List;


import java.util.Map;


public class GroupChatServerHandler extends SimpleChannelInboundHandler {


//public static List channels = new ArrayList();


//使用一个hashmap 管理


//public static Map channels = new HashMap();


//定义一个channle 组,管理所有的channel


//GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例


private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


//handlerAdded 表示连接建立,一旦连接,第一个被执行


//将当前channel 加入到 channelGroup


@Override


public void handlerAdded(ChannelHandlerContext ctx) throws Exception {


Channel channel = ctx.channel();


//将该客户加入聊天的信息推送给其它在线的客户端


/


该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,


我们不需要自己遍历


/


channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");


channelGroup.add(channel);


}


//断开连接, 将xx客户离开信息推送给当前在线的客户


@Override


public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {


Channel channel = ctx.channel();


channelGroup.writeAndFlush("【客户端】" + channel.remoteAddress() + " 离开了\n");


System.out.println("channelGroup size" + channelGroup.size());


}


//表示channel 处于活动状态, 提示 xx上线


@Override


public void channelActive(ChannelHandlerContext ctx) throws Exception {


System.out.println(ctx.channel().remoteAddress() + " 上线了~");


}


//表示channel 处于不活动状态, 提示 xx离线了


@Override


public void channelInactive(ChannelHandlerContext ctx) throws Exception {


System.out.println(ctx.channel().remoteAddress() + " 离线了~");


}


//读取数据


@Override


protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {


//获取到当前channel


Channel channel = ctx.channel();


//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息


channelGroup.forEach(ch -> {


if(channel != ch) { //不是当前的channel,转发消息


ch.writeAndFlush("【客户】" + channel.remoteAddress() + " 发送了消息" + msg + "\n");


}else {//回显自己发送的消息给自己


ch.writeAndFlush("【自己】发送了消息" + msg + "\n");


}


});


}


@Override


public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {


//关闭通道


ctx.close();


}


}


客户端


1、GroupChatClient


package com.atguigu.netty.groupchat;


import io.netty.bootstrap.Bootstrap;


import io.netty.channel.;


import io.netty.channel.nio.NioEventLoopGroup;


import io.netty.channel.socket.SocketChannel;


import io.netty.channel.socket.nio.NioSocketChannel;


import io.netty.handler.codec.string.StringDecoder;


import io.netty.handler.codec.string.StringEncoder;


import java.util.Scanner;


public class GroupChatClient {


//属性


private final String host;


private final int port;


public GroupChatClient(String host, int port) {


this.host = host;


this.port = port;


}


public void run() throws Exception{


EventLoopGroup group = new NioEventLoopGroup();


try {


Bootstrap bootstrap = new Bootstrap()


.group(group)


.channel(NioSocketChannel.class)


.handler(new ChannelInitializer() {


@Override


protected void initChannel(SocketChannel ch) throws Exception {


//得到pipeline


ChannelPipeline pipeline = ch.pipeline();


//加入相关handler


pipeline.addLast("decoder", new StringDecoder());


pipeline.addLast("encoder", new StringEncoder());


//加入自定义的handler


pipeline.addLast(new GroupChatClientHandler());


}


});


ChannelFuture channelFuture = bootstrap.connect(host, port).sync();


//得到channel


Channel channel = channelFuture.channel();


//代码效果参考:http://www.lyjsj.net.cn/wx/art_23320.html

System.out.println("-------" + channel.localAddress()+ "--------");

//客户端需要输入信息,创建一个扫描器


Scanner scanner = new Scanner(System.in);


while (scanner.hasNextLine()) {


String msg = scanner.nextLine();


//通过channel 发送到服务器端


channel.writeAndFlush(msg + "\r\n");


}


}finally {


group.shutdownGracefully();


}


}


public static void main(String【】 args) throws Exception {


new GroupChatClient("127.0.0.1", 7000).run();


}


}


2、GroupChatClientHandler


package com.atguigu.netty.groupchat;


import io.netty.channel.ChannelHandlerContext;


import io.netty.channel.SimpleChannelInboundHandler;


public class GroupChatClientHandler extends SimpleChannelInboundHandler {


@Override


protected void channelRead0(ChannelHandlerContext //代码效果参考:http://www.lyjsj.net.cn/wx/art_23318.html

ctx, String msg) throws Exception {

System.out.println(msg.trim());


}


}


运行效果


netty心跳检测应用


实例要求


1) 编写一个 Netty 心跳检测机制案例, 当服务器超过 3 秒没有读时,就提示读空闲


2) 当服务器超过 5 秒没有写操作时,就提示写空闲


3) 实现当服务器超过 7 秒没有读或者写操作时,就提示读写空闲


服务端


1、MyServer


package com.atguigu.netty.heartbeat;


import io.netty.bootstrap.ServerBootstrap;


import io.netty.channel.ChannelFuture;


import io.netty.channel.ChannelInitializer;


import io.netty.channel.ChannelPipeline;


import io.netty.channel.EventLoopGroup;


import io.netty.channel.nio.NioEventLoopGroup;


import io.netty.channel.socket.SocketChannel;


import io.netty.channel.socket.nio.NioServerSocketChannel;


import io.netty.handler.logging.LogLevel;


import io.netty.handler.logging.LoggingHandler;


import io.netty.handler.timeout.IdleStateHandler;


import java.util.concurrent.TimeUnit;


public class MyServer {


public static void main(String【】 args) throws Exception{


//创建两个线程组


EventLoopGroup bossGroup = new NioEventLoopGroup(1);


EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop


try {


ServerBootstrap serverBootstrap = new ServerBootstrap();


serverBootstrap.group(bossGroup, workerGroup);


serverBootstrap.channel(NioServerSocketChannel.class);


serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));


serverBootstrap.childHandler(new ChannelInitializer() {


@Override


protected void initChannel(SocketChannel ch) throws Exception {


ChannelPipeline pipeline = ch.pipeline();


//加入一个netty 提供 IdleStateHandler


/


说明


1. IdleStateHandler 是netty 提供的处理空闲状态的处理器


2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接


3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接


4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接


5. 文档说明


triggers an {@link IdleStateEvent} when a {@link Channel} has not performed


read, write, or both operation for a while.


6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理


通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)


*/


pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));


//加入一个对空闲检测进一步处理的handler(自定义)


pipeline.addLast(new MyServerHandler());


}


});


//启动服务器


ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();


channelFuture.channel().closeFuture().sync();


}finally {


bossGroup.shutdownGracefully();


workerGroup.shutdownGracefully();


}


}


}


2、MyServerHandler


相关文章
|
6月前
|
Java API 容器
《跟闪电侠学Netty》阅读笔记 - 数据载体ByteBuf
《跟闪电侠学Netty》阅读笔记 - 数据载体ByteBuf
132 0
|
6月前
|
缓存 网络协议 算法
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
《跟闪电侠学Netty》阅读笔记 - Netty入门程序解析
208 0
|
6月前
|
Java Unix Linux
【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
当涉及到网络通信和高性能的Java应用程序时,Netty是一个强大的框架。它提供了许多功能和组件,其中之一是JNI传输。JNI传输是Netty的一个特性,它为特定平台提供了高效的网络传输。 在本文中,我们将深入探讨Netty提供的特定平台的JNI传输功能,分析其优势和适用场景。我们将介绍每个特定平台的JNI传输,并讨论其性能、可靠性和可扩展性。通过了解这些特定平台的JNI传输,您将能够更好地选择和配置适合您应用程序需求的网络传输方式,以实现最佳的性能和可靠性。
142 7
【Netty技术专题】「原理分析系列」Netty强大特性之Native transports扩展开发实战
|
5月前
|
消息中间件 存储 Java
美团面试:说说Netty的零拷贝技术?
零拷贝技术(Zero-Copy)是一个大家耳熟能详的技术名词了,它主要用于提升 IO(Input & Output)的传输性能。 那么问题来了,为什么零拷贝技术能提升 IO 性能? ## 1.零拷贝技术和性能 在传统的 IO 操作中,当我们需要读取并传输数据时,我们需要在用户态(用户空间)和内核态(内核空间)中进行数据拷贝,它的执行流程如下: ![](https://cdn.nlark.com/yuque/0/2024/png/92791/1706491312473-52f5904a-2742-4e99-9b78-995e9a8b9696.png?x-oss-process=image%2F
53 0
|
6月前
|
Dubbo Java 应用服务中间件
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。 无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。
147 2
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)
|
6月前
|
负载均衡 Java 调度
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。
87 0
【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)
|
6月前
|
消息中间件 缓存 Java
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
《跟闪电侠学Netty》阅读笔记 - 开篇入门Netty
169 0
|
6月前
|
开发工具 git
网络编程(三)netty学习demo和笔记和推荐的4本书
网络编程(三)netty学习demo和笔记和推荐的4本书
128 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13496 1
|
6月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
127 1