开发者社区> 问答> 正文

netty怎么判断客户端是纯socket还是websocket - netty报错

业务需要,一个服务端同时对websocket与纯socket进行服务。问题来了

开放前两行addlast,纯socket有效,web无效;

注掉前两行,开放后3行,反之。

请问有什么办法,同时支持吗

protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringEncoder(Charset.forName("GBK")));
                        pipeline.addLast(new ByteArrayEncoder());
//                        pipeline.addLast("http-codec", new HttpServerCodec());
//                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
//                        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                        pipeline.addLast("handler", new WebServerHandler()); // 客户端触发操作
                    }

展开
收起
montos 2020-06-02 19:14:21 1195 0
1 条回答
写回答
取消 提交回答
  • 从channel是判断不出来的, websocket协议在channel中没有体现. 

    如果用netty, 绑定一个端口来接收socket, websocket的连接, 可以尝试如下:

    1. 将它们的inbound,outbound处理类, 都在初始化时加进去, websocket的放在前面.

    2. websocket在请求连接后, 会有一个http握手请求, 普通socket连接也可以在websocket 自定义inbound处理类中处理

    3. 判断是否为 websocket请求, 是(pipeline中移除socket的处理类), 否(pipeline中移除websocket的处理类), 以后再来的消息, 就只会到各自的处理类中了.

     

     

    ######回复 @hanzhenchao : 对的, 自定义的inbound处理类要用fireChannelRead传递处理. 有一点要注意HttpServerCodec, 这个在做解析处理时, 如果解析不成功, 不会往后传递######回复 @zb1497041270838 : 您好,请问socket请求在websocket的handler判断后加上socket的hanlder,最后怎么触发执行后续的handler?ctx.fireChannelRead???######

    不同端口监听

    ######
    public class WSProtobufServerInitialzer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
    
            pipeline.addLast(new ReadTimeoutHandler(60)); // 长时间不写会断
    
    
            // ====================== 增加心跳支持 start    ======================
            // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
            // 如果是读空闲或者写空闲,不处理
            pipeline.addLast(new IdleStateHandler(0, 0, 15));
            // 自定义的空闲状态检测
    
            //判断是websocket 还是普通socket
            //如果是websocket 则添加HttpServerCodec()等   否则添加new ProtobufDecoder()等
            pipeline.addLast(new SocketChooseHandle());
    
        }
    
    }
    
    在SocketChooseHandle()中通过
    private String getBufStart(ByteBuf in) {
        int length = in.readableBytes();
        if (length > MAX_LENGTH) {
            length = MAX_LENGTH;
        }
    
        // 标记读位置
        in.markReaderIndex();
        byte[] content = new byte[length];
        in.readBytes(content);
        return new String(content);
    }获取握手的协议前缀
    /**
     * 协议初始化
     * <p>
     * 用来判定实际使用什么协议.</b>
     */
    public class SocketChooseHandle extends ByteToMessageDecoder {
        /**
         * 默认暗号长度为23
         */
        private static final int MAX_LENGTH = 23;
        /**
         * WebSocket握手的协议前缀
         */
        private static final String WEBSOCKET_PREFIX = "GET /";
    
    
        @Override
        protected void decode(ChannelHandlerContext ch, ByteBuf in, List<Object> out) throws Exception {
            String protocol = getBufStart(in);
    
            ChannelPipeline pipeline = ch.pipeline();
            if (protocol.startsWith(WEBSOCKET_PREFIX)) {
                //websocket 基于http协议,所以要有http  //如果不是websocket 则删除
                pipeline.addLast(new HttpServerCodec());
                // 对写大数据流的支持
                pipeline.addLast(new ChunkedWriteHandler());
                // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
                // 几乎在netty中的编程,都会使用到此hanler
                pipeline.addLast(new HttpObjectAggregator(1024 * 64));
    
                pipeline.addLast(new WebSocketServerProtocolHandler("/web"));
    
    
                // 协议包解码
                pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
                    @Override
                    protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
                        ByteBuf buf = frame.content();
                        objs.add(buf);
                        buf.retain();
                    }
                });
    
                // 协议包解码时指定Protobuf字节数实例化为CommonProtocol类型
                pipeline.addLast(new ProtobufDecoder(MyProbuf.DataContent.getDefaultInstance()));
    
    
                pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    
                // 协议包编码
                pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
                    @Override
                    protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                        ByteBuf result = null;
                        if (msg instanceof MessageLite) {
                            result = Unpooled.wrappedBuffer(((MessageLite) msg).toByteArray());
                        }
                        if (msg instanceof MessageLite.Builder) {
                            result = Unpooled.wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
                        }
    
                        // ==== 上面代码片段是拷贝自TCP ProtobufEncoder 源码 ====
                        // 然后下面再转成websocket二进制流,因为客户端不能直接解析protobuf编码生成的
    
                        WebSocketFrame frame = new BinaryWebSocketFrame(result);
                        out.add(frame);
                    }
                });
    
            } else {
                // 解码和编码,应和客户端一致
                //传输的协议 Protobuf
                pipeline.addLast(new ProtobufVarint32FrameDecoder());
                pipeline.addLast(new ProtobufDecoder(MyProbuf.DataContent.getDefaultInstance()));
                pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
                pipeline.addLast(new ProtobufEncoder());
            }
    
            //添加自定义处理器
            pipeline.addLast(new ProtobufHandler());
    
            in.resetReaderIndex();
            pipeline.remove(this.getClass());
    
        }
    
    
        private String getBufStart(ByteBuf in) {
            int length = in.readableBytes();
            if (length > MAX_LENGTH) {
                length = MAX_LENGTH;
            }
    
            // 标记读位置
            in.markReaderIndex();
            byte[] content = new byte[length];
            in.readBytes(content);
            return new String(content);
        }
    }

     

    2020-06-02 19:14:32
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载