一:背景介绍
二:消息队列
概念:
1、MQ全程为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。
2、消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
目的:
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削峰等问题。
解耦:
![譬如签到送积分,签到和送积分是两个操作。签到产生了很重要的数据,它可以把消息发送到MQ,然后积分系统需要该数据,从MQ中直接获取即可。这样签到系统就做到了和积分系统解耦,不必担心积分系统挂了怎么办,是不是需要重试等,而这些都可以在积分系统内部自己实现,再者,如果以后另外一套系统也需要该签到数据,直接从MQ中获取即可,实际上与签单系统已无关系。
异步
当做到解耦后,实现异步就是自然而然的事情,如果签到只需要1ms,而送积分,或者其他操作需要500ms,那不可能等所有操作完成之后再去返回数据给用户,这样就做到了异步。串联变并联。
同步变成异步
流量削峰
削峰是指当并发访问高峰期,通过MQ达到限流的目的,从而减少对数据库MySQL的压力,这里也用到了池化思想。
三:实现过程
解耦和异步
消息处理中心 Broker
public class Broker { private final static int MAX_SIZE = 3; private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE); public static void produce(String msg){ if(messageQueue.offer(msg)){ System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size()); } else{ System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!"); } System.out.println("-----------------------------"); } public static String consume(){ String msg = messageQueue.poll(); if(msg != null){ System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size()); } else { System.out.println("消息处理中心内没有可供消费的消息!"); } System.out.println("-----------------------------"); return msg; } }
BrokerSever用来提供Broker类得对外服务,BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动
public class BrokerSever implements Runnable{ public static int SERVICE_PORT = 9999; private final Socket socket; public BrokerSever(Socket socket){ this.socket = socket; } @Override public void run() { try( BufferedReader in = new BufferedReader(new InputStreamReader( socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream())) { while (true){ String str = in.readLine(); if (str == null){ continue; } System.out.println("接收到原始数据: " + str); if (str.equals("CONSUME")){ String message = Broker.consume(); out.println(message); out.flush(); }else { Broker.produce(str); } } } catch (Exception e){ e.printStackTrace(); } } public static void main(String[] args) throws Exception{ ServerSocket server = new ServerSocket(SERVICE_PORT); while(true){ BrokerSever brokerServer = new BrokerSever(server.accept()); new Thread(brokerServer).start(); } } }
消息生产者
public class ProduceClient { public static void main(String[] args) throws Exception{ MyClient client = new MyClient(); client.produce("hello World."); } }
消息消费者
public class ConsumeClient { public static void main(String[] args) throws Exception{ MyClient client = new MyClient(); String message = client.consume(); System.out.println("获得的消息为: " + message); } }
MyClient与消息服务器进行通信
public class MyClient { public static void produce(String message) throws Exception{ Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT); try( PrintWriter out = new PrintWriter(socket.getOutputStream()) ){ out.println(message); out.flush(); } } public static String consume() throws Exception{ Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT); try( BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()) ){ out.println("CONSUME"); out.flush(); String message = in.readLine(); return message; } } }
流量削峰
定义一个消息生产者
@Test public void test() throws Exception { for (int i = 0; i < 1000 ; i++) { rabbitTemplate.convertAndSend("test-queue ", "消息发送); } Thread.sleep(1000 * 1000); }
使用@RabbitListener注解定义一个消息消费者
@Component @RabbitListener(queuesToDeclare = @Queue(name = "test-queue")) public class Consumer { private int count = 0; @RabbitHandler public void receive(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Thread.sleep(1000); System.out.println("=====消息处理===>"); channel.basicAck(deliveryTag, true); System.out.println("current count is:" + ++count); } catch (Exception e) { } } }
四:总结
使用队列有得有失,凡事都有成本。具体上边的开门小例子如何体现得失的我们还需要研究研究。