(十七) 整合spring cloud云架构 -消息驱动 Spring Cloud Stream

简介: 简介: Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。 2. 使用工具: rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了 3.

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的异步通知,业务的异步处理都需要使用消息中间件机制。spring cloud的官方给出的集成建议(使用rabbit mq和kafka),我看了一下源码和配置,只要把rabbit mq集成,kafka只是换了一个pom配置jar包而已,闲话少说,我们就直接进入配置实施:

  1. 简介:
    Spring cloud Stream 数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。
  2. 使用工具:
    rabbit,具体的下载和安装细节我这里不做太多讲解,网上的实例太多了
  3. 创建commonservice-mq-producer消息的发送者项目,在pom里面配置stream-rabbit的依赖
<span style="font-size: 16px;"><!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency></span>

4.在yml文件里面配置rabbit mq

<span style="font-size: 16px;">server:
  port: 5666
spring:
  application:
    name: commonservice-mq-producer
  profiles: 
    active: dev
  cloud:
    config:
      discovery: 
        enabled: true
        service-id: commonservice-config-server
  <span style="color: rgb(255, 0, 0);"># rabbitmq和kafka都有相关配置的默认值,如果修改,可以再次进行配置
    stream:
      bindings:
        mqScoreOutput: 
          destination: honghu_exchange
          contentType: application/json
          
  rabbitmq:
     host: localhost
     port: 5672
     username: honghu
     password: honghu</span>
eureka: 
  client:
    service-url:
      defaultZone: http://honghu:123456@localhost:8761/eureka
  instance:
    prefer-ip-address: true</span>

5定义接口ProducerService

<span style="font-size: 16px;">package com.honghu.cloud.producer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface ProducerService {
    
    String SCORE_OUPUT = "mqScoreOutput";
    
    @Output(ProducerService.SCORE_OUPUT)
    SubscribableChannel sendMessage();
}</span>

6 定义绑定

<span style="font-size: 16px;">package com.honghu.cloud.producer;

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(ProducerService.class)
public class SendServerConfig {

}</span>

7定义发送消息业务ProducerController

<span style="font-size: 16px;">package com.honghu.cloud.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.honghu.cloud.common.code.ResponseCode;
import com.honghu.cloud.common.code.ResponseVO;
import com.honghu.cloud.entity.User;
import com.honghu.cloud.producer.ProducerService;

import net.sf.json.JSONObject;

@RestController
@RequestMapping(value = "producer")
public class ProducerController {
    
    @Autowired
    private ProducerService producerService;
    
    
    /**
     * 通过get方式发送</span>对象<span style="font-size: 16px;">
     * @param name 路径参数
     * @return 成功|失败
     */
    @RequestMapping(value = "/sendObj", method = RequestMethod.GET)
    public ResponseVO sendObj() {
        User user = new User(1, "hello User");
        <span style="color: rgb(255, 0, 0);">Message<User> msg = MessageBuilder.withPayload(user).build();</span>
        boolean result = producerService.sendMessage().send(msg);
        if(result){
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
        }
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
    }
    
    
    /**
     * 通过get方式发送字符串消息
     * @param name 路径参数
     * @return 成功|失败
     */
    @RequestMapping(value = "/send/{name}", method = RequestMethod.GET)
    public ResponseVO send(@PathVariable(value = "name", required = true) String name) {
        Message msg = MessageBuilder.withPayload(name.getBytes()).build();
        boolean result = producerService.sendMessage().send(msg);
        if(result){
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
        }
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
    }
    
    /**
     * 通过post方式发送</span>json对象<span style="font-size: 16px;">
     * @param name 路径参数
     * @return 成功|失败
     */
    @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)
    public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {
        Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();
        boolean result = producerService.sendMessage().send(msg);
        if(result){
            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);
        }
        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);
    }
}
</span>

8创建commonservice-mq-consumer1消息的消费者项目,在pom里面配置stream-rabbit的依赖

<!-- 引入MQ消息驱动的微服务包,引入stream只需要进行配置化即可,是对rabbit、kafka很好的封装 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

