[python]使用diagrams绘制架构图

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 Tair(兼容Redis),内存型 2GB
简介: [python]使用diagrams绘制架构图

简介

diagrams是python的一个第三方库,用于实现使用代码绘制架构图。

安装

依赖于 Graphviz,安装diagrams之前需要先安装 Graphviz(下载压缩包后,将bin目录添加到系统环境变量Path里即可)。

python3 -m pip install diagrams

快速入门

  • main.py
from diagrams import Diagram
from diagrams.aws.compute import EC2
from diagrams.aws.database import RDS
from diagrams.aws.network import ELB
with Diagram("Web Service", show=False):
    ELB("lb") >> EC2("web") >> RDS("userdb")
  • 运行
python main.py

示例

简单的应用组

from diagrams import Diagram
from diagrams.aws.compute import EC2
from diagrams.aws.database import RDS
from diagrams.aws.network import ELB
with Diagram("Grouped Workers", show=False, direction="TB"):
    ELB("lb") >> [EC2("worker1"),
                  EC2("worker2"),
                  EC2("worker3"),
                  EC2("worker4"),
                  EC2("worker5")] >> RDS("events")

web 服务集群

from diagrams import Cluster, Diagram
from diagrams.aws.compute import ECS
from diagrams.aws.database import ElastiCache, RDS
from diagrams.aws.network import ELB
from diagrams.aws.network import Route53
graph_attr = {
    "bgcolor": "transparent" # 透明背景
}
with Diagram("Web 服务集群", show=False, filename="3", graph_attr=graph_attr):
    dns = Route53("DNS")
    lb = ELB("网关")
    with Cluster("Services"):
        svc_group = [ECS("web1"),
                     ECS("web2"),
                     ECS("web3")]
    with Cluster("数据库集群"):
        db_primary = RDS("主库")
        db_primary - [RDS("只读从库")]
    memcached = ElastiCache("memcached")
    dns >> lb >> svc_group
    svc_group >> db_primary
    svc_group >> memcached

事件处理队列

from diagrams import Cluster, Diagram
from diagrams.aws.compute import ECS, EKS, Lambda
from diagrams.aws.database import Redshift
from diagrams.aws.integration import SQS
from diagrams.aws.storage import S3
with Diagram("事件处理", show=False, filename="4"):
    source = EKS("k8s source")
    with Cluster("事件流"):
        with Cluster("Event Workers"):
            workers = [ECS("worker1"),
                       ECS("worker2"),
                       ECS("worker3")]
        queue = SQS("event 队列")
        with Cluster("处理器"):
            handlers = [Lambda("proc1"),
                        Lambda("proc2"),
                        Lambda("proc3")]
    store = S3("事件存储")
    dw = Redshift("analytics")
    source >> workers >> queue >> handlers
    handlers >> store
    handlers >> dw

消息收集系统

from diagrams import Cluster, Diagram
from diagrams.gcp.analytics import BigQuery, Dataflow, PubSub
from diagrams.gcp.compute import AppEngine, Functions
from diagrams.gcp.database import BigTable
from diagrams.gcp.iot import IotCore
from diagrams.gcp.storage import GCS
with Diagram("Message Collecting", show=False, filename="5"):
    pubsub = PubSub("pubsub")
    with Cluster("Source of Data"):
        [IotCore("core1"),
         IotCore("core2"),
         IotCore("core3")] >> pubsub
    with Cluster("Targets"):
        with Cluster("Data Flow"):
            flow = Dataflow("data flow")
        with Cluster("Data Lake"):
            flow >> [BigQuery("bq"),
                     GCS("storage")]
        with Cluster("Event Driven"):
            with Cluster("Processing"):
                flow >> AppEngine("engine") >> BigTable("bigtable")
            with Cluster("Serverless"):
                flow >> Functions("func") >> AppEngine("appengine")
    pubsub >> flow

k8s中3副本pod

from diagrams import Diagram
from diagrams.k8s.clusterconfig import HPA
from diagrams.k8s.compute import Deployment, Pod, ReplicaSet
from diagrams.k8s.network import Ingress, Service
with Diagram("Exposed Pod with 3 Replicas", show=False, filename="6"):
    net = Ingress("domain.com") >> Service("svc")
    net >> [Pod("pod1"),
            Pod("pod2"),
            Pod("pod3")] << ReplicaSet("rs") << Deployment("dp") << HPA("hpa")

