使用 Hydra 监视服务
HydraMCP web
应用程序演示了如何监视 Hydra
服务。有两种监测方法:
- 读取 hydra 服务写入 Redis 的数据
- 使用 Hydra 方法接收聚合服务(
aggregate service
)数据。
后一种方法被推荐,因为它被认为对未来 Hydra 如何在 Redis 中存储数据的潜在变化更具弹性。
以下方法有助于服务的自省(introspection
)和控制(control
)。
Method | Description |
getServices | 检索已注册服务的列表。 |
findService | 找到特定的服务。 |
getServicePresence | 检索特定服务的存活状态 |
getServiceHealthAll | 检索所有注册服务的健康信息和健康日志。 |
makeAPIRequest | 向命名服务发出API请求。 |
有关 Hydra 功能的完整列表,请参阅本文档的最后部分。
messaging(消息传递)
Hydra 通过以下方式支持服务间通信:
- 发现并直接使用服务器的网络信息(IP和端口)。
- 通过使用
makeAPIRequest
方法。 - 使用服务间(
inter-service
)消息传递。 - 使用服务消息队列(
service message queues
)。
您使用哪种方法取决于您的应用程序的需求和您愿意做的额外工作的数量。使用 Hydra 的消息传递方法抽象了您可能需要处理的网络层功能。因此,它提供了一种更简单、更可靠的与远程服务交互的方式。
发现和直接使用该服务的网络信息很简单:
let apiRoute = '/v1/email/send'; hydra.findService('emailer') .then((service) => { let url = `http://${service.ip}:${service.port}/${apiRoute}`; let options = { headers: { 'content-type': 'application/json', 'Accept': 'application/json; charset=UTF-8' }, method: 'post' }; options.body = emailObject; fetch(url, options) : :
注意:在使用上述方法之前,应该使用 getServicePresence 方法检查服务是否可用。毕竟,我们希望确保服务已注册,并且当前可用。
在这里,使用 Hydra
的 makeAPIRequest
方法变得更容易且更不容易出错。 makeAPIRequest
方法接受一个对象,该对象包含服务的名称以及其他有用但可选的信息。该方法自动处理服务可用性检查,如果该服务暂时不可用,甚至可以将消息(请求)推送到服务的消息队列中。这是可选行为,并假定这对于发送方是可接受的,并且远程服务能够将请求作为排队的消息进行处理。
let message = hydra.createUMFMessage({ to: 'emailer:/v1/email/send', from: 'website:backend', body: { to: 'user@someplace.com', from: 'marketing@company.com', emailBody: 'Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium' fallbackToQueue: true } }); hydra.makeAPIRequest(message) then() :
服务间的消息传递(Inter-service messaging
)
使用 Hydra
,您可以在服务之间发送消息,甚至可以在一系列服务之间路由消息。这是 Hydra-Router
提供的功能之一。
内置消息通道(Built-in message channels
)
每个 hydra 服务都会自动监听两个内置通道,其他服务发送的消息会在其中到达。
一个通道监听发送到某一类型服务的任何消息,另一个通道监听指向特定服务实例的消息。因此,发送到 file-processing
的消息将被该服务的所有实例接收。而发送到5585f53bd1171db38eafd79bf16e02f4@file-processing
的消息只能由 ID
为5585f53bd1171db38eafd79bf16e02f4
的服务实例处理。
要将消息发送到服务,可以使用 sendMessage
调用。
let message = hydra.createUMFMessage({ to: 'test-service:/', from: 'blue-service:/', body: { fileData: '{base64}' } }); hydra.sendMessage(message);
第一个参数是要向其发送消息的服务的名称,第二个参数是包含消息的 UMF 格式的对象。
使用 sendMessage
时,会将消息发送到随机选择的可用服务实例。如果您需要指定特定实例,则可以使用其唯一的服务 ID 来简单地对服务进行寻址。这显示在下面的 “to”
消息字段中。
let message = hydra.createUMFMessage({ to: 'cef54f47984626c9efbf070c50bfad1b@test-service:/', from: 'blue-service:/', body: { fileData: '{base64}' } }); hydra.sendMessage(message);
您可以通过 getInstanceID()
或 getServicePresence()
方法获得服务的唯一ID。
如果也需要,可以使用 sendBroadcastMessage
方法将消息发送到服务的所有可用实例。
警告:虽然,您可以使用 sendMessage
发送和响应消息 - 建议您在回复时使用 sendReplyMessage
。这样做的原因是 sendReplyMessage
使用源消息正确填写健壮消息传递所需的 UMF
字段。这包括使用源 mid
、for
、to
、from UMF
字段来制定回复消息。
您的服务可以通过将侦听器添加到已加载的 hydra 实例来接收消息。下面的示例演示了如何在必要时制定响应。
hydra.registerService(); hydra.on('message', function(message) { // message will be a UMF formatted object console.log(`Received object message: ${msg.mid}: ${JSON.stringify(msg)}`); // to send a reply message here or elsewhere in your service use the `sendReplyMessage` call. hydra.sendReplyMessage(message, hydra.createUMFMessage({ body: { // response items } })); });
UMF messaging(UMF 消息传递
)
在前面的示例中,我们使用了一个 UMF
样式的消息,它是由 Hydra createUMFMessage
方法创建的。 UMF
是 Universal Message Format
的首字母缩写,是为可路由和可排队的消息传递而设计的轻量级消息传递协议。
UMF 允许您有选择地指定将一条消息发送到一个服务, 然后依次将消息和/或(and/or
)其他结果发送到另一个服务。这样,流程可以跨服务链接在一起。
让我们通过看看 createUMFMessage
实际上做了什么来揭开 UMF
的神秘面纱。
首先,该方法接受一个 message
对象。在这个对象中需要三个字段:
{ "to":'serviceName', "from": 'sending-entity-name', "body": {} }
createUMFMessage 方法采用该对象,并返回一个带有附加字段的新对象:
{ "mid": "02d7e85b-5609-4179-b3af-fee60efc8ef0", "timestamp": "2016-03-28T15:40:05.820Z", "version": "UMF/1.2", "priority": "normal", "type": "msg", "to": "filewatcher", "from": "hydramcp", "body": { "actions": [ "restart", "processBatch" ] } }
附加字段由 UMF
规范定义,并帮助 Hydra
和其他分布式系统处理消息。
createUMFMessage
帮助程序方法有助于确保我们从格式正确的 UMF
兼容消息开始,并可以对其进行进一步扩展。
例如,在这里我们可以在将消息传递给 makeAPIRequest
方法之前更改消息的优先级(priority
)和类型(type
)。
message.priority = 'high'; message.type = 'service:control';
需要注意的是,我们可以将优先级(priority
)和类型(type
)字段添加到传递给 createUMFMessage
的原始消息中。该方法将使用您提供的字段来覆盖它在默认情况下创建的字段。因此,重要的是不要随意重写 mid
或 timestamp
。
注意:有关 UMF
规范的详细信息,请访问:Universal Messaging Format
Hydra 消息队列
当涉及到消息传递和队列时,重要的是要考虑应用程序需要的底层交付保证的级别。Hydra 提供了“基本的”消息传递和排队功能,但并不打算替代 MQTT
、Rabbit
和 Kafka
等服务器。因此,Hydra
并没有提供那些系统所具备的许多功能。
因此,接下来是对 Hydra “does” 提供的功能的解释。
像大多数 Hydra 一样,Hydra 排队依赖于内置在 Redis 中的功能。Hydra 使用了一种文档化的原子消息队列模式,这种模式在 Redis 用户中很流行。Redis 的 rpush
、rpoplpush
和 lrem
函数用于管理代表队列的列表结构中的消息状态。这只是一些背景知识,不必担心,因为 Hydra 的目标是简化这些问题。
Hydra 排队通过将消息排队到现有服务的消息队列来工作。这意味着 Hydra 没有所有微服务都可以使用的共享队列的概念。相反,任何排队的消息都被放置在特定服务的消息队列中。
为了进一步探索这一点,让我们想象一个创建和发送电子邮件的 email-service
。
任何其他想要发送电子邮件的微服务都可以向 email-service
发送消息。
这样的信息可能是这样的:
{ "to": "email-service:/", "mid": "2cae7508-c459-4794-86c6-42eb78f32573", "ts": "2018-02-16T13:34:51.540Z", "ver": "UMF/1.4.6", "bdy": { "to": "carlos.justiniano@gmail.com", "from": "accouting@xyzcorp.com", "htmlBody": "some html markup" } }
该消息可以从(比方说) accounting service
发送到 email-service
, 后者依次将消息排成队列等待最终的传递。
让我们根据我们的电子邮件示例来考虑 Hydra 的消息队列功能。
queueMessage
accounting-service
将使用 hydra queueMessage
函数在 email-service
队列中放置一条消息。实际的消息与我们之前看到的消息类似。
当 queueMessage
函数接收到 UMF
消息时,它将使用 to
字段的值并对其进行解析以提取服务名称。在我们这里的例子中,这就是电子邮件服务。服务名称在内部用于确定将消息放入哪个队列。hydra 源代码内部的外观显示,消息位于名为 hydra:service::{serviceName}:mqrecieved
的 Redis 列表中。key
的最后一部分是已接收(mqrecieved
)队列。以后再说。
/** * @name queueMessage * @summary Queue a message * @param {object} message - UMF message to queue * @return {promise} promise - resolving to the message that was queued or a rejection. */ queueMessage(message)
getQueueMessage
通过将电子邮件放置在电子邮件服务的 mqrecieved
队列中,该服务现在能够提取一条消息并开始对其进行处理。
为此,我们的 email-service
使用服务名称简单地调用了 hydra getQueuedMessage
。现在,这是一个重要的考虑因素。任何服务都可以调用 getQueuedMessage
并提供另一个服务的名称来帮助该服务处理消息!不建议这样做 - 但是可以的。它是为“知道自己在做什么”的开发人员设计的。在我们的例子中,我们的电子邮件服务将仅使用 getQueuedMessage('email-service')
来检索 accounting service
排队的消息。
/** * @name getQueuedMessage * @summary retrieve a queued message * @param {string} serviceName who's queue might provide a message * @return {promise} promise - resolving to the message that was dequeued or a rejection. */ getQueuedMessage(serviceName)
现在,您可能想知道,当我们有多个 email-service
实例时, 每个实例都在检查电子邮件队列中是否有排队的电子邮件,该怎么办?那不会导致重复的消息处理吗?
答案是否定的。因为 getQueuedMessage()
是原子的,对它的多次调用不会返回相同的消息。因此,多个服务实例可以同时尝试提取消息,但其中只有一个会接收到给定的消息。 Hydra
使用 Redis rpoplpush
函数实现了这一点。其工作方式是从 mqrecieved
队列中读取一条消息,并将其放置在 mqinprogress
队列中。因此,对 getQueuedMessage
的下一个调用将不会在接收到的队列(received queue
)中看到原始消息,因为它已被移动到进程队列(process queue
)中。同样,这只是实现细节,而不是你需要担心的东西。
因此,一旦我们的电子邮件服务实例(email-service
)构造并发送电子邮件, 它就将排队的消息标记为已成功处理。
markQueueMessage
因此,我们的电子邮件服务(email service
)调用 markQueueMessage(message, completed, reason)
来发送实际的消息,后面跟着一个 completed
(true
或false
)和一个可选的 reason
字符串。
/** * @name markQueueMessage * @summary Mark a queued message as either completed or not * @param {object} message - message in question * @param {boolean} completed - (true / false) * @param {string} reason - if not completed this is the reason processing failed * @return {promise} promise - resolving to the message that was dequeued or a rejection. */ markQueueMessage(message, completed, reason)
如果我们的电子邮件服务(email service
)无法发送消息, 则可以调用 markQueueMessage
时,让参数 completed
为 false
。这将导致该消息被重新排队以尝试其他服务。
reason
字段用于指示为什么消息被标记为已完成(completed
)或未完成(incompleted
)。
将消息标记为已完成(true
)将从 mqinprogress
队列中删除该消息。
提示和技巧
如前所述,Hydra
消息队列是最基本的功能, 但由于 Redis
的支持,它的功能也非常强大,速度也非常快。
考虑到对 Redis
的依赖,重要的是不要创建大型排队消息, 并且 Redis
的性能会受到大规模影响。解决此问题的一种方法是将一条小消息排队,该消息指向一条数据库记录或文件系统存储。
我们使用的一个很好的技巧是将一个服务队列消息(service queue messages
)放入它自己的队列中。其用法如下……假设一个服务接收到一个不能或不需要立即处理的请求。服务可以通过将消息发送给自己来对消息进行排队,以便稍后进行处理。因为服务的其他实例可能正在检查队列,所以另一个服务将接收消息并处理它。这让我想起了排球比赛,一个地方把球推到空中,让另一个球员把球猛击过网。
如果您需要比 Hydra 提供的更多的消息队列相关功能,可以考虑使用 Kue。或者是广泛使用的完善的消息传递队列系统之一。
Hydra 网络
Hydra 支持许多联网选项。本节将探讨可用的选项以及您何时要使用它们。在以下示例中,我们将使用 Hydra-router
实例 中的 config.json
文件 - 但该配置可能来自任何其他启用了hydra 的应用程序。
{ "externalRoutes": {}, "routerToken": "", "disableRouterEndpoint": false, "debugLogging": true, "queuerDB": 3, "requestTimeout": 30, "hydra": { "serviceName": "hydra-router", "serviceDescription": "Service Router", "serviceIP": "", "servicePort": "80", "serviceType": "router", "plugins": { "logger": { "logRequests": false, "toConsole": false, "noFile": true, "redact": [ "password" ], "elasticsearch": { "host": "", "port": 9200, "index": "hydra", "rotate": "daily" } } }, "redis": { "url": "redis://prod.p45rev.ng.0001.usw2.cache.amazonaws.com:6379/15" } } }
在上面的 config.json
文件中, 我们主要对 hydra.serviceIP
和 hydra.servicePort
字段感兴趣。
servicePort
允许您指定想要 hydra 监听的 IP 端口。在上面的示例中,hydraRouter
正在监听端口 80。如果您未指定 servicePort
(例如,如果 servicePort
为空), 那么 hydra 将选择一个大于 1024 的随机非特权端口。 servicePort
字段还支持指定端口范围。
在此示例中,将从 3000
到 4000
中选择一个随机服务端口。
"servicePort": "3000-4000"
另外,如果 hydra
检测到某个随机端口已在使用中,它将尝试使用指定范围内的另一个端口。
让我们关注 serviceIP
字段,如果该字段为空,hydra 将选择它找到的第一个 IPv4 地址。如果该字段包含IP地址(例如192.168.1.18
),那么 hydra 将使用该地址。如果该字段包含文本,但不是有效的IP地址,则 hydra 假定您已指定 DNS 名称。
Hydra 启动时,它将查看所有可用的网络接口。启动 Hydra-router
时,我们可以看到这一点。
_ _ _ ____ _ | | | |_ _ __| |_ __ __ _ | _ \ ___ _ _| |_ ___ _ __ | |_| | | | |/ _` | '__/ _` | | |_) / _ \| | | | __/ _ \ '__| | _ | |_| | (_| | | | (_| | | _ < (_) | |_| | || __/ | |_| |_|\__, |\__,_|_| \__,_| |_| \_\___/ \__,_|\__\___|_| |___/ Starting service hydra-router:1.4.18 on 10.255.0.13:80 Detected IPv4 IPs: * lo: 127.0.0.1 255.0.0.0 * eth0: 10.255.0.13 255.255.0.0 * eth0: 10.255.0.12 255.255.255.255 * eth1: 172.18.0.3 255.255.0.0 * eth2: 10.0.9.3 255.255.255.0 * eth2: 10.0.9.2 255.255.255.255
如果您希望 Hydra 绑定到一个特定的地址, 那么可以通过 serviceInterface key 告诉 Hydra 应该 使用哪个接口(interface
)和网络掩码(network mask
)来标识它应该使用的IP。
您可以在 config.json
文件中使用 interfaceName/interfaceMask
的值执行此操作:
"serviceInterface": "eth2/255.255.255.0",