需求
1)用户上线或者下线时,通知其它在线人员
2)A发送消息其它人员都可见
设计思路
客户端与服务端建立连接后,会触发 MyChatServerHandler 中的 channelActive 方法,在该方法里把 channel 保存到 ChannelGroup 中;当某个客户端给服务端发送消息时,服务端遍历 ChannelGroup,把这条消息向其中的每一个 channel 都发送一遍,就实现了群发功能
代码实现(亲测可用)
pom
<dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency> </dependencies>
MyChatServerHandler
package mychat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * @author CBeann * @create 2019-10-16 15:55 */ public class MyChatServerHandler extends SimpleChannelInboundHandler<String> { //用一个ChannelGroup保存所有连接到服务器的客户端通道 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { Channel channel = channelHandlerContext.channel(); //服务器收到消息 // "[服务端] " + channel.remoteAddress() + "通道关闭"; String body = s; //群发 channelGroup.forEach((x) -> { if (x != channel) { x.writeAndFlush(channel.remoteAddress() + "说===>" + s); } else { x.writeAndFlush("自己说===>" + s); } }); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String notice = "[服务端] " + channel.remoteAddress() + "通道激活"; System.out.println(notice); channelGroup.writeAndFlush(notice); //添加建立连接的channel channelGroup.add(channel); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //删除失效的channel channelGroup.remove(channel); String notice = "[服务端] " + channel.remoteAddress() + "通道关闭"; channelGroup.writeAndFlush(notice); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel channel = ctx.channel(); System.out.println("[服务端] " + channel.remoteAddress() + "出现异常"); ctx.close(); } }
MyChatServer
package mychat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * @author CBeann * @create 2019-10-16 15:51 */ public class MyChatServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //TimeClientHandler是自己定义的方法 socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new MyChatServerHandler()); } }); //绑定端口 ChannelFuture f = b.bind(8888).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch ( Exception e) { } finally { //优雅关闭,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyChatClientHandler
package mychat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @author CBeann * @create 2019-10-16 21:23 */ public class MyChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { //收到服务端发送的消息 String body = s; System.out.println(body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel channel = ctx.channel(); System.out.println("[客户端出现异常"); ctx.close(); } }
MyChatClient
package mychat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.Scanner;

/**
 * Console chat client: connects to the chat server on 127.0.0.1:8888 and sends
 * every line typed on standard input until the user enters {@code exit} or the
 * input ends.
 *
 * @author CBeann
 * @create 2019-10-16 21:23
 */
public class MyChatClient {

    /**
     * Connects to the server, forwards console input, then closes the
     * connection and releases the event-loop threads.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility; failures are reported inside
     */
    public static void main(String[] args) throws Exception {
        int port = 8888;
        String host = "127.0.0.1";
        // Event-loop group that drives the client's NIO channel.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // String codec first, then the chat handler defined in this package.
                            socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                            socketChannel.pipeline().addLast(new MyChatClientHandler());
                        }
                    });
            // Connect and wait until the connection attempt has completed.
            ChannelFuture f = b.connect(host, port).sync();
            Channel channel = f.channel();

            // Forward each console line to the server; stop on "exit" or end of input.
            Scanner reader = new Scanner(System.in);
            while (reader.hasNextLine()) {
                String body = reader.nextLine();
                if ("exit".equals(body)) {
                    break;
                }
                channel.writeAndFlush(body);
            }

            // Actively close the connection; without this the wait below never
            // returns unless the server happens to disconnect first.
            channel.close();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Connection failures (for example the server not running) must not vanish silently.
            e.printStackTrace();
        } finally {
            // Graceful shutdown: release the event-loop threads.
            group.shutdownGracefully();
        }
    }
}
常见问题
1)IDEA怎么把一个启动类同时运行多次
先运行一下程序,再按照下面的操作进行