Docker部署kafka|Go操作实践

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 本文讲解了使用docker-compose部署单节点kafka的流程,并且在文章的后半部分给出了使用Go语言操作kafka进行生产消费的代码案例。

前言

写作本文的背景是由于字节的暑期青训营中,某个项目要求编写一个简易的流处理引擎(flink),开发语言不限,推荐Java,本着好奇心的驱使,我打算使用Go语言进行部分尝试。

既然是流处理引擎,那么首先需要有流式的数据源,一般而言,flink会配合从kafka中获取数据流,先不考虑后续编写引擎的部分,本文将着重于kafka的部署,并且后半段将给出使用Go语言编写kafka生产者消费者

如果你只是希望完成kafka的部署,而不想局限于Go语言,只需要着重阅读文章的前半部分,后文的Go语言操作部分可以给你提供一些思路,你只需要找寻适合语言如Javakafka client库去完成生产者消费者的编写即可。

部署kafka

docker前置知识

下文的实践需要你拥有基本的docker操作能力,如果未曾掌握docker知识点,推荐阅读这两篇文章:

docker | jenkins 实现自动化部署项目,后端躺着把运维的钱挣了!(上)

docker | jenkins 自动化CI/CD,后端躺着把运维的钱挣了!(下)

docker-compose

编写docker-compose.yml,通过docker容器部署单节点kafka

version: '3'
services:
    zookeeper: 
        image: wurstmeister/zookeeper:3.4.6 
        volumes: 
            - ./zookeeper_data:/opt/zookeeper-3.4.6/data 
        container_name: zookeeper 
        ports: 
            - "10002:2181" 
            - "10003:2182" 
        restart: always
​
    kafka: 
        image: wurstmeister/kafka 
        container_name: kafka_01 
        depends_on: 
            - zookeeper 
        ports: 
            - "10004:9092" 
        volumes: 
            - ./kafka_log:/kafka 
        environment: 
            - KAFKA_BROKER_NO=0 
            - KAFKA_BROKER_ID=0 
            - KAFKA_LISTENERS=PLAINTEXT://kafka_01:9092                     # kafka tcp 侦听的ip
            - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器ip:10004        # kafka broker侦听的ip
            - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT 
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
        restart: always
    # kafka集群管理面板
    kafka_manager: 
        image: sheepkiller/kafka-manager 
        ports: 
            - "10005:9000" 
        environment: 
            - ZK_HOSTS=zookeeper:2181 
        depends_on: 
            - zookeeper 
            - kafka 
        restart: always

后台运行

docker-compose up -d

docker ps命令查看容器是否启动成功

image-20220804102532075

通过上述docker-compose.yml部署会运行三个容器,选择进入kafka容器

image-20220804101807534

docker exec -it kafka容器id /bin/bash
# 进入kafka目录
cd /opt/kafka_2.13-2.8.1/

在容器内创建topictopic是kafka中数据管理的基本单位,或者说集合,每一个topic可以管理多个partition,编码操作时:你可以往对应kafka服务器ip+port+topic+partition去发送和读取数据。

bin/kafka-topics.sh --create --zookeeper 服务器ip:2181 --replication-factor 1 -partitions 1 --topic test

业务编写

Go语言中连接kafka使用第三方库: github.com/Shopify/sarama

go get github.com/segmentio/kafka-go

sarama库的简易操作可以参照文档(消费者的编写文档中有坑):文档地址

如下使用kafka client库进行编码所涉及的API操作比较简单,流程上或许不够规范,请酌情参考。

producer

文档中生产者只发送了一条数据后就会关闭,这里我改成了每秒钟发送一次。

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "test"
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"82.156.171.8:10004"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息

   for {
      time.Sleep(time.Second * 1)
      msg.Value = sarama.StringEncoder("this is a test log")
      pid, offset, err := client.SendMessage(msg)
      if err != nil {
         fmt.Println("send msg failed, err:", err)
         return
      }
      fmt.Printf("pid:%v offset:%v\n", pid, offset)
   }

}

consumer

文档中消费者虽然开启了Go协程(类比于Java的线程)去读取kafka的数据,但是由于主程序执行顺序执行完毕后,子协程也会终止,导致子协程还没有读取成功/打印数据,整个程序就已经关闭运行了。

因此我做了一些改动,在子协程退出之前,保持主程序不会退出(使用Go语言的WaitGroup),如果简单粗暴在main函数末尾设置一个很长的程序sleep时间,也是可以实现打印输出的。

