30分钟快速打造一个完善的直播聊天系统

简介: 下面的代码基于高性能的通信王牌工具 Netty。我们将一些实际场景都添加进去,比如用户身份的验证,游客只能浏览不能发言,多房间(频道)的聊天。 这篇文章非常适合和我一样的 Java 新手,适合作为学习 Java 的切入点,不需要考虑tomcat、spring、mybatis等。唯一的知识点就是 m

下面的代码基于高性能的通信王牌工具 Netty。我们将一些实际场景都添加进去,比如用户身份的验证,游客只能浏览不能发言,多房间(频道)的聊天。

这篇文章非常适合和我一样的 Java 新手,适合作为学习 Java 的切入点,不需要考虑tomcatspringmybatis等。唯一的知识点就是 maven 的基础使用。

完整的代码地址

https://github.com/zhoumengkang/netty-websocket

项目结果如下

├── WebSocketServer.java                启动服务器端口监听
├── WebSocketServerInitializer.java     初始化服务
├── WebSocketServerHandler.java         接管WebSocket数据连接
├── dto
│   └── Response.java                   返回给客户端数据对象
├── entity
│   └── Client.java                     每个连接到WebSocket服务的客户端对象
└── service
    ├── MessageService.java             完成发送消息
    └── RequestService.java             WebSocket初始化连接握手时的数据处理

功能设计概述

身份认证

客户端将用户 id 、进入的房间的 rid、用户 token json_encode,例如{id:1;rid:21;token:'xxx'}。然后在 base64 处理,通过参数request传到服务器,然后在服务器做 id 和 token 的验证(我的做法是 token 存放在redis string 5秒的过期时间)

房间表

使用一个Map channelGroupMap 来存放各个房间(频道),以客户端传握手时传过来的base64 字符串中获取到定义的房间 ID,然后为该房间 ID 新建一个ChannelGroupChannelGroup 方便对该组内的所有客户端广播消息)

在 pom.xml 中引入netty 5

现在大家都有自己的包管理工具,不需要实现下载了然后放到本地lib库中,和 nodejs 的 npm, php 的 compser 一样。

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
    </dependency>
    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jzlib</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20141113</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
</dependencies>

创建服务器

package net.mengkang;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
 
 
public final class WebSocketServer {
 
    private static final int PORT = 8083;
 
    public static void main(String[] args) throws Exception {
 
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketServerInitializer());
 
            Channel ch = b.bind(PORT).sync().channel();
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

package net.mengkang;
 
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
 
 
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
 
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
 
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new WebSocketServerCompressionHandler());
        pipeline.addLast(new WebSocketServerHandler());
    }
}

处理长连接

下面程序中最的处理在握手阶段handleHttpRequest,里面处理参数的判断,用户的认证,登录用户表的维护,直播房间表维护。详细的请大家对照代码来浏览。

握手完成之后的消息传递则在handleWebSocketFrame中处理。

整理的执行流程,大家可以对各个方法打断点予以调试,就会很清楚整个执行的脉络啦。

package net.mengkang;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import net.mengkang.dto.Response;
import net.mengkang.entity.Client;
import net.mengkang.service.MessageService;
import net.mengkang.service.RequestService;
import org.json.JSONObject;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
 
    // websocket 服务的 uri
    private static final String WEBSOCKET_PATH = "/websocket";
 
    // 一个 ChannelGroup 代表一个直播频道
    private static Map<Integer, ChannelGroup> channelGroupMap = new HashMap<>();
 
    // 本次请求的 code
    private static final String HTTP_REQUEST_STRING = "request";
 
    private Client client = null;
 
    private WebSocketServerHandshaker handshaker;
 
    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
 
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Handle a bad request.
        if (!req.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }
 
        // Allow only GET methods.
        if (req.method() != GET) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }
 
        if ("/favicon.ico".equals(req.uri()) || ("/".equals(req.uri()))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }
 
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
        Map<String, List<String>> parameters = queryStringDecoder.parameters();
 
        if (parameters.size() == 0 || !parameters.containsKey(HTTP_REQUEST_STRING)) {
            System.err.printf(HTTP_REQUEST_STRING + "参数不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }
 
        client = RequestService.clientRegister(parameters.get(HTTP_REQUEST_STRING).get(0));
        if (client.getRoomId() == 0) {
            System.err.printf("房间号不可缺省");
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));
            return;
        }
 
        // 房间列表中如果不存在则为该频道,则新增一个频道 ChannelGroup
        if (!channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.put(client.getRoomId(), new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        }
        // 确定有房间号,才将客户端加入到频道中
        channelGroupMap.get(client.getRoomId()).add(ctx.channel());
 
        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            ChannelFuture channelFuture = handshaker.handshake(ctx.channel(), req);
 
            // 握手成功之后,业务逻辑
            if (channelFuture.isSuccess()) {
                if (client.getId() == 0) {
                    System.out.println(ctx.channel() + " 游客");
                    return;
                }
 
            }
        }
    }
 
    private void broadcast(ChannelHandlerContext ctx, WebSocketFrame frame) {
 
        if (client.getId() == 0) {
            Response response = new Response(1001, "没登录不能聊天哦");
            String msg = new JSONObject(response).toString();
            ctx.channel().write(new TextWebSocketFrame(msg));
            return;
        }
 
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println(" 收到 " + ctx.channel() + request);
 
        Response response = MessageService.sendMessage(client, request);
        String msg = new JSONObject(response).toString();
        if (channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.get(client.getRoomId()).writeAndFlush(new TextWebSocketFrame(msg));
        }
 
    }
 
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
 
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }
 
        broadcast(ctx, frame);
    }
 
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaderUtil.setContentLength(res, res.content().readableBytes());
        }
 
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
 
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("收到" + incoming.remoteAddress() + " 握手请求");
    }
 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        if (client != null && channelGroupMap.containsKey(client.getRoomId())) {
            channelGroupMap.get(client.getRoomId()).remove(ctx.channel());
        }
    }
 
    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}

