Hydras 是什么?
Hydra 是一个 NodeJS 包(技术栈不是重点,思想!思想!思想!
),它有助于构建分布式应用程序,比如微服务。
Hydra 提供服务发现(service discovery
)、分布式消息传递(distributed messaging
)、 消息负载平衡(message load balancing
)、日志记录(logging
)、呈现(presence
)和运行状况(health
)监视等功能。Hydra 利用 Redis 的强大力量做到了这一点。
例如,使用 Hydra
的 sendMessage
和 makeAPIRequest
调用, 一个应用程序可以按名称与其他应用程序通信,而无需指定 IP 或端口信息。
在运行命名服务的多个实例的常见用例中, Hydra
将根据可用实例的存在信息对其请求进行负载平衡。如果目标实例没有响应,Hydra
甚至会重新路由请求。
Hydra
项目为 ExpressJS
开发人员提供了 hydra-express 模块。Hydra-express API 路由可以自动注册并提供给 Hydra 项目的 hydra-router service
, 它是一种 service 感知的 API 和消息传递路由器。Hydra-router 甚至将接受 WebSocket 消息并将其路由到其相应的服务。
为什么要用 Hydra?
Hydra
提供的大部分功能都可以通过组合使用其他库、框架和基础设施工具来实现。那么,为什么要用 Hydra 呢?
我们创建 Hydra
的目标是创建一个单一的、轻量级的 NPM
包,它为微服务提供解决方案, 而不会迫使开发人员以不同的方式来构建微服务。在很大程度上,我们想让熟悉 ExpressJS
的开发者也能使用 Hydra
。另一个目标是,我们相信开发人员不必是 DevOps
专业人员才能开始使用微服务。简而言之,Hydra
降低了构建和部署微服务的门槛。
Hydra 与 Redis
Hydra 项目的主要目标是在不牺牲健壮性和可伸缩性的情况下, 降低构建和部署可伸缩 Node 微服务的障碍。作为轻量级的库,hydra-core
已在从小型 IoT
设备到 Dockerized
云基础架构的各种情况下使用。
Hydra 利用 Redis
的强大功能来解决以下微服务问题:
- 存在(
presence
)和健康(health
)监测 - 服务发现(
service discovery
) - 路由发现(
route discovery
) - 服务间消息传递(
inter-service messaging
)
让我们检查每一个关注点。
存活状态(presence
)和健康(health
)监测
微服务需要传达其存在状态,以便可以对其进行监视并通过网络与之可靠地进行通信。将消息发送到不正常的服务可能会导致级联的结果,具体取决于应用程序体系结构的健壮性。
应用程序需要在调用它们之前了解对等微服务的状态。这样路由软件可以避免不健康的服务,并在问题致命之前将其报告出来。
使用 Hydra 的微服务将其状态和运行状况存储在 Redis 中, 该信息可供对等服务和监视代理使用。
服务发现(service discovery
)
分布式应用程序通过相互发送消息来协同工作。大多数情况下,这些消息采用 HTTP Restful API
的形式。另一种常见的方法是基于套接字(socket-based
)的消息传递。
为了彼此合作,应用程序需要知道对等服务的位置。也就是说,他们需要知道目标 IP 和端口地址。使用 DNS 条目或反向代理服务器(例如 Nginx )可以部分解决此问题。但是,这种方法的缺点是这些解决方案需要管理。意思是,没有自动或动态发现发生。
由 Hydra 提供支持的服务可以使用 Redis 注册其服务名称(service name
),IP地址和端口。结合注册和服务存在信息,可以使对等服务彼此发现。
路由发现(route discovery
)
使用 Hydra
的应用程序可以注册其 Restful API
路由,以便对等应用程序可以发现它们。 Hydra-Router
是一种动态且可感知服务的路由器, 它使用存储的路由信息将外部请求定向到云或集群环境中的服务。
服务间消息传递(inter-service messaging
)
Redis
提供消息传递服务,以允许应用程序发布和订阅消息。这种通信是基于套接字的(socket-based
),并且性能很高。Hydra 在 Redis 消息传递之上添加了一层, 以通过发送包含路由信息(例如命名服务 named services)的JSON消息,使应用程序彼此通信。您无需考虑IP地址或端口,也无需考虑哪个服务实例将收到消息。
Redis 的灵活性
Redis 是理想的,其原因有很多,Redis 在开发人员中的受欢迎程度持续上升。在在线民意调查中,它的排名也很高。
Redis 可能已经在您的环境中,因为它通常是智能缓存的首选解决方案。
在 Redis 之上构建 Hydra 的一个关键原因是因为 Redis 可在 IoT 设备, 笔记本电脑,云基础架构上使用,并受到 RedisLabs 等托管环境的良好支持。这确实使开发人员能够将 Node 微服务构建和部署到任何这些环境。
Hydra Express-快速教程
Hydra 是一个 NPM 模块,用于快速构建 Node-based 的微服务。Hydra-express 是包装 Hydra 和 ExpressJS 的模块。在本指南中,我们将着眼于创建一个 hydra-express 应用程序,并了解它可以做什么。
第 1 步-获取 Redis
Hydra 需要使用 Redis 服务器。如果您从未使用过 Redis,我们认为这将是一次改变生活的开发人员经验, 希望这是最终尝试它的一个很好的理由!
如果您已经在使用 Redis,那么恭喜您已经升级了,请随时跳至第2步!
有很多方法可以访问 Redis 服务器。最简单的方法之一是通过 RedisLabs 等提供商注册免费套餐。
如果你喜欢 Docker,你可以在几分钟内安装官方的 Redis 镜像。对于 PC 用户来说,这也是一个不错的选择。
在 Mac 上,您可以使用一个简单的命令通过 Homebrew 安装Redis:brew install redis
。
如果您不介意从源代码构建 Redis,请查看《Redis快速入门指南》
。
但最简单的方法是第一种选择,它不需要安装——只需登录免费的云服务提供商。
这里强烈建议使用 Docker
!
第 2 步-Hyda CLI 工具
有了 Redis 的访问权限,您现在应该安装 hydra 工具:
确保您使用的是 NodeJS 6.2.1
或更高版本-Hydra 是使用 ES6 构建的!
sudo npm install -g yo generator-fwsp-hydra hydra-cli
这样就安装了方便的 Yeoman 和 hydra 生成器以及命令行客户端。
让我们配置Hydra命令行客户端。
$ hydra-cli config redisUrl: 127.0.0.1 redisPort: 6379 redisDb: 15
上面的例子假设你已经在本地安装了 redis
。如果没有,只需提供云服务提供的 redisUrl
和 redisDb
即可。
现在我们都准备好了。让我们构建一个微服务!
第 3 步-构建和测试微服务
让我们构建一个名为 hello的服务。我们将使用方便的 Hydra生成器,大多数情况下选择默认设置。
$ yo fwsp-hydra ? Name of the service (`-service` will be appended automatically) hello ? Host the service runs on? ? Port the service runs on? 0 ? What does this service do? Says hello ? Does this service need auth? No ? Is this a hydra-express service? Yes ? Set up a view engine? No ? Enable CORS on serverResponses? No ? Run npm install? No create hello-service/.editorconfig create hello-service/.eslintrc create hello-service/.gitattributes create hello-service/.nvmrc create hello-service/.jscsrc create hello-service/specs/test.js create hello-service/specs/helpers/chai.js create hello-service/.gitignore create hello-service/package.json create hello-service/README.md create hello-service/hello-service.js create hello-service/config/sample-config.json create hello-service/config/config.json create hello-service/routes/hello-v1-routes.js Done! 'cd hello-service' then 'npm install' and 'npm start'
这是创建的:
. ├── README.md ├── config │ ├── config.json │ └── sample-config.json ├── hello-service.js ├── node_modules ├── package.json ├── routes │ └── hello-v1-routes.js └── specs ├── helpers └── test.js
编辑 routes/hello-v1-routes.js
,使事情变得更有趣。
将第18行更改为:
result: {}
到:
result: { msg: `${hydra.getServiceName()} - ${hydra.getInstanceID()}` }
按照上述说明,我们可以继续构建我们的服务。
$ cd hello-service $ npm install $ npm start
启动服务后,我们看到它使用随机端口启动。
serviceInfo { serviceName: 'hello-service', serviceIP: '10.1.1.163', servicePort: 8891 }
您可以通过 curl
访问该服务:
$ curl 10.1.1.163:8891/v1/hello {"statusCode":200,"statusMessage":"OK","statusDescription":"Request succeeded without error","result":{"msg":"hello-service - 50bf4346dd492c2036cfd57ad8bd2844"}}
或通过浏览器:http://10.1.1.163:8891/v1/hello
我们还可以使用已安装的 hydrai-cli
app 获取有关我们服务的信息:
$ hydra-cli nodes [ { "serviceName": "hello-service", "serviceDescription": "Says hello", "version": "0.0.1", "instanceID": "b1554f404acc3268c1511dc84ae43c50", "updatedOn": "2016-11-15T18:18:56.438Z", "processID": 20542, "ip": "10.1.1.163", "port": 8891, "elapsed": 4 } ]
{ "hello-service": [ "[GET]/_config/hello-service", "[get]/v1/hello/" ] }
此信息由我们的服务发出,它使服务可以彼此发现并相互发送消息。与 Hydra-Router
结合使用,您可以构建整个微服务网络。
要了解如何使用新的微服务,请参见 Hydra 方法。
推荐的后续步骤
- 使用 hydra 生成器创建您自己的测试项目
- 查看示例演示项目
Core
Hydra(core)
是为 Hydra
项目提供动力的 NPM 包。如果您正在使用 ExpressJS
构建您的服务, 您应该检查看 Hydra-Express package 包, 它是专门为利用 ExpressJS 的底层功能而设计的。
本节介绍了核心 Hydra 模块,该模块旨在使微服务的构建和/或使非服务(non-service
)应用程序能够发现和利用微服务。因此,Hydra 在构建分布式应用程序时可帮助解决各种问题。
虽然 Hydra 是为 NodeJS 实现的,但它支持的功能也可以在其他平台上实现。
核心服务依赖于共享的 Redis 实例或集群,比如 Amazon 的 ElasticCache
- 要了解更多关于 Redis 的信息,请查看我们的快速入门教程 并访问 Redis.io
作为一个 Node module,Hydra 提供了嵌入式(drop-in
)功能,旨在解决以下微服务问题:
- 服务注册(
Service Registration
):允许服务在上线时注册自己并发布它们的HTTP API
路由。 - API 可路由性(
API Routability
):允许将API调用路由到微服务。 - 消息传递通信(
Messaging Communication
):通过发布和订阅通道以及消息队列进行的服务间通信。 - 服务负载平衡(
Service Load Balancing
):基于可用的(现有的)微服务实例自动平衡请求。 - 服务发现(
Service Discovery
):在不需要硬编码其IP地址和端口信息的情况下定位服务。 - 运行状况报告(
Health Reporting
):自动运行状况检查报告,用于回答以下问题:应用程序是否健康?它运作正常吗? - 存在状态报告(
Presence Reporting
):服务实例实际可用吗?
在本文档中,我们将引用服务(services
)和服务实例(service instances
)。服务实例和服务节点指的是同一件事。服务只是赋予一个或多个服务实例的名称,将其视为服务的一类。例如,我们可能有一个服务来处理图像大小调整,而我们可以简单地调用该服务 image-resizer
。在我们的云基础架构中,为了响应高需求,我们可能会运行三个 image-resizer
服务实例。每个实例都是服务实例或节点。
在 Hydra 中,服务实例仅仅是使用 Hydra 处理微服务问题的过程。
安装和初始化 Hydra
要从另一个项目中使用 Hydra
:
$ npm install hydra
导入 Hydra
const hydra = require('hydra');
初始化
导入时,会加载 Hydra 模块,但必须先将其初始化才能使用。
hydra.init(initObject);
初始化对象包含以下字段:
{ serviceName: 'hydramcp', serviceDescription: 'Hydra Master Control Program', serviceIP: '', servicePort: 0, serviceType: 'mcp', redis: { host: '127.0.0.1', port: 6379, db: 0 } }
所有显示的字段都是必需的。但是,如果您的应用程序不打算作为服务运行,那么下面的值可以为空并将被忽略。如果您不打算使用这些值,那么最好将它们空白。但是,此时 serviceName
不能为空。
serviceDescription: '', serviceDNS: '', serviceIP: '', servicePort: 0, serviceType: '',
重要:
- 当
Hydra
在一个服务中使用时,如果serviceIP
等于一个空字符串(''),那么将使用该机器的本地IP,否则需要一个四段IP地址(52.9.201.160)
。如果servicePort
等于0
,那么 Hydra 将选择一个随机端口。在需要微服务使用特定端口地址的情况下设置servicePort
。 hydra.serviceDNS
条目允许您指定服务 DNS 而不是 IP 地址。
这使您可以将服务放置在外部负载平衡器(例如 Nginx
或 Docker Swarm
的内部 DNS
)之后。存在值时,serviceDNS
条目将忽略 serviceIP
字段-即使它包含一个值。
- 对于集群中的所有网络服务,必须将
hydra.redis.dbvalue
设置为相同的值。
不这样做会影响服务的可发现性和监视。在 Hydra 中未对 redis 数据库值进行硬编码的原因是, 不能保证 Redis 实例上存在的数据库数量在提供商之间是相同的。因此,最终服务实现者(您?)需要设置此值的灵活性,从而承担责任。
在一个实际的生产系统中 Hydra JSON
可能被嵌入到一个更大的配置文件中,比如 properties.js
文件:
exports.value = { appServiceName: 'hydramcp', cluster: false, environment: 'development', maxSockets: 500, logPath: '', hydra: { serviceName: 'hydramcp', serviceDescription: 'Hydra Master Control Program', serviceVersion: '1.0.0', serviceIP: '', serviceDNS: '', servicePort: 0, serviceType: 'mcp', serviceWorker: false, redis: { host: '127.0.0.1', port: 6379, db: 0 } } };
当使用这种方法时,只需在初始化过程中传递 hydra 分支:
hydra.init(config.hydra);
如果要在要初始化文件的同一文件中使用 hydra
, 则可以先等待 hydra.init()
返回的 promise
,然后再使用其他 hydra
方法。
// index.js hydra.init({...}) .then(...);
但是,如果从单独的文件导入 hydra
实例,则需要调用 hydra.ready()
方法。
hydra.ready()
返回与 hydra.init()
完全相同的 promise
,尽管这样做无需重新初始化 hydra
实例。
// my-hydra.js import hydra from 'hydra'; hydra.init({...}); export default hydra;
// service.js import hydra from './my-hydra.js'; hydra.ready().then(...);
调用 hydra.init()
之后,可以随时使用 hydra.ready()
来等待初始化完成。
Redis 配置
除了 host
、port
和 db
,你可以通过 node redis client createClient
方法支持的任何选项。
retry_strategy
是一个例外,它在 redis.createClient
中带有一个函数参数。 Hydra
提供了 retry_strategy(hydra._redisRetryStrategy)
,它是通过hydra.redis.retry_strategy
选项配置的,而不是直接传递给 redis.createClient
:
redis: { host: "127.0.0.1", port: 6379, db: 15, retry_strategy: { maxReconnectionPeriod: 15, maxDelayBetweenReconnections: 5 } }
您还可以选择使用 url
参数代替 host
,port
,db
和 password
。有关详细信息,请参见 IANA registration。以下等效于上面的 host/port/db
:
redis: { url: 'redis://127.0.0.1:6379/15' }
注意:如果你传入一些 host
、port
、db
和 password
的组合,url
中的值会被更具体的条目覆盖:
redis: { url: 'redis://127.0.0.1:6379/15', db: 10 }
这将连接到数据库 10
,而不是数据库 15
。
通过 unix socket
连接
{ "redis": { "path": "/tmp/redis.sock" } }
Hydra 模式
Hydra
可配置为两种使用模式之一:
- 服务模式(
Service mode
)—— 充当服务和其他服务的消费者。 - 消费者模式(
Consumer mode
)—— 只能充当服务消费者,而不能成为服务。
服务模式(Service mode
)
要在 Service mode
下使用 Hydra,您必须先使用以下方式注册:
hydra.registerService();
注意:如果您的应用程序不需要成为服务,那么您就不需要执行此注册。
注册服务后,hydra 会在生成日志事件或消息到达时发出 NodeJS 事件。您可以按以下方式监听这些事件:
hydra.registerService(); hydra.on('log', function(entry) {}); hydra.on('message', function(message) {});
消费者模式(Consumer mode
)
如果消费者模式
应用程序调用与服务模式
相关的方法, 将导致异常(exception
)或 promise
失败。每个调用都在本文档的最后被清楚地记录下来,以帮助避免误用。但是始终要确保您的应用程序经过了充分的测试。
Service Discovery(服务发现)
服务(Service
)和消费者(Consumer
)模式应用程序都可以发现其他服务。但是请记住,消费者模式应用程序本身无法被发现。只能发现注册的服务。
使用 findService()
方法发现服务(Services
)。 findService()
方法接受服务名称,并返回一个 promise
, 该 promise
将 resolve
为服务信息对象;如果找不到该服务,则返回一个被拒绝的 promise
。
hydra.findService('imageprocessor') .then((service) => { console.log(service); }) .catch((err) => { console.log('catch err', err); });
返回的服务对象可能如下所示:
{ "serviceName": "imageprocessor", "processID": 25246, "registeredOn": "2016-03-26T18:26:31.334Z", "ip": "10.0.0.4", "port": 9001, "type": "image:processing" }
然后,应用程序可以使用 ip
和 port
信息来调用 imageprocessor
服务上的 API
。
Presence(存活状态)
仅仅因为可以找到服务并不意味着它当前已在线且处于活动状态。在不幸的情况下,所讨论的服务可能会失败和/或暂时不可用。
Hydra 提供了 getServicePresence()
方法来确定服务当前是否可用。如果可用,则返回这样的对象:
{ "updatedOn": "2016-03-28T01:43:45.756Z" }
如果不可用,则 getservicepresence()
返回一个被拒绝的 promise
。
健康检查(Health
)与存活状态(Presence
)
将 Hydra
配置为服务模式(service mode
)后, 它将自动在指定的 Redis
服务器中记录机器和应用程序级别的信息。此外,Hydra
还发送存活状态(Presence
)信息。不幸的是,如果主机应用程序崩溃,那么 Hydra 自然会停止更新存活状态信息。
此外,Hydra
会维护一个内部日志,用于存储检测到的问题。我们可以将其视为黑匣子飞行记录器(flight recorder
)。
尽管所有这些都是自动发生的, 但是您的应用程序可以使用 Hydra
的 sendToHealthLog()
方法来扩充存储的信息。您还可以使用 getServiceHealthLog()
方法检索日志。
记住,您还可以通过在服务注册期间注册日志事件侦听器,在这些日志条目发生时直接接收它们。
使用 Hydra 监视服务
HydraMCP web
应用程序演示了如何监视 Hydra
服务。有两种监测方法:
- 读取 hydra 服务写入 Redis 的数据
- 使用 Hydra 方法接收聚合服务(
aggregate service
)数据。
后一种方法被推荐,因为它被认为对未来 Hydra 如何在 Redis 中存储数据的潜在变化更具弹性。
以下方法有助于服务的自省(introspection
)和控制(control
)。
Method | Description |
getServices | 检索已注册服务的列表。 |
findService | 找到特定的服务。 |
getServicePresence | 检索特定服务的存活状态 |
getServiceHealthAll | 检索所有注册服务的健康信息和健康日志。 |
makeAPIRequest | 向命名服务发出API请求。 |
有关 Hydra 功能的完整列表,请参阅本文档的最后部分。
messaging(消息传递)
Hydra 通过以下方式支持服务间通信:
- 发现并直接使用服务器的网络信息(IP和端口)。
- 通过使用
makeAPIRequest
方法。 - 使用服务间(
inter-service
)消息传递。 - 使用服务消息队列(
service message queues
)。
您使用哪种方法取决于您的应用程序的需求和您愿意做的额外工作的数量。使用 Hydra 的消息传递方法抽象了您可能需要处理的网络层功能。因此,它提供了一种更简单、更可靠的与远程服务交互的方式。
发现和直接使用该服务的网络信息很简单:
let apiRoute = '/v1/email/send'; hydra.findService('emailer') .then((service) => { let url = `http://${service.ip}:${service.port}/${apiRoute}`; let options = { headers: { 'content-type': 'application/json', 'Accept': 'application/json; charset=UTF-8' }, method: 'post' }; options.body = emailObject; fetch(url, options) : :
注意:在使用上述方法之前,应该使用 getServicePresence 方法检查服务是否可用。毕竟,我们希望确保服务已注册,并且当前可用。
在这里,使用 Hydra
的 makeAPIRequest
方法变得更容易且更不容易出错。 makeAPIRequest
方法接受一个对象,该对象包含服务的名称以及其他有用但可选的信息。该方法自动处理服务可用性检查,如果该服务暂时不可用,甚至可以将消息(请求)推送到服务的消息队列中。这是可选行为,并假定这对于发送方是可接受的,并且远程服务能够将请求作为排队的消息进行处理。
let message = hydra.createUMFMessage({ to: 'emailer:/v1/email/send', from: 'website:backend', body: { to: 'user@someplace.com', from: 'marketing@company.com', emailBody: 'Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium' fallbackToQueue: true } }); hydra.makeAPIRequest(message) then() :
服务间的消息传递(Inter-service messaging
)
使用 Hydra
,您可以在服务之间发送消息,甚至可以在一系列服务之间路由消息。这是 Hydra-Router
提供的功能之一。
内置消息通道(Built-in message channels
)
每个 hydra 服务都会自动监听两个内置通道,其他服务发送的消息会在其中到达。
一个通道监听发送到某一类型服务的任何消息,另一个通道监听指向特定服务实例的消息。因此,发送到 file-processing
的消息将被该服务的所有实例接收。而发送到5585f53bd1171db38eafd79bf16e02f4@file-processing
的消息只能由 ID
为5585f53bd1171db38eafd79bf16e02f4
的服务实例处理。
要将消息发送到服务,可以使用 sendMessage
调用。
let message = hydra.createUMFMessage({ to: 'test-service:/', from: 'blue-service:/', body: { fileData: '{base64}' } }); hydra.sendMessage(message);
第一个参数是要向其发送消息的服务的名称,第二个参数是包含消息的 UMF 格式的对象。
使用 sendMessage
时,会将消息发送到随机选择的可用服务实例。如果您需要指定特定实例,则可以使用其唯一的服务 ID 来简单地对服务进行寻址。这显示在下面的 “to”
消息字段中。
let message = hydra.createUMFMessage({ to: 'cef54f47984626c9efbf070c50bfad1b@test-service:/', from: 'blue-service:/', body: { fileData: '{base64}' } }); hydra.sendMessage(message);
您可以通过 getInstanceID()
或 getServicePresence()
方法获得服务的唯一ID。
如果也需要,可以使用 sendBroadcastMessage
方法将消息发送到服务的所有可用实例。
警告:虽然,您可以使用 sendMessage
发送和响应消息 - 建议您在回复时使用 sendReplyMessage
。这样做的原因是 sendReplyMessage
使用源消息正确填写健壮消息传递所需的 UMF
字段。这包括使用源 mid
、for
、to
、from UMF
字段来制定回复消息。
您的服务可以通过将侦听器添加到已加载的 hydra 实例来接收消息。下面的示例演示了如何在必要时制定响应。
hydra.registerService(); hydra.on('message', function(message) { // message will be a UMF formatted object console.log(`Received object message: ${msg.mid}: ${JSON.stringify(msg)}`); // to send a reply message here or elsewhere in your service use the `sendReplyMessage` call. hydra.sendReplyMessage(message, hydra.createUMFMessage({ body: { // response items } })); });
UMF messaging(UMF 消息传递
)
在前面的示例中,我们使用了一个 UMF
样式的消息,它是由 Hydra createUMFMessage
方法创建的。 UMF
是 Universal Message Format
的首字母缩写,是为可路由和可排队的消息传递而设计的轻量级消息传递协议。
UMF 允许您有选择地指定将一条消息发送到一个服务, 然后依次将消息和/或(and/or
)其他结果发送到另一个服务。这样,流程可以跨服务链接在一起。
让我们通过看看 createUMFMessage
实际上做了什么来揭开 UMF
的神秘面纱。
首先,该方法接受一个 message
对象。在这个对象中需要三个字段:
{ "to":'serviceName', "from": 'sending-entity-name', "body": {} }
createUMFMessage 方法采用该对象,并返回一个带有附加字段的新对象:
{ "mid": "02d7e85b-5609-4179-b3af-fee60efc8ef0", "timestamp": "2016-03-28T15:40:05.820Z", "version": "UMF/1.2", "priority": "normal", "type": "msg", "to": "filewatcher", "from": "hydramcp", "body": { "actions": [ "restart", "processBatch" ] } }
附加字段由 UMF
规范定义,并帮助 Hydra
和其他分布式系统处理消息。
createUMFMessage
帮助程序方法有助于确保我们从格式正确的 UMF
兼容消息开始,并可以对其进行进一步扩展。
例如,在这里我们可以在将消息传递给 makeAPIRequest
方法之前更改消息的优先级(priority
)和类型(type
)。
message.priority = 'high'; message.type = 'service:control';
需要注意的是,我们可以将优先级(priority
)和类型(type
)字段添加到传递给 createUMFMessage
的原始消息中。该方法将使用您提供的字段来覆盖它在默认情况下创建的字段。因此,重要的是不要随意重写 mid
或 timestamp
。
注意:有关 UMF
规范的详细信息,请访问:Universal Messaging Format
Hydra 消息队列
当涉及到消息传递和队列时,重要的是要考虑应用程序需要的底层交付保证的级别。Hydra 提供了“基本的”消息传递和排队功能,但并不打算替代 MQTT
、Rabbit
和 Kafka
等服务器。因此,Hydra
并没有提供那些系统所具备的许多功能。
因此,接下来是对 Hydra “does” 提供的功能的解释。
像大多数 Hydra 一样,Hydra 排队依赖于内置在 Redis 中的功能。Hydra 使用了一种文档化的原子消息队列模式,这种模式在 Redis 用户中很流行。Redis 的 rpush
、rpoplpush
和 lrem
函数用于管理代表队列的列表结构中的消息状态。这只是一些背景知识,不必担心,因为 Hydra 的目标是简化这些问题。
Hydra 排队通过将消息排队到现有服务的消息队列来工作。这意味着 Hydra 没有所有微服务都可以使用的共享队列的概念。相反,任何排队的消息都被放置在特定服务的消息队列中。
为了进一步探索这一点,让我们想象一个创建和发送电子邮件的 email-service
。
任何其他想要发送电子邮件的微服务都可以向 email-service
发送消息。
这样的信息可能是这样的:
{ "to": "email-service:/", "mid": "2cae7508-c459-4794-86c6-42eb78f32573", "ts": "2018-02-16T13:34:51.540Z", "ver": "UMF/1.4.6", "bdy": { "to": "carlos.justiniano@gmail.com", "from": "accouting@xyzcorp.com", "htmlBody": "some html markup" } }
该消息可以从(比方说) accounting service
发送到 email-service
, 后者依次将消息排成队列等待最终的传递。
让我们根据我们的电子邮件示例来考虑 Hydra 的消息队列功能。
queueMessage
accounting-service
将使用 hydra queueMessage
函数在 email-service
队列中放置一条消息。实际的消息与我们之前看到的消息类似。
当 queueMessage
函数接收到 UMF
消息时,它将使用 to
字段的值并对其进行解析以提取服务名称。在我们这里的例子中,这就是电子邮件服务。服务名称在内部用于确定将消息放入哪个队列。hydra 源代码内部的外观显示,消息位于名为
hydra:service::{serviceName}:mqrecieved
的 Redis 列表中。key
的最后一部分是已接收(mqrecieved
)队列。以后再说。
/** * @name queueMessage * @summary Queue a message * @param {object} message - UMF message to queue * @return {promise} promise - resolving to the message that was queued or a rejection. */ queueMessage(message)
getQueueMessage
通过将电子邮件放置在电子邮件服务的 mqrecieved
队列中,该服务现在能够提取一条消息并开始对其进行处理。
为此,我们的 email-service
使用服务名称简单地调用了 hydra getQueuedMessage
。现在,这是一个重要的考虑因素。任何服务都可以调用 getQueuedMessage
并提供另一个服务的名称来帮助该服务处理消息!不建议这样做 - 但是可以的。它是为“知道自己在做什么”的开发人员设计的。在我们的例子中,我们的电子邮件服务将仅使用 getQueuedMessage('email-service')
来检索 accounting service
排队的消息。
/** * @name getQueuedMessage * @summary retrieve a queued message * @param {string} serviceName who's queue might provide a message * @return {promise} promise - resolving to the message that was dequeued or a rejection. */ getQueuedMessage(serviceName)
现在,您可能想知道,当我们有多个 email-service
实例时, 每个实例都在检查电子邮件队列中是否有排队的电子邮件,该怎么办?那不会导致重复的消息处理吗?
答案是否定的。因为 getQueuedMessage()
是原子的,对它的多次调用不会返回相同的消息。因此,多个服务实例可以同时尝试提取消息,但其中只有一个会接收到给定的消息。 Hydra
使用 Redis rpoplpush
函数实现了这一点。其工作方式是从 mqrecieved
队列中读取一条消息,并将其放置在 mqinprogress
队列中。因此,对 getQueuedMessage
的下一个调用将不会在接收到的队列(received queue
)中看到原始消息,因为它已被移动到进程队列(process queue
)中。同样,这只是实现细节,而不是你需要担心的东西。
因此,一旦我们的电子邮件服务实例(email-service
)构造并发送电子邮件, 它就将排队的消息标记为已成功处理。
markQueueMessage
因此,我们的电子邮件服务(email service
)调用 markQueueMessage(message, completed, reason)
来发送实际的消息,后面跟着一个 completed
(true
或false
)和一个可选的 reason
字符串。
/** * @name markQueueMessage * @summary Mark a queued message as either completed or not * @param {object} message - message in question * @param {boolean} completed - (true / false) * @param {string} reason - if not completed this is the reason processing failed * @return {promise} promise - resolving to the message that was dequeued or a rejection. */ markQueueMessage(message, completed, reason)
如果我们的电子邮件服务(email service
)无法发送消息, 则可以调用 markQueueMessage
时,让参数 completed
为 false
。这将导致该消息被重新排队以尝试其他服务。
reason
字段用于指示为什么消息被标记为已完成(completed
)或未完成(incompleted
)。
将消息标记为已完成(true
)将从 mqinprogress
队列中删除该消息。
提示和技巧
如前所述,Hydra
消息队列是最基本的功能, 但由于 Redis
的支持,它的功能也非常强大,速度也非常快。
考虑到对 Redis
的依赖,重要的是不要创建大型排队消息, 并且 Redis
的性能会受到大规模影响。解决此问题的一种方法是将一条小消息排队,该消息指向一条数据库记录或文件系统存储。
我们使用的一个很好的技巧是将一个服务队列消息(service queue messages
)放入它自己的队列中。其用法如下……假设一个服务接收到一个不能或不需要立即处理的请求。服务可以通过将消息发送给自己来对消息进行排队,以便稍后进行处理。因为服务的其他实例可能正在检查队列,所以另一个服务将接收消息并处理它。这让我想起了排球比赛,一个地方把球推到空中,让另一个球员把球猛击过网。
如果您需要比 Hydra 提供的更多的消息队列相关功能,可以考虑使用 Kue。或者是广泛使用的完善的消息传递队列系统之一。
Hydra 网络
Hydra 支持许多联网选项。本节将探讨可用的选项以及您何时要使用它们。在以下示例中,我们将使用 Hydra-router
实例 中的 config.json
文件 - 但该配置可能来自任何其他启用了hydra 的应用程序。
{ "externalRoutes": {}, "routerToken": "", "disableRouterEndpoint": false, "debugLogging": true, "queuerDB": 3, "requestTimeout": 30, "hydra": { "serviceName": "hydra-router", "serviceDescription": "Service Router", "serviceIP": "", "servicePort": "80", "serviceType": "router", "plugins": { "logger": { "logRequests": false, "toConsole": false, "noFile": true, "redact": [ "password" ], "elasticsearch": { "host": "", "port": 9200, "index": "hydra", "rotate": "daily" } } }, "redis": { "url": "redis://prod.p45rev.ng.0001.usw2.cache.amazonaws.com:6379/15" } } }
在上面的 config.json
文件中, 我们主要对 hydra.serviceIP
和 hydra.servicePort
字段感兴趣。
servicePort
允许您指定想要 hydra 监听的 IP 端口。在上面的示例中,hydraRouter
正在监听端口 80。如果您未指定 servicePort
(例如,如果 servicePort
为空), 那么 hydra 将选择一个大于 1024 的随机非特权端口。 servicePort
字段还支持指定端口范围。
在此示例中,将从 3000
到 4000
中选择一个随机服务端口。
"servicePort": "3000-4000"
另外,如果 hydra
检测到某个随机端口已在使用中,它将尝试使用指定范围内的另一个端口。
让我们关注 serviceIP
字段,如果该字段为空,hydra 将选择它找到的第一个 IPv4 地址。如果该字段包含IP地址(例如192.168.1.18
),那么 hydra 将使用该地址。如果该字段包含文本,但不是有效的IP地址,则 hydra 假定您已指定 DNS 名称。
Hydra 启动时,它将查看所有可用的网络接口。启动 Hydra-router
时,我们可以看到这一点。
_ _ _ ____ _ | | | |_ _ __| |_ __ __ _ | _ \ ___ _ _| |_ ___ _ __ | |_| | | | |/ _` | '__/ _` | | |_) / _ \| | | | __/ _ \ '__| | _ | |_| | (_| | | | (_| | | _ < (_) | |_| | || __/ | |_| |_|\__, |\__,_|_| \__,_| |_| \_\___/ \__,_|\__\___|_| |___/ Starting service hydra-router:1.4.18 on 10.255.0.13:80 Detected IPv4 IPs: * lo: 127.0.0.1 255.0.0.0 * eth0: 10.255.0.13 255.255.0.0 * eth0: 10.255.0.12 255.255.255.255 * eth1: 172.18.0.3 255.255.0.0 * eth2: 10.0.9.3 255.255.255.0 * eth2: 10.0.9.2 255.255.255.255
如果您希望 Hydra 绑定到一个特定的地址, 那么可以通过 serviceInterface key 告诉 Hydra 应该 使用哪个接口(interface
)和网络掩码(network mask
)来标识它应该使用的IP。
您可以在 config.json
文件中使用 interfaceName/interfaceMask
的值执行此操作:
"serviceInterface": "eth2/255.255.255.0",
Hydra Methods(公开导出的方法)
以下是 Hydra
公开导出的方法。
作为一个模块,Hydra 被设计用来隐藏和阻止使用它的内部方法。这有助于确保 Hydra 在越来越多的服务中按照预期的方式运行。
下面的方法列表由以下各节组织。并非所有的应用程序和服务都需要使用列出的所有方法。
- Setup - 模块设置和服务注册
- Discovery - 服务发现
- Presence - 存活状态检查
- Health - 运行状况(健康)检查和日志记录
- Messaging - 消息发送
- Routing - 消息路由
Setup
init
用配置对象初始化 Hydra。
/** * @name init * @summary Initialize Hydra with config object. * @param {object} config - configuration object containing hydra specific keys/values * @return {object} promise - resolving if init success or rejecting otherwise */ init(config)
ready
返回在初始化完成时解析的 promise。
/** * @name ready * @summary returns promise that resolves when initialization is complete * @return {object} promise - resolving if init success or rejecting otherwise */ ready()
shutdown
安全关闭 hydra
。
/** * @name _shutdown * @summary Shutdown hydra safely. */ shutdown()
registerService
将机器注册为 Hydra 实例。
/** * @name registerService * @summary Registers this machine as a Hydra instance. * @description This is an optional call as this module might just be used to monitor and query instances. * @return {object} promise - resolving if registration success or rejecting otherwise */ registerService()
Discovery
getServiceName
检索当前实例的服务名称。
/** * @name getServiceName * @summary Retrieves the service name of the current instance. * @throws Throws an error if this machine isn't a instance. * @return {string} serviceName - returns the service name. */ getServiceName()
getServiceNodes
检索服务列表(即使处于非活动状态)。
/** * @name getServiceNodes * @summary Retrieve a list of services even if inactive. * @return {promise} promise - returns a promise */ getServiceNodes()
getServices
检索可用实例服务的列表。
/** * @name getServices * @summary Retrieve a list of available instance services. * @return {promise} promise - returns a promise which resolves to an array of objects. */ getServices()
findService
查找服务。
/** * @name findService * @summary Find a service. * @param {string} name - service name - note service name is case insensitive * @return {promise} promise - which resolves with service */ findService(name)
Presence
getServicePresence
检索服务/实例的状态信息。
/** * @name getServicePresence * @summary Retrieve a service / instance's presence info. * @param {string} name - service name - note service name is case insensitive * @return {promise} promise - which resolves with service presence */ getServicePresence(name)
hasServicePresence
指示服务是否存在,表示该服务至少在一个节点中运行。
/** * @name hasServicePresence * @summary Indicate if a service has presence. * @description Indicates if a service has presence, meaning the * service is running in at least one node. * @param {string} name - service name - note service name is case insensitive * @return {promise} promise - which resolves with TRUE if presence is found, FALSE otherwise */ hasServicePresence(name)
getInstanceID
返回此进程的实例 id
。
/** * @name getInstanceID * @summary Return the instance id for this process * @return {number} id - instanceID */ getInstanceID()
Health
sendToHealthLog
将消息记录到服务的运行状况日志队列中。
/** * @name sendToHealthLog * @summary Log a message to the service instance's health log queue. * @private * @throws Throws an error if this machine isn't a instance. * @param {string} type - type of message ('error', 'info', 'debug' or user defined) * @param {string} message - message to log */ sendToHealthLog(type, message)
getServiceHealthLog
获取此服务的运行状况日志。
/** * @name getServiceHealthLog * @summary Get this service's health log. * @throws Throws an error if this machine isn't a instance * @param {string} name - name of instance, use getName() if current service is the target. * note service name is case insensitive. * @return {promise} promise - resolves to log entries */ getServiceHealthLog(name)
getHealth
检索服务运行状况信息。
/** * @name getHealth * @summary Retrieve service health info. * @private * @return {object} obj - object containing service info */ getHealth()
getServiceHealthAll
检索所有实例服务的运行状况。
/** * @name getServiceHealthAll * @summary Retrieve the health status of all instance services. * @return {promise} promise - resolves with an array of objects containing instance health information. */ getServiceHealthAll()
Messaging
createUMFMessage
创建一个 UMF
样式消息。
/** * @name createUMFMessage * @summary Create a UMF style message. * @description This is a helper function which helps format a UMF style message. * The caller is responsible for ensuring that required fields such as * "to", "from" and "body" are provided either before or after using * this function. * @param {object} message - optional message overrides. * @return {object} message - a UMF formatted message. */ createUMFMessage(message)
makeAPIRequest
向 hydra 服务发出 API 请求。
/** * @name makeAPIRequest * @summary Makes an API request to a hydra service. * @description If the service isn't present and the message object has its * message.body.fallbackToQueue value set to true, then the * message will be sent to the services message queue. * @param {object} message - UMF formatted message * @return {promise} promise - response from API in resolved promise or * error in rejected promise. */ makeAPIRequest(message)
sendMessage
向 Hydra 服务的所有当前实例发送消息。
/** * @name sendMessage * @summary Sends a message to all present instances of a hydra service. * @param {string | object} message - Plain string or UMF formatted message object * @return {promise} promise - resolved promise if sent or * error in rejected promise. */ sendMessage(message)
sendReplyMessage
根据收到的原始消息发送回复消息。
/** * @name sendReplyMessage * @summary Sends a reply message based on the original message received. * @param {object} originalMessage - UMF formatted message object * @param {object} messageResponse - UMF formatted message object * @return {object} promise - resolved promise if sent or * error in rejected promise. */ sendReplyMessage(originalMessage, messageResponse)
Routing
registerRoutes
注册路由。
/** * @name registerRoutes * @summary Register routes * @note Routes must be formatted as UMF To routes. https://github.com/cjus/umf#%20To%20field%20(routing) * @param {array} routes - array of routes * @return {object} Promise - resolving or rejecting */ registerRoutes(routes)
getAllServiceRoutes
检索所有服务路由。
/** * @name getAllServiceRoutes * @summary Retrieve all service routes. * @return {object} Promise - resolving to an object with keys and arrays of routes */ getAllServiceRoutes()
matchRoute
将路由路径匹配到已注册路由列表。
/** * @name matchRoute * @summary Matches a route path to a list of registered routes * @private * @param {string} routePath - a URL path to match * @return {boolean} match - true if match, false if not */ matchRoute(routePath)
Message queues
queueMessage
排队一个消息
/** * @name queueMessage * @summary Queue a message * @param {object} message - UMF message to queue * @return {promise} promise - resolving to the message that was queued or a rejection. */ queueMessage(message)
getQueuedMessage
检索排队的消息
/** * @name getQueuedMessage * @summary Retrieve a queued message * @param {string} serviceName who's queue might provide a message * @return {promise} promise - resolving to the message that was dequeued or a rejection. */ getQueuedMessage(serviceName)
markQueueMessage
将排队的消息标记为已完成或未完成
/** * @name markQueueMessage * @summary Mark a queued message as either completed or not * @param {object} message - message in question * @param {boolean} completed - (true / false) * @param {string} reason - if not completed this is the reason processing failed * @return {promise} promise - resolving to the message that was dequeued or a rejection. */ markQueueMessage(message, completed, reason)
Hydra Express
Hydra-Express 包使用 Hydra-core,是专门为利用 ExpressJS 的底层功能而设计的。
我们相信这是 ExpressJS 开发人员构建微服务最快最简单的方式。