【java_wxid项目】【第十四章】【Spring Cloud Stream集成】

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 【java_wxid项目】【第十四章】【Spring Cloud Stream集成】

主项目链接:https://gitee.com/java_wxid/java_wxid

项目架构及博文总结:


  • 点击:【使用Spring Boot快速构建应用】
  • 点击:【使用Spring Cloud Open Feign基于动态代理动态构造请求实现与其他系统进行交互】
  • 点击:【使用Spring Cloud Hystrix实现服务容错、熔断、降级、监控】
  • 点击:【使用Spring Cloud Ribbon以库的方式集成到服务的消费方实现客户端负载均衡】
  • 点击:【使用Spring Cloud Gateway作为API网关服务进行请求拦截、服务分发、降级、限流】
  • 点击:【使用Spring Cloud Security Oauth2作为微服务统一认证中心实现用户认证和授权访问】
  • 点击:【使用Spring Cloud Stream作为消息驱动用于动态的切换中间件】
  • 点击:【使用Spring Cloud Skywalking基于字节码注入通过探针方式进行链路追踪、分布式追踪、性能指标分析、应用和服务依赖分析】
  • 点击:【使用Spring Cloud Alibaba Nacos实现服务注册/发现/续约/剔除/下线、心跳检测、服务配置管理、基于长轮训机制实现配置动态变更】
  • 点击:【使用Spring Cloud Alibaba Seata作为对项目代码无入侵的分布式事务解决方案】
  • 点击:【使用Spring Cloud Alibaba Sentinel实现高可用流量防护】
  • 点击:【使用Apache ShardingSphere作为关系型数据库中间件实现分库分表、读写分离】
  • 点击:【使用Apache Mybatis作为持久层框架用于定制化SQL、存储过程以及高级映射】
  • 点击:【使用Redis作为高性能分布式缓存数据库】
  • 点击:【使用ElasticSearch全文搜索】
  • 点击:【使用MongoDB非关系型数据库】
  • 点击:【使用xxl-job作为分布式任务调度平台】
  • 点击:【使用Elasticsearch + Logstash + Kibana作为日志收集系统】
  • 点击:【使用Apifox作为API文档、API调试、API Mock、API自动化测试】
  • 点击:【使用Apache Spark作为基于内存计算的大数据分析引擎用于批处理、交互式查询】
  • 点击:【使用ETL工具将数据源抽取到HDFS作为高可靠、高吞吐量的分布式文件系统存储,通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据】
  • 点击:【使用领域驱动DDD设计和设计模式进行开发】
  • 点击:【使用Netty基于Java NIO封装的高性能的网络通信框架】
  • 点击:【使用k8s、docker、docker-compose、宝塔面板进行环境搭建和部署】
  • 点击:【使用Vue渐进式JavaScript框架作为适用场景丰富的Web前端框架】
  • 点击:【分享人才筛选、工作分配、高效办公、项目推动等团队管理经验】


项目模块:


前期规划,实现部分