k8s有状态架构

from diagrams import Cluster, Diagram
from diagrams.k8s.compute import Pod, StatefulSet
from diagrams.k8s.network import Service
from diagrams.k8s.storage import PV, PVC, StorageClass
with Diagram("Stateful Architecture", show=False, filename="7"):
    with Cluster("Apps"):
        svc = Service("svc")
        sts = StatefulSet("sts")
        apps = []
        for _ in range(3):
            pod = Pod("pod")
            pvc = PVC("pvc")
            pod - sts - pvc
            apps.append(svc >> pod >> pvc)
    apps << PV("pv") << StorageClass("sc")

高级web服务架构

from diagrams import Cluster, Diagram
from diagrams.onprem.analytics import Spark
from diagrams.onprem.compute import Server
from diagrams.onprem.database import PostgreSQL
from diagrams.onprem.inmemory import Redis
from diagrams.onprem.aggregator import Fluentd
from diagrams.onprem.monitoring import Grafana, Prometheus
from diagrams.onprem.network import Nginx
from diagrams.onprem.queue import Kafka
with Diagram("Advanced Web Service with On-Premise", show=False, filename="8"):
    ingress = Nginx("ingress")
    metrics = Prometheus("metric")
    metrics << Grafana("monitoring")
    with Cluster("Service Cluster"):
        grpcsvc = [
            Server("grpc1"),
            Server("grpc2"),
            Server("grpc3")]
    with Cluster("Sessions HA"):
        primary = Redis("session")
        primary - Redis("replica") << metrics
        grpcsvc >> primary
    with Cluster("Database HA"):
        primary = PostgreSQL("users")
        primary - PostgreSQL("replica") << metrics
        grpcsvc >> primary
    aggregator = Fluentd("logging")
    aggregator >> Kafka("stream") >> Spark("analytics")
    ingress >> grpcsvc >> aggregator

高级web服务架构2

from diagrams import Cluster, Diagram, Edge
from diagrams.onprem.analytics import Spark
from diagrams.onprem.compute import Server
from diagrams.onprem.database import PostgreSQL
from diagrams.onprem.inmemory import Redis
from diagrams.onprem.aggregator import Fluentd
from diagrams.onprem.monitoring import Grafana, Prometheus
from diagrams.onprem.network import Nginx
from diagrams.onprem.queue import Kafka
with Diagram(name="Advanced Web Service with On-Premise (colored)", show=False, filename="9"):
    ingress = Nginx("ingress")
    metrics = Prometheus("metric")
    metrics << Edge(color="firebrick", style="dashed") << Grafana("monitoring")
    with Cluster("Service Cluster"):
        grpcsvc = [
            Server("grpc1"),
            Server("grpc2"),
            Server("grpc3")]
    with Cluster("Sessions HA"):
        primary = Redis("session")
        primary - Edge(color="brown", style="dashed") - Redis("replica") << Edge(label="collect") << metrics
        grpcsvc >> Edge(color="brown") >> primary
    with Cluster("Database HA"):
        primary = PostgreSQL("users")
        primary - Edge(color="brown", style="dotted") - PostgreSQL("replica") << Edge(label="collect") << metrics
        grpcsvc >> Edge(color="black") >> primary
    aggregator = Fluentd("logging")
    aggregator >> Edge(label="parse") >> Kafka("stream") >> Edge(color="black", style="bold") >> Spark("analytics")
    ingress >> Edge(color="darkgreen") << grpcsvc >> Edge(color="darkorange") >> aggregator

使用自定义的图标

from urllib.request import urlretrieve
from diagrams import Cluster, Diagram
from diagrams.aws.database import Aurora
from diagrams.custom import Custom
from diagrams.k8s.compute import Pod
# Download an image to be used into a Custom Node class
rabbitmq_url = "https://jpadilla.github.io/rabbitmqapp/assets/img/icon.png"
rabbitmq_icon = "rabbitmq.png"
urlretrieve(rabbitmq_url, rabbitmq_icon)
with Diagram("Broker Consumers", show=False, filename="10"):
    with Cluster("Consumers"):
        consumers = [
            Pod("worker"),
            Pod("worker"),
            Pod("worker")]
    queue = Custom("Message queue", rabbitmq_icon)
    queue >> consumers >> Aurora("Database")

参考

