用 Python、 RabbitMQ 和 Nameko 实现微服务

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介:

"微服务是一股新浪潮" - 现如今,将项目拆分成多个独立的、可扩展的服务是保障代码演变的最好选择。在 Python 的世界里,有个叫做 “Nameko” 的框架,它将微服务的实现变得简单并且强大。

微服务

在最近的几年里,“微服务架构”如雨后春笋般涌现。它用于描述一种特定的软件应用设计方式,这种方式使得应用可以由多个独立部署的服务以服务套件的形式组成。 - M. Fowler

推荐各位读一下 Fowler 的文章 以理解它背后的原理。

好吧,那它究竟意味着什么呢?

简单来说,微服务架构可以将你的系统拆分成多个负责不同任务的小的(单一上下文内)功能块responsibilities blocks,它们彼此互无感知,各自只提供用于通讯的通用指向common point。这个指向通常是已经将通讯协议和接口定义好的消息队列。

这里给大家提供一个真实案例

案例的代码可以通过 github: http://github.com/rochacbruno/nameko-example 访问,查看 service 和 api 文件夹可以获取更多信息。

想象一下,你有一个 REST API ,这个 API 有一个端点(LCTT 译注:REST 风格的 API 可以有多个端点用于处理对同一资源的不同类型的请求)用来接受数据,并且你需要将接收到的数据进行一些运算工作。那么相比阻塞接口调用者的请求来说,异步实现此接口是一个更好的选择。你可以先给用户返回一个 "OK - 你的请求稍后会处理" 的状态,然后在后台任务中完成运算。

同样,如果你想要在不阻塞主进程的前提下,在计算完成后发送一封提醒邮件,那么将“邮件发送”委托给其他服务去做会更好一些。

场景描述

用代码说话

让我们将系统创建起来,在实践中理解它:

环境

我们需要的环境:

  • 运行良好的 RabbitMQ(LCTT 译注:RabbitMQ 是一个流行的消息队列实现)
  • 由 VirtualEnv 提供的 Services 虚拟环境
  • 由 VirtualEnv 提供的 API 虚拟环境

Rabbit

在开发环境中使用 RabbitMQ 最简单的方式就是运行其官方的 docker 容器。在你已经拥有 Docker 的情况下,运行:


 
 
  1. docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

在浏览器中访问 http://localhost:15672 ,如果能够使用 guest:guest 验证信息登录 RabbitMQ 的控制面板,说明它已经在你的开发环境中运行起来了。

服务环境

现在让我们创建微服务来满足我们的任务需要。其中一个服务用来执行计算任务,另一个用来发送邮件。按以下步骤执行:

在 Shell 中创建项目的根目录


 
 
  1. $ mkdir myproject
  2. $ cd myproject

用 virtualenv 工具创建并且激活一个虚拟环境(你也可以使用 virtualenv-wrapper)


 
 
  1. $ virtualenv service_env
  2. $ source service_env/bin/activate

安装 nameko 框架和 yagmail


 
 
  1. (service_env)$ pip install nameko
  2. (service_env)$ pip install yagmail

服务的代码

现在我们已经准备好了 virtualenv 所提供的虚拟环境(可以想象成我们的服务是运行在一个独立服务器上的,而我们的 API 运行在另一个服务器上),接下来让我们编码,实现 nameko 的 RPC 服务。

我们会将这两个服务放在同一个 python 模块中,当然如果你乐意,也可以把它们放在单独的模块里并且当成不同的服务运行:

