RocketMQ安装和使用

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: RocketMQ安装和使用

概念

下载安装

下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

安装:

unzip rocketmq-all-4.9.4-bin-release.zip

cd rocketmq-all-4.9.4-bin-release/

修改broker配置(默认8g, 4g内存太大了)

vim ./bin/runbroker.sh

vim ./bin/runserver.sh

vim conf/broker.conf 在随后添加下面两行

namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.174.129 # 自己的IP地址

启动Name Server

nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

(关闭sh bin/mqshutdown namesrv)

启动Broker

nohup bin/mqbroker -n localhost:9876 -c conf/broker.conf autoCreateTopicEnable=true &

tail -f ~/logs/rocketmqlogs/broker.log

(关闭sh bin/mqshutdown broker)

RocketMQ控制台安装

  1. 在git上下载下面的工程 rocketmq-console-1.0.0
    https://github.com/apache/rocketmq-externals/releases
  2. 修改配置文件 rocketmq-console\src\main\resources\application.properties
    server.port=7777 #项目启动后的端口号
    rocketmq.config.namesrvAddr=192.168.88.128:9876 #nameserv的地址,注意关闭防火墙
  3. 进入控制台项目,将工程打成jar包
    mvn clean package -Dmaven.test.skip=true
  4. 启动控制台
    java -jar target/rocketmq-console-ng-1.0.0.jar
  5. 访问控制台(可以创建主题,查看消息)
    http://localhost:7777/#/message

基本使用

发送同步消息

同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方

式。此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

@Autowired
    private RocketMQTemplate rocketMQTemplate;
    //同步消息
    @Test
    public void testSyncSend() {
        //参数一: topic:tag
        //参数二: 消息体
        //参数三: 超时时间
        SendResult result =
                rocketMQTemplate.syncSend("test-topic-1:tag", "这是一条同步消息", 10000);
        System.out.println(result);
    }

发送异步消息

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送

方通过回调接口接收服务器响应,并对响应结果进行处理。

异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知

启动转码服务,转码完成后通知推送转码结果等。

//异步消息
    @Test
    public void testAsyncSend() throws InterruptedException {
        //参数一: topic:tag
        //参数二: 消息体
        //参数三: 回调
        rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() {
            //成功响应的回调
            @Override
            public void onSuccess(SendResult result) {
                System.out.println(result);
            }
            //异常响应的回调
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });
        System.out.println("==================");
        Thread.sleep(300000000);
    }

发送单向消息

单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不

等待应答。

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

//单向消息
    @Test
    public void testOneWay() {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
        }
    }

发送顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。

就是将原来方法换为xxxOrderly, 例如:

//单向顺序消息(其他顺序消息都是调用对应的xxxSendOrderly方法)
    @Test
    public void testOneWayOrderly() {
        for (int i = 0; i < 10; i++) {
            //第三个参数的作用是用来决定这些消息发送到哪个队列的上的(随便写,唯一即可)
            rocketMQTemplate.sendOneWayOrderly("test-topic-1", "这是一条单向消息","xx");
        }
    }
    //异步顺序消息
    @Test
    public void testAsyncSendOrderly() throws InterruptedException {
        //参数一: topic:tag
        //参数二: 消息体
        //参数三: 回调
        rocketMQTemplate.asyncSendOrderly("test-topic-1", "这是一条异步顺序消息", "yy", new SendCallback() {
            //成功响应的回调
            @Override
            public void onSuccess(SendResult result) {
                System.out.println(result);
            }
            //异常响应的回调
            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable);
            }
        });
        System.out.println("==================");
        Thread.sleep(300000000);
    }

发送事务消息

交互流程

@Service
public class OrderServiceImpl4 {
    @Autowired
    private OrderDao orderDao;
    @Autowired
    private TxLogDao txLogDao; // 用来记录事务日志
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    public void createOrderBefore(Order order) {
        // 事务id
        String txId = UUID.randomUUID().toString();
        //发送半事务消息
        rocketMQTemplate.sendMessageInTransaction(
                "tx_producer_group",
                "tx_topic",
                MessageBuilder.withPayload(order).setHeader("txId", txId).build(),
                order
        );
    }
    
