网络通信的核心机制:Socket如何实现高效数据传输(下)

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
密钥管理服务KMS,1000个密钥,100个凭据,1个月
简介: 网络通信的核心机制:Socket如何实现高效数据传输

5)其他编解码方式

使用编解码器来充当编码器和解码器的组合失去了单独使用编码器或解码器的灵活性,编解码器是要么都有要么都没有。你可能想知道是否有解决这个僵化问题的方式,还可以让编码器和解码器在ChannelPipeline中作为一个逻辑单元。幸运的是,Netty提供了一种解决方案,使用CombinedChannelDuplexHandler。虽然这个类不是编解码器API的一部分,但是它经常被用来简历一个编解码器。

CombinedChannelDuplexHandler

如何使用CombinedChannelDuplexHandler来结合解码器和编码器呢?下面我们从两个简单的例子看了解。

[java] view plaincopy
1. /** 
2.  * 解码器,将byte转成char 
3.  * @author c.k 
4.  * 
5.  */  
6. public class ByteToCharDecoder extends ByteToMessageDecoder {  
7.   
8.     @Override  
9.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
10.         while(in.readableBytes() >= 2){  
11.             out.add(Character.valueOf(in.readChar()));  
12.         }  
13.     }  
14.   
15. }


[java] view plaincopy
1. /** 
2.  * 编码器,将char转成byte 
3.  * @author Administrator 
4.  * 
5.  */  
6. public class CharToByteEncoder extends MessageToByteEncoder<Character> {  
7.   
8.     @Override  
9.     protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {  
10.         out.writeChar(msg);  
11.     }  
12. }  
[java] view plaincopy
1. /** 
2.  * 继承CombinedChannelDuplexHandler,用于绑定解码器和编码器 
3.  * @author c.k 
4.  * 
5.  */  
6. public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {  
7.     public CharCodec(){  
8.         super(new ByteToCharDecoder(), new CharToByteEncoder());  
9.     }  
10. }

从上面代码可以看出,使用CombinedChannelDuplexHandler绑定解码器和编码器很容易实现,比使用*Codec更灵活。

Netty还提供了其他的协议支持,放在io.netty.handler.codec包下,如:

  • Google的protobuf,在io.netty.handler.codec.protobuf包下
  • Google的SPDY协议
  • RTSP(Real Time Streaming Protocol,实时流传输协议),在io.netty.handler.codec.rtsp包下
  • SCTP(Stream Control Transmission Protocol,流控制传输协议),在io.netty.handler.codec.sctp包下

8)ChannelHandler和Codec

使用SSL/TLS创建安全的Netty程序

  • 使用Netty创建HTTP/HTTPS程序
  • 处理空闲连接和超时
  • 解码分隔符和基于长度的协议
  • 写大数据
  • 序列化数据

上一章讲解了如何创建自己的编解码器,我们现在可以用上一章的知识来编写自己的编解码器。不过Netty提供了一些标准的ChannelHandler和Codec。Netty提供了很多协议的支持,所以我们不必自己发明轮子。Netty提供的这些实现可以解决我们的大部分需求。本章讲解Netty中使用SSL/TLS编写安全的应用程序,编写HTTP协议服务器,以及使用如WebSocket或Google的SPDY协议来使HTTP服务获得更好的性能;这些都是很常见的应用,本章还会介绍数据压缩,在数据量比较大的时候,压缩数据是很有必要的。

使用SSL/TLS创建安全的Netty程序

通信数据在网络上传输一般是不安全的,因为传输的数据可以发送纯文本或二进制的数据,很容易被破解。我们很有必要对网络上的数据进行加密。SSL和TLS是众所周知的标准和分层的协议,它们可以确保数据时私有的。例如,使用HTTPS或SMTPS都使用了SSL/TLS对数据进行了加密。

对于SSL/TLS,Java中提供了抽象的SslContext和SslEngine。实际上,SslContext可以用来获取SslEngine来进行加密和解密。使用指定的加密技术是高度可配置的,但是这不在本章范围。Netty扩展了Java的SslEngine,添加了一些新功能,使其更适合基于Netty的应用程序。Netty提供的这个扩展是SslHandler,是SslEngine的包装类,用来对网络数据进行加密和解密。

如何使用ChannelInitializer将SslHandler添加到ChannelPipeline,看下面代码:

[java] view plaincopy
1. public class SslChannelInitializer extends ChannelInitializer<Channel> {  
2.   
3.     private final SSLContext context;  
4.     private final boolean client;  
5.     private final boolean startTls;  
6.   
7.     public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {  
8.         this.context = context;  
9.         this.client = client;  
10.         this.startTls = startTls;  
11.     }  
12.   
13.     @Override  
14.     protected void initChannel(Channel ch) throws Exception {  
15.         SSLEngine engine = context.createSSLEngine();  
16.         engine.setUseClientMode(client);  
17.         ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));  
18.     }  
19. }

