Apache Flex是基于MXML和ActionScript的Flash程序设计框架,可以快速开发RIA(富Internet应用)程序,Netty是JAVA实现的高性能的网络通信框架,可以快速构建网络应用的服务端。即时通讯现在已经非常普遍了,本文以简单的WEB版聊天为例浅谈IM系统的设计与原理,最后再探讨下大规模集群下的服务端的瓶颈与解决思路。
1、可用的方案选型
(1)socket
基于浏览器的Flash插件的Socket/XMLSocket,可以采用Flex实现
(2)websocket
HTML5的websocket是非常新的技术,但需要浏览器的支持,面临的问题是老版本的浏览器不可用。
(3)HTTP
基于AJAX的长轮询和基于HTTP流,开源的有Comet架构的Pushlet
(4)JavaFX与Java Applet
本文采用的是方案1,用Flex编写客户端应用,目前我还没有研究如何将Flash与JS的混合,而是完全采用的Flex技术。
关于Flex的学习,我推荐到Apache上去找,也可以通过阅读《Flex实战》学习,我目前还对Flex了解很少,只能进行简单的编程,编写Flex程序前我只看了两天相关的资料,而且只用到了Socket(Flex中的一个知识点),主要还是ActionScript的学习,ActionScript与JavaScript很类似,能看懂JS的应该也能看懂AS,这里不过多描述。
2、通讯协议设计
在即时通讯中使用的协议很多,熟知的有Jabbe/XMPP/SIP等,还有很多私有的自定义协议。协议是双方协定的数据交互规则,具体采用什么样的协议要根据场景和需求,我为了实现通讯功能,简单地设计了一个规则,具体如下:
我的使用场景包括用户的登录、注销、获取通讯录和发送消息,对于应用场景复杂的应该要考虑更多的情况,根据我在工作中的经验,通讯协议不是一下就设计好的,而是在开发过程进行不断修改与完善,可以说没有协议的设计只能遵循具体的原则,没有最终版。
我在工作中原本是基于XMPP开发的,由于做的是移动互联的应用,受限于移动网络的网速,而XMPP的协议过于庞大,对用户的流量需求太高,为此我开始寻找XMPP的替代品,我最开始选的方案是Google的ProtoBuf,类似的还有Apache的Thrift,这两者都是二进制级别的编码,虽然两者的压缩程度和性能都非常好,但在通讯协议方面不太适合,因为编码后没有可读性,出了问题不好定位。后来我在工作选了JSON来设计,相比XML来说拓展性与性能都要好很多。
3、客户端程序设计
客户端采用的是Flex,使用的集成开发工具是IntelliJ IDEA,基于Apache Flex SDK
根据通讯协议的设计进行客户端的开发,具体代码如下:
4、服务端设计
服务端采用的是Netty,设计方案如下:
根据方案开发的主要代码如下:
1、可用的方案选型
(1)socket
基于浏览器的Flash插件的Socket/XMLSocket,可以采用Flex实现
(2)websocket
HTML5的websocket是非常新的技术,但需要浏览器的支持,面临的问题是老版本的浏览器不可用。
(3)HTTP
基于AJAX的长轮询和基于HTTP流,开源的有Comet架构的Pushlet
(4)JavaFX与Java Applet
本文采用的是方案1,用Flex编写客户端应用,目前我还没有研究如何将Flash与JS的混合,而是完全采用的Flex技术。
关于Flex的学习,我推荐到Apache上去找,也可以通过阅读《Flex实战》学习,我目前还对Flex了解很少,只能进行简单的编程,编写Flex程序前我只看了两天相关的资料,而且只用到了Socket(Flex中的一个知识点),主要还是ActionScript的学习,ActionScript与JavaScript很类似,能看懂JS的应该也能看懂AS,这里不过多描述。
2、通讯协议设计
在即时通讯中使用的协议很多,熟知的有Jabbe/XMPP/SIP等,还有很多私有的自定义协议。协议是双方协定的数据交互规则,具体采用什么样的协议要根据场景和需求,我为了实现通讯功能,简单地设计了一个规则,具体如下:
我的使用场景包括用户的登录、注销、获取通讯录和发送消息,对于应用场景复杂的应该要考虑更多的情况,根据我在工作中的经验,通讯协议不是一下就设计好的,而是在开发过程进行不断修改与完善,可以说没有协议的设计只能遵循具体的原则,没有最终版。
我在工作中原本是基于XMPP开发的,由于做的是移动互联的应用,受限于移动网络的网速,而XMPP的协议过于庞大,对用户的流量需求太高,为此我开始寻找XMPP的替代品,我最开始选的方案是Google的ProtoBuf,类似的还有Apache的Thrift,这两者都是二进制级别的编码,虽然两者的压缩程度和性能都非常好,但在通讯协议方面不太适合,因为编码后没有可读性,出了问题不好定位。后来我在工作选了JSON来设计,相比XML来说拓展性与性能都要好很多。
3、客户端程序设计
客户端采用的是Flex,使用的集成开发工具是IntelliJ IDEA,基于Apache Flex SDK
根据通讯协议的设计进行客户端的开发,具体代码如下:
<?xml version="1.0"?>
<s:Application xmlns:fx="http://ns.adobe.com/mxml/2009" xmlns:s="library://ns.adobe.com/flex/spark">
<fx:Script><![CDATA[
import mx.controls.Alert;
var socket:Socket = null;
// 初始化连接
public function conn():void
{
socket = new Socket();
// 增加事件监听处理
socket.addEventListener(Event.CLOSE,closeHandler);
socket.addEventListener(Event.CONNECT,connectHandler);
socket.addEventListener(ProgressEvent.SOCKET_DATA, socketDataHandler);
// 建立连接
socket.connect('127.0.0.1',8888);
}
// 监听关闭事件
private function closeHandler(event:Event):void
{
trace("closeHandler: " + event);
Alert.show('closed!');
}
// 监听连接成功事件
private function connectHandler(event:Event):void
{
trace("connectHandler: " + event);
// 注册用户
setName();
Alert.show('connected!');
}
// 处理接收消息
private function socketDataHandler(event:ProgressEvent):void
{
var str:String = socket.readUTFBytes(socket.bytesAvailable);
trace("receive data : " + str);
// 沙箱处理
if(str.indexOf("<?xml version=\"1.0\"?>") == 0){
//Alert.show(str);
} else if (str.indexOf("ROSTER:") == 0){
this.roster.text = str; // 处理通讯录
} else {
this.content.appendText(str +"\n"); // 普通消息
}
}
// 点击按钮发送消息,内容为输入框中的文本
public function send():void
{
var message:String = this.messageField.text;
trace("client send : " + message);
socket.writeUTFBytes("MSG:" + this.receive.text + "#" + this.setname.text + ":" + message);
socket.flush();
// 设置对话框展示效果
this.content.appendText(this.setname.text + ":" + this.messageField.text +"\n");
this.messageField.text = '';
}
// 发送字符串函数,用户注册时使用
private function sendMsg(str:String):void
{
trace("client send : " + str);
socket.writeUTFBytes(str);
socket.flush();
}
// 点击关闭
public function close():void
{
trace("close the connect");
var nickname:String = this.setname.text;
// 根据注册的用户注销用户
sendMsg("QUIT:" + nickname);
// 关闭连接
socket.close();
}
// 设置用户名,用于注册
public function setName():void{
var nickname:String = this.setname.text;
sendMsg("AUTH:" + nickname);
}
]]></fx:Script>
<s:Label text="用户名:" x="10" y="10" />
<s:TextInput x="50" y="0" width="100" height="31" id="setname"/>
<s:Button click="conn()" label="连接" x="160" y="0" width="60" height="31"/>
<s:Label text="接收者:" x="10" y="50"/>
<s:TextInput x="50" y="40" width="100" height="31" id="receive"/>
<s:TextInput x="160" y="40" width="200" height="31" id="messageField"/>
<s:Button click="send()" label="发送" x="370" y="40" width="60" height="31"/>
<s:Button click="close()" label="关闭" x="440" y="40" width="60" height="31"/>
<s:Label text="消息:" x="10" y="100"/>
<s:Label text="通讯录:" x="320" y="100"/>
<s:TextArea x="10" y="130" width="300" height="100" id="content"/>
<s:TextArea x="320" y="130" width="200" height="100" id="roster"/>
</s:Application>
4、服务端设计
服务端采用的是Netty,设计方案如下:
根据方案开发的主要代码如下:
package org.jcluster.im.server;
import org.jcluster.im.component.ComponentManager;
import org.jcluster.im.component.InterpreterComponent;
import org.jcluster.im.handler.ConnectionHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class StartServer {
private int port;
public StartServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Acceptor:threads default is availableProcessors * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
// Handler
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap server = new ServerBootstrap();
ChannelHandler handler = new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ConnectionHandler());
}
};
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(handler)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Start the client
ChannelFuture future = server.bind(port).sync();
InterpreterComponent component = new InterpreterComponent();
ComponentManager.getInstance().addComponent("test", component);
System.out.println("IM Server start");
// Wait until the connection is closed
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new StartServer(8888).run();
}
}
package org.jcluster.im.handler;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Iterator;
import org.jcluster.im.session.LocalChannelManger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class ConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//SocketAddress address = ctx.channel().remoteAddress();
//LocalChannelManger.getInstance().addContext(address.toString(), ctx);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
LocalChannelManger.getInstance().removeContext(ctx);
syncRoster();
SocketAddress address = ctx.channel().remoteAddress();
System.out.println(address.toString() + "channelUnregistered");
int count = LocalChannelManger.getInstance().staticClients();
System.out.println("current clients : " + count);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf in = (ByteBuf) msg;
String message = in.toString(Charset.forName("UTF-8"));
// Flash沙箱处理
String xml = "<?xml version=\"1.0\"?><cross-domain-policy><site-control permitted-cross-domain-policies=\"all\"/><allow-access-from domain=\"*\" to-ports=\"*\"/></cross-domain-policy>\0";
if(message.trim().equals("<policy-file-request/>")){
ctx.writeAndFlush(Unpooled.copiedBuffer(xml,CharsetUtil.UTF_8));
}
if(message.startsWith("AUTH:")){
String name = (message.split(":"))[1];
LocalChannelManger.getInstance().addContext(name, ctx);
int count = LocalChannelManger.getInstance().staticClients();
System.out.println("current clients : " + count);
syncRoster();
} else if (message.startsWith("MSG:")){
String content = message.substring(4);
String[] temp = content.split("#");
String to = temp[0];
String body = "";
for(int i=1;i<temp.length;i++){
if(i > 1){
body += "#";
}
body += temp[i];
}
if(LocalChannelManger.getInstance().isAvailable(to)){
LocalChannelManger.getInstance().getContext(to).writeAndFlush(Unpooled.copiedBuffer(body,CharsetUtil.UTF_8));
}
} else if (message.startsWith("QUIT:")){
String name = (message.split(":"))[1];
LocalChannelManger.getInstance().removeContext(name);
int count = LocalChannelManger.getInstance().staticClients();
System.out.println("current clients : " + count);
syncRoster();
}
System.out.println(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//ctx.close();
//System.out.println("server closed!");
}
// update all clients roster
private void syncRoster(){
String respone = "ROSTER:";
for(String s : LocalChannelManger.getInstance().getAll()){
respone += s + ",";
}
Iterator<ChannelHandlerContext> it = LocalChannelManger.getInstance().getAllClient().iterator();
while(it.hasNext()){
it.next().writeAndFlush(Unpooled.copiedBuffer(respone,CharsetUtil.UTF_8));
}
}
}
用户的会话管理设计:


