4.1 之前,池化功能还不成熟,默认是非池化实现
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现;windows平台默认是开辟的池化管理
测试一下:版本是4.1.3,也就是4.1以后,默认是开启池化的 /** * 查看ByteBuf是否池化、采用的是直接内存或堆内存 */ public static void seeByteBufClassDemo(){ //buffer():默认是直接内存 System.out.println(ByteBufAllocator.DEFAULT.buffer().getClass()); //directBuffer():直接内存 System.out.println(ByteBufAllocator.DEFAULT.directBuffer().getClass()); //heapBuffer():堆内存 System.out.println(ByteBufAllocator.DEFAULT.heapBuffer().getClass()); }
若是在4.1之后想使用非池化需要指定系统环境变量才程序运行时:
//-Dio.netty.allocator.type={unpooled|pooled} 设置非池化 -Dio.netty.allocator.type=unpooled
若是不想要通过配置参数,也可以调用指定的类Unpooled来生成非池化的字节缓冲区:
//class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf System.out.println(Unpooled.buffer().getClass());
3.6.4、ByteBuf组成
ByteBuf是netty基于nio中的ByteBuffer的封装改进。
特点:
1、读写指针最开始都在 0 位置(图中两个颜色指针)。
2、规定了容量与最大容量:为了将来在容量不够时才去申请更多的内存,实现按需所取。
3、包含两个指针(读写指针):当进行写入数据的时候写指针向后移动,此时读指针与写指针这部分数据表示是可读部分。若是读取数据,读指针也会向后移动。那么也就是说写指针与读指针之间是未读取的数据。已经读过的部分则是废弃部分。
4、对于ByteBuf由四个部分组成:废弃部分(已读)、可读部分(未读)、可写字节(未写)、可扩容部分(等待容量满进行分配)
与ByteBuffer比较:相对于bytebuf只有一个指针,若是想要进行读需要切换到读模式,想要写要切换到写模式。用起来不方便。
两个方便进行了改进:①读和写使用了两个指针。②可以动态扩容。
3.6.5、写入
常用的方法:
方法签名 |
含义 |
备注 |
writeBoolean(boolean value) |
写入 boolean 值 |
用一字节 01|00 代表 true|false |
writeByte(int value) |
写入 byte 值 |
|
writeShort(int value) |
写入 short 值 |
|
writeInt(int value) |
写入 int 值 |
Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) |
写入 int 值 |
Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) |
写入 long 值 |
|
writeChar(int value) |
写入 char 值 |
|
writeFloat(float value) |
写入 float 值 |
|
writeDouble(double value) |
写入 double 值 |
writeBytes(ByteBuf src) |
写入 netty 的 ByteBuf |
|
writeBytes(byte[] src) |
写入 byte[] |
|
writeBytes(ByteBuffer src) |
写入 nio 的 ByteBuffer |
|
int writeCharSequence(CharSequence sequence, Charset charset) |
写入字符串 |
带有LE的就是大端写入,不带的则是小端写入。网络编程中的两个名词,代表的是先写高位字节,还是先写低位字节;一般采用大端写入!
大端写入:低位靠后,先写高位的0。
小端写入:低位先写,与大端相反。
对于ByteBuf提供了写入ByteBuf以及stringbuilder、stringbuffer、string的API。
注意点:①这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用。②网络传输,默认习惯是 Big Endian。
demo
案例目的:测试是否能够正常写入字符串、字节等。
/** * 03、测试ByteBuf的写入与扩容 */ public static void writeToByteBufDemo(){ final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeBytes("c".getBytes());//写入字节 final StringBuilder builder = new StringBuilder("hang"); buffer.writeCharSequence(builder, Charset.defaultCharset());//写入stringbuilder buffer.writeCharSequence("lu", Charset.defaultCharset());//写入字符串 log(buffer); //测试扩容 buffer.writeCharSequence(",helloworld", Charset.defaultCharset()); log(buffer); }
3.6.5、扩容
默认若是不指定的话则最大容量是整数的最大值。
扩容规则是
如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 210=1024(29=512 已经不够了)
扩容不能超过 max capacity 会报错
3.6.7、读取
案例目的:读取字节以及标记重复读取
/** * 04、测试ByteBuf的读取:包含重复读取某个字节 */ public static void readByteBufDemo(){ final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeBytes("123456789".getBytes());//写入字节 System.out.println(buffer.readByte());//读取一个字节 System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); buffer.markReaderIndex();//可标记读索引以及写索引 buffer.readBytes(4); buffer.resetReaderIndex();//重置读索引 log.debug("读取读索引的字节"); System.out.println(buffer.readByte()); }
1、读取内容使用read开头的API,这类API会移动读指针。
2、若是使用get开头API,不会移动读指针。
3、若是想要回读或重读可以设置mark标记,同样也可以设置读或写标记!
3.6.8、retain & release (释放ByteBuf)
3.6.8.1、释放分析
由于 Netty 中有堆外内存(指的是直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可。
UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存。
PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存。
扩展:可达性分析是通过一系列的GC ROOTS对象来连接有用的对象,走过的路径会形成一条链,当有对象到GC ROOTS没有一条引用链的时候就要被回收了。
核心:在实际业务场景中,入站、出站操作中都会使用到ByteBuf,针对于池化的Bytebuf则会将用完之后的ByteBuf还回内存池,来达到内存重用!在入站、出站过程中经历多个handler,其中head、tail handler是netty默认定义好的,两者都能够进行收尾工作(指的是若是最终传递得到的Object msg的对象ByteBuf就会进行自动回收,若是其他类型则不处理)。:
误解:不要觉得头和尾都可以释放我们中途就可以不管bytebuf的释放了,因为其释放时机需要把bytebuf对象一直传到头或尾handler才会释放。若是在中途已经将bytebuf转换成字符串了接着进行下面的传递,此时到tail拿到的仅仅是那个字符串了就不是bytebuf了,既然如此就不会做释放处理。
最合适的释放时机:谁最后拿到bytebuf(传递已对bytebuf进行解析并将解析后的内容向后传递的handler)就要对ByteBuf进行释放。若是从头置尾handler直接都是传递的ByteBuf中间也可以不手动释放,最后也会给我们进行释放,不过最好就是哪里用完了ByteBuf(解析完)就进行释放!
3.6.8.2、源码分析(head、tail)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
tail handler:入站最后执行的处理器
//可以看到实现了ChannelInboundHandler接口 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { //关注其中的read方法 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { DefaultChannelPipeline.this.onUnhandledInboundMessage(msg); } } protected void onUnhandledInboundMessage(Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg); } finally { //使用了一个工具类来进行尝试释放 ReferenceCountUtil.release(msg); } } public static boolean release(Object msg) { //可以看到会使用instanceOf来判断是否是ByteBuf,因为ByteBuf实现了引用计数的接口,若是是的话就会进行释放 //public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> { return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }
head handler:出站的最后一个handler执行器
//可以注意到其实现了ChannelOutboundHandler、ChannelInboundHandler,则表示又是入站执行器,也是出站执行器。 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { //对于出站就要关注其write方法 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { this.unsafe.write(msg, promise); } } //AbstractChannel.class public final void write(Object msg, ChannelPromise promise) { this.assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //这里做了一次是否为出栈BUffer判定。若是的话则表示当前方法是在出站时进行调用的。 if (outboundBuffer == null) { this.safeSetFailure(promise, AbstractChannel.WRITE_CLOSED_CHANNEL_EXCEPTION); //可以看到这里也进行了释放操作,内部源码实际上就是对msg类型进行判断,若是ByteBuf就释放。 ReferenceCountUtil.release(msg); } else { ... } }
3.6.8、零拷贝
netty的零拷贝体现在网络数据传输、文件传输以及数据操作的优化,下面就主要介绍数据操作的零拷贝优化。
netty中的零拷贝主要也是指减少数据复制,提升性能。
通过wrap(),可将byte[]数组、ByteBuf、ByteBuffer等包装成一个Netty ByteBuf对象,避免了复制拷贝操作。
通过duplicate(),可将整个ByteBuf进行零拷贝。
通过slice(),可将ByteBuf分解为多个共享同一个存储区域的ByteBuf, 避免内存的拷贝。
通过CompositeByteBuf,可将多个ByteBuf进行合并。
3.6.8.1、slice:切割
slice是数据零拷贝的体现之一
①实际应用
案例目的:对某个Bytebuf进行数据分割放置到两个ByteBuf中。
/** * 实际应用:零拷贝获取head、body */ public static void practicalUse(){ final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeCharSequence("head,body", Charset.defaultCharset()); //若是要对某一个ByteBuf进行切割操作,第一部分要的是前5个,第二部分要的是后5个 //应用场景:对请求body、head进行切割。分割得到的两个部分实际上使用的是原先Buffer的共享内存 final ByteBuf front = buffer.slice(0, 4);//第一个参数是切割的位置,第二个参数是切割的数量 log(front); final ByteBuf end = buffer.slice(5, 4); log(end); }
②修改切割得到的某个ByteBuf位置内容也会影响源ByteBuf;切割得到的ByteBuf无法写入
/** * Slice切片得到的ByteBuf进行测试 */ public static void sliceTest(){ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeBytes(new byte[]{1,2,3,4}); final ByteBuf sliceBuf = buffer.slice(0, 4); //1、修改切片得到的ByteBuf也会影响原始的ByteBuf,因为使用的是同一块内存 sliceBuf.setByte(0,6); log(buffer); //2、无法对切片进行write操作,会抛出异常IndexOutOfBoundsException sliceBuf.writeByte(10); }
③release()与retain()应用场景
release()与retain()可对使用相同内存的ByteBuf同时进行引用计数!
/** * release()与retain()使用 */ public static void sliceTest2(){ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeBytes(new byte[]{1,2,3,4}); final ByteBuf sliceBuf = buffer.slice(0, 4); //这里引用计数+1,对于原ByteBuf以及切割得到的ByteBuf都有影响,因为是占用的同一块内存 sliceBuf.retain();//引用计数+1 buffer.release(); //若是直接对原ByteBuf进行清理,然后使用切片得到的ByteBuf会抛出异常IllegalReferenceCountException: refCnt: 0 //若是在release()之后也想正常使用,可以在此之前使用retain()进行引用+1,release()相对于会引用-1,此时就不会真正释放内存,自然也就能欧使用 log(sliceBuf); }
3.6.8.2、duplicate:整块
效果:好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的。
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.nio.charset.Charset; import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log; /** * @ClassName DuplicateTest * @Author ChangLu * @Date 2022/1/7 23:32 * @Description Duplicate:整块零拷贝 */ public class DuplicateTest { public static void main(String[] args) { final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeCharSequence("changlu", Charset.defaultCharset()); final ByteBuf dupBuf = buffer.duplicate(); //对整块进行零拷贝的进行修改 dupBuf.setByte(0,1); log(buffer);//测试源ByteBuf受到影响 } }
效果:
3.6.8.3、copy:深拷贝(非零拷贝)
copy:就是对整个ByteBuf进行深拷贝,拷贝过后的能够进行写入,并且修改的位置内容不会影响源位置。
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.nio.charset.Charset; import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log; /** * @ClassName CopyTest * @Author ChangLu * @Date 2022/1/7 23:37 * @Description Copy:整个ByteBuf进行深拷贝 */ public class CopyTest { public static void main(String[] args) { final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeCharSequence("changlu", Charset.defaultCharset()); //进行深拷贝 final ByteBuf copyBuf = buffer.copy(); copyBuf.setByte(0,1); copyBuf.writeByte(6); //测试源buffer log(buffer); //测试深拷贝得到buffer log(copyBuf); } }
效果:
3.6.8.4、CompositeBuffer:组装ByteBuf
CompositeByteBuf是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
缺点,复杂了很多,多次操作会带来性能的损耗
功能:可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝。注意要设置true来让其调整读,写指针。
案例:包含两个测试
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import java.nio.charset.Charset; import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log; /** * @ClassName CompositeBufferTest * @Author ChangLu * @Date 2022/1/7 23:48 * @Description CompositeBuffer:零拷贝之一,合并ByteBuf */ public class CompositeBufferTest { public static void main(String[] args) { final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20); buffer.writeCharSequence("changlu", Charset.defaultCharset()); final ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(20); buffer1.writeCharSequence("liner", Charset.defaultCharset()); //效率较低方案:直接通过writeBytes()写入字节方式写入 // log(ByteBufAllocator.DEFAULT.buffer(20).writeBytes(buffer).writeBytes(buffer1)); //零拷贝:合并两个Buffer到一个Buffer中,使用的共享内存 final CompositeByteBuf comBuf = ByteBufAllocator.DEFAULT.compositeBuffer(); //测试一:不设置true // comBuf.addComponents(buffer, buffer1);//若是不设置true,则不会自动调整读、写指针位置造成数据不会加进来 //测试二:设置true comBuf.addComponents(true, buffer, buffer1); log(comBuf); } }
效果:
测试一:
测试二:
3.6.8.5、工具类Unpooled(提供了非池化的 ByteBuf 创建、组合、复制等操作)
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作。
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf。
案例目的:测试组合方法wrappedBuffer
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import static com.changlu.No3Netty入门.No2Netty组件.ByteBuf.ByteBufTest.log; /** * @ClassName UnpooledTest * @Author ChangLu * @Date 2022/1/7 23:59 * @Description Unpooled:非池化ByteBuf进行零拷贝的工具类 */ public class UnpooledTest { public static void main(String[] args) { ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5); buf1.writeBytes(new byte[]{1, 2, 3, 4, 5}); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5); buf2.writeBytes(new byte[]{6, 7, 8, 9, 10}); // 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2); buf3.setByte(0,6); log(buf1); } }
效果:
3.6.9、ByteBuf优势汇总
1、池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能。
2、读写指针分离,不需要像 ByteBuffer 一样切换读写模式。
3、实现自动扩容。
4、支持链式调用,使用更流畅。
5、很多地方体现零拷贝,例如 wrap、slice、duplicate、CompositeByteBuf。
案例、回显服务器(双向通信)
描述+code(netty)
前提描述
实现功能:客户端向服务器发什么,服务端就返回什么。
出现的问题:bytebuf的释放问题,下面是问题和解答(个人见解)。
服务器接收到客户端发来的数据,是否要手动释放?
若是不手动调用ctx.fireChannelRead(),就不会走到tail handler!(debug测试测出来)一般两种情况,①若是在该handler中使用完了ByteBuf,那么就直接手动释放;②若是没有进行解析之类的操作,那么可以直接传递到后面handler,也就是tail handler也会帮你进行释放操作,ctx.fireChannelRead()。
回显业务必然会创建一个ByteBuf对象,是否需要手动释放?
对于自己创建的ByteBuf,则需要进行手动释放,在这里回显业务是调用了writeAndFlush这是一个异步操作,那么添加一个监听器当写入完毕之后就进行手动释放!
code
服务器:
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import java.nio.charset.Charset; /** * @ClassName Server * @Author ChangLu * @Date 2022/1/8 9:42 * @Description echoserver:提供回显服务的服务器,就是收到什么,然后就发送什么的程序。 */ @Slf4j public class Server { public static void main(String[] args) throws InterruptedException { new ServerBootstrap() .group(new NioEventLoopGroup(), new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; log.debug("收到客户端发送数据:{}", buf.toString(Charset.defaultCharset())); final ByteBuf response = ctx.alloc().buffer(); response.writeBytes(buf); //向客户端回发数据:需要手动释放 ctx.writeAndFlush(response).addListener((future)->{ //释放ByteBuf ReferenceCountUtil.release(response); }); //向后传递让Tail handler来进行释放msg super.channelRead(ctx, msg); } }); } }) .bind(8080).sync(); System.out.println("服务器启动成功!"); } }
客户端:
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import java.nio.charset.Charset; import java.util.Scanner; /** * @ClassName Client * @Author ChangLu * @Date 2022/1/8 9:49 * @Description Client:客户端连接 */ @Slf4j public class Client { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); Channel channel = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder());//String=>ByteBuf ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buffer = (ByteBuf) msg; log.debug("收到服务端发送的数据:{}", buffer.toString(Charset.defaultCharset())); //同理这里也需要进行向后传递进行释放ByteBuf super.channelRead(ctx, msg); } }); } }).connect("127.0.0.1", 8080).sync().channel(); log.debug("客户端连接成功:{}", channel); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); new Thread(() -> { Scanner scanner = new Scanner(System.in); while (true) { String line = scanner.nextLine(); if ("q".equals(line)) { channel.close(); break; } channel.writeAndFlush(line); } }).start(); } }
效果:
回显效果:
扩展:读写误解解答(含socket实现)
只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,这是不正确的。
实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读。
案例demo
案例目的:测试同一个Socket的读、写操作是否是双向信号通信,也就是全双工!(通过给写线程打上断点,之后看读线程是否能够正常运行)
Server: import java.io.*; import java.net.ServerSocket; import java.net.Socket; /** * @ClassName Server * @Author ChangLu * @Date 2022/1/8 10:35 * @Description 服务端:接收到连接之后,启动读写线程 */ public class Server { public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(8888); Socket s = ss.accept(); new Thread(() -> { try { BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream())); while (true) { System.out.println(reader.readLine()); } } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream())); // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据 for (int i = 0; i < 100; i++) { writer.write(String.valueOf(i)); writer.newLine(); writer.flush(); } } catch (IOException e) { e.printStackTrace(); } }).start(); } }
client: import java.io.*; import java.net.Socket; /** * @ClassName Client * @Author ChangLu * @Date 2022/1/8 10:35 * @Description 客户端:同样有读写线程,建立连接之后写线程向服务端发送数据,读线程监听服务端发来的数据 */ public class Client { public static void main(String[] args) throws IOException { Socket s = new Socket("localhost", 8888); new Thread(() -> { try { BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream())); while (true) { System.out.println(reader.readLine()); } } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream())); for (int i = 0; i < 100; i++) { writer.write(String.valueOf(i)); writer.newLine(); writer.flush(); } } catch (IOException e) { e.printStackTrace(); } }).start(); } }