消息同步的实现

简介:

一、背景

在编程中通信可以说是使用的最频繁的了,一个好的通信框架可以很大的提高系统的稳定性和编程的简洁性。可能之前我们使用现有的框架可以不考虑消息同步的问题,但是我觉得了解一点这方面的内容还是很有帮助的。今天给大家讲讲一个比较好的消息的同步的策略。

二、为什么使用消息同步

在通信的时候使用消息同步可以降低代码的复杂度,避免使用回调来处理业务,可以使代码更加的优雅。使用封装好的消息同步模块,即使是在在编写会使用通信的代码时,也可以像编写普通的同步代码一样,在代码中完全感受不到通信的存在。(只需要调用一个方法,返回值就是服务器发送过来的消息)

三、消息同步的实现

我们先来网络通信的时序图吧。
image

下面是消息同步的实现过程
image

然后我们看看使用了消息同步策略的发送消息的时序图
image

封装好了的话我们编写需要通信的消息就可以像下面这样了
BaseMsg reponseMsg = syncSendMsg(msg); // 发送一条请求并接收响应。
怎么样,业务代码是不是很简单。下面我们看下具体的实现流程吧!
首先我们先定义一个消息对象BaseMsg,所有与外部应用通信的消息都必须使用同样的消息格式。然后定义一个处理消息的单实例对象SendMsgObject,用来发送和接收所有的消息。然后在定义一个消息等待对象RspMsgWaiter,发送一个消息后,就new一个消息等待对象,并将该对象放进一个表中,通过执行wait()方法来等待接收消息。当接收到响应后,首先先去表中查找有没有对应的消息等待对象,如果有的话,则调用该消息等待对象的notifyAll()方法,消息等待对象执行了notifyAll()方法后,等待接收对象的wait()方法就会跳过,将消息响应返回给业务层。

四:注意事项

1.每个消息必须有个唯一的消息码。系统会根据消息码寻找对应的发送消息的对象。

五、代码实例(基于Java语言)

客户端

public class BaseMsg {
    
    /**消息长度 */
    private int length;
    
    /**消息序号 */
    private int sequence; 
    
    /**消息体 */
    private byte[] msg;
    
    /**
     * 解码消息
     * @throws IOException 
     * */
    public void decodeMsg(DataInputStream inputStream) throws IOException {
        length = inputStream.readInt();
        sequence = inputStream.readInt();
        msg = new byte[length];
        inputStream.read(msg);
    }
    
    /**
     * 编码消息
     * @throws IOException 
     * */
    public void encodingMsg(OutputStream outPutStream ) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(outPutStream);
        dataOutputStream.writeInt(msg.length);
        dataOutputStream.writeInt(sequence);
        dataOutputStream.write(msg);
    }
    
    /**
     * 编码消息
     * @throws IOException 
     * */
    public byte[] encodingMsg() throws IOException  {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        
        DataOutputStream dataOutputStream = new DataOutputStream(stream);
        dataOutputStream.writeInt(msg.length);
        dataOutputStream.writeInt(sequence);
        dataOutputStream.write(msg);
        return stream.toByteArray();
    }
    
    
    public Integer getLength() {
        return length;
    }

    public void setLength(Integer length) {
        this.length = length;
    }

    public byte[] getMsg() {
        return msg;
    }

    public void setMsg(byte[] msg) {
        this.msg = msg;
    }

    public Integer getSequence() {
        return sequence;
    }

    public void setSequence(Integer sequence) {
        this.sequence = (int) (Math.random() *Integer.MAX_VALUE);
        
    }
    
    
    
    @Override
    public String toString() {
        try {
            return "BaseMsg [length=" + length + ", sequence=" + sequence + ", msg=" + new String(msg,"utf-8") + "]";
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}


public class ClientMain {
    static int count = 0;
    
    
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        MsgSendObject sendObject = MsgSendObject.getInstance(); // 得到一个消息发送对象
        
        BaseMsg msg = new BaseMsg();  // 定义一个需要发送的消息
        String msgBody = System.currentTimeMillis() + "clientSend"+ count;
        msg.setMsg(msgBody.getBytes("utf-8"));
        
        BaseMsg receiveMsg = sendObject.syncSendMsg(msg);  // 使用消息发送对象将消息发送出去,并且收到响应
        
        System.out.println("receiveMsg:" + receiveMsg);
    }
}

public class MsgSendObject {
    private static MsgSendObject INSTANEC;
    
    private Socket socket = null;
    private int DEFAULT_PORT = 9389;
    private OutputStream outStream ;
    private InputStream inputStream;
    
    private ExecutorService executors;  // 定义一个线程池
    private Map<Integer, RspMsgWaiter> msgPool = new HashMap<>();
    
    
    
    private  MsgSendObject() throws IOException {
        executors = Executors.newSingleThreadExecutor();
        
        socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1", DEFAULT_PORT)); // 连接到服务器
        outStream = socket.getOutputStream();
        inputStream = socket.getInputStream();
        
        receiveMsg(); // 接收消息
    }
    public static synchronized MsgSendObject getInstance() throws IOException {
        if(INSTANEC==null){
            INSTANEC = new MsgSendObject();
        }
        return INSTANEC;
    }
    
    
    /**
     * 发送同步消息
     * @throws ExecutionException 
     * @throws InterruptedException 
     * */
    public BaseMsg syncSendMsg(final BaseMsg msg) throws InterruptedException, ExecutionException {
        return sendMsg(msg, null);
    }
    /**
     * 发送同步消息
     * @param waitTime 超时时间
     * @param repertTime 重试
     * @throws ExecutionException 
     * @throws InterruptedException 
     * */
    public BaseMsg syncSendMsg(final BaseMsg msg, Long waitTime, int repertTime) throws InterruptedException, ExecutionException {
        BaseMsg retBaseMsg = null;
        for(int i = 0; i < repertTime; i ++) {
            retBaseMsg = sendMsg(msg, waitTime);
            if(retBaseMsg != null) {
                return retBaseMsg;
            }
        }
        return retBaseMsg;
    }
    
