消息同步的实现

简介:

一、背景

在编程中通信可以说是使用的最频繁的了,一个好的通信框架可以很大的提高系统的稳定性和编程的简洁性。可能之前我们使用现有的框架可以不考虑消息同步的问题,但是我觉得了解一点这方面的内容还是很有帮助的。今天给大家讲讲一个比较好的消息的同步的策略。

二、为什么使用消息同步

在通信的时候使用消息同步可以降低代码的复杂度,避免使用回调来处理业务,可以使代码更加的优雅。使用封装好的消息同步模块,即使是在在编写会使用通信的代码时,也可以像编写普通的同步代码一样,在代码中完全感受不到通信的存在。(只需要调用一个方法,返回值就是服务器发送过来的消息)

三、消息同步的实现

我们先来网络通信的时序图吧。
image

下面是消息同步的实现过程
image

然后我们看看使用了消息同步策略的发送消息的时序图
image

封装好了的话我们编写需要通信的消息就可以像下面这样了
BaseMsg reponseMsg = syncSendMsg(msg); // 发送一条请求并接收响应。
怎么样,业务代码是不是很简单。下面我们看下具体的实现流程吧!
首先我们先定义一个消息对象BaseMsg,所有与外部应用通信的消息都必须使用同样的消息格式。然后定义一个处理消息的单实例对象SendMsgObject,用来发送和接收所有的消息。然后在定义一个消息等待对象RspMsgWaiter,发送一个消息后,就new一个消息等待对象,并将该对象放进一个表中,通过执行wait()方法来等待接收消息。当接收到响应后,首先先去表中查找有没有对应的消息等待对象,如果有的话,则调用该消息等待对象的notifyAll()方法,消息等待对象执行了notifyAll()方法后,等待接收对象的wait()方法就会跳过,将消息响应返回给业务层。

四:注意事项

1.每个消息必须有个唯一的消息码。系统会根据消息码寻找对应的发送消息的对象。

五、代码实例(基于Java语言)

客户端

public class BaseMsg {
    
    /**消息长度 */
    private int length;
    
    /**消息序号 */
    private int sequence; 
    
    /**消息体 */
    private byte[] msg;
    
    /**
     * 解码消息
     * @throws IOException 
     * */
    public void decodeMsg(DataInputStream inputStream) throws IOException {
        length = inputStream.readInt();
        sequence = inputStream.readInt();
        msg = new byte[length];
        inputStream.read(msg);
    }
    
    /**
     * 编码消息
     * @throws IOException 
     * */
    public void encodingMsg(OutputStream outPutStream ) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(outPutStream);
        dataOutputStream.writeInt(msg.length);
        dataOutputStream.writeInt(sequence);
        dataOutputStream.write(msg);
    }
    
    /**
     * 编码消息
     * @throws IOException 
     * */
    public byte[] encodingMsg() throws IOException  {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        
        DataOutputStream dataOutputStream = new DataOutputStream(stream);
        dataOutputStream.writeInt(msg.length);
        dataOutputStream.writeInt(sequence);
        dataOutputStream.write(msg);
        return stream.toByteArray();
    }
    
    
    public Integer getLength() {
        return length;
    }

    public void setLength(Integer length) {
        this.length = length;
    }

    public byte[] getMsg() {
        return msg;
    }

    public void setMsg(byte[] msg) {
        this.msg = msg;
    }

    public Integer getSequence() {
        return sequence;
    }

    public void setSequence(Integer sequence) {
        this.sequence = (int) (Math.random() *Integer.MAX_VALUE);
        
    }
    
    
    
    @Override
    public String toString() {
        try {
            return "BaseMsg [length=" + length + ", sequence=" + sequence + ", msg=" + new String(msg,"utf-8") + "]";
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}


public class ClientMain {
    static int count = 0;
    
    
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        MsgSendObject sendObject = MsgSendObject.getInstance(); // 得到一个消息发送对象
        
        BaseMsg msg = new BaseMsg();  // 定义一个需要发送的消息
        String msgBody = System.currentTimeMillis() + "clientSend"+ count;
        msg.setMsg(msgBody.getBytes("utf-8"));
        
        BaseMsg receiveMsg = sendObject.syncSendMsg(msg);  // 使用消息发送对象将消息发送出去,并且收到响应
        
        System.out.println("receiveMsg:" + receiveMsg);
    }
}

public class MsgSendObject {
    private static MsgSendObject INSTANEC;
    