java_wxid   
├── demo                                                            // 演示模块
│     └── 模块名称:apache-mybatis-demo模块                            //Apache Mybatis集成(已实现并有博文总结)
│     └── 模块名称:apache-shardingsphere-demo模块                     //Apache ShardingSphere集成(已实现并有博文总结)
│     └── 模块名称:design-demo模块                                    //设计模式实战落地(已实现并有博文总结)
│     └── 模块名称:elasticsearch-demo模块                             //ElasticSearch集成(已实现并有博文总结)
│     └── 模块名称:mongodb-demo模块                                   //MongoDB集成(已实现并有博文总结)
│     └── 模块名称:redis-demo模块                                     //Redis集成(已实现并有博文总结)
│     └── 模块名称:spring-boot-demo模块                               //Spring Boot快速构建应用(已实现并有博文总结)
│     └── 模块名称:spring-cloud-alibaba-nacos-demo模块                //Spring Cloud Alibaba Nacos集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-alibaba-seata-demo模块                //Spring Cloud Alibaba Seata集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-alibaba-sentinel-demo模块             //Spring Cloud Alibaba Sentinel集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-gateway-demo模块                      //Spring Cloud Gateway集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-hystrix-demo模块                      //Spring Cloud Hystrix集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-open-feign-demo模块                   //Spring Cloud Open Feign集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-ribbon-demo模块                       //Spring Cloud Ribbon集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-security-oauth2-demo模块              //Spring Cloud Security Oauth2集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-security-oauth2-sso-client-demo模块   //Spring Cloud Security Oauth2集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-skywalking-demo模块                   //Spring Cloud Skywalking集成(已实现并有博文总结)
│     └── 模块名称:spring-cloud-stream-demo模块                       //Spring Cloud Stream集成(已实现并有博文总结)
│     └── 模块名称:swagger-demo模块                                   //springfox-swagger2集成(已实现并有博文总结)
│     └── 模块名称:xxl-job模块                                        //xxl-job集成(已实现并有博文总结)
│     └── 模块名称:apache-spark-demo模块                              //Apache Spark集成
│     └── 模块名称:etl-hdfs-hive-hbase-demo模块                       //ETL、HDFS、Hive、Hbase集成
│     └── 模块名称:ddd-mode-demo模块                                  //DDD领域设计
│     └── 模块名称:netty-demo模块                                     //Netty集成
│     └── 模块名称:vue-demo模块                                       //前端vue集成
├── document                                                        // 文档
│     └── JavaKnowledgeDocument                                     //java知识点
│           └── java基础知识点.md                     
│           └── mq知识点.md
│           └── mysql知识点.md
│           └── redis知识点.md
│           └── springcould知识点.md
│           └── spring知识点.md
│     └── FounderDocument                                           //创始人
│           └── 创始人.md


系列文章:快速集成各种微服务相关的技术,帮助大家可以快速集成到自己的项目中,节约开发时间。

提示:系列文章还未全部完成,后续的文章,会慢慢补充进去的。


文章目录


创建elasticsearch-demo项目

修改pom.xml

修改SpringCloudStreamDemoApplication

创建application.yml

创建KafkaConsumer

创建RabbitmqConsumer

创建RocketmqConsumer

创建KafkaController

创建RabbitmqController

创建RocketmqController

创建StreamSink

创建StreamSource

创建StreamProducer

创建RabbitMQUtil

校验是否正常工作

使用rocketmq发消息


创建elasticsearch-demo项目


项目代码:https://gitee.com/java_wxid/java_wxid/tree/master/demo/spring-cloud-stream-demo


项目结构如下(示例):


17d9256734e84457a3b43955a225e0ae.png


修改pom.xml


代码如下(示例):


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>spring-cloud-stream-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <!--引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
        在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系-->
        <spring.boot.version>2.3.12.RELEASE</spring.boot.version>
        <spring.cloud.version>Hoxton.SR12</spring.cloud.version>
        <spring.cloud.alibaba.version>2.2.7.RELEASE</spring.cloud.alibaba.version>
    </properties>
    <!--
        引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
        在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
     -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring.cloud.alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <!-- 集成spring-cloud-starter-stream-rocketmq -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <!-- 集成spring-cloud-starter-stream-kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <!-- 集成spring-cloud-starter-stream-rabbitmq -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>


修改SpringCloudStreamDemoApplication


代码如下(示例):


package com.example.springcloudstreamdemo;
import com.example.springcloudstreamdemo.passage.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(value = {StreamSource.class, StreamSink.class})
@SpringBootApplication
//Spring boot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次
public class SpringCloudStreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamDemoApplication.class, args);
    }
}


创建application.yml


代码如下(示例):


server:
  port: 8097
spring:
  rabbitmq:
