go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:


sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:

  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo
require (
  github.com/Shopify/sarama v1.19
)
go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()
  config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy

等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
)
func main() {
  config := sarama.NewConfig()                              //创建config实例
  config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
  config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
  config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
  //创建信息
  msg := &sarama.ProducerMessage{}
  msg.Topic = "web.log"
  msg.Value = sarama.StringEncoder("this is a test log")
  //连接KafKa
  client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
  if err != nil {
    fmt.Println("producer closed, err:", err)
    return
  }
  defer client.Close()
  //发送消息
  pid, offset, err := client.SendMessage(msg)
  if err != nil {
    fmt.Println("send msg failed,err:", err)
    return
  }
  fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
zkServer
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费

代码

package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "time"
)
func main() {
  customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
  if err != nil {
    fmt.Println("failed init customer,err:", err)
    return
  }
  partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
  if err != nil {
    fmt.Println("failed get partition list,err:", err)
    return
  }
  fmt.Println("partitions:", partitionlist)
  for partition := range partitionlist { // 遍历所有分区
    //根据消费者对象创建一个分区对象
    pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
    if err != nil {
      fmt.Println("failed get partition consumer,err:", err)
      return
    }
    defer pc.Close() // 移动到这里
    go func(consumer sarama.PartitionConsumer) {
      defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
    time.Sleep(time.Second * 10)
  }
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍

相关文章
|
14天前
|
安全 大数据 Go
深入探索Go语言并发编程:Goroutines与Channels的实战应用
在当今高性能、高并发的应用需求下,Go语言以其独特的并发模型——Goroutines和Channels,成为了众多开发者眼中的璀璨明星。本文不仅阐述了Goroutines作为轻量级线程的优势,还深入剖析了Channels作为Goroutines间通信的桥梁,如何优雅地解决并发编程中的复杂问题。通过实战案例,我们将展示如何利用这些特性构建高效、可扩展的并发系统,同时探讨并发编程中常见的陷阱与最佳实践,为读者打开Go语言并发编程的广阔视野。
|
16天前
|
存储 Go
Golang语言基于go module方式管理包(package)
这篇文章详细介绍了Golang语言中基于go module方式管理包(package)的方法,包括Go Modules的发展历史、go module的介绍、常用命令和操作步骤,并通过代码示例展示了如何初始化项目、引入第三方包、组织代码结构以及运行测试。
19 3
|
1天前
|
Shell Go API
Go语言grequests库并发请求的实战案例
Go语言grequests库并发请求的实战案例
|
20天前
|
Go 开发者
|
20天前
|
编译器 Go 开发者
|
27天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
65 9
|
1月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
52 3
|
22天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?