业务需要,一个服务端同时对websocket与纯socket进行服务。问题来了 开放前两行addlast,纯socket有效,web无效; 注掉前两行,开放后3行,反之。 请问有什么办法,同时支持吗
protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringEncoder(Charset.forName("GBK"))); pipeline.addLast(new ByteArrayEncoder()); // pipeline.addLast("http-codec", new HttpServerCodec()); // pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // pipeline.addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebServerHandler()); // 客户端触发操作 }
从channel是判断不出来的, websocket协议在channel中没有体现. 如果用netty, 绑定一个端口来接收socket, websocket的连接, 可以尝试如下: 1. 将它们的inbound,outbound处理类, 都在初始化时加进去, websocket的放在前面. 2. websocket在请求连接后, 会有一个http握手请求, 普通socket连接也可以在websocket 自定义inbound处理类中处理 3. 判断是否为 websocket请求, 是(pipeline中移除socket的处理类), 否(pipeline中移除websocket的处理类), 以后再来的消息, 就只会到各自的处理类中了. ######回复 @hanzhenchao : 对的, 自定义的inbound处理类要用fireChannelRead传递处理. 有一点要注意HttpServerCodec, 这个在做解析处理时, 如果解析不成功, 不会往后传递######回复 @zb1497041270838 : 您好,请问socket请求在websocket的handler判断后加上socket的hanlder,最后怎么触发执行后续的handler?ctx.fireChannelRead???######不同端口监听######
public class WSProtobufServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ReadTimeoutHandler(60)); // 长时间不写会断
// ====================== 增加心跳支持 start ======================
// 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
// 如果是读空闲或者写空闲,不处理
pipeline.addLast(new IdleStateHandler(0, 0, 15));
// 自定义的空闲状态检测
//判断是websocket 还是普通socket
//如果是websocket 则添加HttpServerCodec()等 否则添加new ProtobufDecoder()等
pipeline.addLast(new SocketChooseHandle());
}
}
在SocketChooseHandle()中通过 private String getBufStart(ByteBuf in) { int length = in.readableBytes(); if (length > MAX_LENGTH) { length = MAX_LENGTH; }
// 标记读位置
in.markReaderIndex();
byte[] content = new byte[length];
in.readBytes(content);
return new String(content);
}获取握手的协议前缀
/** * 协议初始化 * <p> * 用来判定实际使用什么协议.</b> / public class SocketChooseHandle extends ByteToMessageDecoder { /* * 默认暗号长度为23 / private static final int MAX_LENGTH = 23; /* * WebSocket握手的协议前缀 */ private static final String WEBSOCKET_PREFIX = "GET /";
@Override
protected void decode(ChannelHandlerContext ch, ByteBuf in, List<Object> out) throws Exception {
String protocol = getBufStart(in);
ChannelPipeline pipeline = ch.pipeline();
if (protocol.startsWith(WEBSOCKET_PREFIX)) {
//websocket 基于http协议,所以要有http //如果不是websocket 则删除
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
// 几乎在netty中的编程,都会使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
pipeline.addLast(new WebSocketServerProtocolHandler("/web"));
// 协议包解码
pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
ByteBuf buf = frame.content();
objs.add(buf);
buf.retain();
}
});
// 协议包解码时指定Protobuf字节数实例化为CommonProtocol类型
pipeline.addLast(new ProtobufDecoder(MyProbuf.DataContent.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
// 协议包编码
pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
@Override
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
ByteBuf result = null;
if (msg instanceof MessageLite) {
result = Unpooled.wrappedBuffer(((MessageLite) msg).toByteArray());
}
if (msg instanceof MessageLite.Builder) {
result = Unpooled.wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
}
// ==== 上面代码片段是拷贝自TCP ProtobufEncoder 源码 ====
// 然后下面再转成websocket二进制流,因为客户端不能直接解析protobuf编码生成的
WebSocketFrame frame = new BinaryWebSocketFrame(result);
out.add(frame);
}
});
} else {
// 解码和编码,应和客户端一致
//传输的协议 Protobuf
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(MyProbuf.DataContent.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
}
//添加自定义处理器
pipeline.addLast(new ProtobufHandler());
in.resetReaderIndex();
pipeline.remove(this.getClass());
}
private String getBufStart(ByteBuf in) {
int length = in.readableBytes();
if (length > MAX_LENGTH) {
length = MAX_LENGTH;
}
// 标记读位置
in.markReaderIndex();
byte[] content = new byte[length];
in.readBytes(content);
return new String(content);
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。