当时犹豫和很久这些网络相关的实现放在网络编程里还是Netty里,但因为都是通过Netty实现的,顾还是放在Netty实战系列里比较合适,毕竟网络编程里实现方式很多种嘛?
一、UDP单播和广播
在网络编程一 - 计算机网络体系基础知识中,我们已经讲过UDP协议。在UDP 这样的无连接协议中,并没有持久化连接这样的概念,并且每个消息(一个UDP 数据报)都是一个单独的传输单元。此外,UDP 也没有TCP 的纠错机制。
通过类比,TCP 连接就像打电话,其中一系列的有序消息将会在两个方向上流动。相反,UDP 则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。
UDP的这些方面可能会让你感觉到严重的局限性,但是它们也解释了为何它会比TCP快那么多:所有的握手以及消息管理机制的开销都已经被消除了。显然,UDP很适合那些能够处理或者容忍消息丢失的应用程序,但可能不适合那些处理金融交易的应用程序。
本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。
1、单播
定义为发送消息给一个由唯一的地址所标识的单一的网络目的地。面向连接的协议和无连接协议都支持这种模式。
2、广播
传输到网络(或者子网)上的所有主机。就像村里的大喇叭。只要在接收范围内。都能监听到喇叭的信息。
3、应用场景
单播和广播的应用场景主要就是消息中间件MQ。进行消息的通知和订阅。
二、Netty 的UDP 相关类
Netty里已经帮我们封装好了UDP相关的实现类。使用起来也非常方便
1、AddressedEnvelope 接口
interface AddressedEnvelope<M, A extends SocketAddress> extends ReferenceCounted
定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中M 是消息类型;A 是地址类型
2、DefaultAddressedEnvelope类
class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A>
提供了interface AddressedEnvelope的默认实现
3、DatagramPacket 类
class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder
扩展了DefaultAddressedEnvelope 以使用ByteBuf 作为消息数据容器。DatagramPacket是final类不能被继承,只能被使用。
他有三个重要方法
content() 来获取消息内容
sender() 来获取发送者的消息
recipient() 来获取接收者的消息。
4、DatagramChannel 接口
interface DatagramChannel extends Channel
扩展了Netty 的Channel 抽象以支持UDP 的多播组管理
5、NioDatagramChannel
class NioDatagramChannel extends AbstractNioMessageChannel implements DatagramChannel
定义了一个能够发送和接收Addressed-Envelope 消息的Channel 类型
Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
三、Netty实现UDP单播
1、 AnswerHandler
定义消息应答服务处理类,改类主要随机从字符串数组中选择一个发送给客户端
/** * 作者:DarkKIng * 类说明:应答处理Handler */ public class AnswerHandler extends SimpleChannelInboundHandler<DatagramPacket> { /*应答的具体内容从常量字符串数组中取得,由nextQuote方法随机获取*/ private static final String[] DICTIONARY = { "一个男生暗恋一个女生很久了。一天自习课上,男生偷偷的传了小纸条给女生,上面写着“其实我注意你很久了”。不一会儿,女生传了另一张纸条,男生心急火燎的打开一看“拜托你不要告诉老师,我保证以后再也不嗑瓜子了”。。。。。。男生一脸懵逼", "昨天因为一件事骂儿子,说你妈妈是猪,你也是头猪。儿子却反过来说我:爸爸你怎么这么衰,娶了一头猪,还生了一只猪!你说你这熊孩子,这是不是找打。", "火云邪神苦练多年,终于将蛤蟆功练至顶级并成功产下8个小蝌蚪。", "老婆永远是对的,这话永远也是对的。但老婆没想到的是,她不一定永远是老婆", "人生天地间没有谁是容易的,就算是思聪也得每天犯愁怎么花钱。", "今天去理发,洗剪吹68,烫发和染发668。我就做了个洗剪吹,结账的时候发现居然收我66"}; private static Random r = new Random(); private String nextQuote(){ return DICTIONARY[r.nextInt(DICTIONARY.length-1)]; } @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { //获得请求 String req = packet.content().toString(CharsetUtil.UTF_8); System.out.println("接收到请求:"+req); if(UdpQuestionSide.QUESTION.equals(req)){ String answer = UdpAnswerSide.ANSWER+nextQuote(); System.out.println("接收到请求:"+req); /** * 重新 new 一个DatagramPacket对象,我们通过packet.sender()来获取发送者的消息。重新发送出去! */ ctx.writeAndFlush( new DatagramPacket( Unpooled.copiedBuffer( answer, CharsetUtil.UTF_8), packet.sender())); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
2、UdpAnswerSide
定义应答服务器
public final static String ANSWER = "笑话来了:"; public void run(int port) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { /*和tcp的不同,udp没有接受连接的说法,所以即使是接收端, 也使用Bootstrap*/ Bootstrap b = new Bootstrap(); /*由于我们用的是UDP协议,所以要用NioDatagramChannel来创建*/ b.group(group) .channel(NioDatagramChannel.class) .handler(new AnswerHandler()); //没有接受客户端连接的过程,监听本地端口即可 ChannelFuture f = b.bind(port).sync(); System.out.println("应答服务已启动....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String [] args) throws Exception{ int port = 8080; new UdpAnswerSide().run(port); }
3、QuestoinHandler
定义应答服务器处理handler
/** * 作者:DarkKIng * 类说明:订阅handler,读取服务器的应答 */ public class QuestoinHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { //获得应答,DatagramPacket提供了content()方法取得报文的实际内容 String response = msg.content().toString(CharsetUtil.UTF_8); if (response.startsWith(UdpAnswerSide.ANSWER)) { System.out.println(response); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
4、UdpQuestionSide
定义了一个请求客户端
/** * 作者:DarkKIng * 类说明:订阅服务器 */ public class UdpQuestionSide { public final static String QUESTION = "我想听个笑话"; public void run(int port) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) /*由于我们用的是UDP协议,所以要用NioDatagramChannel来创建*/ .channel(NioDatagramChannel.class) .handler(new QuestoinHandler()); //不需要建立连接 Channel ch = b.bind(0).sync().channel(); //将UDP请求的报文以DatagramPacket打包发送给接受端 ch.writeAndFlush( new DatagramPacket( Unpooled.copiedBuffer(QUESTION, CharsetUtil.UTF_8), new InetSocketAddress("127.0.0.1", port))) .sync(); //不知道接收端能否收到报文,也不知道能否收到接收端的应答报文 // 所以等待15秒后,不再等待,关闭通信 if(!ch.closeFuture().await(15000)){ System.out.println("查询超时!"); } } catch (Exception e) { group.shutdownGracefully(); } } public static void main(String [] args) throws Exception{ int answerPort = 8080; new UdpQuestionSide().run(answerPort); } }
5、程序演示
该程序主要实现了客户单向服务器单点请求一个笑话。服务器随机返回一个笑话。
开启应答服务
开启客户端发送请求
在开启一个客户端发送请求
四、Netty实现UDP广播
1、LogConst
定义消息常量类,用来模拟日志
/** * 作者:DarkKIng * 类说明:日志信息,用String数组代替 */ public class LogConst { public final static int MONITOR_SIDE_PORT = 9998; private static final String[] LOG_INFOS = { "晨光微好,暖在夕阳。幽幽的巷子里,有着岁月酝酿的酒,愈久愈淳。一笔墨香,一盏明灯,记千帆过浪,数不尽的悲欢离合,待那水莲花开。", "未来无期,静在安好。一剪寒梅,纷扰了岁月,抚平了伤痕。摆动的双桨,拨动了心的潭水。陌上花开,落一地秋霜,红枫染了红尘,便许了你十里红装。", "离别的风,风干了月的泪。夜里的美", "是梦的呢喃低语,挥走一片彩云,段落成珠。拂袖离去,乘鹤而来,古道西风瘦马。斑驳的树影中,眉目如画的眼,轻语告别了往事如烟。", "无言的殇,几世沧桑,几生悲凉。一起剪了西窗烛,听了夜来风吹雨。昨日的叹息,今日的迷离,执一伞,存了一世一笔的温情。一曲长歌,唱尽了一世繁华,一世缘……", "一世恋书,那便十里花开。一生凄凉,那便霜花十里。" , "一抹浓烟,便翻页书,展颜一笑,是时间带来遥远的梦。细数树的年轮,感受昨日惆怅,留一半清醒,梦一半叶落。在指尖流过的沙,海边浪花一朵朵,不相遇,才有不约而同。", "这世俗,太多牵挂留在心间,一点朱砂泪,一曲相诗歌。岁月朦胧,梦醒了人生,风雨相容,演绎了一段风情。雪亦梦,雨亦梦,万张红纸从天洒来。惊动了山,撼动了天。" + "一纸情愁,一指烟凉。一相思,一思量,水漫岸头,我们都有着自己不同的三生故事。迎一夜秋风,送一世暖阳,一切冰雪里的花开,是我一生的柔情。" + "记忆中的短笛,有着清风须来的气息,那时我们面向大海,海风在耳边述说着大海边缘的温暖故事。安好一轮冷月,静好了一残红日,这便是我的语言,我的情丝。" + "一漫山水,一段情,留在了岁月,拭去了风,晴雨清风,倒是暖阳拂绿草。" + "这便,晨光微好,花开静好……"}; private final static Random r = new Random(); public static String getLogInfo(){ return LOG_INFOS[r.nextInt(LOG_INFOS.length-1)]; } }
2、LogMsg
消息实体类
/** * 作者:DarkKIng * 类说明:日志实体类 */ public final class LogMsg { public static final byte SEPARATOR = (byte) ':'; /*源的 InetSocketAddress*/ private final InetSocketAddress source; /*消息内容*/ private final String msg; /*消息id*/ private final long msgId; /*消息发送的时间*/ private final long time; //用于传入消息的构造函数 public LogMsg(String msg) { this(null, msg,-1,System.currentTimeMillis()); } //用于传出消息的构造函数 public LogMsg(InetSocketAddress source, long msgId, String msg) { this(source,msg,msgId,System.currentTimeMillis()); } public LogMsg(InetSocketAddress source, String msg, long msgId, long time) { this.source = source; this.msg = msg; this.msgId = msgId; this.time = time; } //返回发送 LogMsg 的源的 InetSocketAddress public InetSocketAddress getSource() { return source; } //返回消息内容 public String getMsg() { return msg; } //返回消息id public long getMsgId() { return msgId; } //返回消息中的时间 public long getTime() { return time; } }
3、LogEventEncoder
日志编码类
/** * 作者:DarkKIng * 类说明:编码,将实际的日志实体类编码为DatagramPacket */ public class LogEventEncoder extends MessageToMessageEncoder<LogMsg> { private final InetSocketAddress remoteAddress; //LogEventEncoder 创建了即将被发送到指定的 InetSocketAddress // 的 DatagramPacket 消息 public LogEventEncoder(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, LogMsg logMsg, List<Object> out) throws Exception { byte[] msg = logMsg.getMsg().getBytes(CharsetUtil.UTF_8); //容量的计算:两个long型+消息的内容+分割符 ByteBuf buf = channelHandlerContext.alloc() .buffer(8*2 + msg.length + 1); //将发送时间写入到 ByteBuf中 buf.writeLong(logMsg.getTime()); //将消息id写入到 ByteBuf中 buf.writeLong(logMsg.getMsgId()); //添加一个 SEPARATOR buf.writeByte(LogMsg.SEPARATOR); //将日志消息写入 ByteBuf中 buf.writeBytes(msg); //将一个拥有数据和目的地地址的新 DatagramPacket 添加到出站的消息列表中 out.add(new DatagramPacket(buf, remoteAddress)); } }
4、LogEventBroadcaster
日志广播端
/** * 作者:DarkKIng * 类说明:日志的广播端 */ public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventBroadcaster(InetSocketAddress remoteAddress) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); //引导该 NioDatagramChannel(无连接的) bootstrap.group(group).channel(NioDatagramChannel.class) //设置 SO_BROADCAST 套接字选项 .option(ChannelOption.SO_BROADCAST, true) .handler(new LogEventEncoder(remoteAddress)); } public void run() throws Exception { //绑定 Channel Channel ch = bootstrap.bind(0).sync().channel(); long count = 0; //启动主处理循环,模拟日志发送 for (;;) { ch.writeAndFlush(new LogMsg(null, ++count, LogConst.getLogInfo())); try { //休眠 2 秒,如果被中断,则退出循环; Thread.sleep(2000); } catch (InterruptedException e) { Thread.interrupted(); break; } } } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { //创建并启动一个新的 UdpQuestionSide 的实例 LogEventBroadcaster broadcaster = new LogEventBroadcaster( //表明本应用发送的报文并没有一个确定的目的地,也就是进行广播 new InetSocketAddress("255.255.255.255", LogConst.MONITOR_SIDE_PORT)); try { System.out.println("广播服务启动"); broadcaster.run(); } finally { broadcaster.stop(); } } }
5、LogEventDecoder
日志解码类,将DatagramPacket解码为实际的日志实体类
/** * 作者:DarkKIng * 类说明:解码,将DatagramPacket解码为实际的日志实体类 */ public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> { @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception { //获取对 DatagramPacket 中的数据(ByteBuf)的引用 ByteBuf data = datagramPacket.content(); long time = new Date().getTime(); System.out.println(time+" 接受到发送的消息:"); //获得消息的id long msgId = data.readLong(); //获得分隔符SEPARATOR byte sepa = data.readByte(); //获取读索引的当前位置,就是分隔符的索引+1 int idx = data.readerIndex(); //提取日志消息,从读索引开始,到最后为日志的信息 String sendMsg = data.slice(idx , data.readableBytes()).toString(CharsetUtil.UTF_8); //构建一个新的 LogMsg 对象,并且将它添加到(已经解码的消息的)列表中 LogMsg event = new LogMsg(datagramPacket.sender(), msgId, sendMsg); //作为本handler的处理结果,交给后面的handler进行处理 out.add(event); } }
6、LogEventHandler
日志的业务处理类,实际的业务处理,接受日志信息
/** * 作者:DarkKIng * 类说明:日志的业务处理类,实际的业务处理,接受日志信息 */ public class LogEventHandler extends SimpleChannelInboundHandler<LogMsg> { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //当异常发生时,打印栈跟踪信息,并关闭对应的 Channel cause.printStackTrace(); ctx.close(); } @Override public void channelRead0(ChannelHandlerContext ctx, LogMsg event) throws Exception { //创建 StringBuilder,并且构建输出的字符串 StringBuilder builder = new StringBuilder(); builder.append(event.getTime()); builder.append(" ["); builder.append(event.getSource().toString()); builder.append("] :["); builder.append(event.getMsgId()); builder.append("] :"); builder.append(event.getMsg()); //打印 LogMsg 的数据 System.out.println(builder.toString()); } }
7、LogEventMonitor
日志订阅类(接收客户端)
** * 作者:DarkKIng * 类说明:日志的接受端 */ public class LogEventMonitor { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); //引导该 NioDatagramChannel bootstrap.group(group) .channel(NioDatagramChannel.class) //设置套接字选项 SO_BROADCAST .option(ChannelOption.SO_BROADCAST, true) //允许重用 .option(ChannelOption.SO_REUSEADDR,true) .handler( new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); pipeline.addLast(new LogEventHandler()); } } ) .localAddress(address); } public Channel bind() { //绑定 Channel。注意,DatagramChannel 是无连接的 return bootstrap.bind().syncUninterruptibly().channel(); } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { //构造一个新的 UdpAnswerSide并指明监听端口 LogEventMonitor monitor = new LogEventMonitor( new InetSocketAddress(LogConst.MONITOR_SIDE_PORT)); try { //绑定本地监听端口 Channel channel = monitor.bind(); System.out.println("UdpAnswerSide running"); channel.closeFuture().sync(); } finally { monitor.stop(); } } }
8、程序演示
该程序模拟客户端订阅服务器端,服务器定时向订阅客户端发送日志信息。
启动广播服务器
开启三个订阅客户端
查看打印,都一模一样
本章主要介绍下Netty实现UDP的广播应用,大家可以好好理解体验下。顺便考虑下实现一个消息中间件需要注意哪些呢? 演示代码已经提交到Github。大家需要可以下载 https://github.com/379685397/netty