go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:


sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:

  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo
require (
  github.com/Shopify/sarama v1.19
)
go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()
  config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy

等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()                              //创建config实例
  config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
  config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
  config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
  //创建信息
  msg := &sarama.ProducerMessage{}
  msg.Topic = "web.log"
  msg.Value = sarama.StringEncoder("this is a test log")
  //连接KafKa
  client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  if err != nil {
    fmt.Println("producer closed, err:", err)
    return
  }
  defer client.Close()
  //发送消息
  pid, offset, err := client.SendMessage(msg)
  if err != nil {
    fmt.Println("send msg failed,err:", err)
    return
  }
  fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
zkServer
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "time"
)
func main() {
  customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  if err != nil {
    fmt.Println("failed init customer,err:", err)
    return
  }
  partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
  if err != nil {
    fmt.Println("failed get partition list,err:", err)
    return
  }
  fmt.Println("partitions:", partitionlist)
  for partition := range partitionlist { // 遍历所有分区
    //根据消费者对象创建一个分区对象
    pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
    if err != nil {
      fmt.Println("failed get partition consumer,err:", err)
      return
    }
    defer pc.Close() // 移动到这里
    go func(consumer sarama.PartitionConsumer) {
      defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
    time.Sleep(time.Second * 10)
  }
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍

相关文章
Grafana Loki,轻量级日志系统
本文介绍了基于Grafana、Loki和Alloy构建的轻量级日志系统。Loki是一个由Grafana Labs开发的日志聚合系统,具备高可用性和多租户支持,专注于日志而非指标,通过标签索引而非内容索引实现高效存储。Alloy则是用于收集和转发日志至Loki的强大工具。文章详细描述了系统的架构、组件及其工作流程,并提供了快速搭建指南,包括准备步骤、部署命令及验证方法。此外,还展示了如何使用Grafana查看日志,以及一些基本的LogQL查询示例。最后,作者探讨了Loki架构的独特之处,提出了“巨型单体模块化”的概念,即一个应用既可单体部署也可分布式部署,整体协同实现全部功能。
321 69
Grafana Loki,轻量级日志系统
|
3月前
|
用 Zap 轻松搞定 Go 语言中的结构化日志
在现代应用程序开发中,日志记录至关重要。Go 语言中有许多日志库,而 Zap 因其高性能和灵活性脱颖而出。本文详细介绍如何在 Go 项目中使用 Zap 进行结构化日志记录,并展示如何定制日志输出,满足生产环境需求。通过基础示例、SugaredLogger 的便捷使用以及自定义日志配置,帮助你在实际开发中高效管理日志。
105 1
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
87 8
go高并发之路——消息中间件kafka
本文介绍了高并发业务中的流量高峰应对措施,重点讲解了Kafka消息中间件的使用,包括常用的Go语言库sarama及其版本问题,以及Kafka的版本选择建议。文中还详细解释了Kafka生产者的四种分区策略:轮询、随机、按Key和指定分区,并提供了相应的代码示例。
go高并发之路——消息中间件kafka
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
118 2
centos系统清理docker日志文件
通过以上方法,可以有效清理和管理CentOS系统中的Docker日志文件,防止日志文件占用过多磁盘空间。选择合适的方法取决于具体的应用场景和需求,可以结合手动清理、logrotate和调整日志驱动等多种方式,确保系统的高效运行。
353 2
|
4月前
|
告别简陋:Java日志系统的最佳实践
【10月更文挑战第19天】 在Java开发中,`System.out.println()` 是最基本的输出方法,但它在实际项目中往往被认为是不专业和不足够的。本文将探讨为什么在现代Java应用中应该避免使用 `System.out.println()`,并介绍几种更先进的日志解决方案。
106 1
Linux系统日志管理
Linux系统日志管理
94 3
#637481#基于django和neo4j的日志分析系统
#637481#基于django和neo4j的日志分析系统
62 4
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
72 1

热门文章

最新文章