#    host: 106.14.132.94
#    port: 5672
#    username: admin
#    password: java@2022
#    virtual-host: /
    host: 192.168.160.128
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  application:
    name: spring-cloud-stream-demo
  autoconfigure:
    # 使用multiple RabbitMQ binders 时需要排除RabbitAutoConfiguration
    exclude:
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      default-binder: rocketmq #选择默认绑定器
      rocketmq:
        binder:
          name-server: 192.168.160.128:9876
      kafka:
        binder:
          brokers: 106.14.132.94:9092
          # 自动创建Topic
          auto-create-topics: true
      binders: #可以绑定多个消息中间件
        rabbit: #表示定义的名称,用于binding整合 名字可以自定义  在此处配置要绑定的rabbitmq的服务信息
          type: rabbit # 消息组件类型
        rocketmq: #表示定义的名称,用于binding整合 名字可以自定义 在此处配置要绑定的rocket的服务信息
          type: rocketmq
        kafka:
          type: kafka
      bindings: # 服务的整合处理
        rabbitmqOutput:  # 这个名字是一个binding的名称(自定义)
          destination: rabbitmq-destination # 通道,如果用的是RabbitMQ就是交换机名称
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          default-binder: rabbit # 如果没设定,就使用default-binder默认的
          producer:
            routing-key-expression: headers.routingKey   # 发送端路由key
            delayed-exchange: true    # 开启延时队列
          # 指定了消息分区的数量。
          partitionCount: 2
          # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
          partition-key-expression: headers.id1
        rabbitmqInput:  # 这个名字是一个binding的名称(自定义)
          destination: rabbitmq-destination # 通道,如果用的是RabbitMQ就是交换机名称
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          default-binder: rabbit # 设置要绑定的消息服务的具体设置
          group: my-rabbitmq-group  # 分组名称,在rabbit当中其实就是交换机绑定的队列名称
          consumer:
            concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1
            max-attempts: 3 #重试次数
            partitioned: true #通过该参数开启消费者分区功能;
        kafkaOutput: # 通道名称
          destination: kafka-destination # 消息的主题名 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
          default-binder: kafka # 如果没设定,就使用default-binder默认的
          # 指定了消息分区的数量。
          partitionCount: 2
          # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
          partition-key-expression: headers.id2
        kafkaInput:
          destination: kafka-destination # 消息的主题名 消息发往的目的地,对应topic
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          default-binder: kafka # 设置要绑定的消息服务的具体设置
          group: my-kafka-group  # 分组名称,在kafka当中其实就是交换机绑定的队列名称
          consumer:
            concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1
            max-attempts: 3 #重试次数
            partitioned: true #通过该参数开启消费者分区功能;
        rocketmqOutput: # 通道名称
          destination: rocket-destination # 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
          default-binder: rocketmq # 如果没设定,就使用default-binder默认的
          # 指定了消息分区的数量。
          partitionCount: 2
          # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
          partition-key-expression: headers.id3
        rocketmqInput:
          destination: rocket-destination # 消息发往的目的地,对应topic
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          default-binder: rocketmq # 设置要绑定的消息服务的具体设置
          group: my-rocketmq-group  # 分组名称,在rocket当中其实就是交换机绑定的队列名称
          consumer:
            concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1
            max-attempts: 3 #重试次数
            partitioned: true #通过该参数开启消费者分区功能;


创建KafkaConsumer


代码如下(示例):


package com.example.springcloudstreamdemo.consumer;
import com.example.springcloudstreamdemo.passage.StreamSink;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
@Component
@EnableBinding(StreamSink.class)
public class KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    @StreamListener(target = StreamSink.KAFKAINPUT)
    public void consume(String message) {
        logger.info("Kafka recieved a string message : " + message);
    }
    @StreamListener(target = StreamSink.KAFKAINPUT, condition = "headers['type']=='kafka'")
    public void handle(String message) {
        logger.info("Kafka recieved a complex message: {}",  message);
    }
}


创建RabbitmqConsumer


代码如下(示例):


package com.example.springcloudstreamdemo.consumer;
import com.example.springcloudstreamdemo.passage.StreamSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
@Component
@EnableBinding(StreamSink.class)
public class RabbitmqConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RabbitmqConsumer.class);
    @StreamListener(target = StreamSink.RABBITMQINPUT)
    public void consume(String message) {
        logger.info("rabbitmq recieved a string message : " + message);
    }
    @StreamListener(target = StreamSink.RABBITMQINPUT, condition = "headers['type']=='rabbitmq'")
    public void handle(String message) {
        logger.info("rabbitmq recieved a complex message : {}",  message);
    }
}


创建RocketmqConsumer


代码如下(示例):