需要注意一点,SslHandler必须要添加到ChannelPipeline的第一个位置,可能有一些例外,但是最好这样来做。回想一下之前讲解的ChannelHandler,ChannelPipeline就像是一个在处理“入站”数据时先进先出,在处理“出站”数据时后进先出的队列。最先添加的SslHandler会啊在其他Handler处理逻辑数据之前对数据进行加密,从而确保Netty服务端的所有的Handler的变化都是安全的。

SslHandler提供了一些有用的方法,可以用来修改其行为或得到通知,一旦SSL/TLS完成握手(在握手过程中的两个对等通道互相验证对方,然后选择一个加密密码),SSL/TLS是自动执行的。看下面方法列表:

  • setHandshakeTimeout(long handshakeTimeout, TimeUnit unit),设置握手超时时间,ChannelFuture将得到通知
  • setHandshakeTimeoutMillis(long handshakeTimeoutMillis),设置握手超时时间,ChannelFuture将得到通知
  • getHandshakeTimeoutMillis(),获取握手超时时间值
  • setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit),设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
  • setHandshakeTimeoutMillis(long handshakeTimeoutMillis),设置关闭通知超时时间,若超时,ChannelFuture会关闭失败
  • getCloseNotifyTimeoutMillis(),获取关闭通知超时时间
  • handshakeFuture(),返回完成握手后的ChannelFuture
  • close(),发送关闭通知请求关闭和销毁

使用Netty创建HTTP/HTTPS程序

HTTP/HTTPS是最常用的协议之一,可以通过HTTP/HTTPS访问网站,或者是提供对外公开的接口服务等等。Netty附带了使用HTTP/HTTPS的handlers,而不需要我们自己来编写编解码器。

Netty的HTTP编码器,解码器和编解码器

HTTP是请求-响应模式,客户端发送一个http请求,服务就响应此请求。Netty提供了简单的编码解码HTTP协议消息的Handler。下图显示了http请求和响应:

一个HTTP请求/响应消息可能包含不止一个,但最终都会有LastHttpContent消息。FullHttpRequest和FullHttpResponse是Netty提供的两个接口,分别用来完成http请求和响应。所有的HTTP消息类型都实现了HttpObject接口。下面是类关系图:

Netty提供了HTTP请求和响应的编码器和解码器,看下面列表:

  • HttpRequestEncoder,将HttpRequest或HttpContent编码成ByteBuf
  • HttpRequestDecoder,将ByteBuf解码成HttpRequest和HttpContent
  • HttpResponseEncoder,将HttpResponse或HttpContent编码成ByteBuf
  • HttpResponseDecoder,将ByteBuf解码成HttpResponse和HttpContent

看下面代码:

[java] view plaincopy
1. public class HttpDecoderEncoderInitializer extends ChannelInitializer<Channel> {  
2.   
3.     private final boolean client;  
4.   
5.     public HttpDecoderEncoderInitializer(boolean client) {  
6.         this.client = client;  
7.     }  
8.   
9.     @Override  
10.     protected void initChannel(Channel ch) throws Exception {  
11.         ChannelPipeline pipeline = ch.pipeline();  
12.         if (client) {  
13.             pipeline.addLast("decoder", new HttpResponseDecoder());  
14.             pipeline.addLast("", new HttpRequestEncoder());  
15.         } else {  
16.             pipeline.addLast("decoder", new HttpRequestDecoder());  
17.             pipeline.addLast("encoder", new HttpResponseEncoder());  
18.         }  
19.     }  
20. }

如果你需要在ChannelPipeline中有一个解码器和编码器,还分别有一个在客户端和服务器简单的编解码器:HttpClientCodec和HttpServerCodec。

在ChannelPipelien中有解码器和编码器(或编解码器)后就可以操作不同的HttpObject消息了;但是HTTP请求和响应可以有很多消息数据,你需要处理不同的部分,可能也需要聚合这些消息数据,这是很麻烦的。为了解决这个问题,Netty提供了一个聚合器,它将消息部分合并到FullHttpRequest和FullHttpResponse,因此不需要担心接收碎片消息数据。

HTTP消息聚合

处理HTTP时可能接收HTTP消息片段,Netty需要缓冲直到接收完整个消息。要完成的处理HTTP消息,并且内存开销也不会很大,Netty为此提供了HttpObjectAggregator。通过HttpObjectAggregator,Netty可以聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整。下面代码显示了如何聚合:

[java] view plaincopy
1. /** 
2.  * 添加聚合http消息的Handler 
3.  *  
4.  * @author c.k 
5.  *  
6.  */  
7. public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {  
8.   
9.     private final boolean client;  
10.   
11.     public HttpAggregatorInitializer(boolean client) {  
12.         this.client = client;  
13.     }  
14.   
15.     @Override  
16.     protected void initChannel(Channel ch) throws Exception {  
17.         ChannelPipeline pipeline = ch.pipeline();  
18.         if (client) {  
19.             pipeline.addLast("codec", new HttpClientCodec());  
20.         } else {  
21.             pipeline.addLast("codec", new HttpServerCodec());  
22.         }  
23.         pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));  
24.     }  
25.   
26. }

