1. 优化
1.1 扩展序列化算法
序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
// 反序列化 byte[] body = new byte[bodyLength]; byteByf.readBytes(body); ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body)); Message message = (Message) in.readObject(); message.setSequenceId(sequenceId); // 序列化 ByteArrayOutputStream out = new ByteArrayOutputStream(); new ObjectOutputStream(out).writeObject(message); byte[] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
public interface Serializer { // 反序列化方法 <T> T deserialize(Class<T> clazz, byte[] bytes); // 序列化方法 <T> byte[] serialize(T object); }
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
enum SerializerAlgorithm implements Serializer { // Java 实现 Java { @Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { try { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)); Object object = in.readObject(); return (T) object; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e); } } @Override public <T> byte[] serialize(T object) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); new ObjectOutputStream(out).writeObject(object); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e); } } }, // Json 实现(引入了 Gson 依赖) Json { @Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz); } @Override public <T> byte[] serialize(T object) { return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8); } }; // 需要从协议的字节中得到是哪种序列化算法 public static SerializerAlgorithm getByInt(int type) { SerializerAlgorithm[] array = SerializerAlgorithm.values(); if (type < 0 || type > array.length - 1) { throw new IllegalArgumentException("超过 SerializerAlgorithm 范围"); } return array[type]; } }
增加配置类和配置文件
public abstract class Config { static Properties properties; static { try (InputStream in = Config.class.getResourceAsStream("/application.properties")) { properties = new Properties(); properties.load(in); } catch (IOException e) { throw new ExceptionInInitializerError(e); } } public static int getServerPort() { String value = properties.getProperty("server.port"); if(value == null) { return 8080; } else { return Integer.parseInt(value); } } public static Serializer.Algorithm getSerializerAlgorithm() { String value = properties.getProperty("serializer.algorithm"); if(value == null) { return Serializer.Algorithm.Java; } else { return Serializer.Algorithm.valueOf(value); } } }
配置文件
serializer.algorithm=Json
修改编解码器
/** * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的 */ public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> { @Override public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); // 1. 4 字节的魔数 out.writeBytes(new byte[]{1, 2, 3, 4}); // 2. 1 字节的版本, out.writeByte(1); // 3. 1 字节的序列化方式 jdk 0 , json 1 out.writeByte(Config.getSerializerAlgorithm().ordinal()); // 4. 1 字节的指令类型 out.writeByte(msg.getMessageType()); // 5. 4 个字节 out.writeInt(msg.getSequenceId()); // 无意义,对齐填充 out.writeByte(0xff); // 6. 获取内容的字节数组 byte[] bytes = Config.getSerializerAlgorithm().serialize(msg); // 7. 长度 out.writeInt(bytes.length); // 8. 写入内容 out.writeBytes(bytes); outList.add(out); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerAlgorithm = in.readByte(); // 0 或 1 byte messageType = in.readByte(); // 0,1,2... int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte[] bytes = new byte[length]; in.readBytes(bytes, 0, length); // 找到反序列化算法 Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm]; // 确定具体消息类型 Class<? extends Message> messageClass = Message.getMessageClass(messageType); Message message = algorithm.deserialize(messageClass, bytes); // log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); // log.debug("{}", message); out.add(message); } }
其中确定具体消息类型,可以根据 消息类型字节
获取到对应的 消息 class
@Data public abstract class Message implements Serializable { /** * 根据消息类型字节,获得对应的消息 class * @param messageType 消息类型字节 * @return 消息 class */ public static Class<? extends Message> getMessageClass(int messageType) { return messageClasses.get(messageType); } private int sequenceId; private int messageType; public abstract int getMessageType(); public static final int LoginRequestMessage = 0; public static final int LoginResponseMessage = 1; public static final int ChatRequestMessage = 2; public static final int ChatResponseMessage = 3; public static final int GroupCreateRequestMessage = 4; public static final int GroupCreateResponseMessage = 5; public static final int GroupJoinRequestMessage = 6; public static final int GroupJoinResponseMessage = 7; public static final int GroupQuitRequestMessage = 8; public static final int GroupQuitResponseMessage = 9; public static final int GroupChatRequestMessage = 10; public static final int GroupChatResponseMessage = 11; public static final int GroupMembersRequestMessage = 12; public static final int GroupMembersResponseMessage = 13; public static final int PingMessage = 14; public static final int PongMessage = 15; private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>(); static { messageClasses.put(LoginRequestMessage, LoginRequestMessage.class); messageClasses.put(LoginResponseMessage, LoginResponseMessage.class); messageClasses.put(ChatRequestMessage, ChatRequestMessage.class); messageClasses.put(ChatResponseMessage, ChatResponseMessage.class); messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class); messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class); messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class); messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class); messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class); messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class); messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class); messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class); messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class); messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class); } }
2 参数调优
2.1 CONNECT_TIMEOUT_MILLIS
- 属于 SocketChannal 参数
- 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
- SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
@Slf4j public class TestConnectionTimeout { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300) .channel(NioSocketChannel.class) .handler(new LoggingHandler()); ChannelFuture future = bootstrap.connect("127.0.0.1", 8080); future.sync().channel().closeFuture().sync(); // 断点1 } catch (Exception e) { e.printStackTrace(); log.debug("timeout"); } finally { group.shutdownGracefully(); } } }
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
@Override public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // ... // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2 if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } // ... }
2.2 SO_BACKLOG
- 属于 ServerSocketChannal 参数
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
sync queue - 半连接队列
大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
accept queue - 全连接队列
其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
netty 中
可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private volatile int backlog = NetUtil.SOMAXCONN; // ... }
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
public class Server { public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(8888, 2); Socket accept = ss.accept(); System.out.println(accept); System.in.read(); } }
客户端启动 4 个
public class Client { public static void main(String[] args) throws IOException { try { Socket s = new Socket(); System.out.println(new Date()+" connecting..."); s.connect(new InetSocketAddress("localhost", 8888),1000); System.out.println(new Date()+" connected..."); s.getOutputStream().write(1); System.in.read(); } catch (IOException e) { System.out.println(new Date()+" connecting timeout..."); e.printStackTrace(); } } }
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
Tue Apr 21 20:30:28 CST 2020 connecting... Tue Apr 21 20:30:28 CST 2020 connected...
第 4 个客户端连接时
Tue Apr 21 20:53:58 CST 2020 connecting... Tue Apr 21 20:53:59 CST 2020 connecting timeout... java.net.SocketTimeoutException: connect timed out
2.3 ulimit -n
- 属于操作系统参数
2.4 TCP_NODELAY
- 属于 SocketChannal 参数
2.5 SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF 属于 SocketChannal 参数
- SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
2.6 ALLOCATOR
- 属于 SocketChannal 参数
- 用来分配 ByteBuf, ctx.alloc()
2.7 RCVBUF_ALLOCATOR
- 属于 SocketChannal 参数
- 控制 netty 接收缓冲区大小
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定