Python 数学应用(四)(3)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,182元/月
云原生网关 MSE Higress,422元/月
简介: Python 数学应用(四)

Python 数学应用(四)(2)https://developer.aliyun.com/article/1506409

处理数据流

一些数据是从各种来源以恒定流的形式接收的。例如,我们可能会遇到多个温度探头通过 Kafka 服务器定期报告值的情况。Kafka 是一个流数据消息代理,根据主题将消息传递给不同的处理代理。

处理流数据是异步 Python 的完美应用。这使我们能够同时处理更大量的数据,这在应用程序中可能非常重要。当然,在异步上下文中我们不能直接对这些数据进行长时间的分析,因为这会干扰事件循环的执行。

使用 Python 的异步编程功能处理 Kafka 流时,我们可以使用 Faust 包。该包允许我们定义异步函数,这些函数将充当处理代理或服务,可以处理或以其他方式与来自 Kafka 服务器的数据流进行交互。

在这个食谱中,我们将学习如何使用 Faust 包来处理来自 Kafka 服务器的数据流。

准备工作

与本书中大多数食谱不同,由于我们将从命令行运行生成的应用程序,因此无法在 Jupyter 笔记本中运行此食谱。

对于这个食谱,我们需要导入 Faust 包:

import faust

我们还需要从 NumPy 包中运行默认随机数生成器的实例:

from numpy.random import default_rng
rng = default_rng(12345)

我们还需要在本地机器上运行 Kafka 服务的实例,以便我们的 Faust 应用程序可以与消息代理进行交互。

一旦您下载了 Kafka 并解压了下载的源代码,就导航到 Kafka 应用程序所在的文件夹。在终端中打开此文件夹。使用以下命令启动 ZooKeeper 服务器(适用于 Linux 或 Mac):

bin/zookeeper-server-start.sh config/zookeeper.properties

如果您使用 Windows,改用以下命令:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

然后,在一个新的终端中,使用以下命令启动 Kafka 服务器(适用于 Linux 或 Mac):

bin/kafka-server-start.sh config/server.properties

如果您使用 Windows,改用以下命令:

bin\windows\kafka-server-start.bat config\server.properties

在每个终端中,您应该看到一些日志信息,指示服务器正在运行。

操作步骤…

按照以下步骤创建一个 Faust 应用程序,该应用程序将读取(和写入)数据到 Kafka 服务器并进行一些简单的处理:

  1. 首先,我们需要创建一个 FaustApp实例,它将充当 Python 和 Kafka 服务器之间的接口:
app = faust.App("sample", broker="kafka://localhost")
  1. 接下来,我们将创建一个记录类型,模拟我们从服务器期望的数据:
class Record(faust.Record):
    id_string: str
    value: float
  1. 现在,我们将向 FaustApp对象添加一个主题,将值类型设置为我们刚刚定义的Record类:
topic = app.topic("sample-topic", value_type=Record)
  1. 现在,我们定义一个代理,这是一个包装在App对象上的agent装饰器的异步函数:
@app.agent(topic)
async def process_record(records):
    async for record in records:
        print(f"Got {record.id_string}: {record.value}")
  1. 接下来,我们定义两个源函数,将记录发布到我们设置的样本主题的 Kafka 服务器上。这些是异步函数,包装在timer装饰器中,并设置适当的间隔:
@app.timer(interval=1.0)
async def producer1(app):
    await app.send(
        "sample-topic",
        value=Record(id_string="producer 1", value=
            rng.uniform(0, 2))
    )
@app.timer(interval=2.0)
async def producer2(app):
    await app.send(
        "sample-topic",
        value=Record(id_string="producer 2", value=
            rng.uniform(0, 5))
    )
  1. 在文件底部,我们启动应用程序的main函数:
app.main()
  1. 现在,在一个新的终端中,我们可以使用以下命令启动应用程序的工作进程(假设我们的应用程序存储在working-with-data-streams.py中):
python3.8 working-with-data-streams.py worker

在这个阶段,您应该看到代理生成的一些输出被打印到终端中,如下所示:

[2020-06-21 14:15:27,986] [18762] [WARNING] Got producer 1: 0.4546720449343393 
[2020-06-21 14:15:28,985] [18762] [WARNING] Got producer 2: 1.5837916985487643 
[2020-06-21 14:15:28,989] [18762] [WARNING] Got producer 1: 1.5947309146654682 
[2020-06-21 14:15:29,988] [18762] [WARNING] Got producer 1: 1.3525093415019491

这将是由 Faust 生成的一些应用程序信息的下方。

  1. 按下Ctrl + C关闭工作进程,并确保以相同的方式关闭 Kafka 服务器和 Zookeeper 服务器。

工作原理…

这是 Faust 应用程序的一个非常基本的示例。通常,我们不会生成记录并通过 Kafka 服务器发送它们,并在同一个应用程序中处理它们。但是,这对于本演示来说是可以的。在生产环境中,我们可能会连接到远程 Kafka 服务器,该服务器连接到多个来源并同时发布到多个不同的主题。