如上面代码,很容使用Netty自动聚合消息。但是请注意,为了防止Dos攻击服务器,需要合理的限制消息的大小。应设置多大取决于实际的需求,当然也得有足够的内存可用。

HTTP压缩

使用HTTP时建议压缩数据以减少传输流量,压缩数据会增加CPU负载,现在的硬件设施都很强大,大多数时候压缩数据时一个好主意。Netty支持“gzip”和“deflate”,为此提供了两个ChannelHandler实现分别用于压缩和解压。看下面代码:

[java] view plaincopy
1. @Override  
2. protected void initChannel(Channel ch) throws Exception {  
3.     ChannelPipeline pipeline = ch.pipeline();  
4.     if (client) {  
5.         pipeline.addLast("codec", new HttpClientCodec());  
6.         //添加解压缩Handler  
7.         pipeline.addLast("decompressor", new HttpContentDecompressor());  
8.     } else {  
9.         pipeline.addLast("codec", new HttpServerCodec());  
10.         //添加解压缩Handler  
11.         pipeline.addLast("decompressor", new HttpContentDecompressor());  
12.     }  
13.     pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));  
14. }

使用HTTPS

网络中传输的重要数据需要加密来保护,使用Netty提供的SslHandler可以很容易实现,看下面代码:

[java] view plaincopy
1. /** 
2.  * 使用SSL对HTTP消息加密 
3.  *  
4.  * @author c.k 
5.  *  
6.  */  
7. public class HttpsCodecInitializer extends ChannelInitializer<Channel> {  
8.   
9.     private final SSLContext context;  
10.     private final boolean client;  
11.   
12.     public HttpsCodecInitializer(SSLContext context, boolean client) {  
13.         this.context = context;  
14.         this.client = client;  
15.     }  
16.   
17.     @Override  
18.     protected void initChannel(Channel ch) throws Exception {  
19.         SSLEngine engine = context.createSSLEngine();  
20.         engine.setUseClientMode(client);  
21.         ChannelPipeline pipeline = ch.pipeline();  
22.         pipeline.addFirst("ssl", new SslHandler(engine));  
23.         if (client) {  
24.             pipeline.addLast("codec", new HttpClientCodec());  
25.         } else {  
26.             pipeline.addLast("codec", new HttpServerCodec());  
27.         }  
28.     }  
29.   
30. }

WebSocket

HTTP是不错的协议,但是如果需要实时发布信息怎么做?有个做法就是客户端一直轮询请求服务器,这种方式虽然可以达到目的,但是其缺点很多,也不是优秀的解决方案,为了解决这个问题,便出现了WebSocket。

WebSocket允许数据双向传输,而不需要请求-响应模式。早期的WebSocket只能发送文本数据,然后现在不仅可以发送文本数据,也可以发送二进制数据,这使得可以使用WebSocket构建你想要的程序。下图是WebSocket的通信示例图:

在应用程序中添加WebSocket支持很容易,Netty附带了WebSocket的支持,通过ChannelHandler来实现。使用WebSocket有不同的消息类型需要处理。下面列表列出了Netty中WebSocket类型:

  • BinaryWebSocketFrame,包含二进制数据
  • TextWebSocketFrame,包含文本数据
  • ContinuationWebSocketFrame,包含二进制数据或文本数据,BinaryWebSocketFrame和TextWebSocketFrame的结合体
  • CloseWebSocketFrame,WebSocketFrame代表一个关闭请求,包含关闭状态码和短语
  • PingWebSocketFrame,WebSocketFrame要求PongWebSocketFrame发送数据
  • PongWebSocketFrame,WebSocketFrame要求PingWebSocketFrame响应

为了简化,我们只看看如何使用WebSocket服务器。客户端使用可以看Netty自带的WebSocket例子。

Netty提供了许多方法来使用WebSocket,但最简单常用的方法是使用WebSocketServerProtocolHandler。看下面代码:

[java] view plaincopy
1. /** 
2.  * WebSocket Server,若想使用SSL加密,将SslHandler加载ChannelPipeline的最前面即可 
3.  * @author c.k 
4.  * 
5.  */  
6. public class WebSocketServerInitializer extends ChannelInitializer<Channel> {  
7.   
8.     @Override  
9.     protected void initChannel(Channel ch) throws Exception {  
10.         ch.pipeline().addLast(new HttpServerCodec(),   
11.                 new HttpObjectAggregator(65536),  
12.                 new WebSocketServerProtocolHandler("/websocket"),  
13.                 new TextFrameHandler(),  
14.                 new BinaryFrameHandler(),  
15.                 new ContinuationFrameHandler());  
16.     }  
17.   
18.     public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {  
19.         @Override  
20.         protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {  
21.             // handler text frame  
22.         }  
23.     }  
24.       
25.     public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame>{  
26.         @Override  
27.         protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {  
28.             //handler binary frame  
29.         }  
30.     }  
31.       
32.     public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame>{  
33.         @Override  
34.         protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {  
35.             //handler continuation frame  
36.         }  
37.     }  
38. }

SPDY

