go高并发之路——消息中间件kafka

简介: 本文介绍了高并发业务中的流量高峰应对措施,重点讲解了Kafka消息中间件的使用,包括常用的Go语言库sarama及其版本问题,以及Kafka的版本选择建议。文中还详细解释了Kafka生产者的四种分区策略:轮询、随机、按Key和指定分区,并提供了相应的代码示例。

一般高并发的业务都是某个时间段的请求量特别大,比如本人负责的直播业务,基本上一天就两个高峰段:早上和晚上的特定时间段。其它的时间里,流量基本都比较平稳。那么面对流量高峰,我们可以采取哪些措施呢?常见的有服务器和DB提前扩容、监控告警(盯监控)、流量削峰、加缓存、网关限流、服务降级等措施,具体问题具体分析。接下来,我们就学习下常见的抵御流量洪峰的一个手段——消息中间件。市面上常见的消息中间件有很多种类,比如Kafka、RabbitMQ、RocketMQ、ActiveMQ、ZeroMQ等。今天就和大家一起学习下kafka的一些常见的知识点和一些坑点。

一、kafka的类库

go常用的kafka库有sarama(推荐), 还有 confluent-kafka-go 等。 我基本上用的都是第一个类,但是有一个比较大的坑:就是在v1.32.0中,会出现生产kafka消息超时的一个问题。这个当时也是定位了蛮久的,通过日志排查、升级Kafka服务器版本、换代码写法、找腾讯云厂商定位等等措施,最终才定位到是这个版本有问题,被官方所废弃了。最终是将SDK升级到更高的版本才解决了此问题。

二、kafka的版本号

Kafka 目前总共演进了 8 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0、3.0,其中的小版本和 Patch 版本很多。有兴趣的可以去了解下各个版本都更新了什么  kafka官网

建议使用0.10.0.0之后的版本,因为这是里程碑式的大版本,该版本引入了 Kafka Streams。从这个版本起,Kafka 正式升级成分布式流处理平台,虽然此时的 Kafka Streams 还基本不能线上部署使用。0.10 大版本包含两个小版本:0.10.1 和 0.10.2,它们的主要功能变更都是在 Kafka Streams 组件上。自 0.10.2.1 版本起,新版本 Consumer API 算是比较稳定了。据我了解,目前公司现网环境使用的最低的版本也是这个0.10.2.1。

最后强烈建议,不论你用的是哪个版本,都得尽量保持服务器端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。而且可能会出现很多莫名其妙的问题,比如kafka进程假死、连接不上broker等问题。

三、kafka生产者分区策略

使用过kafka的小伙伴都应该比较清楚,kafka下真实存储数据的地方是topic(主题)之下的partition(分区),而topic下的每条消息只会保存在某一个partition中,不会在多个分区中被保存多份。之所以topic之下还有partition,主要作用是为了提高kafka负载均衡的能力,提高系统的吞吐性。

标题中提到的分区策略就是决定生产者将消息发送到哪个分区的算法,那么kafka分区都有哪些策略呢? 主要有四个:

1、轮询策略,即按顺序分配,默认分区策略。举个例子,假设一个主题包含3个分区。第一条消息会被发送到分区0,第二条消息会被发送到分区1,第三条消息会被发送到分区2。接着,当生产第4条消息时,分配将重新开始,这条消息会被发送到分区0。以此类推。

2、随机策略,就是随意地将消息放置到任何一个分区,这个本质上和轮询差不多,也是为了将数据打散,使其均匀分布,但是打散效果比轮询差一点,好像新版本的kafka已经废弃了,改为默认是轮询分配了。

3、按key消息建保存策略。Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key可以是某个业务的标识划分比如公司、部门、业务ID等等。只要消息定义了key,那么就可以保证同一个key的所有消息都进入相同的分区里面。如果指定了 Key,那么默认实现按消息键保存策略;如果没有指定 Key,则使用轮询策略。这个方式作用非常强大,当你需要实现消息的顺序消费的时候,就可以指定这个key。

