项目集成Websocket

简介: 之前接触的公安项目,里边包含大屏这一块,要求数据的实时性,考虑到这种场景,目前常见的方案有两种,轮询和Websocket通信。轮询,其实就是定时发送请求,也就是普通的客户端与服务端通信过程,只不过是一种无限循环发送的方式,这样可以保证服务端一旦有最新消息,就可以被客户端获取。但是这种方式有两个明显的缺点,开销大并且很被动。Websocket的特点是具有一次握手,持久连接,以及主动推送,并且性能损耗较低,是解决上述场景的完美方式。

什么是websocket

首先要区别Websocket和我们常用的HTTP协议,HTTP协议只能实现客户端请求,服务端响应的这种单项通信。而WebSocket可以实现客户端与服务端的双向通讯,说白了,最大也是最明显的区别就是可以做到服务端主动将消息推送给客户端。

项目中的运用

在项目中采用了基于netty搭建websocket,实现消息的主动推送。

架构图

下图是项目中有关使用websocket实时推送数据到大屏端的服务流程图。

项目结构

socket集成

添加依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version></dependency>

基于Netty的Socket服务初始化加载类

@ComponentpublicclassNettyServerRunnerimplementsCommandLineRunner {
privatefinalLoggerLOGGER=LogUtils.getLogger(NettyServerRunner.class);
@Overridepublicvoidrun(String... args) throwsException {
//要监听的端口intport=18000;
// socket地址Stringuri="/ws/api/datapush";
EventLoopGroupbossGroup=newNioEventLoopGroup(1);
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try {
ServerBootstrapbootstrap=newServerBootstrap();
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// 绑定主线程组和工作线程组bootstrap.group(bossGroup, workerGroup) 
    .channel(NioServerSocketChannel.class)
    .handler(newLoggingHandler(LogLevel.INFO))
// 绑定监听端口    .localAddress(port)
// 绑定客户端连接时候触发操作    .childHandler(newMyChannelInitializer(uri))
    .childOption(ChannelOption.TCP_NODELAY, true)
             .childOption(ChannelOption.SO_KEEPALIVE, true);
// 服务器异步创建绑定ChannelFuturecf=bootstrap.bind().sync(); 
LOGGER.warn("启动正在监听: "+cf.channel().localAddress());
// 关闭服务器通道cf.channel().closeFuture().sync();
    } finally {
// 释放线程池资源workerGroup.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
    }
 }

初始化Socket服务端配置项

publicclassMyChannelInitializerextendsChannelInitializer<SocketChannel> {
privatestaticfinalintMAX_CONTENT_LENGTH=1024*1024;
privatestaticfinalCharsetCHARSET_UTF8=Charset.forName(CenterKeys.CHARSET_UTF8);
privateStringwsUri;
publicMyChannelInitializer(StringwsUri) {
Preconditions.checkArgument(StringUtils.isNotBlank(wsUri), "Socket地址不能为空");
this.wsUri=wsUri;
    }
@OverrideprotectedvoidinitChannel(SocketChannelchannel) throwsException    {
ChannelPipelinepipeline=channel.pipeline();
// HTTP 服务的解码器pipeline.addLast(newHttpServerCodec());
// HTTP 消息的合并处理pipeline.addLast(newHttpObjectAggregator(MAX_CONTENT_LENGTH));
pipeline.addLast(newStringDecoder(CHARSET_UTF8));
pipeline.addLast(newStringEncoder(CHARSET_UTF8));
// 以块的方式来写的处理器pipeline.addLast(newChunkedWriteHandler());
//自定义的业务handlerpipeline.addLast(MyWebSocketHandler.getInstance());
pipeline.addLast(newWebSocketServerProtocolHandler(wsUri, "WebSocket", true));
    }
}

Socket服务端监听事件处理

@SharablepublicclassMyWebSocketHandlerextendsSimpleChannelInboundHandler<TextWebSocketFrame> {
privatestaticfinalLoggerLOGGER=LogUtils.getLogger(MyWebSocketHandler.class);
privatestaticfinalChannelGroupCHANNEL_GROUP=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
privatestaticfinalMyWebSocketHandlerINSTANCE=newMyWebSocketHandler();
privateMyWebSocketHandler() {}
publicstaticMyWebSocketHandlergetInstance() {
returnINSTANCE;
    }
@OverridepublicvoidchannelRegistered(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
super.channelRegistered(ctx);
LOGGER.info("=======收到客户端{}注册通知=======", clientIp);
    }
@OverridepublicvoidhandlerAdded(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
CHANNEL_GROUP.add(ctx.channel());
super.handlerAdded(ctx);
LOGGER.info("=======收到客户端{}加入通知=======", clientIp);
    }
@OverridepublicvoidhandlerRemoved(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
CHANNEL_GROUP.remove(ctx.channel());
super.handlerRemoved(ctx);
LOGGER.info("=======收到客户端{}离开通知=======", clientIp);
    }
//当有客户端连接时候会被channelActive监听到,//当断开时会被channelInactive监听到,一般在这两个方法中去保存/移除客户端的通道信息@OverridepublicvoidchannelActive(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
super.channelActive(ctx);
LOGGER.info("=======收到客户端{}在线通知=======", clientIp);
    }
@OverridepublicvoidchannelInactive(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
CHANNEL_GROUP.remove(ctx.channel());
super.channelInactive(ctx);
LOGGER.info("=======收到客户端{}离线通知=======", clientIp);
    }
@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) {
ctx.close();
    }