SPDY(读作“SPeeDY”)是Google开发的基于TCP的应用层协议,用以最小化网络延迟,提升网络速度,优化用户的网络使用体验。SPDY并不是一种用于替代HTTP的协议,而是对HTTP协议的增强。新协议的功能包括数据流的多路复用、请求优先级以及HTTP报头压缩。谷歌表示,引入SPDY协议后,在实验室测试中页面加载速度比原先快64%。

SPDY的定位:

  • 将页面加载时间减少50%。
  • 最大限度地减少部署的复杂性。SPDY使用TCP作为传输层,因此无需改变现有的网络设施。
  • 避免网站开发者改动内容。支持SPDY唯一需要变化的是客户端代理和Web服务器应用程序。

SPDY实现技术:

  • 单个TCP连接支持并发的HTTP请求。
  • 压缩报头和去掉不必要的头部来减少当前HTTP使用的带宽。
  • 定义一个容易实现,在服务器端高效率的协议。通过减少边缘情况、定义易解析的消息格式来减少HTTP的复杂性。
  • 强制使用SSL,让SSL协议在现存的网络设施下有更好的安全性和兼容性。
  • 允许服务器在需要时发起对客户端的连接并推送数据。
  • SPDY具体的细节知识及使用可以查阅相关资料,这里不作赘述了。

处理空闲连接和超时

处理空闲连接和超时是网络应用程序的核心部分。当发送一条消息后,可以检测连接是否还处于活跃状态,若很长时间没用了就可以断开连接。Netty提供了很好的解决方案,有三种不同的ChannelHandler处理闲置和超时连接:

  • IdleStateHandler,当一个通道没有进行读写或运行了一段时间后出发IdleStateEvent
  • ReadTimeoutHandler,在指定时间内没有接收到任何数据将抛出ReadTimeoutException
  • WriteTimeoutHandler,在指定时间内有写入数据将抛出WriteTimeoutException

最常用的是IdleStateHandler,下面代码显示了如何使用IdleStateHandler,如果60秒内没有接收数据或发送数据,操作将失败,连接将关闭:

[java] view plaincopy
1. public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {  
2.   
3.     @Override  
4.     protected void initChannel(Channel ch) throws Exception {  
5.         ChannelPipeline pipeline = ch.pipeline();  
6.         pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));  
7.         pipeline.addLast(new HeartbeatHandler());  
8.     }  
9.   
10.     public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {  
11.         private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(  
12.                 "HEARTBEAT", CharsetUtil.UTF_8));  
13.   
14.         @Override  
15.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
16.             if (evt instanceof IdleStateEvent) {  
17.                 ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
18.             } else {  
19.                 super.userEventTriggered(ctx, evt);  
20.             }  
21.         }  
22.     }  
23. }

解码分隔符和基于长度的协议

使用Netty时会遇到需要解码以分隔符和长度为基础的协议,本节讲解Netty如何解码这些协议。分隔符协议

经常需要处理分隔符协议或创建基于它们的协议,例如SMTP、POP3、IMAP、Telnet等等;Netty附带的handlers可以很容易的提取一些序列分隔:

  • DelimiterBasedFrameDecoder,解码器,接收ByteBuf由一个或多个分隔符拆分,如NUL或换行符
  • LineBasedFrameDecoder,解码器,接收ByteBuf以分割线结束,如"\n"和"\r\n"

下面代码显示使用LineBasedFrameDecoder提取"\r\n"分隔帧:

[java] view plaincopy
1. /** 
2.  * 处理换行分隔符消息 
3.  * @author c.k 
4.  * 
5.  */  
6. public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {  
7.   
8.     @Override  
9.     protected void initChannel(Channel ch) throws Exception {  
10.         ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler());  
11.     }  
12.   
13.     public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {  
14.         @Override  
15.         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {  
16.             // do something with the frame  
17.         }  
18.     }  
19. }

如果框架的东西除了换行符还有别的分隔符,可以使用DelimiterBasedFrameDecoder,只需要将分隔符传递到构造方法中。如果想实现自己的以分隔符为基础的协议,这些解码器是有用的。例如,现在有个协议,它只处理命令,这些命令由名称和参数形成,名称和参数由一个空格分隔,实现这个需求的代码如下:

[java] view plaincopy
1. /** 
2.  * 自定义以分隔符为基础的协议 
3.  * @author c.k 
4.  * 
5.  */  
6. public class CmdHandlerInitializer extends ChannelInitializer<Channel> {  
7.   
8.     @Override  
9.     protected void initChannel(Channel ch) throws Exception {  
10.         ch.pipeline().addLast(new CmdDecoder(65 * 1024), new CmdHandler());  
11.     }  
12.   
13.     public static final class Cmd {  
14.         private final ByteBuf name;  
15.         private final ByteBuf args;  
16.   
17.         public Cmd(ByteBuf name, ByteBuf args) {  
18.             this.name = name;  
19.             this.args = args;  
20.         }  
21.   
22.         public ByteBuf getName() {  
23.             return name;  
24.         }  
25.   
26.         public ByteBuf getArgs() {  
27.             return args;  
28.         }  
29.     }  
30.   
31.     public static final class CmdDecoder extends LineBasedFrameDecoder {  
32.   
33.         public CmdDecoder(int maxLength) {  
34.             super(maxLength);  
35.         }  
36.   
37.         @Override  
38.         protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {  
39.             ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);  
40.             if (frame == null) {  
41.                 return null;  
42.             }  
43.             int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' ');  
44.             return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));  
45.         }  
46.     }  
47.   
48.     public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {  
49.         @Override  
50.         protected void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {  
51.             // do something with the command  
52.         }  
53.     }  
54.   
55. }

