【Kafka从入门到放弃系列 八】Kafka的API调用

简介: 【Kafka从入门到放弃系列 八】Kafka的API调用

上篇blog安装了可视化的监控工具后,就到了我们最常用的环节,也就是通过代码来控制Kafka,使用API来调用。Kafka文档地址为Kafka官方文档,接下来我们会充分使用到官方文档中的示例,本篇blog分为如下几个部分:

  • 环境准备:创建一个java project,用来进行kafka代码的编写
  • 生产者API:探讨生产者的发送方式,使用不同的生产者接口发送【同步发送、异步发送】
  • 消费者API:探讨生产者的发送方式,使用不同的生产者接口发送【offset提交】

接下来按照如下流程来一起学习吧,奥利给!

环境准备

首先新建一个java project,打开idea新建一个maven项目:

然后引入kafka的的maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
     <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
     </dependency>
    </dependencies>
</project>

生产者API

在官方文档中,我们可以看到Kafka的消费者API列表生产者API,这些都是当前Kafka支持的生产者相关的API,有如下四种构造方法:

也有如下13种方法【非抽象的实例方法】:接下来分成几个模式分别介绍下

发送方式

发送方式分为两种,同步发送和异步发送,主体的发送流程二者是相同的,主体流程如下:

  • 首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
  • 接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
  • 当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 ——main 线程和 Sender 线程,以及 一个线程共享变量 ——RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker

只有数据积累到 batch.size 之后,sender 才会发送数据。如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据,也就是发往broker的数据是一批一批过去的。

异步发送

异步发送的含义是:消息的发送者只是将消息发送过去,并不关心消息的发送状态,如果leader在发送ack后宕机的话,重复发送的消息将不能保证原来的顺序。最好选用带回调函数的方法。

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
        //ack模式,all是最慢但最安全的
        props.put("acks", "-1");
        //失败重试次数
        props.put("retries", 1);
        //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
        props.put("batch.size", 10);
        //props.put("max.request.size",10);
        //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
        props.put("linger.ms", 10000);
        //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
        //buffer.memory要大于batch.size,否则会报申请内存不足的错误
        props.put("buffer.memory", 10240);
        //序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        org.apache.kafka.clients.producer.Producer producer=new KafkaProducer(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("tml-second", Integer.toString(i), "tml-second消息:"+i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("消息发送状态监测");
                }
            });
        producer.close();
    }
}

我们可以从机器上看到消息记录

为了更准确一些,我们用命令消费一下:

同步发送

同步发送用的比较少,唯一的不同就是他要求发送时按照顺序,如果当条数据发送失败,那么就阻塞线程,这样就保证了消息的严格顺序【即使在重试状态下发送的消息】

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Producer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
        //ack模式,all是最慢但最安全的
        props.put("acks", "-1");
        //失败重试次数
        props.put("retries", 1);
        //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
        props.put("batch.size", 10);
        //props.put("max.request.size",10);
        //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
        props.put("linger.ms", 10000);
        //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
        //buffer.memory要大于batch.size,否则会报申请内存不足的错误
        props.put("buffer.memory", 10240);
        //序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        org.apache.kafka.clients.producer.Producer producer=new KafkaProducer(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("tml-second", Integer.toString(i), "tml-second消息:"+i)).get();
        producer.close();
    }
}

防止消息重复提交

生产者策略的时候我们提到过,需要防止消息重复提交,也即精准一次提交,我们有两种级别,一种是幂等模式【一个broker的会话周期精准一次】,另一种是事务模式【全局的精准一次】

幂等模式

代码写法类似,只需要给配置里加一个配置项

//幂等模式
 props.put("enable.idempotence", true);

一旦设置了该属性,那么retries默认是Integer.MAX_VALUE ,acks默认是all【-1】。

事务模式

事务模式的写法略有不同:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer {
    public static void main(String[] args)  {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
        props.put("transactional.id", "my_transactional_id");
        org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        producer.initTransactions();
        try {
            //数据发送必须在beginTransaction()和commitTransaction()中间,否则会报状态不对的异常
            producer.beginTransaction();
            for (int i = 0; i < 100; i++)
                producer.send(new ProducerRecord<>("tml-second", Integer.toString(i), Integer.toString(i)));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // 这些异常不能被恢复,因此必须要关闭并退出Producer
            producer.close();
        } catch (KafkaException e) {
            // 出现其它异常,终止事务
            producer.abortTransaction();
        }
        producer.close();
    }
}

消费者API

在官方文档中,我们可以看到Kafka的消费者API列表消费者API,有构造方法,和实例方法。构造方法有如下四种:

也有45种方法【非抽象的实例方法】以及4种弃用方法。消费者提交方式有以下几种:

  • 自动提交:kafka管理offset的提交
  • 手动提交:手动同步提交和手动异步提交

