52分布式电商项目 - ActiveMQ例子

简介: 52分布式电商项目 - ActiveMQ例子

代码已上传值Github

地址:https://github.com/ylw-github/ActiveMQ-Demo.git

点对点模式

点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 activemq 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

消息生产者

创建工程 jmsDemo ,引入依赖

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
    </dependency>

创建类 QueueProducer main 方法代码如下:

/**
 * =======================================================
 *
 * @desc: 消息生产者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueProducer
 * @date: 2019/7/13 15:17
 * <p>
 * =======================================================
 */
public class QueueProducer {
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者
            producer = session.createProducer(queue);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上述代码中第 4 步创建 session 的两个参数:

第 1 个参数 是否使用事务

第 2 个参数 消息的确认模式

  • AUTO_ACKNOWLEDGE = 1 自动确认
  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  • SESSION_TRANSACTED = 0 事务提交并确认

运行后通过 ActiveMQ 管理界面查询

消息消费者
/**
 * =======================================================
 *
 * @desc: 消息消费者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueConsumer1
 * @date: 2019/7/13 15:17
 * <p>
 * =======================================================
 */
public class QueueConsumer2 {
    public static void main(String[] args) {
        Connection connection;
        Session session;
        MessageConsumer consumer;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            consumer = session.createConsumer(queue);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有

一个消费者会接收到消息。

发布/订阅模式

消息生产者

创建类 TopicProducer ,main 方法代码如下:

public class TopicProducer {
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new
                    ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
消息消费者

创建类 TopicConsumer ,main 方法代码如下:

public class TopicConsumer1 {
    public static void main(String[] args) {
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取
            // session (参数 1:是否启动事务,参数 2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            // Queue queue = session.createQueue("test-queue");
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {
                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

目录
相关文章
|
5月前
|
Java 调度 Maven
【分布式任务调度平台 XXL-JOB 急速入门】从零开始将 XXL-JOB 接入到自己的项目(下)
【分布式任务调度平台 XXL-JOB 急速入门】从零开始将 XXL-JOB 接入到自己的项目(下)
118 0
|
2月前
|
SpringCloudAlibaba Java 持续交付
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
170 0
|
3月前
|
存储 NoSQL 文件存储
C++ 哈希表企业级项目运用---淘宝分布式文件系统
C++ 哈希表企业级项目运用---淘宝分布式文件系统
|
4月前
|
网络协议 Devops 大数据
【分布式】大型互联网项目特点
【1月更文挑战第25天】【分布式】大型互联网项目特点
|
4月前
|
存储 缓存 监控
【分布式】大型互联网项目架构目标
【1月更文挑战第25天】【分布式】大型互联网项目架构目标
|
4月前
|
Java API
分布式锁【分布式锁概述、业务介绍、创建SpringBoot项目】(一)-全面详解(学习总结---从入门到深化)
分布式锁【分布式锁概述、业务介绍、创建SpringBoot项目】(一)-全面详解(学习总结---从入门到深化)
23 0
|
5月前
|
编译器 定位技术 开发工具
分布式版本控制系统Git的下载、安装与使用其复制GitHub项目代码的方法
分布式版本控制系统Git的下载、安装与使用其复制GitHub项目代码的方法
|
2月前
|
NoSQL 算法 安全
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
155 0
|
2月前
|
NoSQL 关系型数据库 MySQL
分布式锁(redis/mysql)
分布式锁(redis/mysql)
66 1
|
2月前
|
NoSQL Java Redis
如何通俗易懂的理解Redis分布式锁
在多线程并发的情况下,我们如何保证一个代码块在同一时间只能由一个线程访问呢?
39 2