开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):SQL语法过滤】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12388
SQL语法过滤
内容介绍:
一、绑定属性
二、限制条件
一、绑定属性
结构还是一个生产者和一个消费者,consumer 和 producer 主题改为filtersqltopic,再给消息绑定一个自定义的属性,
代码示例:
producer.start
();
for (int i =
0
; i < 1
0
; i++) {
//4.创建消息对象,指定主 题Topic、tog 和消息体
/**
*参数一:消息主题 Topic
*参数二:消息Tag
*参数三:消息内容
*/
Message
msg=new Message(topic: “FilterSQLTopic”,taqs:"Tag1”,(“Hello World*+i).getBytes());
msg.
pu
tu
erProperty( name:
’’
i
’’
string,valueof(i));
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
Sendstatus status=result.getSendstatus();
System.out.println("发送结果:"+result);
//线程睡一秒
TimeUnit.SEC
O
ND
S
.sleep(timeout
1);
}
//16.关闭生产者 producer
pnoducer.shutdowm();
}
二、限制条件
启动消费者,限制条件为i>5,因此可以得到发送了i为6、7、8、9的消息,
public class consumer
{
public static void main(String[] args) throws Exception
{
//1.创建消费者Consumer,指定消费者组名
DefaultNOPushConsumer
consu
m
er
=newDe
faultMQPushConsumer(consumerGroup: “group
1
”);
//2.指定 nameserver 地址
consumer.setNamesrvAddr("192.168.25
.135:
9876;192
.
168.25.138:9876
”
);
//3.订阅主题Topic和Tag
consumer.subscribe(topic:"FiltersQLTopic”,
MessageSelector.bySql("i>
5
"));
//设置回调函数,处理消息
consumer.registerMessageListener((MessageListenerConcurrently)(msgs;context){
for (MessageExt msg :msgs){
System.out.println("consumeThreadm
=
”+Thread.currentThread()-getName() +","*
+
new string(msg
.getbody()));
return ConsumeConcurrentlystatus.CONSUME_SUCCESS;
})
C
on
su
m
er
:
consuneThread
=
ConsumeMessageThread_1,Hello world
6
c
onsu
me
Thread
=
Consu
me
MessageThread
_
2,Hello
w
orld7
c
onsumeThreed
=
ConsumeMessageThread 3 Hello world
8
consumeThread=ConsumeMessageThread_4,Hello World9
虽然对方发送了十条,但根据用户的属性,过滤了消息,这就是由 sql 过滤得到
的。