什么是websocket
首先要区别Websocket和我们常用的HTTP协议,HTTP协议只能实现客户端请求,服务端响应的这种单项通信。而WebSocket可以实现客户端与服务端的双向通讯,说白了,最大也是最明显的区别就是可以做到服务端主动将消息推送给客户端。
项目中的运用
在项目中采用了基于netty搭建websocket,实现消息的主动推送。
架构图
下图是项目中有关使用websocket实时推送数据到大屏端的服务流程图。
项目结构
socket集成
添加依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version></dependency>
基于Netty的Socket服务初始化加载类
publicclassNettyServerRunnerimplementsCommandLineRunner { privatefinalLoggerLOGGER=LogUtils.getLogger(NettyServerRunner.class); publicvoidrun(String... args) throwsException { //要监听的端口intport=18000; // socket地址Stringuri="/ws/api/datapush"; EventLoopGroupbossGroup=newNioEventLoopGroup(1); EventLoopGroupworkerGroup=newNioEventLoopGroup(); try { ServerBootstrapbootstrap=newServerBootstrap(); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 绑定主线程组和工作线程组bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(newLoggingHandler(LogLevel.INFO)) // 绑定监听端口 .localAddress(port) // 绑定客户端连接时候触发操作 .childHandler(newMyChannelInitializer(uri)) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); // 服务器异步创建绑定ChannelFuturecf=bootstrap.bind().sync(); LOGGER.warn("启动正在监听: "+cf.channel().localAddress()); // 关闭服务器通道cf.channel().closeFuture().sync(); } finally { // 释放线程池资源workerGroup.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } }
初始化Socket服务端配置项
publicclassMyChannelInitializerextendsChannelInitializer<SocketChannel> { privatestaticfinalintMAX_CONTENT_LENGTH=1024*1024; privatestaticfinalCharsetCHARSET_UTF8=Charset.forName(CenterKeys.CHARSET_UTF8); privateStringwsUri; publicMyChannelInitializer(StringwsUri) { Preconditions.checkArgument(StringUtils.isNotBlank(wsUri), "Socket地址不能为空"); this.wsUri=wsUri; } protectedvoidinitChannel(SocketChannelchannel) throwsException { ChannelPipelinepipeline=channel.pipeline(); // HTTP 服务的解码器pipeline.addLast(newHttpServerCodec()); // HTTP 消息的合并处理pipeline.addLast(newHttpObjectAggregator(MAX_CONTENT_LENGTH)); pipeline.addLast(newStringDecoder(CHARSET_UTF8)); pipeline.addLast(newStringEncoder(CHARSET_UTF8)); // 以块的方式来写的处理器pipeline.addLast(newChunkedWriteHandler()); //自定义的业务handlerpipeline.addLast(MyWebSocketHandler.getInstance()); pipeline.addLast(newWebSocketServerProtocolHandler(wsUri, "WebSocket", true)); } }
Socket服务端监听事件处理
publicclassMyWebSocketHandlerextendsSimpleChannelInboundHandler<TextWebSocketFrame> { privatestaticfinalLoggerLOGGER=LogUtils.getLogger(MyWebSocketHandler.class); privatestaticfinalChannelGroupCHANNEL_GROUP=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); privatestaticfinalMyWebSocketHandlerINSTANCE=newMyWebSocketHandler(); privateMyWebSocketHandler() {} publicstaticMyWebSocketHandlergetInstance() { returnINSTANCE; } publicvoidchannelRegistered(ChannelHandlerContextctx) throwsException { StringclientIp=getClientAddress(ctx); super.channelRegistered(ctx); LOGGER.info("=======收到客户端{}注册通知=======", clientIp); } publicvoidhandlerAdded(ChannelHandlerContextctx) throwsException { StringclientIp=getClientAddress(ctx); CHANNEL_GROUP.add(ctx.channel()); super.handlerAdded(ctx); LOGGER.info("=======收到客户端{}加入通知=======", clientIp); } publicvoidhandlerRemoved(ChannelHandlerContextctx) throwsException { StringclientIp=getClientAddress(ctx); CHANNEL_GROUP.remove(ctx.channel()); super.handlerRemoved(ctx); LOGGER.info("=======收到客户端{}离开通知=======", clientIp); } //当有客户端连接时候会被channelActive监听到,//当断开时会被channelInactive监听到,一般在这两个方法中去保存/移除客户端的通道信息publicvoidchannelActive(ChannelHandlerContextctx) throwsException { StringclientIp=getClientAddress(ctx); super.channelActive(ctx); LOGGER.info("=======收到客户端{}在线通知=======", clientIp); } publicvoidchannelInactive(ChannelHandlerContextctx) throwsException { StringclientIp=getClientAddress(ctx); CHANNEL_GROUP.remove(ctx.channel()); super.channelInactive(ctx); LOGGER.info("=======收到客户端{}离线通知=======", clientIp); } publicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) { ctx.close(); } publicvoidchannelRead(ChannelHandlerContextctx, Objectmessage) throwsException { if (null!=message&&messageinstanceofTextWebSocketFrame) { TextWebSocketFrameframe= (TextWebSocketFrame) message; LOGGER.info("接收到客户端请求参数:{}。", frame.text()); Stringresult="要通知的消息"LOGGER.info("客户端连接个数:{}。", CHANNEL_GROUP.size()); // 收到信息后,群发给所有channelCHANNEL_GROUP.writeAndFlush(newTextWebSocketFrame(result)); } super.channelRead(ctx, message); } /*** .推送消息通知到客户端* PushFuncType 可以定义一个Socket消息推送业务功能类型的枚举类*/publicstaticvoidnotify(PushFuncTypefuncType) throwsException { // 收到信息后,群发给所有channelLOGGER.info("客户端连接个数:{}。", CHANNEL_GROUP.size()); CHANNEL_GROUP.writeAndFlush(newTextWebSocketFrame(funcType.toString())); } protectedvoidmessageReceived(ChannelHandlerContextchannelHandlerContext, TextWebSocketFramemessage) throwsException { } privateStringgetClientAddress(ChannelHandlerContextctx) { InetSocketAddressinsocket= (InetSocketAddress) ctx.channel().remoteAddress(); returninsocket.getAddress().getHostAddress(); } }
服务端推送方式
方式一:通过channelRead(ChannelHandlerContext ctx, Object message)方法,将推送消息群发给所有channel。
方式二:通过void notify(PushFuncType funcType)方法,将消息通知发送给所有channel,服务端根据消息通知种类,发送对应的HTTP请求获取消息内容。
小结
客户端代码,只需要javascript的内置对象Websocket即可。本文基于Netty搭建Websocket,是在项目中能够快速集成Websocket的一种方式,实现服务端主动实时推送数据到客户端,实现方案在代码中注释的很清楚,阅读代码即可。关于Netty的相关知识本人也尚在学习之中,这里就不再过多赘述。