spring集成kafka

简介: 一、添加依赖项 compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'   二、发消息(生产者) 2.1 xml配置 1 2 6 7 8 9 ...

一、添加依赖项

compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'

 

二、发消息(生产者)

2.1 xml配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7     <bean id="producerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <!--kafka的服务地址,多个地址用英文逗号连接-->
11                 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
12                 <entry key="group.id" value="0"/>
13                 <entry key="retries" value="10"/>
14                 <entry key="batch.size" value="16384"/>
15                 <entry key="linger.ms" value="1"/>
16                 <entry key="buffer.memory" value="33554432"/>
17                 <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
18                 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
19             </map>
20         </constructor-arg>
21     </bean>
22 
23     <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
24         <constructor-arg>
25             <ref bean="producerProperties"/>
26         </constructor-arg>
27     </bean>
28 
29     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
30         <constructor-arg ref="producerFactory"/>
31         <constructor-arg name="autoFlush" value="true"/>
32         <!--topic名字-->
33         <property name="defaultTopic" value="dc-monitor"/>
34     </bean>
35 
36 </beans>

 

2.2 发送代码示例

    @Test
    public void send() throws InterruptedException, ExecutionException, TimeoutException {
        KafkaTemplate template = context.getBean(KafkaTemplate.class);
        String msg = "中华人民共和国万岁!";
        ListenableFuture<SendResult<String, String>> future = template.sendDefault(msg);
        SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);
        System.out.println("发送成功=====>" + msg);
    }

  

三、收消息(消费者)

3.1 xml配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7     <bean id="consumerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <!--kafka的服务地址,多个地址用英文逗号连接-->
11                 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
12                 <entry key="group.id" value="0"/>
13                 <entry key="enable.auto.commit" value="true"/>
14                 <entry key="auto.commit.interval.ms" value="1000"/>
15                 <entry key="session.timeout.ms" value="15000"/>
16                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
17                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
18             </map>
19         </constructor-arg>
20     </bean>
21 
22     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
23         <constructor-arg ref="consumerProperties"/>
24     </bean>
25 
26     <!-- 实际执行消息消费的类 -->
27     <bean id="kafkaConsumer" class="com.cnblogs.yjmyzz.consumer.DemoKafkaConsumer"/>
28 
29     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
30         <!--topic名字-->
31         <constructor-arg value="dc-monitor"/>
32         <property name="messageListener" ref="kafkaConsumer"/>
33     </bean>
34 
35     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
36           init-method="doStart">
37         <constructor-arg ref="consumerFactory"/>
38         <constructor-arg ref="containerProperties"/>
39     </bean>
40 
41 </beans>

 

3.2 接收代码示例

public class DemoKafkaConsumer implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        System.out.println("收到消息=====>" + data.value());
    }
}

 

目录
相关文章
消息中间件 Java Kafka
712 0
|
9月前
|
数据可视化 Java BI
将 Spring 微服务与 BI 工具集成:最佳实践
本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
437 1
将 Spring 微服务与 BI 工具集成:最佳实践
|
9月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
626 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
9月前
|
监控 Cloud Native Java
Spring Integration 企业集成模式技术详解与实践指南
本文档全面介绍 Spring Integration 框架的核心概念、架构设计和实际应用。作为 Spring 生态系统中的企业集成解决方案,Spring Integration 基于著名的 Enterprise Integration Patterns(EIP)提供了轻量级的消息驱动架构。本文将深入探讨其消息通道、端点、过滤器、转换器等核心组件,以及如何构建可靠的企业集成解决方案。
806 0
|
11月前
|
XML 人工智能 Java
Spring Boot集成Aviator实现参数校验
Aviator是一个高性能、轻量级的Java表达式求值引擎,适用于动态表达式计算。其特点包括支持多种运算符、函数调用、正则匹配、自动类型转换及嵌套变量访问,性能优异且依赖小。适用于规则引擎、公式计算和动态脚本控制等场景。本文介绍了如何结合Aviator与AOP实现参数校验,并附有代码示例和仓库链接。
687 0
|
11月前
|
Java 关系型数据库 数据库连接
Spring Boot项目集成MyBatis Plus操作PostgreSQL全解析
集成 Spring Boot、PostgreSQL 和 MyBatis Plus 的步骤与 MyBatis 类似,只不过在 MyBatis Plus 中提供了更多的便利功能,如自动生成 SQL、分页查询、Wrapper 查询等。
1033 2
|
11月前
|
安全 Java 数据库
第16课:Spring Boot中集成 Shiro
第16课:Spring Boot中集成 Shiro
1143 0
|
11月前
|
消息中间件 存储 Java
第15课: Spring Boot中集成ActiveMQ
第15课: Spring Boot中集成ActiveMQ
677 0
|
11月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
944 0
第07课:Spring Boot集成Thymeleaf模板引擎

热门文章

最新文章