Android中 使用 WebSocket 实现消息通信

简介: Android中 使用 WebSocket 实现消息通信

 前言

消息推送功能可以说移动APP不可缺少的功能之一,一般简单的推送我们可以使用第三方推送的SDK,比如极光推送、信鸽推送等,但是对于消息聊天这种及时性有要求的或者三方推送不满足业务需求的,我们就需要使用WebSocket实现消息推送功能。

基本流程

WebSocket是什么,这里就不做介绍了,我们这里使用的开源框架是https://github.com/TakahikoKawasaki/nv-websocket-client

基于开源协议我们封装实现WebSocket的连接、注册、心跳、消息分发、超时任务功能,基本流程如下:

image.gifimage.gif

连接功能

首先我们新建一个项目,在build.grade中添加配置

compile 'com.neovisionaries:nv-websocket-client:2.2'

image.gif

新建websocket管理类WsManger

public class WsManager {
    private volatile static WsManager wsManger;
    private WsManager() {
    }
    public static WsManager getWsManger() {
        if (wsManger == null) {
            synchronized (WsManager.class) {
                if (wsManger == null) {
                    wsManger = new WsManager();
                }
            }
        }
        return wsManger;
    }
}

image.gif

接下来添加连接方法,我们将webSocket的状态分为三种,新建WsStatue枚举类对应起来

public enum WsStatus {
    /**
     * 连接成功
     */
    CONNECT_SUCCESS,
    /**
     * 连接失败
     */
    CONNECT_FAIL,
    /**
     * 正在连接
     */
    CONNECTING;
}

image.gif

连接方法如下所示:

/**
 * 连接方法 这里要判断是否登录 此处省略
 */
public void connect() {
    //WEB_SOCKET_API 是连接的url地址,
    // CONNECT_TIMEOUT是连接的超时时间 这里是 5秒
    try {
        ws = new WebSocketFactory().createSocket(WEB_SOCKET_API, CONNECT_TIMEOUT)
                //设置帧队列最大值为5
                .setFrameQueueSize(5)
                //设置不允许服务端关闭连接却未发送关闭帧
                .setMissingCloseFrameAllowed(false)
                //添加回调监听
                .addListener(new WsListener())
                //异步连接
                .connectAsynchronously();
    } catch (IOException e) {
        e.printStackTrace();
    }
    setStatus(WsStatus.CONNECTING);
}

image.gif

调用连接方法后 我们来看连接的回调 也就是WsListener

/**
 * websocket回调事件
 */
private class WsListener extends WebSocketAdapter {
    @Override
    public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
        Log.d(TAG, "onConnected: 连接成功");
    }
    @Override
    public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception {
        Log.d(TAG, "onConnectError: 连接失败");
    }
    @Override
    public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
                               WebSocketFrame clientCloseFrame,
                               boolean closedByServer) throws Exception {
        Log.d(TAG, "onDisconnected: 断开连接");
    }
    @Override
    public void onTextMessage(WebSocket websocket, String text) throws Exception {
        Log.d(TAG, "onTextMessage: 收到消息:" + text);
    }
}

image.gif

下面我们调用连接方法

WsManager.getWsManger().connect();

image.gif

运行项目我们可以看到如下打印:

image.gif

此处我们要做的处理是,如果收到连接失败或者断开连接的回调 需要重新连接,我们重新调用一次连接方法即可,并且如果超过三次重连失败,我们在业务中可以通过调用接口来获取数据,避免数据丢失,此处细节省略。

协议封装

此处协议如下所示:

{

   "action":"",

   "requestChild":{

       "clientType":"",

       "id":""

   }

}

心跳、发送请求都属于客户端主动发送请求,对于请求结果我们分为成功和失败以及超时,发送超时我们是收不到服务器任何回复的,所以我们需要在发送之后将发送放在超时任务队列中,如果请求成功将任务从超时队列中移除,超时从超时队列中获取任务重新请求。

超时任务队列中回调有成功、失败、超时。

我们按照上述协议,新增对应实体类,采用Builder设计模式

