Kafka——使用spring进行集成

简介: 生产者: 消费者: ...


生产者:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- 定义producer的参数 -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092" />
				<entry key="group.id" value="0" />
				<entry key="retries" value="1" />
				<entry key="batch.size" value="16384" />
				<entry key="linger.ms" value="1" />
				<entry key="buffer.memory" value="33554432" />
				<entry key="key.serializer"
					   value="org.apache.kafka.common.serialization.StringSerializer" />
				<entry key="value.serializer"
					   value="org.apache.kafka.common.serialization.StringSerializer" />
			</map>

		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate需要使用的producerfactory bean -->
	<bean id="producerFactory"
		  class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
	<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<property name="defaultTopic" value="defaultTopic" />
	<!--	<property name="producerListener" ref="producerListener"/>-->
	</bean>

<!--	<bean id="producerListener" class="KafkaTest.KafkaProducerListener" />-->
</beans>




消费者:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
     http://www.springframework.org/schema/jee
     http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
     http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context-3.0.xsd">


	<!-- 定义consumer的参数 -->
	<bean id="consumerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092"/>
				<entry key="group.id" value="0"/>
				<entry key="enable.auto.commit" value="true"/>
				<entry key="auto.commit.interval.ms" value="1000"/>
				<entry key="session.timeout.ms" value="30000"/>
				<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
				<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
			</map>
		</constructor-arg>
	</bean>

	<!-- 创建consumerFactory bean -->
	<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
		<constructor-arg>
			<ref bean="consumerProperties"/>
		</constructor-arg>
	</bean>

	<!-- 实际执行消息消费的类 -->
	<bean id="messageListernerConsumerService" class="KafkaTest.KafkaConsumerServer"/>

	<!-- 消费者容器配置信息 -->
	<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
		<constructor-arg value="defaultTopic"/>
		<property name="messageListener" ref="messageListernerConsumerService"/>
	</bean>


	<!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean -->
	<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
		  init-method="doStart">
		<constructor-arg ref="consumerFactory"/>
		<constructor-arg ref="containerProperties"/>
	</bean>



</beans>



生产者发送消息代码:


首先注入:


 @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


之后进行发送:


 ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);



消费端:


/**
 * kafka消费
 * Created by liuhuichao on 2017/5/12.
 */
public class KafkaConsumerServer implements MessageListener<String, String> {
    private Logger logger = Logger.getLogger(KafkaConsumerServer.class);

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        logger.info("KafkaConsumerServer=============kafkaConsumer开始消费=============");
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        logger.info("KafkaConsumerServer-------------topic:"+topic);
        logger.info("KafkaConsumerServer-------------value:"+value);
        logger.info("KafkaConsumerServer-------------key:"+key);
        logger.info("KafkaConsumerServer-------------offset:"+offset);
        logger.info("KafkaConsumerServer-------------partition:"+partition);
        logger.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~");
        System.out.println("消费成功***************************************************************");
    }
}




除了集成kafka,熟悉spring的同志们,可能还用spring集成过redis,hbase这些东西,可以完全放心把连接什么的都交给spring,自己只关注该关注的东西。




ps:

       

              doc地址:http://docs.spring.io/spring-kafka/docs/1.0.4.RELEASE/reference/html/index.html





目录
相关文章
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
126 1
|
1月前
|
前端开发 Java 程序员
springboot 学习十五:Spring Boot 优雅的集成Swagger2、Knife4j
这篇文章是关于如何在Spring Boot项目中集成Swagger2和Knife4j来生成和美化API接口文档的详细教程。
104 1
|
1月前
|
存储 前端开发 Java
Spring Boot 集成 MinIO 与 KKFile 实现文件预览功能
本文详细介绍如何在Spring Boot项目中集成MinIO对象存储系统与KKFileView文件预览工具,实现文件上传及在线预览功能。首先搭建MinIO服务器,并在Spring Boot中配置MinIO SDK进行文件管理;接着通过KKFileView提供文件预览服务,最终实现文档管理系统的高效文件处理能力。
278 11
|
1月前
|
Java Spring
springboot 学习十一:Spring Boot 优雅的集成 Lombok
这篇文章是关于如何在Spring Boot项目中集成Lombok,以简化JavaBean的编写,避免冗余代码,并提供了相关的配置步骤和常用注解的介绍。
99 0
|
3月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
111 3
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
110 0
|
3月前
|
测试技术 Java Spring
Spring 框架中的测试之道:揭秘单元测试与集成测试的双重保障,你的应用真的安全了吗?
【8月更文挑战第31天】本文以问答形式深入探讨了Spring框架中的测试策略,包括单元测试与集成测试的有效编写方法,及其对提升代码质量和可靠性的重要性。通过具体示例,展示了如何使用`@MockBean`、`@SpringBootTest`等注解来进行服务和控制器的测试,同时介绍了Spring Boot提供的测试工具,如`@DataJpaTest`,以简化数据库测试流程。合理运用这些测试策略和工具,将助力开发者构建更为稳健的软件系统。
59 0
|
3月前
|
数据库 开发者 Java
颠覆传统开发:Hibernate与Spring Boot的集成,让你的开发效率飞跃式提升!
【8月更文挑战第31天】在 Java 开发中,Spring Boot 和 Hibernate 已成为许多开发者的首选技术栈。Spring Boot 简化了配置和部署过程,而 Hibernate 则是一个强大的 ORM 框架,用于管理数据库交互。将两者结合使用,可以极大提升开发效率并构建高性能的现代 Java 应用。本文将通过代码示例展示如何在 Spring Boot 项目中集成 Hibernate,并实现基本的数据库操作,包括添加依赖、配置数据源、创建实体类和仓库接口,以及在服务层和控制器中处理 HTTP 请求。这种组合不仅简化了配置,还提供了一套强大的工具来快速开发现代 Java 应用程序。
205 0

热门文章

最新文章

下一篇
无影云桌面