客户端程序

<html>
<head><title></title></head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
  window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
  socket = new WebSocket("ws://localhost:8083/websocket/?request=e2lkOjE7cmlkOjI2O3Rva2VuOiI0MzYwNjgxMWM3MzA1Y2NjNmFiYjJiZTExNjU3OWJmZCJ9");
  socket.onmessage = function(event) {
      console.log(event.data);
  };
  socket.onopen = function(event) {
    console.log("websocket 打开了");
  };
  socket.onclose = function(event) {
    console.log("websocket 关闭了");
  };
}
 
function send(message) {
  if (!window.WebSocket) { return; }
  if (socket.readyState == WebSocket.OPEN) {
    socket.send(message);
  } else {
    alert("The socket is not open.");
  }
}
</script>
<form onsubmit="return false;">
  <input type="text" name="message" value="Hello, World!"/>
  <input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)" />
</form>
</body>
</html>

可以伪造多个房间(修改客户端请求的request参数),在 php cli 模式运行下命令来获取
php -r "echo base64_encode('{id:1;rid:26;token:\"xxx\"}');"

并发压测

感谢同事 https://github.com/ideal 写的压测脚本

https://github.com/zhoumengkang/netty-websocket/blob/master/benchmark.py

并测试为N个客户端,每个客户端发送10条消息,服务器配置2核4G内存,广播给所有的客户端,我们测试1000个并发的时候,负载在后期陡升。

实际情况下,不可能那么多人同时说话广播,而是说话的人少,接受广播的人多。

实际线上之后,在不限制刷帖频率大家狂轰滥炸的情况下,1500多人在线,半小时,负载一直都处于0.5以下。



目录
相关文章
|
存储 缓存 监控
直播系统聊天技术(九):千万级实时直播弹幕的技术实践
疫情期间,线上演唱会是一种很常见的直播娱乐形式,由于线下社交距离的限制,线上形式演唱会比以往更火爆,而对技术的要求也更高。 本文基于网易云信针对TFBOYS某场线上演唱会的技术支持,为你分享千万级在线用户量的直播系统中实时弹幕功能的技术实践,希望能带给你启发。
450 0
|
前端开发 机器人
赛事比分直播系统程序 快至三天搭建上线开发解决方案
采用东莞梦幻网络科技独创研发体育比分直播系统平台,功能齐全,支持各类赛事直播/比分预测/赛程数据/比分数据/赛事情报/赛事社区/微短视频/新闻话题/会员中心等模块功能。采用PHP+JAVA+VUE+Object-c的技术语言开发,PC+H5+Android+IOS多端程序。
赛事比分直播系统程序 快至三天搭建上线开发解决方案
|
开发框架 JavaScript 前端开发
梦幻体育赛事直播系统的解决方案和技术分析
以确保用户能够享受到流畅的直播体验,梦幻体育赛事直播系统需要采用一个可靠且高效的解决方案,以下是我们对该系统所需的技术和框架的分析。
梦幻体育赛事直播系统的解决方案和技术分析
|
移动开发 安全 JavaScript
各类赛事直播平台开发解决方案
方案1、使用现有的赛事直播源码进行开发(由东莞梦幻网络科技提供) 方案2、进行赛事直播系统定制开发
|
存储 消息中间件 缓存
直播系统聊天技术(七):直播间海量聊天消息的架构设计难点实践
本文将主要从高可用、弹性扩缩容、用户管理、消息分发、客户端优化等角度,分享直播间海量聊天消息的架构设计技术难点的实践经验。
1112 0
直播系统聊天技术(七):直播间海量聊天消息的架构设计难点实践
|
编解码 5G 定位技术
为完善生态拼了 - 乐视超级手机 1 体验评测
凭借拥有庞大优秀影视资源,乐视超级电视在国内智能电视市场中取得相当不错的成绩。经过一年的静默,在 4月 14 日新品发布会上,乐视宣布进军智能手机行业,一口气推出了乐 1 、乐 1 Pro 、乐 1 Max 三款超级手机,其中乐 1 最低配机型定价 1499 元,在市场上多款 1500 元价位的智能手机产品中,乐 1 的硬件配置相当大气,采用联发科 MTK helio X10 八核处理器,5.5 寸 1080p 全高清屏幕,内置 3GB LPDDR3 运行内存和 16G 存储空间,从 Android 手机经常采用堆积硬件提高性能的角度看,乐 1 是一款性价比极高的大屏智能手机。
340 0
为完善生态拼了 - 乐视超级手机 1 体验评测
|
Web App开发 编解码 JavaScript
互动直播之WebRTC服务开源技术选型
介绍了直播的基础知识,对比几种传输标得出WebRTC的优势,常见的WebRTC架构及开源方案。
4379 0
互动直播之WebRTC服务开源技术选型
|
缓存 5G 视频直播
一对一直播平台源码开发的新思路,从直播开始分析
现如今科技发展飞速,一对一直播平台开发也没有想象中的那么困难,但是如果没有相对的开发经验,开发周期可能会相对较长,也比较容易踩坑。这时候可以选择靠谱的一对一直播平台源码,再进行二次开发,节省时间和成本,还可以保证一对一直播平台源码运行的稳定性。
|
存储 编解码 人工智能
提升短视频应用体验,短视频源码要做哪些完善?
如何把短视频的产品和体验做好是开发者一直探索的问题,短视频源码又该如何打造好一款好的短视频app呢?
提升短视频应用体验,短视频源码要做哪些完善?

热门文章

最新文章