kafka topic 管理api

简介: kafka topic 管理api

1.使用AdminClient进行处理

2.topic 创建、删除等操作代码

pom文件

<dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>

操作代码

package com.vince.xq.kafka;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertTrue;
/**
 * Unit test for simple App.
 */
public class AppTest {
    private String BROKER_LIST = "localhost:9092";
    /**
     * Rigorous Test :-)
     */
    @Test
    public void shouldAnswerWithTrue() {
        assertTrue(true);
    }
    @Test
    public void createTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        //创建名称为test1的topic,有5个分区
        NewTopic topic = new NewTopic("test01", 5, (short) 1);
        CreateTopicsResult createTopicsResult = client.createTopics(Arrays.asList(topic));
        client.close();//关闭
    }
    @Test
    public void deleteTopic() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        client.deleteTopics(Arrays.asList("test2"));
        client.close();//关闭
    }
    @Test
    public void getTopic() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        DescribeTopicsResult describeTopicsResult = client.describeTopics(Arrays.asList("test1"));
        Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
        descriptionMap.forEach((key, value) -> {
            System.out.println("name: " + key + " desc: " + value);
        });
    }
    @Test
    public void listTopic() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  //kafka服务地址
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        AdminClient client = KafkaAdminClient.create(props);//创建操作客户端
        ListTopicsResult listTopicsResult = client.listTopics();
        Set<String> names = listTopicsResult.names().get();
        //打印names
        names.stream().forEach(System.out::println);
        client.close();//关闭
    }
    @Test
    public void producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //异步发送20条消息
        for (int i = 1; i <= 20; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key" + i, "message" + i);
            //aysn 发送
            //producer.send(record);
            //同步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("success:" + recordMetadata.offset());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
    @Test
    public void consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");//groupid相同的属于同一个消费者组
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//自动提交offset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费test1主题
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            System.out.println("consumer is polling");
            //5秒等待
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("offset=%d,key=%s,value=%s",
                        record.offset(), record.key(), record.value()));
            }
            //同步提交,失败会重试
            consumer.commitSync();
            //异步提交,失败不会重试
            //consumer.commitAsync();
        }
    }
}


相关文章
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
61 5
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
61 4
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
130 58
|
2月前
|
消息中间件 Kafka Apache
kafka: invalid configuration (That topic/partition is already being consumed)
kafka: invalid configuration (That topic/partition is already being consumed)
|
4月前
|
消息中间件 存储 Kafka
深入理解Kafka核心设计及原理(四):主题管理
深入理解Kafka核心设计及原理(四):主题管理
76 8
|
5月前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
189 2
|
4月前
|
消息中间件 监控 Kafka
查询Kafka集群中消费组(group)信息和对应topic的消费情况
查询Kafka集群中消费组(group)信息和对应topic的消费情况
2361 0
|
6月前
|
监控 Cloud Native 安全
【阿里云云原生专栏】云原生下的API管理:阿里云API Gateway的应用场景与优势
【5月更文挑战第23天】阿里云API Gateway是高性能的API托管服务,适用于微服务API聚合、安全管理及流量控制。它提供统一入口、多种认证方式和流量控制策略,确保服务稳定性。具备高度可扩展性、丰富插件生态和简化API生命周期管理等特点。通过简单步骤,如创建API、配置后端服务、设置认证和发布,即可快速上手。作为云原生时代的API管理解决方案,阿里云API Gateway助力企业高效、安全地管理API,推动业务创新和数字化转型。
96 1
|
6月前
|
存储 自然语言处理 搜索推荐
Elasticsearch 8.10 同义词管理新篇章:引入同义词 API
Elasticsearch 8.10 同义词管理新篇章:引入同义词 API
167 1
|
6月前
|
前端开发 JavaScript API
React的Context API:全局状态管理的利器
【4月更文挑战第25天】React的Context API解决了深层组件间状态共享的难题,提供全局状态管理方案。通过`Provider`和`Consumer`组件,或结合`useContext` Hook,实现状态在组件树中的传递。最佳实践包括避免过度使用,分离逻辑,以及在必要时与Redux或MobX结合。Context API简化了数据传递,但需谨慎使用以保持代码清晰。

热门文章

最新文章