package org.jcluster.im.session;
import io.netty.channel.ChannelHandlerContext;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class LocalChannelManger {
// 存储用户名与连接上下文对象的映射
final private Map<String, ChannelHandlerContext> sessions = new ConcurrentHashMap<String, ChannelHandlerContext>();
// 存储连接上下文与用户名的映射
final private Map<String, String> relations = new ConcurrentHashMap<String, String>();
private static LocalChannelManger instance = new LocalChannelManger();
public static LocalChannelManger getInstance(){
return instance;
}
// 增加用户与连接的上下文映射
public void addContext(String name, ChannelHandlerContext ctx){
synchronized (sessions) {
sessions.put(name, ctx);
relations.put(ctx.toString(), name);
}
}
// 获取指定用户的连接上下文
public ChannelHandlerContext getContext(String name){
return sessions.get(name);
}
// 根据用户名删除session
public void removeContext(String name){
sessions.remove(name);
}
// 判断指定的用户名当前是否在线
public boolean isAvailable(String name){
return sessions.containsKey(name) && (sessions.get(name) != null);
}
// 获取所有的用户名
public synchronized Set<String> getAll(){
return sessions.keySet();
}
// 获取所有连接的上下文对象
public synchronized Collection<ChannelHandlerContext> getAllClient(){
return sessions.values();
}
// 根据上下文删除用户session
public void removeContext(ChannelHandlerContext ctx){
String name = relations.get(ctx.toString());
if(name != null){
sessions.remove(name);
relations.remove(ctx.toString());
}
}
// 统计当前在线人数
public int staticClients(){
return relations.size();
}
}
5、系统可拓展性
当用户量大时,服务端布署就多了,实际生产环境下是有server to server的消息转发,即登录在一台的用户发消息给登录在另一台的用户,这时候系统的瓶颈就出现了,我模仿网络中TCP报文的路由思想,写了一个路由策略,具体代码如下:
package org.jcluster.im;
import java.util.Map;
import java.util.Map.Entry;
/**
* 消息路由器
* @author zhaowen
*
*/
public class RouteServer {
// key:IP, value:IP and UDP Port
private Map<String,String> routeTable;
// userId/device,chanelId
private Map<String,String> localSession;
// off-line message push
private OfflinePush offlinePush;
// local session route
private LocalRouter localRouter;
public void route(Package pack){
if(localSession.containsKey(pack.getTo().toString())){
// localRoute
localRouter.route(pack);
} else {
// route track count
int count = 0;
for(Entry<String,String> node : routeTable.entrySet()){
if(!pack.getTrack().containsKey(node.getKey())){
// localHost add to the track
pack.getTrack().put("localNode", "IP");
// route to the next RouteServer though UDP or MQ
UDPUtil.send(pack, node.getValue());
} else {
// the route track count+1
count++;
}
}
if(count >= routeTable.size()){
// off-line message push
offlinePush.push(pack.getMsg());
}
}
}
// 观察者模式,消息监听器
public class Task implements RouteListener{
@Override
public void receive(Package pack) {
route(pack);
}
}
public Task newTask(){
return new Task();
}
}
6、测试
服务器日志: IM Server start channelActive current clients : 1 AUTH:hello channelActive current clients : 2 AUTH:jj MSG:hello#jj:dfasd#asddsf#df$%^#$^$#^ MSG:jj#hello:sdfas#@$@#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:dfasd#SDGSgQ#$!@#$!^$%& MSG:jj#hello:dsfad#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%& MSG:jj#hello:#SDGSgQ#$!@#$!^$%&sdfsafs MSG:jj#hello:asdf#SDGSgQ#$!@#$!^dfasfsdf$%& /127.0.0.1:60385channelUnregistered current clients : 1 /127.0.0.1:60386channelUnregistered current clients : 0