Kratos微服务框架下的消息队列应用

简介: Kratos微服务框架下的消息队列应用,包括了:Kafka、Rabbitmq、mqtt、redis,nats,websocket等。

什么是消息队列

MQ就是消息队列,是Message Queue的缩写。消息队列是一种通信方式。消息的本质就是一种数据结构。因为MQ把项目中的消息集中式的处理和存储,所以MQ主要有解耦,并发,和削峰的功能。

为什么要使用消息队列

1. 异步

通常的微服务实现里面,都是通过RPC进行微服务之间的相互调用,这是同步的。如果消息队列的话,可以实现异步的调用。至于异步有啥好处呢,主要是为了削峰。

2. 削峰

同步的调用会带来一个问题:瞬时流量。客户的调用同步接口节奏,你是无法把控的,流量将会是忽高忽低的,猛的来一波,搞不好系统就崩了溃了。

如果消息队列的话,可以实现异步的调用,并且可以实现削峰,请求进来,我先放到消息队列里面去,慢慢消化掉,不至于猛的来一下,把系统击垮。

3. 解耦

通常的微服务实现里面,都是通过RPC进行微服务之间的相互调用,那么意味着,你要做到一件事情,你必须要知道做事情的对方是谁。在微服务的世界里面,如果设计得不好,那就是一团糟的相互调用网络,看得你晕晕的,运维会疯,后面接手的开发人员也得疯。

应用了消息队列,你就只需要跟消息队列这个代理打交道,单线联系,关系简单。我们只需要生产消息,消费消息,至于是谁消费的,谁生产的,完全不用去管它。架构上,就会清爽多了。所以,要对服务进行解耦,消息队列是一个很好的选择。

Kratos与消息队列

Kratos现在的版本(v2.2.1)中,还没有对消息队列的直接支持,但是要运用还是容易的。官方有一个空壳示例代码BeerShop,可以看到,在data层,使用Kafka的痕迹。

对于在Kratos微服务框架里面应用消息队列,我认为有两种方式可以实现:

  1. 在data层,使用消息队列,但是在这个层,你只能在那生产消息,而不好消费消息。
  2. 将消息队列的客户端实现为微服务的一个Server,然后在微服务的Service中消费消息和生产消息。

第一个方式的应用面不广,更多的时候,第二种方式的应用面会更广一些,我选择了第二种方式。但是,Kratos官方并没有支持这一种方式。故而,我只能够自己动手实现了,我从另外一个微服务Go-Micro里面提取了其Broker的实现,并且将其实现为Kratos框架里面的一个Server。事实证明,这样是可行的,并且很好使。

你可能会问,为什么我不直接使用Go-Micro呢?因为Go-Micro是一个很重的微服务框架,尽管它的功能很丰富,几乎支持了大部分的微服务需求。但是对于一个应用来说,我并不需要使用所有的技术栈、中间件,我只需要部分的技术栈。所以,我宁愿做加法,也不愿意去做减法。对于服务端来说,可控、可用、可维护是最重要的。极简,是一个很好的选择。另外,我还要腹诽一点,我从Go-Micro提取出来的Broker在测试的过程中发现,都有一些瑕疵。

我实现的代码,我放到了github:https://github.com/tx7do/kratos-transport

它所支持的协议和消息队列有:

  • Kafka
  • RabbitMQ
  • NATS
  • Redis
  • MQTT
  • WebSocket

基本上是够用了。

kratos-transport的应用

它主要分为了3个部分:

1. Codec 编解码器

这一块和Broker都是从Go-Micro提取出来的,但是它对我的应用来说,还并没有什么用处,因为我的编解码器是很独特的,需要定制的。所以,我暂时还没用上这一块。

2. Broker 消息队列

可以直接拿来用,我拿Kafka举例:

