Python 数学应用(四)(2)https://developer.aliyun.com/article/1506409
处理数据流
一些数据是从各种来源以恒定流的形式接收的。例如,我们可能会遇到多个温度探头通过 Kafka 服务器定期报告值的情况。Kafka 是一个流数据消息代理,根据主题将消息传递给不同的处理代理。
处理流数据是异步 Python 的完美应用。这使我们能够同时处理更大量的数据,这在应用程序中可能非常重要。当然,在异步上下文中我们不能直接对这些数据进行长时间的分析,因为这会干扰事件循环的执行。
使用 Python 的异步编程功能处理 Kafka 流时,我们可以使用 Faust 包。该包允许我们定义异步函数,这些函数将充当处理代理或服务,可以处理或以其他方式与来自 Kafka 服务器的数据流进行交互。
在这个食谱中,我们将学习如何使用 Faust 包来处理来自 Kafka 服务器的数据流。
准备工作
与本书中大多数食谱不同,由于我们将从命令行运行生成的应用程序,因此无法在 Jupyter 笔记本中运行此食谱。
对于这个食谱,我们需要导入 Faust 包:
import faust
我们还需要从 NumPy 包中运行默认随机数生成器的实例:
from numpy.random import default_rng rng = default_rng(12345)
我们还需要在本地机器上运行 Kafka 服务的实例,以便我们的 Faust 应用程序可以与消息代理进行交互。
一旦您下载了 Kafka 并解压了下载的源代码,就导航到 Kafka 应用程序所在的文件夹。在终端中打开此文件夹。使用以下命令启动 ZooKeeper 服务器(适用于 Linux 或 Mac):
bin/zookeeper-server-start.sh config/zookeeper.properties
如果您使用 Windows,改用以下命令:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
然后,在一个新的终端中,使用以下命令启动 Kafka 服务器(适用于 Linux 或 Mac):
bin/kafka-server-start.sh config/server.properties
如果您使用 Windows,改用以下命令:
bin\windows\kafka-server-start.bat config\server.properties
在每个终端中,您应该看到一些日志信息,指示服务器正在运行。
操作步骤…
按照以下步骤创建一个 Faust 应用程序,该应用程序将读取(和写入)数据到 Kafka 服务器并进行一些简单的处理:
- 首先,我们需要创建一个 Faust
App
实例,它将充当 Python 和 Kafka 服务器之间的接口:
app = faust.App("sample", broker="kafka://localhost")
- 接下来,我们将创建一个记录类型,模拟我们从服务器期望的数据:
class Record(faust.Record): id_string: str value: float
- 现在,我们将向 Faust
App
对象添加一个主题,将值类型设置为我们刚刚定义的Record
类:
topic = app.topic("sample-topic", value_type=Record)
- 现在,我们定义一个代理,这是一个包装在
App
对象上的agent
装饰器的异步函数:
@app.agent(topic) async def process_record(records): async for record in records: print(f"Got {record.id_string}: {record.value}")
- 接下来,我们定义两个源函数,将记录发布到我们设置的样本主题的 Kafka 服务器上。这些是异步函数,包装在
timer
装饰器中,并设置适当的间隔:
@app.timer(interval=1.0) async def producer1(app): await app.send( "sample-topic", value=Record(id_string="producer 1", value= rng.uniform(0, 2)) ) @app.timer(interval=2.0) async def producer2(app): await app.send( "sample-topic", value=Record(id_string="producer 2", value= rng.uniform(0, 5)) )
- 在文件底部,我们启动应用程序的
main
函数:
app.main()
- 现在,在一个新的终端中,我们可以使用以下命令启动应用程序的工作进程(假设我们的应用程序存储在
working-with-data-streams.py
中):
python3.8 working-with-data-streams.py worker
在这个阶段,您应该看到代理生成的一些输出被打印到终端中,如下所示:
[2020-06-21 14:15:27,986] [18762] [WARNING] Got producer 1: 0.4546720449343393 [2020-06-21 14:15:28,985] [18762] [WARNING] Got producer 2: 1.5837916985487643 [2020-06-21 14:15:28,989] [18762] [WARNING] Got producer 1: 1.5947309146654682 [2020-06-21 14:15:29,988] [18762] [WARNING] Got producer 1: 1.3525093415019491
这将是由 Faust 生成的一些应用程序信息的下方。
- 按下Ctrl + C关闭工作进程,并确保以相同的方式关闭 Kafka 服务器和 Zookeeper 服务器。
工作原理…
这是 Faust 应用程序的一个非常基本的示例。通常,我们不会生成记录并通过 Kafka 服务器发送它们,并在同一个应用程序中处理它们。但是,这对于本演示来说是可以的。在生产环境中,我们可能会连接到远程 Kafka 服务器,该服务器连接到多个来源并同时发布到多个不同的主题。
Faust 应用程序控制 Python 代码与 Kafka 服务器之间的交互。我们使用agent
装饰器添加一个函数来处理发布到特定通道的信息。每当新数据被推送到样本主题时,将执行此异步函数。在这个食谱中,我们定义的代理只是将Record
对象中包含的信息打印到终端中。
timer
装饰器定义了一个服务,定期在指定的间隔执行某些操作。在我们的情况下,计时器通过App
对象向 Kafka 服务器发送消息。然后将这些消息推送给代理进行处理。
Faust 命令行界面用于启动运行应用程序的工作进程。这些工作进程实际上是在 Kafka 服务器上或本地进程中对事件做出反应的处理者,例如本示例中定义的定时器服务。较大的应用程序可能会使用多个工作进程来处理大量数据。
此外
Faust 文档提供了有关 Faust 功能的更多详细信息,以及 Faust 的各种替代方案:faust.readthedocs.io/en/latest/
。
有关 Kafka 的更多信息可以在 Apache Kafka 网站上找到:kafka.apache.org/
。
使用 Cython 加速代码
Python 经常因为速度慢而受到批评——这是一个无休止的争论。使用具有 Python 接口的高性能编译库(例如科学 Python 堆栈)可以解决许多这些批评,从而大大提高性能。然而,在某些情况下,很难避免 Python 不是编译语言的事实。在这些(相当罕见的)情况下,改善性能的一种方法是编写 C 扩展(甚至完全重写代码为 C)以加速关键部分。这肯定会使代码运行更快,但可能会使维护软件包变得更加困难。相反,我们可以使用 Cython,这是 Python 语言的扩展,可以转换为 C 并编译以获得更好的性能改进。
例如,我们可以考虑一些用于生成 Mandelbrot 集图像的代码。为了比较,我们假设纯 Python 代码——我们假设这是我们的起点——如下所示:
# mandelbrot/python_mandel.py import numpy as np def in_mandel(cx, cy, max_iter): x = cx y = cy for i in range(max_iter): x2 = x**2 y2 = y**2 if (x2 + y2) >= 4: return i y = 2.0*x*y + cy x = x2 - y2 + cx return max_iter def compute_mandel(N_x, N_y, N_iter): xlim_l = -2.5 xlim_u = 0.5 ylim_l = -1.2 ylim_u = 1.2 x_vals = np.linspace(xlim_l, xlim_u, N_x, dtype=np.float64) y_vals = np.linspace(ylim_l, ylim_u, N_y, dtype=np.float64) height = np.empty((N_x, N_y), dtype=np.int64) for i in range(N_x): for j in range(N_y): height[i, j] = in_mandel(x_vals[i], y_vals[j], N_iter) return height
纯 Python 中这段代码相对较慢的原因是相当明显的:嵌套循环。为了演示目的,让我们假设我们无法使用 NumPy 对这段代码进行矢量化。一些初步测试显示,使用这些函数生成 Mandelbrot 集的 320×240 点和 255 步大约需要 6.3 秒。您的时间可能会有所不同,这取决于您的系统。
在这个示例中,我们将使用 Cython 大大提高前面代码的性能,以生成 Mandelbrot 集图像。
准备工作
对于这个示例,我们需要安装 NumPy 包和 Cython 包。您还需要在系统上安装 GCC 等 C 编译器。例如,在 Windows 上,您可以通过安装 MinGW 来获取 GCC 的版本。
操作步骤
按照以下步骤使用 Cython 大大提高生成 Mandelbrot 集图像的代码性能:
- 在
mandelbrot
文件夹中创建一个名为cython_mandel.pyx
的新文件。在这个文件中,我们将添加一些简单的导入和类型定义:
# mandelbrot/cython_mandel.pyx import numpy as np cimport numpy as np cimport cython ctypedef Py_ssize_t Int ctypedef np.float64_t Double
- 接下来,我们使用 Cython 语法定义
in_mandel
例程的新版本。我们在这个例程的前几行添加了一些声明:
cdef int in_mandel(Double cx, Double cy, int max_iter): cdef Double x = cx cdef Double y = cy cdef Double x2, y2 cdef Int i
- 函数的其余部分与 Python 版本的函数相同:
for i in range(max_iter): x2 = x**2 y2 = y**2 if (x2 + y2) >= 4: return i y = 2.0*x*y + cy x = x2 - y2 + cx return max_iter
- 接下来,我们定义
compute_mandel
函数的新版本。我们向这个函数添加了 Cython 包的两个装饰器:
@cython.boundscheck(False) @cython.wraparound(False) def compute_mandel(int N_x, int N_y, int N_iter):
- 然后,我们像在原始例程中一样定义常量:
cdef double xlim_l = -2.5 cdef double xlim_u = 0.5 cdef double ylim_l = -1.2 cdef double ylim_u = 1.2
- 我们使用 NumPy 包中的
linspace
和empty
例程的方式与 Python 版本完全相同。这里唯一的添加是我们声明了i
和j
变量,它们是Int
类型的:
cdef np.ndarray x_vals = np.linspace(xlim_l, xlim_u, N_x, dtype=np.float64) cdef np.ndarray y_vals = np.linspace(ylim_l, ylim_u, N_y, dtype=np.float64) cdef np.ndarray height = np.empty((N_x, N_y), dtype=np.int64) cdef Int i, j
- 定义的其余部分与 Python 版本完全相同:
for i in range(N_x): for j in range(N_y): height[i, j] = in_mandel(x_vals[i], y_vals[j], N_iter) return height
- 接下来,在
mandelbrot
文件夹中创建一个名为setup.py
的新文件,并将以下导入添加到此文件的顶部:
# mandelbrot/setup.py import numpy as np from setuptools import setup, Extension from Cython.Build import cythonize
- 之后,我们使用指向原始
python_mandel.py
文件的源定义一个扩展模块。将此模块的名称设置为hybrid_mandel
:
hybrid = Extension( "hybrid_mandel", sources=["python_mandel.py"], include_dirs=[np.get_include()], define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")] )
Python 数学应用(四)(4)https://developer.aliyun.com/article/1506415