长度为基础的协议

一般经常会碰到以长度为基础的协议,对于这种情况Netty有两个不同的解码器可以帮助我们来解码:

  • FixedLengthFrameDecoder
  • LengthFieldBasedFrameDecoder

FixedLengthFrameDecoder提取固定长度,例子中的是8字节。大部分时候帧的大小被编码在头部,这种情况可以使用LengthFieldBasedFrameDecoder,它会读取头部长度并提取帧的长度。

如果长度字段是提取框架的一部分,可以在LengthFieldBasedFrameDecoder的构造方法中配置,还可以指定提供的长度。FixedLengthFrameDecoder很容易使用,我们重点讲解LengthFieldBasedFrameDecoder。下面代码显示如何使用LengthFieldBasedFrameDecoder提取8字节长度:

[java] view plaincopy
1. public class LengthBasedInitializer extends ChannelInitializer<Channel> {  
2.   
3.     @Override  
4.     protected void initChannel(Channel ch) throws Exception {  
5.         ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65*1024, 0, 8))  
6.             .addLast(new FrameHandler());  
7.     }  
8.       
9.     public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf>{  
10.         @Override  
11.         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {  
12.             //do something with the frame  
13.         }  
14.     }  
15. }

写大数据

写大量的数据的一个有效的方法是使用异步框架,如果内存和网络都处于饱满负荷状态,你需要停止写,否则会报OutOfMemoryError。Netty提供了写文件内容时zero-memory-copy机制,这种方法再将文件内容写到网络堆栈空间时可以获得最大的性能。使用零拷贝写文件的内容时通过DefaultFileRegion、ChannelHandlerContext、ChannelPipeline,看下面代码:

[java] view plaincopy
1. @Override  
2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
3.     File file = new File("test.txt");  
4.     FileInputStream fis = new FileInputStream(file);  
5.     FileRegion region = new DefaultFileRegion(fis.getChannel(), 0, file.length());  
6.     Channel channel = ctx.channel();  
7.     channel.writeAndFlush(region).addListener(new ChannelFutureListener() {  
8.           
9.         @Override  
10.         public void operationComplete(ChannelFuture future) throws Exception {  
11.             if(!future.isSuccess()){  
12.                 Throwable cause = future.cause();  
13.                 // do something  
14.             }  
15.         }  
16.     });  
17. }

如果只想发送文件中指定的数据块应该怎么做呢?Netty提供了ChunkedWriteHandler,允许通过处理ChunkedInput来写大的数据块。下面是ChunkedInput的一些实现类:

  • ChunkedFile
  • ChunkedNioFile
  • ChunkedStream
  • ChunkedNioStream

看下面代码:

[java] view plaincopy
1. public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {  
2.     private final File file;  
3.   
4.     public ChunkedWriteHandlerInitializer(File file) {  
5.         this.file = file;  
6.     }  
7.   
8.     @Override  
9.     protected void initChannel(Channel ch) throws Exception {  
10.         ch.pipeline().addLast(new ChunkedWriteHandler())  
11.             .addLast(new WriteStreamHandler());  
12.     }  
13.   
14.     public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {  
15.         @Override  
16.         public void channelActive(ChannelHandlerContext ctx) throws Exception {  
17.             super.channelActive(ctx);  
18.             ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));  
19.         }  
20.     }  
21. }

序列化数据

开发网络程序过程中,很多时候需要传输结构化对象数据POJO,Java中提供了ObjectInputStream和ObjectOutputStream及其他的一些对象序列化接口。Netty中提供基于JDK序列化接口的序列化接口。

普通的JDK序列化

如果你使用ObjectInputStream和ObjectOutputStream,并且需要保持兼容性,不想有外部依赖,那么JDK的序列化是首选。Netty提供了下面的一些接口,这些接口放在io.netty.handler.codec.serialization包下面:

  • CompatibleObjectEncoder
  • CompactObjectInputStream
  • CompactObjectOutputStream
  • ObjectEncoder
  • ObjectDecoder
  • ObjectEncoderOutputStream
  • ObjectDecoderInputStream

通过JBoss编组序列化

如果你想使用外部依赖的接口,JBoss编组是个好方法。JBoss Marshalling序列化的速度是JDK的3倍,并且序列化的结构更紧凑,从而使序列化后的数据更小。Netty附带了JBoss编组序列化的实现,这些实现接口放在io.netty.handler.codec.marshalling包下面:

  • CompatibleMarshallingEncoder
  • CompatibleMarshallingDecoder
  • MarshallingEncoder
  • MarshallingDecoder