Faust 应用程序控制 Python 代码与 Kafka 服务器之间的交互。我们使用agent装饰器添加一个函数来处理发布到特定通道的信息。每当新数据被推送到样本主题时,将执行此异步函数。在这个食谱中,我们定义的代理只是将Record对象中包含的信息打印到终端中。

timer装饰器定义了一个服务,定期在指定的间隔执行某些操作。在我们的情况下,计时器通过App对象向 Kafka 服务器发送消息。然后将这些消息推送给代理进行处理。

Faust 命令行界面用于启动运行应用程序的工作进程。这些工作进程实际上是在 Kafka 服务器上或本地进程中对事件做出反应的处理者,例如本示例中定义的定时器服务。较大的应用程序可能会使用多个工作进程来处理大量数据。

此外

Faust 文档提供了有关 Faust 功能的更多详细信息,以及 Faust 的各种替代方案:faust.readthedocs.io/en/latest/

有关 Kafka 的更多信息可以在 Apache Kafka 网站上找到:kafka.apache.org/

使用 Cython 加速代码

Python 经常因为速度慢而受到批评——这是一个无休止的争论。使用具有 Python 接口的高性能编译库(例如科学 Python 堆栈)可以解决许多这些批评,从而大大提高性能。然而,在某些情况下,很难避免 Python 不是编译语言的事实。在这些(相当罕见的)情况下,改善性能的一种方法是编写 C 扩展(甚至完全重写代码为 C)以加速关键部分。这肯定会使代码运行更快,但可能会使维护软件包变得更加困难。相反,我们可以使用 Cython,这是 Python 语言的扩展,可以转换为 C 并编译以获得更好的性能改进。

例如,我们可以考虑一些用于生成 Mandelbrot 集图像的代码。为了比较,我们假设纯 Python 代码——我们假设这是我们的起点——如下所示:

# mandelbrot/python_mandel.py
import numpy as np
def in_mandel(cx, cy, max_iter):
    x = cx
    y = cy
    for i in range(max_iter):
        x2 = x**2
        y2 = y**2
        if (x2 + y2) >= 4:
            return i
        y = 2.0*x*y + cy
        x = x2 - y2 + cx
    return max_iter
def compute_mandel(N_x, N_y, N_iter):
    xlim_l = -2.5
    xlim_u = 0.5
    ylim_l = -1.2
    ylim_u = 1.2
    x_vals = np.linspace(xlim_l, xlim_u, N_x, dtype=np.float64)
    y_vals = np.linspace(ylim_l, ylim_u, N_y, dtype=np.float64)
    height = np.empty((N_x, N_y), dtype=np.int64)
    for i in range(N_x):
        for j in range(N_y):
            height[i, j] = in_mandel(x_vals[i], y_vals[j], N_iter)
    return height

纯 Python 中这段代码相对较慢的原因是相当明显的:嵌套循环。为了演示目的,让我们假设我们无法使用 NumPy 对这段代码进行矢量化。一些初步测试显示,使用这些函数生成 Mandelbrot 集的 320×240 点和 255 步大约需要 6.3 秒。您的时间可能会有所不同,这取决于您的系统。

在这个示例中,我们将使用 Cython 大大提高前面代码的性能,以生成 Mandelbrot 集图像。

准备工作

对于这个示例,我们需要安装 NumPy 包和 Cython 包。您还需要在系统上安装 GCC 等 C 编译器。例如,在 Windows 上,您可以通过安装 MinGW 来获取 GCC 的版本。

操作步骤

按照以下步骤使用 Cython 大大提高生成 Mandelbrot 集图像的代码性能:

  1. mandelbrot文件夹中创建一个名为cython_mandel.pyx的新文件。在这个文件中,我们将添加一些简单的导入和类型定义:
# mandelbrot/cython_mandel.pyx
import numpy as np
cimport numpy as np
cimport cython
ctypedef Py_ssize_t Int
ctypedef np.float64_t Double
  1. 接下来,我们使用 Cython 语法定义in_mandel例程的新版本。我们在这个例程的前几行添加了一些声明:
cdef int in_mandel(Double cx, Double cy, int max_iter):
    cdef Double x = cx
    cdef Double y = cy
    cdef Double x2, y2
    cdef Int i
  1. 函数的其余部分与 Python 版本的函数相同:
for i in range(max_iter):
        x2 = x**2
        y2 = y**2
        if (x2 + y2) >= 4:
            return i
        y = 2.0*x*y + cy
        x = x2 - y2 + cx
    return max_iter
  1. 接下来,我们定义compute_mandel函数的新版本。我们向这个函数添加了 Cython 包的两个装饰器:
@cython.boundscheck(False)
@cython.wraparound(False)
def compute_mandel(int N_x, int N_y, int N_iter):
  1. 然后,我们像在原始例程中一样定义常量:
