Spring Cloud消息驱动整合

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
简介: Spring Cloud Stream 使用场景消息驱动的微服务应用 目的 简化代码 统一抽象 主要概念1.应用模型 2.Binder抽象3.持久化 发布/订阅支持4.消费分组支持5.

Spring Cloud Stream

  • 使用场景
    消息驱动的微服务应用
  • 目的
    简化代码

统一抽象

  • 主要概念
    1.应用模型

2.Binder抽象
3.持久化 发布/订阅支持
4.消费分组支持
5.分区支持

  • 基本概念
    Source:Stream发送源

近义词:Producer、Publisher
Sink:Stream接收器
近义词:Consumer、Subscriber
Processor

  • 编程模型
    激活:- @EnableBinding

    • @Configuration
    • @EnableIntegration
      Source

@Output
MessgeChannel

  • 编程模型
    Sink

    • @Input
      SubschribableChannel

@ServiceActivator
@StreamListener

Spring Cloud 消息驱动整合

整合kafka
改造 user-service-client 消息发送源(Kafka 原生 API)
User 模型实现序列化接口

package com.segumentfault.spring.cloud.lesson12.domain;

import java.io.Serializable;

/**
 * 用户模型
 *
 */
public class User implements Serializable {

    private static final long serialVersionUID = -5688097732613347904L;

    /**
     * ID
     */
    private Long id;

    /**
     * 用户名称
     */
    private String name;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
AI 代码解读

增加kafka依赖

 <!-- 整合 Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
AI 代码解读

利用kafkaTemplate实现消息发送

package com.segumentfault.spring.cloud.lesson12.user.service.client.web.controller;

import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * {@link UserService} 客户端 {@link RestController}
 * <p>
 * 注意:官方建议 客户端和服务端不要同时实现 Feign 接口
 * 这里的代码只是一个说明,实际情况最好使用组合的方式,而不是继承
 *
 */
@RestController
public class UserServiceClientController implements UserService {

    @Autowired
    private UserService userService;

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public UserServiceClientController(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/user/save/message")
    public boolean saveUserByMessage(@RequestBody User user) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("sf-users", 0, user);
        return future.isDone();
    }

    // 通过方法继承,URL 映射 :"/user/save"
    @Override
    public boolean saveUser(@RequestBody User user) {
        return userService.saveUser(user);
    }

    // 通过方法继承,URL 映射 :"/user/find/all"
    @Override
    public List<User> findAll() {
        return userService.findAll();
    }

}
AI 代码解读

实现kafka序列化器:Java序列化协议

package com.segumentfault.spring.cloud.lesson12.user.service.client.serializer;

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;

/**
 * Java 序列化协议
 *
 */
public class ObjectSerializer implements Serializer<Object> {


    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, Object object) {

        System.out.println("topic : " + topic + " , object : " + object);

        byte[] dataArray = null;

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        try {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
            objectOutputStream.writeObject(object);

            dataArray = outputStream.toByteArray();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return dataArray;
    }

    @Override
    public void close() {

    }
}
AI 代码解读

Spring Cloud Stream整合

改造 user-service-provider 消息接收器(Sink)

引入 spring-cloud-stream-binder-kafka

 <!-- 依赖 Spring Cloud Stream Binder Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
AI 代码解读

用户消息Stream接口定义

package com.segumentfault.spring.cloud.lesson12.user.service;

import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * 引导类
 *
 */
@SpringBootApplication
@EnableHystrix
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class) // 激活 Stream Binding 到 UserMessage
public class UserServiceProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceProviderApplication.class, args);
    }
}
AI 代码解读

配置kafka以及Stream Destination

## Spring Cloud Stream Binding 配置
### destination 指定 Kafka Topic
### userMessage 为输入管道名称
spring.cloud.stream.bindings.user-message.destination = sf-users

## Kafka 生产者配置

spring.kafka.BOOTSTRAP-SERVERS=localhost:9092
spring.kafka.consumer.group-id=sf-group
spring.kafka.consumer.clientId=user-service-provider
AI 代码解读

添加 User 消息监听器
SubscribableChannel 实现

package com.segumentfault.spring.cloud.lesson12.user.service.provider.service;

import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.domain.User;
import com.segumentfault.spring.cloud.lesson12.user.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

