go操作Kafka

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go操作Kafka

1. Kafka介绍

1.1.1. Kafka是什么

    kafka使用scala开发,支持多语言客户端(c++、java、python、go等)
    Kafka最先由LinkedIn公司开发,之后成为Apache的顶级项目。
    Kafka是一个分布式的、分区化、可复制提交的日志服务
    LinkedIn使用Kafka实现了公司不同应用程序之间的松耦和,那么作为一个可扩展、高可靠的消息系统
    支持高Throughput的应用
    scale out:无需停机即可扩展机器
    持久化:通过将数据持久化到硬盘以及replication防止数据丢失
    支持online和offline的场景

1.1.2. Kafka的特点

Kafka是分布式的,其所有的构件borker(服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。


在消息的生产时可以使用一个标识topic来区分,且可以进行分区;每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。


同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。


消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡


1.1.3. 常用的场景

监控:主机通过Kafka发送与系统和应用程序健康相关的指标,然后这些信息会被收集和处理从而创建监控仪表盘并发送警告。


消息队列: 应用程度使用Kafka作为传统的消息系统实现标准的队列和消息的发布—订阅,例如搜索和内容提要(Content Feed)。比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统 一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ


站点的用户活动追踪: 为了更好地理解用户行为,改善用户体验,将用户查看了哪个页面、点击了哪些内容等信息发送到每个数据中心的Kafka集群上,并通过Hadoop进行分析、生成日常报告。


流处理:保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行 阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内 容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返 还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。


日志聚合:使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉 文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的 系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟


持久性日志:Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。


1.1.4. Kafka中包含以下基础概念

    1.Topic(话题):Kafka中用于区分不同类别信息的类别名称。由producer指定
    2.Producer(生产者):将消息发布到Kafka特定的Topic的对象(过程)
    3.Consumers(消费者):订阅并处理特定的Topic中的消息的对象(过程)
    4.Broker(Kafka服务集群):已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
    5.Partition(分区):Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)
    Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。

1.1.5. 消息

消息由一个固定大小的报头和可变长度但不透明的字节阵列负载。报头包含格式版本和CRC32效验和以检测损坏或截断

1.1.6. 消息格式

    1. 4 byte CRC32 of the message
    2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
    3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
       bit 0 ~ 2 : Compression codec
           0 : no compression
           1 : gzip
           2 : snappy
           3 : lz4
       bit 3 : Timestamp type
           0 : create time
           1 : log append time
       bit 4 ~ 7 : reserved
    4. (可选) 8 byte timestamp only if "magic" identifier is greater than 0
    5. 4 byte key length, containing length K
    6. K byte key
    7. 4 byte payload length, containing length V
    8. V byte payload


2. Kafka深层介绍

2.1.1. 架构介绍

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ieGSQMR1-1670167723057)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/82134aab3d36429ebeb079a7b84390a5~tplv-k3u1fbpfcp-watermark.image?)]


Producer:Producer即生产者,消息的产生者,是消息的⼊口。


kafka cluster:kafka集群,一台或多台服务器组成


Broker:Broker是指部署了Kafka实例的服务器节点。每个服务器上有一个或多个kafka的实 例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的 编号,如图中的broker-0、broker-1等……

Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上 都可以创建多个topic。实际应用中通常是一个业务线建一个topic。

Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞 吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的⽂件夹!

Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的 时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10 个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机 器对同一个分区也只可能存放一个副本(包括自己)。

Consumer:消费者,即消息的消费方,是消息的出口。


Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分 区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic的不同分区的数据,这也是为了提高kafka的吞吐量!

2.1.2. ⼯作流程

我们看上⾯的架构图中,producer就是生产者,是数据的入口。Producer在写入数据的时候会把数据 写入到leader中,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我 们看下图:


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NEZFHbN8-1670167723058)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/dc72b5ee79be4f84a2b767ead1b3c9bd~tplv-k3u1fbpfcp-watermark.image?)]

    1.⽣产者从Kafka集群获取分区leader信息
    2.⽣产者将消息发送给leader
    3.leader将消息写入本地磁盘
    4.follower从leader拉取消息数据
    5.follower将消息写入本地磁盘后向leader发送ACK
    6.leader收到所有的follower的ACK之后向生产者发送ACK

2.1.3. 选择partition的原则

那在kafka中,如果某个topic有多个partition,producer⼜怎么知道该将数据发往哪个partition呢? kafka中有几个原则:

    1.partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
    2.如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
    3.如果既没指定partition,又没有设置key,则会采用轮询⽅式,即每次取一小段时间的数据写入某
    个partition,下一小段的时间写入下一个partition

2.1.4. ACK应答机制

producer在向kafka写入消息的时候,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置 的值为 0,1,all


0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效 率最高。

1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。

all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保 leader发送成功和所有的副本都完成备份。安全性最⾼高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,kafka会⾃动创建topic,partition和replication的数量 默认配置都是1。

2.1.5. Topic和数据⽇志