cdef double xlim_l = -2.5
    cdef double xlim_u = 0.5
    cdef double ylim_l = -1.2
    cdef double ylim_u = 1.2
  1. 我们使用 NumPy 包中的linspaceempty例程的方式与 Python 版本完全相同。这里唯一的添加是我们声明了ij变量,它们是Int类型的:
cdef np.ndarray x_vals = np.linspace(xlim_l, xlim_u, 
        N_x, dtype=np.float64)
    cdef np.ndarray y_vals = np.linspace(ylim_l, ylim_u, 
        N_y, dtype=np.float64)
    cdef np.ndarray height = np.empty((N_x, N_y), dtype=np.int64)
    cdef Int i, j
  1. 定义的其余部分与 Python 版本完全相同:
for i in range(N_x):
        for j in range(N_y):
            height[i, j] = in_mandel(x_vals[i], y_vals[j], N_iter)
    return height
  1. 接下来,在mandelbrot文件夹中创建一个名为setup.py的新文件,并将以下导入添加到此文件的顶部:
# mandelbrot/setup.py
import numpy as np
from setuptools import setup, Extension
from Cython.Build import cythonize
  1. 之后,我们使用指向原始python_mandel.py文件的源定义一个扩展模块。将此模块的名称设置为hybrid_mandel
hybrid = Extension(
    "hybrid_mandel",
    sources=["python_mandel.py"],
    include_dirs=[np.get_include()],
    define_macros=[("NPY_NO_DEPRECATED_API", 
       "NPY_1_7_API_VERSION")]
)

Python 数学应用(四)(4)https://developer.aliyun.com/article/1506415

相关文章
|
20天前
|
监控 数据可视化 数据挖掘
Python Rich库使用指南:打造更美观的命令行应用
Rich库是Python的终端美化利器,支持彩色文本、智能表格、动态进度条和语法高亮,大幅提升命令行应用的可视化效果与用户体验。
74 0
|
2月前
|
数据采集 监控 Java
Python 函数式编程的执行效率:实际应用中的权衡
Python 函数式编程的执行效率:实际应用中的权衡
207 102
|
21天前
|
机器学习/深度学习 算法 安全
【强化学习应用(八)】基于Q-learning的无人机物流路径规划研究(Python代码实现)
【强化学习应用(八)】基于Q-learning的无人机物流路径规划研究(Python代码实现)
|
1月前
|
设计模式 缓存 运维
Python装饰器实战场景解析:从原理到应用的10个经典案例
Python装饰器是函数式编程的精华,通过10个实战场景,从日志记录、权限验证到插件系统,全面解析其应用。掌握装饰器,让代码更优雅、灵活,提升开发效率。
94 0
|
2月前
|
数据采集 存储 数据可视化
Python网络爬虫在环境保护中的应用:污染源监测数据抓取与分析
在环保领域,数据是决策基础,但分散在多个平台,获取困难。Python网络爬虫技术灵活高效,可自动化抓取空气质量、水质、污染源等数据,实现多平台整合、实时更新、结构化存储与异常预警。本文详解爬虫实战应用,涵盖技术选型、代码实现、反爬策略与数据分析,助力环保数据高效利用。
125 0
|
2月前
|
存储 程序员 数据处理
Python列表基础操作全解析:从创建到灵活应用
本文深入浅出地讲解了Python列表的各类操作,从创建、增删改查到遍历与性能优化,内容详实且贴近实战,适合初学者快速掌握这一核心数据结构。
191 0
|
2月前
|
中间件 机器人 API
Python多态实战:从基础到高阶的“魔法”应用指南
Python多态机制通过“鸭子类型”实现灵活接口,使不同对象统一调用同一方法,自动执行各自行为。它简化代码逻辑、提升扩展性,适用于数据处理、策略切换、接口适配等场景。掌握多态思维,能有效减少冗余判断,使程序更优雅、易维护。
130 0
|
2月前
|
存储 监控 安全
Python剪贴板监控实战:clipboard-monitor库的深度解析与扩展应用
本文介绍了基于Python的剪贴板监控技术,结合clipboard-monitor库实现高效、安全的数据追踪。内容涵盖技术选型、核心功能开发、性能优化及实战应用,适用于安全审计、自动化办公等场景,助力提升数据管理效率与安全性。
113 0
|
3月前
|
存储 监控 安全
Python剪贴板监控实战:clipboard-monitor库的深度解析与扩展应用
本文介绍如何利用Python的clipboard-monitor库实现剪贴板监控系统,涵盖文本与图片的实时监听、防重复存储、GUI界面开发及数据加密等核心技术,适用于安全审计与自动化办公场景。
114 0
|
3月前
|
数据采集 API 调度
Python爬虫框架对比:Scrapy vs Requests在API调用中的应用
本文对比了 Python 中 Scrapy 与 Requests 两大爬虫框架在 API 调用中的差异,涵盖架构设计、调用模式、性能优化及适用场景,并提供实战建议,助力开发者根据项目需求选择合适工具。

热门文章

最新文章

推荐镜像

更多