package com.example.springcloudstreamdemo.consumer;
import com.example.springcloudstreamdemo.passage.StreamSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
@Component
@EnableBinding(StreamSink.class)
public class RocketmqConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RocketmqConsumer.class);
    @StreamListener(target = StreamSink.ROCKETMQINPUT)
    public void consume(String message) {
        logger.info("Rocketmq recieved a string message : " + message);
    }
    @StreamListener(target = StreamSink.ROCKETMQINPUT, condition = "headers['type']=='rocketmq'")
    public void handle(String message) {
        logger.info("Rocketmq recieved a complex message : {}", message);
    }
}


创建KafkaController


代码如下(示例):


package com.example.springcloudstreamdemo.controller;
import com.example.springcloudstreamdemo.producer.StraamProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    @Autowired
    private StraamProducer straamProducer;
    /**
     * 发送消息
     * @param payload
     */
    @GetMapping("/sendSucceed")
    public void sendSucceed(String payload) {
        straamProducer.kafkaSendMessage(payload);
    }
}


创建RabbitmqController


代码如下(示例):


package com.example.springcloudstreamdemo.controller;
import com.example.springcloudstreamdemo.producer.StraamProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqController {
    @Autowired
    private StraamProducer straamProducer;
    /**
     * 发送消息
     * @param payload
     */
    @GetMapping("/sendSucceed")
    public boolean sendSucceed(String payload) {
        return straamProducer.rabbitmqSendMessage(payload);
    }
}


创建RocketmqController


代码如下(示例):


package com.example.springcloudstreamdemo.controller;
import com.example.springcloudstreamdemo.passage.StreamSink;
import com.example.springcloudstreamdemo.producer.StraamProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
@EnableBinding(StreamSink.class)
@RestController
@RequestMapping("/rocketmq")
public class RocketmqController {
    @Resource
    private StraamProducer straamProducer;
    @GetMapping("/sendSucceed")
    public boolean sendSucceed(@RequestParam(value="payload") String payload) {
        return straamProducer.rocketmqSendMessage(payload);
    }
}


创建StreamSink


代码如下(示例):


package com.example.springcloudstreamdemo.passage;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
public interface StreamSink {
    String KAFKAINPUT = "kafkaInput";
    @Input(KAFKAINPUT)
    SubscribableChannel kafkaInput();
    String ROCKETMQINPUT = "rocketmqInput";
    @Input(ROCKETMQINPUT)
    SubscribableChannel rocketmqInput();
    String RABBITMQINPUT = "rabbitmqInput";
    @Input(RABBITMQINPUT)
    SubscribableChannel rabbitmqInput();
}


创建StreamSource


代码如下(示例):


package com.example.springcloudstreamdemo.passage;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * @author zhiwei Liao
 * @Description
 * @Date create in 2022/9/12 0012 21:03
 */
public interface StreamSource {
    String ROCKETMQOUTPUT = "rocketmqOutput";
    @Output(ROCKETMQOUTPUT)
    MessageChannel rocketmqOutput();
    String RABBITMQOUTPUT = "rabbitmqOutput";
    @Output(RABBITMQOUTPUT)
    MessageChannel rabbitmqOutput();
    String KAFKAOUTPUT = "kafkaOutput";
    @Output(KAFKAOUTPUT)
    MessageChannel kafkaOutput();
}


创建StreamProducer


代码如下(示例):


package com.example.springcloudstreamdemo.producer;
import com.example.springcloudstreamdemo.passage.StreamSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
 * @author zhiweiLiao
 * @Description kafka的生产者
 * @Date create in 2022/9/13 0013 14:39
 */
@EnableBinding({StreamSource.class})
public class StreamProducer {
    @Autowired
    private StreamSource streamSource;
    @Autowired
    private StreamBridge streamBridge;
    public boolean rocketmqSendMessage(Object payload) {
        Message<Object> message = MessageBuilder.withPayload(payload)
                .setHeader("type", "rocketmq")
                .setHeader("x-delay", 5000)
                .build();
//        return streamBridge.send("rocketmqOutput",message);
        return streamSource.rocketmqOutput().send(message);
    }
    public boolean rabbitmqSendMessage(Object payload) {
        Message<Object> message = MessageBuilder.withPayload(payload)
                .setHeader("type", "rabbitmq")
                .setHeader("x-delay", 5000)
                .build();
        return streamSource.rabbitmqOutput().send(message);
    }
    public boolean kafkaSendMessage(Object payload) {
        Message<Object> message = MessageBuilder.withPayload(payload)
                .setHeader("type", "kafka")
                .setHeader("x-delay", 5000)
                .build();
        return streamSource.kafkaOutput().send(message);
    }
}