    /**
     * 发送消息
     * */
    private BaseMsg sendMsg(final BaseMsg msg, Long waitTime) throws InterruptedException, ExecutionException {
        Future<BaseMsg> baseMsgFuture = executors.submit(new Callable<BaseMsg>() {
            @Override
            public BaseMsg call() throws Exception {
                RspMsgWaiter waiter = null;
                if(waitTime == null){
                    waiter = new RspMsgWaiter();
                } else {
                    waiter = new RspMsgWaiter(waitTime);
                }
                msgPool.put(msg.getSequence(), waiter);
                outStream.write(msg.encodingMsg());
                return waiter.waitRsp();
            }
        });
        return baseMsgFuture.get();
    }
    
    
    /**
     * 接收消息
     * 
     * @throws IOException
     */
    public void receiveMsg() throws IOException {
        new Thread(){
            @Override
            public void run() {
                while (true) {
                    try {
                        BaseMsg baseMsg = new BaseMsg();
                        baseMsg.decodeMsg(new DataInputStream(inputStream));
                        if(msgPool.containsKey(baseMsg.getSequence())) {
                            RspMsgWaiter waiter = msgPool.get(baseMsg.getSequence());
                            waiter.onRsp(baseMsg);
                            msgPool.remove(baseMsg.getSequence());
                        }
                    } catch (Exception e) {

                    }
                }
            }
        }.start();
    }
    
}

interface RspCallback {
    /**
     * 超时
     * */
    public void onTimeout(long time);
    /**
     * 接收到响应
     * */
    public void onRsp(BaseMsg rsp);
    
    public BaseMsg waitRsp()  throws InterruptedException;
}

class RspMsgWaiter implements RspCallback {

    public static final int DEFAULT_WAIT_TIMEOUT = 500;
    
    private BaseMsg rspMsg;
    
    private long timeout;
    
    public RspMsgWaiter() {
        timeout = DEFAULT_WAIT_TIMEOUT; 
    }
    
    public RspMsgWaiter(long timeout) {
        this.timeout = timeout;
    }
    
    @Override
    public void onRsp(BaseMsg rsp) {
        synchronized(this) {
            rspMsg = rsp;
            notifyAll();
        }
    }

    @Override
    public void onTimeout(long time) {
        synchronized(this) {
            notifyAll();
        }
    }
    
    @Override
    public BaseMsg waitRsp() throws InterruptedException {
        //有可能waitRsp还没来得及调用,应答就来了(onRsp被调用)
        //所以这里先判断rspMsg是否为null再wait,rspMsg不为null说明应答早就来了,直接返回rspMsg
        synchronized(this) {
            if(rspMsg != null) {
                return rspMsg;
            }
            wait(timeout);
            return rspMsg;
        }
    }
}

服务端


public class ServiceMain {
    static ServerSocket server = null;
    static int default_port = 9389;
    
    public static List<Client> clientList = new LinkedList<>();
    
    public static void main(String[] args) throws IOException {
        server = new ServerSocket(default_port);
        clientConnect();
    }

    
    public static void clientConnect() throws IOException {
        new Thread(){
            public void run(){
                while(true){
                    Socket clientSocket = null;
                    try {
                        clientSocket = server.accept();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    Client client = null;
                    try {
                        client = new Client(clientSocket);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    clientList.add(client);
                }
            }
        }.start();
    }
}


class Client implements Runnable {
    private boolean isConnect = true;
    
    private InputStream inputStream;
    private OutputStream outputStream;
    public Client(Socket socket) throws IOException {
        this.inputStream = socket.getInputStream();
        this.outputStream = socket.getOutputStream();
        new Thread(this).start();
    }
    
    @Override
    public void run() {
        while(isConnect) {
            try {
                BaseMsg baseMsg = new BaseMsg();
                baseMsg.decodeMsg(new DataInputStream(inputStream));
                System.out.println("Server Receive msg"+ baseMsg.toString());
                baseMsg.setMsg(baseMsg.toString().getBytes("UTF-8"));
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                sendMsg(baseMsg);
            } catch (IOException e) {
                isConnect = false;
                ServiceMain.clientList.remove(this);
            }
        }
    }
    
    /**
     *
     * */
    private void sendMsg(BaseMsg baseMsg) throws IOException{
        baseMsg.encodingMsg(outputStream);
    }
}
目录
相关文章
|
7月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
489 4
|
2月前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
232 4
|
5月前
|
消息中间件 SQL RocketMQ
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现
84 1
|
7月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
消息中间件
RabbitMQ如何支持事务性消息的发送和接收
RabbitMQ消息的发送和接收
222 0
|
7月前
|
消息中间件 存储 Java
MQ怎么确保消息不丢失
MQ怎么确保消息不丢失
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
163 1
|
消息中间件 Kafka 测试技术
MQ 学习日志(七) 保证消息消费的顺序性
保证消息消费的顺序性
186 0
|
消息中间件 Java 数据库
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
384 0
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
|
消息中间件 JSON NoSQL