/**
 * 用户 消息服务
 *
 */
@Service
public class UserMessageService {

    @Autowired
    private UserMessage userMessage;

    @Autowired
    @Qualifier("inMemoryUserService")
    private UserService userService;

    private void saveUser(byte[] data) {
        // message body 是字节流 byte[]
        ByteArrayInputStream inputStream = new ByteArrayInputStream(data);
        try {
            ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
            User user = (User) objectInputStream.readObject(); // 反序列化成 User 对象
            userService.saveUser(user);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
  
     @PostConstruct
    public void init() {

        SubscribableChannel subscribableChannel = userMessage.input();
        subscribableChannel.subscribe(message -> {
            System.out.println("Subscribe by SubscribableChannel");
            // message body 是字节流 byte[]
            byte[] body = (byte[]) message.getPayload();
            saveUser(body);

        });
    }

}
AI 代码解读

ServiceActivator 实现

   @ServiceActivator(inputChannel = INPUT)
    public void listen(byte[] data) {
        System.out.println("Subscribe by @ServiceActivator");
        saveUser(data);
    }
AI 代码解读

@StreamListener 实现

 @StreamListener(INPUT)
    public void onMessage(byte[] data) {
        System.out.println("Subscribe by @StreamListener");
        saveUser(data);
    }
AI 代码解读

改造 user-service-client 消息发送源( Stream Binder : Rabbit MQ)
增加 spring-cloud-stream-binder-rabbitmq 依赖

<!-- 整合 Spring Cloud Stream Binder Rabbit MQ -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
AI 代码解读

配置发送源管道
添加用户消息接口

package com.segumentfault.spring.cloud.lesson12.user.service.client.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 用户消息(输出)
 *
 */
public interface UserMessage {

    @Output("user-message-out")
    MessageChannel output();

}
AI 代码解读

激活用户消息接口

package com.segumentfault.spring.cloud.lesson12.user.service.client;

import com.netflix.loadbalancer.IRule;
import com.segumentfault.spring.cloud.lesson12.api.UserService;
import com.segumentfault.spring.cloud.lesson12.user.service.client.rule.MyRule;
import com.segumentfault.spring.cloud.lesson12.user.service.client.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

/**
 * 引导类
 *
 */
@SpringBootApplication
@RibbonClient("user-service-provider") // 指定目标应用名称
@EnableCircuitBreaker // 使用服务短路
@EnableFeignClients(clients = UserService.class) // 申明 UserService 接口作为 Feign Client 调用
@EnableDiscoveryClient // 激活服务发现客户端
@EnableBinding(UserMessage.class)
public class UserServiceClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceClientApplication.class, args);
    }

    /**
     * 将 {@link MyRule} 暴露成 {@link Bean}
     *
     * @return {@link MyRule}
     */
    @Bean
    public IRule myRule() {
        return new MyRule();
    }

    /**
     * 申明 具有负载均衡能力 {@link RestTemplate}
     *
     * @return
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}
AI 代码解读

实现消息发送到 RabbitMQ

@PostMapping("/user/save/message/rabbit")
    public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {
        MessageChannel messageChannel = userMessage.output();
        // User 序列化成 JSON
        String payload = objectMapper.writeValueAsString(user);
        GenericMessage<String> message = new GenericMessage<String>(payload);
        // 发送消息
        return messageChannel.send(message);
    }
AI 代码解读

启动 Rabbit MQ
改造 user-service-provider 消息接收器( Stream Binder : Rabbit MQ)
替换依赖

<!--&lt;!&ndash; 依赖 Spring Cloud Stream Binder Kafka &ndash;&gt;-->
        <!--<dependency>-->
            <!--<groupId>org.springframework.cloud</groupId>-->
            <!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>-->
        <!--</dependency>-->

        <!-- 整合 Spring Cloud Stream Binder Rabbit MQ -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
AI 代码解读
目录
打赏
0
0
0
0
9
分享
相关文章
深入理解Spring Cloud中的服务网关
深入理解Spring Cloud中的服务网关
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
6110 73
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
15540 85
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
101 6
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
138 5
|
7月前
|
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
128 5
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
172 1
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
1295 15
|
12月前
|
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
240 3
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问