Spring Cloud Stream Binder 实现

简介: Spring Cloud Stream Binder 实现JMS 实现 ActiveMQ1.增加Maven依赖<!-- 整合 Sprig Boot Starter ActiveMQ --> <!-- 间接依赖: spring jms ...

Spring Cloud Stream Binder 实现

JMS 实现 ActiveMQ

1.增加Maven依赖

<!-- 整合 Sprig Boot Starter ActiveMQ -->
        <!-- 间接依赖:
            spring jms
            jms api
            activemq
            spring boot jms
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
AI 代码解读

2.启动ActiveMQ broker


activemq console
AI 代码解读

3.原生API:生产消息
请注意启动后的控制台输出

private static void sendMessage() throws Exception {
        // 创建 ActiveMQ 链接,设置 Broker URL
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创造 JMS 链接
        Connection connection = connectionFactory.createConnection();
        // 创建会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的 - Queue 名称为 "TEST"
        Destination destination = session.createQueue("TEST");
        // 创建消息生产者
        MessageProducer producer = session.createProducer(destination);
        // 创建消息 - 文本消息
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("Hello,World");
        // 发送文本消息
        producer.send(message);

        // 关闭消息生产者
        producer.close();
        // 关闭会话
        session.close();
        // 关闭连接
        connection.close();
    }
AI 代码解读

4.原生API:消费消息

 private static void receiveMessage() throws Exception {

        // 创建 ActiveMQ 链接,设置 Broker URL
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创造 JMS 链接
        Connection connection = connectionFactory.createConnection();
        // 启动连接
        connection.start();
        // 创建会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的 - Queue 名称为 "TEST"
        Destination destination = session.createQueue("TEST");
        // 创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        // 获取消息
        Message message = messageConsumer.receive(100);

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("消息消费内容:" + textMessage.getText());
        }

        // 关闭消息消费者
        messageConsumer.close();
        // 关闭会话
        session.close();
        // 关闭连接
        connection.stop();
        connection.close();
    }
AI 代码解读

Spring Boot JMS+ActiveMQ

1.Maven依赖

<!-- 整合 Sprig Boot Starter ActiveMQ -->
        <!-- 间接依赖:
            spring jms
            jms api
            activemq
            spring boot jms
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
AI 代码解读

2.配置ActiveMQ属性
application.properties

## ActiveMQ 配置
spring.activemq.brokerUrl = tcp://localhost:61616
AI 代码解读

3.配置JMS属性
application.properties

## JMS 配置
spring.jms.template.defaultDestination = sf-users-activemq
AI 代码解读

4.改造user-service-client:实现ActiveMQ User对象消息生产
UserServiceClientController.java

@Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/user/save/message/activemq")
    public boolean saveUserByActiveMQMessage(@RequestBody User user) throws Exception {
        jmsTemplate.convertAndSend(user);
        return true;
    }
AI 代码解读

5.启动user-service-client
预先启动“eureka-server”以及“config-server”
6.改造user-service-provider:实现ActiveMQ User对象信息消费

  @Autowired
    private JmsTemplate jmsTemplate;

    @GetMapping("/user/poll")
    public Object pollUser() {
        // 获取消息队列中,默认 destination = sf-users-activemq
        return jmsTemplate.receiveAndConvert();
    }
AI 代码解读

ActiveMQ Spring Cloud Stream Binder实现

创建spring-cloud-stream-binder-activemq 工程
引入Maven
实现Binder接口-仅实现消息发送

package com.segumentfault.spring.cloud.stream.binder.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

/**
 * Active MQ MessageChannel Binder 实现
 *
 * @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
 * @since 0.0.1
 */
public class ActiveMQMessageChannelBinder implements
        Binder<MessageChannel, ConsumerProperties, ProducerProperties> {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 接受 ActiveMQ 消息
     *
     * @param name
     * @param group
     * @param inboundBindTarget
     * @param consumerProperties
     * @return
     */
    @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inboundBindTarget, ConsumerProperties consumerProperties) {
        // TODO: 实现消息消费
        return () -> {
        };
    }

    /**
     * 负责发送消息到 ActiveMQ
     *
     * @param name
     * @param outputChannel
     * @param producerProperties
     * @return
     */
    @Override
    public Binding<MessageChannel> bindProducer(String name, MessageChannel outputChannel, ProducerProperties producerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, outputChannel,
                "Binding is supported only for SubscribableChannel instances");

        SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel;

        subscribableChannel.subscribe(message -> {
            // 接受内部管道消息,来自于 MessageChannel#send(Message)
            // 实际并没有发送消息,而是此消息将要发送到 ActiveMQ Broker
            Object messageBody = message.getPayload();
            jmsTemplate.convertAndSend(name, messageBody);

        });

        return () -> {
            System.out.println("Unbinding");
        };
    }
}