    private Socket socket = null;
    private int DEFAULT_PORT = 9389;
    private OutputStream outStream ;
    private InputStream inputStream;
    
    private ExecutorService executors;  // 定义一个线程池
    private Map<Integer, RspMsgWaiter> msgPool = new HashMap<>();
    
    
    
    private  MsgSendObject() throws IOException {
        executors = Executors.newSingleThreadExecutor();
        
        socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1", DEFAULT_PORT)); // 连接到服务器
        outStream = socket.getOutputStream();
        inputStream = socket.getInputStream();
        
        receiveMsg(); // 接收消息
    }
    public static synchronized MsgSendObject getInstance() throws IOException {
        if(INSTANEC==null){
            INSTANEC = new MsgSendObject();
        }
        return INSTANEC;
    }
    
    
    /**
     * 发送同步消息
     * @throws ExecutionException 
     * @throws InterruptedException 
     * */
    public BaseMsg syncSendMsg(final BaseMsg msg) throws InterruptedException, ExecutionException {
        return sendMsg(msg, null);
    }
    /**
     * 发送同步消息
     * @param waitTime 超时时间
     * @param repertTime 重试
     * @throws ExecutionException 
     * @throws InterruptedException 
     * */
    public BaseMsg syncSendMsg(final BaseMsg msg, Long waitTime, int repertTime) throws InterruptedException, ExecutionException {
        BaseMsg retBaseMsg = null;
        for(int i = 0; i < repertTime; i ++) {
            retBaseMsg = sendMsg(msg, waitTime);
            if(retBaseMsg != null) {
                return retBaseMsg;
            }
        }
        return retBaseMsg;
    }
    
    /**
     * 发送消息
     * */
    private BaseMsg sendMsg(final BaseMsg msg, Long waitTime) throws InterruptedException, ExecutionException {
        Future<BaseMsg> baseMsgFuture = executors.submit(new Callable<BaseMsg>() {
            @Override
            public BaseMsg call() throws Exception {
                RspMsgWaiter waiter = null;
                if(waitTime == null){
                    waiter = new RspMsgWaiter();
                } else {
                    waiter = new RspMsgWaiter(waitTime);
                }
                msgPool.put(msg.getSequence(), waiter);
                outStream.write(msg.encodingMsg());
                return waiter.waitRsp();
            }
        });
        return baseMsgFuture.get();
    }
    
    
    /**
     * 接收消息
     * 
     * @throws IOException
     */
    public void receiveMsg() throws IOException {
        new Thread(){
            @Override
            public void run() {
                while (true) {
                    try {
                        BaseMsg baseMsg = new BaseMsg();
                        baseMsg.decodeMsg(new DataInputStream(inputStream));
                        if(msgPool.containsKey(baseMsg.getSequence())) {
                            RspMsgWaiter waiter = msgPool.get(baseMsg.getSequence());
                            waiter.onRsp(baseMsg);
                            msgPool.remove(baseMsg.getSequence());
                        }
                    } catch (Exception e) {

                    }
                }
            }
        }.start();
    }
    
}

interface RspCallback {
    /**
     * 超时
     * */
    public void onTimeout(long time);
    /**
     * 接收到响应
     * */
    public void onRsp(BaseMsg rsp);
    
    public BaseMsg waitRsp()  throws InterruptedException;
}

class RspMsgWaiter implements RspCallback {

    public static final int DEFAULT_WAIT_TIMEOUT = 500;
    
    private BaseMsg rspMsg;
    
    private long timeout;
    
    public RspMsgWaiter() {
        timeout = DEFAULT_WAIT_TIMEOUT; 
    }
    
    public RspMsgWaiter(long timeout) {
        this.timeout = timeout;
    }
    
    @Override
    public void onRsp(BaseMsg rsp) {
        synchronized(this) {
            rspMsg = rsp;
            notifyAll();
        }
    }

    @Override
    public void onTimeout(long time) {
        synchronized(this) {
            notifyAll();
        }
    }
    
    @Override
    public BaseMsg waitRsp() throws InterruptedException {
        //有可能waitRsp还没来得及调用,应答就来了(onRsp被调用)
        //所以这里先判断rspMsg是否为null再wait,rspMsg不为null说明应答早就来了,直接返回rspMsg
        synchronized(this) {
            if(rspMsg != null) {
                return rspMsg;
            }
            wait(timeout);
            return rspMsg;
        }
    }
}

服务端


public class ServiceMain {
    static ServerSocket server = null;
    static int default_port = 9389;
    
