go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:


sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:

  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo
require (
  github.com/Shopify/sarama v1.19
)
go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()
  config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy

等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()                              //创建config实例
  config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
  config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
  config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
  //创建信息
  msg := &sarama.ProducerMessage{}
  msg.Topic = "web.log"
  msg.Value = sarama.StringEncoder("this is a test log")
  //连接KafKa
  client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  if err != nil {
    fmt.Println("producer closed, err:", err)
    return
  }
  defer client.Close()
  //发送消息
  pid, offset, err := client.SendMessage(msg)
  if err != nil {
    fmt.Println("send msg failed,err:", err)
    return
  }
  fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
zkServer
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "time"
)
func main() {
  customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  if err != nil {
    fmt.Println("failed init customer,err:", err)
    return
  }
  partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
  if err != nil {
    fmt.Println("failed get partition list,err:", err)
    return
  }
  fmt.Println("partitions:", partitionlist)
  for partition := range partitionlist { // 遍历所有分区
    //根据消费者对象创建一个分区对象
    pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
    if err != nil {
      fmt.Println("failed get partition consumer,err:", err)
      return
    }
    defer pc.Close() // 移动到这里
    go func(consumer sarama.PartitionConsumer) {
      defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
    time.Sleep(time.Second * 10)
  }
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍

相关文章
|
12天前
|
Go
go的并发初体验、加锁、异步
go的并发初体验、加锁、异步
10 0
|
7天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【6月更文挑战第30天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送序列化消息到主题,消费者通过订阅和跟踪偏移量消费消息。Kafka以持久化、容灾和顺序写入优化I/O。Java示例代码展示了如何创建并发送/接收消息。通过分区、消费者组和压缩等策略,Kafka在高并发场景下可被优化。
17 1
|
4天前
|
消息中间件 NoSQL Kafka
基于Kafka的nginx日志收集分析与监控平台(3)
基于Kafka的nginx日志收集分析与监控平台(3)
|
4天前
|
消息中间件 监控 Kafka
基于Kafka的nginx日志收集分析与监控平台(2)
基于Kafka的nginx日志收集分析与监控平台(2)
|
4天前
|
消息中间件 负载均衡 应用服务中间件
基于Kafka的nginx日志收集分析与监控平台(1)
基于Kafka的nginx日志收集分析与监控平台(1)
|
4天前
|
消息中间件 NoSQL Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(2)
|
4天前
|
消息中间件 应用服务中间件 Kafka
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(1)
日志收集平台项目nginx、kafka、zookeeper、filebeat搭建的基本配置(1)
|
6天前
|
消息中间件 存储 Java
Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅
【7月更文挑战第1天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送消息到主题,消费者订阅并消费。Kafka提供消息持久化、容灾机制,支持分区和复制以确保高可用性。通过优化如分区、批处理和消费者策略,可适应高并发场景。简单的Java示例展示了如何创建和交互消息。
13 0
|
18天前
|
监控 Go
go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化
go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化
|
18天前
|
监控 Go
go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件
go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件