一、添加依赖
<!-- kafka-clients--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.5.1</version> </dependency>
二、生产者
自定义分区,可忽略
import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class MyPatitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String msgStr = value.toString(); if(msgStr.contains("a")){ return 1; } return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
1、普通消息
public static void main(String[] args) throws ExecutionException, InterruptedException { //配置 Properties properties = new Properties(); //连接参数 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092"); //序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //关联自定义分区器 可选 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner"); //优化参数 可选 //缓冲器大小 32M properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024); //批次大小 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024); //Linger.ms properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); //压缩 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); //acks properties.put(ProducerConfig.ACKS_CONFIG, "-1"); //重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 3); //创建生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //异步发送数据 for (int i = 0; i < 10; i++) { //给first主题发消息 kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i)); //回调异步发送 kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition()); } } }); kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a"); } } }); Thread.sleep(500); } //同步 for (int i = 0; i < 10; i++) { //给first主题发消息 kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get(); } //关闭资源 kafkaProducer.close(); }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first a0 hello0 hello20 a1 hello1 hello21 a2 hello2 hello22 a3 hello3 hello23 a4 hello4 hello24 a5 hello5 hello25 a6 hello6 hello26 a7 hello7 hello27 a8 hello8 hello28 a9 hello9 hello29 sync_hello0 sync_hello1 sync_hello2 sync_hello3 sync_hello4 sync_hello5 sync_hello6 sync_hello7 sync_hello8 sync_hello9
2、事务消息
public static void main(String[] args) throws ExecutionException, InterruptedException { //配置 Properties properties = new Properties(); //连接参数 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092"); //序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //关联自定义分区器 可选 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner"); //优化参数 可选 //缓冲器大小 32M properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024); //批次大小 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024); //Linger.ms properties.put(ProducerConfig.LINGER_MS_CONFIG, 5); //压缩 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); //acks properties.put(ProducerConfig.ACKS_CONFIG, "-1"); //重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 3); //指定事务ID properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01"); properties.put("enable.idempotence", "true"); //创建生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //事务消息 初始化 kafkaProducer.initTransactions(); //开始事务 kafkaProducer.beginTransaction(); try { kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get(); //提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { //终止事务 kafkaProducer.abortTransaction(); } finally { //关闭资源 kafkaProducer.close(); } }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first Transactions