packagekafkaimport (
"context""fmt""github.com/tx7do/kratos-transport/broker""os""os/signal""syscall""testing")
funcTestSubscribe(t*testing.T) {
interrupt :=make(chanos.Signal, 1)
signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
ctx :=context.Background()
b :=NewBroker(
broker.Addrs("127.0.0.1:9092"),
broker.OptionContext(ctx),
    )
_, _=b.Subscribe("logger.sensor.ts", receive,
broker.SubscribeContext(ctx),
broker.Queue("fx-group"),
    )
<-interrupt}
funcreceive(eventbroker.Event) error {
fmt.Println("Topic: ", event.Topic(), " Payload: ", string(event.Message().Body))
//_ = event.Ack()returnnil}
funcTestPublish(t*testing.T) {
ctx :=context.Background()
b :=NewBroker(
broker.Addrs("127.0.0.1:9092"),
broker.OptionContext(ctx),
    )
varmsgbroker.Messagemsg.Body= []byte(`{"Humidity":60, "Temperature":25}`)
_=b.Publish("logger.sensor.ts", &msg)
}

3. Server 封装给Kratos的Server实现

还是拿Kafka举例:

packagemainimport (
"fmt""github.com/go-kratos/kratos/v2""log""github.com/tx7do/kratos-transport/broker""github.com/tx7do/kratos-transport/transport/kafka")
funcmain() {
//ctx := context.Background()kafkaSrv :=kafka.NewServer(
kafka.Address("127.0.0.1:9092"),
kafka.Subscribe("test_topic", "a-group", receive),
    )
app :=kratos.New(
kratos.Name("kafka"),
kratos.Server(
kafkaSrv,
        ),
    )
iferr :=app.Run(); err!=nil {
log.Println(err)
    }
}
funcreceive(eventbroker.Event) error {
fmt.Println("Topic: ", event.Topic(), " Payload: ", string(event.Message().Body))
returnnil}
funcsendData(sendData []byte) error {
varmsgbroker.Messagemsg.Body=sendDatakafkaSrv.Publish(topic, &msg)
}

另外再看一个例子,是Websocket的,它的应用其实也是很广的:

packagemainimport (
"fmt""log""github.com/go-kratos/kratos/v2""github.com/tx7do/kratos-transport/transport/websocket")
funcmain() {
//ctx := context.Background()wsSrv :=websocket.NewServer(
websocket.Address(":8800"),
websocket.ReadHandle("/ws", handleMessage),
websocket.ConnectHandle(handleConnect),
    )
app :=kratos.New(
kratos.Name("websocket"),
kratos.Server(
wsSrv,
        ),
    )
iferr :=app.Run(); err!=nil {
log.Println(err)
    }
}
funchandleConnect(connectionIdstring, registerbool) {
ifregister {
fmt.Printf("%s connected\n", connectionId)
    } else {
fmt.Printf("%s disconnect\n", connectionId)
    }
}
funchandleMessage(connectionIdstring, message*websocket.Message) (*websocket.Message, error) {
fmt.Printf("[%s] Payload: %s\n", connectionId, string(message.Body))
varrelyMsgwebsocket.MessagerelyMsg.Body= []byte("hello")
return&relyMsg, nil}

具体的应用实例

我写了两个实例代码,并且都已经提交到了Kratos的examples代码仓库中去了。这两个例子都是物联网方面的应用。

kratos-cqrs

这是一个简单的CQRS的实现,主要就是拿了Kafka来消费来自于传感器的遥感数据,然后把数据存储到数据库中去。

需要注意的是,这个实例并不够完整,我并没有实现MQTT的消费,没有实现前端页面等等。只实现了对Kafka的消费。

kratos-realtimemap

这是一个完整的例子,有前端,有后端,可以完整的跑起来看。

通过MQTT接收一个开放的公交遥测数据源,然后通过REST和Websocket向前端发送数据,在地图上展现出来车辆的轨迹、车辆的位置、车辆的速度、开关门状态等等。

