主项目链接:https://gitee.com/java_wxid/java_wxid
项目架构及博文总结:
- 点击:【使用Spring Boot快速构建应用】
- 点击:【使用Spring Cloud Open Feign基于动态代理动态构造请求实现与其他系统进行交互】
- 点击:【使用Spring Cloud Hystrix实现服务容错、熔断、降级、监控】
- 点击:【使用Spring Cloud Ribbon以库的方式集成到服务的消费方实现客户端负载均衡】
- 点击:【使用Spring Cloud Gateway作为API网关服务进行请求拦截、服务分发、降级、限流】
- 点击:【使用Spring Cloud Security Oauth2作为微服务统一认证中心实现用户认证和授权访问】
- 点击:【使用Spring Cloud Stream作为消息驱动用于动态的切换中间件】
- 点击:【使用Spring Cloud Skywalking基于字节码注入通过探针方式进行链路追踪、分布式追踪、性能指标分析、应用和服务依赖分析】
- 点击:【使用Spring Cloud Alibaba Nacos实现服务注册/发现/续约/剔除/下线、心跳检测、服务配置管理、基于长轮训机制实现配置动态变更】
- 点击:【使用Spring Cloud Alibaba Seata作为对项目代码无入侵的分布式事务解决方案】
- 点击:【使用Spring Cloud Alibaba Sentinel实现高可用流量防护】
- 点击:【使用Apache ShardingSphere作为关系型数据库中间件实现分库分表、读写分离】
- 点击:【使用Apache Mybatis作为持久层框架用于定制化SQL、存储过程以及高级映射】
- 点击:【使用Redis作为高性能分布式缓存数据库】
- 点击:【使用ElasticSearch全文搜索】
- 点击:【使用MongoDB非关系型数据库】
- 点击:【使用xxl-job作为分布式任务调度平台】
- 点击:【使用Elasticsearch + Logstash + Kibana作为日志收集系统】
- 点击:【使用Apifox作为API文档、API调试、API Mock、API自动化测试】
- 点击:【使用Apache Spark作为基于内存计算的大数据分析引擎用于批处理、交互式查询】
- 点击:【使用ETL工具将数据源抽取到HDFS作为高可靠、高吞吐量的分布式文件系统存储,通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据】
- 点击:【使用领域驱动DDD设计和设计模式进行开发】
- 点击:【使用Netty基于Java NIO封装的高性能的网络通信框架】
- 点击:【使用k8s、docker、docker-compose、宝塔面板进行环境搭建和部署】
- 点击:【使用Vue渐进式JavaScript框架作为适用场景丰富的Web前端框架】
- 点击:【分享人才筛选、工作分配、高效办公、项目推动等团队管理经验】
项目模块:
前期规划,实现部分
java_wxid ├── demo // 演示模块 │ └── 模块名称:apache-mybatis-demo模块 //Apache Mybatis集成(已实现并有博文总结) │ └── 模块名称:apache-shardingsphere-demo模块 //Apache ShardingSphere集成(已实现并有博文总结) │ └── 模块名称:design-demo模块 //设计模式实战落地(已实现并有博文总结) │ └── 模块名称:elasticsearch-demo模块 //ElasticSearch集成(已实现并有博文总结) │ └── 模块名称:mongodb-demo模块 //MongoDB集成(已实现并有博文总结) │ └── 模块名称:redis-demo模块 //Redis集成(已实现并有博文总结) │ └── 模块名称:spring-boot-demo模块 //Spring Boot快速构建应用(已实现并有博文总结) │ └── 模块名称:spring-cloud-alibaba-nacos-demo模块 //Spring Cloud Alibaba Nacos集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-alibaba-seata-demo模块 //Spring Cloud Alibaba Seata集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-alibaba-sentinel-demo模块 //Spring Cloud Alibaba Sentinel集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-gateway-demo模块 //Spring Cloud Gateway集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-hystrix-demo模块 //Spring Cloud Hystrix集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-open-feign-demo模块 //Spring Cloud Open Feign集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-ribbon-demo模块 //Spring Cloud Ribbon集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-security-oauth2-demo模块 //Spring Cloud Security Oauth2集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-security-oauth2-sso-client-demo模块 //Spring Cloud Security Oauth2集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-skywalking-demo模块 //Spring Cloud Skywalking集成(已实现并有博文总结) │ └── 模块名称:spring-cloud-stream-demo模块 //Spring Cloud Stream集成(已实现并有博文总结) │ └── 模块名称:swagger-demo模块 //springfox-swagger2集成(已实现并有博文总结) │ └── 模块名称:xxl-job模块 //xxl-job集成(已实现并有博文总结) │ └── 模块名称:apache-spark-demo模块 //Apache Spark集成 │ └── 模块名称:etl-hdfs-hive-hbase-demo模块 //ETL、HDFS、Hive、Hbase集成 │ └── 模块名称:ddd-mode-demo模块 //DDD领域设计 │ └── 模块名称:netty-demo模块 //Netty集成 │ └── 模块名称:vue-demo模块 //前端vue集成 ├── document // 文档 │ └── JavaKnowledgeDocument //java知识点 │ └── java基础知识点.md │ └── mq知识点.md │ └── mysql知识点.md │ └── redis知识点.md │ └── springcould知识点.md │ └── spring知识点.md │ └── FounderDocument //创始人 │ └── 创始人.md
系列文章:快速集成各种微服务相关的技术,帮助大家可以快速集成到自己的项目中,节约开发时间。
提示:系列文章还未全部完成,后续的文章,会慢慢补充进去的。
文章目录
创建elasticsearch-demo项目
修改pom.xml
修改SpringCloudStreamDemoApplication
创建application.yml
创建KafkaConsumer
创建RabbitmqConsumer
创建RocketmqConsumer
创建KafkaController
创建RabbitmqController
创建RocketmqController
创建StreamSink
创建StreamSource
创建StreamProducer
创建RabbitMQUtil
校验是否正常工作
使用rocketmq发消息
创建elasticsearch-demo项目
项目代码:https://gitee.com/java_wxid/java_wxid/tree/master/demo/spring-cloud-stream-demo
项目结构如下(示例):
修改pom.xml
代码如下(示例):
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>spring-cloud-stream-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-cloud-stream-demo</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <!--引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。 在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系--> <spring.boot.version>2.3.12.RELEASE</spring.boot.version> <spring.cloud.version>Hoxton.SR12</spring.cloud.version> <spring.cloud.alibaba.version>2.2.7.RELEASE</spring.cloud.alibaba.version> </properties> <!-- 引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。 在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系 --> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring.cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring.cloud.alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <!-- 集成spring-cloud-starter-stream-rocketmq --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.0.1.RELEASE</version> </dependency> <!-- 集成spring-cloud-starter-stream-kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <version>2.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> <!-- 集成spring-cloud-starter-stream-rabbitmq --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.3.3.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
修改SpringCloudStreamDemoApplication
代码如下(示例):
package com.example.springcloudstreamdemo; import com.example.springcloudstreamdemo.passage.*; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @EnableBinding(value = {StreamSource.class, StreamSink.class}) @SpringBootApplication //Spring boot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次 public class SpringCloudStreamDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringCloudStreamDemoApplication.class, args); } }
创建application.yml
代码如下(示例):
server: port: 8097 spring: rabbitmq: # host: 106.14.132.94 # port: 5672 # username: admin # password: java@2022 # virtual-host: / host: 192.168.160.128 port: 5672 username: admin password: admin virtual-host: / application: name: spring-cloud-stream-demo autoconfigure: # 使用multiple RabbitMQ binders 时需要排除RabbitAutoConfiguration exclude: - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration cloud: stream: default-binder: rocketmq #选择默认绑定器 rocketmq: binder: name-server: 192.168.160.128:9876 kafka: binder: brokers: 106.14.132.94:9092 # 自动创建Topic auto-create-topics: true binders: #可以绑定多个消息中间件 rabbit: #表示定义的名称,用于binding整合 名字可以自定义 在此处配置要绑定的rabbitmq的服务信息 type: rabbit # 消息组件类型 rocketmq: #表示定义的名称,用于binding整合 名字可以自定义 在此处配置要绑定的rocket的服务信息 type: rocketmq kafka: type: kafka bindings: # 服务的整合处理 rabbitmqOutput: # 这个名字是一个binding的名称(自定义) destination: rabbitmq-destination # 通道,如果用的是RabbitMQ就是交换机名称 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” default-binder: rabbit # 如果没设定,就使用default-binder默认的 producer: routing-key-expression: headers.routingKey # 发送端路由key delayed-exchange: true # 开启延时队列 # 指定了消息分区的数量。 partitionCount: 2 # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键; partition-key-expression: headers.id1 rabbitmqInput: # 这个名字是一个binding的名称(自定义) destination: rabbitmq-destination # 通道,如果用的是RabbitMQ就是交换机名称 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” default-binder: rabbit # 设置要绑定的消息服务的具体设置 group: my-rabbitmq-group # 分组名称,在rabbit当中其实就是交换机绑定的队列名称 consumer: concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1 max-attempts: 3 #重试次数 partitioned: true #通过该参数开启消费者分区功能; kafkaOutput: # 通道名称 destination: kafka-destination # 消息的主题名 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) default-binder: kafka # 如果没设定,就使用default-binder默认的 # 指定了消息分区的数量。 partitionCount: 2 # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键; partition-key-expression: headers.id2 kafkaInput: destination: kafka-destination # 消息的主题名 消息发往的目的地,对应topic content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” default-binder: kafka # 设置要绑定的消息服务的具体设置 group: my-kafka-group # 分组名称,在kafka当中其实就是交换机绑定的队列名称 consumer: concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1 max-attempts: 3 #重试次数 partitioned: true #通过该参数开启消费者分区功能; rocketmqOutput: # 通道名称 destination: rocket-destination # 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) default-binder: rocketmq # 如果没设定,就使用default-binder默认的 # 指定了消息分区的数量。 partitionCount: 2 # 指定分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键; partition-key-expression: headers.id3 rocketmqInput: destination: rocket-destination # 消息发往的目的地,对应topic content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” default-binder: rocketmq # 设置要绑定的消息服务的具体设置 group: my-rocketmq-group # 分组名称,在rocket当中其实就是交换机绑定的队列名称 consumer: concurrency: 2 # 初始/最少/空闲时 消费者数量,默认1 max-attempts: 3 #重试次数 partitioned: true #通过该参数开启消费者分区功能;
创建KafkaConsumer
代码如下(示例):
package com.example.springcloudstreamdemo.consumer; import com.example.springcloudstreamdemo.passage.StreamSink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ @Component @EnableBinding(StreamSink.class) public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @StreamListener(target = StreamSink.KAFKAINPUT) public void consume(String message) { logger.info("Kafka recieved a string message : " + message); } @StreamListener(target = StreamSink.KAFKAINPUT, condition = "headers['type']=='kafka'") public void handle(String message) { logger.info("Kafka recieved a complex message: {}", message); } }
创建RabbitmqConsumer
代码如下(示例):
package com.example.springcloudstreamdemo.consumer; import com.example.springcloudstreamdemo.passage.StreamSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ @Component @EnableBinding(StreamSink.class) public class RabbitmqConsumer { private static final Logger logger = LoggerFactory.getLogger(RabbitmqConsumer.class); @StreamListener(target = StreamSink.RABBITMQINPUT) public void consume(String message) { logger.info("rabbitmq recieved a string message : " + message); } @StreamListener(target = StreamSink.RABBITMQINPUT, condition = "headers['type']=='rabbitmq'") public void handle(String message) { logger.info("rabbitmq recieved a complex message : {}", message); } }
创建RocketmqConsumer
代码如下(示例):
package com.example.springcloudstreamdemo.consumer; import com.example.springcloudstreamdemo.passage.StreamSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ @Component @EnableBinding(StreamSink.class) public class RocketmqConsumer { private static final Logger logger = LoggerFactory.getLogger(RocketmqConsumer.class); @StreamListener(target = StreamSink.ROCKETMQINPUT) public void consume(String message) { logger.info("Rocketmq recieved a string message : " + message); } @StreamListener(target = StreamSink.ROCKETMQINPUT, condition = "headers['type']=='rocketmq'") public void handle(String message) { logger.info("Rocketmq recieved a complex message : {}", message); } }
创建KafkaController
代码如下(示例):
package com.example.springcloudstreamdemo.controller; import com.example.springcloudstreamdemo.producer.StraamProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private StraamProducer straamProducer; /** * 发送消息 * @param payload */ @GetMapping("/sendSucceed") public void sendSucceed(String payload) { straamProducer.kafkaSendMessage(payload); } }
创建RabbitmqController
代码如下(示例):
package com.example.springcloudstreamdemo.controller; import com.example.springcloudstreamdemo.producer.StraamProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ @RestController @RequestMapping("/rabbitmq") public class RabbitmqController { @Autowired private StraamProducer straamProducer; /** * 发送消息 * @param payload */ @GetMapping("/sendSucceed") public boolean sendSucceed(String payload) { return straamProducer.rabbitmqSendMessage(payload); } }
创建RocketmqController
代码如下(示例):
package com.example.springcloudstreamdemo.controller; import com.example.springcloudstreamdemo.passage.StreamSink; import com.example.springcloudstreamdemo.producer.StraamProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ @EnableBinding(StreamSink.class) @RestController @RequestMapping("/rocketmq") public class RocketmqController { @Resource private StraamProducer straamProducer; @GetMapping("/sendSucceed") public boolean sendSucceed(@RequestParam(value="payload") String payload) { return straamProducer.rocketmqSendMessage(payload); } }
创建StreamSink
代码如下(示例):
package com.example.springcloudstreamdemo.passage; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ public interface StreamSink { String KAFKAINPUT = "kafkaInput"; @Input(KAFKAINPUT) SubscribableChannel kafkaInput(); String ROCKETMQINPUT = "rocketmqInput"; @Input(ROCKETMQINPUT) SubscribableChannel rocketmqInput(); String RABBITMQINPUT = "rabbitmqInput"; @Input(RABBITMQINPUT) SubscribableChannel rabbitmqInput(); }
创建StreamSource
代码如下(示例):
package com.example.springcloudstreamdemo.passage; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @author zhiwei Liao * @Description * @Date create in 2022/9/12 0012 21:03 */ public interface StreamSource { String ROCKETMQOUTPUT = "rocketmqOutput"; @Output(ROCKETMQOUTPUT) MessageChannel rocketmqOutput(); String RABBITMQOUTPUT = "rabbitmqOutput"; @Output(RABBITMQOUTPUT) MessageChannel rabbitmqOutput(); String KAFKAOUTPUT = "kafkaOutput"; @Output(KAFKAOUTPUT) MessageChannel kafkaOutput(); }
创建StreamProducer
代码如下(示例):
package com.example.springcloudstreamdemo.producer; import com.example.springcloudstreamdemo.passage.StreamSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; /** * @author zhiweiLiao * @Description kafka的生产者 * @Date create in 2022/9/13 0013 14:39 */ @EnableBinding({StreamSource.class}) public class StreamProducer { @Autowired private StreamSource streamSource; @Autowired private StreamBridge streamBridge; public boolean rocketmqSendMessage(Object payload) { Message<Object> message = MessageBuilder.withPayload(payload) .setHeader("type", "rocketmq") .setHeader("x-delay", 5000) .build(); // return streamBridge.send("rocketmqOutput",message); return streamSource.rocketmqOutput().send(message); } public boolean rabbitmqSendMessage(Object payload) { Message<Object> message = MessageBuilder.withPayload(payload) .setHeader("type", "rabbitmq") .setHeader("x-delay", 5000) .build(); return streamSource.rabbitmqOutput().send(message); } public boolean kafkaSendMessage(Object payload) { Message<Object> message = MessageBuilder.withPayload(payload) .setHeader("type", "kafka") .setHeader("x-delay", 5000) .build(); return streamSource.kafkaOutput().send(message); } }
创建RabbitMQUtil
代码如下(示例):
package com.example.springcloudstreamdemo.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtil { private static ConnectionFactory factory; //程序一加载就会启动 static { //1.创建连接工厂对象 factory=new ConnectionFactory(); //设置连接对象的参数 factory.setHost("106.14.132.94"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("java@2022"); factory.setVirtualHost("/"); } public static Connection getconnection(){ Connection connection=null; try{ connection=factory.newConnection(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return connection; } public static void main(String[] args) { System.out.println(getconnection()); } }
校验是否正常工作
使用rocketmq发消息
如下图(示例):
调用接口发消息,如下图(示例):
控制台打印,如下图(示例):