netty-socketio 示例代码

简介: socket.io是一个不错的websocket项目,github上有它的java实现:netty-socketio 及 示例项目 netty-socketio-demo,基本上看看demo示例项目就能很快上手了,但是demo中的示例代码场景为js做客户端,如果需要在java中连接websocket server,可以参考下面的示例: 一、服务端代码 package com.

socket.io是一个不错的websocket项目,github上有它的java实现:netty-socketio 及 示例项目 netty-socketio-demo,基本上看看demo示例项目就能很快上手了,但是demo中的示例代码场景为js做客户端,如果需要在java中连接websocket server,可以参考下面的示例:

一、服务端代码

package com.corundumstudio.socketio.demo.server;

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import io.socket.client.Socket;

/**
 * Created by yangjunming on 2017/1/13.
 */
public class DemoSocketServer {


    public static void main(String[] args) throws InterruptedException {

        Configuration config = new Configuration();
        config.setHostname("localhost");
        config.setPort(9092);

        final SocketIOServer server = new SocketIOServer(config);

        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                String token = client.getHandshakeData().getUrlParams().get("token").get(0);
                if (!token.equals("87df42a424c48313ef6063e6a5c63297")) {
                    client.disconnect();//校验token示例
                }
                System.out.println("sessionId:" + client.getSessionId() + ",token:" + token);
            }
        });


        server.addEventListener(Socket.EVENT_MESSAGE, String.class, new DataListener<String>() {
            @Override
            public void onData(SocketIOClient client, String data, AckRequest ackSender) throws Exception {
                System.out.println("client data:" + data);
                server.getBroadcastOperations().sendEvent(Socket.EVENT_MESSAGE, "hi");
            }
        });

        server.start();
        Thread.sleep(Integer.MAX_VALUE);
        server.stop();
    }

}

服务端的主要工作,就是添加各种事件的监听,然后在监听处理中,做相应的处理即可。

注:添加事件监听时,如果重复添加监听,会导致事件被处理多次,所以最好在添加事件监听前,先移除之前已经存在的监听,类似下面这样

        chat1namespace.removeAllListeners(Socket.EVENT_MESSAGE);
        chat1namespace.addEventListener(Socket.EVENT_MESSAGE, String.class,...

  

 

二、客户端代码

java连接netty-socketio,还要借助另一个开源项目:socket.io-client-java

package com.corundumstudio.socketio.demo.client;

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;

import java.net.URISyntaxException;

/**
 * Created by yangjunming on 2017/1/13.
 */
public class DemoSocketClient {

    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.reconnectionDelay = 1000;//失败重连的时间间隔
        options.timeout = 500;//连接超时时间(ms)

//        final Socket socket = IO.socket("http://localhost:9092/?token=123456", options);//错误的token值连接示例
        final Socket socket = IO.socket("http://localhost:9092/?token=87df42a424c48313ef6063e6a5c63297", options);

        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                socket.send("hello");
            }
        });

        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                System.out.println("连接关闭");
            }
        });

        socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                System.out.println("sessionId:" + socket.id());
                for (Object obj : args) {
                    System.out.println(obj);
                }
                System.out.println("收到服务器应答,将要断开连接...");
                socket.disconnect();
            }
        });
        socket.connect();
    }
}

客户端类似,也是加一些事件监听,然后做相应处理即可。

 

上面的例子,演示了client向server连接时,如何做基本的连接认证(基于token),以及基本的消息收发。

运行效果:

服务端输出

sessionId:f52e9fa3-6216-4742-87de-3228a74469f9,token:87df42a424c48313ef6063e6a5c63297
client data:hello

客户端输出

sessionId:f52e9fa3-6216-4742-87de-3228a74469f9
hi
收到服务器应答,将要断开连接...
连接关闭

注:框架已经自带了一些预设的事件,见下面的代码片段

    /**
     * Called on a successful connection.
     */
    public static final String EVENT_OPEN = "open";

    /**
     * Called on a disconnection.
     */
    public static final String EVENT_CLOSE = "close";

    public static final String EVENT_PACKET = "packet";
    public static final String EVENT_ERROR = "error";

    /**
     * Called on a connection error.
     */
    public static final String EVENT_CONNECT_ERROR = "connect_error";

    /**
     * Called on a connection timeout.
     */
    public static final String EVENT_CONNECT_TIMEOUT = "connect_timeout";

    /**
     * Called on a successful reconnection.
     */
    public static final String EVENT_RECONNECT = "reconnect";

    /**
     * Called on a reconnection attempt error.
     */
    public static final String EVENT_RECONNECT_ERROR = "reconnect_error";

    public static final String EVENT_RECONNECT_FAILED = "reconnect_failed";

    public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";

    public static final String EVENT_RECONNECTING = "reconnecting";

    public static final String EVENT_PING = "ping";

    public static final String EVENT_PONG = "pong";