看下面代码:

[java] view plaincopy
1. /** 
2.  * 使用JBoss Marshalling 
3.  * @author c.k 
4.  * 
5.  */  
6. public class MarshallingInitializer extends ChannelInitializer<Channel> {  
7.     private final MarshallerProvider marshallerProvider;  
8.     private final UnmarshallerProvider unmarshallerProvider;  
9.   
10.     public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {  
11.         this.marshallerProvider = marshallerProvider;  
12.         this.unmarshallerProvider = unmarshallerProvider;  
13.     }  
14.   
15.     @Override  
16.     protected void initChannel(Channel ch) throws Exception {  
17.         ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider))  
18.             .addLast(new MarshallingEncoder(marshallerProvider))  
19.             .addLast(new ObjectHandler());  
20.     }  
21.   
22.     public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {  
23.         @Override  
24.         protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {  
25.             // do something  
26.         }  
27.     }  
28. }

使用ProtoBuf序列化:

最有一个序列化方案是Netty附带的ProtoBuf。protobuf是Google开源的一种编码和解码技术,它的作用是使序列化数据更高效。并且谷歌提供了protobuf的不同语言的实现,所以protobuf在跨平台项目中是非常好的选择。Netty附带的protobuf放在io.netty.handler.codec.protobuf包下面:

  • ProtobufDecoder
  • ProtobufEncoder
  • ProtobufVarint32FrameDecoder
  • ProtobufVarint32LengthFieldPrepender

看下面代码:

[java] view plaincopy
1. /** 
2.  * 使用protobuf序列化数据,进行编码解码 
3.  * 注意:使用protobuf需要protobuf-java-2.5.0.jar 
4.  * @author Administrator 
5.  * 
6.  */  
7. public class ProtoBufInitializer extends ChannelInitializer<Channel> {  
8.   
9.     private final MessageLite lite;  
10.   
11.     public ProtoBufInitializer(MessageLite lite) {  
12.         this.lite = lite;  
13.     }  
14.   
15.     @Override  
16.     protected void initChannel(Channel ch) throws Exception {  
17.         ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())  
18.             .addLast(new ProtobufEncoder())  
19.             .addLast(new ProtobufDecoder(lite))  
20.             .addLast(new ObjectHandler());  
21.     }  
22.   
23.     public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {  
24.         @Override  
25.         protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {  
26.             // do something  
27.         }  
28.     }  
29. }

四、Socket套接字简单应用

所谓通信循环,简单理解就是客户端可以给服务端循环发送信息并获得反馈的过程。

4.1基础版

通信循环的程序分为两部分,即两个python模块,分别为客户端.py和服务端.py

第一部分:服务端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.bind(('127.0.0.1',3301))  # 绑定指定IP和端口
phone.listen(5) # 开机,监听
conn,client = phone.accept() # 等待链接,并将获取的链接赋给conn和client,conn是另一个套接字对象
while True:
    data = conn.recv(1024)   # 收讯息
    print('收到来自客户端的讯息:',data)
    conn.send(data.title()) # 发送讯息
conn.close() # 关闭链接
phone.close() # 关机

第二部分:客户端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
# SOCK_STREAM 指的是TCP协议(或称流式协议)
# SOCK_DGRAM 指的是UDP协议(或称数据报协议)
phone.connect(('127.0.0.1',3301)) # 链接指定IP和端口的服务端
while True:
    mes = input('在此输入>>:').strip()
    phone.send(mes.encode('utf-8')) # 发讯息
    data = phone.recv(1024)                 # 收讯息
    print('收到来自服务端的消息:',data.decode('utf-8'))
phone.close()     # 关机

4.2改进版

上面的程序代码其实是存在bug的,比如没有对用户未输入和输入错误等进行异常处理,改进后的程序代码如下:

第一部分:服务端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 端口重用, 解决程序关闭后端口仍然占用的情况
phone.bind(('127.0.0.1',3301))  # 绑定指定IP和端口
phone.listen(5) # 开机,监听
conn,client = phone.accept() # 等待链接,并将获取的链接赋给conn和client,conn是另一个套接字对象
while True:
    try:
        data = conn.recv(1024)   # 收讯息
        if not data:break  # 仅适用于linux系统
        print('收到来自客户端的讯息:',data)
        conn.send(data.title()) # 发送讯息
    except ConnectionResetError: # 适用于windows系统
        break
conn.close() # 关闭链接
phone.close() # 关机

第二部分:客户端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.connect(('127.0.0.1',3301)) # 链接指定IP和端口的服务端
while True:
    mes = input('在此输入>>:').strip()
    if not mes:continue  # 防止未输入
    phone.send(mes.encode('utf-8')) # 发讯息
    data = phone.recv(1024)                 # 收讯息
    print('收到来自服务端的消息:',data.decode('utf-8'))
phone.close()     # 关机

4.3Socket套接字实现链接循环 

现在我们不满足于简单地通信循环了,我们的需求增加了链接循环,要求程序不光可以进行通信循环,还可以进行链接循环。