    @Transactional
    public void createOrder(String txId, Order order) {
        //保存订单
        orderDao.save(order);
        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());
        //记录事物日志(事务日志和保存订单是同成功,同失败的)
        txLogDao.save(txLog);
    }
}
@Service
//这里值必须跟发送消息的时候txProducerGroup参数值保持一致
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderServiceImpl4 orderServiceImpl4;
    @Autowired
    private TxLogDao txLogDao;
    //执行本地事物
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String txId = (String) msg.getHeaders().get("txId");
        try {
            //本地事物,成功就commit, 异常就rollback
            Order order = (Order) arg;
            orderServiceImpl4.createOrder(txId,order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    //消息回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String txId = (String) msg.getHeaders().get("txId");
        TxLog txLog = txLogDao.findById(txId).get();
        if (txLog != null){
            //本地事物(订单)成功了
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

生产案例

加依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

写配置

#rocketmq
rocketmq:
  name-server: 192.168.174.129:9876   #rocketMQ服务的地址
  producer:
    group: shop-order # 生产者组

发送消息

@Autowired
    private RocketMQTemplate rocketMQTemplate;
    //下单--fegin
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid) {
        log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid);
        //调用商品微服务,查询商品信息
        Product product = productService.findByPid(pid);
        if (product.getPid() == -100) {
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }
        log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));
        //下单(创建订单)
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);
        orderService.createOrder(order);
        log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));
        //向mq中投递一个下单成功的消息
        //参数一: 指定topic
        //参数二: 指定消息体
        rocketMQTemplate.convertAndSend("order-topic", order);
        return order;
    }

消费消息

@Slf4j
@Service("shopSmsService")
//consumerGroup-消费者组名  topic-要消费的主题
@RocketMQMessageListener(
        consumerGroup = "shop-user", //消费者组名
        topic = "order-topic",//消费主题
        consumeMode = ConsumeMode.CONCURRENTLY,//消费模式,指定是否顺序消费 CONCURRENTLY(同步,默认) ORDERLY(顺序)
        messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(广播)  CLUSTERING(集群,默认)
)
public class SmsService implements RocketMQListener<Order> {
    //消费逻辑
    @Override
    public void onMessage(Order message) {
        log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message);
    }
}

常见问题和解决方案

RemotingConnectException: connect to <172.17.0.1: 10911>

vim conf/broker.conf 在随后添加下面两行

namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.174.129 # 自己的IP地址

然后 重启nameserv和broker

bin/mqshutdown broker
bin/mqshutdown namesrv
nohup ./bin/mqnamesrv &
nohup bin/mqbroker -n localhost:9876 -c conf/broker.conf &
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 Linux API
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
|
6月前
|
消息中间件 关系型数据库 MySQL
入职必会-开发环境搭建52-RabbitMQ安装
RabbitMQ 是一款开源的消息队列软件,最初由 LShift 公司开发,后来成为 Pivotal Software(现在是 VMware 的一部分)的一部分。它是基于 AMQP(高级消息队列协议)标准的消息中间件,旨在帮助不同应用程序之间进行可靠的数据传输和通信。 RabbitMQ 提供了高度灵活的消息队列机制,可以在分布式环境中实现应用程序之间的异步通信。它支持多种消息传递模式,包括点对点、发布/订阅、请求/响应等,能够满足各种复杂的消息通信需求。
入职必会-开发环境搭建52-RabbitMQ安装
|
6月前
|
消息中间件 存储 Linux
RabbitMQ安装及配套Laravel使用
RabbitMQ安装及配套Laravel使用
123 4
|
3月前
|
消息中间件 数据安全/隐私保护 Docker
Docker安装rabbitmq
如何使用Docker安装和配置RabbitMQ服务,包括拉取RabbitMQ镜像、创建容器、配置持久化和访问管理界面的步骤。
203 0
Docker安装rabbitmq
|
4月前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
4月前
|
消息中间件 Linux
centos7安装rabbitmq
centos7安装rabbitmq
|
5月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
|
4月前
|
消息中间件 Linux
linux之centos安装rabbitmq
linux之centos安装rabbitmq
|
5月前
|
存储 Ubuntu 安全
在Ubuntu 16.04上安装和保护Mosquitto MQTT消息代理的方法
在Ubuntu 16.04上安装和保护Mosquitto MQTT消息代理的方法
129 1
|
5月前
|
Linux 数据安全/隐私保护 Docker
MQTT(EMQX) - Linux CentOS Docker 安装
MQTT(EMQX) - Linux CentOS Docker 安装
330 0