Kafka API实践

简介: 系统学习三步骤走:理解原理、搭建系统、Api练习。从哪里找到Api?Document和git。例如,Kafka在github上的地址github.com/apache/kafka,找到example目录。

系统学习三步骤走:理解原理、搭建系统、Api练习。
从哪里找到Api?Document和git。
例如,Kafka在github上的地址github.com/apache/kafka,找到example目录。
这也算是一个小技巧/apache/xxx,就是XXX的git目录。

Kafka文档路径更好找,就在kafka.apache.org
别用百度搜索,再跳转一次,记住xxx.apache.org就是apache项目的主目录。

Producer 和 Comsumer

如图,Kafka系统中包含三种角色,(1)producer生产者(2)Kafka Cluster消息队列(3)consumer消费者。

在上篇文章中,介绍了Kafka安装,通过启动Kafka server,实现了Kafka Cluster。而生产者消费者,可以通过Api实现写入和读取消息队列。

一、 pom.xml文件,引入依赖

Kafka Api 被包含在Kafka-clients包中,修改pom.xml文件。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>

二、编写Producer

1.Producer 配置

Properties props = new Properties();
props.put("bootstrap.servers", "hbase:9092,datanode2:9092,datanode3:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • bootstrap.servers:kafka server的地址
  • acks:写入kafka时,leader负责一个该partion读写,当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack。
  • retris:写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
  • batch.size:produce积累到一定数据,一次发送。
  • buffer.memory: produce积累数据一次发送,缓存大小达到buffer.memory就发送数据。
  • linger.ms :当设置了缓冲区,消息就不会即时发送,如果消息总不够条数、或者消息不够buffer大小就不发送了吗?当消息超过linger时间,也会发送。
  • key/value serializer:序列化类。

2.KafkaProducer

  • KafkaProducer
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = getConfig();
Producer<String, String> producer =
                        new KafkaProducer<String, String>(props);
  • Producer是一个接口,声明了同步send和异步send两个重要方法。
    public Future<RecordMetadata> send(ProducerRecord<K, V> record);
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
  • ProducerRecord 消息实体类,每条消息由(topic,key,value,timestamp)四元组封装。一条消息key可以为空和timestamp可以设置当前时间为默认值。
ProducerRecord record = new ProducerRecord<String, String>
("exam2", Integer.toString(i), Integer.toString(i));//exam2为topic
producer.send(record);

异步发送

long startTime = System.currentTimeMillis();
producer.send(new ProducerRecord<>(topic,messagekey,messageValue), 
        new DemoCallBack(startTime, messagekey, messageValue));

DemoCallBack异步回调接口,包含2个函数,构造函数和onCompletion函数。
返回的对象RecordMetadata包含partition和offset两个信息。

class DemoCallBack implements Callback {

    private final long startTime;
    private final String key;
    private final String message;

    public DemoCallBack(long startTime, String key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }
    /**
     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
     *                  occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                    "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

控制台输出结果,能够看出回调函数不是异步执行的。

i:0
i:1
message(0, 0) sent to partition(6), offset(303) in 680 ms
i:2
message(1, 1) sent to partition(9), offset(295) in 126 ms
message(2, 2) sent to partition(8), offset(343) in 53 ms
i:3
message(3, 3) sent to partition(3), offset(331) in 18 ms
i:4
message(4, 4) sent to partition(3), offset(332) in 8 ms
i:5
message(5, 5) sent to partition(0), offset(310) in 22 ms
i:6
message(6, 6) sent to partition(8), offset(344) in 8 ms
i:7
message(7, 7) sent to partition(9), offset(296) in 19 ms
i:8
i:9
message(9, 9) sent to partition(3), offset(333) in 23 ms
message(8, 8) sent to partition(7), offset(287) in 136 ms
i:10
message(10, 10) sent to partition(6), offset(304) in 21 ms

三、编写Consumer

1.Consumer 配置

Properties props = new Properties();
props.put("bootstrap.servers", "hbase:9092,datanode2:9092,datanode3:9092");
props.put("group.id", "testGroup");
props.put("enable.auto.commit", "true");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  • group.id:testGroup。由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名。
  • enable.auto.commit:true。设置自动提交offset。

2.KafkaConsumer

KafkaConsumer

import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = getConfig();
consumer = new KafkaConsumer<String, String>(props);

Consumer接口,声明了subscribe和poll两个重要方法。KafkaConsumer实现了Consumer接口。

public void subscribe(Collection<String> topics);
public ConsumerRecords<K, V> poll(long timeout);

可以创建多个consumer线程,并发拉取消息。由于consumer是线程不安全的,合适的做法是每个线程创建并维护一个consumer对象。

自定义KafkaConsumerRunner是一个多线程类,维护一个KafkaConsumer对象。

// Thread to consume kafka data
public static class KafkaConsumerRunner implements Runnable
{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public KafkaConsumerRunner(String topic)
    {
        Properties props = getConfig();
        consumer = new KafkaConsumer<String, String>(props);
        this.topic = topic;
    }

    public void handleRecord(ConsumerRecord record)
    {
        System.out.println("name: " + Thread.currentThread().getName()
                + " ; topic: " + record.topic() + "; partition:"+record.partition()+
                " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
    }

    public void run()
    {
        try {
            // subscribe
            consumer.subscribe(Arrays.asList(topic));
            while (!closed.get()) {
                //read data
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Handle new records
                for (ConsumerRecord<String, String> record : records) {
                    handleRecord(record);
                }
            }
        }
        catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) {
                throw e;
            }
        }
        finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown()
    {
        closed.set(true);
        consumer.wakeup();
    }
}

线程池启动多个consumer线程,

int numConsumers = 3;
final String topic = "exam2";
final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
for (int i = 0; i < numConsumers; i++) {
    KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
    consumers.add(consumer);
    executor.submit(consumer);
}
执行结果:

name: pool-1-thread-3 ; topic: exam2; partition:9 ; offset445 ; key: 1 ; value: 1
name: pool-1-thread-2 ; topic: exam2; partition:6 ; offset448 ; key: 0 ; value: 0
name: pool-1-thread-3 ; topic: exam2; partition:8 ; offset508 ; key: 2 ; value: 2
name: pool-1-thread-1 ; topic: exam2; partition:3 ; offset495 ; key: 3 ; value: 3
name: pool-1-thread-1 ; topic: exam2; partition:3 ; offset496 ; key: 4 ; value: 4
name: pool-1-thread-1 ; topic: exam2; partition:0 ; offset461 ; key: 5 ; value: 5
name: pool-1-thread-3 ; topic: exam2; partition:8 ; offset509 ; key: 6 ; value: 6
name: pool-1-thread-3 ; topic: exam2; partition:9 ; offset446 ; key: 7 ; value: 7
name: pool-1-thread-3 ; topic: exam2; partition:7 ; offset428 ; key: 8 ; value: 8
name: pool-1-thread-1 ; topic: exam2; partition:3 ; offset497 ; key: 9 ; value: 9
name: pool-1-thread-2 ; topic: exam2; partition:6 ; offset449 ; key: 10 ; value: 10
name: pool-1-thread-3 ; topic: exam2; partition:8 ; offset510 ; key: 11 ; value: 11

观察结果
  1. 保证每个consumer线程消费不同的partition。
  2. partition之间不能保证顺序进行,里如key:1和key:0
  3. 同一个partition内保证顺序性,即offset保证在同一partition内顺序进行。

优雅的关闭子线程

在main函数中,添加hook进程关闭的函数。new Thread 在进程关闭时触发,调用Consumer的shutdown函数,设置while循环的退出条件while (!closed.get())

Runtime.getRuntime().addShutdownHook(new Thread()
{
    @Override
    public void run()
    {
        for (KafkaConsumerRunner consumer : consumers) {
            consumer.shutdown();
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("process quit");
    }
});

手动控制offset

//设置由用户触发提交offset
props.put("enable.auto.commit", "false");
for (ConsumerRecord<String, String> record : records) {
    handleRecord(record);
}
consumer.commitAsync();
运行结果:
  1. poll拉取的数据还是顺序返回,不会反复拉取offset的数据。
  2. 重启进程,由于offset没有提交,会重头处理offset。

四、总结

本文测试了kafka提供的Api。
在实际应用中kafka会和spark stream结合,采用流式计算的方式处理kafka中数据。

目录
相关文章
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
110 4
|
3月前
|
缓存 数据挖掘 API
商品详情API接口的应用实践
本文探讨了商品详情API接口在电商领域的应用实践,介绍了其作为高效数据交互方式的重要性,包括实时获取商品信息、提升用户体验和运营效率。文章详细描述了API接口的特点、应用场景如商品展示、SEO优化、数据分析及跨平台整合,并提出了缓存机制、分页加载、异步加载和错误处理等优化策略,旨在全面提升电商运营效果。
|
13天前
|
存储 API 计算机视觉
自学记录HarmonyOS Next Image API 13:图像处理与传输的开发实践
在完成数字版权管理(DRM)项目后,我决定挑战HarmonyOS Next的图像处理功能,学习Image API和SendableImage API。这两个API支持图像加载、编辑、存储及跨设备发送共享。我计划开发一个简单的图像编辑与发送工具,实现图像裁剪、缩放及跨设备共享功能。通过研究,我深刻体会到HarmonyOS的强大设计,未来这些功能可应用于照片编辑、媒体共享等场景。如果你对图像处理感兴趣,不妨一起探索更多高级特性,共同进步。
68 11
|
9天前
|
人工智能 数据可视化 API
自学记录鸿蒙API 13:Calendar Kit日历功能从学习到实践
本文介绍了使用HarmonyOS的Calendar Kit开发日程管理应用的过程。通过API 13版本,不仅实现了创建、查询、更新和删除日程等基础功能,还深入探索了权限请求、日历配置、事件添加及查询筛选等功能。实战项目中,开发了一个智能日程管理工具,具备可视化管理、模糊查询和智能提醒等特性。最终,作者总结了模块化开发的优势,并展望了未来加入语音助手和AI推荐功能的计划。
123 1
|
2月前
|
XML JSON 缓存
深入理解RESTful API设计原则与实践
在现代软件开发中,构建高效、可扩展的应用程序接口(API)是至关重要的。本文旨在探讨RESTful API的核心设计理念,包括其基于HTTP协议的特性,以及如何在实际应用中遵循这些原则来优化API设计。我们将通过具体示例和最佳实践,展示如何创建易于理解、维护且性能优良的RESTful服务,从而提升前后端分离架构下的开发效率和用户体验。
|
1月前
|
机器学习/深度学习 搜索推荐 API
淘宝/天猫按图搜索(拍立淘)API的深度解析与应用实践
在数字化时代,电商行业迅速发展,个性化、便捷性和高效性成为消费者新需求。淘宝/天猫推出的拍立淘API,利用图像识别技术,提供精准的购物搜索体验。本文深入探讨其原理、优势、应用场景及实现方法,助力电商技术和用户体验提升。
|
2月前
|
缓存 API 开发者
构建高效后端服务:RESTful API设计原则与实践
【10月更文挑战第43天】在数字化时代的浪潮中,后端服务的稳定性和效率成为企业竞争力的关键。本文将深入探讨如何构建高效的后端服务,重点介绍RESTful API的设计原则和实践技巧,帮助开发者提升服务的可用性、可扩展性和安全性。通过实际代码示例,我们将展示如何将这些原则应用到日常开发工作中,以确保后端服务能够支撑起现代Web和移动应用的需求。
|
2月前
|
存储 JSON 测试技术
构建高效后端API:实践和原则
【10月更文挑战第43天】本文深入探讨了如何设计和实现高效、可维护的后端API,强调了设计哲学、最佳实践和常见陷阱。通过具体示例,我们展示了如何运用这些原则来提高API的性能和可用性。
|
2月前
|
Prometheus 监控 Java
深入探索:自制Agent监控API接口耗时实践
在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控接口耗时,我们可以识别性能瓶颈,优化服务响应速度。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种实用的技术解决方案。
66 3
|
1月前
|
监控 搜索推荐 测试技术
电商API的测试与用途:深度解析与实践
在电子商务蓬勃发展的今天,电商API成为连接电商平台、商家、消费者和第三方开发者的重要桥梁。本文深入探讨了电商API的核心功能,包括订单管理、商品管理、用户管理、支付管理和物流管理,并介绍了有效的测试技巧,如理解API文档、设计测试用例、搭建测试环境、自动化测试、压力测试、安全性测试等。文章还详细阐述了电商API的多样化用途,如商品信息获取、订单管理自动化、用户数据管理、库存同步、物流跟踪、支付处理、促销活动管理、评价管理、数据报告和分析、扩展平台功能及跨境电商等,旨在为开发者和电商平台提供有益的参考。
47 0
下一篇
开通oss服务