链接循环:我的理解就是服务端一直保持运行,而客户端可以在多个间切换分别与服务端进行连接(简而言之,就是多个客户端与一个服务端间的通信)。

增加功能后,改变的主要是服务端.py,程序同样是分为两部分。

第一部分:服务端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 端口重用, 解决程序关闭后端口仍然占用的情况
phone.bind(('127.0.0.1',3301))  # 绑定指定IP和端口
phone.listen(5) # 开机,监听
while True:  # 链接循环
    conn,client = phone.accept() # 等待链接,并将获取的链接赋给conn和client,conn是另一个套接字对象
    while True: # 通信循环
        try:
            data = conn.recv(1024)   # 收讯息
            if not data:break  # 仅适用于linux系统
            print('收到来自客户端的讯息:',data)
            conn.send(data.title()) # 发送讯息
        except ConnectionResetError: # 适用于windows系统
            break
    conn.close() # 关闭链接
phone.close() # 关机
'''
同时有多个客户端发送请求时,链接循环条件下服务端只能一次执行一个,服务端可以一直执行,只有当此客户端结束运行,
才能继续执行另一客户端的请求,以此类推客户端1结束后,客户端2执行,2结束,3才执行
'''

第二部分:客户端.py

客户端1.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.connect(('127.0.0.1',3301)) # 链接指定IP和端口的服务端
while True:
    mes = input('在此输入>>:').strip()
    if not mes:continue  # 防止未输入
    phone.send(mes.encode('utf-8')) # 发讯息
    data = phone.recv(1024)                 # 收讯息
    print('收到来自服务端的消息:',data.decode('utf-8'))
phone.close()     # 关机

客户端2.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.connect(('127.0.0.1',3301)) # 链接指定IP和端口的服务端
while True:
    mes = input('在此输入>>:').strip()
    if not mes:continue  # 防止未输入
    phone.send(mes.encode('utf-8')) # 发讯息
    data = phone.recv(1024)                 # 收讯息
    print('收到来自服务端的消息:',data.decode('utf-8'))
phone.close()     # 关机

客户端3.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化了一个对象,phone即是一个套接字对象
phone.connect(('127.0.0.1',3301)) # 链接指定IP和端口的服务端
while True:
    mes = input('在此输入>>:').strip()
    if not mes:continue  # 防止未输入
    phone.send(mes.encode('utf-8')) # 发讯息
    data = phone.recv(1024)                 # 收讯息
    print('收到来自服务端的消息:',data.decode('utf-8'))
phone.close()     # 关机

4.4ssh远程执行命令

1、几点预备知识

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import os
'''
windows下测试:
    dir:查看某一个文件夹下的子文件名和文件夹名
    ipconfig :查看本地网卡的IP信息
    tasklist:查看运行的进程
'''
# 执行系统命令,并拿到命令的结果
res = os.system('dir')
res = os.system('ipconfig')
print('命令结果:',res) # 命令结果:0 ,0表示执行成功,非0表示失败
import subprocess
obj = subprocess.Popen('dir',shell=True,
                       stdout=subprocess.PIPE, # 获取'dir'命令执行正确时的返回信息
                       stderr=subprocess.PIPE) # # 获取错误命令执行时的返回信息
print(obj) # <subprocess.Popen object at 0x00000252715E70F0>
print('正确时执行stdout>>:',obj.stdout.read().decode('gbk')) # 因为执行的是windows系统的命令,默认编码是gbk, read()获取的是bytes数据,需要decode解码转为操作系统可读的
print('错误时执行stderr>>:',obj.stderr.read().decode('gbk')) # 如果是linx系统,则默认是utf-8


2、核心程序仍然分为两部分

服务端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
import subprocess
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 实例化一个套接字对象
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 端口重用,解决程序结束而端口仍被占用的情况
phone.bind(('127.0.0.1',3301)) # 绑定IP和端口
phone.listen(5) # 监听
while True:
    conn,client = phone.accept() # 获取链接对象和IP、端口
    while True: # 通讯循环
        try:  # 适用于windows系统
            # 1、接受命令
            cmd = conn.recv(1024)
            if not cmd:break # 适用于linux系统
            print('收到来自客户端的命令:',cmd.decode('gbk'))  # 打印接收到的命令
            # 2、执行命令
            obj = subprocess.Popen(cmd.decode('gbk'),shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
            print(obj)
            # 3、返回命令执行后的结果
            stdout = obj.stdout.read() # 命令执行正确时结果
            stderr = obj.stderr.read() # 错误命令执行的结果
            conn.send(stdout+stderr)   # 返回执行命令的结果
        except ConnectionResetError:
            break
    conn.close()
phone.close()

客户端.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
import socket
phone = socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM) # 创建套接字对象
phone.connect(('127.0.0.1',3301)) # 建立链接
while True: # 通信循环
    cmd = input('输入命令>>:').strip()
    if not cmd:continue
    phone.send(cmd.encode('gbk')) # 发送命令
    data = phone.recv(1024)  # 接收信息
    print('接收来自服务端的信息:',data.decode('gbk')) # 打印接收的信息
