Python 数学应用(四)(3)

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Python 数学应用(四)

Python 数学应用(四)(2)https://developer.aliyun.com/article/1506409

处理数据流

一些数据是从各种来源以恒定流的形式接收的。例如,我们可能会遇到多个温度探头通过 Kafka 服务器定期报告值的情况。Kafka 是一个流数据消息代理,根据主题将消息传递给不同的处理代理。

处理流数据是异步 Python 的完美应用。这使我们能够同时处理更大量的数据,这在应用程序中可能非常重要。当然,在异步上下文中我们不能直接对这些数据进行长时间的分析,因为这会干扰事件循环的执行。

使用 Python 的异步编程功能处理 Kafka 流时,我们可以使用 Faust 包。该包允许我们定义异步函数,这些函数将充当处理代理或服务,可以处理或以其他方式与来自 Kafka 服务器的数据流进行交互。

在这个食谱中,我们将学习如何使用 Faust 包来处理来自 Kafka 服务器的数据流。

准备工作

与本书中大多数食谱不同,由于我们将从命令行运行生成的应用程序,因此无法在 Jupyter 笔记本中运行此食谱。

对于这个食谱,我们需要导入 Faust 包:

import faust

我们还需要从 NumPy 包中运行默认随机数生成器的实例:

from numpy.random import default_rng
rng = default_rng(12345)

我们还需要在本地机器上运行 Kafka 服务的实例,以便我们的 Faust 应用程序可以与消息代理进行交互。

一旦您下载了 Kafka 并解压了下载的源代码,就导航到 Kafka 应用程序所在的文件夹。在终端中打开此文件夹。使用以下命令启动 ZooKeeper 服务器(适用于 Linux 或 Mac):

bin/zookeeper-server-start.sh config/zookeeper.properties

如果您使用 Windows,改用以下命令:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

然后,在一个新的终端中,使用以下命令启动 Kafka 服务器(适用于 Linux 或 Mac):

bin/kafka-server-start.sh config/server.properties

如果您使用 Windows,改用以下命令:

bin\windows\kafka-server-start.bat config\server.properties

在每个终端中,您应该看到一些日志信息,指示服务器正在运行。

操作步骤…

按照以下步骤创建一个 Faust 应用程序,该应用程序将读取(和写入)数据到 Kafka 服务器并进行一些简单的处理:

  1. 首先,我们需要创建一个 FaustApp实例,它将充当 Python 和 Kafka 服务器之间的接口:
app = faust.App("sample", broker="kafka://localhost")
  1. 接下来,我们将创建一个记录类型,模拟我们从服务器期望的数据:
class Record(faust.Record):
    id_string: str
    value: float
  1. 现在,我们将向 FaustApp对象添加一个主题,将值类型设置为我们刚刚定义的Record类:
topic = app.topic("sample-topic", value_type=Record)
  1. 现在,我们定义一个代理,这是一个包装在App对象上的agent装饰器的异步函数:
@app.agent(topic)
async def process_record(records):
    async for record in records:
        print(f"Got {record.id_string}: {record.value}")
  1. 接下来,我们定义两个源函数,将记录发布到我们设置的样本主题的 Kafka 服务器上。这些是异步函数,包装在timer装饰器中,并设置适当的间隔:
@app.timer(interval=1.0)
async def producer1(app):
    await app.send(
        "sample-topic",
        value=Record(id_string="producer 1", value=
            rng.uniform(0, 2))
    )
@app.timer(interval=2.0)
async def producer2(app):
    await app.send(
        "sample-topic",
        value=Record(id_string="producer 2", value=
            rng.uniform(0, 5))
    )
  1. 在文件底部,我们启动应用程序的main函数:
app.main()
  1. 现在,在一个新的终端中,我们可以使用以下命令启动应用程序的工作进程(假设我们的应用程序存储在working-with-data-streams.py中):
python3.8 working-with-data-streams.py worker

在这个阶段,您应该看到代理生成的一些输出被打印到终端中,如下所示:

[2020-06-21 14:15:27,986] [18762] [WARNING] Got producer 1: 0.4546720449343393 
[2020-06-21 14:15:28,985] [18762] [WARNING] Got producer 2: 1.5837916985487643 
[2020-06-21 14:15:28,989] [18762] [WARNING] Got producer 1: 1.5947309146654682 
[2020-06-21 14:15:29,988] [18762] [WARNING] Got producer 1: 1.3525093415019491

这将是由 Faust 生成的一些应用程序信息的下方。

  1. 按下Ctrl + C关闭工作进程,并确保以相同的方式关闭 Kafka 服务器和 Zookeeper 服务器。

工作原理…

这是 Faust 应用程序的一个非常基本的示例。通常,我们不会生成记录并通过 Kafka 服务器发送它们,并在同一个应用程序中处理它们。但是,这对于本演示来说是可以的。在生产环境中,我们可能会连接到远程 Kafka 服务器,该服务器连接到多个来源并同时发布到多个不同的主题。

Faust 应用程序控制 Python 代码与 Kafka 服务器之间的交互。我们使用agent装饰器添加一个函数来处理发布到特定通道的信息。每当新数据被推送到样本主题时,将执行此异步函数。在这个食谱中,我们定义的代理只是将Record对象中包含的信息打印到终端中。

timer装饰器定义了一个服务,定期在指定的间隔执行某些操作。在我们的情况下,计时器通过App对象向 Kafka 服务器发送消息。然后将这些消息推送给代理进行处理。

Faust 命令行界面用于启动运行应用程序的工作进程。这些工作进程实际上是在 Kafka 服务器上或本地进程中对事件做出反应的处理者,例如本示例中定义的定时器服务。较大的应用程序可能会使用多个工作进程来处理大量数据。

此外

Faust 文档提供了有关 Faust 功能的更多详细信息,以及 Faust 的各种替代方案:faust.readthedocs.io/en/latest/

有关 Kafka 的更多信息可以在 Apache Kafka 网站上找到:kafka.apache.org/

使用 Cython 加速代码

Python 经常因为速度慢而受到批评——这是一个无休止的争论。使用具有 Python 接口的高性能编译库(例如科学 Python 堆栈)可以解决许多这些批评,从而大大提高性能。然而,在某些情况下,很难避免 Python 不是编译语言的事实。在这些(相当罕见的)情况下,改善性能的一种方法是编写 C 扩展(甚至完全重写代码为 C)以加速关键部分。这肯定会使代码运行更快,但可能会使维护软件包变得更加困难。相反,我们可以使用 Cython,这是 Python 语言的扩展,可以转换为 C 并编译以获得更好的性能改进。

例如,我们可以考虑一些用于生成 Mandelbrot 集图像的代码。为了比较,我们假设纯 Python 代码——我们假设这是我们的起点——如下所示:

# mandelbrot/python_mandel.py
import numpy as np
def in_mandel(cx, cy, max_iter):
    x = cx
    y = cy
    for i in range(max_iter):
        x2 = x**2
        y2 = y**2
        if (x2 + y2) >= 4:
            return i
        y = 2.0*x*y + cy
        x = x2 - y2 + cx
    return max_iter
def compute_mandel(N_x, N_y, N_iter):
    xlim_l = -2.5
    xlim_u = 0.5
    ylim_l = -1.2
    ylim_u = 1.2
    x_vals = np.linspace(xlim_l, xlim_u, N_x, dtype=np.float64)
    y_vals = np.linspace(ylim_l, ylim_u, N_y, dtype=np.float64)
    height = np.empty((N_x, N_y), dtype=np.int64)
    for i in range(N_x):
        for j in range(N_y):
            height[i, j] = in_mandel(x_vals[i], y_vals[j], N_iter)
    return height

纯 Python 中这段代码相对较慢的原因是相当明显的:嵌套循环。为了演示目的,让我们假设我们无法使用 NumPy 对这段代码进行矢量化。一些初步测试显示,使用这些函数生成 Mandelbrot 集的 320×240 点和 255 步大约需要 6.3 秒。您的时间可能会有所不同,这取决于您的系统。