目录
相关文章
|
2月前
|
Prometheus 监控 Kubernetes
Prometheus 在微服务架构中的应用
【8月更文第29天】随着微服务架构的普及,监控和跟踪各个服务的状态变得尤为重要。Prometheus 是一个开源的监控系统和时间序列数据库,非常适合用于微服务架构中的监控。本文将详细介绍 Prometheus 如何支持微服务架构下的监控需求,包括服务发现、服务间的监控指标收集以及如何配置 Prometheus 来适应这些需求。
58 0
|
2月前
|
监控 JavaScript 测试技术
从单体应用迁移到微服务的最佳实践
【8月更文第29天】随着软件架构的发展,越来越多的企业开始考虑从传统的单体应用迁移到微服务架构。虽然迁移可以带来诸如更好的可扩展性、更高的灵活性等优势,但这一过程也可能充满挑战。本文将详细介绍如何顺利地进行这一转变,并提供一些实用的步骤和示例代码。
73 0
|
1月前
|
Dubbo Java 应用服务中间件
微服务框架Dubbo环境部署实战
微服务框架Dubbo环境部署的实战指南,涵盖了Dubbo的概述、服务部署、以及Dubbo web管理页面的部署,旨在指导读者如何搭建和使用Dubbo框架。
85 17
微服务框架Dubbo环境部署实战
|
15天前
|
Kubernetes Java Android开发
用 Quarkus 框架优化 Java 微服务架构的设计与实现
Quarkus 是专为 GraalVM 和 OpenJDK HotSpot 设计的 Kubernetes Native Java 框架,提供快速启动、低内存占用及高效开发体验,显著优化了 Java 在微服务架构中的表现。它采用提前编译和懒加载技术实现毫秒级启动,通过优化类加载机制降低内存消耗,并支持多种技术和框架集成,如 Kubernetes、Docker 及 Eclipse MicroProfile,助力开发者轻松构建强大微服务应用。例如,在电商场景中,可利用 Quarkus 快速搭建商品管理和订单管理等微服务,提升系统响应速度与稳定性。
31 5
|
22天前
|
存储 搜索推荐 数据库
MarkLogic在微服务架构中的应用:提供服务间通信和数据共享的机制
随着微服务架构的发展,服务间通信和数据共享成为关键挑战。本文介绍MarkLogic数据库在微服务架构中的应用,阐述其多模型支持、索引搜索、事务处理及高可用性等优势,以及如何利用MarkLogic实现数据共享、服务间通信、事件驱动架构和数据分析,提升系统的可伸缩性和可靠性。
22 5
|
22天前
|
运维 Cloud Native Devops
云原生架构的崛起与实践云原生架构是一种通过容器化、微服务和DevOps等技术手段,帮助应用系统实现敏捷部署、弹性扩展和高效运维的技术理念。本文将探讨云原生的概念、核心技术以及其在企业中的应用实践,揭示云原生如何成为现代软件开发和运营的主流方式。##
云原生架构是现代IT领域的一场革命,它依托于容器化、微服务和DevOps等核心技术,旨在解决传统架构在应对复杂业务需求时的不足。通过采用云原生方法,企业可以实现敏捷部署、弹性扩展和高效运维,从而大幅提升开发效率和系统可靠性。本文详细阐述了云原生的核心概念、主要技术和实际应用案例,并探讨了企业在实施云原生过程中的挑战与解决方案。无论是正在转型的传统企业,还是寻求创新的互联网企业,云原生都提供了一条实现高效能、高灵活性和高可靠性的技术路径。 ##
29 3
|
27天前
|
Cloud Native 持续交付 云计算
云原生之旅:从传统应用到容器化微服务
随着数字化转型的浪潮不断推进,企业对IT系统的要求日益提高。本文将引导你了解如何将传统应用转变为云原生架构,重点介绍容器化和微服务的概念、优势以及实施步骤,旨在帮助读者掌握将应用迁移到云平台的关键技巧,确保在云计算时代保持竞争力。
22 5
|
1月前
|
存储 Java Maven
从零到微服务专家:用Micronaut框架轻松构建未来架构
【9月更文挑战第5天】在现代软件开发中,微服务架构因提升应用的可伸缩性和灵活性而广受欢迎。Micronaut 是一个轻量级的 Java 框架,适合构建微服务。本文介绍如何从零开始使用 Micronaut 搭建微服务架构,包括设置开发环境、创建 Maven 项目并添加 Micronaut 依赖,编写主类启动应用,以及添加控制器处理 HTTP 请求。通过示例代码展示如何实现简单的 “Hello, World!” 功能,并介绍如何通过添加更多依赖来扩展应用功能,如数据访问、验证和安全性等。Micronaut 的强大和灵活性使你能够快速构建复杂的微服务系统。
61 5
|
29天前
|
缓存 Java 应用服务中间件
随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架
【9月更文挑战第6天】随着微服务架构的兴起,Spring Boot凭借其快速开发和易部署的特点,成为构建RESTful API的首选框架。Nginx作为高性能的HTTP反向代理服务器,常用于前端负载均衡,提升应用的可用性和响应速度。本文详细介绍如何通过合理配置实现Spring Boot与Nginx的高效协同工作,包括负载均衡策略、静态资源缓存、数据压缩传输及Spring Boot内部优化(如线程池配置、缓存策略等)。通过这些方法,开发者可以显著提升系统的整体性能,打造高性能、高可用的Web应用。
58 2
|
1月前
|
Cloud Native 安全 Java
Micronaut对决Spring Boot:谁是微服务领域的王者?揭秘两者优劣,选对框架至关重要!
【9月更文挑战第5天】近年来,微服务架构备受关注,Micronaut和Spring Boot成为热门选择。Micronaut由OCI开发,基于注解的依赖注入,内置多种特性,轻量级且启动迅速;Spring Boot则简化了Spring应用开发,拥有丰富的生态支持。选择框架需考虑项目需求、团队经验、性能要求及社区支持等因素。希望本文能帮助您选择合适的微服务框架,助力您的软件开发项目取得成功!
99 2

热门文章

最新文章

下一篇
无影云桌面