举个实际的使用场景,我这里有一个业务,用户会有两种行为,新增和更改,这两种行为我这边都会生产kafka消息给下游消费,那么这种情况下能使用上面的轮询和随机策略吗?很明显不行,假如新增和更改只隔了很短的一个时间间隔,然后这两条消息被推送到不同的分区,那么就可能出现这样的情况:消费者先消费了更改的数据,然后再消费到新增的数据,这样数据就乱了啊。那这时候,按key分区的策略就派上用场了,我可以将用户ID设置成一个key,那么该用户的数据都会落到同一个分区,且有先后顺序了,这样就不会出问题了。

下面是使用sarama实现的一个demo:

go

代码解读

复制代码

package main

import (
	"fmt"
	"log"
	"strconv"

	"github.com/IBM/sarama"
)

func main() {
	// 创建生产者配置
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Version = sarama.V1_1_1_0 //kafka指定版本号,与broker保持一致

	// 创建生产者
	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create Kafka producer: %v", err)
	}
	defer producer.Close()

	for i := 0; i < 100; i++ { //生产100条消息
		// 创建消息并指定分区
		message := &sarama.ProducerMessage{
			Topic: "live-task-reward",
			Key:   sarama.StringEncoder("jay"), //指定key,那么该key的100条消息都会落在同一个分区,落在哪个分区根据这个key计算出来
			Value: sarama.StringEncoder("Hello, Kafka!" + strconv.Itoa(i)),
		}

		// 发送消息
		partition, offset, err := producer.SendMessage(message)
		if err != nil {
			log.Fatalf("Failed to send message: %v", err)
		}

		fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}

}

4、指定分区。就是在生产消息的时候可以直接指定分区生产,使消息落入到具体的某个分区中。下面是使用sarama实现的一个demo:

go

代码解读

复制代码

package main

import (
	"fmt"
	"log"
	"strconv"

	"github.com/IBM/sarama"
)

func main() {
	// 创建生产者配置
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewManualPartitioner //如果需要指定分区的时候,这个参数必须设置
	config.Version = sarama.V1_1_1_0                          //kafka指定版本号,与broker保持一致

	// 创建生产者
	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("Failed to create Kafka producer: %v", err)
	}
	defer producer.Close()

	for i := 0; i < 100; i++ { //生产100条消息
		// 创建消息并指定分区
		message := &sarama.ProducerMessage{
			Topic:     "live-task-reward",
			Key:       sarama.StringEncoder("jay"), //即使这里指定了key,但kafka不会去计算该key。因为下面指定了分区1,那么所有数据都会落在分区1
			Value:     sarama.StringEncoder("Hello, Kafka!" + strconv.Itoa(i)),
			Partition: 1, // 指定分区为 1
		}

		// 发送消息
		partition, offset, err := producer.SendMessage(message)
		if err != nil {
			log.Fatalf("Failed to send message: %v", err)
		}

		fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}

}

以上,分别讲了kafka的常用类库、版本和生产者分区策略的一些知识点和踩过的一些坑,这也都是用好kafka必须掌握的一些基础知识。


转载来源:https://juejin.cn/post/7397323474273910784