创建RabbitMQUtil


代码如下(示例):


package com.example.springcloudstreamdemo.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtil {
    private static ConnectionFactory factory;
    //程序一加载就会启动
    static {
        //1.创建连接工厂对象
        factory=new ConnectionFactory();
        //设置连接对象的参数
        factory.setHost("106.14.132.94");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("java@2022");
        factory.setVirtualHost("/");
    }
    public static Connection getconnection(){
        Connection connection=null;
        try{
            connection=factory.newConnection();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return connection;
    }
    public static void main(String[] args) {
        System.out.println(getconnection());
    }
}


校验是否正常工作


使用rocketmq发消息


如下图(示例):


ec44546ace304c5bbd80f688c33ef1ed.png


调用接口发消息,如下图(示例):


1f8d0ee3a5994229b8278ff965f3ad9f.png


控制台打印,如下图(示例):


0d404d7db9c145aca74b367fa40d4b66.png

相关文章
|
5天前
|
监控 Java Nacos
使用Spring Boot集成Nacos
通过上述步骤,Spring Boot应用可以成功集成Nacos,利用Nacos的服务发现和配置管理功能来提升微服务架构的灵活性和可维护性。通过这种集成,开发者可以更高效地管理和部署微服务。
69 17
|
1天前
|
算法 搜索推荐 Java
【潜意识Java】深度解析黑马项目《苍穹外卖》与蓝桥杯算法的结合问题
本文探讨了如何将算法学习与实际项目相结合,以提升编程竞赛中的解题能力。通过《苍穹外卖》项目,介绍了订单配送路径规划(基于动态规划解决旅行商问题)和商品推荐系统(基于贪心算法)。这些实例不仅展示了算法在实际业务中的应用,还帮助读者更好地准备蓝桥杯等编程竞赛。结合具体代码实现和解析,文章详细说明了如何运用算法优化项目功能,提高解决问题的能力。
23 6
|
7天前
|
缓存 安全 Java
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
134 12
|
1天前
|
Java 数据库连接 数据库
【潜意识Java】深度分析黑马项目《苍穹外卖》在Java学习中的重要性
《苍穹外卖》项目对Java学习至关重要。它涵盖了用户管理、商品查询、订单处理等模块,涉及Spring Boot、MyBatis、Redis等技术栈。
18 4
|
6天前
|
人工智能 安全 Dubbo
Spring AI 智能体通过 MCP 集成本地文件数据
MCP 作为一款开放协议,直接规范了应用程序如何向 LLM 提供上下文。MCP 就像是面向 AI 应用程序的 USB-C 端口,正如 USB-C 提供了一种将设备连接到各种外围设备和配件的标准化方式一样,MCP 提供了一个将 AI 模型连接到不同数据源和工具的标准化方法。
|
15天前
|
Java Spring
Java Spring Boot监听事件和处理事件
通过上述步骤,我们可以在Java Spring Boot应用中实现事件的发布和监听。事件驱动模型可以帮助我们实现组件间的松耦合,提升系统的可维护性和可扩展性。无论是处理业务逻辑还是系统事件,Spring Boot的事件机制都提供了强大的支持和灵活性。希望本文能为您的开发工作提供实用的指导和帮助。
67 15
|
19天前
|
存储 Java BI
java怎么统计每个项目下的每个类别的数据
通过本文,我们详细介绍了如何在Java中统计每个项目下的每个类别的数据,包括数据模型设计、数据存储和统计方法。通过定义 `Category`和 `Project`类,并使用 `ProjectManager`类进行管理,可以轻松实现项目和类别的数据统计。希望本文能够帮助您理解和实现类似的统计需求。
68 17
|
13天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
13天前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
56 8
|
4月前
|
SpringCloudAlibaba API 开发者
新版-SpringCloud+SpringCloud Alibaba
新版-SpringCloud+SpringCloud Alibaba

热门文章

最新文章