func main() {
   consumer, err := sarama.NewConsumer([]string{"82.156.171.8:10004"}, nil)
   if err != nil {
      fmt.Printf("fail to start consumer, err:%v\n", err)
      return
   }
   partitionList, err := consumer.Partitions("test") // 根据topic取到所有的分区
   if err != nil {
      fmt.Printf("fail to get list of partition:err%v\n", err)
      return
   }
   fmt.Println("list = ", partitionList, len(partitionList))
   var wg sync.WaitGroup
   for partition := range partitionList { // 遍历所有的分区
      wg.Add(1)
      // 针对每个分区创建一个对应的分区消费者
      pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
      if err != nil {
         fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
         return
      }
      defer pc.AsyncClose()
      go func(sarama.PartitionConsumer, *sync.WaitGroup) {
         for msg := range pc.Messages() {
            //fmt.Println("打印信息")
            fmt.Println("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
         }
         wg.Done()
      }(pc, &wg)
   }
   wg.Wait()
}

生产&消费

确保kafka容器正常运行,kafka服务器防火墙端口正常开放,运行消费者程序,运行生产者程序。这个生产者每秒向kafka发送一条测试数据:this is a test log,你也可以添加上程序运行时间进行测试。

事实上被客户端消费后的数据并没有马上从kafka删除,这里不多做介绍,各位自行了解~

image-20220804120429711

小结

本文讲解了使用docker-compose部署单节点kafka的流程,后续通过修改docker-compose.yml的内容也可以实现kafka集群的部署,并且,在较新版本的kafka中,集群的部署可以脱离zookeeper,但是经过了解,由于功能并不完善,这里还是选择了基于zookeeper的部署。

相关文章
|
1月前
|
监控 Java Go
无感改造,完美监控:Docker 多阶段构建 Go 应用无侵入观测
本文将介绍一种基于 Docker 多阶段构建的无侵入 Golang 应用观测方法,通过此方法用户无需对 Golang 应用源代码或者编译指令做任何改造,即可零成本为 Golang 应用注入可观测能力。
|
1月前
|
Ubuntu 安全 Docker
Ubuntu下部署及操作Docker技巧
以上就是在Ubuntu下部署及操作Docker的具体步骤。但这只是冰山一角,Docker的魅力远不仅如此。你可以将其视为存放各种工具的小箱子,随时随地取用,极大地提升工作效率。你也可以私人订制,适应不同的开发环境,就像一个拥有各种口味冰淇淋的冰箱,满足各种各样的需求。好了,现在你已经掌握了基本的Docker运用技巧,快去尝试使用吧!记住,沉浸在探索中,你会找到无尽的乐趣和满满的收获。
102 23
|
2月前
|
Ubuntu 关系型数据库 MySQL
容器技术实践:在Ubuntu上使用Docker安装MySQL的步骤。
通过以上的操作,你已经步入了Docker和MySQL的世界,享受了容器技术给你带来的便利。这个旅程中你可能会遇到各种挑战,但是只要你沿着我们划定的路线行进,你就一定可以达到目的地。这就是Ubuntu、Docker和MySQL的灵魂所在,它们为你开辟了一条通往新探索的道路,带你亲身感受到了技术的力量。欢迎在Ubuntu的广阔大海中探索,用Docker技术引领你的航行,随时准备感受新技术带来的震撼和乐趣。
124 16
|
2月前
|
安全 API 算法框架/工具
大模型文件Docker镜像化部署技术详解
大模型文件Docker镜像化部署技术详解
266 2
|
2月前
|
JSON 运维 Ubuntu
在Docker上部署Ollama+AnythingLLM完成本地LLM Agent部署
通过以上步骤,您可以成功在Docker上部署Ollama和AnythingLLM,实现本地LLM Agent的功能。在部署过程中,确保环境和配置正确,以避免不必要的问题。希望本文能够帮助您顺利完成部署,并在本地环境中高效地使用LLM模型。
784 8
|
2月前
|
Docker Python 容器
Docker——阿里云服务器使用Docker部署python项目全程小记
本文记录了我在阿里云服务器上使用Docker部署python项目(flask为例)的全过程,在这里记录和分享一下,希望可以给大家提供一些参考。
231 0
|
1月前
|
存储 SQL 关系型数据库
docker部署n9e开源版本7.4.0
n9e开源版本7.4.0
66 0
|
3月前
|
运维 监控 算法
监控局域网其他电脑:Go 语言迪杰斯特拉算法的高效应用
在信息化时代,监控局域网成为网络管理与安全防护的关键需求。本文探讨了迪杰斯特拉(Dijkstra)算法在监控局域网中的应用,通过计算最短路径优化数据传输和故障检测。文中提供了使用Go语言实现的代码例程,展示了如何高效地进行网络监控,确保局域网的稳定运行和数据安全。迪杰斯特拉算法能减少传输延迟和带宽消耗,及时发现并处理网络故障,适用于复杂网络环境下的管理和维护。
|
3月前
|
编译器 Go
揭秘 Go 语言中空结构体的强大用法
Go 语言中的空结构体 `struct{}` 不包含任何字段,不占用内存空间。它在实际编程中有多种典型用法:1) 结合 map 实现集合(set)类型;2) 与 channel 搭配用于信号通知;3) 申请超大容量的 Slice 和 Array 以节省内存;4) 作为接口实现时明确表示不关注值。此外,需要注意的是,空结构体作为字段时可能会因内存对齐原因占用额外空间。建议将空结构体放在外层结构体的第一个字段以优化内存使用。
|
3月前
|
存储 Go
Go 语言入门指南:切片
Golang中的切片(Slice)是基于数组的动态序列,支持变长操作。它由指针、长度和容量三部分组成,底层引用一个连续的数组片段。切片提供灵活的增减元素功能,语法形式为`[]T`,其中T为元素类型。相比固定长度的数组,切片更常用,允许动态调整大小,并且多个切片可以共享同一底层数组。通过内置的`make`函数可创建指定长度和容量的切片。需要注意的是,切片不能直接比较,只能与`nil`比较,且空切片的长度为0。
Go 语言入门指南:切片