看来在您当前的项目中已经存在一个模块,声明其路径为 github.com/IBM/sarama,但我们需要使用的是 github.com/Shopify/sarama。这可能会导致依赖冲突。要解决这个问题,您可以采取以下步骤:
移除或更新现有的 go.mod 文件: 确保您的项目使用的是正确的 sarama 库。
更新 go.mod 文件: 确保您的 go.mod 文件引用的是 github.com/Shopify/sarama。您可以手动编辑 go.mod 文件,或者使用 go get 命令。
首先,删除现有的 go.mod 和 go.sum 文件:
rm go.mod go.sum
然后,重新初始化 Go 模块并获取正确的 sarama 库:
go mod init your_module_name go get github.com/Shopify/sarama
以下是一个完整的示例,包括 go.mod 文件和 main.go 文件。
1. 创建 go.mod 文件 go mod init your_module_name
然后在 go.mod 文件中确保引用 github.com/Shopify/sarama:
module your_module_name go 1.18 require github.com/Shopify/sarama v1.34.1
2. 创建 main.go 文件
package main import ( "context" "fmt" "log" "os" "os/signal" "github.com/Shopify/sarama" ) func main() { // Kafka broker addresses brokers := []string{"localhost:9092"} // Kafka topic topic := "your_topic" // Consumer group ID group := "your_group" // Sarama logger configuration sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) // Sarama configuration config := sarama.NewConfig() config.Version = sarama.V0_9_0_0 // Kafka version config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Consumer.Offsets.Initial = sarama.OffsetOldest // Create new consumer group consumerGroup, err := sarama.NewConsumerGroup(brokers, group, config) if err != nil { log.Fatalf("Error creating consumer group: %v", err) } defer func() { if err := consumerGroup.Close(); err != nil { log.Fatalf("Error closing consumer group: %v", err) } }() // Create a new consumer instance consumer := Consumer{ ready: make(chan bool), } // Handle termination signals ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() // Run the consumer group in a separate goroutine go func() { for { if err := consumerGroup.Consume(ctx, []string{topic}, &consumer); err != nil { log.Fatalf("Error consuming messages: %v", err) } // Check if context was canceled, signaling termination if ctx.Err() != nil { return } consumer.ready = make(chan bool) } }() // Wait for consumer to be ready <-consumer.ready log.Println("Sarama consumer up and running! Press Ctrl+C to stop.") // Wait for termination signal <-ctx.Done() log.Println("Terminating Sarama consumer...") } // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool } // Setup is run at the beginning of a new session, before ConsumeClaim func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { // Mark the consumer as ready close(consumer.ready) return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic) session.MarkMessage(message, "") } return nil }
运行代码
确保 Kafka 服务器已启动,并且指定的主题存在,然后运行以上 Go 程序来消费 Kafka 消息。
go run main.go
这段代码将在控制台中打印消费到的 Kafka 消息的值、时间戳和主题信息。这样就可以避免模块路径冲突,并确保使用正确的 sarama 库。