1.使用AdminClient进行处理
2.topic 创建、删除等操作代码
pom文件
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
操作代码
package com.vince.xq.kafka; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertTrue; /** * Unit test for simple App. */ public class AppTest { private String BROKER_LIST = "localhost:9092"; /** * Rigorous Test :-) */ @Test public void shouldAnswerWithTrue() { assertTrue(true); } @Test public void createTopic() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //kafka服务地址 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); AdminClient client = KafkaAdminClient.create(props);//创建操作客户端 //创建名称为test1的topic,有5个分区 NewTopic topic = new NewTopic("test01", 5, (short) 1); CreateTopicsResult createTopicsResult = client.createTopics(Arrays.asList(topic)); client.close();//关闭 } @Test public void deleteTopic() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //kafka服务地址 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); AdminClient client = KafkaAdminClient.create(props);//创建操作客户端 client.deleteTopics(Arrays.asList("test2")); client.close();//关闭 } @Test public void getTopic() throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //kafka服务地址 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); AdminClient client = KafkaAdminClient.create(props);//创建操作客户端 DescribeTopicsResult describeTopicsResult = client.describeTopics(Arrays.asList("test1")); Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get(); descriptionMap.forEach((key, value) -> { System.out.println("name: " + key + " desc: " + value); }); } @Test public void listTopic() throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); //kafka服务地址 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); AdminClient client = KafkaAdminClient.create(props);//创建操作客户端 ListTopicsResult listTopicsResult = client.listTopics(); Set<String> names = listTopicsResult.names().get(); //打印names names.stream().forEach(System.out::println); client.close();//关闭 } @Test public void producer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); //异步发送20条消息 for (int i = 1; i <= 20; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i); //aysn 发送 //producer.send(record); //同步发送 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("success:" + recordMetadata.offset()); } else { e.printStackTrace(); } } }); } producer.close(); } @Test public void consumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//groupid相同的属于同一个消费者组 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交offset props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //消费test1主题 consumer.subscribe(Arrays.asList("test1")); while (true) { System.out.println("consumer is polling"); //5秒等待 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("offset=%d,key=%s,value=%s", record.offset(), record.key(), record.value())); } //同步提交,失败会重试 consumer.commitSync(); //异步提交,失败不会重试 //consumer.commitAsync(); } } }