Introduction
Ray Data is a distributed data processing library built on top of Ray. It provides a high-performance, scalable API that fits AI workloads such as batch inference and data preprocessing. Internally, Ray Data processes large datasets through a streaming execution engine and makes effective use of heterogeneous clusters, so that neither CPUs nor GPUs become the bottleneck of the pipeline.
Although Ray Data exposes a simple, easy-to-use API, its internal implementation and execution model are not easy to understand, which makes extending it and troubleshooting it difficult. This article therefore starts from the Ray Data source code and focuses on how its streaming execution is implemented.
Example
import ray
from typing import Dict
from numpy.typing import NDArray

# Load a CSV dataset directly from NAS/S3
ds = ray.data.read_csv("/mnt/example-data/iris.csv")


# User-defined inference class for map()
class CustomMapActor:
    # Initialization
    def __init__(self):
        print("init")

    # Inference: custom model-invocation logic, must be implemented
    def __call__(self, single):
        single["output"] = "test"
        return single


# User-defined inference class for map_batches()
class CustomMapBatchActor:
    # Initialization
    def __init__(self):
        print("init")

    # Inference: the input format is {key: NDArray, ...}, where key is a column name
    # and NDArray holds batch_size values of that column
    def __call__(self, data: Dict[str, NDArray]):
        return data


ds = ds.map(CustomMapActor, concurrency=5)
ds = ds.map_batches(CustomMapBatchActor, concurrency=5, batch_size=1024)
ds.write_csv("/mnt/result-data/")
The snippet above defines a typical Ray Data pipeline: it reads data from a CSV file, processes each row with map, processes the data in batches with map_batches, and finally writes the results back to CSV. Thanks to streaming execution, Ray Data can finish the job even when the input is far larger than the machine's memory. Ray Data obtains a Dataset through the read API, transforms the input with the Dataset's map and map_batches operators, and finally writes the data out to some destination.
Basic Concepts
Datasets and Blocks
The Dataset class, defined in ray/data/dataset.py, is the core concept of Ray Data and its most important user-facing API. A Dataset represents a distributed collection of data and defines the operators for loading, transforming, and writing it. Users always create a dataset through the read API, apply a series of transformations, and finally write the result to some storage. The Dataset API is lazy: the operators defined at each step are not executed until the data actually needs to be consumed. This is analogous to the Java Stream API, where each intermediate operation only builds up an execution plan and computation is triggered only when a terminal operation is encountered.
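To make the laziness concrete, here is a minimal sketch using the public ray.data API; the dataset and transformation are made up for illustration:

import ray

ds = ray.data.range(1000)                        # only builds a Read operator; nothing is read yet
ds = ds.map(lambda row: {"id": row["id"] * 2})   # appends a MapRows operator; still not executed

print(ds.take(5))  # a consuming call: only now is the plan actually executed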
Internally, a dataset consists of blocks, each holding a contiguous subset of the dataset's rows. A block is the smallest unit that Ray Data stores and transfers across the cluster, and blocks can be processed in parallel.
The figure above illustrates the relationship between a Dataset and its blocks: each block stores 1000 contiguous rows of the dataset and is physically stored as a pyarrow Table. If a row's structure is too complex for pyarrow to represent, Ray Data automatically falls back to pandas DataFrames.
It also shows that the Dataset lives in the driver process of the Ray job and only manipulates ObjectRefs to blocks; the actual data of each block is stored in Ray's distributed shared memory, the object store.
Operators and Plans
In Ray Data, Operator is the base abstraction for operators: every read, map, and write is ultimately carried out by an operator. A Plan represents an execution plan; it stores a DAG made up of operators and their dependency relationships, where each node is an Operator. Ray Data defines two kinds of operators: a LogicalOperator is stateless and describes which operation should be performed, while a PhysicalOperator describes how to launch distributed Ray tasks to actually perform it. When we call the Dataset API, we are in fact building a LogicalPlan composed of logical operators; at execution time, Ray Data's internal Planner converts each logical operator into the physical operator that will actually run, producing the final DAG of physical operators, i.e. the PhysicalPlan.
The figure above shows how Ray Data builds a PhysicalPlan. Ray first runs its internal Optimizer over the LogicalPlan, possibly merging or splitting some operators, then converts it into a PhysicalPlan and optimizes it again to obtain the final operator DAG. The Optimizer exists mainly to improve execution performance; for example, if the user defines several consecutive task-based map operations, Ray Data will fuse them into a single map operation to reduce serialization overhead.
For example, consider the following code:
ds = ray.data.read_csv(xx)

class MapActor:
    xxxx

ds = ds.map(MapActor)
ds.write_csv(xxx)
The resulting Logical Plan is:
Read->MapRows->Write
The converted Physical Plan is:
InputDataBuffer->TaskPoolMapOperator(ReadCSV)->ActorPoolMapOperator(MapActor)->TaskPoolMapOperator(WriteCSV)
Here InputDataBuffer, TaskPoolMapOperator, and ActorPoolMapOperator are all subclasses of PhysicalOperator. A PhysicalOperator works as follows:
1. It receives a stream of block references;
2. It performs an operation on the input blocks (possibly launching Ray tasks or Ray actors to execute it in a distributed fashion);
3. It emits another stream of block references.
Streaming Execution Model
Ray Data uses streaming execution to process large datasets efficiently.
The streaming execution model connects multiple operators into a pipeline. Each operator has an input queue and an output queue; in the implementation, an operator's input queue is set to the output queue of its upstream operator (they are the same object), so the output of one operator flows directly into the input queue of the next. This is what enables efficient data movement through the pipeline.
This pipelined architecture lets multiple operators run in parallel and improves overall resource utilization: if one operation needs CPU and another needs GPU, the pipeline can keep the CPU busy while the GPU is running, instead of waiting for the previous stage to finish. This is also the core advantage Ray Data advertises over Spark, and the animation below illustrates it intuitively:
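As an illustration of this overlap, the following sketch declares a CPU-bound map stage and a GPU-bound map_batches stage; the preprocess function and GPUInference class are hypothetical placeholders, and the resource numbers are arbitrary:

import ray
import numpy as np

# Hypothetical CPU-bound preprocessing function (executed by a pool of Ray tasks).
def preprocess(row):
    row["feature"] = len(str(row.get("text", "")))
    return row

# Hypothetical GPU-bound inference class (executed by a pool of Ray actors).
class GPUInference:
    def __init__(self):
        self.model = None  # a real implementation would load a model onto the GPU here

    def __call__(self, batch):
        # Placeholder for model(batch); shapes follow the "feature" column.
        batch["pred"] = np.zeros(len(batch["feature"]))
        return batch

ds = ray.data.read_csv("/mnt/example-data/iris.csv")
ds = ds.map(preprocess)                                                       # CPU stage
ds = ds.map_batches(GPUInference, concurrency=2, num_gpus=1, batch_size=256)  # GPU stage
# While the GPU actors run inference on earlier blocks, the CPU tasks keep
# preprocessing later blocks instead of waiting for the whole stage to finish.
ds.write_csv("/mnt/result-data/")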
Source Code Analysis
Reading the Data Source
The file ray/data/read_api.py provides the user-facing interfaces for reading data sources; a data pipeline always starts from one of the functions in read_api, such as the read_csv function used in the example. Its source code is as follows:
The read_csv method
@PublicAPI
def read_csv(
    paths: Union[str, List[str]],
    *,
    filesystem: Optional["pyarrow.fs.FileSystem"] = None,
    parallelism: int = -1,
    ray_remote_args: Dict[str, Any] = None,
    arrow_open_stream_args: Optional[Dict[str, Any]] = None,
    meta_provider: Optional[BaseFileMetadataProvider] = None,
    partition_filter: Optional[PathPartitionFilter] = None,
    partitioning: Partitioning = Partitioning("hive"),
    include_paths: bool = False,
    ignore_missing_paths: bool = False,
    shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
    file_extensions: Optional[List[str]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    **arrow_csv_args,
) -> Dataset:
    """Creates a :class:`~ray.data.Dataset` from CSV files.

    Args:
        paths: The file path(s) to read from.
        filesystem: An optional filesystem implementation.
        (remaining arguments omitted)

    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified paths.
    """
    _emit_meta_provider_deprecation_warning(meta_provider)

    if meta_provider is None:
        meta_provider = DefaultFileMetadataProvider()

    # Core part: build the CSVDatasource.
    datasource = CSVDatasource(
        paths,
        arrow_csv_args=arrow_csv_args,
        filesystem=filesystem,
        open_stream_args=arrow_open_stream_args,
        meta_provider=meta_provider,
        partition_filter=partition_filter,
        partitioning=partitioning,
        ignore_missing_paths=ignore_missing_paths,
        shuffle=shuffle,
        include_paths=include_paths,
        file_extensions=file_extensions,
    )
    # Every read_xxx function eventually calls read_datasource with a Datasource
    # and returns a Dataset.
    return read_datasource(
        datasource,
        parallelism=parallelism,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )
As we can see, read_csv first constructs a CSVDatasource, a subclass of Datasource. Datasource is the interface that defines where data is read from and must be implemented by concrete subclasses. read_csv then calls read_datasource, which returns a Dataset object.
Datasource
@PublicAPI
class Datasource:
    """Interface for defining a custom :class:`~ray.data.Dataset` datasource.

    To read a datasource into a dataset, use :meth:`~ray.data.read_datasource`.
    """  # noqa: E501

    def get_name(self) -> str:
        """Return a human-readable name for this datasource.

        This will be used as the names of the read tasks.
        """
        name = type(self).__name__
        datasource_suffix = "Datasource"
        if name.endswith(datasource_suffix):
            name = name[: -len(datasource_suffix)]
        return name

    def estimate_inmemory_data_size(self) -> Optional[int]:
        """Return an estimate of the in-memory data size, or None if unknown.

        Note that the in-memory data size may be larger than the on-disk data size.
        """
        raise NotImplementedError

    def get_read_tasks(self, parallelism: int) -> List["ReadTask"]:
        """Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.

        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        """
        raise NotImplementedError
The most important method of Datasource is get_read_tasks, which returns a list of ReadTask objects and must be implemented by subclasses. A ReadTask wraps the user-defined read function and is defined as follows:
ReadTask
@DeveloperAPI
class ReadTask(Callable[[], Iterable[Block]]):
    """A function used to read blocks from the :class:`~ray.data.Dataset`.

    Read tasks are generated by :meth:`~ray.data.Datasource.get_read_tasks`,
    and return a list of ``ray.data.Block`` when called. Initial metadata about
    the read operation can be retrieved via the ``metadata`` attribute prior to
    executing the read. Final metadata is returned after the read along with the
    blocks.

    Ray will execute read tasks in remote functions to parallelize execution.
    Note that the number of blocks returned can vary at runtime. For example,
    if a task is reading a single large file it can return multiple blocks to
    avoid running out of memory during the read.

    The initial metadata should reflect all the blocks returned by the read,
    e.g., if the metadata says ``num_rows=1000``, the read can return a single
    block of 1000 rows, or multiple blocks with 1000 rows altogether.

    The final metadata (returned with the actual block) reflects the exact
    contents of the block itself.
    """

    def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata):
        self._metadata = metadata
        self._read_fn = read_fn

    @property
    def metadata(self) -> BlockMetadata:
        return self._metadata

    @property
    def read_fn(self) -> Callable[[], Iterable[Block]]:
        return self._read_fn

    def __call__(self) -> Iterable[Block]:
        result = self._read_fn()
        if not hasattr(result, "__iter__"):
            DeprecationWarning(
                "Read function must return Iterable[Block], got {}. "
                "Probably you need to return `[block]` instead of "
                "`block`.".format(result)
            )
        yield from result
It is easy to see that ReadTask wraps the given _read_fn into a directly callable object; _read_fn defines how data is read from the source and packaged into blocks. Ray Data will eventually launch Ray remote tasks to read the data in parallel.
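The following is an illustrative sketch of this pattern rather than the real ReadTask class: a zero-argument read function that yields blocks, executed in parallel via ray.remote (make_read_fn and run_read_task are made-up names):

import ray
import pyarrow as pa

# A zero-argument read function, analogous to the _read_fn wrapped by ReadTask.
def make_read_fn(start, stop):
    def read_fn():
        # Produce one pyarrow.Table block covering the row range [start, stop).
        yield pa.table({"id": list(range(start, stop))})
    return read_fn

@ray.remote
def run_read_task(read_fn):
    # Ray Data similarly runs each ReadTask inside a remote call and collects the blocks.
    return list(read_fn())

# Two "read tasks" executed in parallel, each producing its own blocks.
refs = [run_read_task.remote(make_read_fn(0, 5)), run_read_task.remote(make_read_fn(5, 10))]
print(ray.get(refs))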
Next, let's look at how get_read_tasks is implemented for CSVDatasource; the implementation actually lives in its parent class, FileBasedDatasource.
FileBasedDatasource
@DeveloperAPI
class FileBasedDatasource(Datasource):
    """File-based datasource for reading files.

    Don't use this class directly. Instead, subclass it and implement
    `_read_stream()`.
    """

    def __init__(
        self,
        paths: Union[str, List[str]],
        ...
    ):
        # ...
        # ...
        # ...
        paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
        self._filesystem = RetryingPyFileSystem.wrap(
            self._filesystem, retryable_errors=self._data_context.retried_io_errors
        )
        paths, file_sizes = map(
            list,
            zip(
                *meta_provider.expand_paths(
                    paths,
                    self._filesystem,
                    partitioning,
                    ignore_missing_paths=ignore_missing_paths,
                )
            ),
        )
        # Read tasks serialize `FileBasedDatasource` instances, and the list of paths
        # can be large. To avoid slow serialization speeds, we store a reference to
        # the paths rather than the paths themselves.
        self._paths_ref = ray.put(paths)
        self._file_sizes_ref = ray.put(file_sizes)

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        # ...
        # ...

        # Defines how blocks are read from all file paths.
        def read_files(
            read_paths: Iterable[str],
        ) -> Iterable[Block]:
            nonlocal filesystem, open_stream_args, partitioning
            fs = _unwrap_s3_serialization_workaround(filesystem)
            # The core is to iterate over all file paths and call the subclass's
            # _read_stream to read blocks from each single path.
            for read_path in read_paths:
                partitions: Dict[str, str] = {}
                if partitioning is not None:
                    parse = PathPartitionParser(partitioning)
                    partitions = parse(read_path)

                with RetryingContextManager(
                    self._open_input_source(fs, read_path, **open_stream_args),
                    context=self._data_context,
                ) as f:
                    for block in iterate_with_retry(
                        lambda: self._read_stream(f, read_path),
                        description="read stream iteratively",
                        match=self._data_context.retried_io_errors,
                    ):
                        if partitions:
                            block = _add_partitions(block, partitions)
                        if self._include_paths:
                            block_accessor = BlockAccessor.for_block(block)
                            block = block_accessor.fill_column("path", read_path)
                        yield block

        # Mainly wraps the choice between multi-threaded and single-threaded reading.
        def create_read_task_fn(read_paths, num_threads):
            def read_task_fn():
                nonlocal num_threads, read_paths
                # Multi-threaded read.
                if num_threads > 0:
                    yield from make_async_gen(
                        iter(read_paths),
                        read_files,
                        num_workers=num_threads,
                        preserve_ordering=True,
                        buffer_size=max(len(read_paths) // num_threads, 1),
                    )
                # Single-threaded read.
                else:
                    yield from read_files(read_paths)

            return read_task_fn

        # fix https://github.com/ray-project/ray/issues/24296
        parallelism = min(parallelism, len(paths))

        # Split the file paths according to the requested parallelism,
        # producing `parallelism` ReadTasks in total.
        read_tasks = []
        split_paths = np.array_split(paths, parallelism)
        split_file_sizes = np.array_split(file_sizes, parallelism)

        for read_paths, file_sizes in zip(split_paths, split_file_sizes):
            if len(read_paths) <= 0:
                continue

            meta = self._meta_provider(
                read_paths,
                self._schema,
                rows_per_file=self._rows_per_file(),
                file_sizes=file_sizes,
            )
            # Build the actual read_task_fn.
            read_task_fn = create_read_task_fn(read_paths, self._NUM_THREADS_PER_TASK)
            # Pass read_task_fn to the ReadTask constructor as the concrete
            # file-reading function.
            read_task = ReadTask(read_task_fn, meta)
            read_tasks.append(read_task)

        return read_tasks

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        """Streaming read a single file.

        This method should be implemented by subclasses.
        """
        raise NotImplementedError(
            "Subclasses of FileBasedDatasource must implement _read_stream()."
        )
As we can see, FileBasedDatasource resolves the paths at construction time, collecting all files under the directory together with their metadata. In get_read_tasks it first defines read_files, which iterates over the given file paths and calls _read_stream on each path to produce blocks; _read_stream is an abstract method implemented by subclasses. create_read_task_fn further wraps read_files and decides whether to read with one thread or multiple threads. Finally, create_read_task_fn is called to obtain the actual read_task_fn, which is wrapped into a ReadTask and returned.
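The path-splitting step can be illustrated directly with np.array_split, which produces `parallelism` nearly equal chunks of paths, one per ReadTask:

import numpy as np

paths = ["a.csv", "b.csv", "c.csv", "d.csv", "e.csv"]
# np.array_split yields `parallelism` nearly equal chunks, one chunk per ReadTask.
print([chunk.tolist() for chunk in np.array_split(paths, 3)])
# [['a.csv', 'b.csv'], ['c.csv', 'd.csv'], ['e.csv']]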
Next, let's see how CSVDatasource implements _read_stream to read a single file.
CSVDatasource
class CSVDatasource(FileBasedDatasource):
    # Key method, called by the parent class FileBasedDatasource.
    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import pyarrow as pa
        from pyarrow import csv

        try:
            reader = csv.open_csv(
                f,
                read_options=self.read_options,
                parse_options=self.parse_options,
                **self.arrow_csv_args,
            )
            schema = None
            while True:
                try:
                    batch = reader.read_next_batch()
                    table = pa.Table.from_batches([batch], schema=schema)
                    if schema is None:
                        schema = table.schema
                    yield table
                except StopIteration:
                    return
        except pa.lib.ArrowInvalid as e:
            # (Error handling for malformed CSV files, abridged here.)
            raise ValueError(f"Failed to read CSV file: {path}.") from e
As you can see, _read_stream uses the pyarrow csv module to read the data and converts it into pyarrow Tables before yielding them; as mentioned earlier, pyarrow.Table is exactly the default underlying format of a block.
Thanks to this clean abstraction, it is easy to implement a custom Datasource: to read a new file format, simply subclass FileBasedDatasource and implement _read_stream; to read from an arbitrary source, subclass Datasource and implement get_read_tasks.
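For example, the following is a minimal, hypothetical sketch of such an extension: a datasource that treats every line of a text file as one row (the class name and usage path are invented; the real ray.data.read_text is implemented along similar lines):

from typing import Iterator

import pyarrow as pa

from ray.data.block import Block
from ray.data.datasource import FileBasedDatasource


# A hypothetical datasource that treats every line of a file as one row.
class LineTextDatasource(FileBasedDatasource):
    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        # Read the whole stream and emit a single pyarrow.Table block.
        text = f.readall().decode("utf-8")
        lines = [line for line in text.split("\n") if line]
        yield pa.table({"text": lines, "path": [path] * len(lines)})


# Hypothetical usage:
# ds = ray.data.read_datasource(LineTextDatasource("/mnt/example-data/notes/"))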
The read_datasource method
For every Datasource, read_api eventually calls read_datasource to build the Dataset object.
@PublicAPI
@wrap_auto_init
def read_datasource(
    datasource: Datasource,
    *,
    parallelism: int = -1,
    ray_remote_args: Dict[str, Any] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    **read_args,
) -> Dataset:
    """Read a stream from a custom :class:`~ray.data.Datasource`.

    Args:
        datasource: The :class:`~ray.data.Datasource` to read data from.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
            implementation.

    Returns:
        :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.
    """  # noqa: E501
    parallelism = _get_num_output_blocks(parallelism, override_num_blocks)

    ctx = DataContext.get_current()

    if ray_remote_args is None:
        ray_remote_args = {}

    if not datasource.supports_distributed_reads:
        ray_remote_args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
            ray.get_runtime_context().get_node_id(),
            soft=False,
        )

    if "scheduling_strategy" not in ray_remote_args:
        ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy

    # For backward compatibility: early versions of Ray Data used a legacy_reader,
    # while current versions use a Datasource. Here you can simply treat
    # datasource_or_legacy_reader as the datasource itself.
    datasource_or_legacy_reader = _get_datasource_or_legacy_reader(
        datasource,
        ctx,
        read_args,
    )

    cur_pg = ray.util.get_current_placement_group()
    requested_parallelism, _, inmemory_size = _autodetect_parallelism(
        parallelism,
        ctx.target_max_block_size,
        DataContext.get_current(),
        datasource_or_legacy_reader,
        placement_group=cur_pg,
    )

    # Call Datasource.get_read_tasks to obtain the list of ReadTasks,
    # here mainly used to build the DatasetStats.
    read_tasks = datasource_or_legacy_reader.get_read_tasks(requested_parallelism)

    import uuid

    stats = DatasetStats(
        metadata={"Read": [read_task.metadata for read_task in read_tasks]},
        parent=None,
        needs_stats_actor=True,
        stats_uuid=uuid.uuid4(),
    )

    # Core part: create the Read operator (a LogicalOperator), passing the
    # datasource into it.
    read_op = Read(
        datasource,
        datasource_or_legacy_reader,
        parallelism,
        inmemory_size,
        len(read_tasks) if read_tasks else 0,
        ray_remote_args,
        concurrency,
    )

    # Create the ExecutionPlan; this class ultimately drives streaming execution
    # and can be thought of as the execution entry point of the dataset.
    execution_plan = ExecutionPlan(stats)
    # Create the LogicalPlan, with the Read operator as the root of the DAG.
    logical_plan = LogicalPlan(read_op, execution_plan._context)

    # Build the Dataset.
    return Dataset(
        plan=execution_plan,
        logical_plan=logical_plan,
    )
Quite a few concepts are involved here. The core is that a Read object, a subclass of LogicalOperator, is created first; then an ExecutionPlan and a LogicalPlan are created and used to initialize the Dataset. The ExecutionPlan is the entry point of Ray Data's streaming execution: it holds the LogicalPlan while the dataset is being built, converts it into a PhysicalPlan at execution time, and finally runs it.
The definition of LogicalPlan is quite simple; it has a single _dag field of type LogicalOperator.
LogicalPlan
class LogicalPlan(Plan):
    """The plan with a DAG of logical operators."""

    def __init__(self, dag: LogicalOperator, context: "DataContext"):
        super().__init__(context)
        self._dag = dag

    @property
    def dag(self) -> LogicalOperator:
        """Get the DAG of logical operators."""
        return self._dag

    def sources(self) -> List[LogicalOperator]:
        """List of operators that are sources for this plan's DAG."""
        # If an operator has no input dependencies, it's a source.
        if not any(self._dag.input_dependencies):
            return [self._dag]

        sources = []
        for op in self._dag.input_dependencies:
            sources.extend(LogicalPlan(op, self._context).sources())
        return sources
One might wonder why the _dag field is of type LogicalOperator. The reason is that LogicalOperator is a subclass of Operator, and Operator keeps its upstream dependencies in the input_dependencies field, so the whole graph can be traversed from any single node.
Operator
class Operator:
    """Abstract class for operators.

    Operators live on the driver side of the Dataset only.
    """

    def __init__(
        self,
        name: str,
        input_dependencies: List["Operator"],
    ):
        self._name = name
        self._input_dependencies = input_dependencies
        self._output_dependencies = []
        for x in input_dependencies:
            assert isinstance(x, Operator), x
            x._output_dependencies.append(self)

    @property
    def name(self) -> str:
        return self._name

    @property
    def dag_str(self) -> str:
        """String representation of the whole DAG."""
        if self.input_dependencies:
            out_str = ", ".join([x.dag_str for x in self.input_dependencies])
            out_str += " -> "
        else:
            out_str = ""
        out_str += f"{self.__class__.__name__}[{self._name}]"
        return out_str

    @property
    def input_dependencies(self) -> List["Operator"]:
        """List of operators that provide inputs for this operator."""
        assert hasattr(
            self, "_input_dependencies"
        ), "Operator.__init__() was not called."
        return self._input_dependencies

    @property
    def output_dependencies(self) -> List["Operator"]:
        """List of operators that consume outputs from this operator."""
        assert hasattr(
            self, "_output_dependencies"
        ), "Operator.__init__() was not called."
        return self._output_dependencies

    def post_order_iter(self) -> Iterator["Operator"]:
        """Depth-first traversal of this operator and its input dependencies."""
        for op in self.input_dependencies:
            yield from op.post_order_iter()
        yield self

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}[{self._name}]"

    def __str__(self) -> str:
        return repr(self)
At the end of read_datasource, the LogicalPlan and ExecutionPlan just created are used to construct a Dataset. In the Dataset constructor, ExecutionPlan.link_logical_plan is called to attach the LogicalPlan to the ExecutionPlan.
@PublicAPI
class Dataset:
    def __init__(
        self,
        plan: ExecutionPlan,
        logical_plan: LogicalPlan,
    ):
        """Construct a Dataset (internal API).

        The constructor is not part of the Dataset API. Use the ``ray.data.*``
        read methods to construct a dataset.
        """
        assert isinstance(plan, ExecutionPlan), type(plan)
        usage_lib.record_library_usage("dataset")  # Legacy telemetry name.

        self._plan = plan
        self._logical_plan = logical_plan
        self._plan.link_logical_plan(logical_plan)

        # Handle to currently running executor for this dataset.
        self._current_executor: Optional["Executor"] = None
        self._write_ds = None

        self._set_uuid(StatsManager.get_dataset_id_from_stats_actor())
At this point the statement ds = ray.data.read_csv() has finished executing: it created a Dataset object whose _plan field holds an ExecutionPlan; the ExecutionPlan holds the LogicalPlan, and the LogicalPlan holds a DAG of LogicalOperators that currently contains a single node, Read.
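For readers following along in a REPL, the internal fields described above can be peeked at directly; these are private attributes, not public API, so they may change between Ray versions:

import ray

ds = ray.data.read_csv("/mnt/example-data/iris.csv")

# Internal, non-public attributes -- shown here only to confirm the state described above.
print(ds._logical_plan.dag)          # e.g. Read[ReadCSV]: a single LogicalOperator node
print(ds._logical_plan.dag.dag_str)  # the whole DAG rendered via Operator.dag_str
print(ds._plan)                      # the ExecutionPlan that will later drive execution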
Building the Map/MapBatches Transformations
Next we analyze the source code of the map function used in ds = ds.map().
The map method
@PublicAPI(api_group=BT_API_GROUP)
def map(
    self,
    fn: UserDefinedFunction[Dict[str, Any], Dict[str, Any]],
    *,
    compute: Optional[ComputeStrategy] = None,
    fn_args: Optional[Iterable[Any]] = None,
    fn_kwargs: Optional[Dict[str, Any]] = None,
    fn_constructor_args: Optional[Iterable[Any]] = None,
    fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
    num_cpus: Optional[float] = None,
    num_gpus: Optional[float] = None,
    memory: Optional[float] = None,
    concurrency: Optional[Union[int, Tuple[int, int]]] = None,
    ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
    **ray_remote_args,
) -> "Dataset":
    # Determine the compute strategy from the type of fn: if a function is passed,
    # the computation will eventually run as Ray tasks; if a class is passed,
    # it will run as Ray actors.
    compute = get_compute_strategy(
        fn,
        fn_constructor_args=fn_constructor_args,
        compute=compute,
        concurrency=concurrency,
    )

    # ray_remote_args will be passed when the Ray tasks/actors are launched.
    if num_cpus is not None:
        ray_remote_args["num_cpus"] = num_cpus
    if num_gpus is not None:
        ray_remote_args["num_gpus"] = num_gpus
    if memory is not None:
        ray_remote_args["memory"] = memory

    # Copy the dataset's existing ExecutionPlan.
    plan = self._plan.copy()

    # Define the MapRows LogicalOperator.
    map_op = MapRows(
        # The DAG of the previous LogicalPlan, i.e. its last operator, becomes the
        # upstream of the new operator.
        self._logical_plan.dag,
        fn,
        fn_args=fn_args,
        fn_kwargs=fn_kwargs,
        fn_constructor_args=fn_constructor_args,
        fn_constructor_kwargs=fn_constructor_kwargs,
        compute=compute,
        ray_remote_args_fn=ray_remote_args_fn,
        ray_remote_args=ray_remote_args,
    )

    # Update the LogicalPlan; at this point map_op already has its upstream input.
    logical_plan = LogicalPlan(map_op, self.context)
    return Dataset(plan, logical_plan)
As we can see, map defines a MapRows operator and passes the dataset's existing DAG as its input_dependencies, effectively appending MapRows to the end of the DAG. It then builds a new LogicalPlan with the newly created MapRows as the entry of the DAG; in other words, the LogicalPlan always holds the last operator of the DAG. Finally a new Dataset is returned. Note that the ExecutionPlan is merely copied and left unchanged, while the LogicalPlan gains one more node.
The DAG now becomes: Read->MapRows
The map_batches method
map_batches is similar to map, except that it produces a MapBatches operator, so we will not repeat the analysis here.
After map_batches, the DAG becomes: Read->MapRows->MapBatches
Writing the Output
When the statement ds.write_csv("/mnt/result-data/") is executed, Ray Data first constructs a Write operation and then actually triggers execution.
The write_csv method
@ConsumptionAPI
@PublicAPI(api_group=IOC_API_GROUP)
def write_csv(
    self,
    path: str,
    *,
    filesystem: Optional["pyarrow.fs.FileSystem"] = None,
    try_create_dir: bool = True,
    arrow_open_stream_args: Optional[Dict[str, Any]] = None,
    filename_provider: Optional[FilenameProvider] = None,
    arrow_csv_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
    min_rows_per_file: Optional[int] = None,
    ray_remote_args: Dict[str, Any] = None,
    concurrency: Optional[int] = None,
    num_rows_per_file: Optional[int] = None,
    **arrow_csv_args,
) -> None:
    # (Resolution of min_rows_per_file / num_rows_per_file into effective_min_rows
    # is omitted here.)
    # Define the CSVDatasink, the write-side counterpart of the read-side Datasource.
    datasink = CSVDatasink(
        path,
        arrow_csv_args_fn=arrow_csv_args_fn,
        arrow_csv_args=arrow_csv_args,
        min_rows_per_file=effective_min_rows,
        filesystem=filesystem,
        try_create_dir=try_create_dir,
        open_stream_args=arrow_open_stream_args,
        filename_provider=filename_provider,
        dataset_uuid=self._uuid,
    )
    self.write_datasink(
        datasink,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
    )
As shown, write_csv first constructs a CSVDatasink, a subclass of Datasink. A Datasink is the counterpart of a Datasource: it defines how data is written out, with write as its core interface to be implemented by subclasses.
Datasink
@DeveloperAPI
class Datasink(Generic[WriteReturnType]):
    def write(
        self,
        blocks: Iterable[Block],
        ctx: TaskContext,
    ) -> WriteReturnType:
        """Write blocks. This is used by a single write task.

        Args:
            blocks: Generator of data blocks.
            ctx: ``TaskContext`` for the write task.

        Returns:
            Result of this write task. When the entire write operator finishes,
            All returned values will be passed as `WriteResult.write_returns`
            to `Datasink.on_write_complete`.
        """
        raise NotImplementedError
As you can see, Datasink.write defines how a batch of blocks is written to the destination and returns a write result. Let's look at a concrete example, _FileDatasink, the parent class of the various file-based datasinks:
_FileDatasink
class _FileDatasink(Datasink[None]):
    # ......

    def write(
        self,
        blocks: Iterable[Block],
        ctx: TaskContext,
    ) -> None:
        builder = DelegatingBlockBuilder()
        # Iterate over the block iterator and merge the blocks (a block may have been
        # split), then call write_block.
        for block in blocks:
            builder.add_block(block)
        block = builder.build()
        block_accessor = BlockAccessor.for_block(block)

        if block_accessor.num_rows() == 0:
            logger.warning(f"Skipped writing empty block to {self.path}")
            return

        self.write_block(block_accessor, 0, ctx)

    # Implemented by subclasses.
    def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):
        raise NotImplementedError


@DeveloperAPI
class BlockBasedFileDatasink(_FileDatasink):
    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        """Write a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        """
        raise NotImplementedError

    def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):
        # Build the output filename.
        filename = self.filename_provider.get_filename_for_block(
            block, ctx.task_idx, block_index
        )
        # Build the full write path.
        write_path = posixpath.join(self.path, filename)

        def write_block_to_path():
            # Open the output stream and let the subclass do the actual write.
            with self.open_output_stream(write_path) as file:
                self.write_block_to_file(block, file)

        call_with_retry(
            write_block_to_path,
            description=f"write '{write_path}'",
            match=self._data_context.retried_io_errors,
        )
As shown, _FileDatasink.write first iterates over the block iterator and assembles the blocks into one large block, mainly to handle the case where a block has been split. It then calls write_block, which subclasses override. In the BlockBasedFileDatasink subclass, write_block generates a file path, opens an output stream, and finally calls the subclass's write_block_to_file to write the block's data into the file.
CSVDatasink
class CSVDatasink(BlockBasedFileDatasink):
    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        from pyarrow import csv

        writer_args = _resolve_kwargs(self.arrow_csv_args_fn, **self.arrow_csv_args)
        write_options = writer_args.pop("write_options", None)
        csv.write_csv(block.to_arrow(), file, write_options, **writer_args)
CSVDatasink again uses the pyarrow csv module to write the file. Clearly, just as with Datasource, we can also customize where the data gets written.
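As a sketch of that extension point, the following hypothetical sink writes each block as newline-delimited JSON by overriding only write_block_to_file; the class name and output path are invented, and the BlockBasedFileDatasink constructor is assumed to accept a file_format keyword as in the Ray documentation examples:

import json

from ray.data.block import BlockAccessor
from ray.data.datasource import BlockBasedFileDatasink


# A hypothetical sink that writes each block as newline-delimited JSON.
class JSONLinesDatasink(BlockBasedFileDatasink):
    def __init__(self, path: str):
        super().__init__(path, file_format="jsonl")

    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        # Path handling, retries, and the output stream are provided by the base class.
        for row in block.iter_rows(public_row_format=True):
            # Convert numpy scalars to plain Python values before JSON-encoding.
            record = {k: (v.item() if hasattr(v, "item") else v) for k, v in row.items()}
            file.write((json.dumps(record) + "\n").encode("utf-8"))


# Hypothetical usage:
# ds.write_datasink(JSONLinesDatasink("/mnt/result-data/jsonl/"))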
The write_datasink method
Likewise, every write_xxx method eventually calls write_datasink, which first appends a Write operator to the DAG and then triggers the actual execution of the dataset.
@ConsumptionAPI(pattern="Time complexity:")
def write_datasink(
    self,
    datasink: Datasink,
    *,
    ray_remote_args: Dict[str, Any] = None,
    concurrency: Optional[int] = None,
) -> None:
    # ...
    # ...
    plan = self._plan.copy()
    # Append the Write operator to the DAG.
    write_op = Write(
        self._logical_plan.dag,
        datasink,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
    )
    logical_plan = LogicalPlan(write_op, self.context)

    try:
        datasink.on_write_start()
        # Trigger the actual execution of the dataset via materialize().
        self._write_ds = Dataset(plan, logical_plan).materialize()

        # Fetch the execution result, which is actually the output of the
        # last operator.
        raw_write_results = ray.get(self._write_ds._plan.execute().block_refs)
        write_result = gen_datasink_write_result(raw_write_results)
        logger.info(
            "Data sink %s finished. %d rows and %s data written.",
            datasink.get_name(),
            write_result.num_rows,
            memory_string(write_result.size_bytes),
        )
        datasink.on_write_complete(write_result)

    except Exception as e:
        datasink.on_write_failed(e)
        raise
As before, a Write operator is added to the DAG; Dataset.materialize() then triggers the actual execution, and the result is obtained via self._write_ds._plan.execute().block_refs and logged. Let's now look at how the dataset is actually executed.
Streaming Execution
@ConsumptionAPI(pattern="store memory.", insert_after=True)
@PublicAPI(api_group=E_API_GROUP)
def materialize(self) -> "MaterializedDataset":
    # First copy the original dataset.
    copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset)
    # Trigger the execution of the dataset and obtain the final result.
    bundle = copy._plan.execute()
    ref_bundles = [bundle]

    # Define a new LogicalPlan with a single InputData operator that caches the
    # execution result of the dataset.
    logical_plan = LogicalPlan(InputData(input_data=ref_bundles), self.context)
    # Return a dataset that wraps the cached execution result.
    output = MaterializedDataset(
        ExecutionPlan(copy._plan.stats(), data_context=copy.context),
        logical_plan,
    )
    # Metrics are tagged with `copy`s uuid, update the output uuid with
    # this so the user can access the metrics label.
    output.set_name(copy.name)
    output._set_uuid(copy._get_uuid())
    output._plan.execute()  # No-op that marks the plan as fully executed.

    return output
This method calls the ExecutionPlan's execute method to trigger execution and wraps the result into a new MaterializedDataset. Let's look at the logic of ExecutionPlan.execute().
class ExecutionPlan:
    def execute(
        self,
        preserve_order: bool = False,
    ) -> RefBundle:
        """Execute this plan.

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The blocks of the output dataset.
        """
        self._has_started_execution = True

        # Always used the saved context for execution.
        context = self._context

        # Check whether the plan has already been executed; if so, don't run it again.
        if not self.has_computed_output():
            from ray.data._internal.execution.legacy_compat import (
                _get_initial_stats_from_plan,
                execute_to_legacy_block_list,
            )

            if self._logical_plan.dag.output_data() is not None:
                # ...
                pass
            else:
                # If it hasn't been executed yet, run it with the StreamingExecutor.
                from ray.data._internal.execution.streaming_executor import (
                    StreamingExecutor,
                )

                metrics_tag = create_dataset_tag(self._dataset_name, self._dataset_uuid)
                # Initialize the StreamingExecutor.
                executor = StreamingExecutor(
                    context,
                    metrics_tag,
                )
                # Actually trigger the executor.
                blocks = execute_to_legacy_block_list(
                    executor,
                    self,
                    dataset_uuid=self._dataset_uuid,
                    preserve_order=preserve_order,
                )
                bundle = RefBundle(
                    tuple(blocks.iter_blocks_with_metadata()),
                    owns_blocks=blocks._owned_by_consumer,
                )

            # Set the snapshot to the output of the final operator.
            self._snapshot_bundle = bundle
            self._snapshot_operator = self._logical_plan.dag
            self._snapshot_stats = stats
            self._snapshot_stats.dataset_uuid = self._dataset_uuid

        return self._snapshot_bundle
In execute, we first check whether the plan has already been executed; if so, the result is returned directly. We focus on the case where it has not been executed: a StreamingExecutor is initialized, and execute_to_legacy_block_list is called.
def execute_to_legacy_block_list(
    executor: Executor,
    plan: ExecutionPlan,
    dataset_uuid: str,
    preserve_order: bool,
) -> BlockList:
    """Execute a plan with the new executor and translate it into a legacy block list.

    Args:
        executor: The executor to use.
        plan: The legacy plan to execute.
        dataset_uuid: UUID of the dataset for this execution.
        preserve_order: Whether to preserve order in execution.

    Returns:
        The output as a legacy block list.
    """
    # The conversion from logical operators to physical operators happens here.
    dag, stats = _get_execution_dag(
        executor,
        plan,
        preserve_order,
    )
    # Execute the DAG in a streaming fashion.
    bundles = executor.execute(dag, initial_stats=stats)
    # Convert the RefBundles into a BlockList. A RefBundle is a set of block
    # references and is the basic unit of data transfer between operators.
    # A RefBundle usually holds one block plus its metadata, but it may hold
    # several if block splitting occurred.
    block_list = _bundles_to_block_list(bundles)
    # Set the stats UUID after execution finishes.
    _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
    return block_list
Two things happen here: first, _get_execution_dag converts the LogicalPlan into a PhysicalPlan; then the StreamingExecutor executes the PhysicalPlan and returns the result. Let's analyze these two steps separately.
Converting the LogicalPlan into a PhysicalPlan
_get_execution_dag
def _get_execution_dag(
    executor: Executor,
    plan: ExecutionPlan,
    preserve_order: bool,
) -> Tuple[PhysicalOperator, DatasetStats]:
    """Get the physical operators DAG from a plan."""
    # Record usage of logical operators if available.
    if hasattr(plan, "_logical_plan") and plan._logical_plan is not None:
        record_operators_usage(plan._logical_plan.dag)

    # Get DAG of physical operators and input statistics.
    dag = get_execution_plan(plan._logical_plan).dag
    stats = _get_initial_stats_from_plan(plan)

    # Enforce to preserve ordering if the plan has operators
    # required to do so, such as Zip and Sort.
    if preserve_order or plan.require_preserve_order():
        executor._options.preserve_order = True

    return dag, stats


def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    """Get the physical execution plan for the provided logical plan.

    This process has 3 steps:
    (1) logical optimization: optimize logical operators.
    (2) planning: convert logical to physical operators.
    (3) physical optimization: optimize physical operators.
    """
    optimized_logical_plan = LogicalOptimizer().optimize(logical_plan)
    logical_plan._dag = optimized_logical_plan.dag
    physical_plan = Planner().plan(optimized_logical_plan)
    return PhysicalOptimizer().optimize(physical_plan)
The core of _get_execution_dag is to convert the LogicalPlan into a PhysicalPlan, optimizing both plans along the way. The conversion itself is done by Planner().plan.
Planner().plan
class Planner:
    """The planner to convert optimized logical to physical operators.

    Note that planner is only doing operators conversion. Physical optimization work is
    done by physical optimizer.
    """

    def __init__(self):
        self._physical_op_to_logical_op: Dict[PhysicalOperator, LogicalOperator] = {}

    def plan(self, logical_plan: LogicalPlan) -> PhysicalPlan:
        """Convert logical to physical operators recursively in post-order."""
        physical_dag = self._plan(logical_plan.dag, logical_plan.context)
        physical_plan = PhysicalPlan(
            physical_dag,
            self._physical_op_to_logical_op,
            logical_plan.context,
        )
        return physical_plan

    def _plan(
        self, logical_op: LogicalOperator, data_context: DataContext
    ) -> PhysicalOperator:
        # Plan the input dependencies first.
        physical_children = []
        for child in logical_op.input_dependencies:
            physical_children.append(self._plan(child, data_context))

        physical_op = None
        for op_type, plan_fn in PLAN_LOGICAL_OP_FNS:
            if isinstance(logical_op, op_type):
                # We will call `set_logical_operators()` in the following for-loop,
                # no need to do it here.
                physical_op = plan_fn(logical_op, physical_children, data_context)
                break

        if physical_op is None:
            raise ValueError(
                f"Found unknown logical operator during planning: {logical_op}"
            )

        # Traverse up the DAG, and set the mapping from physical to logical operators.
        # At this point, all physical operators without logical operators set
        # must have been created by the current logical operator.
        queue = [physical_op]
        while queue:
            curr_physical_op = queue.pop()
            # Once we find an operator with a logical operator set, we can stop.
            if curr_physical_op._logical_operators:
                break

            curr_physical_op.set_logical_operators(logical_op)
            queue.extend(physical_op.input_dependencies)

        self._physical_op_to_logical_op[physical_op] = logical_op
        return physical_op
In _plan, the LogicalPlan is converted into a PhysicalPlan recursively. The key statement is physical_op = plan_fn(logical_op, physical_children, data_context), which converts an operator using one of Ray Data's predefined plan_fn functions; every LogicalOperator has a corresponding conversion function. Let's take Read, used when reading data, as a concrete example; its conversion function is plan_read_op. Before that, let's look at the structure of PhysicalOperator.
PhysicalOperator
class PhysicalOperator(Operator):
    """Abstract class for physical operators.

    An operator transforms one or more input streams of RefBundles into a single
    output stream of RefBundles.

    Physical operators are stateful and non-serializable; they live on the driver
    side of the Dataset only.

    Here's a simple example of implementing a basic "Map" operator:

        class MapOperator(PhysicalOperator):
            def __init__(self):
                self.active_tasks = []

            def add_input(self, refs, _):
                self.active_tasks.append(map_task.remote(refs))

            def has_next(self):
                ready, _ = ray.wait(self.active_tasks, timeout=0)
                return len(ready) > 0

            def get_next(self):
                ready, remaining = ray.wait(self.active_tasks, num_returns=1)
                self.active_tasks = remaining
                return ready[0]

    Note that the above operator fully supports both bulk and streaming execution,
    since `add_input` and `get_next` can be called in any order. In bulk execution
    (now deprecated), all inputs would be added up-front, but in streaming execution
    (now the default execution mode) the calls could be interleaved.
    """

    _OPERATOR_ID_LABEL_KEY = "__data_operator_id"

    def __init__(
        self,
        name: str,
        input_dependencies: List["PhysicalOperator"],
        data_context: DataContext,
        target_max_block_size: Optional[int],
    ):
        super().__init__(name, input_dependencies)

        for x in input_dependencies:
            assert isinstance(x, PhysicalOperator), x
        self._inputs_complete = not input_dependencies
        self._output_block_size_option = None
        self.set_target_max_block_size(target_max_block_size)
        self._started = False
        self._in_task_submission_backpressure = False
        self._in_task_output_backpressure = False
        self._estimated_num_output_bundles = None
        self._estimated_output_num_rows = None
        self._execution_completed = False
        # The LogicalOperator(s) which were translated to create this PhysicalOperator.
        # Set via `PhysicalOperator.set_logical_operators()`.
        self._logical_operators: List[LogicalOperator] = []
        self._data_context = data_context
        self._id = str(uuid.uuid4())
        # Initialize metrics after data_context is set
        self._metrics = OpRuntimeMetrics(self)

    def start(self, options: ExecutionOptions) -> None:
        """Called by the executor when execution starts for an operator.

        Args:
            options: The global options used for the overall execution.
        """
        self._started = True

    def should_add_input(self) -> bool:
        """Return whether it is desirable to add input to this operator right now.

        Operators can customize the implementation of this method to apply additional
        backpressure (e.g., waiting for internal actors to be created).
        """
        return True

    def add_input(self, refs: RefBundle, input_index: int) -> None:
        """Called when an upstream result is available.

        Inputs may be added in any order, and calls to `add_input` may be interleaved
        with calls to `get_next` / `has_next` to implement streaming execution.

        Subclasses should override `_add_input_inner` instead of this method.

        Args:
            refs: The ref bundle that should be added as input.
            input_index: The index identifying the input dependency producing the
                input. For most operators, this is always `0` since there is only
                one upstream input operator.
        """
        assert 0 <= input_index < len(self._input_dependencies), (
            f"Input index out of bounds (total inputs {len(self._input_dependencies)}, "
            f"index is {input_index})"
        )
        self._metrics.on_input_received(refs)
        self._add_input_inner(refs, input_index)

    def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
        """Subclasses should override this method to implement `add_input`."""
        raise NotImplementedError

    def has_next(self) -> bool:
        """Returns when a downstream output is available.

        When this returns true, it is safe to call `get_next()`.
        """
        raise NotImplementedError

    def get_next(self) -> RefBundle:
        """Get the next downstream output.

        It is only allowed to call this if `has_next()` has returned True.

        Subclasses should override `_get_next_inner` instead of this method.
        """
        output = self._get_next_inner()
        self._metrics.on_output_taken(output)
        return output

    def _get_next_inner(self) -> RefBundle:
        """Subclasses should override this method to implement `get_next`."""
        raise NotImplementedError
As the official docstring says, a PhysicalOperator transforms an input stream of RefBundles into an output stream of RefBundles. Its core methods are add_input, which receives input, and get_next, which produces output. add_input internally calls the abstract method _add_input_inner, which subclasses must implement to actually consume upstream output; get_next calls the abstract method _get_next_inner, which subclasses implement to produce output. add_input and get_next can be called in any order, which is what makes streaming execution possible.
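The following sketch shows how a scheduler could move data between two operators using only this contract; it is not the real StreamingExecutor loop (which additionally handles completion, resource limits, and backpressure policies), just an illustration of the interface:

# A simplified pump loop between two operators, relying only on the
# add_input / has_next / get_next / should_add_input contract described above.
def pump_once(upstream, downstream) -> int:
    """Move the currently available output bundles of `upstream` into `downstream`."""
    moved = 0
    # get_next() may be called as soon as has_next() is True, even while the
    # upstream operator is still receiving inputs -- this interleaving is what
    # makes the execution streaming rather than bulk.
    while upstream.has_next() and downstream.should_add_input():
        bundle = upstream.get_next()      # a RefBundle of block references
        downstream.add_input(bundle, 0)   # input_index 0: a single upstream dependency
        moved += 1
    return moved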
Now let's see how plan_read_op converts the Read LogicalOperator into a PhysicalOperator.
The plan_read_op method
def plan_read_op(
    op: Read,
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> PhysicalOperator:
    """Get the corresponding DAG of physical operators for Read.

    Note this method only converts the given `op`, but not its input dependencies.
    See Planner.plan() for more details.
    """
    assert len(physical_children) == 0

    # First define get_input_data.
    def get_input_data(target_max_block_size) -> List[RefBundle]:
        parallelism = op.get_detected_parallelism()
        assert (
            parallelism is not None
        ), "Read parallelism must be set by the optimizer before execution"
        # The Datasource was passed in when the Read operator was created;
        # call get_read_tasks here to obtain the List[ReadTask].
        read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism)
        _warn_on_high_parallelism(parallelism, len(read_tasks))

        # Wrap each ReadTask as a block.
        ret = []
        for read_task in read_tasks:
            read_task_ref = ray.put(read_task)
            ref_bundle = RefBundle(
                [
                    (
                        # TODO(chengsu): figure out a better way to pass read
                        # tasks other than ray.put().
                        read_task_ref,
                        cleaned_metadata(read_task, read_task_ref),
                    )
                ],
                # `owns_blocks` is False, because these refs are the root of the
                # DAG. We shouldn't eagerly free them. Otherwise, the DAG cannot
                # be reconstructed.
                owns_blocks=False,
            )
            ret.append(ref_bundle)
        return ret

    # InputDataBuffer is a PhysicalOperator whose outputs are the ReadTasks.
    inputs = InputDataBuffer(
        data_context,
        input_data_factory=get_input_data,
    )

    # Define the function that actually invokes the ReadTasks to read data.
    def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
        for read_task in blocks:
            yield from read_task()

    # Create a MapTransformer for a read operator
    transform_fns: List[MapTransformFn] = [
        # First, execute the read tasks.
        # Wrap do_read in a BlockMapTransformFn.
        BlockMapTransformFn(do_read),
    ]
    # Then build the output blocks.
    transform_fns.append(BuildOutputBlocksMapTransformFn.for_blocks())
    # MapTransformer encapsulates the data transformation logic of a MapOperator.
    map_transformer = MapTransformer(transform_fns)

    return MapOperator.create(
        map_transformer,
        inputs,
        data_context,
        name=op.name,
        target_max_block_size=None,
        compute_strategy=TaskPoolStrategy(op._concurrency),
        ray_remote_args=op._ray_remote_args,
    )
plan_read_op first defines get_input_data, which calls Datasource.get_read_tasks() to obtain the list of ReadTasks and wraps them into RefBundles (at this point a ReadTask can be thought of as a special kind of block). It then creates an InputDataBuffer whose input_data_factory field is set to get_input_data. InputDataBuffer is a PhysicalOperator, implemented as follows:
InputDataBuffer
class InputDataBuffer(PhysicalOperator):
    """Defines the input data for the operator DAG.

    For example, this may hold cached blocks from a previous Dataset execution, or
    the arguments for read tasks.
    """

    def __init__(
        self,
        data_context: DataContext,
        input_data: Optional[List[RefBundle]] = None,
        input_data_factory: Optional[Callable[[int], List[RefBundle]]] = None,
        num_output_blocks: Optional[int] = None,
    ):
        """Create an InputDataBuffer.

        Args:
            input_data: The list of bundles to output from this operator.
            input_data_factory: The factory to get input data, if input_data is None.
            num_output_blocks: The number of output blocks. If not specified, progress
                bars total will be set based on num output bundles instead.
        """
        super().__init__("Input", [], data_context, target_max_block_size=None)
        if input_data is not None:
            assert input_data_factory is None
            # Copy the input data to avoid mutating the original list.
            self._input_data = input_data[:]
            self._is_input_initialized = True
            self._initialize_metadata()
        else:
            # Initialize input lazily when execution is started.
            assert input_data_factory is not None
            self._input_data_factory = input_data_factory
            self._is_input_initialized = False
        self._input_data_index = 0

    def start(self, options: ExecutionOptions) -> None:
        if not self._is_input_initialized:
            # Call _input_data_factory to obtain the list of ReadTask references.
            self._input_data = self._input_data_factory(
                self.actual_target_max_block_size
            )
            self._is_input_initialized = True
            self._initialize_metadata()

        # InputDataBuffer does not take inputs from other operators,
        # so we record input metrics here
        for bundle in self._input_data:
            self._metrics.on_input_received(bundle)
        super().start(options)

    def has_next(self) -> bool:
        return self._input_data_index < len(self._input_data)

    def _get_next_inner(self) -> RefBundle:
        # We can't pop the input data. If we do, Ray might garbage collect the block
        # references, and Ray won't be able to reconstruct downstream objects.
        bundle = self._input_data[self._input_data_index]
        self._input_data_index += 1
        return bundle

    def get_stats(self) -> StatsDict:
        return {}

    def _add_input_inner(self, refs, input_index) -> None:
        raise ValueError("Inputs are not allowed for this operator.")
InputDataBuffer is constructed with an input_data_factory used to obtain the ReadTask references. In start(), input_data_factory() is called and the resulting List[RefBundle(ReadTask)] is stored in the self._input_data field.
In _get_next_inner, a single ReadTask reference is returned; in other words, InputDataBuffer emits one ReadTask per output.
InputDataBuffer does not accept inputs via _add_input_inner (it simply raises), because it is always the first PhysicalOperator in the DAG, the very source of the data.
After defining the InputDataBuffer, plan_read_op defines do_read, which takes an Iterable[ReadTask] and produces an Iterable[Block] by actually invoking the ReadTasks. do_read is then wrapped into a MapTransformFn, the abstraction for transformation functions, defined as follows:
MapTransformFn
class MapTransformFn:
    """Represents a single transform function in a MapTransformer."""

    def __init__(
        self,
        input_type: MapTransformFnDataType,
        output_type: MapTransformFnDataType,
        category: MapTransformFnCategory,
        is_udf: bool = False,
    ):
        """
        Args:
            callable: the underlying Python callable object.
            input_type: the type of the input data.
            output_type: the type of the output data.
        """
        self._callable = callable
        self._input_type = input_type
        self._output_type = output_type
        self._category = category
        self._output_block_size_option = None
        self._is_udf = is_udf

    # Implemented by subclasses.
    @abstractmethod
    def __call__(
        self,
        input: Iterable[MapTransformFnData],
        ctx: TaskContext,
    ) -> Iterable[MapTransformFnData]:
        ...

    @property
    def input_type(self) -> MapTransformFnDataType:
        return self._input_type

    @property
    def output_type(self) -> MapTransformFnDataType:
        return self._output_type


class BlockMapTransformFn(MapTransformFn):
    """A block-to-block MapTransformFn."""

    def __init__(self, block_fn: MapTransformCallable[Block, Block]):
        self._block_fn = block_fn
        super().__init__(
            MapTransformFnDataType.Block,
            MapTransformFnDataType.Block,
            category=MapTransformFnCategory.DataProcess,
        )

    def __call__(self, input: Iterable[Block], ctx: TaskContext) -> Iterable[Block]:
        yield from self._block_fn(input, ctx)

    def __repr__(self) -> str:
        return f"BlockMapTransformFn({self._block_fn})"

    def __eq__(self, other):
        return (
            isinstance(other, BlockMapTransformFn) and self._block_fn == other._block_fn
        )
A MapTransformFn declares the formats of its input and output data, of which there are three: Block, Row, and Batch. The concrete logic is implemented by subclasses; BlockMapTransformFn, for example, simply calls the given block_fn to turn blocks into blocks, while other subclasses convert between batches, blocks, and rows.
MapTransformer
class MapTransformer:
    """Encapsulates the data transformation logic of a physical MapOperator.

    A MapTransformer may consist of one or more steps, each of which is represented
    as a MapTransformFn. The first MapTransformFn must take blocks as input, and
    the last MapTransformFn must output blocks. The intermediate data types can be
    blocks, rows, or batches.
    """

    def __init__(
        self,
        transform_fns: List[MapTransformFn],
        init_fn: Optional[Callable[[], None]] = None,
    ):
        """
        Args:
            transform_fns: A list of `MapTransformFn`s that will be executed sequentially
                to transform data.
            init_fn: A function that will be called before transforming data.
                Used for the actor-based map operator.
        """
        self.set_transform_fns(transform_fns)
        self._init_fn = init_fn if init_fn is not None else lambda: None
        self._output_block_size_option = None
        self._udf_time = 0

    def apply_transform(
        self,
        input_blocks: Iterable[Block],
        ctx: TaskContext,
    ) -> Iterable[Block]:
        iter = input_blocks
        # Apply the transform functions sequentially to the input iterable.
        for transform_fn in self._transform_fns:
            iter = transform_fn(iter, ctx)
        return iter
A MapTransformer stores a list of MapTransformFns. Its core method, apply_transform, calls them in sequence. Note that the overall input and output of a MapTransformer must both be blocks, so the first MapTransformFn must take blocks as input and the last one must output blocks.
In plan_read_op, a MapTransformer with two MapTransformFns is created. The first is BlockMapTransformFn(do_read), which wraps the do_read function that actually performs the read. The second is BuildOutputBlocksMapTransformFn.for_blocks(), which reshapes the stream into blocks of a standard size; for example, if the system defines a block as 1024 rows, BuildOutputBlocksMapTransformFn emits one 1024-row block at a time. This MapTransformer is finally passed into MapOperator.create(). MapOperator is one of the most common PhysicalOperators: nearly all data transformations rely on it. It wraps a MapTransformer and ultimately executes the transformation in a distributed fashion via Ray tasks or actors.
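The chaining behavior of apply_transform can be illustrated with plain Python, without importing the internal classes; the snippet below mimics MapTransformer with ordinary functions over pyarrow Tables (apply_transform and add_one here are stand-ins, not the real implementations):

from typing import Callable, Iterable, List

import pyarrow as pa

# Plain-Python stand-ins for MapTransformer / MapTransformFn to show the chaining.
BlockIter = Iterable[pa.Table]
TransformFn = Callable[[BlockIter], BlockIter]

def apply_transform(transform_fns: List[TransformFn], blocks: BlockIter) -> BlockIter:
    it = blocks
    for fn in transform_fns:   # the same sequential chaining as MapTransformer.apply_transform
        it = fn(it)
    return it

def add_one(blocks: BlockIter) -> BlockIter:   # stands in for a BlockMapTransformFn
    for block in blocks:
        new_ids = pa.array([v.as_py() + 1 for v in block["id"]])
        yield block.set_column(0, "id", new_ids)

out = apply_transform([add_one], [pa.table({"id": [1, 2, 3]})])
print(next(iter(out))["id"].to_pylist())  # [2, 3, 4]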
MapOperator.create()
class MapOperator(OneToOneOperator, ABC):
    @classmethod
    def create(
        cls,
        map_transformer: MapTransformer,
        input_op: PhysicalOperator,
        data_context: DataContext,
        target_max_block_size: Optional[int] = None,
        name: str = "Map",
        # TODO(ekl): slim down ComputeStrategy to only specify the compute
        # config and not contain implementation code.
        compute_strategy: Optional[ComputeStrategy] = None,
        min_rows_per_bundle: Optional[int] = None,
        supports_fusion: bool = True,
        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ) -> "MapOperator":
        """Create a MapOperator.

        This factory creates the MapOperator pool implementation that corresponds
        to the compute argument:
            - If None or TaskPoolStrategy -> TaskPoolMapOperator
            - If ActorPoolStrategy -> ActorPoolMapOperator

        Args:
            transform_fn: The function to apply to each ref bundle input.
            input_op: Operator generating input data for this op.
            init_fn: The callable class to instantiate if using ActorPoolMapOperator.
            name: The name of this operator.
            compute_strategy: Customize the compute strategy for this op.
            target_max_block_size: The target maximum number of bytes to
                include in an output block.
            min_rows_per_bundle: The number of rows to gather per batch passed to the
                transform_fn, or None to use the block size. Setting the batch size is
                important for the performance of GPU-accelerated transform functions.
                The actual rows passed may be less if the dataset is small.
            supports_fusion: Whether this operator supports fusion with other operators.
            ray_remote_args_fn: A function that returns a dictionary of remote args
                passed to each map worker. The purpose of this argument is to generate
                dynamic arguments for each actor/task, and will be called each time
                prior to initializing the worker. Args returned from this dict will
                always override the args in ``ray_remote_args``. Note: this is an
                advanced, experimental feature.
            ray_remote_args: Customize the :func:`ray.remote` args for this op's tasks.
        """
        # Ray tasks are used by default.
        if compute_strategy is None:
            compute_strategy = TaskPoolStrategy()

        if isinstance(compute_strategy, TaskPoolStrategy):
            from ray.data._internal.execution.operators.task_pool_map_operator import (
                TaskPoolMapOperator,
            )

            # For TaskPoolStrategy, use TaskPoolMapOperator.
            return TaskPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                name=name,
                target_max_block_size=target_max_block_size,
                min_rows_per_bundle=min_rows_per_bundle,
                concurrency=compute_strategy.size,
                supports_fusion=supports_fusion,
                ray_remote_args_fn=ray_remote_args_fn,
                ray_remote_args=ray_remote_args,
            )
        elif isinstance(compute_strategy, ActorPoolStrategy):
            from ray.data._internal.execution.operators.actor_pool_map_operator import (
                ActorPoolMapOperator,
            )

            # For ActorPoolStrategy, use ActorPoolMapOperator.
            return ActorPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                target_max_block_size=target_max_block_size,
                compute_strategy=compute_strategy,
                name=name,
                min_rows_per_bundle=min_rows_per_bundle,
                supports_fusion=supports_fusion,
                ray_remote_args_fn=ray_remote_args_fn,
                ray_remote_args=ray_remote_args,
            )
        else:
            raise ValueError(f"Unsupported execution strategy {compute_strategy}")
MapOperator.create decides which of the two MapOperator subclasses to instantiate based on compute_strategy: when we pass a class to map, the work is executed by an ActorPoolMapOperator; when we pass a function, it is executed by a TaskPoolMapOperator.
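At the user level this choice is driven purely by what is passed to map / map_batches, as in the following sketch (double and Doubler are made-up examples):

import ray

def double(row):                 # a plain function -> planned as a TaskPoolMapOperator
    row["id"] = row["id"] * 2
    return row

class Doubler:                   # a callable class -> planned as an ActorPoolMapOperator
    def __call__(self, row):
        row["id"] = row["id"] * 2
        return row

ds = ray.data.range(8)
ds_tasks = ds.map(double)                    # stateless Ray tasks
ds_actors = ds.map(Doubler, concurrency=2)   # a pool of 2 Ray actors
print(ds_actors.take(2))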
Below we analyze ActorPoolMapOperator, one of the most central pieces of Ray Data.
ActorPoolMapOperator
1) Constructor
class ActorPoolMapOperator(MapOperator):
    """A MapOperator implementation that executes tasks on an actor pool."""

    def __init__(
        self,
        map_transformer: MapTransformer,
        input_op: PhysicalOperator,
        data_context: DataContext,
        target_max_block_size: Optional[int],
        compute_strategy: ActorPoolStrategy,
        name: str = "ActorPoolMap",
        min_rows_per_bundle: Optional[int] = None,
        supports_fusion: bool = True,
        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            map_transformer,
            input_op,
            data_context,
            name,
            target_max_block_size,
            min_rows_per_bundle,
            supports_fusion,
            ray_remote_args_fn,
            ray_remote_args,
        )
        # Initialize the actor pool.
        self._actor_pool = _ActorPool(
            compute_strategy,
            self._start_actor,
            per_actor_resource_usage,
            self.data_context._enable_actor_pool_on_exit_hook,
        )
        # A queue of bundles awaiting dispatch to actors.
        self._bundle_queue = create_bundle_queue()
        # Cached actor class.
        self._cls = None
        # Whether no more submittable bundles will be added.
        self._inputs_done = False
The constructor of ActorPoolMapOperator first initializes an _ActorPool, which manages the creation, destruction, and monitoring of Ray actors and can scale up or down based on various conditions. It then initializes _bundle_queue, which holds all inputs waiting to be processed. The actor class held by the _ActorPool is _MapWorker, which wraps the map_transformer and, in its submit method, implements the actual invocation of the map_transformer.
2)_MapWorker
class _MapWorker:
    """An actor worker for MapOperator."""

    def __init__(
        self,
        ctx: DataContext,
        src_fn_name: str,
        map_transformer: MapTransformer,
    ):
        DataContext._set_current(ctx)
        self.src_fn_name: str = src_fn_name
        self._map_transformer = map_transformer
        # Initialize state for this actor.
        self._map_transformer.init()

    def get_location(self) -> NodeIdStr:
        return ray.get_runtime_context().get_node_id()

    def submit(
        self,
        data_context: DataContext,
        ctx: TaskContext,
        *blocks: Block,
        **kwargs: Dict[str, Any],
    ) -> Iterator[Union[Block, List[BlockMetadata]]]:
        yield from _map_task(
            self._map_transformer,
            data_context,
            ctx,
            *blocks,
            **kwargs,
        )
get_location returns the node on which this actor is running. The key method is submit, which calls _map_task to actually run the map_transformer:
3) _map_task
def _map_task(
    map_transformer: MapTransformer,
    data_context: DataContext,
    ctx: TaskContext,
    *blocks: Block,
    **kwargs: Dict[str, Any],
) -> Iterator[Union[Block, List[BlockMetadata]]]:
    """Remote function for a single operator task.

    Args:
        fn: The callable that takes Iterator[Block] as input and returns
            Iterator[Block] as output.
        blocks: The concrete block values from the task ref bundle.

    Returns:
        A generator of blocks, followed by the list of BlockMetadata for the blocks
        as the last generator return.
    """
    DataContext._set_current(data_context)
    ctx.kwargs.update(kwargs)
    TaskContext.set_current(ctx)
    stats = BlockExecStats.builder()
    map_transformer.set_target_max_block_size(ctx.target_max_block_size)
    with MemoryProfiler(data_context.memory_usage_poll_interval_s) as profiler:
        for b_out in map_transformer.apply_transform(iter(blocks), ctx):
            # TODO(Clark): Add input file propagation from input blocks.
            m_out = BlockAccessor.for_block(b_out).get_metadata()
            m_out.exec_stats = stats.build()
            m_out.exec_stats.udf_time_s = map_transformer.udf_time()
            m_out.exec_stats.task_idx = ctx.task_idx
            m_out.exec_stats.max_uss_bytes = profiler.estimate_max_uss()
            yield b_out
            yield m_out
            stats = BlockExecStats.builder()
            profiler.reset()

    TaskContext.reset_current()
_map_task calls MapTransformer.apply_transform on the input data and returns a generator of output blocks and their BlockMetadata: on each iteration it first yields a result block and then yields the corresponding BlockMetadata. (Note that the docstring claims the generator yields all blocks first and the List[BlockMetadata] last, which does not match the actual logic; the reason is unclear.)
4) _start_actor
def _start_actor(self):
    """Start a new actor and add it to the actor pool as a pending actor."""
    assert self._cls is not None
    ctx = self.data_context
    if self._ray_remote_args_fn:
        self._refresh_actor_cls()
    actor = self._cls.options(
        _labels={self._OPERATOR_ID_LABEL_KEY: self.id}
    ).remote(
        ctx,
        src_fn_name=self.name,
        map_transformer=self._map_transformer,
    )
    res_ref = actor.get_location.options(name=f"{self.name}.get_location").remote()

    def _task_done_callback(res_ref):
        # res_ref is a future for a now-ready actor; move actor from pending to the
        # active actor pool.
        has_actor = self._actor_pool.pending_to_running(res_ref)
        if not has_actor:
            # Actor has already been killed.
            return
        # A new actor has started, we try to dispatch queued tasks.
        self._dispatch_tasks()

    self._submit_metadata_task(
        res_ref,
        lambda: _task_done_callback(res_ref),
    )
    return actor, res_ref
_start_actor defines the logic for launching a single actor.
It first takes the actor class, _MapWorker, and creates an actor with ray.remote; at this point the actor is in the pending state.
Calling res_ref = actor.get_location.remote() triggers a remote execution of get_location and returns an ObjectRef to its result.
This res_ref effectively serves as the signal that the actor has been created successfully: ray.get(res_ref) can only return once the actor has finished initialization and remotely executed get_location(); until then, ray.get() blocks. The actor pool keeps a _pending_actors field of type Dict[ObjectRef, ray.actor.ActorHandle], mapping each res_ref to its ActorHandle. Once res_ref has been computed, the actor pool calls pending_to_running to mark the actor as running, allowing tasks to be submitted to it.
_submit_metadata_task is called here to submit a MetadataOpTask for res_ref. The method is defined in the parent class MapOperator; it wraps the ObjectRef into a MetadataOpTask together with a task_done_callback. In Ray Data's scheduling loop, ray.wait() is used to check whether each MetadataOpTask's object_ref has been computed, and _task_done_callback() is invoked once it has.
In other words, once res_ref is ready, _task_done_callback is called automatically, moving the actor from pending to running and calling _dispatch_tasks to try to process new work; _dispatch_tasks is explained in detail below.
class MapOperator(OneToOneOperator, ABC):
    def _submit_metadata_task(
        self, result_ref: ObjectRef, task_done_callback: Callable[[], None]
    ):
        """Submit a new metadata-handling task."""
        task_index = self._next_metadata_task_idx
        self._next_metadata_task_idx += 1

        def _task_done_callback():
            self._metadata_tasks.pop(task_index)
            task_done_callback()

        # The operator keeps a list of all submitted MetadataOpTasks; the scheduling
        # loop will later iterate over this list.
        self._metadata_tasks[task_index] = MetadataOpTask(
            task_index, result_ref, _task_done_callback
        )


class MetadataOpTask(OpTask):
    """Represents an OpTask that only handles metadata, instead of Block data."""

    def __init__(
        self,
        task_index: int,
        object_ref: ray.ObjectRef,
        task_done_callback: Callable[[], None],
        task_resource_bundle: Optional[ExecutionResources] = None,
    ):
        """
        Args:
            object_ref: The ObjectRef of the task.
            task_done_callback: The callback to call when the task is done.
        """
        super().__init__(task_index, task_resource_bundle)
        self._object_ref = object_ref
        self._task_done_callback = task_done_callback

    # How do we know a MetadataOpTask is finished? Simply when _object_ref becomes
    # ready, i.e. when Ray's asynchronous remote computation has completed.
    def get_waitable(self) -> ray.ObjectRef:
        return self._object_ref

    def on_task_finished(self):
        """Callback when the task is finished."""
        self._task_done_callback()
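The readiness trick itself is plain Ray and can be reproduced standalone; in the sketch below, Worker stands in for _MapWorker and the sleep stands in for a slow model load (the names are invented for illustration):

import time

import ray

@ray.remote
class Worker:
    def __init__(self):
        time.sleep(1)  # stands in for a slow model load in _MapWorker.__init__

    def get_location(self):
        return ray.get_runtime_context().get_node_id()

actor = Worker.remote()                    # the actor is "pending" at this point
res_ref = actor.get_location.remote()      # this ref resolves only after __init__ finishes

ready, _ = ray.wait([res_ref], timeout=0)  # non-blocking probe, as in the scheduling loop
print("running" if ready else "still pending")   # almost certainly "still pending"
ray.get(res_ref)                           # blocks until the actor has actually started
print("running now")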
5)start
def start(self, options: ExecutionOptions):
    self._actor_locality_enabled = options.actor_locality_enabled
    super().start(options)

    # Create the actor workers and add them to the pool.
    self._cls = ray.remote(**self._ray_remote_args)(_MapWorker)
    self._actor_pool.scale_up(self._actor_pool.min_size())
    refs = self._actor_pool.get_pending_actor_refs()

    # We synchronously wait for the initial number of actors to start. This avoids
    # situations where the scheduler is unable to schedule downstream operators
    # due to lack of available actors, causing an initial "pileup" of objects on
    # upstream operators, leading to a spike in memory usage prior to steady state.
    logger.debug(f"{self._name}: Waiting for {len(refs)} pool actors to start...")
    try:
        timeout = self.data_context.wait_for_min_actors_s
        ray.get(refs, timeout=timeout)
    except ray.exceptions.GetTimeoutError:
        raise ray.exceptions.GetTimeoutError(
            "Timed out while starting actors. "
            "This may mean that the cluster does not have "
            "enough resources for the requested actor pool."
        )
The start method defines the startup logic of ActorPoolMapOperator. It first calls _actor_pool.scale_up to grow the pool to the user-specified number of actors; the pool then calls _start_actor in a loop to launch new actors.
It then waits on all the res_refs via ray.get(), i.e. waits for the actors to start. If enough actors have not started within the maximum wait time (usually because the cluster lacks resources or the actor's initialization is faulty), an exception is raised and execution stops.
6)_add_bundled_input
def _add_bundled_input(self, bundle: RefBundle):
    self._bundle_queue.add(bundle)
    self._metrics.on_input_queued(bundle)
    # Try to dispatch all bundles in the queue, including this new bundle.
    self._dispatch_tasks()
As introduced earlier, every PhysicalOperator must define how new input data is handled; ActorPoolMapOperator does this in _add_bundled_input, which adds the incoming RefBundle to the _bundle_queue and then calls _dispatch_tasks to try to process the data. _dispatch_tasks is the core of the processing logic:
7) _dispatch_tasks
def _dispatch_tasks(self):
    """Try to dispatch tasks from the bundle buffer to the actor pool.

    This is called when:
        * a new input bundle is added,
        * a task finishes,
        * a new worker has been created.
    """
    # 循环处理_bundle_queue中的所有数据
    while self._bundle_queue:
        # 从ActorPool中挑选一个Running状态的Actor
        if self._actor_locality_enabled:
            actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
        else:
            actor = self._actor_pool.pick_actor()
        # 如果没有正在运行的actor,则无法调度任务,等待下次执行
        if actor is None:
            # No actors available for executing the next task.
            break
        # 获取一个bundle
        bundle = self._bundle_queue.pop()
        self._metrics.on_input_dequeued(bundle)
        input_blocks = [block for block, _ in bundle.blocks]
        ctx = TaskContext(
            task_idx=self._next_data_task_idx,
            target_max_block_size=self.actual_target_max_block_size,
        )
        # 远程调用actor.submit方法,submit方法最终调用的是_map_task,返回的是Block结果的生成器
        gen = actor.submit.options(
            num_returns="streaming",
            name=f"{self.name}.submit",
            **self._ray_actor_task_remote_args,
        ).remote(
            self.data_context,
            ctx,
            *input_blocks,
            **self.get_map_task_kwargs(),
        )

        # 定义任务结束的回调函数
        def _task_done_callback(actor_to_return):
            # Return the actor that was running the task to the pool.
            self._actor_pool.return_actor(actor_to_return)
            # Dipsatch more tasks.
            self._dispatch_tasks()

        from functools import partial

        # 封装任务到DataOpTask中
        self._submit_data_task(
            gen, bundle, partial(_task_done_callback, actor_to_return=actor)
        )
可以看出_dispatch_tasks的核心逻辑如下:
1. 循环_bundle_queue,获取待处理的Block数据;
2. 从Pool中挑选出一个Running状态的actor;
3. 调用actor.submit.remote()方法,将任务提交到Ray集群分布式执行;
4. 调用_submit_data_task方法,将submit返回的ObjectRefGenerator封装到DataOpTask中。
此处submit返回的gen是一个ObjectRefGenerator类型的生成器,这是由于submit方法采用yield from _map_task返回了一个Generator,在ray remote转换后返回的即为ObjectRefGenerator,可以通过next方法获取下一个ObjectRef。也可以使用ray.wait方法等待生成器的下一个输出计算完成。参考文档:https://docs.ray.io/en/latest/ray-core/ray-generator.html#generators。
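下面是一个与Ray Data无关的极简示例,演示num_returns="streaming"与ObjectRefGenerator的基本用法(stream_values为演示用的假设函数名,具体参数行为以所用Ray版本的文档为准):

import ray

ray.init()

@ray.remote
def stream_values(n):
    for i in range(n):
        yield i  # 每次yield都会产生一个ObjectRef

# 与源码中actor.submit.options(num_returns="streaming")的用法一致
gen = stream_values.options(num_returns="streaming").remote(3)

# 生成器返回的是ObjectRef,可以通过next逐个获取,再用ray.get取值
print(ray.get(next(gen)))  # 0

# 也可以用ray.wait等待生成器产生下一个可读输出(调度循环中正是这样做的)
ready, _ = ray.wait([gen], timeout=1)
for ref in gen:
    print(ray.get(ref))  # 1, 2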
这个生成器最终通过调用_submit_data_task被封装到了DataOpTask中:
def _submit_data_task(
    self,
    gen: ObjectRefGenerator,
    inputs: RefBundle,
    task_done_callback: Optional[Callable[[], None]] = None,
):
    """Submit a new data-handling task."""
    # TODO(hchen):
    # 1. Move this to the base PhyscialOperator class.
    # 2. This method should only take a block-processing function as input,
    #    instead of a streaming generator. The logic of submitting ray tasks
    #    can also be capsulated in the base class.
    task_index = self._next_data_task_idx
    self._next_data_task_idx += 1
    self._metrics.on_task_submitted(task_index, inputs)

    # 每次有新的数据计算完成,则回调该函数
    def _output_ready_callback(task_index, output: RefBundle):
        # Since output is streamed, it should only contain one block.
        assert len(output) == 1
        self._metrics.on_task_output_generated(task_index, output)

        # Notify output queue that the task has produced an new output.
        self._output_queue.notify_task_output_ready(task_index, output)
        self._metrics.on_output_queued(output)

    # 所有数据计算完成,则回调该函数
    def _task_done_callback(task_index: int, exception: Optional[Exception]):
        self._data_tasks.pop(task_index)
        # Notify output queue that this task is complete.
        self._output_queue.notify_task_completed(task_index)
        if task_done_callback:
            task_done_callback()

    # 封装到DataOpTask中
    self._data_tasks[task_index] = DataOpTask(
        task_index,
        gen,
        lambda output: _output_ready_callback(task_index, output),
        functools.partial(_task_done_callback, task_index),
    )
_submit_data_task是父类MapOperator定义的方法,主要是定义了_output_ready_callback、_task_done_callback等回调函数,并将它们与ObjectRefGenerator生成器一起封装到DataOpTask中。DataOpTask代表了PhysicalOperator提交到Ray集群远程执行的一个分布式任务。
8)DataOpTask
class DataOpTask(OpTask):
    """Represents an OpTask that handles Block data."""

    def __init__(
        self,
        task_index: int,
        streaming_gen: ObjectRefGenerator,
        output_ready_callback: Callable[[RefBundle], None],
        task_done_callback: Callable[[Optional[Exception]], None],
        task_resource_bundle: Optional[ExecutionResources] = None,
    ):
        """
        Args:
            streaming_gen: The streaming generator of this task. It should yield blocks.
            output_ready_callback: The callback to call when a new RefBundle is output
                from the generator.
            task_done_callback: The callback to call when the task is done.
        """
        super().__init__(task_index, task_resource_bundle)
        self._streaming_gen = streaming_gen
        self._output_ready_callback = output_ready_callback
        self._task_done_callback = task_done_callback

    def get_waitable(self) -> ObjectRefGenerator:
        return self._streaming_gen

    def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
        """Callback when data is ready to be read from the streaming generator.

        Args:
            max_bytes_to_read: Max bytes of blocks to read. If None, all available
                will be read.

        Returns:
            The number of blocks read.
        """
        bytes_read = 0
        while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
            try:
                block_ref = self._streaming_gen._next_sync(0)
                if block_ref.is_nil():
                    # The generator currently doesn't have new output.
                    # And it's not stopped yet.
                    break
            except StopIteration:
                self._task_done_callback(None)
                break

            try:
                meta = ray.get(next(self._streaming_gen))
            except StopIteration:
                # The generator should always yield 2 values (block and metadata)
                # each time. If we get a StopIteration here, it means an error
                # happened in the task.
                # And in this case, the block_ref is the exception object.
                # TODO(hchen): Ray Core should have a better interface for
                # detecting and obtaining the exception.
                try:
                    ray.get(block_ref)
                    assert False, "Above ray.get should raise an exception."
                except Exception as ex:
                    self._task_done_callback(ex)
                    raise ex from None
            self._output_ready_callback(
                RefBundle([(block_ref, meta)], owns_blocks=True)
            )
            bytes_read += meta.size_bytes
        return bytes_read
DataOpTask中封装了由actor.submit方法返回的生成器_streaming_gen。在每次流式执行循环中,会尝试调用ray.wait等待_streaming_gen产生输出,并在输出就绪后调用on_data_ready方法,该方法的具体逻辑如下:
1. 调用block_ref = self._streaming_gen._next_sync(0),从_streaming_gen中获取计算好的Block数据;
2. 如果出现StopIteration,则_streaming_gen没有后续输出了,回调_task_done_callback;
3. 调用meta = ray.get(next(self._streaming_gen)),从_streaming_gen中获取计算好的BlockMetadata数据,这里与_map_task的实现对应上了,即第一次yield Block,第二次yield BlockMetadata(可参考本列表后的示意代码);
4. 将Block与BlockMetadata封装到RefBundle中,并回调_output_ready_callback。在MapOperator._submit_data_task中定义了该回调函数,会将输出结果放到MapOperator的输出缓存中,后续可以用get_next方法获取结果。
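为帮助理解"先yield Block、再yield BlockMetadata"这种成对输出的消费方式,下面给出一个简化的示意代码(非Ray Data源码,produce_blocks及其中的数据结构均为演示用的假设,仅展示配对读取的模式):

import ray

ray.init()

@ray.remote
def produce_blocks(n):
    for i in range(n):
        block = [i] * 4                    # 用列表模拟一个Block
        yield block                        # 第一次yield:Block
        yield {"num_rows": len(block)}     # 第二次yield:模拟BlockMetadata

gen = produce_blocks.options(num_returns="streaming").remote(2)

while True:
    try:
        block_ref = next(gen)              # 对应on_data_ready中获取block_ref
    except StopIteration:
        break                              # 生成器结束,对应_task_done_callback
    meta = ray.get(next(gen))              # 紧接着读取与之配对的metadata
    print(ray.get(block_ref), meta)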
至此,我们已经将ActorPoolMapOperator相关的内容介绍完毕。TaskPoolMapOperator不再展开介绍,它最终同样是通过ray.remote提交任务并封装到DataOpTask中,区别在于TaskPoolMapOperator直接以Ray Task的形式提交_map_task函数,没有使用_MapWorker类进行封装。
再次回顾之前的plan_read_op方法,在最后将do_read方法通过MapTransformer封装到了一个TaskPoolMapOperator或ActorPoolMapOperator中,以实现分布式执行。plan_read_op将Read这一LogicalOperator转换为两个PhysicalOperator,最终生成的DAG为:
InputDataBuffer-->TaskPoolMapOperator(ReadCSV)
InputDataBuffer的输出即ReadTask,作为TaskPoolMapOperator(ReadCSV)的输入;TaskPoolMapOperator(ReadCSV)接收ReadTask后,通过ray.remote分布式执行do_read,调用ReadTask读取数据,其输出即为读取到的Block数据,作为后续算子的输入。同理,对于MapRows与MapBatches两个LogicalOperator,也有相对应的plan_udf_map_op将其转换为PhysicalOperator。
plan_udf_map_op方法
def plan_udf_map_op(
    op: AbstractUDFMap,
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> MapOperator:
    """Get the corresponding physical operators DAG for AbstractUDFMap operators.

    Note this method only converts the given `op`, but not its input dependencies.
    See Planner.plan() for more details.
    """
    assert len(physical_children) == 1
    input_physical_dag = physical_children[0]

    compute = get_compute(op._compute)
    fn, init_fn = _parse_op_fn(op)

    if isinstance(op, MapBatches):
        transform_fn = _generate_transform_fn_for_map_batches(fn)
        map_transformer = _create_map_transformer_for_map_batches_op(
            transform_fn,
            op._batch_size,
            op._batch_format,
            op._zero_copy_batch,
            init_fn,
        )
    else:
        if isinstance(op, MapRows):
            transform_fn = _generate_transform_fn_for_map_rows(fn)
        elif isinstance(op, FlatMap):
            transform_fn = _generate_transform_fn_for_flat_map(fn)
        else:
            raise ValueError(f"Found unknown logical operator during planning: {op}")

        map_transformer = _create_map_transformer_for_row_based_map_op(
            transform_fn, init_fn
        )

    return MapOperator.create(
        map_transformer,
        input_physical_dag,
        data_context,
        name=op.name,
        target_max_block_size=None,
        compute_strategy=compute,
        min_rows_per_bundle=op._min_rows_per_bundled_input,
        ray_remote_args_fn=op._ray_remote_args_fn,
        ray_remote_args=op._ray_remote_args,
    )
不难看出,这里使用map_transformer封装了用户自定义的转换函数,并最终调用MapOperator.create创建对应的PhysicalOperator。我们示例代码中传入的是一个类,compute策略为ActorPoolStrategy,因此最终转换为ActorPoolMapOperator。
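结合用户侧API来看,传入普通函数与传入可调用类分别对应两种算子。下面是一个简化的对照示例(add_one与AddOne均为演示用的假设名称):

import ray

def add_one(row):
    # 普通函数:以Ray Task执行,对应TaskPoolMapOperator
    row["value"] = row["id"] + 1
    return row

class AddOne:
    # 可调用类:以Actor池执行,对应ActorPoolMapOperator,需要指定concurrency
    def __call__(self, row):
        row["value"] = row["id"] + 1
        return row

ds = ray.data.range(10)                   # 每行形如{"id": i}
ds_task = ds.map(add_one)                 # TaskPoolMapOperator
ds_actor = ds.map(AddOne, concurrency=2)  # ActorPoolMapOperator
print(ds_actor.take(3))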
执行PhysicalPlan
在上一节我们描述了如何通过Planner将LogicalOperator转换为PhysicalOperator,并对最重要的几种PhysicalOperator进行了介绍。我们的示例代码在转换后得到的DAG如下:
InputDataBuffer->TaskPoolMapOperator(ReadCSV)->ActorPoolMapOperator(CustomMapActor)->ActorPoolMapOperator(CustomMapBatchActor)->TaskPoolMapOperator(WriteCSV)
我们回顾一下execute_to_legacy_block_list方法:在获取了PhysicalPlan后,便可使用StreamingExecutor对该DAG进行流式执行:
def execute_to_legacy_block_list(
    executor: Executor,
    plan: ExecutionPlan,
    dataset_uuid: str,
    preserve_order: bool,
) -> BlockList:
    # Logical Operator到Physical Operator的转换在此处执行
    dag, stats = _get_execution_dag(
        executor,
        plan,
        preserve_order,
    )
    # 对DAG流式执行
    bundles = executor.execute(dag, initial_stats=stats)
    block_list = _bundles_to_block_list(bundles)
    # Set the stats UUID after execution finishes.
    _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
    return block_list
在介绍StreamingExecutor之前,先介绍下OpState类:
OpState
class OpState:
    """The execution state tracked for each PhysicalOperator.

    This tracks state to manage input and output buffering for StreamingExecutor and
    progress bars, which is separate from execution state internal to the operators.

    Note: we use the `deque` data structure here because it is thread-safe, enabling
    operator queues to be shared across threads.
    """

    def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]):
        # Each inqueue is connected to another operator's outqueue.
        assert len(inqueues) == len(op.input_dependencies), (op, inqueues)
        self.inqueues: List[OpBufferQueue] = inqueues
        # The outqueue is connected to another operator's inqueue (they physically
        # share the same Python list reference).
        #
        # Note: this queue is also accessed concurrently from the consumer thread.
        # (in addition to the streaming executor thread). Hence, it must be a
        # thread-safe type such as `deque`.
        self.outqueue: OpBufferQueue = OpBufferQueue()
        self.op = op
        self.progress_bar = None
        self.num_completed_tasks = 0
        self.inputs_done_called = False
        # Tracks whether `input_done` is called for each input op.
        self.input_done_called = [False] * len(op.input_dependencies)
        # Used for StreamingExecutor to signal exception or end of execution
        self._finished: bool = False
        self._exception: Optional[Exception] = None
        self._scheduling_status = OpSchedulingStatus()

    def add_output(self, ref: RefBundle) -> None:
        """Move a bundle produced by the operator to its outqueue."""
        self.outqueue.append(ref)
        self.num_completed_tasks += 1
        if self.progress_bar:
            assert (
                ref.num_rows() is not None
            ), "RefBundle must have a valid number of rows"
            self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())

    def dispatch_next_task(self) -> None:
        """Move a bundle from the operator inqueue to the operator itself."""
        for i, inqueue in enumerate(self.inqueues):
            ref = inqueue.pop()
            if ref is not None:
                self.op.add_input(ref, input_index=i)
                return
        assert False, "Nothing to dispatch"

    def get_output_blocking(self, output_split_idx: Optional[int]) -> RefBundle:
        """Get an item from this node's output queue, blocking as needed.

        Returns:
            The RefBundle from the output queue, or an error / end of stream indicator.

        Raises:
            StopIteration: If all outputs are already consumed.
            Exception: If there was an exception raised during execution.
        """
        while True:
            # Check if StreamingExecutor has caught an exception or is done execution.
            if self._exception is not None:
                raise self._exception
            elif self._finished and not self.outqueue.has_next(output_split_idx):
                raise StopIteration()
            ref = self.outqueue.pop(output_split_idx)
            if ref is not None:
                return ref
            time.sleep(0.01)
OpState与PhysicalOperator是一一对应的关系,OpState追踪了每个PhysicalOperator的执行状态,并管理其输入与输出缓存。我们在基础概念-流式执行模型中提到,每个Operator的输入队列被设置为其前置Operator的输出队列,而这个队列实际保存在OpState对象中(列表后附有一个简单示意)。OpState提供了一些重要方法:
1. add_output:将输出添加到算子的outqueue中(等价于添加到下游算子的inqueue中);
2. dispatch_next_task:从inqueue中获取输入,并调用self.op.add_input方法,显然最终会执行到该op的_add_input_inner方法来消费数据;
3. get_output_blocking:尝试从outqueue中获取一个数据,如果当前还没有数据,且算子没有执行结束,该方法会阻塞。
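其中"下游算子的inqueue就是上游算子的outqueue"这一点,可以用下面的极简示意来理解(非源码,仅演示同一队列对象被上下游共享的效果;实际实现中使用的是线程安全的OpBufferQueue,内部基于deque):

from collections import deque

# 上游OpState创建自己的outqueue
upstream_outqueue = deque()

# 构建拓扑时,下游OpState的inqueue直接引用上游的outqueue(同一个对象)
downstream_inqueue = upstream_outqueue

# 上游执行add_output,把结果追加到outqueue
upstream_outqueue.append("block-0")

# 下游执行dispatch_next_task时,从inqueue中取到的正是上游刚放入的数据
print(downstream_inqueue.popleft())  # block-0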
StreamingExecutor.execute()
class StreamingExecutor(Executor, threading.Thread):
    """A streaming Dataset executor.

    This implementation executes Dataset DAGs in a fully streamed way. It runs
    by setting up the operator topology, and then routing blocks through operators in
    a way that maximizes throughput under resource constraints.
    """

    def execute(
        self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None
    ) -> Iterator[RefBundle]:
        """Executes the DAG using a streaming execution strategy.

        We take an event-loop approach to scheduling. We block on the next scheduling
        event using `ray.wait`, updating operator state and dispatching new tasks.
        """
        # ......
        # ......

        # Setup the streaming DAG topology and start the runner thread.
        self._topology, _ = build_streaming_topology(dag, self._options)

        # ...
        # ...

        self._has_op_completed = {op: False for op in self._topology}

        self._output_node: OpState = self._topology[dag]
        StatsManager.register_dataset_to_stats_actor(
            self._dataset_tag,
            self._get_operator_tags(),
        )
        for callback in get_execution_callbacks(self._data_context):
            callback.before_execution_starts(self)
        self.start()
        self._execution_started = True

        class StreamIterator(OutputIterator):
            def __init__(self, outer: Executor):
                self._outer = outer

            def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle:
                try:
                    item = self._outer._output_node.get_output_blocking(
                        output_split_idx
                    )
                    if self._outer._global_info:
                        self._outer._global_info.update(
                            item.num_rows(), dag.num_output_rows_total()
                        )
                    return item
                # Needs to be BaseException to catch KeyboardInterrupt. Otherwise we
                # can leave dangling progress bars by skipping shutdown.
                except BaseException as e:
                    self._outer.shutdown(
                        e if not isinstance(e, StopIteration) else None
                    )
                    raise

            def __del__(self):
                self._outer.shutdown()

        return StreamIterator(self)
StreamingExecutor的execute方法中,首先调用了build_streaming_topology方法,构建并初始化流式执行拓扑。
def build_streaming_topology(
    dag: PhysicalOperator, options: ExecutionOptions
) -> Tuple[Topology, int]:
    """Instantiate the streaming operator state topology for the given DAG.

    This involves creating the operator state for each operator in the DAG,
    registering it with this class, and wiring up the inqueues/outqueues of
    dependent operator states.

    Args:
        dag: The operator DAG to instantiate.
        options: The execution options to use to start operators.

    Returns:
        The topology dict holding the streaming execution state.
        The number of progress bars initialized so far.
    """
    topology: Topology = {}

    # DFS walk to wire up operator states.
    def setup_state(op: PhysicalOperator) -> OpState:
        if op in topology:
            raise ValueError("An operator can only be present in a topology once.")

        # Wire up the input outqueues to this op's inqueues.
        inqueues = []
        for i, parent in enumerate(op.input_dependencies):
            parent_state = setup_state(parent)
            inqueues.append(parent_state.outqueue)

        # Create state.
        op_state = OpState(op, inqueues)
        topology[op] = op_state
        op.start(options)
        return op_state

    setup_state(dag)

    # (此处省略进度条初始化相关代码,返回值中的i为已初始化的进度条数量)
    return (topology, i)
事实上,build_streaming_topology就是递归遍历DAG中的每一个PhysicalOperator,创建其对应的OpState,并将该OpState的inqueue设置为上游节点的outqueue,最后执行op.start()方法初始化Operator。在最终返回的topology中,每一个OpState都通过inqueue、outqueue与上下游算子建立了连接,相当于构建出了流式执行的拓扑图。
在构建完拓扑后,execute方法执行了self.start()方法。由于StreamingExecutor继承自threading.Thread,start()会启动一个独立线程来运行run()方法中定义的流式执行调度循环。
最后封装并返回一个迭代器,该迭代器的get_next方法调用了DAG中最后一个节点(示例中是Write算子)的get_output_blocking方法,遍历该迭代器可以得到数据集的最终执行结果(Write算子的输出一般是一些统计信息)。在execute_to_legacy_block_list方法中,调用了block_list = _bundles_to_block_list(bundles),迭代该Iterator,并返回最后的输出结果block_list。
下面重点看一下run()方法是如何循环调度整个DAG的执行的:
StreamingExecutor.run()
def run(self):
    """Run the control loop in a helper thread.

    Results are returned via the output node's outqueue.
    """
    try:
        # Run scheduling loop until complete.
        while True:
            # use process_time to avoid timing ray.wait in _scheduling_loop_step
            t_start = time.process_time()
            continue_sched = self._scheduling_loop_step(self._topology)
            if self._initial_stats:
                self._initial_stats.streaming_exec_schedule_s.add(
                    time.process_time() - t_start
                )
            for callback in get_execution_callbacks(self._data_context):
                callback.on_execution_step(self)
            if not continue_sched or self._shutdown:
                break
    except Exception as e:
        # Propagate it to the result iterator.
        self._output_node.mark_finished(e)
    finally:
        # Signal end of results.
        self._output_node.mark_finished()


def _scheduling_loop_step(self, topology: Topology) -> bool:
    """Run one step of the scheduling loop.

    This runs a few general phases:
        1. Waiting for the next task completion using `ray.wait()`.
        2. Pulling completed refs into operator outqueues.
        3. Selecting and dispatching new inputs to operators.

    Returns:
        True if we should continue running the scheduling loop.
    """
    self._resource_manager.update_usages()
    # Note: calling process_completed_tasks() is expensive since it incurs
    # ray.wait() overhead, so make sure to allow multiple dispatch per call for
    # greater parallelism.
    # 调用ray.wait()来等待DAG中的任务执行完成并进行处理
    num_errored_blocks = process_completed_tasks(
        topology,
        self._resource_manager,
        self._max_errored_blocks,
    )

    # 根据策略选择一个Operator来执行
    op = select_operator_to_run(
        topology,
        self._resource_manager,
        self._backpressure_policies,
        self._autoscaler,
        ensure_at_least_one_running=self._consumer_idling(),
    )

    i = 0
    while op is not None:
        # 调用OpState的dispatch_next_task,从inqueue中获取输入给Operator处理
        topology[op].dispatch_next_task()
        self._resource_manager.update_usages()
        # 循环选择下一个Operator,直到满足一些条件(例如所有Operator都在满负荷运行)
        op = select_operator_to_run(
            topology,
            self._resource_manager,
            self._backpressure_policies,
            self._autoscaler,
            ensure_at_least_one_running=self._consumer_idling(),
        )

    # Log metrics of newly completed operators.
    for op in topology:
        if op.completed() and not self._has_op_completed[op]:
            log_str = (
                f"Operator {op} completed. "
                f"Operator Metrics:\n{op._metrics.as_dict()}"
            )
            logger.debug(log_str)
            self._has_op_completed[op] = True

    # 持续处理,直到所有Operator都处理完成
    return not all(op.completed() for op in topology)
run方法实际上是循环调用_scheduling_loop_step方法来执行单次调度,_scheduling_loop_step方法的主要逻辑分为两步:
1. 调用process_completed_tasks,等待下一个任务完成并处理每个Operator的输出;
2. 根据策略选择一个Operator,调用OpState.dispatch_next_task()方法,从inqueue中获取输入交由Operator处理(前面已经介绍过最终调用的是PhysicalOperator._add_input_inner)。
在process_completed_tasks中,实际上对前面介绍的DataOpTask以及MetadataOpTask做了处理,具体逻辑如下:
process_completed_tasks
def process_completed_tasks(
    topology: Topology,
    resource_manager: ResourceManager,
    max_errored_blocks: int,
) -> int:
    """Process any newly completed tasks.

    To update operator states, call `update_operator_states()` afterwards.

    Args:
        topology: The toplogy of operators.
        backpressure_policies: The backpressure policies to use.
        max_errored_blocks: Max number of errored blocks to allow,
            unlimited if negative.

    Returns:
        The number of errored blocks.
    """
    # All active tasks, keyed by their waitables.
    active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
    # 获取所有Operator正在运行的DataOpTask与MetadataOpTask
    for op, state in topology.items():
        for task in op.get_active_tasks():
            # 获取所有的waitable,对于DataOpTask来说,就是streaming_gen
            active_tasks[task.get_waitable()] = (state, task)

    # (此处省略了max_bytes_to_read_per_op的计算逻辑:它限制每个算子本轮可读取的最大字节数,
    #  由资源管理器的资源分配/背压策略决定)

    # Process completed Ray tasks and notify operators.
    num_errored_blocks = 0
    if active_tasks:
        ready, _ = ray.wait(
            list(active_tasks.keys()),
            num_returns=len(active_tasks),
            fetch_local=False,
            timeout=0.1,
        )

        # Organize tasks by the operator they belong to, and sort them by task index.
        # So that we'll process them in a deterministic order.
        # This is because OpResourceAllocator may limit the number of blocks to read
        # per operator. In this case, we want to have fewer tasks finish quickly and
        # yield resources, instead of having all tasks output blocks together.
        ready_tasks_by_op = defaultdict(list)
        for ref in ready:
            state, task = active_tasks[ref]
            ready_tasks_by_op[state].append(task)

        for state, ready_tasks in ready_tasks_by_op.items():
            ready_tasks = sorted(ready_tasks, key=lambda t: t.task_index())
            for task in ready_tasks:
                if isinstance(task, DataOpTask):
                    try:
                        # 回调DataOpTask的on_data_ready方法
                        bytes_read = task.on_data_ready(
                            max_bytes_to_read_per_op.get(state, None)
                        )
                        if state in max_bytes_to_read_per_op:
                            max_bytes_to_read_per_op[state] -= bytes_read
                    except Exception as e:
                        num_errored_blocks += 1
                        should_ignore = (
                            max_errored_blocks < 0
                            or max_errored_blocks >= num_errored_blocks
                        )
                        error_message = (
                            "An exception was raised from a task of "
                            f'operator "{state.op.name}".'
                        )
                        if should_ignore:
                            remaining = (
                                max_errored_blocks - num_errored_blocks
                                if max_errored_blocks >= 0
                                else "unlimited"
                            )
                            error_message += (
                                " Ignoring this exception with remaining"
                                f" max_errored_blocks={remaining}."
                            )
                            logger.error(error_message, exc_info=e)
                        else:
                            error_message += (
                                " Dataset execution will now abort."
                                " To ignore this exception and continue, set"
                                " DataContext.max_errored_blocks."
                            )
                            logger.error(error_message)
                            raise e from None
                else:
                    assert isinstance(task, MetadataOpTask)
                    task.on_task_finished()

    # Pull any operator outputs into the streaming op state.
    for op, op_state in topology.items():
        while op.has_next():
            # 将所有Operator当前的输出移动到outqueue中
            op_state.add_output(op.get_next())

    return num_errored_blocks
1. 首先获取所有PhysicalOperator正在运行的DataOpTask与MetadataOpTask,并得到对应的所有ObjectRef与ObjectRefGenerator。
2. 调用ray.wait(),等待ObjectRef执行完成或ObjectRefGenerator产生下一个输出。
3. wait会返回所有ready的ObjectRef与ObjectRefGenerator,因此可以获得当前输出Ready的DataOpTask与MetadataOpTask列表。
4. 对于Ready的DataOpTask,回调on_data_ready方法。
5. 对于Ready的MetadataOpTask,回调on_task_finished方法。
6. 遍历所有的Operator,如果当前有输出,则将输出移动到对应的outqueue中。
在每一次_scheduling_loop_step中,Ray Data等待下一个输出结果的产生,触发相应的回调,并将每个Operator已有的输出移动到outqueue中;接着选择一个Operator,执行dispatch_next_task方法从输入队列中获取数据进行处理,从而构建出了"从inqueue中读取数据->启动ray remote task分布式处理->输出到outqueue"的流式循环。
总结
虽然Ray Data提供的接口较为简洁,但其内部实现较为复杂且精巧,具体步骤可以分为:
1. 构建LogicalPlan;
2. 将LogicalPlan转换为PhysicalPlan;
3. 使用StreamingExecutor执行PhysicalPlan,PhysicalPlan中的每个PhysicalOperator并行执行,启动ray remote task分布式处理输入,StreamingExecutor在每次调度循环中将PhysicalOperator已有的结果移动到输出队列中,并选择一些PhysicalOperator向其增加更多输入。
Ray在AI的多个领域展现出了较大的潜力,有望成为分布式计算的新范式,但目前网上分析Ray Data内部原理的文章较少,问题排查以及二次开发都有一定的难度。本文从简单的示例代码出发,系统性地分析了从LogicalPlan构建到PhysicalPlan流式执行的整个过程,希望对读者有所帮助。
来源 | 阿里云开发者公众号
作者 | 琮安