public class Request {
    /**
     * 行为
     */
    private String action;
    /**
     * 请求体
     */
    private RequestChild req;
    /**
     * 请求次数
     */
    private transient int reqCount;
    /**
     * 超时的时间
     */
    private transient int timeOut;
    public Request() {
    }
    public Request(String action, int reqCount, int timeOut, RequestChild req) {
        this.action = action;
        this.req = req;
        this.reqCount = reqCount;
        this.timeOut = timeOut;
    }
    public static class Builder {
        //action 请求类型
        private String action;
        //请求子类数据 按照具体业务划分
        private RequestChild req;
        //请求次数 便于重试
        private int reqCount;
        //超时时间
        private int timeOut;
        public Builder action(String action) {
            this.action = action;
            return this;
        }
        public Builder req(RequestChild req) {
            this.req = req;
            return this;
        }
        public Builder reqCount(int reqCount) {
            this.reqCount = reqCount;
            return this;
        }
        public Builder timeOut(int timeOut) {
            this.timeOut = timeOut;
            return this;
        }
        public Request build() {
            return new Request(action, reqCount, timeOut, req);
        }
    }
}

image.gif

public class RequestChild {
    /**
     * 设备类型
     */
    private String clientType;
    /**
     * 用于用户注册的id
     */
    private String id;
    public RequestChild(String clientType, String id) {
        this.clientType = clientType;
        this.id = id;
    }
    public RequestChild() {
    }
    public static class Builder {
        private String clientType;
        private String id;
        public RequestChild.Builder setClientType(String clientType) {
            this.clientType = clientType;
            return this;
        }
        public RequestChild.Builder setId(String id) {
            this.id = id;
            return this;
        }
        public RequestChild build() {
            return new RequestChild(clientType, id);
        }
    }
}

image.gif

我们添加一个发送请求的方法如下:

/**
 * 发送请求
 *
 * @param request        请求体
 * @param reqCount       请求次数
 * @param requestListern 请求回调
 */
private void senRequest(Request request, final int reqCount, final RequestListern requestListern) {
    if (!isNetConnect()) {
        requestListern.requestFailed("网络未连接");
        return;
    }
}

image.gif

请求回调如下所示

public interface RequestListern {
    /**
     * 请求成功
     */
    void requestSuccess();
    /**
     * 请求失败
     *
     * @param message 请求失败消息提示
     */
    void requestFailed(String message);
}

image.gif

接着我们要把请求放在超时队列中,新建超时任务类,对应的分别是请求参数、请求回调、任务调度

public class TimeOutTask {
    /**
     *  请求主体
     */
    private Request request;
    /**
     * 通用返回
     */
    private RequestCallBack requestCallBack;
    /**
     * r任务
     */
    private ScheduledFuture scheduledFuture;
    public TimeOutTask(Request request,
                           RequestCallBack requestCallBack,
                           ScheduledFuture scheduledFuture) {
        this.request = request;
        this.requestCallBack = requestCallBack;
        this.scheduledFuture = scheduledFuture;
    }
    public ScheduledFuture getScheduledFuture() {
        return scheduledFuture;
    }
    public void setScheduledFuture(ScheduledFuture scheduledFuture) {
        this.scheduledFuture = scheduledFuture;
    }
    public Request getRequest() {
        return request;
    }
    public void setRequest(Request request) {
        this.request = request;
    }
    public RequestCallBack getRequestCallBack() {
        return requestCallBack;
    }
    public void setRequestCallBack(RequestCallBack requestCallBack) {
        this.requestCallBack = requestCallBack;
    }
}

image.gif

RequestCallBack是超时任务的回调,只是比请求回调多了个超时,因为超时的处理机制是一样的,所以这里我们没必要将超时回调到请求中

public interface RequestCallBack {
    /**
     * 请求成功
     */
    void requestSuccess();
    /**
     * 请求失败
     *
     * @param request  请求体
     * @param message  请求失败的消息
     */
    void requestFailed(String message, Request request);
    /**
     * 请求超时
     *
     * @param request  请求体
     */
    void timeOut(Request request);
}

image.gif

/**
 * 添加超时任务
 */
