四,工作机制的初步描述以及代码实现
如下图,官网以及很明确的告诉这六种使用方式了,接下来主要描述这前五种的方式,通过代码以及具体的案例,使用原生的代码方式来实现
4.1 公共代码模块
首先需要创建一个用户或者使用默认用户guest,其次的话需要创建一个虚拟机study_mq,相当于建一个数据库一样,队列可以暂时不建立,系统会自动建立
4.1.1,rabbitmq连接工具类
package com.zhs.rabbitmq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitUtils { //连接工厂 private static ConnectionFactory connectionFactory = new ConnectionFactory(); static { //设置主机 connectionFactory.setHost("*.*.*.*"); //端口号 connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号 //用户名 connectionFactory.setUsername("123456"); //密码 connectionFactory.setPassword("123456"); //设置虚拟机,相当于数据库 connectionFactory.setVirtualHost("/study_mq"); } public static Connection getConnection(){ Connection conn = null; try { //获取长连接 conn = connectionFactory.newConnection(); return conn; } catch (Exception e) { throw new RuntimeException(e); } } }
4.1.2,参数公共类
package com.zhs.rabbitmq.utils; /** * 交换机需要手动创建,否者回报异常,队列名称建议手建 * @author zhenghuisheng */ public class RabbitConstant { //队列名称 public static final String QUEUE_HELLOWORLD = "zhs_study_mq"; public static final String QUEUE_SMS = "sms"; public static final String QUEUE_BAIDU = "baidu"; public static final String QUEUE_SINA = "sina"; //交换机名称 public static final String EXCHANGE_WEATHER = "weather"; public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing"; public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic"; }
4.2 hello wrold模式
即简单的工作队列模式,首先需要手动的创建队列,不创建的话会自动创建。主要是实现简单的工作队列模式,实现一对一的方式,生产者发布消息到队列,消费者去队列中获取消息,期间也不需要显示的设置交换机,底层会默认选择使用交换机
4.2.1 消费者
package com.zhs.rabbitmq.helloworld;- import com.zhs.rabbitmq.utils.RabbitConstant; import com.zhs.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author zhenghuisheng */ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection conn = RabbitUtils.getConnection(); //创建通信“通道”,相当于TCP中的虚拟连接 Channel channel = conn.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null); //从MQ服务器中获取数据,即消费数据 //创建一个消息消费者 //第一个参数:队列名 //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法,防止mq出现异常 //第三个参数要传入DefaultConsumer的实现类 channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel)); } } class Reciver extends DefaultConsumer { private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body为获取的信息 String message = new String(body); System.out.println("消费者接收到的消息:"+ message); System.out.println("消息的TagId:"+ envelope.getDeliveryTag()); //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }
4.2.2 生产者
package com.zhs.rabbitmq.helloworld; import com.zhs.rabbitmq.utils.RabbitConstant; import com.zhs.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Random; import java.util.concurrent.TimeoutException; /** * @author zhenghuisheng */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection conn = RabbitUtils.getConnection(); //创建通信“通道”,相当于TCP中的虚拟连接,即获取通道连接 Channel channel = conn.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null); //四个参数 //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到 //队列名称 //额外的设置属性 //最后一个参数是要传递的消息字节数组 for (int i = 0; i < 50; i++) { //需要发送的数据,发送十条消息 String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ": hello_mq " + new Random().nextInt(1000); channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes()); } channel.close(); conn.close(); System.out.println("===生产者数据发送成功==="); } }
4.3 工作队列模式
就比如12306,在春节高并发期间,会使用大量的短信服务器为用户发送消息购买成功或者失败的消息,由于期间并发量特别高,所以可能会选择使用mq将消息发送到mq里面,然后使用服务器客户端去mq中拉取消息,实现消息再发送到客户手中,客户端不够的话可以增加客户端,去mq中拉取消息就行了,从而实现分摊各个服务器的压力,实现方式如下
4.3.1 消息实体类
package com.zhs.rabbitmq.workqueue; import lombok.Data; /** * @author zhenghuisheng */ @Data @AllArgsConstructor @NoArgsConstructor public class SMS { //名字 private String name; //电话号码 private String mobile; //消息 private String content; }
4.3.1 消息生产者
/** * @author zhenghuisheng * 工作队列模式 * 发送者 */ public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException { //获取mq连接 Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); //队列名称 channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); for(int i = 1 ; i <= 100 ; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); //交换机,队列 channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } }
4.3.3 消息接收者1号
public class SMSSender1 { public static void main(String[] args) throws IOException { //获取连接 Connection connection = RabbitUtils.getConnection(); //构建通道 final Channel channel = connection.createChannel(); //队列名称,是否持久化,是否私有化,是否自动删除 channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个,没消费完不要去取下一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender1-短信发送成功:" + jsonSMS); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
4.4.4 消息接收者2号
public class SMSSender2 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者,即使用轮询的方式 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender2-短信发送成功:" + jsonSMS); try { //阻塞,根据服务器的性能设置 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
4.4 发布订阅模式
和工作队列的方式不一样,发送者先将数据发送到交换机里面,然后将数据发送到队列中,每个队列都要发送一样的数据,没有说实现分摊压力的,这样才能保证每个服务都能发送一样的数据,从而实现每个用户都能接收到相同的数据。就像天气预报一样,天气预报发完之后,就会通知传到各个网站上,如百度和新浪都会同时接收到总台发送的消息。
4.4.1 总台发布消息
/** * @author zhenghuisheng * 发布者发布消息 */ public class WeatherBureau { public static void main(String[] args) throws Exception { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //键盘手动输入信息 String input = new BufferedReader(new InputStreamReader(System.in)).readLine(); //通过连接获取通道 Channel channel = connection.createChannel(); //第一个参数交换机名字 其他参数和之前的一样 channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes()); channel.close(); connection.close(); } }
4.4.2 百度接收消息
public class BiaDu { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); //确认一个消息之后再拿下一个消息 channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
4.4.3 新浪接收消息
public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息,是否持久化,是否私有化,是否自动删除 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, ""); //每次消费一条再去取下一条消息 channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
4.5 route路由模式
通过上面的发布订阅模式可知,每个队列都会接收到一样的消息,但是要解决每个队列接收不同的消息问题,还是需要用到这个路由模式的,如发送者想把不同的消息发送到不同的队列里面,从而发送到不同的客户端。这样的话就可以为每个消息设置唯一标识,发送的时候携带一个唯一标识,接收者接收这个唯一标识,即可以用到这个route key来实现。消息发送者在发送到交换机的时候会在消息上携带一个routing key,在交换机将发送到队列时,需要队列也指定一个routing key,这样就能实现不同的数据发送到不同的队列,从而实现不同消息发送给不同的服务接收端。如改变上面的发布订阅模式,百度的信息发给百度,新浪的信息发给新浪,这样就不会每个队列都会接收到全部消息,而是可以接收到队列中想接收到的信息。
4.5.1 天气服务端发送信息
public class WeatherBureau { public static void main(String[] args) throws Exception { Map<String,String> map = new HashMap<String, String>(); map.put("china.jiangxi.ganzhou.20220403", "中国江西赣州20220403天气数据"); map.put("china.jiangxi.nanchang.20220403", "中国江西南昌20220403天气数据"); map.put("china.jiangxi.jiujiang.20220404", "中国江西九江20220404天气数据"); map.put("china.beijin.20220403", "中国北京20220403天气数据"); map.put("china.guangdong.shenzhen.20220404", "中国广东深圳20220404天气数据"); map.put("china.guangdong.guangzhou.20220403", "中国广东广州20220403天气数据"); map.put("china.guangdong.dongguan.20220404", "中国广东东莞20220404天气数据"); map.put("china.beijin.20220404", "中国北京20220404天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> itr = map.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
4.5.2 百度客户端接收消息
public class BiaDu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.beijin.20220404"); channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangxi.jiujiang.20220404"); //确认签收一条消息之后再去拿消息,默认为轮询机制 channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
4.2.1 新浪客户端接收消息
public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //指定队列与交换机以及routing key之间的关系,value就是要获取的信息 //允许队列和交换机之间存在多个路由key,即存在多个路由key的消费者消费同一个队列里面的消息 channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangxi.ganzhou.20220403"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangxi.nanchang.20220403"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.guangdong.guangzhou.20220403"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.beijin.20220403"); //设置确认签收一条信息之后再拿下一条信息 channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
4.6 topic通配符模式
再如上面的天气,如果是天气预报发送了江西的全部市以及县的消息,江西的气象台要去获取中央获取的消息,不需要精确的获取到每个县的天气了,直接用江西的前缀获取到全部的信息,从而在江西的天气预报里面去分类以及播报,大大的减少服务器的压力以及大大增加系统的吞吐量以及速度。主要通过 * 和 # 来表示实现, * 可以匹配一个数据, # 可以匹配多个数据。
china.# :china开头都能匹配
#.street :street结尾的都能匹配
china.* :可以匹配china携带的一个数据,如china.jiangxi,china.guangdong
china.* .*:可以携带两个数据
以此类推…
4.6.1 天气服务端发送信息
public class WeatherBureau { public static void main(String[] args) throws Exception { Map<String,String> map = new HashMap<String, String>(); map.put("china.jiangxi.ganzhou.20220403", "中国江西赣州20220403天气数据"); map.put("china.jiangxi.nanchang.20220403", "中国江西南昌20220403天气数据"); map.put("china.jiangxi.jiujiang.20220404", "中国江西九江20220404天气数据"); map.put("china.beijin.20220403", "中国北京20220403天气数据"); map.put("china.guangdong.shenzhen.20220404", "中国广东深圳20220404天气数据"); map.put("china.guangdong.guangzhou.20220403", "中国广东广州20220403天气数据"); map.put("china.guangdong.dongguan.20220404", "中国广东东莞20220404天气数据"); map.put("china.beijin.20220404", "中国北京20220404天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> itr = map.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
4.6.2 百度客户端接收消息
public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //*最多只能匹配一个词 , #匹配一个或者多个词 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20220403"); channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.20220403"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); }
4.6.3 新浪客户端接收消息
public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
初步的代码描述就基本完成了