Springboot+阿里云kafka踩坑实录

简介:

场景描述:上文写到,不断接收数据并存放到OSS,现在要把数据存到MQ的kafka一份。


springboot版本为1.5.9。

开工之前先阅读阿里云官方kafka消息接入说明:https://help.aliyun.com/document_detail/52376.html


1、首先引入kafka jar包

189cfd06de6b670f27dd719ba9d29e90ebb47417

spring-kafka目前最新版本为2.1.2,其依赖的kafka-clients是1.0.x,但Kafka 服务端版本是 0.10,Client 版本建议 0.10,所以此处需排除依赖重新引入,否则一直报错:

Bootstrap broker kafka-ons-internet.aliyun.com:8080 disconnected


2、KafkaConfiguration.java


@Configuration
@EnableKafka
public class KafkaConfiguration {

	@Value("${kafka.broker.address}")
	private String brokerAddress;

	@Value("${kafka.default.topic}")
	private String defaultTopic;

	@Value("${kafka.jks.location}")
	private String jksLocation;

	public KafkaConfiguration() {
		URL authLocation = KafkaConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");
		if (System.getProperty("java.security.auth.login.config") == null) {
			System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());
		}
	}

	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<String, Object>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
		if (StringUtils.isEmpty(jksLocation)) {
			props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks").getPath());
		} else {
			props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);
		}
		props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
		props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
		props.put(SaslConfigs.SASL_MECHANISM, "ONS");
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
		return props;
	}

	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
		kafkaTemplate.setDefaultTopic(defaultTopic);
		return kafkaTemplate;
	}
}

注意此处定义了三个变量,通过配置文件注入:

brokerAddress kafka服务器地址

defaultTopic kafka默认topic

jksLocation JKS文件地址(开发环境无需定义,直接读取resources下的jks,但生产环境需读取jar包外部的jks文件,所以此处需配置路径)

03335a178aff1e00d0a797f8994ee46478548945


3、将《kafka消息接入说明》中的kafka_client_jaas.conf和根证书kafka.client.truststore.jks放到resources/目录下

48b2e332f3f9fa48d929fac0f9dc464e66fd45ca


4、KafkaService.java



@Component
public class KafkaService {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void send(ProbeData probeData) {
		String deviceMac = probeData.getDev_mac();
		String shopId = probeData.getShopId();
		List<UserData> data = probeData.getData();

		StringBuilder msgSB = new StringBuilder();
		if (data != null && !data.isEmpty()) {
			for (UserData userData : data) {
				String visitorMac = userData.getUsr_mac();
				String visitorTime = userData.getUsr_cap_time();
				String msg = shopId + "," + deviceMac + "," + visitorMac + "," + visitorTime;

				msgSB.append(msg).append(";");
			}
			ListenableFuture futher = kafkaTemplate.sendDefault(msgSB.toString());
			/*
				// 此处用于控制是否同步
				try {
					futher.get();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
			*/
		}
	}
}


关键代码就一句:

kafkaTemplate.sendDefault(msgSB.toString());


此处为异步发送,一开始用测试类测试时总是写入不了kafka,后来发现是因为公网异步写入太慢,而测试类执行完后退出,导致异步中断。

测试时可以改为同步发送,即:

ListenableFuture futher = kafkaTemplate.sendDefault(msgSB.toString());
futher.get();




目录
相关文章
|
3月前
|
消息中间件 存储 大数据
阿里云消息队列 Kafka 架构及典型应用场景
阿里云消息队列 Kafka 是一款基于 Apache Kafka 的分布式消息中间件,支持消息发布与订阅模型,满足微服务解耦、大数据处理及实时流数据分析需求。其通过存算分离架构优化成本与性能,提供基础版、标准版和专业版三种 Serverless 版本,分别适用于不同业务场景,最高 SLA 达 99.99%。阿里云 Kafka 还具备弹性扩容、多可用区部署、冷热数据缓存隔离等特性,并支持与 Flink、MaxCompute 等生态工具无缝集成,广泛应用于用户行为分析、数据入库等场景,显著提升数据处理效率与实时性。
|
5月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
884 7
|
6月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
174 10
|
7月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
116 5
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
507 5
|
10月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
257 1
|
11月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
211 2
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
548 3
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
223 0
|
4月前
|
JavaScript 前端开发 Java
制造业ERP源码,工厂ERP管理系统,前端框架:Vue,后端框架:SpringBoot
这是一套基于SpringBoot+Vue技术栈开发的ERP企业管理系统,采用Java语言与vscode工具。系统涵盖采购/销售、出入库、生产、品质管理等功能,整合客户与供应商数据,支持在线协同和业务全流程管控。同时提供主数据管理、权限控制、工作流审批、报表自定义及打印、在线报表开发和自定义表单功能,助力企业实现高效自动化管理,并通过UniAPP实现移动端支持,满足多场景应用需求。
370 1