TensorFlow 实战(七)(2)https://developer.aliyun.com/article/1522941
使用 TFX 编写数据管道
想象一下,你正在开发一个系统,根据天气条件来预测森林火灾的严重程度。你已经获得了过去观察到的森林火灾的数据集,并被要求创建一个模型。为了确保你能够将模型提供为服务,你决定创建一个工作流程来摄取数据并使用 TFX 训练模型。这个过程的第一步是创建一个能够读取数据(以 CSV 格式)并将其转换为特征的数据管道。作为这个管道的一部分,你将拥有一个数据读取器(从 CSV 生成示例),显示字段的摘要统计信息,了解数据的模式,并将其转换为模型理解的正确格式。
关于环境的重要信息
要运行本章的代码,强烈建议使用 Linux 环境(例如 Ubuntu),并且将提供该环境的说明。TFX 未针对 Windows 环境进行测试(mng.bz/J2Y0
)。另一个重要的事项是我们将使用稍旧版本的 TFX(1.6.0)。撰写时,最新版本为 1.9.0。这是因为在 1.6.0 版本之后的版本中,运行 TFX 在诸如笔记本等交互式环境中所需的关键组件已损坏。此外,本章后面我们将使用一种名为 Docker 的技术。由于对资源的访问受到严格限制,使 Docker 按我们所需的方式运行在 Windows 上可能会相当困难。此外,对于本章,我们将定义一个新的 Anaconda 环境。要执行此操作,请按照以下说明操作:
- 打开一个终端窗口,并进入代码存储库中的 Ch15-TFX-for-MLOps-in-TF2 目录。
- 如果您已经激活了 Anaconda 虚拟环境(例如 manning.tf2),请通过运行 conda deactivate manning.tf2 来停用它。
- 运行 conda create -n manning.tf2.tfx python=3.6 来创建一个新的虚拟 Anaconda 环境。
- 运行 conda activate manning.tf2.tfx 以激活新环境。
- 运行 pip install --use-deprecated=legacy-resolver -r requirements.txt。
- 运行 jupyter notebook。
- 打开 tfx/15.1_MLOps_with_tensorflow.ipynb 笔记本。
第一件事是下载数据集(列表 15.1)。我们将使用一个记录了葡萄牙蒙特西尼奥公园历史森林火灾的数据集。该数据集在archive.ics.uci.edu/ml/datasets/Forest+Fires
上免费提供。它是一个 CSV 文件,具有以下特征:
- X—蒙特西尼奥公园地图中的 x 轴空间坐标
- Y—蒙特西尼奥公园地图中的 y 轴空间坐标
- month—一年中的月份
- day—一周中的日期
- Fine Fuel Moisture Code (FFMC)—代表森林树冠阴影下的林地燃料湿度
- DMC—土壤平均含水量的数字评级
- Drought Code (DC)—表示土壤干燥程度的深度
- Initial Spread Index (ISI)—预期的火灾蔓延速率
- temp—摄氏度温度
- RH—相对湿度,单位%
- wind—风速,单位 km/h
- rain—外部降雨量,单位 mm/m2
- area—森林烧毁面积(单位公顷)
选择机器学习模型的特征
选择机器学习模型的特征不是一个微不足道的任务。通常,在使用特征之前,您必须了解特征,特征间的相关性,特征-目标相关性等等,然后就可以判断是否应使用特征。因此,不应该盲目地使用模型的所有给定特征。然而,在这种情况下,重点在于 MLOps,而不是数据科学决策,我们将使用所有特征。使用所有这些特征将稍后有助于解释在定义 MLOps 管道时可用的各种选项。
我们的任务是在给出所有其他特征的情况下预测烧毁面积。请注意,预测连续值(如面积)需要回归模型。因此,这是一个回归问题,而不是分类问题。
图 15.1 下载数据集
import os import requests import tarfile import shutil if not os.path.exists(os.path.join('data', 'csv', 'forestfires.csv')): ❶ url = "http:/ /archive.ics.uci.edu/ml/machine-learning-databases/forest- ➥ fires/forestfires.csv" r = requests.get(url) ❷ if not os.path.exists(os.path.join('data', 'csv')): ❸ os.makedirs(os.path.join('data', 'csv')) ❸ with open(os.path.join('data', 'csv', 'forestfires.csv'), 'wb') as f: ❸ f.write(r.content) ❸ else: print("The forestfires.csv file already exists.") if not os.path.exists(os.path.join('data', 'forestfires.names')): ❹ url = "http:/ /archive.ics.uci.edu/ml/machine-learning-databases/forest- ➥ fires/forestfires.names" r = requests.get(url) ❹ if not os.path.exists('data'): ❺ os.makedirs('data') ❺ with open(os.path.join('data', 'forestfires.names'), 'wb') as f: ❺ f.write(r.content) ❺ else: print("The forestfires.names file already exists.")
❶ 如果未下载数据文件,请下载该文件。
❷ 此行下载给定 URL 的文件。
❸ 创建必要的文件夹并将下载的数据写入其中。
❹ 如果未下载包含数据集描述的文件,请下载它。
❺ 创建必要的目录并将数据写入其中。
在这里,我们需要下载两个文件:forestfires.csv 和 forestfires.names。forestfires.csv 以逗号分隔的格式包含数据,第一行是标题,其余部分是数据。forestfires.names 包含更多关于数据的信息,以便您想更多地了解它。接下来,我们将分离出一个小的测试数据集以供后续手动测试。拥有一个专用的测试集,在任何阶段都没有被模型看到,将告诉我们模型的泛化情况如何。这将是原始数据集的 5%。其余 95%将用于训练和验证数据:
import pandas as pd df = pd.read_csv( os.path.join('data', 'csv', 'forestfires.csv'), index_col=None, ➥ header=0 ) train_df = df.sample(frac=0.95, random_state=random_seed) test_df = df.loc[~df.index.isin(train_df.index), :] train_path = os.path.join('data','csv','train') os.makedirs(train_path, exist_ok=True) test_path = os.path.join('data','csv','test') os.makedirs(test_path, exist_ok=True) train_df.to_csv( os.path.join(train_path, 'forestfires.csv'), index=False, header=True ) test_df.to_csv( os.path.join(test_path, 'forestfires.csv'), index=False, header=True )
现在,我们将开始 TFX 管道。第一步是定义存储管道工件的根目录。您可能会问什么是管道工件?在运行 TFX 管道时,它会在目录中存储各个阶段的中间结果(在某个子目录结构下)。其中的一个例子是,当您从 CSV 文件中读取数据时,TFX 管道会将数据拆分为训练和验证子集,将这些示例转换为 TFRecord 对象(即 TensorFlow 内部用于数据的对象类型),并将数据存储为压缩文件:
_pipeline_root = os.path.join( os.getcwd(), 'pipeline', 'examples', 'forest_fires_pipeline' )
TFX 使用 Abseil 进行日志记录。Abseil 是从 Google 的内部代码库中提取的开源 C ++库集合。它提供了日志记录,命令行参数解析等功能。如果您感兴趣,请在abseil.io/docs/python/
阅读有关该库的更多信息。我们将设置日志记录级别为 INFO,以便我们可以在 INFO 级别或更高级别看到日志记录语句。日志记录是具有重要功能的,因为我们可以获得很多见解,包括哪些步骤成功运行以及哪些错误被抛出:
absl.logging.set_verbosity(absl.logging.INFO)
完成初始设置后,我们将定义一个 InteractiveContext:
from tfx.orchestration.experimental.interactive.interactive_context import ➥ InteractiveContext context = InteractiveContext( pipeline_name = "forest_fires", pipeline_root=_pipeline_root )
TFX 在一个上下文中运行流水线。上下文被用来运行你在流水线中定义的各个步骤。它还起着非常重要的作用,就是在我们在流水线中进行过程中管理不同步骤之间的状态。为了管理状态之间的转换并确保流水线按预期运行,它还维护了一个元数据存储(一个小规模的数据库)。元数据存储包含各种信息,如执行顺序、组件的最终状态和产生的错误。你可以在以下侧边栏中了解有关元数据的信息。
元数据中有什么?
一旦创建了 InteractiveContext,你会在流水线根目录中看到一个名为 metadata.sqlite 的数据库。这是一个轻量级、快速的 SQL 数据库(www.sqlite.org/index.xhtml
),专为处理少量数据和传入请求而设计的。该数据库将记录有关输入、输出和执行相关输出的重要信息(组件的运行标识符、错误)。这些信息可以用于调试你的 TFX 流水线。元数据可以被视为不是直接输入或输出,但仍然是正确执行组件所必需的数据,以提供更大的透明度。在具有许多组件以许多不同方式相互连接的复杂 TFX 流水线的调试中,元数据可能非常有帮助。你可以在www.tensorflow.org/tfx/guide/mlmd
上了解更多信息。
我们要开始定义流水线了。本节流水线的主要目的是
- 从 CSV 文件加载数据并拆分为训练和验证数据
- 了解数据的模式(例如各个列、数据类型、最小/最大值等)
- 显示关于各种特征分布的摘要统计和图形
- 将原始列转换为特征,可能需要特殊的中间处理步骤
这些步骤是模型训练和部署的前奏。每个任务都将是流水线中的一个单独组件,在合适的时候我们将详细讨论这些步骤。
15.1.1 从 CSV 文件加载数据
第一步是定义一个组件来从 CSV 文件中读取示例并将数据拆分为训练和评估数据。为此,你可以使用 tfx.components.CsvExampleGen 对象。我们所需要做的就是将包含数据的目录提供给 input_base 参数:
from tfx.components import CsvExampleGen example_gen = CsvExampleGen(input_base=os.path.join('data', 'csv', 'train'))
然后我们使用之前定义的 InteractiveContext 来运行示例生成器:
context.run(example_gen)
让我们来看看这一步骤产生了什么。要查看数据,请前往 _pipeline_root 目录(例如,Ch15-TFX-for-MLOps-in-TF2/tfx/pipeline)。它应该具有类似于图 15.1 所示的目录/文件结构。
图 15.1 运行 CsvExampleGen 之后的目录/文件结构
您将看到在管道中创建了两个 GZip 文件(即带有 .gz 扩展名)。您会注意到在 CsvExampleGen 文件夹中有两个子目录:Split-train 和 Split-eval,分别包含训练和验证数据。当您运行包含前述代码的笔记本单元时,您还将看到一个输出 HTML 表格,显示 TFX 组件的输入和输出(图 15.2)。
图 15.2 运行 CsvExampleGen 组件生成的输出 HTML 表格
有一些值得注意的事项。首先,您将看到 execution_id,这是一个计数器生成的值,该计数器跟踪您运行 TFX 组件的次数。换句话说,每次运行 TFX 组件(如 CsvExampleGen)时,计数器都会增加 1。如果您继续向下看,您会看到一些关于 CsvExampleGen 如何分割数据的重要信息。如果您查看 component > CsvExampleGen > exec_properties > output_config 下,您会看到类似于
"split_config": { "splits": [ { "hash_buckets": 2, "name": "train" }, { "hash_buckets": 1, "name": "eval" } ] }
这里说数据集已被分成两组:train 和 eval。训练集大约占原始数据的三分之二,而评估集大约占原始数据的三分之一。这些信息是通过查看 hash_buckets 属性推断出来的。TFX 使用哈希将数据分成训练集和评估集。默认情况下,它将定义三个哈希桶。然后 TFX 使用每个记录中的值为该记录生成哈希。记录中的值传递给哈希函数以生成哈希。然后使用生成的哈希来将该示例分配到一个桶中。例如,如果哈希值为 7,则 TFX 可以轻松找到具有 7% 的桶,3 = 1,这意味着它将被分配到第二个桶(因为桶是从零开始索引的)。您可以按以下方式访问 CsvExampleGen 中的元素。
关于哈希的更多信息
有许多哈希函数,例如 MD5、SHA1 等。您可以在 blog.jscrambler.com/hashing-algorithms/
上阅读有关哈希函数的更多信息。在 TensorFlow 中,有两种不同的函数可用于生成哈希:tf.strings.to_hash_bucket_fast (mng.bz/woJq
) 和 tf.strings.to_ hash_bucket_strong ()。强哈希函数速度较慢,但更能抵御可能操纵输入以控制生成的哈希值的恶意攻击。
artifact = example_gen.outputs['examples'].get()[0] print("Artifact split names: {}".format(artifact.split_names)) print("Artifact URI: {}".format(artifact.uri)
这将打印以下输出:
Artifact split names: ["train", "eval"] Artifact URI: <path to project>/Ch15-TFX-for-MLOps-in- ➥ TF2/tfx/pipeline/examples/forest_fires_pipeline/CsvExampleGen/examples/1
之前我们说过,随着我们在管道中的进展,TFX 会将中间输出存储起来。我们看到 CsvExampleGen 组件已将数据存储为 .gz 文件。事实上,它将 CSV 文件中找到的示例存储为 TFRecord 对象。TFRecord 用于将数据存储为字节流。由于 TFRecord 是在使用 TensorFlow 时存储数据的常用方法;这些记录可以轻松地作为 tf.data.Dataset 检索,并且可以检查数据。下一个清单显示了如何做到这一点。
列表 15.2 打印 CsvExampleGen 存储的数据
train_uri = os.path.join( example_gen.outputs['examples'].get()[0].uri, 'Split-train' ❶ ) tfrecord_filenames = [ os.path.join(train_uri, name) for name in os.listdir(train_uri) ❷ ] dataset = tf.data.TFRecordDataset( tfrecord_filenames, compression_type="GZIP" ) ❸ for tfrecord in dataset.take(2): ❹ serialized_example = tfrecord.numpy() ❺ example = tf.train.Example() ❻ example.ParseFromString(serialized_example) ❼ print(example) ❽
❶ 获取代表训练示例的输出工件的 URL,该工件是一个目录。
❷ 获取此目录中的文件列表(所有压缩的 TFRecord 文件)。
❸ 创建一个 TFRecordDataset 来读取这些文件。GZip(扩展名为 .gz)包含一组 TFRecord 对象。
❹ 迭代前两个记录(可以是小于或等于数据集大小的任何数字)。
❺ 从 TFRecord(包含一个示例)获取字节流。
❻ 定义一个知道如何解析字节流的 tf.train.Example 对象。
❼ 将字节流解析为适当可读的示例。
❽ 打印数据。
如果你运行这段代码,你会看到以下内容:
features { feature { key: "DC" value { float_list { value: 605.7999877929688 } } } ... feature { key: "RH" value { int64_list { value: 43 } } } feature { key: "X" value { int64_list { value: 5 } } } ... feature { key: "area" value { float_list { value: 2.0 } } } feature { key: "day" value { bytes_list { value: "tue" } } } ... } ...
tf.train.Example 将数据保存为一组特征,每个特征都有一个键(列描述符)和一个值。你会看到给定示例的所有特征。例如,DC 特征具有浮点值 605.799,RH 特征具有整数值 43,area 特征具有浮点值 2.0,而 day 特征具有 bytes_list(用于存储字符串)值为 “tue”(即星期二)。
在移动到下一节之前,让我们再次提醒自己我们的目标是什么:开发一个模型,可以根据数据集中的所有其他特征来预测火灾蔓延(以公顷为单位)。这个问题被构建为一个回归问题。
15.1.2 从数据生成基本统计信息
作为下一步,我们将更好地理解数据。这称为探索性数据分析(EDA)。EDA 通常不是很明确,并且非常依赖于您正在解决的问题和数据。您还必须考虑到通常在项目交付之前的有限时间。换句话说,您不能测试所有内容,必须优先考虑要测试的内容和要假设的内容。对于我们在这里处理的结构化数据,一个很好的起点是了解类型(数值与分类)以及各列值的分布。TFX 为此提供了一个组件。StatisticsGen 将自动生成这些统计信息。我们很快将更详细地看到此模块提供了什么样的见解:
from tfx.components import StatisticsGen statistics_gen = StatisticsGen( examples=example_gen.outputs['examples']) context.run(statistics_gen)
这将生成一个 HTML 表格,类似于您在运行 CsvExampleGen 后看到的表格(见图 15.3)。
图 15.3 StatisticsGen 组件提供的输出
然而,要检索此步骤的最有价值的输出,您必须运行以下命令:
context.show(statistics_gen.outputs['statistics'])
这将在管道根目录中创建以下文件(见图 15.4)。
图 15.4 运行 StatisticsGen 后的目录/文件结构
图 15.5 展示了 TFX 提供的有关数据的宝贵信息集合。图 15.5 中的输出图是一个包含丰富数据的金矿,提供了大量关于我们处理的数据的信息。它为你提供了基本但全面的图表套件,提供了有关数据中存在的列的许多信息。让我们从上到下来看。在顶部,你可以选择排序和过滤图 15.5 中显示的输出。例如,你可以改变图表的顺序,选择基于数据类型的图表,或者通过正则表达式进行筛选。
图 15.5 由 StatisticsGen 组件生成的数据的摘要统计图
默认情况下,StatisticsGen 将为训练集和评估集生成图表。然后每个训练和评估部分将有几个子部分;在这种情况下,我们有数值列和分类列的部分。
在左边,你可以看到一些数字统计和特征的评估,而在右边,你可以看到特征分布的视觉表示。例如,拿训练集中的 FFMC 特征来说。我们可以看到它有 333 个例子且 0%的特征缺失值。它的平均值约为 90,标准偏差为 6.34。在图表中,你可以看到分布是相当倾斜的。几乎所有的值都集中在 80-90 范围内。你将看到稍后这可能会给我们制造问题以及我们将如何解决它们。
在分类部分,你可以看到日和月特征的值。例如,日特征有七个唯一值,且 0%缺失。日特征的最频繁值(即模式)出现了 60 次。请注意,日表示为条形图,月表示为线图,因为对于唯一值高于阈值的特征,使用线图可以使图表清晰且减少混乱。
15.1.3 从数据推断模式
到目前为止,我们已经从 CSV 文件中加载了数据并探索了数据集的基本统计信息。下一个重要的步骤是推断数据的模式。一旦提供了数据,TFX 可以自动推断数据的模式。如果你使用过数据库,推断出的模式与数据库模式相同。它可以被视为数据的蓝图,表达数据的结构和重要属性。它也可以被视为一组规则,规定数据应该看起来像什么。例如,如果你有了模式,你可以通过参考模式来分类给定的记录是否有效。
不做更多的话,让我们创建一个 SchemaGen 对象。SchemaGen 需要前一步的输出(即 StatisticsGen 的输出)和一个名为 infer_feature_shape 的布尔参数。
from tfx.components import SchemaGen schema_gen = SchemaGen( statistics=statistics_gen.outputs[‘statistics’], infer_feature_shape=False) context.run(schema_gen)
在这里,我们将 infer_feature_shape 设置为 False,因为我们将在特征上进行一些转换。因此,我们将有更大的灵活性来自由操作特征形状。然而,设置这个参数(infer_feature_shape)意味着对下游步骤(称为 transform 步骤)的重要改变。当 infer_feature_shape 设置为 False 时,传递给 transform 步骤的张量被表示为 tf.SparseTensor 对象,而不是 tf.Tensor 对象。如果设置为 True,则需要是一个具有已知形状的 tf.Tensor 对象。接下来,要查看 SchemaGen 的输出,可以执行以下操作
context.show(schema_gen.outputs['schema'])
这将产生表 15.1 所示的输出。
表 15.1 TFX 生成的模式输出
特征名称 | 类型 | 存在 | 价值 | 域 |
‘day’ | STRING | 必须的 | 单个的 | ‘day’ |
‘month’ | STRING | 必须的 | 单个的 | ‘month’ |
‘DC’ | FLOAT | 必须的 | 单个的 | - |
‘DMC’ | FLOAT | 必须的 | 单个的 | - |
‘FFMC’ | FLOAT | 必须的 | 单个的 | - |
‘ISI’ | FLOAT | 必须的 | 单个的 | - |
‘RH’ | INT | 必须的 | 单个的 | - |
‘X’ | INT | 必须的 | 单个的 | - |
‘Y’ | INT | 必须的 | 单个的 | - |
‘area’ | FLOAT | 必须的 | 单个的 | - |
‘rain’ | FLOAT | 必须的 | 单个的 | - |
‘temp’ | FLOAT | 必须的 | 单个的 | - |
‘wind’ | FLOAT | 必须的 | 单个的 | |
域 | 值 | |||
‘day’ | ||||
‘month’ | ‘apr’ | ‘aug’ | ‘dec’ | ‘feb’ |
域定义了给定特征的约束。我们列出了 TFX 中定义的一些最受欢迎的域:
- 整数域值(例如,定义整数特征的最小/最大值)
- 浮点域值(例如,定义浮点值特征的最小/最大值)
- 字符串域值(例如,为字符串特征定义允许的值/标记)
- 布尔域值(例如,可以用于定义真/假状态的自定义值)
- 结构域值(例如,可以用于定义递归域[域内的域]或具有多个特征的域)
- 自然语言域值(例如,为相关语言特征定义一个词汇表[允许的标记集合])
- 图像域值(例如,可以用来限制图像的最大字节大小)
- 时间域值(例如,可以用来定义数据/时间特征)
- 时间值域(例如,可以用来定义不带日期的时间)
域的列表可在名为 schema.proto 的文件中找到。schema.proto 在mng.bz/7yp9
上定义。这些文件是使用一个叫做 Protobuf 的库定义的。Protobuf 是一种用于对象序列化的库。您可以阅读下面的侧边栏了解有关 Protobuf 库的更多信息。
Protobuf 库
Protobuf 是由 Google 开发的对象序列化/反序列化库。需要序列化的对象被定义为 Protobuf 消息。消息的模板由 .proto 文件定义。然后,为了进行反序列化,Protobuf 提供了诸如 ParseFromString() 等函数。要了解有关该库的更多信息,请参阅 mng.bz/R45P
。
接下来,我们将看到如何将数据转换为特征。
15.1.4 将数据转换为特征
我们已经到达了数据处理管道的最终阶段。最后一步是将我们提取的列转换为对我们的模型有意义的特征。我们将创建三种类型的特征:
- 密集的浮点数特征—值以浮点数(例如,温度)的形式呈现。这意味着该值会按原样传递(可以选择进行归一化处理;例如,Z 分数归一化)以创建一个特征。
- 分桶特征—根据预定义的分桶间隔对数值进行分桶。这意味着该值将根据其落入的分桶而转换为桶索引(例如,我们可以将相对湿度分成三个值:低[-inf,33),中[33,66),高[66,inf))。
- 分类特征(基于整数或字符串)—值是从预定义的值集中选择的(例如,日期或月份)。如果该值尚未是整数索引(例如,日期作为字符串),则将使用将每个单词映射到索引的词汇表将其转换为整数索引(例如,“mon” 被映射为 0,“tue” 被映射为 1,等等)。
我们将向数据集中的每个字段介绍其中一种特征转换:
- X(空间坐标)—以浮点数值表示
- Y(空间坐标)—以浮点数值表示
- wind(风速)—以浮点数值表示
- rain(室外降雨)—以浮点数值表示
- FFMC(燃料湿度)—以浮点数值表示
- DMC(平均含水量)—以浮点数值表示
- DC(土壤干燥深度)—以浮点数值表示
- ISI(预期火灾蔓延速率)—以浮点数值表示
- temp(温度)—以浮点数值表示
- RH(相对湿度)—作为分桶值表示
- month—作为分类特征表示
- day—作为分类特征表示
- area(烧毁面积)—作为数值保留的标签特征
我们首先要定义一些常量,这些常量将帮助我们跟踪哪个特征分配给了哪个类别。此外,我们将保留特定属性(例如,分类特征的最大类数;请参阅下一个列表)。
列表 15.3 定义特征转换步骤中与特征相关的常量
%%writefile forest_fires_constants.py ❶ VOCAB_FEATURE_KEYS = ['day','month'] ❷ MAX_CATEGORICAL_FEATURE_VALUES = [7, 12] ❸ DENSE_FLOAT_FEATURE_KEYS = [ 'DC', 'DMC', 'FFMC', 'ISI', 'rain', 'temp', 'wind', 'X', 'Y' ❹ ] BUCKET_FEATURE_KEYS = ['RH'] ❺ BUCKET_FEATURE_BOUNDARIES = [(33, 66)] ❻ LABEL_KEY = 'area' ❼ def transformed_name(key): ❽ return key + '_xf'
❶ 此命令将将此单元格的内容写入文件(阅读侧边栏以获取更多信息)。
❷ 基于词汇(或字符串)的分类特征。
❸ 数据集中假设每个分类特征都有一个最大值。
❹ 密集特征(这些将作为模型输入,或进行归一化处理)。
❺ 分桶特征。
❻ 分桶特征的分桶边界(例如,特征 RH 将被分桶为三个箱子:[0, 33),[33, 66),[66,inf))。
❼ 标签特征将保留为数值特征,因为我们正在解决回归问题。
❽ 定义一个函数,将在特征名称后添加后缀。这将帮助我们区分生成的特征和原始数据列。
我们将这些笔记本单元格写为 Python 脚本(或 Python 模块)的原因是因为 TFX 期望运行所需的一些代码部分作为 Python 模块。
%%writefile 魔术命令
%%writefile 是一个 Jupyter 魔术命令(类似于%%tensorboard)。它会导致 Jupyter 笔记本将单元格中的内容写入到新文件中(例如,Python 模块/脚本)。这是从笔记本单元格创建独立 Python 模块的好方法。笔记本很适合进行实验,但对于生产级别的代码,Python 脚本更好。例如,我们的 TFX 管道期望某些函数(例如,如何将原始列预处理为特征)是独立的 Python 模块。我们可以方便地使用%%writefile 命令来实现这一点。
此命令必须指定为要写入文件的单元格中的第一个命令。
接下来,我们将编写另一个模块 forest_fires_transform.py,其中将有一个预处理函数(称为 preprocessing_fn),该函数定义了每个数据列应如何处理以成为特征(请参见下一个列表)。
列表 15.4 定义将原始数据转换为特征的 Python 模块。
%%writefile forest_fires_transform.py ❶ import tensorflow as tf import tensorflow_transform as tft import forest_fires_constants ❷ _DENSE_FLOAT_FEATURE_KEYS = forest_fires_constants.DENSE_FLOAT_FEATURE_KEYS❸ _VOCAB_FEATURE_KEYS = forest_fires_constants.VOCAB_FEATURE_KEYS ❸ _BUCKET_FEATURE_KEYS = forest_fires_constants.BUCKET_FEATURE_KEYS ❸ _BUCKET_FEATURE_BOUNDARIES = ➥ forest_fires_constants.BUCKET_FEATURE_BOUNDARIES ❸ _LABEL_KEY = forest_fires_constants.LABEL_KEY ❸ _transformed_name = forest_fires_constants.transformed_name ❸ def preprocessing_fn(inputs): ❹ outputs = {} for key in _DENSE_FLOAT_FEATURE_KEYS: ❺ outputs[_transformed_name(key)] = tft.scale_to_z_score( ❻ sparse_to_dense(inputs[key]) ❼ ) for key in _VOCAB_FEATURE_KEYS: outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary( ❽ sparse_to_dense(inputs[key]), num_oov_buckets=1) for key, boundary in zip(_BUCKET_FEATURE_KEYS, ❾ ➥ _BUCKET_FEATURE_BOUNDARIES): ❾ outputs[_transformed_name(key)] = tft.apply_buckets( ❾ sparse_to_dense(inputs[key]), bucket_boundaries=[boundary] ❾ ) ❾ outputs[_transformed_name(_LABEL_KEY)] = ➥ sparse_to_dense(inputs[_LABEL_KEY]) ❿ return outputs def sparse_to_dense(x): ⓫ return tf.squeeze( tf.sparse.to_dense( tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]) ), axis=1 )
❶ 此代码列表中的内容将被写入到单独的 Python 模块中。
❷ 导入先前定义的特征常量。
❸ 导入 forest_fires_constants 模块中定义的所有常量。
❹ 这是 tf.transform 库中必不可少的回调函数,用于将原始列转换为特征。
❺ 对所有密集特征进行处理。
❻ 对密集特征执行基于 Z-score 的缩放(或标准化)。
❼ 因为在 SchemaGen 步骤中 infer_feature_shape 设置为 False,我们的输入是稀疏张量。它们需要转换为密集张量。
❽ 对于基于词汇的特征,构建词汇表并将每个标记转换为整数 ID。
❾ 对待分桶的特征,使用定义的分桶边界,对特征进行分桶。
❿ 标签特征只是简单地转换为密集张量,没有其他特征转换。
⓫ 一个将稀疏张量转换为密集张量的实用函数。
您可以看到该文件被命名为 forest_fires_transform.py。它定义了一个 preprocessing_fn()函数,该函数接受一个名为 inputs 的参数。inputs 是一个从特征键到在 CSV 文件中找到的数据列的映射字典,从 example_gen 输出流动。最后,它返回一个字典,其中特征键映射到使用 tensorflow_transform 库转换的特征。在方法的中间,您可以看到预处理函数执行三项重要工作。
首先,它读取所有密集特征(其名称存储在 _DENSE_FLOAT_FEATURE_KEYS 中),并使用 z 分数对值进行归一化。z 分数将某一列x归一化为
其中,μ(x)是列的平均值,σ(x)是列的标准差。要对数据进行归一化,可以调用 tensorflow_transform 库中的 scale_to_z_score()函数。您可以阅读有关 tensorflow_transform 的侧边栏,了解更多有关该库提供的内容。然后,该函数使用新的键(通过 _transformed_name 函数生成)将每个特征存储在输出中,该新键衍生自原始特征名称(新键通过在原始特征名称末尾添加 _xf 生成)。
接下来,它处理基于词汇的分类特征(其名称存储在 _VOCAB_FEATURE_KEYS 中),通过使用字典将每个字符串转换为索引。该字典将每个字符串映射到索引,并且可以自动从提供的训练数据中学习。这类似于我们如何使用 Keras 的 Tokenizer 对象学习字典,将单词转换为单词 ID。在 tensorflow_transform 库中,您可以使用 compute_and_apply_vocabulary()函数完成这一操作。对于 compute_and_apply_vocabulary()函数,我们可以通过传递 num_oov_buckets=1 来将任何未见字符串分配给特殊类别(除了已分配给已知类别的类别)。
然后,函数处理待进行桶化的特征。Bucketization 是将连续值应用于桶的过程,其中桶由一组边界定义。使用 apply_buckets()函数可以轻松地对特征进行 bucket 化,该函数将特征(在输入字典中提供)和桶边界作为输入参数。
最后,我们保留包含标签的列不变。通过这样,我们定义了 Transform 组件(mng.bz/mOGr
)。
tensorflow_transform:将原始数据转换为特征
tensorflow_transform 是 TensorFlow 中的一个子库,主要关注特征转换。它提供了各种功能来计算各种东西:
- 对特征进行桶化(例如,将一系列值分组到预定义的一组桶中)
- 从字符串列中提取词袋特征
- 数据集的协方差矩阵
- 列的均值、标准差、最小值、最大值、计数等
您可以在mng.bz/5QgB
上阅读有关此库提供的功能的更多信息。
from tfx.components import Transform transform = Transform( examples=example_gen.outputs['examples'], schema=schema_gen.outputs['schema'], module_file=os.path.abspath('forest_fires_transform.py'), ) context.run(transform)
Transform 组件接受三个输入:
- CsvExampleGen 组件的输出示例
- SchemaGen 生成的架构
- 用于将数据转换为特征的 preprocessing_fn()函数的 Python 模块
当涉及到多组件流水线,比如 TFX 流水线时,我们必须尽可能地检查每一个中间输出。这比交给偶然性并祈祷一切顺利要好得多(通常情况下都不是这样)。因此,让我们通过打印运行 Transform 步骤后保存到磁盘上的一些数据来检查输出(见下一列表)。打印数据的代码与使用 CsvExampleGen 组件时打印数据的代码类似。
列表 15.5 检查 TFX Transform 步骤产生的输出
import forest_fires_constants _DENSE_FLOAT_FEATURE_KEYS = forest_fires_constants.DENSE_FLOAT_FEATURE_KEYS _VOCAB_FEATURE_KEYS = forest_fires_constants.VOCAB_FEATURE_KEYS _BUCKET_FEATURE_KEYS = forest_fires_constants.BUCKET_FEATURE_KEYS _LABEL_KEY = forest_fires_constants.LABEL_KEY # Get the URI of the output artifact representing the training examples, which is a directory train_uri = os.path.join( transform.outputs['transformed_examples'].get()[0].uri, 'Split-train' ) tfrecord_filenames = [ os.path.join(train_uri, name) for name in os.listdir(train_uri) ❶ ] dataset = tf.data.TFRecordDataset( tfrecord_filenames, compression_type="GZIP" ) ❷ example_records = [] ❸ float_features = [ _transformed_name(f) for f in _DENSE_FLOAT_FEATURE_KEYS + [_LABEL_KEY] ❹ ] int_features = [ _transformed_name(f) for f in _BUCKET_FEATURE_KEYS + ➥ _VOCAB_FEATURE_KEYS ❹ ] for tfrecord in dataset.take(5): ❺ serialized_example = tfrecord.numpy() ❻ example = tf.train.Example() ❻ example.ParseFromString(serialized_example) ❻ record = [ example.features.feature[f].int64_list.value for f in int_features ❼ ] + [ example.features.feature[f].float_list.value for f in float_features ❼ ] example_records.append(record) ❽ print(example) print("="*50)
❶ 获取此目录中文件的列表(所有压缩的 TFRecord 文件)。
❷ 创建一个 TFRecordDataset 来读取这些文件。
❸ 用于存储检索到的特征值(以供以后检查)
❹ 稠密(即,浮点数)和整数(即,基于词汇和分桶)特征
❺ 获取数据集中的前五个示例。
❻ 获取一个 TF 记录并将其转换为可读的 tf.train.Example。
❼ 我们将从 tf.train.Example 对象中提取特征的值以供后续检查。
❽ 将提取的值作为记录(即,值的元组)附加到 example_records 中。
解释的代码将打印特征转换后的数据。每个示例都将整数值存储在属性路径下,例如 example.features.feature[] .int64_list.value,而浮点值存储在 example.features.feature [].float_list.value 中。这将打印例如
features { feature { key: "DC_xf" value { float_list { value: 0.4196213185787201 } } } ... feature { key: "RH_xf" value { int64_list { value: 0 } } } ... feature { key: "area_xf" value { float_list { value: 2.7699999809265137 } } } ... }
请注意,我们使用 _transformed_name()函数来获取转换后的特征名称。我们可以看到,浮点值(DC_xf)使用 z 分数标准化,基于词汇的特征(day_xf)转换为整数,并且分桶特征(RH_xf)被呈现为整数。
经验法则:尽可能检查您的管道
当使用 TFX 等第三方库提供的组件时,对于底层实际发生的事情几乎没有透明度。TFX 并不是一个高度成熟的工具,并且正在开发过程中,这使问题更加严重。因此,我们总是尝试并入一些代码片段来探查这些组件,这将帮助我们检查这些组件的输入和输出是否正常。
在下一节中,我们将训练一个简单的回归模型,作为我们一直在创建的流水线的一部分。
练习 1
假设你想要做以下事情而不是先前定义的特征转换:
- DC—将数据缩放到[0, 1]的范围内
- temp—利用边界值(-inf,20],(20,30]和(30,inf)进行分桶处理
一旦特征被转换,将它们添加到名为 outputs 的字典中,其中每个特征都以转换后的特征名称作为键。假设你可以通过调用 _transformed_name(‘temp’) 来获取 temp 的转换后的特征名称。您如何使用 tensorflow_transform 库来实现此目标?您可以使用 scale_to_0_1() 和 apply_buckets() 函数来实现这一点。
15.2 训练一个简单的回归神经网络:TFX Trainer API
您已经定义了一个 TFX 数据管道,可以将 CSV 文件中的示例转换为模型准备的特征。现在,您将使用 TFX 定义一个模型训练器,该模型训练器将采用一个简单的两层全连接回归模型,并将其训练在从数据管道流出的数据上。最后,您将使用模型对一些样本评估数据进行预测。
使用 TFX 定义了一个良好定义的数据管道后,我们就可以使用从该管道流出的数据来训练模型。通过 TFX 训练模型一开始可能会稍微费劲,因为它期望的函数和数据的严格结构。但是,一旦您熟悉了您需要遵循的格式,它就会变得更容易。
我们将分三个阶段进行本节的学习。首先,让我们看看如何定义一个适合 TFX Transform 组件中定义的输出特征的 Keras 模型。最终,模型将接收 Transform 组件的输出。接下来,我们将研究如何编写一个封装了模型训练的函数。此函数将使用所定义的模型,并结合几个用户定义的参数,对模型进行训练并将其保存到所需的路径。保存的模型不能只是任意模型;在 TensorFlow 中它们必须具有所谓的 签名。这些签名规定了模型在最终通过 API 使用时的输入和输出是什么样子的。API 通过一个服务器提供,该服务器公开一个网络端口供客户端与 API 通信。图 15.6 描述了 API 如何与模型关联。
图 15.6 模型如何与 API、TensorFlow 服务器和客户端交互
让我们理解图 15.6 中发生了什么。首先,一个 HTTP 客户端发送请求到服务器。正在监听任何传入请求的服务器(即 TensorFlow 服务服务器)将读取请求并将其指向所需的模型签名。一旦模型签名接收到数据,它将对数据进行必要的处理,将其传递给模型,并生成输出(例如预测)。一旦预测可用,服务器将其返回给客户端。我们将在单独的部分详细讨论 API 和服务器端。在本节中,我们的重点是模型。
TensorFlow 服务中的签名是什么?
在现实生活中,签名的目的是唯一标识一个人。同样,TensorFlow 使用签名来唯一确定当通过 HTTP 请求将输入传递给模型时模型应该如何行为。一个签名有一个键和一个值。键是一个唯一标识符,定义了要激活该签名的确切 URL。值被定义为一个 TensorFlow 函数(即用 @tf.function 装饰的函数)。这个函数将定义如何处理输入并将其传递给模型以获得最终期望的结果。你现在不需要担心细节。我们有一个专门的部分来学习关于签名的内容。
我们将在单独的子部分回顾签名以更详细地理解它们。最后,我们将通过加载模型并向其提供一些数据来直观地检查模型预测。
15.2.1 定义 Keras 模型
使用 TFX 训练模型的基石是定义一个模型。有两种方法可以为 TFX 定义模型:使用 Estimator API 或使用 Keras API。我们将使用 Keras API,因为 Estimator API 不推荐用于 TensorFlow 2(有关详细信息,请参见下面的侧边栏)。
Estimator API vs. Keras API
我的观点是,未来,Keras 可能会成为构建模型的首选 API,而 Estimator API 可能会被弃用。TensorFlow 网站上说:
不建议使用 Estimators 编写新代码。Estimators 运行 v1.Session 风格的代码,这更难以编写正确,并且可能表现出乎意料,特别是当与 TF 2 代码结合使用时。Estimators 落在我们的兼容性保证下,但除了安全漏洞之外将不会收到任何修复。详情请参阅迁移指南。
来源:www.tensorflow.org/tfx/tutorials/tfx/components
我们首先要创建一个名为 _build_keras_model() 的函数,它将执行两项任务。首先,它将为我们在 Transform 步骤中定义的所有特征创建 tf.feature_column 类型的对象。tf.feature_column 是一种特征表示标准,被 TensorFlow 中定义的模型所接受。它是一种用于以列为导向的方式定义数据的便利工具(即,每个特征都表示为一列)。列式表示非常适用于结构化数据,其中每列通常是目标变量的独立预测器。让我们来看一些在 TensorFlow 中找到的具体 tf.feature_column 类型:
- tf.feature_column.numeric_column——用于表示像温度这样的稠密浮点字段。
- tf.feature_column.categorical_column_with_identity——用于表示分类字段或桶化字段,其中值是指向类别或桶的整数索引,例如日或月。因为传递给列本身的值是类别 ID,所以使用了“identity”这个术语。
- tf.feature_column.indicator_column—将 tf.feature_column.categorical_column_with_identity 转换为独热编码表示。
- tf.feature_column.embedding_column—可以用于从基于整数的列(如 tf.feature_column.categorical_column_with_identity)生成嵌入。它在内部维护一个嵌入层,并将给定整数 ID 返回相应的嵌入。
要查看完整列表,请参考mng.bz/6Xeo
。在这里,我们将使用 tf.feature_columns 的前三种类型作为我们待定义模型的输入。以下列表概述了如何使用 tf.feature_columns 作为输入。
第 15.6 节 构建使用特征列的 Keras 模型
def _build_keras_model() -> tf.keras.Model: ❶ real_valued_columns = [ ❷ tf.feature_column.numeric_column(key=key, shape=(1,)) for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS) ] categorical_columns = [ ❸ tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_identity( key, num_buckets=len(boundaries)+1 ) ) for key, boundaries in zip( _transformed_names(_BUCKET_FEATURE_KEYS), _BUCKET_FEATURE_BOUNDARIES ) ] categorical_columns += [ ❹ tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_identity( key, num_buckets=num_buckets, default_value=num_buckets-1 ) ) for key, num_buckets in zip( _transformed_names(_VOCAB_FEATURE_KEYS), _MAX_CATEGORICAL_FEATURE_VALUES ) ] model = _dnn_regressor( ❺ columns=real_valued_columns+categorical_columns, ❻ dnn_hidden_units=[128, 64] ❼ ) return model
❶ 定义函数签名。它将一个 Keras 模型作为输出返回。
❷ 为密集特征创建 tf.feature_column 对象。
❸ 为分桶特征创建 tf.feature_column 对象。
❹ 为分类特征创建 tf.feature_column 对象。
❺ 使用该函数定义一个深度回归模型。
❻ 使用上面定义的列
❼ 它将有两个中间层:128 个节点和 64 个节点。
让我们看一下存储在 real_valued_columns 中的第一组特征列。我们取密集浮点值列的原始键的转换名称,并为每列创建一个 tf.feature_column.numeric_column。您可以看到我们正在传递
- 键(字符串)—特征的名称
- 形状(一个列表/元组)—完整形状将派生为[批量大小] + 形状
例如,列 temp 的键将为 temp_xf,形状为(1,),意味着完整形状为[批量大小,1]。这个形状为[批量大小,1]是有意义的,因为每个密集特征每条记录只有一个值(这意味着我们在形状中不需要特征维度)。让我们通过一个玩具例子来看看 tf.feature_column.numeric_column 的运作:
a = tf.feature_column.numeric_column("a") x = tf.keras.layers.DenseFeatures(a)({'a': [0.5, 0.6]}) print(x)
这将输出
tf.Tensor( [[0.5] [0.6]], shape=(2, 1), dtype=float32)
在为分桶特征定义 tf.feature_column.categorical_column_with_identity 时,您需要传递
- 键(字符串)—特征的名称
- num_buckets(整数)—分桶特征中的桶数
例如,对于被分桶的 RH 特征,其键为 RH_xf,num_buckets = 3,其中桶为[[-inf,33),[33,66),[66,inf]]。由于我们将 RH 的分桶边界定义为(33, 66),num_buckets 被定义为 len(boundaries) +1 = 3。最后,每个分类特征都包装在 tf.feature_column.indicator_column 中,以将每个特征转换为独热编码表示。同样,我们可以进行一个快速实验来查看这些特征列的效果如何:
b = tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_identity('b', num_buckets=10) ) y = tf.keras.layers.DenseFeatures(b)({'b': [5, 2]}) print(y)
这将产生
tf.Tensor( [[0\. 0\. 0\. 0\. 0\. 1\. 0\. 0\. 0\. 0.] [0\. 0\. 1\. 0\. 0\. 0\. 0\. 0\. 0\. 0.]], shape=(2, 10), dtype=float32)
最后,基于词汇的分类特征与分桶特征类似处理。对于每个特征,我们获取特征名称和最大类别数,并使用 tf.feature_column.categorical_column_with_identity 列定义一个列,其中
- 键(字符串)—特征的名称。
- num_buckets(整数)—类别数。
- default_value(int)—如果遇到以前看不见的类别,将分配该值。
TensorFlow 实战(七)(4)https://developer.aliyun.com/article/1522945