    public static List<Client> clientList = new LinkedList<>();
    
    public static void main(String[] args) throws IOException {
        server = new ServerSocket(default_port);
        clientConnect();
    }

    
    public static void clientConnect() throws IOException {
        new Thread(){
            public void run(){
                while(true){
                    Socket clientSocket = null;
                    try {
                        clientSocket = server.accept();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    Client client = null;
                    try {
                        client = new Client(clientSocket);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    clientList.add(client);
                }
            }
        }.start();
    }
}


class Client implements Runnable {
    private boolean isConnect = true;
    
    private InputStream inputStream;
    private OutputStream outputStream;
    public Client(Socket socket) throws IOException {
        this.inputStream = socket.getInputStream();
        this.outputStream = socket.getOutputStream();
        new Thread(this).start();
    }
    
    @Override
    public void run() {
        while(isConnect) {
            try {
                BaseMsg baseMsg = new BaseMsg();
                baseMsg.decodeMsg(new DataInputStream(inputStream));
                System.out.println("Server Receive msg"+ baseMsg.toString());
                baseMsg.setMsg(baseMsg.toString().getBytes("UTF-8"));
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                sendMsg(baseMsg);
            } catch (IOException e) {
                isConnect = false;
                ServiceMain.clientList.remove(this);
            }
        }
    }
    
    /**
     *
     * */
    private void sendMsg(BaseMsg baseMsg) throws IOException{
        baseMsg.encodingMsg(outputStream);
    }
}
目录
相关文章
|
Shell Linux 网络安全
linux系统防CC攻击自动拉黑IP增强版(Shell脚本)
linux系统防CC攻击自动拉黑IP增强版(Shell脚本)
331 0
|
机器学习/深度学习 人工智能 NoSQL
数据库与人工智能的关系
随着AI技术的飞速发展,数据库与人工智能的联系日益紧密。数据成为AI的关键部分,预计到2023年全球数据量将达到33ZB。AI通过机器学习和神经网络等方式处理数据,优化企业运营,预测模式并创造机会。数据库利用AI进行复杂数据分析,如机器学习识别销售趋势,深度学习处理和分类客户数据。悦数图数据库作为高性能图数据库,为AI提供实时、准确的数据支持,尤其在金融风控、实时推荐和知识图谱等领域展现出强大效能,推动AI在各行业的应用和发展。
|
存储 缓存 资源调度
yarn 现代的包管理工具 介绍
本文介绍了yarn作为现代的包管理工具,由Facebook开发,具有依赖管理、版本锁定、性能优化、安全性增强等优点,并提供了使用yarn进行项目初始化、添加依赖、安装依赖和运行脚本的基本命令和操作步骤。
yarn 现代的包管理工具 介绍
|
搜索推荐 API
淘宝商品数据洞察:解锁精准营销新策略
在快速变化的商业环境中,高效的营销策略对企业至关重要。通过API获取淘宝APP的商品细节数据,企业可以精准分析产品特性、强化卖点,并制定灵活的价格策略。利用用户画像实现个性化营销,选择最佳渠道并优化内容,从而提升品牌影响力。这一方法不仅帮助企业抓住目标消费者,还能增强市场竞争力,促进长期发展。
|
12月前
|
XML API 网络架构
API协议 的十种技术特点及适用场景
本文介绍了十种常见的API协议技术,包括REST、GraphQL、gRPC、SOAP、WebSocket、AMF和XML-RPC等,每种技术都有其特点和适用场景,如REST适用于轻量级Web服务开发,gRPC适合高性能分布式系统,而WebSocket则适用于需要低延迟交互的应用。
|
机器学习/深度学习 监控 TensorFlow
使用Python实现深度学习模型:智能宠物监控与管理
使用Python实现深度学习模型:智能宠物监控与管理
371 0
|
关系型数据库 BI 分布式数据库
PolarDB产品使用问题之启动报错,该怎么办
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
12月前
|
数据采集 存储 人工智能
cdga|数据治理:应对核心业务数据质量参差不齐的挑战与策略
数据治理是指通过制定并实施一系列政策、流程和技术手段,确保数据的可用性、完整性、准确性和安全性,以支持企业的决策和业务运营。对于核心业务数据质量参差不齐的问题,数据治理的重要性不言而喻
|
关系型数据库 MySQL Linux
Centos7下使用RPM包安装MySQL8
Centos7下使用RPM包安装MySQL8
1319 0