实现 Spring Cloud Stream Binder 自动装配
package com.segumentfault.spring.cloud.stream.binder.activemq;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ActiveMQ Stream Binder 自动装配
 *
 */
@Configuration
@ConditionalOnMissingBean(Binder.class)
public class ActiveMQStreamBinderAutoConfiguration {

    @Bean
    public ActiveMQMessageChannelBinder activeMQMessageChannelBinder() {
        return new ActiveMQMessageChannelBinder();
    }
}
AI 代码解读

2.配置META-INF/spring.binders

activemq :\
com.segumentfault.spring.cloud.stream.binder.activemq.ActiveMQStreamBinderAutoConfiguration
AI 代码解读

3.整合消息生产者user-service-client
引入 ActiveMQ Spring Cloud Stream Binder Maven 依赖

 <!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
        <dependency>
            <groupId>com.segumentfault</groupId>
            <artifactId>spring-cloud-stream-binder-activemq</artifactId>
            <version>${project.version}</version>
        </dependency>
AI 代码解读

4.配置ActiveMQ Spring Cloud Stream Binder 属性

## Spring Cloud Stream 默认 Binder
spring.cloud.stream.defaultBinder=rabbit

### 消息管道 activemq-out 配置
spring.cloud.stream.bindings.activemq-out.binder = activemq
spring.cloud.stream.bindings.activemq-out.destination = sf-users-activemq
AI 代码解读

5.实现Binder接口 - 实现消息消费

 @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties consumerProperties) {

        ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
        try {
            // 创造 JMS 链接
            Connection connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话 Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建消息目的
            Destination destination = session.createQueue(name);
            // 创建消息消费者
            MessageConsumer messageConsumer = session.createConsumer(destination);

            messageConsumer.setMessageListener(message -> {
                // message 来自于 ActiveMQ
                if (message instanceof ObjectMessage) {
                    ObjectMessage objectMessage = (ObjectMessage) message;
                    try {
                        Object object = objectMessage.getObject();
                        inputChannel.send(new GenericMessage<Object>(object));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }

        return () -> {
        };
    }
AI 代码解读

6.整合消息消费者- user-service-provider
引入 ActiveMQ Spring Cloud Stream Binder Maven 依赖

<!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
        <dependency>
            <groupId>com.segumentfault</groupId>
            <artifactId>spring-cloud-stream-binder-activemq</artifactId>
            <version>${project.version}</version>
        </dependency>
AI 代码解读

7.配置ActiveMQ Spring Cloud Stream Binder 属性

## Spring Cloud Stream 默认 Binder
spring.cloud.stream.defaultBinder=rabbit

### 消息管道 activemq-out 配置
spring.cloud.stream.bindings.activemq-in.binder = activemq
spring.cloud.stream.bindings.activemq-in.destination = sf-users-activemq
AI 代码解读

8.实现User消息监听

 @StreamListener("activemq-in")
    public void onUserMessage(User user) throws IOException {
        System.out.println("Subscribe by @StreamListener");
        userService.saveUser(user);
    }

    // 监听 ActiveMQ Stream
    userMessage.activeMQIn().subscribe(message -> {

        if (message instanceof GenericMessage) {
            GenericMessage genericMessage = (GenericMessage) message;
            User user = (User) genericMessage.getPayload();
            userService.saveUser(user);
        }
    });
AI 代码解读
目录
打赏
0
0
0
0
9
分享
相关文章
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
49 6
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
83 5
|
2月前
|
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
62 5
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
783 15
|
7月前
|
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
178 3
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Boot 集成 Event Hub(azure-spring-cloud-stream-binder-eventhubs)指定Partition Key有异常消息
【Azure 事件中心】Spring Boot 集成 Event Hub(azure-spring-cloud-stream-binder-eventhubs)指定Partition Key有异常消息
通用快照方案问题之通过Spring Cloud实现配置的自动更新如何解决
通用快照方案问题之通过Spring Cloud实现配置的自动更新如何解决
100 0
通用快照方案问题之Spring Boot Admin的定义如何解决
通用快照方案问题之Spring Boot Admin的定义如何解决
84 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等