相关文章
|
2月前
|
运维 负载均衡 安全
深度解析:Python Web前后端分离架构中WebSocket的选型与实现策略
深度解析:Python Web前后端分离架构中WebSocket的选型与实现策略
123 0
|
4月前
|
监控 安全 中间件
Python Django 后端架构开发: 中间件架构设计
Python Django 后端架构开发: 中间件架构设计
51 1
|
6月前
|
监控 持续交付 数据安全/隐私保护
Python进行微服务架构的监控
【6月更文挑战第16天】
76 5
Python进行微服务架构的监控
|
5月前
|
监控 前端开发 JavaScript
构建高效实时应用:Python WebSocket在前后端分离架构中的实践
【7月更文挑战第18天】WebSocket助力实时Web应用,通过一次握手建立持久连接,解决HTTP实时性问题。Python中可用Flask-SocketIO创建WebSocket服务器,前端JavaScript使用Socket.IO库连接。确保安全可采用HTTPS、认证及跨域限制。示例代码展示如何实现双向实时通信。
117 4
|
5月前
|
安全 数据安全/隐私保护 UED
优化用户体验:前后端分离架构下Python WebSocket实时通信的性能考量
【7月更文挑战第17天】前后端分离趋势下,WebSocket成为实时通信的关键,Python有`websockets`等库支持WebSocket服务。与HTTP轮询相比,WebSocket减少延迟,提高响应。连接管理、消息传输效率、并发处理及安全性是性能考量重点。使用WebSocket能优化用户体验,尤其适合社交、游戏等实时场景。开发应考虑场景需求,充分利用WebSocket优势。
164 3
|
5月前
|
机器学习/深度学习 算法 文件存储
使用Python实现深度学习模型:神经架构搜索与自动机器学习
【7月更文挑战第5天】 使用Python实现深度学习模型:神经架构搜索与自动机器学习
88 2
|
5月前
|
运维 负载均衡 前端开发
深度解析:Python Web前后端分离架构中WebSocket的选型与实现策略
【7月更文挑战第16天】Python Web开发中,前后端分离常见于实时通信场景,WebSocket作为全双工协议,常用于此类应用。选型时考虑性能、功能、易用性、社区支持和成本。Flask-SocketIO是实现WebSocket的一个选项,它简化了与Flask的集成。案例展示了如何用Flask-SocketIO创建一个实时聊天室:后端处理消息广播,前端通过Socket.IO库连接并显示消息。此实现策略演示了在Python中实现实时通信的基本步骤。
123 0
|
6月前
|
消息中间件 数据库 微服务
Python进行微服务架构的设计与实现
【6月更文挑战第5天】微服务架构成为软件开发热门,通过拆分小型自治服务提升灵活性、可扩展性和可维护性。Python以其易用性和强大功能,成为实现微服务的理想选择。本文介绍如何利用Python设计和实现微服务,包括: 1. **微服务概述**:解释微服务架构的基本原理,强调松耦合、可伸缩性、灵活性和易维护性等优点。 2. **设计步骤**:确定服务边界、定义接口、实现服务和配置部署。 3. **案例代码**:展示使用Flask实现用户服务和订单服务的简单示例。 4. **代码扩展**:探讨数据持久化、身份验证、异步通信和日志记录等实践。 5. **更多可能性**:讨论服务发现、负载均衡、安全性
98 4
|
6月前
|
Java API Nacos
通过 Python+Nacos实现微服务,细解微服务架构
`shigen`是一名擅长多种编程语言的博主,致力于分享技术成长和认知。他尝试将Python服务构建为微服务架构,模仿Java领域的微服务设计。通过Nacos服务发现和注册,实现了Python Flask应用的微服务化,包括网关、用户中心、鉴权和文档服务。代码示例展示了服务注册、心跳维持、HTTP接口以及网关的代理和认证逻辑。此实现促进了服务安全调用,增强了对数据的保护。通过这种方式,`shigen`揭示了Python+Nacos实现微服务的细节,鼓励读者深入理解微服务工作原理。
484 0
通过 Python+Nacos实现微服务,细解微服务架构
|
7月前
|
架构师 开发工具 C++
最新python--类与面向对象-1,一线互联网架构师360°全方面性能调优
最新python--类与面向对象-1,一线互联网架构师360°全方面性能调优
最新python--类与面向对象-1,一线互联网架构师360°全方面性能调优
下一篇
DataWorks