private ScheduledFuture enqueueTimeout(final Request request, final long timeout) {
    Log.d(TAG, "  " + "enqueueTimeout: 添加超时任务类型为:" + request.getAction());
    return executor.schedule(new Runnable() {
        @Override
        public void run() {
            TimeOutTask timeoutTask = callbacks.remove(request.getAction());
            if (timeoutTask != null) {
                timeoutTask.getRequestCallBack().timeOut(timeoutTask.getRequest());
            }
        }
    }, timeout, TimeUnit.MILLISECONDS);
}

image.gif

超时任务的方法 是通过任务调度定时调用,请求成功后我们会把超时任务移除,当到了超时时间时,任务还存在就说明任务超时了。

每次的任务我们以action为键值存在hashMap中

private Map<String, CallbackWrapper> callbacks = new HashMap<>();

image.gif

将任务放入超时任务代码如下所示:

final ScheduledFuture timeoutTask = enqueueTimeout(request, request.getTimeOut());
final RequestCallBack requestCallBack = new RequestCallBack() {
    @Override
    public void requestSuccess() {
        requestListern.requestSuccess();
    }
    @Override
    public void requestFailed(String message, Request request) {
        requestListern.requestFailed(message);
    }
    @Override
    public void timeOut(Request request) {
        timeOutHanlder(request);
    }
};
callbacks.put(request.getAction(),
        new CallbackWrapper(request, requestCallBack, timeoutTask));

image.gif

一般而言,任务超时都是由于连接原因导致,所以我们这里可以尝试重试一次,如果还是超时,通过 timeOutHanlder(request);方法 进行重新连接,重连代码和连接代码一样,这里就省略了,做好这步操作,我们就可以发送消息了。

/**
 * 超时任务
 */
private void timeOutHanlder(Request requset) {
    setStatus(WsStatus.CONNECT_FAIL);
    //这里假装有重连
    Log.d(TAG, "timeOutHanlder: 请求超时 准备重连");
}

image.gif

到这里我们的流程基本可以走通了。

心跳

首先我们要了解下心跳的作用是什么,心跳是在连接成功后,通过固定的间隔时间向服务器发送询问,当前是否还在线,有很多人说心跳失败我们就重连,成功就继续心跳,但是这里要注意的是,我们一般是收不到心跳失败回调的,心跳也是向服务器发送数据,所以我们要将所有的主动请求都放在超时任务队列中,

所以对websocket来说 请求结果有三种:成功、失败、超时,对于用户 只有成功、失败即可。

至于心跳、注册等请求发送的数据是什么,这就得看我们与服务端定的协议是什么样了,通常来说 分为action 和 requestBody,协议格式我们再第二步已经封装好了,这里我们以心跳任务为例验证上面的封装。

/**
 * 心跳
 */
void keepAlive() {
    Request request = new Request.Builder()
            .reqCount(0)
            .timeOut(REQUEST_TIMEOUT)
            .action(ACTION_KEEPALIVE).build();
    WsManager.getWsManger().senRequest(request, request.getReqCount() + 1, new RequestListern() {
        @Override
        public void requestSuccess() {
            Log.d(TAG, "requestSuccess: 心跳发送成功了");
        }
        @Override
        public void requestFailed(String message) {
        }
    });
}

image.gif

我们每间隔10s中开启一次心跳任务

/**
 * 开始心跳
 */
public void startKeepAlive() {
    mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
}

image.gif

/**
 * 心跳任务
 */
private Runnable mKeepAliveTask = new Runnable() {
    @Override
    public void run() {
        keepAlive();
        mHandler.removeCallbacks(mKeepAliveTask);
        mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
    }
};

image.gif

为了便于操作演示,在主页面上加个按钮 ,点击按钮调用startKeepAlive方法,运行如下所示:

image.gif

我们可以看到心跳返回的statue是300 不成功,5秒之后走到了请求超时的方法中,所以如果状态返回成功的话,我们需要回调给调用者

/**
 * 处理 任务回调
 *
 * @param action 请求类型
 */
void disPatchCallbackWarp(String action, boolean isSuccess) {
    CallbackWrapper callBackWarp = callbacks.remove(action);
    if (callBackWarp == null) {
        Logger.d(TAG+"  "+ "disPatchCallbackWarp: 任务队列为空");
    } else {
        callBackWarp.getScheduledFuture().cancel(true);
        if (isSuccess) {
            callBackWarp.getRequestCallBack().requestSuccess();
        } else {
            callBackWarp.getRequestCallBack().requestFailed("", new Request());
        }
    }
}

