go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:


sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:

  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo
require (
  github.com/Shopify/sarama v1.19
)
go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()
  config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy

等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()                              //创建config实例
  config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
  config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
  config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
  //创建信息
  msg := &sarama.ProducerMessage{}
  msg.Topic = "web.log"
  msg.Value = sarama.StringEncoder("this is a test log")
  //连接KafKa
  client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  if err != nil {
    fmt.Println("producer closed, err:", err)
    return
  }
  defer client.Close()
  //发送消息
  pid, offset, err := client.SendMessage(msg)
  if err != nil {
    fmt.Println("send msg failed,err:", err)
    return
  }
  fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
zkServer
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "time"
)
func main() {
  customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  if err != nil {
    fmt.Println("failed init customer,err:", err)
    return
  }
  partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
  if err != nil {
    fmt.Println("failed get partition list,err:", err)
    return
  }
  fmt.Println("partitions:", partitionlist)
  for partition := range partitionlist { // 遍历所有分区
    //根据消费者对象创建一个分区对象
    pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
    if err != nil {
      fmt.Println("failed get partition consumer,err:", err)
      return
    }
    defer pc.Close() // 移动到这里
    go func(consumer sarama.PartitionConsumer) {
      defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
    time.Sleep(time.Second * 10)
  }
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍

相关文章
|
1月前
|
存储 前端开发 数据可视化
Grafana Loki,轻量级日志系统
本文介绍了基于Grafana、Loki和Alloy构建的轻量级日志系统。Loki是一个由Grafana Labs开发的日志聚合系统,具备高可用性和多租户支持,专注于日志而非指标,通过标签索引而非内容索引实现高效存储。Alloy则是用于收集和转发日志至Loki的强大工具。文章详细描述了系统的架构、组件及其工作流程,并提供了快速搭建指南,包括准备步骤、部署命令及验证方法。此外,还展示了如何使用Grafana查看日志,以及一些基本的LogQL查询示例。最后,作者探讨了Loki架构的独特之处,提出了“巨型单体模块化”的概念,即一个应用既可单体部署也可分布式部署,整体协同实现全部功能。
444 69
Grafana Loki,轻量级日志系统
|
4月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
225 7
|
4月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
170 5
|
14天前
|
存储 缓存 安全
Go 语言中的 Sync.Map 详解:并发安全的 Map 实现
`sync.Map` 是 Go 语言中用于并发安全操作的 Map 实现,适用于读多写少的场景。它通过两个底层 Map(`read` 和 `dirty`)实现读写分离,提供高效的读性能。主要方法包括 `Store`、`Load`、`Delete` 等。在大量写入时性能可能下降,需谨慎选择使用场景。
|
2月前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
101 8
|
3月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
131 2
|
4月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
149 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
4月前
|
存储 负载均衡 监控
如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
在数字化时代,构建高可靠性服务架构至关重要。本文探讨了如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
96 1
|
4月前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
4月前
|
存储 Linux Docker
centos系统清理docker日志文件
通过以上方法,可以有效清理和管理CentOS系统中的Docker日志文件,防止日志文件占用过多磁盘空间。选择合适的方法取决于具体的应用场景和需求,可以结合手动清理、logrotate和调整日志驱动等多种方式,确保系统的高效运行。
395 2