topic 是同⼀类别的消息记录(record)的集合。在Kafka中,⼀个主题通常有多个订阅者。对于每个 主题,Kafka集群维护了⼀个分区数据⽇志⽂件结构如下:


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rUtESF3x-1670167723059)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ab0bf83c38604244ac4b723fac2ea710~tplv-k3u1fbpfcp-watermark.image?)]


每个partition都是⼀个有序并且不可变的消息记录集合。当新的数据写⼊时,就被追加到partition的末 尾。在每个partition中,每条消息都会被分配⼀个顺序的唯⼀标识,这个标识被称为offset,即偏移 量。注意,Kafka只保证在同⼀个partition内部消息是有序的,在不同partition之间,并不能保证消息 有序。


Kafka可以配置⼀个保留期限,⽤来标识⽇志会在Kafka集群内保留多⻓时间。Kafka集群会保留在保留 期限内所有被发布的消息,不管这些消息是否被消费过。⽐如保留期限设置为两天,那么数据被发布到 Kafka集群的两天以内,所有的这些数据都可以被消费。当超过两天,这些数据将会被清空,以便为后 续的数据腾出空间。由于Kafka会将数据进⾏持久化存储(即写⼊到硬盘上),所以保留的数据⼤⼩可 以设置为⼀个⽐较⼤的值。


2.1.6. Partition结构

Partition在服务器上的表现形式就是⼀个⼀个的⽂件夹,每个partition的⽂件夹下⾯会有多组segment ⽂件,每组segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三个⽂件,其中 .log ⽂ 件就是实际存储message的地⽅,⽽ .index 和 .timeindex ⽂件为索引⽂件,⽤于检索消息。


2.1.7. 消费数据

多个消费者实例可以组成⼀个消费者组,并⽤⼀个标签来标识这个消费者组。⼀个消费者组中的不同消 费者实例可以运⾏在不同的进程甚⾄不同的服务器上。


如果所有的消费者实例都在同⼀个消费者组中,那么消息记录会被很好的均衡的发送到每个消费者实 例。


如果所有的消费者实例都在不同的消费者组,那么每⼀条消息记录会被⼴播到每⼀个消费者实例。


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U3eYhFiv-1670167723060)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1ecf0667fcf641ed80af3aa49c4c0fee~tplv-k3u1fbpfcp-watermark.image?)]


举个例⼦,如上图所示⼀个两个节点的Kafka集群上拥有⼀个四个partition(P0-P3)的topic。有两个 消费者组都在消费这个topic中的数据,消费者组A有两个消费者实例,消费者组B有四个消费者实例。 从图中我们可以看到,在同⼀个消费者组中,每个消费者实例可以消费多个分区,但是每个分区最多只 能被消费者组中的⼀个实例消费。也就是说,如果有⼀个4个分区的主题,那么消费者组中最多只能有4 个消费者实例去消费,多出来的都不会被分配到分区。其实这也很好理解,如果允许两个消费者实例同 时消费同⼀个分区,那么就⽆法记录这个分区被这个消费者组消费的offset了。如果在消费者组中动态 的上线或下线消费者,那么Kafka集群会⾃动调整分区与消费者实例间的对应关系。


3. 操作Kafka

3.1.1. sarama

Go语言中连接kafka使用第三方库: github.com/Shopify/sarama。

3.1.2. 下载及安装

    go get github.com/Shopify/sarama

注意事项: sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误: github.com/DataDog/zstd exec: “gcc”:executable file not found in %PATH% 所以在Windows平台请使用v1.19版本的sarama。(如果不会版本控制请查看博客里面的go module章节)


3.1.3. 连接kafka发送消息

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
    // 构造一个消息
    msg := &sarama.ProducerMessage{}
    msg.Topic = "web_log"
    msg.Value = sarama.StringEncoder("this is a test log")
    // 连接kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer client.Close()
    // 发送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

3.1.4. 连接kafka消费消息

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
// kafka consumer
func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList { // 遍历所有的分区
        // 针对每个分区创建一个对应的分区消费者
        pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // 异步从每个分区消费信息
        go func(sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }
}


相关文章
|
5月前
|
SQL 关系型数据库 MySQL
go 通过sql操作mysql
go 通过sql操作mysql
30 1
|
5月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
1011 0
|
4月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
监控 Go
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控
go语言并发实战——日志收集系统(八) go语言操作etcd以及利用watch实现对键值的监控
|
5月前
|
消息中间件 Java Kafka
kafka 磁盘扩容与数据均衡操作代码
Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议
58 1
|
5月前
|
消息中间件 Kafka
KafKa脚本操作
KafKa脚本操作
|
4月前
|
分布式计算 大数据 Go
MaxCompute操作报错合集之使用go sdk调用GetTunnelEndpoint出现报错:InvalidAction.NotFoundSpecified api is not found,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
运维 Devops API
阿里云云效操作报错合集之直接用API调用可以使用,但是本地用Go代码调用失败,是什么导致的?
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
5月前
|
关系型数据库 MySQL Go
Go语言介绍以及如何在Go语言中操作MySQL数据库
Go语言介绍以及如何在Go语言中操作MySQL数据库
60 3
|
5月前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
119 1
下一篇
无影云桌面