Docker部署kafka|Go操作实践

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 本文讲解了使用docker-compose部署单节点kafka的流程,并且在文章的后半部分给出了使用Go语言操作kafka进行生产消费的代码案例。

前言

写作本文的背景是由于字节的暑期青训营中,某个项目要求编写一个简易的流处理引擎(flink),开发语言不限,推荐Java,本着好奇心的驱使,我打算使用Go语言进行部分尝试。

既然是流处理引擎,那么首先需要有流式的数据源,一般而言,flink会配合从kafka中获取数据流,先不考虑后续编写引擎的部分,本文将着重于kafka的部署,并且后半段将给出使用Go语言编写kafka生产者消费者

如果你只是希望完成kafka的部署,而不想局限于Go语言,只需要着重阅读文章的前半部分,后文的Go语言操作部分可以给你提供一些思路,你只需要找寻适合语言如Javakafka client库去完成生产者消费者的编写即可。

部署kafka

docker前置知识

下文的实践需要你拥有基本的docker操作能力,如果未曾掌握docker知识点,推荐阅读这两篇文章:

docker | jenkins 实现自动化部署项目,后端躺着把运维的钱挣了!(上)

docker | jenkins 自动化CI/CD,后端躺着把运维的钱挣了!(下)

docker-compose

编写docker-compose.yml,通过docker容器部署单节点kafka

version: '3'
services:
    zookeeper: 
        image: wurstmeister/zookeeper:3.4.6 
        volumes: 
            - ./zookeeper_data:/opt/zookeeper-3.4.6/data 
        container_name: zookeeper 
        ports: 
            - "10002:2181" 
            - "10003:2182" 
        restart: always
​
    kafka: 
        image: wurstmeister/kafka 
        container_name: kafka_01 
        depends_on: 
            - zookeeper 
        ports: 
            - "10004:9092" 
        volumes: 
            - ./kafka_log:/kafka 
        environment: 
            - KAFKA_BROKER_NO=0 
            - KAFKA_BROKER_ID=0 
            - KAFKA_LISTENERS=PLAINTEXT://kafka_01:9092                     # kafka tcp 侦听的ip
            - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器ip:10004        # kafka broker侦听的ip
            - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT 
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
        restart: always
    # kafka集群管理面板
    kafka_manager: 
        image: sheepkiller/kafka-manager 
        ports: 
            - "10005:9000" 
        environment: 
            - ZK_HOSTS=zookeeper:2181 
        depends_on: 
            - zookeeper 
            - kafka 
        restart: always

后台运行

docker-compose up -d

docker ps命令查看容器是否启动成功

image-20220804102532075

通过上述docker-compose.yml部署会运行三个容器,选择进入kafka容器

image-20220804101807534

docker exec -it kafka容器id /bin/bash
# 进入kafka目录
cd /opt/kafka_2.13-2.8.1/

在容器内创建topictopic是kafka中数据管理的基本单位,或者说集合,每一个topic可以管理多个partition,编码操作时:你可以往对应kafka服务器ip+port+topic+partition去发送和读取数据。

bin/kafka-topics.sh --create --zookeeper 服务器ip:2181 --replication-factor 1 -partitions 1 --topic test

业务编写

Go语言中连接kafka使用第三方库: github.com/Shopify/sarama

go get github.com/segmentio/kafka-go

sarama库的简易操作可以参照文档(消费者的编写文档中有坑):文档地址

如下使用kafka client库进行编码所涉及的API操作比较简单,流程上或许不够规范,请酌情参考。

producer

文档中生产者只发送了一条数据后就会关闭,这里我改成了每秒钟发送一次。

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
   config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

   // 构造一个消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "test"
   // 连接kafka
   client, err := sarama.NewSyncProducer([]string{"82.156.171.8:10004"}, config)
   if err != nil {
      fmt.Println("producer closed, err:", err)
      return
   }
   defer client.Close()
   // 发送消息

   for {
      time.Sleep(time.Second * 1)
      msg.Value = sarama.StringEncoder("this is a test log")
      pid, offset, err := client.SendMessage(msg)
      if err != nil {
         fmt.Println("send msg failed, err:", err)
         return
      }
      fmt.Printf("pid:%v offset:%v\n", pid, offset)
   }

}

