Kafka——使用spring进行集成

简介: 生产者: 消费者: ...


生产者:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- 定义producer的参数 -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092" />
				<entry key="group.id" value="0" />
				<entry key="retries" value="1" />
				<entry key="batch.size" value="16384" />
				<entry key="linger.ms" value="1" />
				<entry key="buffer.memory" value="33554432" />
				<entry key="key.serializer"
					   value="org.apache.kafka.common.serialization.StringSerializer" />
				<entry key="value.serializer"
					   value="org.apache.kafka.common.serialization.StringSerializer" />
			</map>

		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate需要使用的producerfactory bean -->
	<bean id="producerFactory"
		  class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
	<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<property name="defaultTopic" value="defaultTopic" />
	<!--	<property name="producerListener" ref="producerListener"/>-->
	</bean>

<!--	<bean id="producerListener" class="KafkaTest.KafkaProducerListener" />-->
</beans>




消费者:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
     http://www.springframework.org/schema/jee
     http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
     http://www.springframework.org/schema/context
      http://www.springframework.org/schema/context/spring-context-3.0.xsd">


	<!-- 定义consumer的参数 -->
	<bean id="consumerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092"/>
				<entry key="group.id" value="0"/>
				<entry key="enable.auto.commit" value="true"/>
				<entry key="auto.commit.interval.ms" value="1000"/>
				<entry key="session.timeout.ms" value="30000"/>
				<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
				<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
			</map>
		</constructor-arg>
	</bean>

	<!-- 创建consumerFactory bean -->
	<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
		<constructor-arg>
			<ref bean="consumerProperties"/>
		</constructor-arg>
	</bean>

	<!-- 实际执行消息消费的类 -->
	<bean id="messageListernerConsumerService" class="KafkaTest.KafkaConsumerServer"/>

	<!-- 消费者容器配置信息 -->
	<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
		<constructor-arg value="defaultTopic"/>
		<property name="messageListener" ref="messageListernerConsumerService"/>
	</bean>


	<!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean -->
	<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
		  init-method="doStart">
		<constructor-arg ref="consumerFactory"/>
		<constructor-arg ref="containerProperties"/>
	</bean>



</beans>



生产者发送消息代码:


首先注入:


 @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


之后进行发送:


 ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);



消费端:


/**
 * kafka消费
 * Created by liuhuichao on 2017/5/12.
 */
public class KafkaConsumerServer implements MessageListener<String, String> {
    private Logger logger = Logger.getLogger(KafkaConsumerServer.class);

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        logger.info("KafkaConsumerServer=============kafkaConsumer开始消费=============");
        String topic = record.topic();
        String key = record.key();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        logger.info("KafkaConsumerServer-------------topic:"+topic);
        logger.info("KafkaConsumerServer-------------value:"+value);
        logger.info("KafkaConsumerServer-------------key:"+key);
        logger.info("KafkaConsumerServer-------------offset:"+offset);
        logger.info("KafkaConsumerServer-------------partition:"+partition);
        logger.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~");
        System.out.println("消费成功***************************************************************");
    }
}




除了集成kafka,熟悉spring的同志们,可能还用spring集成过redis,hbase这些东西,可以完全放心把连接什么的都交给spring,自己只关注该关注的东西。




ps:

       

              doc地址:http://docs.spring.io/spring-kafka/docs/1.0.4.RELEASE/reference/html/index.html





目录
相关文章
消息中间件 Java Kafka
41 0
|
21天前
|
数据可视化 Java BI
将 Spring 微服务与 BI 工具集成:最佳实践
本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
将 Spring 微服务与 BI 工具集成:最佳实践
|
22天前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
23天前
|
监控 Cloud Native Java
Spring Integration 企业集成模式技术详解与实践指南
本文档全面介绍 Spring Integration 框架的核心概念、架构设计和实际应用。作为 Spring 生态系统中的企业集成解决方案,Spring Integration 基于著名的 Enterprise Integration Patterns(EIP)提供了轻量级的消息驱动架构。本文将深入探讨其消息通道、端点、过滤器、转换器等核心组件,以及如何构建可靠的企业集成解决方案。
85 0
|
3月前
|
XML 人工智能 Java
Spring Boot集成Aviator实现参数校验
Aviator是一个高性能、轻量级的Java表达式求值引擎,适用于动态表达式计算。其特点包括支持多种运算符、函数调用、正则匹配、自动类型转换及嵌套变量访问,性能优异且依赖小。适用于规则引擎、公式计算和动态脚本控制等场景。本文介绍了如何结合Aviator与AOP实现参数校验,并附有代码示例和仓库链接。
171 0
|
3月前
|
Java 关系型数据库 数据库连接
Spring Boot项目集成MyBatis Plus操作PostgreSQL全解析
集成 Spring Boot、PostgreSQL 和 MyBatis Plus 的步骤与 MyBatis 类似,只不过在 MyBatis Plus 中提供了更多的便利功能,如自动生成 SQL、分页查询、Wrapper 查询等。
278 3
|
3月前
|
安全 Java 数据库
第16课:Spring Boot中集成 Shiro
第16课:Spring Boot中集成 Shiro
560 0
|
3月前
|
消息中间件 存储 Java
第15课: Spring Boot中集成ActiveMQ
第15课: Spring Boot中集成ActiveMQ
325 0
|
Java 数据库连接 数据库