一招让Kafka达到最佳吞吐量

简介: 一招让Kafka达到最佳吞吐量

通过上一篇文章对 dq 生产者 的分析,我们知道 dq 是基于 beanstalk 的封装。至于 生产者 我们在后续的文章继续分享,本篇文章先来分析一下 go-queue 中的 kq

kq 基于 kafka 封装,设计之初是为了使 kafka 的使用更人性化。那就来看看 kq 的使用。

上手使用

func main() {
  // 1. 初始化
 pusher := kq.NewPusher([]string{
  "127.0.0.1:19092",
  "127.0.0.1:19092",
  "127.0.0.1:19092",
 }, "kq")
 ticker := time.NewTicker(time.Millisecond)
 for round := 0; round < 3; round++ {
  select {
  case <-ticker.C:
   count := rand.Intn(100)
   m := message{
    Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
    Value:   fmt.Sprintf("%d,%d", round, count),
    Payload: fmt.Sprintf("%d,%d", round, count),
   }
   body, err := json.Marshal(m)
   if err != nil {
    log.Fatal(err)
   }
   fmt.Println(string(body))
      // 2. 写入
   if err := pusher.Push(string(body)); err != nil {
    log.Fatal(err)
   }
  }
 }
}

kafka cluster 配置以及 topic 传入,你就得到一个操作 kafkapush operator

至于写入消息,简单的调用 pusher.Push(msg) 就行。是的,就这么简单!

当然,目前只支持单个 msg 写入。可能有人会疑惑,那就继续往下看,为什么只能一条一条写入?

初始化

一起看看 pusher 初始化哪些步骤:

NewPusher(clusterAddrs, topic, opts...)
 |- kafka.NewWriter(kfConfig)        // 与 kf 之前的连接
 |- executor = executors.NewChunkExecutor()  // 设置内部写入的executor为字节数定量写入
  1. 建立与 kafka cluster 的连接。此处肯定就要传入 kafka config
  2. 设置内部暂存区的写入函数以及刷新规则。

使用 chunkExecutor 作用不言而喻:将随机写 -> 批量写,减少 I/O 消耗;同时保证单次写入不能超过默认的 1M 或者自己设定的最大写入字节数。

其实再往  chunkExecutor 内部看,其实每次触发插入有两个指标:

  • maxChunkSize:单次最大写入字节数
  • flushInterval:刷新暂存消息插入的间隔时间

在触发写入,只要满足任意一个指标都会执行写入。同时在 executors 都有设置插入间隔时间,以防暂存区写入阻塞而暂存区内消息一直不被刷新清空。

更多关于 executors 可以参看以下:https://zeromicro.github.io/go-zero/executors.html

生产者插入

根据上述初始化对 executors 介绍,插入过程中也少不了它的配合:

func (p *Pusher) Push(v string) error {
  // 1. 将 msg -> kafka 内部的 Message
 msg := kafka.Message{
  Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
  Value: []byte(v),
 }
  
  // 使用 executor.Add() 插入内部的 container
  // 当 executor 初始化失败或者是内部发生错误,也会将 Message 直接插入 kafka
 if p.executor != nil {
  return p.executor.Add(msg, len(v))
 } else {
  return p.produer.WriteMessages(context.Background(), msg)
 }
}

过程其实很简单。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的逻辑其实在初始化中就声明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
  chunk := make([]kafka.Message, len(tasks))
   // 1
  for i := range tasks {
   chunk[i] = tasks[i].(kafka.Message)
  }
   // 2
  if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
   logx.Error(err)
  }
 }, newOptions(opts)...)
  1. 触发插入时,将暂存区中存储的 []msg 依次拿出,作为最终插入消息集合;
  2. 将上一步的消息集合,作为一个批次插入 kafkatopic

这样 pusher -> chunkExecutor -> kafka 一个链路就出现了。下面用一张图形象表达一下:

image.png

框架地址

https://github.com/tal-tech/go-queue

同时在 go-queue 也大量使用 go-zero 的 批量处理工具库 executors

https://github.com/tal-tech/go-zero

欢迎使用 go-zero & go-queuestar 支持我们!一起构建 go-zero 生态!👍

相关文章
|
消息中间件 存储 缓存
【Kafka】(十)Kafka 如何实现高吞吐量
【Kafka】(十)Kafka 如何实现高吞吐量
774 0
|
1月前
|
消息中间件 存储 缓存
为什么 Kafka 的吞吐量那么高?
为什么 Kafka 的吞吐量那么高?
36 2
|
3月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
108 0
|
4月前
|
消息中间件 存储 缓存
面试题Kafka问题之Kafka的生产消费基本流程如何解决
面试题Kafka问题之Kafka的生产消费基本流程如何解决
47 1
|
4月前
|
消息中间件 存储 Java
Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅
【7月更文挑战第1天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送消息到主题,消费者订阅并消费。Kafka提供消息持久化、容灾机制,支持分区和复制以确保高可用性。通过优化如分区、批处理和消费者策略,可适应高并发场景。简单的Java示例展示了如何创建和交互消息。
66 0
|
5月前
|
消息中间件 缓存 监控
Kafka性能优化策略综述:提升吞吐量与可靠性
Kafka性能优化策略综述:提升吞吐量与可靠性
749 0
|
6月前
|
消息中间件 监控 Java
腾讯面试:如何提升Kafka吞吐量?
Kafka 是一个分布式流处理平台和消息系统,用于构建实时数据管道和流应用。它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。 Kafka 特点是**高吞吐量、分布式架构、支持持久化、集群水平扩展和消费组消息消费**,具体来说: 1. **高吞吐量**:Kafka 具有高性能和低延迟的特性,能够处理大规模数据,并支持每秒数百万条消息的高吞吐量。 2. **分布式架构**:Kafka 采用分布式架构,可以水平扩展,多个节点之间能够实现负载均衡和高可用性。 3. **可持久化**:Kafka 将消息持久化到磁盘中,保证消息的可靠性,即使消费者下线或出现故障,消
92 0
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
消息中间件 存储 Kafka
高吞吐量分布式消息系统:深入了解 Apache Kafka
在现代的分布式系统中,消息传递已经成为实现异步通信、日志记录和事件驱动架构的核心。Apache Kafka,作为一款高吞吐量、持久性和分布式的消息系统,正被越来越多的企业和开发者用于构建实时数据流和事件处理平台。本文将为您详细介绍 Apache Kafka 的核心概念、特性以及在分布式架构中的应用。
206 0
BXA
|
消息中间件 存储 Cloud Native
Spring Boot与 Kafka实现高吞吐量消息处理大规模数据问题
现代数据量越来越庞大对数据处理的效率提出了更高的要求。Apache Kafka是目前流行的分布式消息队列之一。Spring Boot是现代Java应用程序快速开发的首选框架。综合使用Spring Boot和Apache Kafka可以实现高吞吐量消息处理。
BXA
406 0