在这个示例中,我们将使用 Cython 大大提高前面代码的性能,以生成 Mandelbrot 集图像。

准备工作

对于这个示例,我们需要安装 NumPy 包和 Cython 包。您还需要在系统上安装 GCC 等 C 编译器。例如,在 Windows 上,您可以通过安装 MinGW 来获取 GCC 的版本。

操作步骤

按照以下步骤使用 Cython 大大提高生成 Mandelbrot 集图像的代码性能:

  1. mandelbrot文件夹中创建一个名为cython_mandel.pyx的新文件。在这个文件中,我们将添加一些简单的导入和类型定义:
# mandelbrot/cython_mandel.pyx
import numpy as np
cimport numpy as np
cimport cython
ctypedef Py_ssize_t Int
ctypedef np.float64_t Double
  1. 接下来,我们使用 Cython 语法定义in_mandel例程的新版本。我们在这个例程的前几行添加了一些声明:
cdef int in_mandel(Double cx, Double cy, int max_iter):
    cdef Double x = cx
    cdef Double y = cy
    cdef Double x2, y2
    cdef Int i
  1. 函数的其余部分与 Python 版本的函数相同:
for i in range(max_iter):
        x2 = x**2
        y2 = y**2
        if (x2 + y2) >= 4:
            return i
        y = 2.0*x*y + cy
        x = x2 - y2 + cx
    return max_iter
  1. 接下来,我们定义compute_mandel函数的新版本。我们向这个函数添加了 Cython 包的两个装饰器:
@cython.boundscheck(False)
@cython.wraparound(False)
def compute_mandel(int N_x, int N_y, int N_iter):
  1. 然后,我们像在原始例程中一样定义常量:
cdef double xlim_l = -2.5
    cdef double xlim_u = 0.5
    cdef double ylim_l = -1.2
    cdef double ylim_u = 1.2
  1. 我们使用 NumPy 包中的linspaceempty例程的方式与 Python 版本完全相同。这里唯一的添加是我们声明了ij变量,它们是Int类型的:
cdef np.ndarray x_vals = np.linspace(xlim_l, xlim_u, 
        N_x, dtype=np.float64)
    cdef np.ndarray y_vals = np.linspace(ylim_l, ylim_u, 
        N_y, dtype=np.float64)
    cdef np.ndarray height = np.empty((N_x, N_y), dtype=np.int64)
    cdef Int i, j
  1. 定义的其余部分与 Python 版本完全相同:
for i in range(N_x):
        for j in range(N_y):
            height[i, j] = in_mandel(x_vals[i], y_vals[j], N_iter)
    return height
  1. 接下来,在mandelbrot文件夹中创建一个名为setup.py的新文件,并将以下导入添加到此文件的顶部:
# mandelbrot/setup.py
import numpy as np
from setuptools import setup, Extension
from Cython.Build import cythonize
  1. 之后,我们使用指向原始python_mandel.py文件的源定义一个扩展模块。将此模块的名称设置为hybrid_mandel
hybrid = Extension(
    "hybrid_mandel",
    sources=["python_mandel.py"],
    include_dirs=[np.get_include()],
    define_macros=[("NPY_NO_DEPRECATED_API", 
       "NPY_1_7_API_VERSION")]
)

Python 数学应用(四)(4)https://developer.aliyun.com/article/1506415

