public class Consumer implements CommandLineRunner {
private String namesrvAddr = "192.168.186.129:9876";
public void messageListener(){
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("SilktechDashboardConsumer");
consumer.setNamesrvAddr(namesrvAddr);
try {
consumer.subscribe("example4", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for(Message msg:msgs){
try {
System.out.println(msg.getBody());
System.out.println("Received message====>"+new String(msg.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override public void run(String... args) throws Exception { this.messageListener(); }
rocketmq版本:4.4.0 canal版本:1.1.4 flatMessage=false
原提问者GitHub用户shineabel
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。