image.gif

这样调用者才知道成功或失败。

发送其他消息与心跳一样,只是请求参数不同而已,修改Request参数即可。这样我们根据协议和业务就实现一个比较规范的webSocket消息推送流程了。

目录
相关文章
|
5月前
|
前端开发 网络协议 JavaScript
在Spring Boot中实现基于WebSocket的实时通信
在Spring Boot中实现基于WebSocket的实时通信
|
1月前
|
Kubernetes Cloud Native JavaScript
为使用WebSocket构建的双向通信应用带来基于服务网格的全链路灰度
介绍如何使用为基于WebSocket的云原生应用构建全链路灰度方案。
|
3月前
|
Java Android开发 数据安全/隐私保护
Android中多进程通信有几种方式?需要注意哪些问题?
本文介绍了Android中的多进程通信(IPC),探讨了IPC的重要性及其实现方式,如Intent、Binder、AIDL等,并通过一个使用Binder机制的示例详细说明了其实现过程。
349 4
|
5月前
|
Java Android开发 Spring
Android Spingboot 实现SSE通信案例
【7月更文挑战第14天】以下是使用Android和Spring Boot实现SSE(Server-Sent Events)通信的案例摘要: 在`MainActivity`中: - 初始化界面元素并设置按钮点击事件。 - `startSseRequest`方法创建`WebClient`对象,设置请求头,发送请求,并处理响应和错误。 请确保将`your-server-url`替换为实际的服务器地址。
128 14
|
4月前
|
Android开发
Android项目架构设计问题之C与B通信如何解决
Android项目架构设计问题之C与B通信如何解决
18 0
|
4月前
|
移动开发 前端开发 weex
Android项目架构设计问题之模块化后调用式通信如何解决
Android项目架构设计问题之模块化后调用式通信如何解决
19 0
|
5月前
|
前端开发 JavaScript API
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
【7月更文挑战第17天】现代Web开发趋势中,前后端分离配合WebSocket满足实时通信需求。Django Channels扩展了Django,支持WebSocket连接和异步功能。通过安装Channels、配置设置、定义路由和消费者,能在Django中实现WebSocket交互。前端使用WebSocket API连接后端,实现双向数据流,如在线聊天功能。集成Channels提升Web应用的实时性和用户体验,适应实时交互场景的需求。**
207 6
|
5月前
|
安全 数据安全/隐私保护 UED
优化用户体验:前后端分离架构下Python WebSocket实时通信的性能考量
【7月更文挑战第17天】前后端分离趋势下,WebSocket成为实时通信的关键,Python有`websockets`等库支持WebSocket服务。与HTTP轮询相比,WebSocket减少延迟,提高响应。连接管理、消息传输效率、并发处理及安全性是性能考量重点。使用WebSocket能优化用户体验,尤其适合社交、游戏等实时场景。开发应考虑场景需求,充分利用WebSocket优势。
155 3
|
5月前
|
前端开发 Python
前后端分离的进化:Python Web项目中的WebSocket实时通信解决方案
【7月更文挑战第18天】在Python的Flask框架中,结合Flask-SocketIO库可轻松实现WebSocket实时通信,促进前后端分离项目中的高效交互。示例展示了一个简单的聊天应用:Flask路由渲染HTML,客户端通过Socket.IO库连接服务器,发送消息并监听广播。此方法支持多种实时通信协议,适应不同环境,提供流畅的实时体验。
98 3
|
5月前
|
JavaScript 前端开发 UED
WebSocket在Python Web开发中的革新应用:解锁实时通信的新可能
【7月更文挑战第16天】WebSocket是实现Web实时通信的协议,与HTTP不同,它提供持久双向连接,允许服务器主动推送数据。Python有多种库如websockets和Flask-SocketIO支持WebSocket开发。使用Flask-SocketIO的简单示例包括定义路由、监听消息事件,并在HTML中用JavaScript建立连接。WebSocket提高了实时性、减少了服务器压力,广泛应用于聊天、游戏等场景。
51 1