相关文章
|
1月前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
2月前
|
机器学习/深度学习 Python
堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能
本文深入探讨了堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能。文章详细介绍了堆叠的实现步骤,包括数据准备、基础模型训练、新训练集构建及元学习器训练,并讨论了其优缺点。
77 3
|
2月前
|
机器学习/深度学习 算法 数据挖掘
线性回归模型的原理、实现及应用,特别是在 Python 中的实践
本文深入探讨了线性回归模型的原理、实现及应用,特别是在 Python 中的实践。线性回归假设因变量与自变量间存在线性关系,通过建立线性方程预测未知数据。文章介绍了模型的基本原理、实现步骤、Python 常用库(如 Scikit-learn 和 Statsmodels)、参数解释、优缺点及扩展应用,强调了其在数据分析中的重要性和局限性。
71 3
|
10天前
|
算法 数据处理 Python
高精度保形滤波器Savitzky-Golay的数学原理、Python实现与工程应用
Savitzky-Golay滤波器是一种基于局部多项式回归的数字滤波器,广泛应用于信号处理领域。它通过线性最小二乘法拟合低阶多项式到滑动窗口中的数据点,在降噪的同时保持信号的关键特征,如峰值和谷值。本文介绍了该滤波器的原理、实现及应用,展示了其在Python中的具体实现,并分析了不同参数对滤波效果的影响。适合需要保持信号特征的应用场景。
56 11
高精度保形滤波器Savitzky-Golay的数学原理、Python实现与工程应用
|
30天前
|
数据可视化 编译器 Python
Manim:数学可视化的强大工具 | python小知识
Manim(Manim Community Edition)是由3Blue1Brown的Grant Sanderson开发的数学动画引擎,专为数学和科学可视化设计。它结合了Python的灵活性与LaTeX的精确性,支持多领域的内容展示,能生成清晰、精确的数学动画,广泛应用于教育视频制作。安装简单,入门容易,适合教育工作者和编程爱好者使用。
240 7
|
1月前
|
缓存 开发者 Python
深入探索Python中的装饰器:原理、应用与最佳实践####
本文作为技术性深度解析文章,旨在揭开Python装饰器背后的神秘面纱,通过剖析其工作原理、多样化的应用场景及实践中的最佳策略,为中高级Python开发者提供一份详尽的指南。不同于常规摘要的概括性介绍,本文摘要将直接以一段精炼的代码示例开篇,随后简要阐述文章的核心价值与读者预期收获,引领读者快速进入装饰器的世界。 ```python # 示例:一个简单的日志记录装饰器 def log_decorator(func): def wrapper(*args, **kwargs): print(f"Calling {func.__name__} with args: {a
42 2
|
1月前
|
机器学习/深度学习 人工智能 自然语言处理
探索未来编程:Python在人工智能领域的深度应用与前景###
本文将深入探讨Python语言在人工智能(AI)领域的广泛应用,从基础原理到前沿实践,揭示其如何成为推动AI技术创新的关键力量。通过分析Python的简洁性、灵活性以及丰富的库支持,展现其在机器学习、深度学习、自然语言处理等子领域的卓越贡献,并展望Python在未来AI发展中的核心地位与潜在变革。 ###
|
13天前
|
存储 缓存 算法
探索企业文件管理软件:Python中的哈希表算法应用
企业文件管理软件依赖哈希表实现高效的数据管理和安全保障。哈希表通过键值映射,提供平均O(1)时间复杂度的快速访问,适用于海量文件处理。在Python中,字典类型基于哈希表实现,可用于管理文件元数据、缓存机制、版本控制及快速搜索等功能,极大提升工作效率和数据安全性。
49 0
|
2月前
|
机器学习/深度学习 自然语言处理 语音技术
Python在深度学习领域的应用,重点讲解了神经网络的基础概念、基本结构、训练过程及优化技巧
本文介绍了Python在深度学习领域的应用,重点讲解了神经网络的基础概念、基本结构、训练过程及优化技巧,并通过TensorFlow和PyTorch等库展示了实现神经网络的具体示例,涵盖图像识别、语音识别等多个应用场景。
74 8
|
2月前
|
设计模式 开发者 Python
Python编程中的设计模式应用与实践感悟####
本文作为一篇技术性文章,旨在深入探讨Python编程中设计模式的应用价值与实践心得。在快速迭代的软件开发领域,设计模式如同导航灯塔,指引开发者构建高效、可维护的软件架构。本文将通过具体案例,展现设计模式如何在实际项目中解决复杂问题,提升代码质量,并分享个人在实践过程中的体会与感悟。 ####