consumer

文档中消费者虽然开启了Go协程(类比于Java的线程)去读取kafka的数据,但是由于主程序执行顺序执行完毕后,子协程也会终止,导致子协程还没有读取成功/打印数据,整个程序就已经关闭运行了。

因此我做了一些改动,在子协程退出之前,保持主程序不会退出(使用Go语言的WaitGroup),如果简单粗暴在main函数末尾设置一个很长的程序sleep时间,也是可以实现打印输出的。

func main() {
   consumer, err := sarama.NewConsumer([]string{"82.156.171.8:10004"}, nil)
   if err != nil {
      fmt.Printf("fail to start consumer, err:%v\n", err)
      return
   }
   partitionList, err := consumer.Partitions("test") // 根据topic取到所有的分区
   if err != nil {
      fmt.Printf("fail to get list of partition:err%v\n", err)
      return
   }
   fmt.Println("list = ", partitionList, len(partitionList))
   var wg sync.WaitGroup
   for partition := range partitionList { // 遍历所有的分区
      wg.Add(1)
      // 针对每个分区创建一个对应的分区消费者
      pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
      if err != nil {
         fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
         return
      }
      defer pc.AsyncClose()
      go func(sarama.PartitionConsumer, *sync.WaitGroup) {
         for msg := range pc.Messages() {
            //fmt.Println("打印信息")
            fmt.Println("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
         }
         wg.Done()
      }(pc, &wg)
   }
   wg.Wait()
}

生产&消费

确保kafka容器正常运行,kafka服务器防火墙端口正常开放,运行消费者程序,运行生产者程序。这个生产者每秒向kafka发送一条测试数据:this is a test log,你也可以添加上程序运行时间进行测试。

事实上被客户端消费后的数据并没有马上从kafka删除,这里不多做介绍,各位自行了解~

image-20220804120429711

小结

本文讲解了使用docker-compose部署单节点kafka的流程,后续通过修改docker-compose.yml的内容也可以实现kafka集群的部署,并且,在较新版本的kafka中,集群的部署可以脱离zookeeper,但是经过了解,由于功能并不完善,这里还是选择了基于zookeeper的部署。

相关文章
|
3天前
|
JSON JavaScript 开发者
Composerize神器:自动化转换Docker运行命令至Compose配置,简化容器部署流程
【8月更文挑战第7天】Composerize神器:自动化转换Docker运行命令至Compose配置,简化容器部署流程
Composerize神器:自动化转换Docker运行命令至Compose配置,简化容器部署流程
|
1天前
|
运维 Java Devops
阿里云云效操作报错合集之部署docker时遇到报错,该怎么办
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
4天前
|
Java Nacos Docker
"揭秘!Docker部署Seata遇上Nacos,注册成功却报错?这些坑你不得不防!一网打尽解决秘籍,让你的分布式事务稳如老狗!"
【8月更文挑战第15天】在微服务架构中,Nacos搭配Seata确保数据一致性时,Docker部署Seata后可能出现客户端连接错误,如“can not connect to services-server”。此问题多由网络配置不当、配置文件错误或版本不兼容引起。解决策略包括:调整Docker网络设置确保可达性;检查并修正`file.conf`和`registry.conf`中的Nacos地址和端口;验证Seata与Nacos版本兼容性;修改配置后重启服务;参考官方文档和最佳实践进行配置。通过这些步骤,能有效排除故障,保障服务稳定运行。
13 0
|
4天前
|
存储 Ubuntu Linux
如何在 Ubuntu 上使用 Docker 容器化和部署多个 WordPress 应用程序
如何在 Ubuntu 上使用 Docker 容器化和部署多个 WordPress 应用程序
11 0
|
4天前
|
存储 Shell API
G6VP 与 GraphScope部署问题之拉取并启动 GraphScope 的 Docker 镜像如何解决
G6VP 与 GraphScope部署问题之拉取并启动 GraphScope 的 Docker 镜像如何解决
|
8天前
|
JSON 中间件 Go
go语言后端开发学习(四) —— 在go项目中使用Zap日志库
本文详细介绍了如何在Go项目中集成并配置Zap日志库。首先通过`go get -u go.uber.org/zap`命令安装Zap,接着展示了`Logger`与`Sugared Logger`两种日志记录器的基本用法。随后深入探讨了Zap的高级配置,包括如何将日志输出至文件、调整时间格式、记录调用者信息以及日志分割等。最后,文章演示了如何在gin框架中集成Zap,通过自定义中间件实现了日志记录和异常恢复功能。通过这些步骤,读者可以掌握Zap在实际项目中的应用与定制方法
go语言后端开发学习(四) —— 在go项目中使用Zap日志库
|
1天前
|
安全 Java Go
探索Go语言在高并发环境中的优势
在当今的技术环境中,高并发处理能力成为评估编程语言性能的关键因素之一。Go语言(Golang),作为Google开发的一种编程语言,以其独特的并发处理模型和高效的性能赢得了广泛关注。本文将深入探讨Go语言在高并发环境中的优势,尤其是其goroutine和channel机制如何简化并发编程,提升系统的响应速度和稳定性。通过具体的案例分析和性能对比,本文揭示了Go语言在实际应用中的高效性,并为开发者在选择合适技术栈时提供参考。
|
5天前
|
运维 Kubernetes Go
"解锁K8s二开新姿势!client-go:你不可不知的Go语言神器,让Kubernetes集群管理如虎添翼,秒变运维大神!"
【8月更文挑战第14天】随着云原生技术的发展,Kubernetes (K8s) 成为容器编排的首选。client-go作为K8s的官方Go语言客户端库,通过封装RESTful API,使开发者能便捷地管理集群资源,如Pods和服务。本文介绍client-go基本概念、使用方法及自定义操作。涵盖ClientSet、DynamicClient等客户端实现,以及lister、informer等组件,通过示例展示如何列出集群中的所有Pods。client-go的强大功能助力高效开发和运维。
22 1
|
5天前
|
SQL 关系型数据库 MySQL
Go语言中使用 sqlx 来操作 MySQL
Go语言因其高效的性能和简洁的语法而受到开发者们的欢迎。在开发过程中,数据库操作不可或缺。虽然Go的标准库提供了`database/sql`包支持数据库操作,但使用起来稍显复杂。为此,`sqlx`应运而生,作为`database/sql`的扩展库,它简化了许多常见的数据库任务。本文介绍如何使用`sqlx`包操作MySQL数据库,包括安装所需的包、连接数据库、创建表、插入/查询/更新/删除数据等操作,并展示了如何利用命名参数来进一步简化代码。通过`sqlx`,开发者可以更加高效且简洁地完成数据库交互任务。
12 1
|
5天前
|
算法 NoSQL 中间件
go语言后端开发学习(六) ——基于雪花算法生成用户ID
本文介绍了分布式ID生成中的Snowflake(雪花)算法。为解决用户ID安全性与唯一性问题,Snowflake算法生成的ID具备全局唯一性、递增性、高可用性和高性能性等特点。64位ID由符号位(固定为0)、41位时间戳、10位标识位(含数据中心与机器ID)及12位序列号组成。面对ID重复风险,可通过预分配、动态或统一分配标识位解决。Go语言实现示例展示了如何使用第三方包`sonyflake`生成ID,确保不同节点产生的ID始终唯一。
go语言后端开发学习(六) ——基于雪花算法生成用户ID