RocketMq消息队列使用

简介:

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性,

目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

比kafka还是有过之无不及,其实kafka文档很丰富

但RocketMQ网上的文章太少,找不到相关的操作教程

于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究

下载源码的地址 https://github.com/alibaba/RocketMQ/releases

  • 首选通过在java项目里面Maven依赖方式引用RocketMQ Java SDK

    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>

Downloads

在linux 下用wget 下载源码然后解压出来

在runserver.sh里面可以配置 jvm启动的参数 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

可以 vi runserver.sh

分别给 mqnamesrv mqbroker play.sh 执行的权限

chmod +x  mqnamersrv 

chmod +x  mqbroker 

chmod +x  play.sh 

下面红线框的这段 命令输入错误了,忽略不用看

通过 nohup sh mqnamesrv& 启动 RocketMq

目前没看到结束的命令,也没找到相关的介绍,

我这里用的 ps -ef|grep rocketmq  查到进程pid

然后kill pid号

或则pkill -9 java [慎用]

用jps -v 查看下java进程的参数

 rocketmq启动后监听 9876端口,这里还是在看源码里面看到的,资料实在是太少了

在防火墙配置里面加上 9876端口,设置iptables对外开放

部署Broker 

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties & 

这里ip换成本机的就是单机实例,如果配置主从 这里可以配其他的ip

 Master和Slave的配置文件参考conf目录下的配置文件

 Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数

 一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分

 部署一Master一Slave,集群采用异步复制方式:

 Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &  

Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package  com.pgsqlmybatis.common.rocketmq; /*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/
 
import  com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import  com.alibaba.rocketmq.client.producer.SendResult;
import  com.alibaba.rocketmq.common.message.Message;
 
public  class  Producer {
     public  static  void  main(String[] args) {
         DefaultMQProducer producer =  new  DefaultMQProducer( "Producer" );
         producer.setNamesrvAddr( "xxxxxxxxxx:9876" );
         try  {
             producer.start();
 
             String pushMsg= "kafka activeMq rocketMq 消息队列使用1" ;
             Message msg =  new  Message( "PushTopic" , "push" , "1" ,
                     pushMsg.getBytes( "UTF-8" ));
 
             SendResult result = producer.send(msg);
             System.out.println( "id:"  + result.getMsgId() +
                     " result:"  + result.getSendStatus());
 
             String pushMsg2= "海量级消息记录单机测试2" ;
             msg =  new  Message( "PushTopic" , "push" , "2" ,pushMsg2.getBytes( "UTF-8" ));
 
             result = producer.send(msg);
             System.out.println( "id:"  + result.getMsgId() +
                     " result:"  + result.getSendStatus());
 
             String pushMsg3= "海量级消息记录单机测试3" ;
             msg =  new  Message( "PullTopic" , "pull" , "1" ,pushMsg3.getBytes());
 
             result = producer.send(msg);
             System.out.println( "id:"  + result.getMsgId() +
                     " result:"  + result.getSendStatus());
         catch  (Exception e) {
             e.printStackTrace();
         finally  {
             producer.shutdown();
         }
     }
}

  

启动生成者

 

启动消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package  com.pgsqlmybatis.common.rocketmq; /*
***************************************************************
* 公司名称    :
* 系统名称    :信用管家专业版
* 类 名 称    :Ios渠道idfa统计,推广统计用
* 功能描述    :
* 业务描述    :
* 作 者 名    :@Author Royal
* 开发日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改内容    :
***************************************************************
*/
 
import  java.io.UnsupportedEncodingException;
import  java.util.List;
 
import  com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import  com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import  com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import  com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import  com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import  com.alibaba.rocketmq.common.message.Message;
import  com.alibaba.rocketmq.common.message.MessageExt;
 
public  class  Consumer {
     public  static  void  main(String[] args){
         DefaultMQPushConsumer consumer =
                 new  DefaultMQPushConsumer( "PushConsumer" );
         consumer.setNamesrvAddr( "xxxxxxxxxxxx:9876" );
         try  {
             consumer.subscribe( "PushTopic" "push" );
             /**
              * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
              * 如果非第一次启动,那么按照上次消费的位置继续消费
              */
             consumer.setConsumeFromWhere(
                     ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
             consumer.registerMessageListener(
                     new  MessageListenerConcurrently() {
                         public  ConsumeConcurrentlyStatus consumeMessage(
                                 List<MessageExt> list,
                                 ConsumeConcurrentlyContext Context) {
                             Message msg = list.get( 0 );
                             System.out.println(msg.toString());
                             String recString=  null ;
                             try  {
                                 recString =  new  String(msg.getBody() , "UTF-8" );
                             catch  (UnsupportedEncodingException e) {
                                 e.printStackTrace();
                             }
                             System.out.println(recString);
                             return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                         }
                     }
             );
             consumer.start();
         catch  (Exception e) {
             e.printStackTrace();
         }
     }
}

   

 

以上为单机实例配置

如果你遇到什么问题可以私信我,如果觉得此文对你很有帮助,点下赞推荐下额^_^ 

参考:http://blog.csdn.net/a19881029/article/details/34446629

        http://sofar.blog.51cto.com/353572/1540874

        http://blog.csdn.net/loongshawn/article/details/51086876

        RocketMq最佳实践

       《RocketMQ原理简介》

       分布式开放消息系统(RocketMQ)的原理与实践      

       《RocketMQ用户指南》

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
26天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
66 4
|
21天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
24天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
67 6
|
1月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
80 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
73 9
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
52 1

相关产品

  • 云消息队列 MQ