phone.close() # 关机



相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
相关文章
|
2月前
|
开发者 Python
Python Socket编程:不只是基础,更有进阶秘籍,让你的网络应用飞起来!
在数字时代,网络应用成为连接世界的桥梁。Python凭借简洁的语法和丰富的库支持,成为开发高效网络应用的首选。本文通过实时聊天室案例,介绍Python Socket编程的基础与进阶技巧。基础篇涵盖服务器和客户端的建立与数据交换;进阶篇则探讨多线程与异步IO优化方案,助力提升应用性能。通过本案例,你将掌握Socket编程的核心技能,推动网络应用飞得更高、更远。
44 1
|
22天前
|
网络协议 Java 应用服务中间件
深入浅出Tomcat网络通信的高并发处理机制
【10月更文挑战第3天】本文详细解析了Tomcat在处理高并发网络请求时的机制,重点关注了其三种不同的IO模型:NioEndPoint、Nio2EndPoint 和 AprEndPoint。NioEndPoint 采用多路复用模型,通过 Acceptor 接收连接、Poller 监听事件及 Executor 处理请求;Nio2EndPoint 则使用 AIO 异步模型,通过回调函数处理连接和数据就绪事件;AprEndPoint 通过 JNI 调用本地库实现高性能,但已在 Tomcat 10 中弃用
深入浅出Tomcat网络通信的高并发处理机制
|
3天前
|
Kubernetes 网络协议 Python
Python网络编程:从Socket到Web应用
在信息时代,网络编程是软件开发的重要组成部分。Python作为多用途编程语言,提供了从Socket编程到Web应用开发的强大支持。本文将从基础的Socket编程入手,逐步深入到复杂的Web应用开发,涵盖Flask、Django等框架的应用,以及异步Web编程和微服务架构。通过本文,读者将全面了解Python在网络编程领域的应用。
6 1
|
6天前
|
Java
[Java]Socket套接字(网络编程入门)
本文介绍了基于Java Socket实现的一对一和多对多聊天模式。一对一模式通过Server和Client类实现简单的消息收发;多对多模式则通过Server类维护客户端集合,并使用多线程实现实时消息广播。文章旨在帮助读者理解Socket的基本原理和应用。
12 1
|
12天前
|
消息中间件 监控 网络协议
Python中的Socket魔法:如何利用socket模块构建强大的网络通信
本文介绍了Python的`socket`模块,讲解了其基本概念、语法和使用方法。通过简单的TCP服务器和客户端示例,展示了如何创建、绑定、监听、接受连接及发送/接收数据。进一步探讨了多用户聊天室的实现,并介绍了非阻塞IO和多路复用技术以提高并发处理能力。最后,讨论了`socket`模块在现代网络编程中的应用及其与其他通信方式的关系。
|
14天前
|
网络协议 Linux 应用服务中间件
Socket通信之网络协议基本原理
【10月更文挑战第10天】网络协议定义了机器间通信的标准格式,确保信息准确无损地传输。主要分为两种模型:OSI七层模型与TCP/IP模型。
|
2月前
|
网络协议 Python
网络世界的建筑师:Python Socket编程基础与进阶,构建你的网络帝国!
在数字宇宙中,网络如同复杂脉络连接每个角落,Python Socket编程则是开启这一世界的钥匙。本文将引导你从基础概念入手,逐步掌握Socket编程,并通过实战示例构建TCP/UDP服务器与客户端。你将学会使用Python的socket模块进行网络通信,了解TCP与UDP的区别,并运用多线程与异步IO提升服务器性能。跟随本文指引,成为网络世界的建筑师,构建自己的网络帝国。
33 2
|
2月前
|
网络协议 Python
告别网络编程迷雾!Python Socket编程基础与实战,让你秒变网络达人!
在网络编程的世界里,Socket编程是连接数据与服务的关键桥梁。对于初学者,这往往是最棘手的部分。本文将用Python带你轻松入门Socket编程,从创建TCP服务器与客户端的基础搭建,到处理并发连接的实战技巧,逐步揭开网络编程的神秘面纱。通过具体的代码示例,我们将掌握Socket的基本概念与操作,让你成为网络编程的高手。无论是简单的数据传输还是复杂的并发处理,Python都能助你一臂之力。希望这篇文章成为你网络编程旅程的良好开端。
50 3
|
2月前
|
网络协议 开发者 Python
网络编程小白秒变大咖!Python Socket基础与进阶教程,轻松上手无压力!
在网络技术飞速发展的今天,掌握网络编程已成为开发者的重要技能。本文以Python为工具,带你从Socket编程基础逐步深入至进阶领域。首先介绍Socket的概念及TCP/UDP协议,接着演示如何用Python创建、绑定、监听Socket,实现数据收发;最后通过构建简单的聊天服务器,巩固所学知识。让初学者也能迅速上手,成为网络编程高手。
66 1
|
27天前
|
网络协议 测试技术 网络安全
Python编程-Socket网络编程
Python编程-Socket网络编程
19 0