1.Netty 介绍和应用场景
1.1 介绍
- Netty 是jboss的一个开源框架
- Netty是一个异步的,基于事件驱动的网络应用框架
- 基于nio
1.2 应用场景
- Rpc 例如dubbo
- 游戏
- 大数据
涉及到网络通信的应用都可以使用netty
2. i/o模型
2.1 介绍
- bio 同步并阻塞 一个连接对应服务器一个线程 适用于连接数较少的架构 jdk1.4
- nio 同步非阻塞 服务器一个线程处理多个连接 适用于连接数较多连接时间短 jdk1.4
- aio(nio.2) 异步非阻塞 适用于连接数多且连接时间长 jdk1.7
2.2 bio
blocking i/o
2.2.1 简单demo
开发一个服务端,创建一个线程池,当客户端发送一个请求,服务端对应创建一个线程处理,当有多个客户端请求时,就会创建多个线程对应处理
这里demo的客户端用telnet模拟
public static void handler(Socket socket){
try(InputStream in = socket.getInputStream();){
System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
while (true){
int read = in.read(bytes);
if(read!=-1){
System.out.println("输出信息: "+new String(bytes,"UTF-8"));
}else {
break;
}
}
}catch (IOException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
try(ServerSocket serverSocket = new ServerSocket(6666);) {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());
while (true){
System.out.println("等待链接");
final Socket socket = serverSocket.accept();
System.out.println("链接到一个客户端");
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());
handler(socket);
}
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
2.3 nio
non-blocking i/o 非阻塞
2.3.1 简介
三大核心
- channel 通道
- buffer 缓冲区
- selector 选择器
简述操作原理:selector 选择可用的channel, channel 与 buffer可以相互读写,应用程序并不直接对channel进行操作,而是通过对buffer进行操作,间接操作channel
一个线程中会有多个selector,一个selector中可以注册多个channel,如果并没有数据传输,线程还可以做其他事,并不会一直等待
2.4 nio与bio 的区别
- nio非阻塞 bio阻塞
- nio用块的方式处理io bio用流的方式处理io 块的方式比流的方式要快
- bio基于字节流/字符流 nio基于缓冲区和通道(channel) selector监听多个通道的事件,因此用一个线程就可以处理多个通道的数据
图示: nio
bio
3. nio详解
3.1 nio模型三大组件的关系
- 一个线程对应一个selector
- 一个selecor对应多个channel
- 一个channel对应一个buffer
- 一个线程对应多个channel
- channel与buffer都是双向的,就是既可以读也可以写 使用flip()方法切换
- buffer就是一个内存块,读写内存比较快
- selector会根据不同事件切换不同的channel
3.2 Buffer缓冲区
3.2.1 简介
本质是一个读写数据的内存块,可以理解成一个提供了操作内存块方法的容器对象(数组)
缓冲区中内置了一些机制,这些机制可以检测到缓冲区的数据变化,状态变化
channel读写的数据必须都经过Buffer
3.2.2 源码分析
常用的几个操作方法
public static void main(String[] args) {
//allocate 规定intbuffer的长度
IntBuffer buffer = IntBuffer.allocate(5);
//capacity()获取容量
//put()写入
for(int i = 0;i<buffer.capacity();i++){
buffer.put(i*2);
}
//flip()反转 由写转为读
buffer.flip();
//读取
//get()每次读取后 索引向后移动一位
for(int i = 0;i<buffer.capacity();i++){
System.out.println(buffer.get());
}
}
3.2.2.1 定义
IntBuffer中定义了一个int数组,其他类型的buffer类似
public abstract class IntBuffer
extends Buffer
implements Comparable<IntBuffer>
{
// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
//
final int[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
最顶层的Buffer类中定义了四个属性
public abstract class Buffer {
/**
* The characteristics of Spliterators that traverse and split elements
* maintained in Buffers.
*/
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; //标记
private int position = 0; //当前索引的位置,不能超过limit
private int limit;//最大能读写的长度
private int capacity;//容量 allocate定义的长度
3.2.2.2 反转
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
可以看到,反转之后,由读变为写,或者由写变为读
将索引归0,最大读写长度不能超过上次操作的索引
3.2 channel 通道
3.2.1 简介
- 通道类似于流/连接,但是流只能写入或者读取,通道可以即读取也写入
- 通道异步读写数据
- 通道可以读写数据到缓存区
3.2.2 层级关系
当有客户端发送请求时,服务端会创建一个ServerSocketChannel(实现类:ServerSocketChannelImpl) 再由ServerSocketChannel创建一个SocketChannel(实现类:SocketChannelImpl即真正读写数据的通道),这个SocketChannel就是与这个客户端请求所对应的
3.2.3 案例剖析
3.2.3.1 FileChannle 输出文件流
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @author: zhangyao
* @create:2020-08-25 14:50
**/
public class FileChannelTest {
public static void main(String[] args) {
FileOutputStream fileOutputStream = null;
try {
//文件输出流
fileOutputStream = new FileOutputStream("D:\\file01.txt");
//文件输出流包装为FileChannel 此处FileChannel默认实现FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建对应的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//数据写入缓冲区
byteBuffer.put("hello nio".getBytes());
//反转,因为接下来需要从缓冲区读取数据写入Channel
byteBuffer.flip();
//从缓冲区写入Channel
fileChannel.write(byteBuffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
//关闭文件流
if(fileOutputStream!=null){
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
整体流程就是把数据写入缓冲区,在读取缓存区写入通道Channel,在由文件输出流输出
图示如下
3.2.3.2 FileChanle 输入文件流
public static void main(String[] args) {
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream("D:\\file01.txt");
//获取Channel
FileChannel channel = fileInputStream.getChannel();
//创建byteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//从channel中读取数据写入buffer
channel.read(byteBuffer);
//反转 下一步需要从buffer中读取数据输出
byteBuffer.flip();
//输出
byte[] array = byteBuffer.array();
System.out.println(new String(array));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
与上面的例子刚好相反,从文件中读取数据,通过通道写入buffer缓冲区,在输出
图示
3.2.3.3 FileChannel 拷贝文件
其实就是上面两个例子结合,把一个文件中的数据复制到另外一个文件中
public static void main(String[] args) {
FileInputStream fileInputStream = null;
FileOutputStream fileOutputStream = null;
try {
fileInputStream = new FileInputStream("D:\\file01.txt");
fileOutputStream = new FileOutputStream("D:\\file02.txt");
FileChannel channel = fileInputStream.getChannel();
FileChannel channel1 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true){
//将byteBuffer复位
byteBuffer.clear();
int read = channel.read(byteBuffer);
if(read==-1){
break;
}
byteBuffer.flip();
channel1.write(byteBuffer);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fileInputStream.close();
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
这里使用了byteBuffer.clear()方法
因为ByteBuffer缓冲区是有长度的,当读取的文件超过缓冲区的长度时,如果不对缓冲区进行清空,当进行下一次读取时,就会从上一次读取的位置开始读取,会出现死循环的情况
3.2.3.4 FileChannel 拷贝文件之TransferFrom
public static void main(String[] args) {
FileInputStream fileInputStream = null;
FileOutputStream fileOutputStream = null;
try {
fileInputStream = new FileInputStream("D:\\file01.txt");
fileOutputStream = new FileOutputStream("D:\\file02.txt");
FileChannel channel = fileInputStream.getChannel();
FileChannel channel1 = fileOutputStream.getChannel();
//从channel通道拷贝到 channel1通道
channel1.transferFrom(channel, 0, channel.size());
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
fileInputStream.close();
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.2.4 Buffer的分散和聚集
上面的例子都是使用单个buffer进行数据的读写,如果数据过大,也可用使用多个buffer(buffer数组)进行数据的读写,即用空间换时间
3.3 Selector选择器
3.3.1 基本简介
一个selector管理多个channel通道,使用异步的方式处理io
只有读写真正的发生时,才会处理数据,减小了线程的压力,不用每个请求都维护一个线程
避免了多线程之间的上下文切换导致的开销
3.3.2 selector的api
Selector:
- select() 阻塞
- select(Long timeout) 有超时时间
- selectNow() 非阻塞
- wakeup() 立即唤醒selector
3.3.2 selecor的工作流程
其实是Selector SelectionKey ServerSocketChannel SorkcetChannel的工作原理
- 当客户端链接时,通过ServerSockertChannel 得到SocketChannel 并且注册到 Selector上
- 注册源码
public abstract SelectionKey register(Selector sel, int ops)
throws ClosedChannelException;
这是SocketChannel注册到Selector上的方法,第一个参数为要注册的Selector对象,第二个参数为事件驱动的类型
public abstract class SelectionKey {
public static final int OP_READ = 1;
public static final int OP_WRITE = 4;
public static final int OP_CONNECT = 8;
public static final int OP_ACCEPT = 16;
private volatile Object attachment = null;
- 当注册完成后返回一个Selectionkey,这个selectionKey会和SocketChannel关联
- Selector通过select方法监听Channel,如果有事件发生,返回对应的selectionKey集合
- 源码
public int select(long var1) throws IOException {
if (var1 < 0L) {
throw new IllegalArgumentException("Negative timeout");
} else {
return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
}
}
public int select() throws IOException {
return this.select(0L);
}
public int selectNow() throws IOException {
return this.lockAndDoSelect(0L);
}
private int lockAndDoSelect(long var1) throws IOException {
synchronized(this) {
if (!this.isOpen()) {
throw new ClosedSelectorException();
} else {
int var10000;
synchronized(this.publicKeys) {
synchronized(this.publicSelectedKeys) {
var10000 = this.doSelect(var1);
}
}
return var10000;
}
}
}
- 通过得到的selectionKey可以反向获取Channel
- 源码
public abstract SelectableChannel channel();
- 最后通过channel处理业务
3.3.3 案例
服务端思路:
- 创建serverSocketChannel绑定端口6666,把这个channel注册到Selector上,注册事件是OP_ACCEPT
- 循环监听,判断是否channel中是否有事件发生,如果有事件发生,判断不同的事件类型进行不同的链接,读/写操作
客户端思路
- 创建一个SocketChannel,连接上服务器之后,发送消息,并保持链接不关闭
3.3.3.1 server端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* @author: zhangyao
* @create:2020-08-26 16:55
**/
public class ServerChannel {
public static void main(String[] args) {
try {
//生成一个ServerScoketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞的
serverSocketChannel.configureBlocking(false);
//serverSocket监听6666端口
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//创建Selector
Selector selector = Selector.open();
//serverSocketChannel注册到Selector
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循环等待链接
while (true){
//如果没有事件发生,就继续循环
if(selector.select(1000) == 0){
System.out.println("等待1s,无连接");
continue;
}
//如果有事件驱动,就需要遍历事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//如果事件是连接
if(key.isAcceptable()){
try {
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
SelectionKey register = channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("链接成功");
} catch (IOException e) {
e.printStackTrace();
}
}
//如果是读取数据
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
try {
int read = channel.read(byteBuffer);
byte[] array = byteBuffer.array();
System.out.println("读取数据:"+ new String(byteBuffer.array()));
} catch (IOException e) {
e.printStackTrace();
}
}
iterator.remove();
};
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.3.3.2 客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author: zhangyao
* @create:2020-08-26 17:24
**/
public class ClientChannel {
public static void main(String[] args) {
//创建一个SocketChannel
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
if(!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("服务器连接中,线程并不阻塞,可以进行其他操作");
}
}
//连接成功
socketChannel.write(ByteBuffer.wrap("hello ,server".getBytes()));
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.Netty
4.1 简介
netty是对java nio api的封装,简化了nio程序的开发,jdk要求最低1.6
流行的网络编程通信框架,Dubbo Elasticsearch 等框架底层的网络通信框架都是 Netty
架构模型
版本
netty 共有 3.x 4.x 5.x三个大版本
3.x较老,5.x有重大bug,被官网废弃 现在主要使用4.x
4.2 线程模型
有以下几种线程模型
4.2.1 传统I/O阻塞模型
每一个链接都需要一个对应的线程进行处理,并且当链接建立后,如果当前链接没有数据传输时,此线程会被阻塞在read()方法
4.2.2 Reactor模式
原理图示如上
主要是针对了传统I/O模型一个连接会阻塞一个线程的问题进行了改进,当连接建立后都通过ServiceHandler调用线程池中的线程进行处理,这样就只用阻塞一个ServiceHandler线程,达到多路复用的目的
Reactor模式有三种实现方式
4.2.2.1单Reactor单线程
使用一个线程通过多路复用完成所有操作,包括读写连接
redis使用的就是这种模型 单线程
4.2.2.2单Reactor多线程
相对于单Reactor单线程,主线程不在进行业务处理,当有请求过来之后,具体的业务处理交与线程池中的线程处理,线程处理完成后再通过handler返回给Client
4.2.2.3 主从Reactor多线程
相比于单Reacotr,主从Reactor将Reactor分为MainReactor和SubReactor
MainReactor中负责分发和连接
SubReactor中负责读写
一个MainReactor可以对应多个SubReactor
4.2.3 Netty模型
简述Netty模型
- 角色
- BossGroup BossGroup的类型是NioEventLoopGroup,其中包含了很多NioEventLoop
- NioEventLoop nio事件循环,每个NioEventLoop中都有一个Selctor和一个任务队列
- WorkerGroup 类型是NioEventLoopGroup,与BossGroup类似,只不过功能不同,BossGroup只负责与客户端建立连接, WorkerGroup需要读写,处理业务
- PipeLine 管道,对Channel进行的封装,具体的业务处理是通过Pipline对Channel进行处理
- 具体流程
- 当客户端发送请求时,首先进入BossGroup,有NioEventLoop对请求进行事件轮询,如果是连接事件就进行处理
- 处理的步骤分为三步
- 轮询
- 注册 这里的注册指的是将生成的SocketChannel注册到workerGroup中的某个NioEventLoop中的Selector上
- 执行任务列表
- 当请求的事件是读写时,就有workerGroup对请求进行具体的业务处理
- 处理的步骤BossGroup类似
- 总结
由此可以看出,Netty的模型与主从Reactor模型类似,都是由一个主Reactor负责连接事件,由一个从Reactor负责读写事件
4.2.4 案例demo
4.2.4.1 服务端
4.2.4.1.1 NettyServer
package netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author: zhangyao
* @create:2020-09-03 08:55
**/
public class NettyServer {
public static void main(String[] args) {
//创建BossGroup 和 WorkerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
ChannelFuture channelFuture = null;
try {
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("服务器就绪.....");
//绑定端口
channelFuture = serverBootstrap.bind(6668).sync();
}catch (Exception e){
e.printStackTrace();
}finally {
try {
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4.2.4.1.2 NettyServerHandler
package netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author: zhangyao
* @create:2020-09-03 09:12
**/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息:"+ buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ ctx.channel().remoteAddress());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));
}
//处理异常 关闭ctx
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.2.4.2 客户端
4.2.4.2.1 NettyClient
package netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author: zhangyao
* @create:2020-09-03 09:52
**/
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup executors = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(executors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端就绪........");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//关闭通道
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
try {
executors.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4.2.4.2.2 NettyClientHandler
package netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author: zhangyao
* @create:2020-09-03 10:00
**/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//就绪时触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ctx: "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务端", CharsetUtil.UTF_8));
}
//读取信息
//这里读取的是服务器返回的信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端发送消息: "+ byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务端地址: "+ ctx.channel().remoteAddress());
}
//异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
一个简单的TCP服务通信,客户端发送消息,服务端接收消息并返回消息给客户端
4.2.4.3 案例demo源码分析
4.2.4.3.1 NioEventGroup
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor)null);
}
可以看到,如果使用无参的NioEventGroup,默认传递的是0,也可以指定线程数
一层一层找下去:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); //NettyRuntime.availableProcessors()获取当前计算机的核数(逻辑处理器)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
发现最后找到NioEventGroup父类的方法
如果指定了NioEventGroup的线程数,且不为0的时候,就使用指定的线程数
否则,**就使用当前计算机的核数2作为线程数
debug 看结果
电脑是12核,默认是24个线程
指定一个线程
就只有一个线程数
4.2.5 异步模型
上文中的案例Demo中的 ChannelFuture
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
异步模型Future是相对与同步来说的
异步指的是当一个异步调用发出后,不会立刻得到结果,而是通过回调,状态来通知调用者调用的结果
Netty 中的 connect 和 bind() sync方法就是返回了一个异步的结果,之后再通过监听获取到结果
也就是 Future-Listener机制
当Future对象刚创建的时候,处于未完成的状态,可以通过返回的ChannelFuture查看操作执行的状态,也可以注册监听函数来执行完成后的操作
isSucess()是否成功
isDone()是否完成
isCancelable() 是否取消
cause() 失败原因
addListener 增加监听器
//绑定端口
channelFuture = serverBootstrap.bind(6668).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("监听端口成功");
}else {
System.out.println("监听端口失败");
}
}
});
4.2.6 Netty Http服务
做一个简单的demo 浏览器(客户端)访问服务器端7001端口,返回一个字符串信息
浏览器访问是http请求,服务器也需要相应一个httpResponse
4.2.6.1 服务端
NettyHttpServer 启动类
package netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author: zhangyao
* @create:2020-09-04 11:16
**/
public class NettyHttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyHttpInitialize());
ChannelFuture channelFuture = serverBootstrap.bind(7001).sync();
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
NettyHttpInitialize 处理器类
对之前的ChannelInitialize(SocketChannel)进行封装
package netty.http;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
* @author: zhangyao
* @create:2020-09-04 11:21
**/
public class NettyHttpInitialize extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//得到管道
ChannelPipeline pipeline = sc.pipeline();
//管道中加入处理器 主要是处理Http请求的,解析请求头之类的
pipeline.addLast("myDecoder",new HttpServerCodec());
//加入处理器
pipeline.addLast("myServerHandler",new NettyServerHandler());
}
}
NettyServerHandler 具体的处理(返回http响应)
package netty.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import org.springframework.http.HttpStatus;
/**
* @author: zhangyao
* @create:2020-09-04 14:01
**/
public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {
//读信息
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
System.out.println(httpObject);
System.out.println("客户端地址+"+ channelHandlerContext.channel().remoteAddress());
ByteBuf byteBuf = Unpooled.copiedBuffer("hello , im 服务端", CharsetUtil.UTF_8);
//返回客户端信息
FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,byteBuf);
fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(fullHttpResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
出现的问题记录:
- 第一次绑定6668端口,浏览器访问失败,换成7001就可以了
原因: 谷歌浏览器禁用了6665-6669以及其他一些不安全的端口 - 访问后第一次请求 出现异常,可以正常返回数据
原因: NettyServerHandler中没有对异常处理方法进行重写
加上这部分就可以了,报错信息也报的很明显
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
4.2.6.2 Http服务的过滤
可以对一些不希望处理的请求进行过滤,其实就是在对http请求的处理过程中判断一下请求的uri
在上文中 NettyServerHandler类中 加入以下代码,即可拦截/favicon.ico请求
HttpRequest re = (HttpRequest) httpObject;
String uri = re.uri();
if(uri.equals("/favicon.ico")){
System.out.println("不想处理,返回");
return;
}
4.3 Netty API梳理
基于上述的各种demo,对Netty常用的类和方法进行系统梳理
4.3.1 Bootstrap
- ServerBootstrap 服务端启动引导类
BootStrap 客户端启动引导类
- .group() 给BootStrap设置NioEventLoopGroup,可以设置多个
- .channel() 设置服务使用的通道类
- .option() 设置通道参数
- .handler() 对BossGroup进行设置
- .childrenHandler() 对workerGroup 进行设置
- .bind() 服务端绑定一个端口号,监听端口
- connect() 客户端用于连接服务端
4.3.2 Future
Netty中的io操作都是异步的,也就是不能立刻返回结果,而是当完成了之后再通知调用者
- Future
- ChannelFutrue
方法
- channel() 返回当前正在进行IO操作的通道
- sync() 转为同步,等待异步操作完毕
4.3.3 Channel
不同协议,不同阻塞类型都有对应的Channel
- NioSocketChannel 异步tcp协议Socket连接
- NioServerSocketChannel 异步tcp协议服务端连接
- NioDatagramChannel 异步udp连接
- NioSctpChannel 异步sctp客户端连接
- NioSctpServerChannel 异步sctp服务端连接
4.3.4 Selector
Netty 基于Nio Selector对象实现多路复用,一个selector管理多个channel
4.3.5 ChannelHandler
主要是用于对数据的处理,其中有很多封装好的方法,使用的时候继承其子类即可
实现类
子类很多,常用的几个
channelHandler
- ChannelInboundHandler
- ChannelOutboundHandler
- 适配器
- channelInboundHandlerAdapter
- channelOutboundHandlerAdapter
4.3.6 pipeline
结构图如上
channel中可以创建出一个ChannelPipeline, ChannelPipeline中有维护了一个由ChannelHandlerContext组成的双向链表
每个ChannelHandlerContext又对应了一个Channelhandler
常用方法:
addFirst(); 添加一个Handler到链表中的第一个位置
addLast(); 添加到链表的最后一个位置
4.3.7 channelHandlerContext
每一个channelhandlerContext包含了一个channelHandler(业务处理)
channelHandlerContext中还可以获取到对应的channel和pipeline信息
channelHandlerContext.channel();
channelHandlerCOntext.pipeline();
4.3.8 EventLoopGroup
netty一般提供两个EventLoopGroup BossEventLoopGroup 和 workerEventLoopGroup
EventLoopGroup可以指定使用的核心是多少个
4.3.9 Unplooed
Netty提供的用来操作缓冲区数据的工具类
常用方法:
copiedBuffer(); 返回的是Netty提供的 Bytebuf对象
4.3.10 ByteBuf
Netty的数据容器(缓冲区)
可以直接进行读/写 读写之间不需要进行flip(),原因是ByteBuf内部维护了两个索引 readIndex writeIndex
常用方法
getByte()
readByte()
writeByte()
capacity()
4.4 Netty 心跳检测机制
当客户端长时间没有读/写操作时,服务端需要检测客户端是否还处于连接状态,也就是心跳检测
Netty提供了心跳检测的处理类 IdleStateHandler
示例代码
package netty.hearbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @author: zhangyao
* @create:2020-09-10 10:04
**/
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
pipeline.addLast("inleHandler",new MyHearBeatHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
package netty.hearbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @author: zhangyao
* @create:2020-09-10 16:50
**/
public class MyHearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
if(o instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent) o;
IdleState state = event.state();
switch (state){
case READER_IDLE:
System.out.println("读空闲");
break;
case WRITER_IDLE:
System.out.println("写空闲");
break;
case ALL_IDLE:
System.out.println("读写空闲");
break;
}
}
}
}
客户端没有区别
4.5 Netty 之 webSocket
使用netty编写webSocket长连接
4.5.1 服务端
package netty.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @author: zhangyao
* @create:2020-09-11 14:43
**/
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
//添加http解码器
pipeline.addLast(new HttpServerCodec());
//添加块传输处理器
pipeline.addLast(new ChunkedWriteHandler());
//http分段传输,增加一个聚合处理
pipeline.addLast(new HttpObjectAggregator(8192));
//增加websocket协议处理
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
//增加自定义处理业务处理器
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
4.5.2 服务端自定义的处理器
package netty.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Date;
/**
* @author: zhangyao
* @create:2020-09-11 14:49
**/
public class MyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//当客户端关闭连接
System.out.println("客户端关闭连接..."+ ctx.channel().id());
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//当客户端连接到服务端
System.out.println("有客户端连接到服务端 id为" + ctx.channel().id());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常关闭连接
ctx.close();
}
//收到消息
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
//读取消息并返回相同的消息返回给客户端
String text = textWebSocketFrame.text();
System.out.println("服务端收到消息" + text);
//返回给客户端
Channel channel = channelHandlerContext.channel();
channel.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now()+" 服务端返回消息:" + text));
}
}
4.5.3 页面(webSocket客户端)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>测试netty+webSocket</title>
</head>
<body>
<form onsubmit="return false">
<textarea id="sendMessage" style="height: 300px;width: 300px" placeholder="请输入要发送的消息"></textarea>
<button onclick="send(document.getElementById('sendMessage').value)">发送消息</button>
<textarea id="responseMessage" style="height: 300px;width: 300px" ></textarea>
<button onclick="document.getElementById('responseMessage').value=''">清空消息</button>
</form>
</body>
<script type="application/javascript">
var websocket;
if(!window.WebSocket){
alert("浏览器不支持webSocket")
}else {
//进行webSocket的开启 关闭
websocket = new WebSocket("ws://localhost:7000/hello");
//webSocket 开启事件
//给消息返回框加入一条数据
websocket.onopen = function (ev) {
document.getElementById("responseMessage").value = '连接到服务端';
}
websocket.onclose = function (ev) {
document.getElementById("responseMessage").value += '\n 连接关闭';
}
//当服务端响应消息时触发 将服务端返回的消息回显致文本框
websocket.onmessage = function (ev) {
document.getElementById("responseMessage").value += '\n ' ;
document.getElementById("responseMessage").value += ev.data ;
}
}
//发送消息
function send (message) {
if(!window.websocket){
alert("socket还未初始化完成");
return;
}
if(websocket.readyState == WebSocket.OPEN){
websocket.send(message);
document.getElementById('sendMessage').value=''
}
}
</script>
</html>
4.6 Netty 编码解码
网络传输过程中的编码解码过程
codec 编解码器 包含 encoder编码器 和 decoder解码器
netty中提供了一些StringCodec ObjectCodec的编解码器,但是这些编解码器还是依赖java底层的序列化技术,java底层的序列化是比较低效的,所以需要引入新的高效的序列化技术
4.6.1 ProtoBuf
4.6.2 自定义编码解码器
package netty.inboundAndOutbound;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import netty.inboundAndOutbound.client.MyClientMessageToByteHandler;
/**
* @author: zhangyao
* @create:2020-09-18 14:53
**/
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//添加入栈解码器
pipeline.addLast(new ByteToMessageHandler());
//添加出栈编码器
pipeline.addLast(new MyClientMessageToByteHandler());
//添加自定义处理器
pipeline.addLast(new MyServerHandler());
}
}
package netty.inboundAndOutbound;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @author: zhangyao
* @create:2020-09-18 14:54
**/
public class ByteToMessageHandler extends ByteToMessageDecoder {
//自定义实现的入栈解码器
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()>=8){
list.add(byteBuf.readLong());
}
}
}
package netty.inboundAndOutbound.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @author: zhangyao
* @create:2020-09-18 16:16
**/
public class MyClientByteToLong extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()>=8){
list.add(byteBuf.readLong());
}
}
}
4.6.3 handler处理机制及出栈入栈
通过上面的编码解码进而延申到handler的处理机制
简单的netty出栈入栈的解释图
出栈和入栈是相对于而言的,当客户端发送消息到达服务端,对于客户端来说就是出栈,对于服务端来说就是入栈,反之亦然
4.7 Tcp沾包 拆包
4.7.1 沾包拆包介绍
tcp服务再发送消息的时候,如果发送的多个包.数据量小且包的数量比较多,Tcp就会通过算法将多个包合并为一个大的数据包发送给接受端,这样产生的问题就是接收端无法识别出完整的包,由此产生的问题就是沾包拆包
如上图所示,Client端向Server端发送D1 D2两个数据包
Server端读取的时候,可能会产生四种情况
1.分两次读取 分别读取到了D1 D2两个数据包,不存在沾包拆包现象
2.一次读取,读到了D1D2两个数据包合在一起的包,出现沾包现象
3.分两次读取,第一次读取到了D1和D2的一部分数据,第二次读取到了D2的剩余部分数据,出现了拆包现象
4.分两次读取,第一次读取到了D1的一部分数据,第二次读取到 了D1的剩余部分数据和D2的所有数据,出现拆包现象
4.7.2 解决方案
思路:控制接收端读取内容的长度来解决问题
方案: 通过自定义解析+编解码器来解决拆包沾包问题