如果不够的话,可以自行扩展,无非就是一些字符串常量。  

 

三、广播消息隔离

前面的示例,没有"域"的概念,所有连到socket server上的client,如果收发广播的话,全都能收到,如果只希望将消息发到指定的某一"批"用户,可以让这些client归到某个域(或组织机构)里,这样在指定的域范围内广播,只有在这个域内的client才能接受广播,详见下面的示例:(其实变化很小)

server端:

package com.corundumstudio.socketio.demo.server;

import com.corundumstudio.socketio.*;
import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import io.socket.client.Socket;

/**
 * Created by yangjunming on 2017/1/13.
 */
public class DemoSocketServer {


    public static void main(String[] args) throws InterruptedException {
        SocketIOServer server = getServer();
        addRoom(server);
        startServer(server);
    }

    private static Configuration getConfig() {
        Configuration config = new Configuration();
        config.setHostname("localhost");
        config.setPort(9092);
        return config;
    }

    private static void handleConn(SocketIOServer server) {
        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                String token = client.getHandshakeData().getUrlParams().get("token").get(0);
                if (!token.equals("87df42a424c48313ef6063e6a5c63297")) {
                    client.disconnect();//校验token示例
                }
                System.out.println("sessionId:" + client.getSessionId() + ",token:" + token);
            }
        });
    }

    private static void addRoom(SocketIOServer server) {
        final SocketIONamespace chat1namespace = server.addNamespace("/room1");
        chat1namespace.addEventListener(Socket.EVENT_MESSAGE, String.class, new DataListener<String>() {
            @Override
            public void onData(SocketIOClient client, String data, AckRequest ackRequest) {
                chat1namespace.getBroadcastOperations().sendEvent(Socket.EVENT_MESSAGE, "ack:" + data);
            }
        });
    }

    private static SocketIOServer getServer() throws InterruptedException {
        final SocketIOServer server = new SocketIOServer(getConfig());
        handleConn(server);

        server.addEventListener(Socket.EVENT_MESSAGE, String.class, new DataListener<String>() {
            @Override
            public void onData(SocketIOClient client, String data, AckRequest ackSender) throws Exception {
                System.out.println("client data:" + data);
                server.getBroadcastOperations().sendEvent(Socket.EVENT_MESSAGE, "hi");
            }
        });
        return server;
    }

    private static void startServer(SocketIOServer server) throws InterruptedException {
        server.start();
        Thread.sleep(Integer.MAX_VALUE);
        server.stop();
    }

}

客户端:

package com.corundumstudio.socketio.demo.client;

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;

import java.net.URISyntaxException;

/**
 * Created by yangjunming on 2017/1/13.
 */
public class DemoSocketClient {

    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.reconnectionDelay = 1000;//失败重连的时间间隔
        options.timeout = 500;//连接超时时间(ms)

        //错误的token值连接示例
//        final Socket socket = IO.socket("http://localhost:9092/?token=123456", options);

        //常规连接
//        final Socket socket = IO.socket("http://localhost:9092/?token=87df42a424c48313ef6063e6a5c63297", options);

        //连接到指定的聊天室
        final Socket socket = IO.socket("http://localhost:9092/room2?token=87df42a424c48313ef6063e6a5c63297", options);

        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                socket.send("hello");
            }
        });

        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                System.out.println("连接关闭");
            }
        });

        socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                System.out.println("sessionId:" + socket.id());
                for (Object obj : args) {
                    System.out.println(obj);
                }
                System.out.println("收到服务器应答,将要断开连接...");
                socket.disconnect();
            }
        });
        socket.connect();
    }
}

注意上面连接时,room1的指定,其它就不多说了,代码就是最好的注释:)

目录
相关文章
|
存储 Java
Netty:NIO buffer 原理(附 示例代码)
Netty:NIO buffer 原理(附 示例代码)
Netty:NIO buffer 原理(附 示例代码)
|
JavaScript 前端开发 Java
netty-socketio
一、简介   netty-socketio是一个开源的Socket.io服务器端的一个java的实现,它基于Netty框架。项目地址为:https://github.com/mrniko/netty-socketio 。
2357 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13372 1
|
3月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
83 1
|
8月前
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
133 1
|
3月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
88 0
|
3月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
129 0
|
10月前
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】
|
10月前
|
Java 容器
【深入研究NIO与Netty线程模型的源码】
【深入研究NIO与Netty线程模型的源码】
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
85 0