相关文章
|
7天前
|
人工智能 自动驾驶 大数据
预告 | 阿里云邀您参加2024中国生成式AI大会上海站,马上报名
大会以“智能跃进 创造无限”为主题,设置主会场峰会、分会场研讨会及展览区,聚焦大模型、AI Infra等热点议题。阿里云智算集群产品解决方案负责人丛培岩将出席并发表《高性能智算集群设计思考与实践》主题演讲。观众报名现已开放。
|
23天前
|
存储 人工智能 弹性计算
阿里云弹性计算_加速计算专场精华概览 | 2024云栖大会回顾
2024年9月19-21日,2024云栖大会在杭州云栖小镇举行,阿里云智能集团资深技术专家、异构计算产品技术负责人王超等多位产品、技术专家,共同带来了题为《AI Infra的前沿技术与应用实践》的专场session。本次专场重点介绍了阿里云AI Infra 产品架构与技术能力,及用户如何使用阿里云灵骏产品进行AI大模型开发、训练和应用。围绕当下大模型训练和推理的技术难点,专家们分享了如何在阿里云上实现稳定、高效、经济的大模型训练,并通过多个客户案例展示了云上大模型训练的显著优势。
|
27天前
|
存储 人工智能 调度
阿里云吴结生:高性能计算持续创新,响应数据+AI时代的多元化负载需求
在数字化转型的大潮中,每家公司都在积极探索如何利用数据驱动业务增长,而AI技术的快速发展更是加速了这一进程。
|
18天前
|
并行计算 前端开发 物联网
全网首发!真·从0到1!万字长文带你入门Qwen2.5-Coder——介绍、体验、本地部署及简单微调
2024年11月12日,阿里云通义大模型团队正式开源通义千问代码模型全系列,包括6款Qwen2.5-Coder模型,每个规模包含Base和Instruct两个版本。其中32B尺寸的旗舰代码模型在多项基准评测中取得开源最佳成绩,成为全球最强开源代码模型,多项关键能力超越GPT-4o。Qwen2.5-Coder具备强大、多样和实用等优点,通过持续训练,结合源代码、文本代码混合数据及合成数据,显著提升了代码生成、推理和修复等核心任务的性能。此外,该模型还支持多种编程语言,并在人类偏好对齐方面表现出色。本文为周周的奇妙编程原创,阿里云社区首发,未经同意不得转载。
11735 12
|
12天前
|
人工智能 自然语言处理 前端开发
100个降噪蓝牙耳机免费领,用通义灵码从 0 开始打造一个完整APP
打开手机,录制下你完成的代码效果,发布到你的社交媒体,前 100 个@玺哥超Carry、@通义灵码的粉丝,可以免费获得一个降噪蓝牙耳机。
5397 14
|
19天前
|
人工智能 自然语言处理 前端开发
用通义灵码,从 0 开始打造一个完整APP,无需编程经验就可以完成
通义灵码携手科技博主@玺哥超carry 打造全网第一个完整的、面向普通人的自然语言编程教程。完全使用 AI,再配合简单易懂的方法,只要你会打字,就能真正做出一个完整的应用。本教程完全免费,而且为大家准备了 100 个降噪蓝牙耳机,送给前 100 个完成的粉丝。获奖的方式非常简单,只要你跟着教程完成第一课的内容就能获得。
9611 15
|
1月前
|
缓存 监控 Linux
Python 实时获取Linux服务器信息
Python 实时获取Linux服务器信息
|
17天前
|
人工智能 自然语言处理 前端开发
什么?!通义千问也可以在线开发应用了?!
阿里巴巴推出的通义千问,是一个超大规模语言模型,旨在高效处理信息和生成创意内容。它不仅能在创意文案、办公助理、学习助手等领域提供丰富交互体验,还支持定制化解决方案。近日,通义千问推出代码模式,基于Qwen2.5-Coder模型,用户即使不懂编程也能用自然语言生成应用,如个人简历、2048小游戏等。该模式通过预置模板和灵活的自定义选项,极大简化了应用开发过程,助力用户快速实现创意。
|
5天前
|
机器学习/深度学习 人工智能 安全
通义千问开源的QwQ模型,一个会思考的AI,百炼邀您第一时间体验
Qwen团队推出新成员QwQ-32B-Preview,专注于增强AI推理能力。通过深入探索和试验,该模型在数学和编程领域展现了卓越的理解力,但仍在学习和完善中。目前,QwQ-32B-Preview已上线阿里云百炼平台,提供免费体验。
|
13天前
|
人工智能 C++ iOS开发
ollama + qwen2.5-coder + VS Code + Continue 实现本地AI 辅助写代码
本文介绍在Apple M4 MacOS环境下搭建Ollama和qwen2.5-coder模型的过程。首先通过官网或Brew安装Ollama,然后下载qwen2.5-coder模型,可通过终端命令`ollama run qwen2.5-coder`启动模型进行测试。最后,在VS Code中安装Continue插件,并配置qwen2.5-coder模型用于代码开发辅助。
917 5