在名为 service.py 的文件中


 
 
  1. import yagmail
  2. from nameko.rpc import rpc, RpcProxy
  3. class Mail(object):
  4. name = "mail"
  5. @rpc
  6. def send(self, to, subject, contents):
  7. yag = yagmail.SMTP('myname@gmail.com', 'mypassword')
  8. # 以上的验证信息请从安全的地方进行读取
  9. # 贴士: 可以去看看 Dynaconf 设置模块
  10. yag.send(to=to.encode('utf-8),
  11. subject=subject.encode('utf-8),
  12. contents=[contents.encode('utf-8)])
  13. class Compute(object):
  14. name = "compute"
  15. mail = RpcProxy('mail')
  16. @rpc
  17. def compute(self, operation, value, other, email):
  18. operations = {'sum': lambda x, y: int(x) + int(y),
  19. 'mul': lambda x, y: int(x) * int(y),
  20. 'div': lambda x, y: int(x) / int(y),
  21. 'sub': lambda x, y: int(x) - int(y)}
  22. try:
  23. result = operations[operation](value, other)
  24. except Exception as e:
  25. self.mail.send.async(email, "An error occurred", str(e))
  26. raise
  27. else:
  28. self.mail.send.async(
  29. email,
  30. "Your operation is complete!",
  31. "The result is: %s" % result
  32. )
  33. return result

现在我们已经用以上代码定义好了两个服务,下面让我们将 Nameko RPC service 运行起来。

注意:我们会在控制台中启动并运行它。但在生产环境中,建议大家使用 supervisord 替代控制台命令。

在 Shell 中启动并运行服务


 
 
  1. (service_env)$ nameko run service --broker amqp://guest:guest@localhost
  2. starting services: mail, compute
  3. Connected to amqp://guest:**@127.0.0.1:5672//
  4. Connected to amqp://guest:**@127.0.0.1:5672//

测试

在另外一个 Shell 中(使用相同的虚拟环境),用 nameko shell 进行测试:


 
 
  1. (service_env)$ nameko shell --broker amqp://guest:guest@localhost
  2. Nameko Python 2.7.9 (default, Apr 2 2015, 15:33:21)
  3. [GCC 4.9.2] shell on linux2
  4. Broker: amqp://guest:guest@localhost
  5. >>>

现在你已经处在 RPC 客户端中了,Shell 的测试工作是通过 n.rpc 对象来进行的,它的使用方法如下:


 
 
  1. >>> n.rpc.mail.send("name@email.com", "testing", "Just testing")

上边的代码会发送一封邮件,我们同样可以调用计算服务对其进行测试。需要注意的是,此测试还会附带进行异步的邮件发送。


 
 
  1. >>> n.rpc.compute.compute('sum', 30, 10, "name@email.com")
  2. 40
  3. >>> n.rpc.compute.compute('sub', 30, 10, "name@email.com")
  4. 20
  5. >>> n.rpc.compute.compute('mul', 30, 10, "name@email.com")
  6. 300
  7. >>> n.rpc.compute.compute('div', 30, 10, "name@email.com")
  8. 3

在 API 中调用微服务

在另外一个 Shell 中(甚至可以是另外一台服务器上),准备好 API 环境。

用 virtualenv 工具创建并且激活一个虚拟环境(你也可以使用 virtualenv-wrapper)


 
 
  1. $ virtualenv api_env
  2. $ source api_env/bin/activate

安装 Nameko、 Flask 和 Flasgger


 
 
  1. (api_env)$ pip install nameko
  2. (api_env)$ pip install flask
  3. (api_env)$ pip install flasgger

注意: 在 API 中并不需要 yagmail ,因为在这里,处理邮件是服务的职责

创建含有以下内容的 api.py 文件:


 
 
  1. from flask import Flask, request
  2. from flasgger import Swagger
  3. from nameko.standalone.rpc import ClusterRpcProxy
  4. app = Flask(__name__)
  5. Swagger(app)
  6. CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
  7. @app.route('/compute', methods=['POST'])
  8. def compute():
  9. """
  10. Micro Service Based Compute and Mail API
  11. This API is made with Flask, Flasgger and Nameko
  12. ---
  13. parameters:
  14. - name: body
  15. in: body
  16. required: true
  17. schema:
  18. id: data
  19. properties:
  20. operation:
  21. type: string
  22. enum:
  23. - sum
  24. - mul
  25. - sub
  26. - div
  27. email:
  28. type: string
  29. value:
  30. type: integer
  31. other:
  32. type: integer
  33. responses:
  34. 200:
  35. description: Please wait the calculation, you'll receive an email with results
  36. """
  37. operation = request.json.get('operation')
  38. value = request.json.get('value')
  39. other = request.json.get('other')
  40. email = request.json.get('email')
  41. msg = "Please wait the calculation, you'll receive an email with results"
  42. subject = "API Notification"
  43. with ClusterRpcProxy(CONFIG) as rpc:
  44. # asynchronously spawning and email notification
  45. rpc.mail.send.async(email, subject, msg)
  46. # asynchronously spawning the compute task
  47. result = rpc.compute.compute.async(operation, value, other, email)
  48. return msg, 200
  49. app.run(debug=True)

在其他的 shell 或者服务器上运行此文件


 
 
  1. (api_env) $ python api.py
  2. * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

然后访问 http://localhost:5000/apidocs/index.html 这个 url,就可以看到 Flasgger 的界面了,利用它可以进行 API 的交互并可以发布任务到队列以供服务进行消费。

注意: 你可以在 shell 中查看到服务的运行日志,打印信息和错误信息。也可以访问 RabbitMQ 控制面板来查看消息在队列中的处理情况。

原文发布时间为:2016-07-17

本文来自云栖社区合作伙伴“Linux中国”

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 Java
RabbitMQ 在微服务架构中的高级应用
【8月更文第28天】在微服务架构中,服务之间需要通过轻量级的通信机制进行交互。其中一种流行的解决方案是使用消息队列,如 RabbitMQ,来实现异步通信和解耦。本文将探讨如何利用 RabbitMQ 作为服务间通信的核心组件,并构建高效的事件驱动架构。
179 2
|
4月前
|
消息中间件 存储 缓存
Python之RabbitMQ操作
Python之RabbitMQ操作
|
1月前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。
|
4月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
75 0
|
5月前
|
Kubernetes Cloud Native 微服务
探索云原生技术:Kubernetes在微服务架构中的应用Python编程之旅:从基础到进阶
【7月更文挑战第31天】随着云计算技术的迅猛发展,云原生概念应运而生,它代表了一种构建和运行应用程序的全新方式。本文将通过实际代码示例,深入探讨Kubernetes这一云原生关键技术如何在微服务架构中发挥其强大的作用。我们将从容器化开始,逐步过渡到Kubernetes集群的搭建与管理,最后展示如何部署和管理一个微服务应用。
68 2
|
6月前
|
监控 持续交付 数据安全/隐私保护
Python进行微服务架构的监控
【6月更文挑战第16天】
Python进行微服务架构的监控
|
4月前
|
消息中间件 监控 调度
Celery与RabbitMQ的结合【Python】
【8月更文挑战第18天】 Celery与RabbitMQ结合是构建高效Python分布式系统的利器。Celery作为分布式任务队列,支持任务调度与结果管理;RabbitMQ则确保了消息的可靠传递。二者联用不仅提升了系统的异步处理能力,还增强了其扩展性与可靠性。通过简单的安装与配置,即可实现任务的异步执行与调度,同时利用监控工具优化性能并确保安全性。这种组合适用于需要处理大量异步任务的应用场景,极大地简化了分布式系统的设计与实现。
|
5月前
|
消息中间件 存储 缓存
架构设计篇问题之消息队列(MQ)在微服务系统中问题如何解决
架构设计篇问题之消息队列(MQ)在微服务系统中问题如何解决
|
6月前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。