9在yml文件中配置:

server:
  port: 5111
spring:
  application:
    name: commonservice-mq-consumer1
  profiles: 
    active: dev
  cloud:
    config:
      discovery: 
        enabled: true
        service-id: commonservice-config-server
        
    <span style="color: rgb(255, 0, 0);">stream:
      bindings:
        mqScoreInput:
          group: honghu_queue
          destination: honghu_exchange
          contentType: application/json
          
  rabbitmq:
     host: localhost
     port: 5672
     username: honghu
     password: honghu</span>
eureka: 
  client:
    service-url:
      defaultZone: http://honghu:123456@localhost:8761/eureka
  instance:
    prefer-ip-address: true

10定义接口ConsumerService

package com.honghu.cloud.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ConsumerService {
    
    <span style="color: rgb(255, 0, 0);">String SCORE_INPUT = "mqScoreInput";

    @Input(ConsumerService.SCORE_INPUT)
    SubscribableChannel sendMessage();</span>

}

11 定义启动类和消息消费

package com.honghu.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

import com.honghu.cloud.consumer.ConsumerService;
import com.honghu.cloud.entity.User;

@EnableEurekaClient
@SpringBootApplication
@EnableBinding(ConsumerService.class) //可以绑定多个接口
public class ConsumerApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    <span style="color: rgb(255, 0, 0);">@StreamListener(ConsumerService.SCORE_INPUT)
    public void onMessage(Object obj) {
        System.out.println("消费者1,接收到的消息:" + obj);
    }</span>

}

12 分别启动commonservice-mq-producer、commonservice-mq-consumer1

13 通过postman来验证消息的发送和接收

可以看到接收到了消息,下一章我们介绍mq的集群方案。

到此,整个消息中心方案集成完毕
欢迎大家和我一起学习spring cloud构建微服务云架构,我这边会将近期研发的spring cloud微服务云架构的搭建过程和精髓记录下来,帮助更多有兴趣研发spring cloud框架的朋友,大家来一起探讨spring cloud架构的搭建过程及如何运用于企业项目。

目录
相关文章
|
6月前
|
消息中间件 Cloud Native Java
【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合
【Spring云原生系列】SpringBoot+Spring Cloud Stream:消息驱动架构(MDA)解析,实现异步处理与解耦合
|
6月前
|
消息中间件 存储 Java
【Spring Cloud Stream 消息驱动】 —— 每天一点小知识
【Spring Cloud Stream 消息驱动】 —— 每天一点小知识
|
消息中间件 Java Kafka
Spring Cloud Stream:简化消息驱动微服务
在微服务架构中,消息驱动是一种重要的通信方式,它允许各个微服务之间进行异步通信,降低了耦合度,提高了系统的可伸缩性和可维护性。Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它提供了一种简单且一致的方式来处理消息。
|
XML Java 数据格式
Spring注解驱动开发系列(一)Spring容器组件的注册
Spring注解驱动开发系列(一)Spring容器组件的注册
|
2月前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
2109 21
|
3月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
65 0
|
5月前
|
消息中间件 设计模式 监控
Spring Boot中的消息驱动开发
Spring Boot中的消息驱动开发
|
5月前
|
缓存 Java 数据库连接
spring中注解驱动事务框架的源码
spring中注解驱动事务框架的源码
|
6月前
|
消息中间件 Java RocketMQ
Spring Cloud RocketMQ:构建可靠消息驱动的微服务架构
【4月更文挑战第28天】消息队列在微服务架构中扮演着至关重要的角色,能够实现服务之间的解耦、异步通信以及数据分发。Spring Cloud RocketMQ作为Apache RocketMQ的Spring Cloud集成,为微服务架构提供了可靠的消息传输机制。
253 1
|
6月前
|
XML Java 关系型数据库
注解驱动事务:Spring中基于注解的事务属性配置详解
注解驱动事务:Spring中基于注解的事务属性配置详解
109 0
注解驱动事务:Spring中基于注解的事务属性配置详解