go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:


sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:

  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo
require (
  github.com/Shopify/sarama v1.19
)
go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()
  config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy

等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()                              //创建config实例
  config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
  config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
  config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
  //创建信息
  msg := &sarama.ProducerMessage{}
  msg.Topic = "web.log"
  msg.Value = sarama.StringEncoder("this is a test log")
  //连接KafKa
  client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  if err != nil {
    fmt.Println("producer closed, err:", err)
    return
  }
  defer client.Close()
  //发送消息
  pid, offset, err := client.SendMessage(msg)
  if err != nil {
    fmt.Println("send msg failed,err:", err)
    return
  }
  fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
zkServer
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "time"
)
func main() {
  customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  if err != nil {
    fmt.Println("failed init customer,err:", err)
    return
  }
  partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
  if err != nil {
    fmt.Println("failed get partition list,err:", err)
    return
  }
  fmt.Println("partitions:", partitionlist)
  for partition := range partitionlist { // 遍历所有分区
    //根据消费者对象创建一个分区对象
    pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
    if err != nil {
      fmt.Println("failed get partition consumer,err:", err)
      return
    }
    defer pc.Close() // 移动到这里
    go func(consumer sarama.PartitionConsumer) {
      defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
    time.Sleep(time.Second * 10)
  }
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍

相关文章
|
21天前
|
存储 监控 算法
防止员工泄密软件中文件访问日志管理的 Go 语言 B + 树算法
B+树凭借高效范围查询与稳定插入删除性能,为防止员工泄密软件提供高响应、可追溯的日志管理方案,显著提升海量文件操作日志的存储与检索效率。
55 2
|
5月前
|
Go 开发者
Go语言包的组织与导入 -《Go语言实战指南》
本章详细介绍了Go语言中的包(Package)概念及其使用方法。包是实现代码模块化、复用性和可维护性的核心单位,内容涵盖包的基本定义、命名规则、组织结构以及导入方式。通过示例说明了如何创建和调用包,并深入讲解了`go.mod`文件对包路径的管理。此外,还提供了多种导入技巧,如别名导入、匿名导入等,帮助开发者优化代码结构与可读性。最后以表格形式总结了关键点,便于快速回顾和应用。
236 61
|
1月前
|
Java 编译器 Go
【Golang】(1)Go的运行流程步骤与包的概念
初次上手Go语言!先来了解它的运行流程吧! 在Go中对包的概念又有怎样不同的见解呢?
91 4
|
4月前
|
JSON 中间件 Go
Go语言实战指南 —— Go中的反射机制:reflect 包使用
Go语言中的反射机制通过`reflect`包实现,允许程序在运行时动态检查变量类型、获取或设置值、调用方法等。它适用于初中级开发者深入理解Go的动态能力,帮助构建通用工具、中间件和ORM系统等。
286 63
|
3月前
|
缓存 监控 安全
告别缓存击穿!Go 语言中的防并发神器:singleflight 包深度解析
在高并发场景中,多个请求同时访问同一资源易导致缓存击穿、数据库压力过大。Go 语言提供的 `singleflight` 包可将相同 key 的请求合并,仅执行一次实际操作,其余请求共享结果,有效降低系统负载。本文详解其原理、实现及典型应用场景,并附示例代码,助你掌握高并发优化技巧。
273 0
|
6月前
|
Go 持续交付 开发者
Go语言包与模块(module)的基本使用-《Go语言实战指南》
本章深入讲解Go语言中的包(Package)和模块(Module)概念。包是代码组织的最小单位,每个`.go`文件属于一个包,通过`import`实现复用;主程序包需命名为`main`。模块是Go 1.11引入的依赖管理机制,支持自动版本管理和私有/远程仓库,无需依赖GOPATH。通过实际示例,如自定义包`mathutil`和第三方模块`gin`的引入,展示其使用方法。常用命令包括`go mod init`、`go mod tidy`等,帮助开发者高效管理项目依赖。最后总结,包负责功能划分,模块实现现代化依赖管理,提升团队协作效率。
264 15
|
7月前
|
存储 NoSQL Redis
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 +  无锁架构 +  EDA架构  + 异步日志 + 集群架构
|
8月前
|
监控 Shell Linux
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
|
8月前
|
存储 JSON Go
PHP 日志系统的最佳搭档:一个 Go 写的远程日志收集服务
为了不再 SSH 上去翻日志,我写了个 Go 小脚本,用来接收远程日志。PHP 负责记录日志,Go 负责存储和展示,按天存储、支持 API 访问、可远程管理,终于能第一时间知道项目炸了。
141 10
|
11月前
|
Linux Go iOS开发
怎么禁用 vscode 中点击 go 包名时自动打开浏览器跳转到 pkg.go.dev
本文介绍了如何在 VSCode 中禁用点击 Go 包名时自动打开浏览器跳转到 pkg.go.dev 的功能。通过将 gopls 的 `ui.navigation.importShortcut` 设置为 "Definition",可以实现仅跳转到定义处而不打开链接。具体操作步骤包括:打开设置、搜索 gopls、编辑 settings.json 文件并保存更改,最后重启 VSCode 使设置生效。
411 8
怎么禁用 vscode 中点击 go 包名时自动打开浏览器跳转到 pkg.go.dev

热门文章

最新文章