52分布式电商项目 - ActiveMQ例子

简介: 52分布式电商项目 - ActiveMQ例子

代码已上传值Github

地址:https://github.com/ylw-github/ActiveMQ-Demo.git

点对点模式

点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 activemq 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

消息生产者

创建工程 jmsDemo ,引入依赖

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
    </dependency>

创建类 QueueProducer main 方法代码如下:

/**
 * =======================================================
 *
 * @desc: 消息生产者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueProducer
 * @date: 2019/7/13 15:17
 * <p>
 * =======================================================
 */
public class QueueProducer {
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者
            producer = session.createProducer(queue);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上述代码中第 4 步创建 session 的两个参数:

第 1 个参数 是否使用事务

第 2 个参数 消息的确认模式

  • AUTO_ACKNOWLEDGE = 1 自动确认
  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  • SESSION_TRANSACTED = 0 事务提交并确认

运行后通过 ActiveMQ 管理界面查询

消息消费者
/**
 * =======================================================
 *
 * @desc: 消息消费者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueConsumer1
 * @date: 2019/7/13 15:17
 * <p>
 * =======================================================
 */
public class QueueConsumer2 {
    public static void main(String[] args) {
        Connection connection;
        Session session;
        MessageConsumer consumer;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            consumer = session.createConsumer(queue);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有

一个消费者会接收到消息。

发布/订阅模式

消息生产者

创建类 TopicProducer ,main 方法代码如下:

public class TopicProducer {
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new
                    ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
消息消费者

创建类 TopicConsumer ,main 方法代码如下:

public class TopicConsumer1 {
    public static void main(String[] args) {
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取
            // session (参数 1:是否启动事务,参数 2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            // Queue queue = session.createQueue("test-queue");
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {
                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

目录
打赏
0
0
0
0
237
分享
相关文章
|
8月前
|
19- 你的项目中哪里用到了分布式锁
在一个项目中,为解决集群环境下SpringTask定时任务的重复执行问题,采用了Redis实现分布式锁来管理任务调度,防止资源浪费。后来因任务量和执行规则增加,以及单节点效率限制,系统改用XXL-JOB,分布式锁不再使用。
79 2
面试官:项目中如何实现分布式锁?
面试官:项目中如何实现分布式锁?
115 6
面试官:项目中如何实现分布式锁?
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
项目环境测试问题之Schedulerx2.0通过分布式分片任务解决单机计算瓶颈如何解决
看看 Asp.net core Webapi 项目如何优雅地使用分布式缓存
看看 Asp.net core Webapi 项目如何优雅地使用分布式缓存
如何在Java项目中实现分布式锁
如何在Java项目中实现分布式锁
|
8月前
|
Java单体项目和分布式项目中的锁
Java单体项目和分布式项目中的锁 Java单体项目和分布式项目中的锁
103 2
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
1月前
|
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
174 5
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
90 8