@OverridepublicvoidchannelRead(ChannelHandlerContextctx, Objectmessage) throwsException    {
if (null!=message&&messageinstanceofTextWebSocketFrame) 
        {
TextWebSocketFrameframe= (TextWebSocketFrame) message;
LOGGER.info("接收到客户端请求参数:{}。", frame.text());
Stringresult="要通知的消息"LOGGER.info("客户端连接个数:{}。", CHANNEL_GROUP.size());
// 收到信息后,群发给所有channelCHANNEL_GROUP.writeAndFlush(newTextWebSocketFrame(result));
        }
super.channelRead(ctx, message);
    }
/*** .推送消息通知到客户端*  PushFuncType  可以定义一个Socket消息推送业务功能类型的枚举类*/publicstaticvoidnotify(PushFuncTypefuncType) throwsException    {
// 收到信息后,群发给所有channelLOGGER.info("客户端连接个数:{}。", CHANNEL_GROUP.size());
CHANNEL_GROUP.writeAndFlush(newTextWebSocketFrame(funcType.toString()));
    }
@OverrideprotectedvoidmessageReceived(ChannelHandlerContextchannelHandlerContext, TextWebSocketFramemessage)
throwsException {
    }
privateStringgetClientAddress(ChannelHandlerContextctx) 
    {
InetSocketAddressinsocket= (InetSocketAddress) ctx.channel().remoteAddress();
returninsocket.getAddress().getHostAddress();
    }
}

服务端推送方式

方式一:通过channelRead(ChannelHandlerContext ctx, Object message)方法,将推送消息群发给所有channel。

方式二:通过void notify(PushFuncType funcType)方法,将消息通知发送给所有channel,服务端根据消息通知种类,发送对应的HTTP请求获取消息内容。

小结

客户端代码,只需要javascript的内置对象Websocket即可。本文基于Netty搭建Websocket,是在项目中能够快速集成Websocket的一种方式,实现服务端主动实时推送数据到客户端,实现方案在代码中注释的很清楚,阅读代码即可。关于Netty的相关知识本人也尚在学习之中,这里就不再过多赘述。

相关文章
|
7天前
|
jenkins Java 持续交付
运用Jenkins实现Java项目的持续集成与自动化部署
在新建的Jenkins Job中,我们需要配置源码管理,通常选择Git、SVN等版本控制系统,并填入仓库地址和凭据。接着,设置构建触发器,如定时构建、轮询SCM变更、GitHub Webhook等方式,以便在代码提交后自动触发构建过程。
28 2
|
7天前
|
存储 NoSQL Java
大事件后端项目34_登录优化----redis_SpringBoot集成redis
大事件后端项目34_登录优化----redis_SpringBoot集成redis
大事件后端项目34_登录优化----redis_SpringBoot集成redis
|
17天前
|
安全 算法 Java
在Spring Boot项目中集成Jasypt(Java Simplified Encryption)
在Spring Boot项目中集成Jasypt(Java Simplified Encryption)
28 7
|
17天前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
51 7
|
17天前
|
监控 前端开发 Java
五分钟后,你将学会在SpringBoot项目中如何集成CAT调用链
五分钟后,你将学会在SpringBoot项目中如何集成CAT调用链
|
2天前
|
XML Java 数据格式
支付系统----微信支付20---创建案例项目--集成Mybatis-plus的补充,target下只有接口的编译文件,xml文件了,添加日志的写法
支付系统----微信支付20---创建案例项目--集成Mybatis-plus的补充,target下只有接口的编译文件,xml文件了,添加日志的写法
|
5天前
Vite 项目中如何去集成 Mock 环境 (插件:vite-plugin-mock)
Vite 项目中如何去集成 Mock 环境 (插件:vite-plugin-mock)
13 0
|
5天前
|
JavaScript
Vite 项目中如何去集成 Sass
Vite 项目中如何去集成 Sass
7 0
|
7天前
|
对象存储
大事件后端项目32--------文件上传_阿里云OSS_程序集成
大事件后端项目32--------文件上传_阿里云OSS_程序集成
|
10天前
|
JavaScript 数据管理 API
从壹开始 [ Ids4实战 ] 之五 ║ 多项目集成统一认证中心的思考
从壹开始 [ Ids4实战 ] 之五 ║ 多项目集成统一认证中心的思考