rocketmq5.0消费者启动报错provider.newPushConsumerBuilder().buil
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class OrganizationApplication {
public static void main(String[] args) throws InterruptedException, IOException, ClientException, ClientException {
SpringApplication.run(OrganizationApplication.class, args);
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
String accessKey = '';
String secretKey = '';
String endpoints = '192.168.1.46:8081';
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
String tag = '';
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = 'please_rename_unique_group_name_4';
String topic = 'VENDING_DEVICE_TOPIC';
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// Handle the received message and return consume result.
ByteBuffer body = messageView.getBody();
String resultJson = StandardCharsets.UTF_8.decode(body).toString();
System.out.println('消息监听'+resultJson);
return ConsumeResult.SUCCESS;
})
.build();
}
}
赞0
踩0