kafka topic 管理api

简介: kafka topic 管理api

1.使用AdminClient进行处理

2.topic 创建、删除等操作代码

pom文件

<dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>

操作代码

package com.vince.xq.kafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertTrue;
/**
 * Unit test for simple App.
 */
public class AppTest {
    private String BROKER_LIST = "localhost:9092";
    /**
     * Rigorous Test :-)
     */
    @Test
    public void shouldAnswerWithTrue() {
        assertTrue(true);
    }
    @Test
    public void createTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        //创建名称为test1的topic,有5个分区
        NewTopic topic = new NewTopic("test01", 5, (short) 1);
        CreateTopicsResult createTopicsResult = client.createTopics(Arrays.asList(topic));
        client.close();//关闭
    }
    @Test
    public void deleteTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        client.deleteTopics(Arrays.asList("test2"));
        client.close();//关闭
    }
    @Test
    public void getTopic() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        DescribeTopicsResult describeTopicsResult = client.describeTopics(Arrays.asList("test1"));
        Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
        descriptionMap.forEach((key, value) -> {
            System.out.println("name: " + key + " desc: " + value);
        });
    }
    @Test
    public void listTopic() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        ListTopicsResult listTopicsResult = client.listTopics();
        Set<String> names = listTopicsResult.names().get();
        //打印names
        names.stream().forEach(System.out::println);
        client.close();//关闭
    }
    @Test
    public void producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //异步发送20条消息
        for (int i = 1; i <= 20; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i);
            //aysn 发送
            //producer.send(record);
            //同步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("success:" + recordMetadata.offset());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
    @Test
    public void consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//groupid相同的属于同一个消费者组
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交offset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费test1主题
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            System.out.println("consumer is polling");
            //5秒等待
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("offset=%d,key=%s,value=%s",
                        record.offset(), record.key(), record.value()));
            }
            //同步提交,失败会重试
            consumer.commitSync();
            //异步提交,失败不会重试
            //consumer.commitAsync();
        }
    }
}


相关文章
|
2月前
|
消息中间件 Kafka API
kafka Consumer high-level api 之白名单
kafka Consumer high-level api 之白名单
|
3月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
100 1
|
3月前
|
消息中间件 分布式计算 Java
探究Kafka原理-3.生产者消费者API原理解析(上)
探究Kafka原理-3.生产者消费者API原理解析
35 0
|
1月前
|
监控 供应链 测试技术
如何利用API接口进行高效的商品变体管理?
要利用API接口进行高效的商品变体管理,您需要执行一系列策略和技术步骤来确保数据的准确性和实时性。以下是详细的指南:
|
2月前
|
消息中间件 缓存 Java
Kafka Consumer java api 配置
Kafka Consumer java api 配置
|
3月前
|
运维 监控 前端开发
功能强大的国产API管理神器 Eolink,亲测好用
功能强大的国产API管理神器 Eolink,亲测好用
26 0
功能强大的国产API管理神器 Eolink,亲测好用
|
3月前
|
消息中间件 存储 Java
Kafka的Topic CRUD演示
Kafka的Topic CRUD演示
25 0
|
3月前
|
消息中间件 存储 运维
探究Kafka原理-4.API使用
探究Kafka原理-4.API使用
34 0
|
3月前
|
消息中间件 缓存 Kafka
探究Kafka原理-3.生产者消费者API原理解析(下)
探究Kafka原理-3.生产者消费者API原理解析
133 0
|
3月前
|
消息中间件 Kafka API
Kafka - 异步/同步发送API
Kafka - 异步/同步发送API
45 0