业务需要,一个服务端同时对websocket与纯socket进行服务。问题来了
开放前两行addlast,纯socket有效,web无效;
注掉前两行,开放后3行,反之。
请问有什么办法,同时支持吗
protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringEncoder(Charset.forName("GBK"))); pipeline.addLast(new ByteArrayEncoder()); // pipeline.addLast("http-codec", new HttpServerCodec()); // pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // pipeline.addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebServerHandler()); // 客户端触发操作 }
从channel是判断不出来的, websocket协议在channel中没有体现.
如果用netty, 绑定一个端口来接收socket, websocket的连接, 可以尝试如下:
1. 将它们的inbound,outbound处理类, 都在初始化时加进去, websocket的放在前面.
2. websocket在请求连接后, 会有一个http握手请求, 普通socket连接也可以在websocket 自定义inbound处理类中处理
3. 判断是否为 websocket请求, 是(pipeline中移除socket的处理类), 否(pipeline中移除websocket的处理类), 以后再来的消息, 就只会到各自的处理类中了.
######回复 @hanzhenchao : 对的, 自定义的inbound处理类要用fireChannelRead传递处理. 有一点要注意HttpServerCodec, 这个在做解析处理时, 如果解析不成功, 不会往后传递######回复 @zb1497041270838 : 您好,请问socket请求在websocket的handler判断后加上socket的hanlder,最后怎么触发执行后续的handler?ctx.fireChannelRead???######
不同端口监听
######public class WSProtobufServerInitialzer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ReadTimeoutHandler(60)); // 长时间不写会断 // ====================== 增加心跳支持 start ====================== // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开 // 如果是读空闲或者写空闲,不处理 pipeline.addLast(new IdleStateHandler(0, 0, 15)); // 自定义的空闲状态检测 //判断是websocket 还是普通socket //如果是websocket 则添加HttpServerCodec()等 否则添加new ProtobufDecoder()等 pipeline.addLast(new SocketChooseHandle()); } } 在SocketChooseHandle()中通过 private String getBufStart(ByteBuf in) { int length = in.readableBytes(); if (length > MAX_LENGTH) { length = MAX_LENGTH; } // 标记读位置 in.markReaderIndex(); byte[] content = new byte[length]; in.readBytes(content); return new String(content); }获取握手的协议前缀
/** * 协议初始化 * <p> * 用来判定实际使用什么协议.</b> */ public class SocketChooseHandle extends ByteToMessageDecoder { /** * 默认暗号长度为23 */ private static final int MAX_LENGTH = 23; /** * WebSocket握手的协议前缀 */ private static final String WEBSOCKET_PREFIX = "GET /"; @Override protected void decode(ChannelHandlerContext ch, ByteBuf in, List<Object> out) throws Exception { String protocol = getBufStart(in); ChannelPipeline pipeline = ch.pipeline(); if (protocol.startsWith(WEBSOCKET_PREFIX)) { //websocket 基于http协议,所以要有http //如果不是websocket 则删除 pipeline.addLast(new HttpServerCodec()); // 对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse // 几乎在netty中的编程,都会使用到此hanler pipeline.addLast(new HttpObjectAggregator(1024 * 64)); pipeline.addLast(new WebSocketServerProtocolHandler("/web")); // 协议包解码 pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() { @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception { ByteBuf buf = frame.content(); objs.add(buf); buf.retain(); } }); // 协议包解码时指定Protobuf字节数实例化为CommonProtocol类型 pipeline.addLast(new ProtobufDecoder(MyProbuf.DataContent.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); // 协议包编码 pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() { @Override protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception { ByteBuf result = null; if (msg instanceof MessageLite) { result = Unpooled.wrappedBuffer(((MessageLite) msg).toByteArray()); } if (msg instanceof MessageLite.Builder) { result = Unpooled.wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray()); } // ==== 上面代码片段是拷贝自TCP ProtobufEncoder 源码 ==== // 然后下面再转成websocket二进制流,因为客户端不能直接解析protobuf编码生成的 WebSocketFrame frame = new BinaryWebSocketFrame(result); out.add(frame); } }); } else { // 解码和编码,应和客户端一致 //传输的协议 Protobuf pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(MyProbuf.DataContent.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); } //添加自定义处理器 pipeline.addLast(new ProtobufHandler()); in.resetReaderIndex(); pipeline.remove(this.getClass()); } private String getBufStart(ByteBuf in) { int length = in.readableBytes(); if (length > MAX_LENGTH) { length = MAX_LENGTH; } // 标记读位置 in.markReaderIndex(); byte[] content = new byte[length]; in.readBytes(content); return new String(content); } }
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。