Scenario: as described in the previous post, the service keeps receiving data and storing it in OSS; now a copy of that data also needs to be written to Kafka on MQ (Aliyun message queue).
Spring Boot version: 1.5.9.
Before starting, read Aliyun's official Kafka message access guide: https://help.aliyun.com/document_detail/52376.html
1. Add the Kafka dependencies
The latest spring-kafka at the time of writing is 2.1.2, which pulls in kafka-clients 1.0.x. The Aliyun Kafka broker, however, is version 0.10 and a 0.10 client is recommended, so the transitive kafka-clients dependency must be excluded and a 0.10 client declared explicitly (as sketched below). Otherwise the producer keeps failing with:
Bootstrap broker kafka-ons-internet.aliyun.com:8080 disconnected
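The original post doesn't show the pom changes; a minimal Maven sketch of the exclusion might look like this (the kafka-clients version is an assumption, pick one matching the 0.10 broker, and use a spring-kafka release compatible with Spring Boot 1.5.x):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- version managed by Spring Boot 1.5.x; exclude its transitive client -->
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <!-- re-declare a 0.10.x client to match the broker (version is an assumption) -->
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>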
2. KafkaConfiguration.java
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.StringUtils;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Value("${kafka.default.topic}")
    private String defaultTopic;

    // empty default so the property can be omitted in development (the classpath JKS is used instead)
    @Value("${kafka.jks.location:}")
    private String jksLocation;

    public KafkaConfiguration() {
        // point the JVM at the SASL JAAS config bundled under resources/, unless it was already set externally
        URL authLocation = KafkaConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");
        if (System.getProperty("java.security.auth.login.config") == null) {
            System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());
        }
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        // development: read the truststore from the classpath; production: read it from the configured path
        if (StringUtils.isEmpty(jksLocation)) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                    KafkaConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks").getPath());
        } else {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);
        }
        // truststore password and SASL settings as given in the Aliyun access guide
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "ONS");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        // sendDefault() will publish to this topic
        kafkaTemplate.setDefaultTopic(defaultTopic);
        return kafkaTemplate;
    }
}
Note that three properties are injected from the configuration file (a sample application.properties follows this list):
brokerAddress — the Kafka bootstrap server address
defaultTopic — the default Kafka topic
jksLocation — path to the JKS truststore (no need to set this in development, where the JKS under resources/ is read directly; in production the JKS sits outside the jar, so the path must be configured)
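A sample application.properties (the values are placeholders except the public endpoint quoted in the error message above; use your own access point, topic and path):

# Kafka bootstrap servers (Aliyun public endpoint)
kafka.broker.address=kafka-ons-internet.aliyun.com:8080
# default topic used by kafkaTemplate.sendDefault()
kafka.default.topic=your-topic
# production only: JKS outside the jar; omit in development
kafka.jks.location=/path/to/kafka.client.truststore.jks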
3. Copy kafka_client_jaas.conf and the root certificate kafka.client.truststore.jks from the Kafka access guide into the resources/ directory (an optional startup check is sketched below).
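If you want to fail fast when either file is missing from the classpath, a small check like this (my own addition, not part of the original setup) can be placed in the KafkaConfiguration constructor:

// verify the SASL JAAS config and the truststore are packaged under resources/
URL jaas = KafkaConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");
URL jks = KafkaConfiguration.class.getClassLoader().getResource("kafka.client.truststore.jks");
if (jaas == null || jks == null) {
    throw new IllegalStateException("kafka_client_jaas.conf or kafka.client.truststore.jks is missing from resources/");
}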
4. KafkaService.java
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // ProbeData / UserData are the domain objects carrying the received probe data (packages omitted in the original)
    public void send(ProbeData probeData) {
        String deviceMac = probeData.getDev_mac();
        String shopId = probeData.getShopId();
        List<UserData> data = probeData.getData();
        StringBuilder msgSB = new StringBuilder();
        if (data != null && !data.isEmpty()) {
            // one "shopId,deviceMac,visitorMac,visitorTime" record per visitor, records separated by ";"
            for (UserData userData : data) {
                String visitorMac = userData.getUsr_mac();
                String visitorTime = userData.getUsr_cap_time();
                String msg = shopId + "," + deviceMac + "," + visitorMac + "," + visitorTime;
                msgSB.append(msg).append(";");
            }
            // asynchronous send to the default topic
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.sendDefault(msgSB.toString());
            /*
            // uncomment to block until the send completes (synchronous behaviour);
            // also import java.util.concurrent.ExecutionException
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            */
        }
    }
}
The key call is a single line:
kafkaTemplate.sendDefault(msgSB.toString());
This send is asynchronous. When first running the test class, nothing ever arrived in Kafka; it turned out that the asynchronous send over the public network was slow, and the test class exited as soon as it finished, cutting the send off before it completed.
For testing you can switch to a synchronous send, i.e.:
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.sendDefault(msgSB.toString());
future.get();
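In production it is usually better to keep the send asynchronous and attach a callback so failures are at least visible; a sketch using spring-kafka's ListenableFuture API (the callback bodies are illustrative, replace the printStackTrace with proper logging):

// import org.springframework.util.concurrent.ListenableFutureCallback;
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // optionally log result.getRecordMetadata() (topic, partition, offset)
    }

    @Override
    public void onFailure(Throwable ex) {
        // without this, asynchronous send failures are silent
        ex.printStackTrace();
    }
});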