项目集成Websocket

简介: 之前接触的公安项目,里边包含大屏这一块,要求数据的实时性,考虑到这种场景,目前常见的方案有两种,轮询和Websocket通信。轮询,其实就是定时发送请求,也就是普通的客户端与服务端通信过程,只不过是一种无限循环发送的方式,这样可以保证服务端一旦有最新消息,就可以被客户端获取。但是这种方式有两个明显的缺点,开销大并且很被动。Websocket的特点是具有一次握手,持久连接,以及主动推送,并且性能损耗较低,是解决上述场景的完美方式。

什么是websocket

首先要区别Websocket和我们常用的HTTP协议,HTTP协议只能实现客户端请求,服务端响应的这种单项通信。而WebSocket可以实现客户端与服务端的双向通讯,说白了,最大也是最明显的区别就是可以做到服务端主动将消息推送给客户端。

项目中的运用

在项目中采用了基于netty搭建websocket,实现消息的主动推送。

架构图

下图是项目中有关使用websocket实时推送数据到大屏端的服务流程图。

项目结构

socket集成

添加依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version></dependency>

基于Netty的Socket服务初始化加载类

@ComponentpublicclassNettyServerRunnerimplementsCommandLineRunner {
privatefinalLoggerLOGGER=LogUtils.getLogger(NettyServerRunner.class);
@Overridepublicvoidrun(String... args) throwsException {
//要监听的端口intport=18000;
// socket地址Stringuri="/ws/api/datapush";
EventLoopGroupbossGroup=newNioEventLoopGroup(1);
EventLoopGroupworkerGroup=newNioEventLoopGroup();
try {
ServerBootstrapbootstrap=newServerBootstrap();
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// 绑定主线程组和工作线程组bootstrap.group(bossGroup, workerGroup) 
    .channel(NioServerSocketChannel.class)
    .handler(newLoggingHandler(LogLevel.INFO))
// 绑定监听端口    .localAddress(port)
// 绑定客户端连接时候触发操作    .childHandler(newMyChannelInitializer(uri))
    .childOption(ChannelOption.TCP_NODELAY, true)
             .childOption(ChannelOption.SO_KEEPALIVE, true);
// 服务器异步创建绑定ChannelFuturecf=bootstrap.bind().sync(); 
LOGGER.warn("启动正在监听: "+cf.channel().localAddress());
// 关闭服务器通道cf.channel().closeFuture().sync();
    } finally {
// 释放线程池资源workerGroup.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
    }
 }

初始化Socket服务端配置项

publicclassMyChannelInitializerextendsChannelInitializer<SocketChannel> {
privatestaticfinalintMAX_CONTENT_LENGTH=1024*1024;
privatestaticfinalCharsetCHARSET_UTF8=Charset.forName(CenterKeys.CHARSET_UTF8);
privateStringwsUri;
publicMyChannelInitializer(StringwsUri) {
Preconditions.checkArgument(StringUtils.isNotBlank(wsUri), "Socket地址不能为空");
this.wsUri=wsUri;
    }
@OverrideprotectedvoidinitChannel(SocketChannelchannel) throwsException    {
ChannelPipelinepipeline=channel.pipeline();
// HTTP 服务的解码器pipeline.addLast(newHttpServerCodec());
// HTTP 消息的合并处理pipeline.addLast(newHttpObjectAggregator(MAX_CONTENT_LENGTH));
pipeline.addLast(newStringDecoder(CHARSET_UTF8));
pipeline.addLast(newStringEncoder(CHARSET_UTF8));
// 以块的方式来写的处理器pipeline.addLast(newChunkedWriteHandler());
//自定义的业务handlerpipeline.addLast(MyWebSocketHandler.getInstance());
pipeline.addLast(newWebSocketServerProtocolHandler(wsUri, "WebSocket", true));
    }
}

Socket服务端监听事件处理

@SharablepublicclassMyWebSocketHandlerextendsSimpleChannelInboundHandler<TextWebSocketFrame> {
privatestaticfinalLoggerLOGGER=LogUtils.getLogger(MyWebSocketHandler.class);
privatestaticfinalChannelGroupCHANNEL_GROUP=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
privatestaticfinalMyWebSocketHandlerINSTANCE=newMyWebSocketHandler();
privateMyWebSocketHandler() {}
publicstaticMyWebSocketHandlergetInstance() {
returnINSTANCE;
    }
@OverridepublicvoidchannelRegistered(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
super.channelRegistered(ctx);
LOGGER.info("=======收到客户端{}注册通知=======", clientIp);
    }
@OverridepublicvoidhandlerAdded(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
CHANNEL_GROUP.add(ctx.channel());
super.handlerAdded(ctx);
LOGGER.info("=======收到客户端{}加入通知=======", clientIp);
    }
@OverridepublicvoidhandlerRemoved(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
CHANNEL_GROUP.remove(ctx.channel());
super.handlerRemoved(ctx);
LOGGER.info("=======收到客户端{}离开通知=======", clientIp);
    }
//当有客户端连接时候会被channelActive监听到,//当断开时会被channelInactive监听到,一般在这两个方法中去保存/移除客户端的通道信息@OverridepublicvoidchannelActive(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
super.channelActive(ctx);
LOGGER.info("=======收到客户端{}在线通知=======", clientIp);
    }
@OverridepublicvoidchannelInactive(ChannelHandlerContextctx) throwsException    {
StringclientIp=getClientAddress(ctx);
CHANNEL_GROUP.remove(ctx.channel());
super.channelInactive(ctx);
LOGGER.info("=======收到客户端{}离线通知=======", clientIp);
    }
@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx, Throwablecause) {
ctx.close();
    }
