Python 数学应用(四)(3)https://developer.aliyun.com/article/1506410
- 现在,我们定义第二个扩展模块,将源设置为刚刚创建的
cython_mandel.pyx
文件:
cython = Extension( "cython_mandel", sources=["cython_mandel.pyx"], include_dirs=[np.get_include()], define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")] )
- 接下来,将这两个扩展模块添加到列表中,并调用
setup
例程来注册这些模块:
extensions = [hybrid, cython] setup( ext_modules = cythonize(extensions, compiler_directives= {"language_level": "3"}), )
- 在
mandelbrot
文件夹中创建一个名为__init__.py
的新空文件,以便将其转换为可以在 Python 中导入的包。 - 在
mandelbrot
文件夹中打开终端,并使用以下命令构建 Cython 扩展模块:
python3.8 setup.py build_ext --inplace
- 现在,开始一个名为
run.py
的新文件,并添加以下导入语句:
# run.py from time import time from functools import wraps import matplotlib.pyplot as plt
- 从我们定义的每个模块中导入各种
compute_mandel
例程:原始的python_mandel
;Cython 化的 Python 代码hybrid_mandel
;以及编译的纯 Cython 代码cython_mandel
:
from mandelbrot.python_mandel import compute_mandel as compute_mandel_py from mandelbrot.hybrid_mandel import compute_mandel as compute_mandel_hy from mandelbrot.cython_mandel import compute_mandel as compute_mandel_cy
- 定义一个简单的计时器装饰器,我们将用它来测试例程的性能:
def timer(func, name): @wraps(func) def wrapper(*args, **kwargs): t_start = time() val = func(*args, **kwargs) t_end = time() print(f"Time taken for {name}: {t_end - t_start}") return val return wrapper
- 将
timer
装饰器应用于每个导入的例程,并定义一些用于测试的常量:
mandel_py = timer(compute_mandel_py, "Python") mandel_hy = timer(compute_mandel_hy, "Hybrid") mandel_cy = timer(compute_mandel_cy, "Cython") Nx = 320 Ny = 240 steps = 255
- 用我们之前设置的常量运行每个装饰的例程。将最终调用(Cython 版本)的输出记录在
vals
变量中:
mandel_py(Nx, Ny, steps) mandel_hy(Nx, Ny, steps) vals = mandel_cy(Nx, Ny, steps)
- 最后,绘制 Cython 版本的输出,以检查例程是否正确计算了 Mandelbrot 集:
fig, ax = plt.subplots() ax.imshow(vals.T, extent=(-2.5, 0.5, -1.2, 1.2)) plt.show()
运行run.py
文件将在终端打印每个例程的执行时间,如下所示:
Time taken for Python: 6.276328802108765 Time taken for Hybrid: 5.816391468048096 Time taken for Cython: 0.03116750717163086
Mandelbrot 集的绘图可以在以下图像中看到:
图 10.4:使用 Cython 代码计算的 Mandelbrot 集的图像
这是我们对 Mandelbrot 集的期望。
它是如何工作的…
在这个示例中发生了很多事情,所以让我们从解释整个过程开始。Cython 接受用 Python 语言的扩展编写的代码,并将其编译成 C 代码,然后用于生成可以导入 Python 会话的 C 扩展库。实际上,您甚至可以使用 Cython 直接将普通 Python 代码编译为扩展,尽管结果不如使用修改后的语言好。在这个示例中的前几个步骤中,我们在修改后的语言中定义了 Python 代码的新版本(保存为.pyx
文件),其中包括类型信息以及常规 Python 代码。为了使用 Cython 构建 C 扩展,我们需要定义一个设置文件,然后创建一个文件来生成结果。
Cython 代码的最终编译版本比其 Python 等效代码运行速度快得多。Cython 编译的 Python 代码(在本示例中称为混合代码)的性能略优于纯 Python 代码。这是因为生成的 Cython 代码仍然必须处理带有所有注意事项的 Python 对象。通过在.pyx
文件中向 Python 代码添加类型信息,我们开始看到性能的重大改进。这是因为in_mandel
函数现在有效地被定义为一个 C 级别函数,它不与 Python 对象交互,而是操作原始数据类型。
Cython 代码和 Python 等效代码之间存在一些小但非常重要的区别。在步骤 1中,您可以看到我们像往常一样导入了 NumPy 包,但我们还使用了cimport
关键字将一些 C 级别的定义引入了作用域。在步骤 2中,我们在定义in_mandel
例程时使用了cdef
关键字而不是def
关键字。这意味着in_mandel
例程被定义为一个 C 级别函数,不能从 Python 级别使用,这在调用这个函数时(这经常发生)节省了大量开销。
关于这个函数定义的唯一其他真正的区别是在签名和函数的前几行中包含了一些类型声明。我们在这里应用的两个装饰器禁用了访问列表(数组)元素时的边界检查。boundscheck
装饰器禁用了检查索引是否有效(在 0 和数组大小之间),而wraparound
装饰器禁用了负索引。尽管它们禁用了 Python 内置的一些安全功能,但这两个装饰器在执行过程中都会对速度产生适度的改进。在这个示例中,禁用这些检查是可以的,因为我们正在使用循环遍历数组的有效索引。
设置文件是我们告诉 Python(因此也是 Cython)如何构建 C 扩展的地方。Cython 中的cythonize
例程在这里起着关键作用,因为它触发了 Cython 构建过程。在步骤 9和10中,我们使用setuptools
中的Extension
类定义了扩展模块,以便我们可以为构建定义一些额外的细节;具体来说,我们为 NumPy 编译设置了一个环境变量,并添加了 NumPy C 头文件的include
文件。这是通过Extension
类的define_macros
关键字参数完成的。我们在步骤 13中使用setuptools
命令来构建 Cython 扩展,并且添加了--inplace
选项,这意味着编译后的库将被添加到当前目录,而不是放在一个集中的位置。这对开发来说是很好的。
运行脚本相当简单:从每个定义的模块中导入例程 - 其中两个实际上是 C 扩展模块 - 并计算它们的执行时间。我们必须在导入别名和例程名称上有一些创造性,以避免冲突。
还有更多…
Cython 是改进代码性能的强大工具。然而,在优化代码时,您必须始终谨慎地花费时间。使用像 Python 标准库中提供的 cProfiler 这样的性能分析工具可以用来找到代码中性能瓶颈出现的地方。在这个示例中,性能瓶颈出现的地方是相当明显的。在这种情况下,Cython 是解决问题的良药,因为它涉及对(双重)for
循环内的函数进行重复调用。然而,它并不是解决性能问题的通用方法,往往情况下,通过重构代码以利用高性能库,可以大大提高代码的性能。
Cython 与 Jupyter 笔记本集成良好,并且可以无缝地在笔记本的代码块中使用。Cython 也包含在 Python 的 Anaconda 发行版中,因此在使用 Anaconda 发行版安装了 Cython 后,就无需额外设置即可在 Jupyter 笔记本中使用 Cython。
在从 Python 生成编译代码时,Cython 并不是唯一的选择。例如,NumBa 包(numba.pydata.org/
)提供了一个即时(JIT)编译器,通过简单地在特定函数上放置装饰器来优化 Python 代码的运行时。NumBa 旨在与 NumPy 和其他科学 Python 库一起使用,并且还可以用于利用 GPU 加速代码。
使用 Dask 进行分布式计算
Dask 是一个用于在多个线程、进程或甚至计算机之间进行分布式计算的库,以有效地进行大规模计算。即使您只是在一台笔记本电脑上工作,这也可以极大地提高性能和吞吐量。Dask 提供了 Python 科学堆栈中大多数数据结构的替代品,如 NumPy 数组和 Pandas DataFrames。这些替代品具有非常相似的接口,但在内部,它们是为分布式计算而构建的,以便它们可以在多个线程、进程或计算机之间共享。在许多情况下,切换到 Dask 就像改变import
语句一样简单,可能还需要添加一些额外的方法调用来启动并发计算。
在这个示例中,我们将学习如何使用 Dask 对 DataFrame 进行一些简单的计算。
准备工作
对于这个示例,我们需要从 Dask 包中导入dataframe
模块。按照 Dask 文档中的约定,我们将使用别名dd
导入此模块:
import dask.dataframe as dd
我们还需要这一章的代码库中的sample.csv
文件。
如何做…
按照以下步骤使用 Dask 对 DataFrame 对象执行一些计算:
- 首先,我们需要将数据从
sample.csv
加载到 Dask 的DataFrame
中:
data = dd.read_csv("sample.csv")
- 接下来,我们对 DataFrame 的列执行标准计算:
sum_data = data.lower + data.upper print(sum_data)
与 Pandas DataFrames 不同,结果不是一个新的 DataFrame。print
语句给了我们以下信息:
Dask Series Structure: npartitions=1 float64 ... dtype: float64 Dask Name: add, 6 tasks
- 要实际获得结果,我们需要使用
compute
方法:
result = sum_data.compute() print(result.head())
结果现在如预期所示:
0 -0.911811 1 0.947240 2 -0.552153 3 -0.429914 4 1.229118 dtype: float64
- 我们计算最后两列的均值的方式与 Pandas DataFrame 完全相同,但我们需要添加一个调用
compute
方法来执行计算:
means = data.loc[:, ("lower", "upper")].mean().compute() print(means)
打印的结果与我们的预期完全一致:
lower -0.060393 upper -0.035192 dtype: float64
它是如何工作的…
Dask 为计算构建了一个任务图,描述了需要对数据集合执行的各种操作和计算之间的关系。这样可以将计算步骤分解,以便可以按正确的顺序在不同的工作器之间进行计算。然后将此任务图传递给调度程序,调度程序将实际任务发送给工作器执行。Dask 配备了几种不同的调度程序:同步、线程、多进程和分布式。可以在compute
方法的调用中选择调度程序的类型,或者全局设置。如果没有给出一个合理的默认值,Dask 会选择一个合理的默认值。
同步、线程和多进程调度程序在单台机器上工作,而分布式调度程序用于与集群一起工作。Dask 允许您以相对透明的方式在调度程序之间切换,尽管对于小任务,您可能不会因为设置更复杂的调度程序而获得任何性能优势。
compute
方法是这个示例的关键。通常会在 Pandas DataFrames 上执行计算的方法现在只是设置了一个通过 Dask 调度程序执行的计算。直到调用compute
方法之前,计算才会开始。这类似于Future
作为异步函数调用结果的代理返回,直到计算完成才会实现。
还有更多…
Dask 提供了 NumPy 数组的接口,以及本示例中显示的 DataFrames。还有一个名为dask_ml
的机器学习接口,它提供了类似于 scikit-learn 包的功能。一些外部包,如xarray
,也有 Dask 接口。Dask 还可以与 GPU 一起工作,以进一步加速计算并从远程源加载数据,这在计算分布在集群中时非常有用。