按照这种结构我们看下提交方式。

自动提交offset

提交的代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args)  {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "tml-group");
        //开启offset自动提交
        props.put("enable.auto.commit", "true");
        //自动提交时间间隔
        props.put("auto.commit.interval.ms", "1000");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题,可以订阅多个主题
        consumer.subscribe(Arrays.asList("tml-second"));
        //死循环不停的从broker中拿数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

可以看到提交的效果

手动同步提交offset

通常从Kafka拿到的消息是要做业务处理,而且业务处理完成才算真正消费成功,所以需要客户端控制offset提交时间

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args)  {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers",  "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "tml_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题,可以订阅多个主题
        consumer.subscribe(Arrays.asList("tml-second"));
        final int minBatchSize = 50;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                //insertIntoDb(buffer);
                for (ConsumerRecord bf : buffer) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
                }
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

手动异步提交offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class Consumer {
    public static void main(String[] args)  {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers",  "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "tml_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题,可以订阅多个主题
        consumer.subscribe(Arrays.asList("tml-second"));
        final int minBatchSize = 50;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                //insertIntoDb(buffer);
                for (ConsumerRecord bf : buffer) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
                }
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public  void  onComplete(Map<TopicPartition,
                                                OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit  failed  for"  +
                                    offsets);
                        }
                    }
                });
                buffer.clear();
            }
        }
    }
}

趟了无数的坑,终于把Kafka学习完了,接下来开始Redis之旅,开始由业务架构向基础架构渗透,上可接客户,中可玩儿平台,下可探基础。完成SaaS、PaaS以及IaaS的闭环

部分内容来自 https://blog.csdn.net/wangzhanzheng/article/details/80801059

相关文章
|
1月前
|
开发框架 .NET API
RESTful API 设计与实现:C# 开发者的一分钟入门
【10月更文挑战第5天】本文从零开始,介绍了如何使用 C# 和 ASP.NET Core 设计并实现一个简单的 RESTful API。首先解释了 RESTful API 的概念及其核心原则,然后详细说明了设计 RESTful API 的关键步骤,包括资源识别、URI 设计、HTTP 方法选择、状态码使用和错误处理。最后,通过一个用户管理 API 的示例,演示了如何创建项目、定义模型、实现控制器及运行测试,帮助读者掌握 RESTful API 的开发技巧。
62 7
|
3月前
|
前端开发 JavaScript 安全
入门Vue+.NET 8 Web Api记录(一)
入门Vue+.NET 8 Web Api记录(一)
146 4
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
62 5
|
5月前
|
JavaScript API 开发者
GraphQL API开发入门:比RESTful更高效的数据查询方式
**GraphQL API开发入门摘要** GraphQL是一种更高效的数据查询方式,解决RESTful API的过度或不足获取数据问题。它允许客户端按需获取数据,减少网络传输,支持一次请求获取多资源。强类型和自描述特性方便了开发。文章通过一个简单的Node.js示例,展示如何使用`apollo-server-express`搭建GraphQL服务器,包括定义Schema、实现Resolver和创建服务器。通过测试,显示了GraphQL如何提供精确数据和优化查询效率。对于复杂数据需求,GraphQL是现代API设计的有效选择。
70 0
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
133 58
|
1月前
|
机器学习/深度学习 算法 API
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
机器学习入门(五):KNN概述 | K 近邻算法 API,K值选择问题
|
3月前
|
开发者
告别繁琐代码,JSF标签库带你走进高效开发的新时代!
【8月更文挑战第31天】JSF(JavaServer Faces)标准标签库为页面开发提供了大量组件标签,如`&lt;h:inputText&gt;`、`&lt;h:dataTable&gt;`等,简化代码、提升效率并确保稳定性。本文通过示例展示如何使用这些标签实现常见功能,如创建登录表单和展示数据列表,帮助开发者更高效地进行Web应用开发。
43 0
|
3月前
|
前端开发 API 开发者
【React状态管理新思路】Context API入门:从零开始摆脱props钻孔的优雅之道,全面解析与实战案例分享!
【8月更文挑战第31天】React 的 Context API 有效解决了多级组件间状态传递的 &quot;props 钻孔&quot; 问题,使代码更简洁、易维护。本文通过电子商务网站登录状态管理案例,详细介绍了 Context API 的使用方法,包括创建、提供及消费 Context,以及处理多个 Context 的场景,适合各水平开发者学习与应用,提高开发效率和代码质量。
40 0
|
3月前
|
API 开发工具
langchain 入门指南(一)- 准备 API KEY
langchain 入门指南(一)- 准备 API KEY
235 0
|
5月前
|
Linux API 数据安全/隐私保护
一文搞懂:【零基础】易盛9.0API入门二:登陆
一文搞懂:【零基础】易盛9.0API入门二:登陆
96 1