@OverridepublicvoidchannelRead(ChannelHandlerContextctx, Objectmessage) throwsException    {
if (null!=message&&messageinstanceofTextWebSocketFrame) 
        {
TextWebSocketFrameframe= (TextWebSocketFrame) message;
LOGGER.info("接收到客户端请求参数:{}。", frame.text());
Stringresult="要通知的消息"LOGGER.info("客户端连接个数:{}。", CHANNEL_GROUP.size());
// 收到信息后,群发给所有channelCHANNEL_GROUP.writeAndFlush(newTextWebSocketFrame(result));
        }
super.channelRead(ctx, message);
    }
/*** .推送消息通知到客户端*  PushFuncType  可以定义一个Socket消息推送业务功能类型的枚举类*/publicstaticvoidnotify(PushFuncTypefuncType) throwsException    {
// 收到信息后,群发给所有channelLOGGER.info("客户端连接个数:{}。", CHANNEL_GROUP.size());
CHANNEL_GROUP.writeAndFlush(newTextWebSocketFrame(funcType.toString()));
    }
@OverrideprotectedvoidmessageReceived(ChannelHandlerContextchannelHandlerContext, TextWebSocketFramemessage)
throwsException {
    }
privateStringgetClientAddress(ChannelHandlerContextctx) 
    {
InetSocketAddressinsocket= (InetSocketAddress) ctx.channel().remoteAddress();
returninsocket.getAddress().getHostAddress();
    }
}

服务端推送方式

方式一:通过channelRead(ChannelHandlerContext ctx, Object message)方法,将推送消息群发给所有channel。

方式二:通过void notify(PushFuncType funcType)方法,将消息通知发送给所有channel,服务端根据消息通知种类,发送对应的HTTP请求获取消息内容。

小结

客户端代码,只需要javascript的内置对象Websocket即可。本文基于Netty搭建Websocket,是在项目中能够快速集成Websocket的一种方式,实现服务端主动实时推送数据到客户端,实现方案在代码中注释的很清楚,阅读代码即可。关于Netty的相关知识本人也尚在学习之中,这里就不再过多赘述。

相关文章
|
2月前
|
安全 Java 数据库
SpringSecurity认证授权及项目集成
本文介绍了基于Spring Security的权限管理框架,涵盖认证、授权与鉴权核心概念,通过快速入门示例演示集成流程,并结合数据库实现用户认证。进一步扩展实现正常登录,JWT登录及鉴权管理器,实现灵活的安全控制,适用于前后端分离项目中的权限设计与实践。
241 4
|
2月前
|
资源调度 JavaScript 前端开发
在Vue 3项目中集成Element Plus组件库的步骤
总结起来,在集成过程当中我们关注于库本身提供功能与特性、环境搭建与依赖管理、模块化编程思想以及前端工程化等方面知识点;同时也涵盖前端性能优化(比如上文提及“按需加载”)与定制化开发(例如“自定义主题”)等高级话题.
219 16
|
4月前
|
JSON 分布式计算 大数据
springboot项目集成大数据第三方dolphinscheduler调度器
springboot项目集成大数据第三方dolphinscheduler调度器
243 3
|
4月前
|
Java 关系型数据库 数据库连接
Spring Boot项目集成MyBatis Plus操作PostgreSQL全解析
集成 Spring Boot、PostgreSQL 和 MyBatis Plus 的步骤与 MyBatis 类似,只不过在 MyBatis Plus 中提供了更多的便利功能,如自动生成 SQL、分页查询、Wrapper 查询等。
371 3
|
4月前
|
Java 关系型数据库 MySQL
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务
505 2
|
4月前
|
分布式计算 Java 大数据
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理
268 2
|
4月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
1221 5
|
分布式计算 大数据 Java
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
75 0
|
分布式计算 Java 大数据
springboot项目集成dolphinscheduler调度器 项目管理
springboot项目集成dolphinscheduler调度器 项目管理
113 0
|
9月前
|
消息中间件 XML 前端开发
springBoot集成websocket实时消息推送
本文介绍了如何在Spring Boot项目中集成WebSocket实现实时消息推送。首先,通过引入`spring-boot-starter-websocket`依赖,配置`WebSocketConfig`类来启用WebSocket支持。接着,创建`WebSocketTest`服务器类,处理连接、消息收发及错误等事件,并使用`ConcurrentHashMap`管理用户连接。最后,前端通过JavaScript建立WebSocket连接,监听消息并进行相应处理。此方案适用于需要实时通信的应用场景,如聊天室、通知系统等。
1273 2