52分布式电商项目 - ActiveMQ例子

简介: 52分布式电商项目 - ActiveMQ例子

代码已上传值Github

地址:https://github.com/ylw-github/ActiveMQ-Demo.git

点对点模式

点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 activemq 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

消息生产者

创建工程 jmsDemo ,引入依赖

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
    </dependency>

创建类 QueueProducer main 方法代码如下:

/**
 * =======================================================
 *
 * @desc: 消息生产者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueProducer
 * @date: 2019/7/13 15:17
 * <p>
 * =======================================================
 */
public class QueueProducer {
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者
            producer = session.createProducer(queue);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上述代码中第 4 步创建 session 的两个参数:

第 1 个参数 是否使用事务

第 2 个参数 消息的确认模式

  • AUTO_ACKNOWLEDGE = 1 自动确认
  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  • SESSION_TRANSACTED = 0 事务提交并确认

运行后通过 ActiveMQ 管理界面查询

消息消费者
/**
 * =======================================================
 *
 * @desc: 消息消费者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueConsumer1
 * @date: 2019/7/13 15:17
 * <p>
 * =======================================================
 */
public class QueueConsumer2 {
    public static void main(String[] args) {
        Connection connection;
        Session session;
        MessageConsumer consumer;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            consumer = session.createConsumer(queue);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有

一个消费者会接收到消息。

发布/订阅模式

消息生产者

创建类 TopicProducer ,main 方法代码如下:

public class TopicProducer {
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new
                    ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
消息消费者

创建类 TopicConsumer ,main 方法代码如下:

public class TopicConsumer1 {
    public static void main(String[] args) {
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取
            // session (参数 1:是否启动事务,参数 2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            // Queue queue = session.createQueue("test-queue");
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {
                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

目录
相关文章
|
2月前
|
NoSQL 调度 Redis
19- 你的项目中哪里用到了分布式锁
在一个项目中,为解决集群环境下SpringTask定时任务的重复执行问题,采用了Redis实现分布式锁来管理任务调度,防止资源浪费。后来因任务量和执行规则增加,以及单节点效率限制,系统改用XXL-JOB,分布式锁不再使用。
41 2
|
6天前
|
SQL NoSQL Java
如何在Java项目中实现分布式锁
如何在Java项目中实现分布式锁
|
2月前
|
缓存 NoSQL Java
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson(一)
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson
75 0
|
2月前
|
SpringCloudAlibaba Java 持续交付
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
331 0
|
14天前
|
消息中间件 Java 中间件
如何在Java项目中实现分布式事务管理
如何在Java项目中实现分布式事务管理
|
2月前
|
XML NoSQL Java
Java单体项目和分布式项目中的锁
Java单体项目和分布式项目中的锁 Java单体项目和分布式项目中的锁
59 2
|
2月前
|
缓存 NoSQL Redis
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson(二)
分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson
50 0
|
7天前
|
NoSQL Java Redis
实现基于Redis的分布式锁机制
实现基于Redis的分布式锁机制
|
20天前
|
NoSQL Redis
redis分布式锁redisson
底层会尝试去加锁,如果加锁失败,会睡眠,自旋加锁,直到获取到锁为止。
21 1
|
17天前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
46 0