Understanding Ray Data's Distributed Data Processing: A Source Code Analysis


Introduction

Ray Data is a distributed data processing library built on top of Ray. It provides a high-performance, scalable API and fits naturally into AI workloads such as batch inference and data preprocessing. Internally, Ray Data uses a streaming execution model to process large datasets efficiently and to make good use of heterogeneous clusters, so that neither CPUs nor GPUs become the bottleneck of the pipeline.


Although Ray Data exposes a simple, easy-to-use API, its internal implementation and execution model are not easy to understand, which makes secondary development and troubleshooting harder. This article therefore starts from the Ray Data source code and focuses on how its streaming execution is implemented.


Example

import ray
from typing import Dict
from numpy.typing import NDArray

# Load a CSV dataset directly from NAS/S3
ds = ray.data.read_csv("/mnt/example-data/iris.csv")

# A user-defined inference class for map
class CustomMapActor:
    # Initialization
    def __init__(self):
        print("init")

    # Inference: custom model-invocation logic; must be implemented
    def __call__(self, single):
        single["output"] = "test"
        return single
        
# A user-defined inference class for map_batches
class CustomMapBatchActor:
    # Initialization
    def __init__(self):
        print("init")

    # Inference: custom model-invocation logic. The batch format is
    # {column_name: NDArray, ...}: each key is a column name and each NDArray
    # holds batch_size values for that column
    def __call__(self, data: Dict[str, NDArray]):
       return data

ds = ds.map(CustomMapActor,concurrency=5)

ds = ds.map_batches(CustomMapBatchActor, concurrency=5, batch_size=1024)

ds.write_csv("/mnt/result-data/")

The code above defines a classic Ray Data pipeline: it reads data from a CSV file, processes each row with map, processes the data in batches with map_batches, and finally writes the result back out as CSV. Thanks to streaming execution, Ray Data can finish the job even when the input is far larger than the machine's memory. In short, Ray Data obtains a Dataset through the read API, transforms the input data with the Dataset's map and map_batches operators, and finally writes the data to some destination.


Basic Concepts

Datasets and Blocks

The Dataset class lives in ray/data/dataset.py. It is the core concept of Ray Data and its most important user-facing API: a Dataset represents a distributed collection of data and defines the operators for loading, transforming, and writing it. Users always create a dataset through the read API, apply a series of transformations, and finally write the transformed data to some storage. The Dataset API is lazy: the operators defined at each step are not executed until the data actually needs to be consumed. This is analogous to the Java Stream API, where each intermediate operation only builds up an execution plan and the computation is triggered only when a terminal operation is reached.
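
To make the laziness concrete, here is a minimal sketch using the public ray.data API (ray.data.range is just a convenient toy source); nothing is read or computed until a consuming call such as take() is made:

import ray

# range() and map() only build up the logical plan; no data is materialized yet.
ds = ray.data.range(1000)
ds = ds.map(lambda row: {"id": row["id"], "double": row["id"] * 2})

# Execution is triggered only by consuming calls such as take(), show(),
# iter_batches(), write_*() or materialize().
print(ds.take(3))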


Internally, a dataset is made up of blocks; each block is a subset of contiguous rows of the dataset. A block is the smallest unit that Ray Data stores and transfers across the cluster, and blocks can be processed in parallel.

The figure above illustrates the Dataset and block concepts: each block holds 1000 contiguous rows of the dataset and is physically stored as a pyarrow Table. If the row structure is too complex for pyarrow to represent, Ray Data automatically falls back to pandas DataFrames for storage.


Note also that the Dataset lives in the Ray job's driver process and only manipulates ObjectRefs to blocks; the actual data of each block is stored in Ray's distributed shared memory, the Object Store.

Operators and Plans

In Ray Data, Operator is the base abstraction for operators: every read, map, and write is ultimately carried out by operators. A Plan represents an execution plan; it stores a DAG built from operators and their dependencies, where every node is an Operator. Ray Data defines two kinds of operators: logical operators and physical operators. A logical operator is stateless and only describes which operation should be performed, while a physical operator describes how to actually launch distributed Ray tasks to perform it. When we call the Dataset API we are in fact building a logical plan composed of logical operators; at execution time, Ray Data's internal Planner converts each logical operator into the physical operator that will actually run, producing the final DAG of physical operators, the physical plan, which is what really gets executed.

The figure above shows how Ray Data builds the physical plan: Ray first runs its internal Optimizer over the logical plan, possibly merging or splitting some operators, then converts it into a physical plan and optimizes again, yielding the final operator DAG to execute. The Optimizer mainly exists to improve execution performance; for example, if the user defines several consecutive task-based map operations, Ray Data will eventually fuse them into a single map to reduce serialization overhead.

For example, consider the following code:

ds = ray.data.read_csv(xx)

class MapActor:
    xxxx

ds = ds.map(MapActor)

ds.write_csv(xxx)

The resulting logical plan is:

Read->MapRows->Write

and the physical plan it is converted into is:

InputDataBuffer->TaskPoolMapOperator(ReadCSV)->ActorPoolMapOperator(MapActor)->TaskPoolMapOperator(WriteCSV)

InputDataBuffer, TaskPoolMapOperator, and ActorPoolMapOperator are all subclasses of PhysicalOperator. A PhysicalOperator works as follows:

1. it receives a stream of block references;

2. it performs its operation on the input blocks (possibly launching Ray tasks or Ray actors for distributed execution);

3. it emits another stream of block references.

Streaming Execution Model

Ray Data uses streaming execution to process large datasets efficiently.

The streaming execution model connects multiple operators into a pipeline. Each operator has an input queue and an output queue; in the implementation, an operator's input queue is set to be its upstream operator's output queue (they reference the same object), so each operator's output flows directly into the input queue of the downstream operator, which keeps data moving efficiently.
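
To illustrate the queue-sharing idea, here is a deliberately simplified toy model (plain Python, not Ray Data's actual StreamingExecutor or operator classes): the downstream operator's input queue is literally the upstream operator's output queue, and a driver loop steps each operator in turn.

from collections import deque

class ToyOperator:
    def __init__(self, fn, upstream=None):
        self.fn = fn
        # Reuse the upstream operator's output queue as our input queue (same object).
        self.input_queue = upstream.output_queue if upstream else deque()
        self.output_queue = deque()

    def step(self):
        # Process at most one item per scheduling step so operators interleave.
        if self.input_queue:
            self.output_queue.append(self.fn(self.input_queue.popleft()))

read = ToyOperator(lambda x: x)              # stands in for a "read" operator
double = ToyOperator(lambda x: x * 2, read)  # downstream operator
read.input_queue.extend(range(5))

while read.input_queue or double.input_queue:
    read.step()
    double.step()

print(list(double.output_queue))  # [0, 2, 4, 6, 8]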


This pipelined architecture lets multiple operators run in parallel and improves overall resource utilization: if one operation needs CPU resources and another needs GPU resources, the pipeline can keep CPU work running while GPU work proceeds, instead of waiting for the previous stage to finish. This is also the core advantage Ray Data advertises over Spark, and the animated figure below illustrates it intuitively.
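
As a sketch of what such a heterogeneous pipeline looks like through the public API (the paths and the dummy model below are made up, and the num_gpus=1 stage assumes GPU nodes are available): while the actor pool runs inference on one batch, upstream CPU tasks are already reading and preprocessing the next one.

import ray
import numpy as np
from typing import Dict
from numpy.typing import NDArray

ds = ray.data.read_csv("/mnt/example-data/iris.csv")   # I/O-bound read tasks

def preprocess(row: Dict) -> Dict:                      # CPU-only per-row work
    row["preprocessed"] = True
    return row

ds = ds.map(preprocess)

class DummyGpuModel:                                    # stand-in for a real GPU model
    def __call__(self, batch: Dict[str, NDArray]) -> Dict[str, NDArray]:
        batch_size = len(next(iter(batch.values())))
        batch["score"] = np.zeros(batch_size)
        return batch

# GPU actors; the CPU and GPU stages overlap instead of running one after another.
ds = ds.map_batches(DummyGpuModel, concurrency=2, num_gpus=1, batch_size=256)

ds.write_csv("/mnt/result-data/")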

Source Code Analysis

Reading the Data Source

The file ray/data/read_api.py provides the user-facing interfaces for reading data sources. A data processing pipeline always starts from one of the read_api functions, such as the read_csv function used in the example; its source is as follows:

The read_csv method

@PublicAPI
def read_csv(
    paths: Union[str, List[str]],
    *,
    filesystem: Optional["pyarrow.fs.FileSystem"] = None,
    parallelism: int = -1,
    ray_remote_args: Dict[str, Any] = None,
    arrow_open_stream_args: Optional[Dict[str, Any]] = None,
    meta_provider: Optional[BaseFileMetadataProvider] = None,
    partition_filter: Optional[PathPartitionFilter] = None,
    partitioning: Partitioning = Partitioning("hive"),
    include_paths: bool = False,
    ignore_missing_paths: bool = False,
    shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
    file_extensions: Optional[List[str]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    **arrow_csv_args,
) -> Dataset:
    """Creates a :class:`~ray.data.Dataset` from CSV files.
    Args:
        paths: The paths of the files to read.
        filesystem: An optional filesystem to read from.
        (Other arguments omitted.)
    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified paths.
    """
    _emit_meta_provider_deprecation_warning(meta_provider)

    if meta_provider is None:
        meta_provider = DefaultFileMetadataProvider()

    # The key part: construct the CSVDatasource
    datasource = CSVDatasource(
        paths,
        arrow_csv_args=arrow_csv_args,
        filesystem=filesystem,
        open_stream_args=arrow_open_stream_args,
        meta_provider=meta_provider,
        partition_filter=partition_filter,
        partitioning=partitioning,
        ignore_missing_paths=ignore_missing_paths,
        shuffle=shuffle,
        include_paths=include_paths,
        file_extensions=file_extensions,
    )

    # All read_xxx functions ultimately call read_datasource with a Datasource and return a Dataset
    return read_datasource(
        datasource,
        parallelism=parallelism,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )

As you can see, read_csv first constructs a CSVDatasource, a subclass of Datasource. Datasource is the interface that defines where data is read from and must be implemented by subclasses. read_csv then calls read_datasource, which returns a Dataset object.

Datasource

@PublicAPI
class Datasource:
    """Interface for defining a custom :class:`~ray.data.Dataset` datasource.

    To read a datasource into a dataset, use :meth:`~ray.data.read_datasource`.
    """  # noqa: E501
    def get_name(self) -> str:
        """Return a human-readable name forthis datasource.
        This will be used as the names of the read tasks.
        """
        name = type(self).__name__
        datasource_suffix = "Datasource"
        if name.endswith(datasource_suffix):
            name = name[: -len(datasource_suffix)]
        return name

    def estimate_inmemory_data_size(self) -> Optional[int]:
        """Return an estimate of the in-memory data size, or None if unknown.

        Note that the in-memory data size may be larger than the on-disk data size.
        """
        raise NotImplementedError

    def get_read_tasks(self, parallelism: int) -> List["ReadTask"]:
        """Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.

        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        """
        raise NotImplementedError

The most important method of Datasource is get_read_tasks, which returns a list of ReadTasks and must be implemented by subclasses. A ReadTask wraps a user-defined read function and is defined as follows:

ReadTask

@DeveloperAPI
class ReadTask(Callable[[], Iterable[Block]]):
    """A function used to read blocks from the :class:`~ray.data.Dataset`.

    Read tasks are generated by :meth:`~ray.data.Datasource.get_read_tasks`,
    and return a list of ``ray.data.Block`` when called. Initial metadata about the read
    operation can be retrieved via the ``metadata`` attribute prior to executing the
    read. Final metadata is returned after the read along with the blocks.

    Ray will execute read tasks in remote functions to parallelize execution.
    Note that the number of blocks returned can vary at runtime. For example,
    if a task is reading a single large file it can return multiple blocks to
    avoid running out of memory during the read.

    The initial metadata should reflect all the blocks returned by the read,
    e.g., if the metadata says ``num_rows=1000``, the read can return a single
    block of 1000 rows, or multiple blocks with 1000 rows altogether.

    The final metadata (returned with the actual block) reflects the exact
    contents of the block itself.
    """

    def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata):
        self._metadata = metadata
        self._read_fn = read_fn

    @property
    def metadata(self) -> BlockMetadata:
        return self._metadata

    @property
    def read_fn(self) -> Callable[[], Iterable[Block]]:
        return self._read_fn

    def __call__(self) -> Iterable[Block]:
        result = self._read_fn()
        if not hasattr(result, "__iter__"):
            DeprecationWarning(
                "Read function must return Iterable[Block], got {}. "
                "Probably you need to return `[block]` instead of "
                "`block`.".format(result)
            )
        yield from result

It is easy to see that ReadTask wraps the supplied _read_fn into a directly callable object. The _read_fn defines how data is actually read from the source and packed into blocks; Ray Data will eventually launch Ray remote functions to run these read tasks in parallel.
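
As an illustration, a read function and its ReadTask wrapper can be sketched like this (the import path and the metadata=None shortcut are illustrative assumptions; __call__ above never touches the metadata, while real datasources supply a proper BlockMetadata):

import pyarrow as pa
from ray.data.datasource import ReadTask

# A plain function that yields blocks (pyarrow Tables) is all a read task needs.
def my_read_fn():
    yield pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})

task = ReadTask(my_read_fn, metadata=None)  # metadata elided for this sketch

# Ray Data ships such tasks to remote workers; calling one locally simply runs
# the read function and yields its blocks.
for block in task():
    print(block.num_rows)  # 3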


Next, let's see how get_read_tasks is implemented for CSVDatasource; the implementation actually lives in its parent class, FileBasedDatasource.

FileBasedDatasource

@DeveloperAPI
class FileBasedDatasource(Datasource):
    """File-based datasource for reading files.
    Don't use this class directly. Instead, subclass it and implement `_read_stream()`.
    """
    def __init__(
        self,
        paths: Union[str, List[str]],
        ...
    ):
        #...
        #...
        #...
        paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
        self._filesystem = RetryingPyFileSystem.wrap(
            self._filesystem, retryable_errors=self._data_context.retried_io_errors
        )
        paths, file_sizes = map(
            list,
            zip(
                *meta_provider.expand_paths(
                    paths,
                    self._filesystem,
                    partitioning,
                    ignore_missing_paths=ignore_missing_paths,
                )
            ),
        )
        # Read tasks serialize `FileBasedDatasource` instances, and the list of paths
        # can be large. To avoid slow serialization speeds, we store a reference to
        # the paths rather than the paths themselves.
        self._paths_ref = ray.put(paths)
        self._file_sizes_ref = ray.put(file_sizes)
    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        #...
        #...
        
        #Defines how blocks are read from all the file paths
        def read_files(
            read_paths: Iterable[str],
        ) -> Iterable[Block]:
            nonlocal filesystem, open_stream_args, partitioning
            fs = _unwrap_s3_serialization_workaround(filesystem)
            #The core loop: iterate over the file paths and call the subclass's _read_stream to read blocks from each path
            for read_path in read_paths:
                partitions: Dict[str, str] = {}
                if partitioning is not None:
                    parse = PathPartitionParser(partitioning)
                    partitions = parse(read_path)
                with RetryingContextManager(
                    self._open_input_source(fs, read_path, **open_stream_args),
                    context=self._data_context,
                ) as f:
                    for block in iterate_with_retry(
                        lambda: self._read_stream(f, read_path),
                        description="read stream iteratively",
                        match=self._data_context.retried_io_errors,
                    ):
                        if partitions:
                            block = _add_partitions(block, partitions)
                        if self._include_paths:
                            block_accessor = BlockAccessor.for_block(block)
                            block = block_accessor.fill_column("path", read_path)
                        yield block
        
        #Wraps the choice between multi-threaded and single-threaded reading
        def create_read_task_fn(read_paths, num_threads):
            def read_task_fn():
                nonlocal num_threads, read_paths
                #Multi-threaded read
                if num_threads > 0:
                    yield from make_async_gen(
                        iter(read_paths),
                        read_files,
                        num_workers=num_threads,
                        preserve_ordering=True,
                        buffer_size=max(len(read_paths) // num_threads, 1),
                    )
                #Single-threaded read
                else:
                    yield from read_files(read_paths)
            return read_task_fn
        # fix https://github.com/ray-project/ray/issues/24296
        parallelism = min(parallelism, len(paths))
        #Split the file paths according to the user-provided parallelism, producing parallelism ReadTasks in total
        read_tasks = []
        split_paths = np.array_split(paths, parallelism)
        split_file_sizes = np.array_split(file_sizes, parallelism)
        for read_paths, file_sizes in zip(split_paths, split_file_sizes):
            if len(read_paths) <= 0:
                continue
            meta = self._meta_provider(
                read_paths,
                self._schema,
                rows_per_file=self._rows_per_file(),
                file_sizes=file_sizes,
            )
            #Build the actual read_task_fn
            read_task_fn = create_read_task_fn(read_paths, self._NUM_THREADS_PER_TASK)
            #Pass read_task_fn into the ReadTask constructor as the actual file-reading function
            read_task = ReadTask(read_task_fn, meta)
            read_tasks.append(read_task)
        return read_tasks
    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        """Streaming read a single file.
        This method should be implemented by subclasses.
        """
        raise NotImplementedError(
            "Subclasses of FileBasedDatasource must implement _read_stream()."
        )

As shown above, FileBasedDatasource resolves the paths at initialization time, collecting all files under the given directories together with their metadata. In get_read_tasks it first defines read_files, which iterates over the file paths and calls _read_stream on each one to turn it into blocks; _read_stream is an abstract method implemented by subclasses. create_read_task_fn then wraps read_files and decides whether to read with a single thread or multiple threads. Finally, get_read_tasks calls create_read_task_fn to obtain the actual read_task_fn and wraps it into a ReadTask to return.
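
The path partitioning at the end is easy to picture: the same np.array_split call used above spreads the discovered files across parallelism groups, one ReadTask per non-empty group. For example:

import numpy as np

paths = ["a.csv", "b.csv", "c.csv", "d.csv", "e.csv"]
parallelism = 3

# 5 files with parallelism=3 -> groups of sizes 2 / 2 / 1, i.e. 3 ReadTasks.
for read_paths in np.array_split(paths, parallelism):
    print(read_paths.tolist())
# ['a.csv', 'b.csv']
# ['c.csv', 'd.csv']
# ['e.csv']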


Now let's look at how CSVDatasource implements _read_stream to read a single file.

CSVDatasource

class CSVDatasource(FileBasedDatasource):
    #The key method; called by the parent class FileBasedDatasource
    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import pyarrow as pa
        from pyarrow import csv
        try:
            reader = csv.open_csv(
                f,
                read_options=self.read_options,
                parse_options=self.parse_options,
                **self.arrow_csv_args,
            )
            schema = None
            while True:
                try:
                    batch = reader.read_next_batch()
                    table = pa.Table.from_batches([batch], schema=schema)
                    if schema is None:
                        schema = table.schema
                    yield table
                except StopIteration:
                    return
        except pa.lib.ArrowInvalid as e:
            # The real implementation raises a more descriptive error here for
            # malformed CSV input; the message is abridged in this excerpt.
            raise ValueError(f"Failed to read CSV file: {path}.") from e

As you can see, _read_stream uses pyarrow's csv module to read the data and yields the results as pyarrow Tables; as mentioned earlier, pyarrow.Table is exactly the default underlying block format.


Thanks to these clean abstractions, implementing a custom Datasource is straightforward: to read a new file format you only need to subclass FileBasedDatasource and implement _read_stream, and to read from an arbitrary source you can subclass Datasource and implement get_read_tasks.
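
For instance, a custom datasource for a made-up line-based text format could be sketched roughly as follows; only _read_stream is implemented, while path expansion, parallelism and ReadTask creation come from FileBasedDatasource as analyzed above (the import path and constructor details may differ slightly across Ray versions):

from typing import Iterator

import pyarrow as pa
from ray.data.block import Block
from ray.data.datasource import FileBasedDatasource

class SimpleLineDatasource(FileBasedDatasource):
    def _read_stream(self, f: "pa.NativeFile", path: str) -> Iterator[Block]:
        # Read the whole file and emit a single block with one "text" column.
        lines = f.readall().decode("utf-8").splitlines()
        yield pa.table({"text": lines})

# ds = ray.data.read_datasource(SimpleLineDatasource("/mnt/example-data/"))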

The read_datasource method

For any Datasource, the read API eventually calls read_datasource to build the Dataset object.

@PublicAPI
@wrap_auto_init
def read_datasource(
    datasource: Datasource,
    *,
    parallelism: int = -1,
    ray_remote_args: Dict[str, Any] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    **read_args,
) -> Dataset:
    """Read a stream from a custom :class:`~ray.data.Datasource`.

    Args:
        datasource: The :class:`~ray.data.Datasource` to read data from.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
            implementation.

    Returns:
        :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.
    """  # noqa: E501
    parallelism = _get_num_output_blocks(parallelism, override_num_blocks)

    ctx = DataContext.get_current()

    if ray_remote_args is None:
        ray_remote_args = {}

    if not datasource.supports_distributed_reads:
        ray_remote_args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
            ray.get_runtime_context().get_node_id(),
            soft=False,
        )

    if"scheduling_strategy"not in ray_remote_args:
        ray_remote_args["scheduling_strategy"] = ctx.scheduling_strategy

    # For backward compatibility: earlier Ray Data versions used a legacy_reader, current versions use a Datasource; here you can simply treat datasource_or_legacy_reader as the datasource
    datasource_or_legacy_reader = _get_datasource_or_legacy_reader(
        datasource,
        ctx,
        read_args,
    )

    cur_pg = ray.util.get_current_placement_group()
    requested_parallelism, _, inmemory_size = _autodetect_parallelism(
        parallelism,
        ctx.target_max_block_size,
        DataContext.get_current(),
        datasource_or_legacy_reader,
        placement_group=cur_pg,
    )

    #Call Datasource.get_read_tasks to obtain the list of ReadTasks; here it is mainly used to build the DatasetStats
    read_tasks = datasource_or_legacy_reader.get_read_tasks(requested_parallelism)

    import uuid

    stats = DatasetStats(
        metadata={"Read": [read_task.metadata for read_task in read_tasks]},
        parent=None,
        needs_stats_actor=True,
        stats_uuid=uuid.uuid4(),
    )

    #The key code: create the Read operator, a LogicalOperator, and pass the datasource into it
    read_op = Read(
        datasource,
        datasource_or_legacy_reader,
        parallelism,
        inmemory_size,
        len(read_tasks) if read_tasks else 0,
        ray_remote_args,
        concurrency,
    )
    #Create the ExecutionPlan; this class ultimately drives the streaming execution
    execution_plan = ExecutionPlan(stats)
    #Create the LogicalPlan, with the Read operator as the head of its DAG
    logical_plan = LogicalPlan(read_op, execution_plan._context)
    #Build the Dataset
    return Dataset(
        plan=execution_plan,
        logical_plan=logical_plan,
    )

Quite a few concepts show up here. The core part is that a Read object (a subclass of LogicalOperator) is created first, then an ExecutionPlan and a LogicalPlan are created and used to initialize the Dataset object. ExecutionPlan is the entry point of Ray Data's streaming execution: it holds the LogicalPlan while the dataset is being built, converts it into a PhysicalPlan at execution time, and finally runs it.


The definition of LogicalPlan is fairly simple: it only has a _dag field of type LogicalOperator.

LogicalPlan


class LogicalPlan(Plan):
    """The plan with a DAG of logical operators."""

    def __init__(self, dag: LogicalOperator, context: "DataContext"):
        super().__init__(context)
        self._dag = dag

    @property
    def dag(self) -> LogicalOperator:
        """Get the DAG of logical operators."""
        return self._dag

    def sources(self) -> List[LogicalOperator]:
        """List of operators that are sources forthis plan's DAG."""
        # If an operator has no input dependencies, it's a source.
        if not any(self._dag.input_dependencies):
            return [self._dag]

        sources = []
        for op in self._dag.input_dependencies:
            sources.extend(LogicalPlan(op, self._context).sources())
        return sources

You might wonder why the _dag field is typed as a single LogicalOperator. The reason is that LogicalOperator is a subclass of Operator, and Operator keeps its upstream dependencies in the input_dependencies field, so the whole graph can be traversed starting from any node of the DAG.
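
Using the dag_str and post_order_iter helpers shown in the Operator class below, the DAG can be inspected like this (note that _logical_plan is an internal attribute, used here purely for illustration and subject to change between Ray versions):

import ray

ds = ray.data.read_csv("/mnt/example-data/iris.csv").map(lambda row: row)

last_op = ds._logical_plan.dag          # the most recently added operator
print(last_op.dag_str)                  # e.g. "Read[ReadCSV] -> MapRows[Map(<lambda>)]"

for op in last_op.post_order_iter():    # upstream dependencies are visited first
    print(type(op).__name__, [type(dep).__name__ for dep in op.input_dependencies])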

Operator

class Operator:
    """Abstract class for operators.

    Operators live on the driver side of the Dataset only.
    """

    def __init__(
        self,
        name: str,
        input_dependencies: List["Operator"],
    ):
        self._name = name
        self._input_dependencies = input_dependencies
        self._output_dependencies = []
        for x in input_dependencies:
            assert isinstance(x, Operator), x
            x._output_dependencies.append(self)

    @property
    def name(self) -> str:
        return self._name

    @property
    def dag_str(self) -> str:
        """String representation of the whole DAG."""
        if self.input_dependencies:
            out_str = ", ".join([x.dag_str for x in self.input_dependencies])
            out_str += " -> "
        else:
            out_str = ""
        out_str += f"{self.__class__.__name__}[{self._name}]"
        return out_str

    @property
    def input_dependencies(self) -> List["Operator"]:
        """List of operators that provide inputs for this operator."""
        assert hasattr(
            self, "_input_dependencies"
        ), "Operator.__init__() was not called."
        return self._input_dependencies

    @property
    def output_dependencies(self) -> List["Operator"]:
        """List of operators that consume outputs from this operator."""
        assert hasattr(
            self, "_output_dependencies"
        ), "Operator.__init__() was not called."
        return self._output_dependencies

    def post_order_iter(self) -> Iterator["Operator"]:
        """Depth-first traversal of this operator and its input dependencies."""
        for op in self.input_dependencies:
            yield from op.post_order_iter()
        yield self

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}[{self._name}]"

    def __str__(self) -> str:
        return repr(self)

At the end of read_datasource, the newly created LogicalPlan and ExecutionPlan are used to initialize a Dataset. In the Dataset constructor, ExecutionPlan.link_logical_plan is called to attach the LogicalPlan to the ExecutionPlan.

@PublicAPI
class Dataset:
    def __init__(
        self,
        plan: ExecutionPlan,
        logical_plan: LogicalPlan,
    ):
        """Construct a Dataset (internal API).

        The constructor is not part of the Dataset API. Use the ``ray.data.*``
        read methods to construct a dataset.
        """
        assert isinstance(plan, ExecutionPlan), type(plan)
        usage_lib.record_library_usage("dataset")  # Legacy telemetry name.

        self._plan = plan
        self._logical_plan = logical_plan
        self._plan.link_logical_plan(logical_plan)

        # Handle to currently running executor for this dataset.
        self._current_executor: Optional["Executor"] = None
        self._write_ds = None

        self._set_uuid(StatsManager.get_dataset_id_from_stats_actor())

At this point the statement ds = ray.data.read_csv() has finished executing: it created a Dataset object whose _plan field holds an ExecutionPlan, the ExecutionPlan holds the LogicalPlan, and the LogicalPlan holds a DAG of logical operators that currently contains a single node, Read.

Building the Map/MapBatches Processing Steps

Next we analyze the source of the map function used in ds = ds.map().

The map method

@PublicAPI(api_group=BT_API_GROUP)
    def map(
        self,
        fn: UserDefinedFunction[Dict[str, Any], Dict[str, Any]],
        *,
        compute: Optional[ComputeStrategy] = None,
        fn_args: Optional[Iterable[Any]] = None,
        fn_kwargs: Optional[Dict[str, Any]] = None,
        fn_constructor_args: Optional[Iterable[Any]] = None,
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        num_cpus: Optional[float] = None,
        num_gpus: Optional[float] = None,
        memory: Optional[float] = None,
        concurrency: Optional[Union[int, Tuple[int, int]]] = None,
        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        **ray_remote_args,
    ) -> "Dataset":
        #The compute strategy is determined by the type of fn: if a function is passed in,
        #the computation runs as Ray tasks; if a class is passed in, it runs as Ray actors
        compute = get_compute_strategy(
            fn,
            fn_constructor_args=fn_constructor_args,
            compute=compute,
            concurrency=concurrency,
        )

        #ray_remote_args is passed along when the Ray tasks/actors are eventually launched
        if num_cpus is not None:
            ray_remote_args["num_cpus"] = num_cpus

        if num_gpus is not None:
            ray_remote_args["num_gpus"] = num_gpus

        if memory is not None:
            ray_remote_args["memory"] = memory

        #Copy the dataset's existing ExecutionPlan
        plan = self._plan.copy()
        #Define the MapRows LogicalOperator
        map_op = MapRows(
            self._logical_plan.dag, #the dag of the previous LogicalPlan, i.e. its last operator, becomes the new operator's upstream dependency
            fn,
            fn_args=fn_args,
            fn_kwargs=fn_kwargs,
            fn_constructor_args=fn_constructor_args,
            fn_constructor_kwargs=fn_constructor_kwargs,
            compute=compute,
            ray_remote_args_fn=ray_remote_args_fn,
            ray_remote_args=ray_remote_args,
        )
        #Update the LogicalPlan; map_op now has its upstream input
        logical_plan = LogicalPlan(map_op, self.context)
        return Dataset(plan, logical_plan)

As shown above, map defines a MapRows operator and passes the dataset's existing dag as its input_dependencies, which effectively appends MapRows to the end of the DAG. It then builds a new LogicalPlan whose DAG entry point is the newly created MapRows, so the LogicalPlan always holds the last operator of the DAG. Finally a new Dataset is returned; note that the ExecutionPlan was merely copied and left unchanged, while the LogicalPlan gained one node.

The DAG has now become: Read -> MapRows

The map_batches method

map_batches works the same way as map, except that it produces a MapBatches operator, so we won't repeat the details here.

After map_batches, the DAG becomes: Read -> MapRows -> MapBatches

Writing the Output

When the statement ds.write_csv("/mnt/result-data/") runs, Ray Data first builds a Write operation and then actually triggers execution.

The write_csv method

@ConsumptionAPI
    @PublicAPI(api_group=IOC_API_GROUP)
    def write_csv(
        self,
        path: str,
        *,
        filesystem: Optional["pyarrow.fs.FileSystem"] = None,
        try_create_dir: bool = True,
        arrow_open_stream_args: Optional[Dict[str, Any]] = None,
        filename_provider: Optional[FilenameProvider] = None,
        arrow_csv_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        min_rows_per_file: Optional[int] = None,
        ray_remote_args: Dict[str, Any] = None,
        concurrency: Optional[int] = None,
        num_rows_per_file: Optional[int] = None,
        **arrow_csv_args,
    ) -> None:
        #Define the CSVDatasink, the write-side counterpart of the read-side Datasource
        datasink = CSVDatasink(
            path,
            arrow_csv_args_fn=arrow_csv_args_fn,
            arrow_csv_args=arrow_csv_args,
            min_rows_per_file=effective_min_rows,
            filesystem=filesystem,
            try_create_dir=try_create_dir,
            open_stream_args=arrow_open_stream_args,
            filename_provider=filename_provider,
            dataset_uuid=self._uuid,
        )
        self.write_datasink(
            datasink,
            ray_remote_args=ray_remote_args,
            concurrency=concurrency,
        )

As shown above, a CSVDatasink is constructed first. CSVDatasink is a subclass of Datasink, which is the write-side counterpart of Datasource: it defines how data is written out, and its core interface is the write method, to be implemented by subclasses.

Datasink

@DeveloperAPI
class Datasink(Generic[WriteReturnType]):
    def write(
        self,
        blocks: Iterable[Block],
        ctx: TaskContext,
    ) -> WriteReturnType:
        """Write blocks. This is used by a single write task.

        Args:
            blocks: Generator of data blocks.
            ctx: ``TaskContext`` for the write task.

        Returns:
            Result of this write task. When the entire write operator finishes,
            All returned values will be passed as `WriteResult.write_returns`
            to `Datasink.on_write_complete`.
        """
        raise NotImplementedError

The Datasink.write method defines how blocks are written to the destination and returns the result of the write task. Let's look at a concrete example, _FileDatasink, the parent class of the various file-based datasinks:

_FileDatasink

class _FileDatasink(Datasink[None]):
    #......
    def write(
        self,
        blocks: Iterable[Block],
        ctx: TaskContext,
    ) -> None:
        builder = DelegatingBlockBuilder()
        #Iterate over the block iterator, merge the blocks (a block may have been split), then call write_block
        for block in blocks:
            builder.add_block(block)
        block = builder.build()
        block_accessor = BlockAccessor.for_block(block)

        if block_accessor.num_rows() == 0:
            logger.warning(f"Skipped writing empty block to {self.path}")
            return

        self.write_block(block_accessor, 0, ctx)

    #Implemented by subclasses
    def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):
        raise NotImplementedError


@DeveloperAPI
class BlockBasedFileDatasink(_FileDatasink):
    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        """Write a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        """
        raise NotImplementedError

    def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):
        #Determine the output file name
        filename = self.filename_provider.get_filename_for_block(
            block, ctx.task_idx, block_index
        )
        #Determine the output path
        write_path = posixpath.join(self.path, filename)

        def write_block_to_path():
            #Open the output file stream
            with self.open_output_stream(write_path) as file:
                #The concrete write is implemented by the subclass
                self.write_block_to_file(block, file)

        call_with_retry(
            write_block_to_path,
            description=f"write '{write_path}'",
            match=self._data_context.retried_io_errors,
        )

_FileDatasink.write first iterates over the block iterator and assembles the pieces into one large block, mainly to handle the case where a block has been split. It then calls write_block, which subclasses override. In the BlockBasedFileDatasink subclass, write_block generates a file path, opens the output stream, and finally calls the subclass's write_block_to_file to write the block's data into the file.

CSVDatasink

class CSVDatasink(BlockBasedFileDatasink):
    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        from pyarrow import csv

        writer_args = _resolve_kwargs(self.arrow_csv_args_fn, **self.arrow_csv_args)
        write_options = writer_args.pop("write_options", None)
        csv.write_csv(block.to_arrow(), file, write_options, **writer_args)

CSVDatasink again uses pyarrow's csv module to write the file. Clearly, just as with Datasource, we can also customize where and how the data is written.
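
As a sketch, a custom sink for JSON Lines output could look roughly like this: by subclassing BlockBasedFileDatasink we only decide how one block is serialized into an already-opened file, while file naming, retries and the parallel write tasks come from the parent classes shown above (the file_format constructor argument and the iter_rows signature are assumptions; check them against your Ray version):

import json
from ray.data.block import BlockAccessor
from ray.data.datasource import BlockBasedFileDatasink

class JSONLinesDatasink(BlockBasedFileDatasink):
    def __init__(self, path: str):
        # file_format controls the extension chosen by the default FilenameProvider.
        super().__init__(path, file_format="jsonl")

    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        # Dump one JSON object per row of the block.
        for row in block.iter_rows(public_row_format=True):
            file.write((json.dumps(row) + "\n").encode("utf-8"))

# ds.write_datasink(JSONLinesDatasink("/mnt/result-data/"))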

The write_datasink method

Similarly, every write_xxx method ultimately calls write_datasink, which first appends a Write operator to the DAG and then triggers the actual execution of the dataset.

@ConsumptionAPI(pattern="Time complexity:")
    def write_datasink(
        self,
        datasink: Datasink,
        *,
        ray_remote_args: Dict[str, Any] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        #...
        #...
        plan = self._plan.copy()
        #Append the Write operator to the DAG
        write_op = Write(
            self._logical_plan.dag,
            datasink,
            ray_remote_args=ray_remote_args,
            concurrency=concurrency,
        )
        
        logical_plan = LogicalPlan(write_op, self.context)

        try:
            datasink.on_write_start()
            #Trigger the actual execution of the dataset via materialize()
            self._write_ds = Dataset(plan, logical_plan).materialize()
            #Fetch the execution result, which is in fact the output of the last operator
            raw_write_results = ray.get(self._write_ds._plan.execute().block_refs)
            write_result = gen_datasink_write_result(raw_write_results)
            logger.info(
                "Data sink %s finished. %d rows and %s data written.",
                datasink.get_name(),
                write_result.num_rows,
                memory_string(write_result.size_bytes),
            )
            datasink.on_write_complete(write_result)

        except Exception as e:
            datasink.on_write_failed(e)
            raise

As before, a Write operator is added to the DAG, then Dataset.materialize() triggers the actual execution, and the result is obtained through self._write_ds._plan.execute().block_refs and logged. Next, let's look at the execution itself.

Streaming Execution

@ConsumptionAPI(pattern="store memory.", insert_after=True)
    @PublicAPI(api_group=E_API_GROUP)
    def materialize(self) -> "MaterializedDataset":
        #First make a copy of the original dataset
        copy = Dataset.copy(self, _deep_copy=True, _as=MaterializedDataset)
        #Directly trigger execution of the dataset and obtain the final result
        bundle = copy._plan.execute()
        #Define a new LogicalPlan containing a single InputData operator that caches the execution result
        logical_plan = LogicalPlan(InputData(input_data=[bundle]), self.context)
        #Return a dataset that wraps the cached execution result
        output = MaterializedDataset(
            ExecutionPlan(copy._plan.stats(), data_context=copy.context),
            logical_plan,
        )
        # Metrics are tagged with `copy`s uuid, update the output uuid with
        # this so the user can access the metrics label.
        output.set_name(copy.name)
        output._set_uuid(copy._get_uuid())
        output._plan.execute()  # No-op that marks the plan as fully executed.
        return output

materialize calls the dataset's ExecutionPlan.execute to trigger execution and returns a new MaterializedDataset that holds the result. Let's look at the logic of ExecutionPlan.execute().

class ExecutionPlan:
    def execute(
        self,
        preserve_order: bool = False,
    ) -> RefBundle:
        """Execute this plan.

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The blocks of the output dataset.
        """
        self._has_started_execution = True

        # Always used the saved context for execution.
        context = self._context

        #Check whether the plan was already executed; if so, don't execute it again
        if not self.has_computed_output():
            from ray.data._internal.execution.legacy_compat import (
                _get_initial_stats_from_plan,
                execute_to_legacy_block_list,
            )

            if self._logical_plan.dag.output_data() is not None:
                #...
            else:
                #If it hasn't been executed yet, run it with the StreamingExecutor
                from ray.data._internal.execution.streaming_executor import (
                    StreamingExecutor,
                )

                metrics_tag = create_dataset_tag(self._dataset_name, self._dataset_uuid)
                #Initialize the StreamingExecutor
                executor = StreamingExecutor(
                    context,
                    metrics_tag,
                )
                #Actually trigger the executor's execution
                blocks = execute_to_legacy_block_list(
                    executor,
                    self,
                    dataset_uuid=self._dataset_uuid,
                    preserve_order=preserve_order,
                )
                bundle = RefBundle(
                    tuple(blocks.iter_blocks_with_metadata()),
                    owns_blocks=blocks._owned_by_consumer,
                )

            # Set the snapshot to the output of the final operator.
            self._snapshot_bundle = bundle
            self._snapshot_operator = self._logical_plan.dag
            self._snapshot_stats = stats
            self._snapshot_stats.dataset_uuid = self._dataset_uuid

        return self._snapshot_bundle

execute first checks whether the plan has already been executed and returns the cached result if so. We focus on the case where it hasn't: a StreamingExecutor is created and execute_to_legacy_block_list is called.

def execute_to_legacy_block_list(
    executor: Executor,
    plan: ExecutionPlan,
    dataset_uuid: str,
    preserve_order: bool,
) -> BlockList:
    """Execute a plan with the new executor and translate it into a legacy block list.

    Args:
        executor: The executor to use.
        plan: The legacy plan to execute.
        dataset_uuid: UUID of the dataset for this execution.
        preserve_order: Whether to preserve order in execution.

    Returns:
        The output as a legacy block list.
    """

    #The conversion from logical operators to physical operators happens here
    dag, stats = _get_execution_dag(
        executor,
        plan,
        preserve_order,
    )
    #Execute the DAG in a streaming fashion
    bundles = executor.execute(dag, initial_stats=stats)
    #Convert the RefBundles into a BlockList. A RefBundle is a collection of block references and is the basic unit of data transfer between operators
    #A RefBundle usually holds a single block plus its metadata, but it may hold several if block splitting occurred
    block_list = _bundles_to_block_list(bundles)
    # Set the stats UUID after execution finishes.
    _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
    return block_list

Two things happen here: first _get_execution_dag converts the LogicalPlan into a PhysicalPlan, then the StreamingExecutor executes the PhysicalPlan and returns the result. Let's analyze these two steps in turn.

Converting the LogicalPlan into a PhysicalPlan

_get_execution_dag

def _get_execution_dag(
    executor: Executor,
    plan: ExecutionPlan,
    preserve_order: bool,
) -> Tuple[PhysicalOperator, DatasetStats]:
    """Get the physical operators DAG from a plan."""
    # Record usage of logical operators if available.
    ifhasattr(plan, "_logical_plan")and plan._logical_plan is not None:
        record_operators_usage(plan._logical_plan.dag)

    # Get DAG of physical operators and input statistics.
    dag = get_execution_plan(plan._logical_plan).dag
    stats = _get_initial_stats_from_plan(plan)

    # Enforce to preserve ordering if the plan has operators
    # required to do so, such as Zip and Sort.
    if preserve_order or plan.require_preserve_order():
        executor._options.preserve_order = True

    return dag, stats

def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    """Get the physical execution plan for the provided logical plan.

    This process has 3 steps:
    (1) logical optimization: optimize logical operators.
    (2) planning: convert logical to physical operators.
    (3) physical optimization: optimize physical operators.
    """
    optimized_logical_plan = LogicalOptimizer().optimize(logical_plan)
    logical_plan._dag = optimized_logical_plan.dag
    physical_plan = Planner().plan(optimized_logical_plan)
    return PhysicalOptimizer().optimize(physical_plan)

The core of _get_execution_dag is to convert the LogicalPlan into a PhysicalPlan, optimizing both plans along the way. The conversion itself is done by Planner().plan.

Planner().plan

class Planner:
    """The planner to convert optimized logical to physical operators.

    Note that planner is only doing operators conversion. Physical optimization work is
    done by physical optimizer.
    """

    def __init__(self):
        self._physical_op_to_logical_op: Dict[PhysicalOperator, LogicalOperator] = {}

    def plan(self, logical_plan: LogicalPlan) -> PhysicalPlan:
        """Convert logical to physical operators recursively in post-order."""
        physical_dag = self._plan(logical_plan.dag, logical_plan.context)
        physical_plan = PhysicalPlan(
            physical_dag,
            self._physical_op_to_logical_op,
            logical_plan.context,
        )
        return physical_plan

    def _plan(
        self, logical_op: LogicalOperator, data_context: DataContext
    ) -> PhysicalOperator:
        # Plan the input dependencies first.
        physical_children = []
        for child in logical_op.input_dependencies:
            physical_children.append(self._plan(child, data_context))

        physical_op = None
        for op_type, plan_fn in PLAN_LOGICAL_OP_FNS:
            if isinstance(logical_op, op_type):
                # We will call `set_logical_operators()` in the following for-loop,
                # no need to do it here.
                physical_op = plan_fn(logical_op, physical_children, data_context)
                break

        if physical_op is None:
            raise ValueError(
                f"Found unknown logical operator during planning: {logical_op}"
            )

        # Traverse up the DAG, and set the mapping from physical to logical operators.
        # At this point, all physical operators without logical operators set
        # must have been created by the current logical operator.
        queue = [physical_op]
        while queue:
            curr_physical_op = queue.pop()
            # Once we find an operator with a logical operator set, we can stop.
            if curr_physical_op._logical_operators:
                break

            curr_physical_op.set_logical_operators(logical_op)
            queue.extend(physical_op.input_dependencies)

        self._physical_op_to_logical_op[physical_op] = logical_op
        return physical_op

_plan converts the LogicalPlan into a PhysicalPlan by building it recursively. The key statement is physical_op = plan_fn(logical_op, physical_children, data_context): Ray Data has a predefined plan_fn for every type of logical operator, and this dispatch performs the actual conversion. Let's take the Read operator used during data loading as a concrete example; its conversion function is plan_read_op. Before that, let's first look at the structure of PhysicalOperator.

PhysicalOperator

class PhysicalOperator(Operator):
    """Abstract class for physical operators.

    An operator transforms one or more input streams of RefBundles into a single
    output stream of RefBundles.
    Physical operators are stateful and non-serializable; they live on the driver side
    of the Dataset only.
    Here's a simple example of implementing a basic "Map" operator:
        class MapOperator(PhysicalOperator):
            def __init__(self):
                self.active_tasks = []
            def add_input(self, refs, _):
                self.active_tasks.append(map_task.remote(refs))
            def has_next(self):
                ready, _ = ray.wait(self.active_tasks, timeout=0)
                return len(ready) > 0
            def get_next(self):
                ready, remaining = ray.wait(self.active_tasks, num_returns=1)
                self.active_tasks = remaining
                return ready[0]
    Note that the above operator fully supports both bulk and streaming execution,
    since `add_input` and `get_next` can be called in any order. In bulk execution
    (now deprecated), all inputs would be added up-front, but in streaming
    execution (now the default execution mode) the calls could be interleaved.
    """
    _OPERATOR_ID_LABEL_KEY = "__data_operator_id"
    def __init__(
        self,
        name: str,
        input_dependencies: List["PhysicalOperator"],
        data_context: DataContext,
        target_max_block_size: Optional[int],
    ):
        super().__init__(name, input_dependencies)
        for x in input_dependencies:
            assert isinstance(x, PhysicalOperator), x
        self._inputs_complete = not input_dependencies
        self._output_block_size_option = None
        self.set_target_max_block_size(target_max_block_size)
        self._started = False
        self._in_task_submission_backpressure = False
        self._in_task_output_backpressure = False
        self._estimated_num_output_bundles = None
        self._estimated_output_num_rows = None
        self._execution_completed = False
        # The LogicalOperator(s) which were translated to create this PhysicalOperator.
        # Set via `PhysicalOperator.set_logical_operators()`.
        self._logical_operators: List[LogicalOperator] = []
        self._data_context = data_context
        self._id = str(uuid.uuid4())
        # Initialize metrics after data_context is set
        self._metrics = OpRuntimeMetrics(self)
    def start(self, options: ExecutionOptions) -> None:
        """Called by the executor when execution starts for an operator.
        Args:
            options: The global options used for the overall execution.
        """
        self._started = True
    def should_add_input(self) -> bool:
        """Return whether it is desirable to add input to thisoperator right now.
        Operators can customize the implementation of this method to apply additional
        backpressure (e.g., waiting for internal actors to be created).
        """
        return True
    def add_input(self, refs: RefBundle, input_index: int) -> None:
        """Called when an upstream result is available.
        Inputs may be added in any order, and calls to `add_input` may be interleaved
        with calls to `get_next` / `has_next` to implement streaming execution.
        Subclasses should override `_add_input_inner` instead of this method.
        Args:
            refs: The ref bundle that should be added as input.
            input_index: The index identifying the input dependency producing the
                input. For most operators, this is always `0` since there is only
                one upstream input operator.
        """
        assert 0 <= input_index < len(self._input_dependencies), (
            f"Input index out of bounds (total inputs {len(self._input_dependencies)}, "
            f"index is {input_index})"
        )
        self._metrics.on_input_received(refs)
        self._add_input_inner(refs, input_index)
    def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
        """Subclasses should overridethis method to implement `add_input`."""
        raise NotImplementedError
    def has_next(self) -> bool:
        """Returns when a downstream output is available.
        When this returns true, it is safe to call `get_next()`.
        """
        raise NotImplementedError
    def get_next(self) -> RefBundle:
        """Get the next downstream output.
        It is only allowed to call this if `has_next()` has returned True.
        Subclasses should override `_get_next_inner` instead of this method.
        """
        output = self._get_next_inner()
        self._metrics.on_output_taken(output)
        return output
    def _get_next_inner(self) -> RefBundle:
        """Subclasses should overridethis method to implement `get_next`."""
        raise NotImplementedError

As the official docstring says, a PhysicalOperator turns an input stream of RefBundles into an output stream of RefBundles. Its core methods are add_input, which accepts input, and get_next, which produces output. add_input delegates to the abstract _add_input_inner, which subclasses must implement to actually consume upstream output; get_next delegates to the abstract _get_next_inner, which subclasses implement to produce output. add_input and get_next may be called in any order, which is what makes streaming execution possible.
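
A toy driver loop (plain Python, not Ray's StreamingExecutor) shows why this contract enables streaming: outputs can be pulled from an operator as soon as they are ready, interleaved with feeding in more inputs.

class DoublingOp:
    """A toy operator obeying the add_input/has_next/get_next contract."""

    def __init__(self):
        self._outputs = []

    def add_input(self, ref, input_index=0):
        self._outputs.append(ref * 2)   # "process" the input immediately

    def has_next(self):
        return bool(self._outputs)

    def get_next(self):
        return self._outputs.pop(0)

op = DoublingOp()
results = []
for ref in range(5):                    # inputs trickle in one at a time
    op.add_input(ref)
    while op.has_next():                # consumption interleaves with production
        results.append(op.get_next())

print(results)  # [0, 2, 4, 6, 8]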


Now let's look at how plan_read_op converts the Read logical operator into a physical operator.

The plan_read_op method

def plan_read_op(
    op: Read,
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> PhysicalOperator:
    """Get the corresponding DAG of physical operators for Read.

    Note this method only converts the given `op`, but not its input dependencies.
    See Planner.plan() for more details.
    """
    assert len(physical_children) == 0
    #First, define get_input_data
    def get_input_data(target_max_block_size) -> List[RefBundle]:
        parallelism = op.get_detected_parallelism()
        assert (
            parallelism is not None
        ), "Read parallelism must be set by the optimizer before execution"
        #The Datasource was passed in when Read was created; call its get_read_tasks here to obtain the List[ReadTask]
        read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism)
        _warn_on_high_parallelism(parallelism, len(read_tasks))

        #Wrap each ReadTask as a block reference
        ret = []
        for read_task in read_tasks:
            read_task_ref = ray.put(read_task)
            ref_bundle = RefBundle(
                [
                    (
                        # TODO(chengsu): figure out a better way to pass read
                        # tasks other than ray.put().
                        read_task_ref,
                        cleaned_metadata(read_task, read_task_ref),
                    )
                ],
                # `owns_blocks` is False, because these refs are the root of the
                # DAG. We shouldn't eagerly free them. Otherwise, the DAG cannot
                # be reconstructed.
                owns_blocks=False,
            )
            ret.append(ref_bundle)
        return ret
    #InputDataBuffer is a PhysicalOperator whose output is the ReadTasks
    inputs = InputDataBuffer(
        data_context,
        input_data_factory=get_input_data,
    )

    #Define the function that actually invokes the ReadTasks to perform the read
    def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
        for read_task in blocks:
            yield from read_task()

    
    # Create a MapTransformer for a read operator
    transform_fns: List[MapTransformFn] = [
        # First, execute the read tasks.
        #Wrap do_read in a BlockMapTransformFn
        BlockMapTransformFn(do_read),
    ]
    #Then build the output blocks
    transform_fns.append(BuildOutputBlocksMapTransformFn.for_blocks())

    #The MapTransformer encapsulates the MapOperator's data transformation logic
    map_transformer = MapTransformer(transform_fns)

    return MapOperator.create(
        map_transformer,
        inputs,
        data_context,
        name=op.name,
        target_max_block_size=None,
        compute_strategy=TaskPoolStrategy(op._concurrency),
        ray_remote_args=op._ray_remote_args,
    )

plan_read_op first defines get_input_data, which calls Datasource.get_read_tasks() to obtain the list of ReadTasks and wraps them into RefBundles (at this point a ReadTask can be thought of as a special kind of block). It then creates an InputDataBuffer whose input_data_factory field is set to get_input_data. InputDataBuffer is a PhysicalOperator, implemented as follows:

InputDataBuffer

class InputDataBuffer(PhysicalOperator):
    """Defines the input data for the operator DAG.

    For example, this may hold cached blocks from a previous Dataset execution, or
    the arguments for read tasks.
    """

    def __init__(
        self,
        data_context: DataContext,
        input_data: Optional[List[RefBundle]] = None,
        input_data_factory: Optional[Callable[[int], List[RefBundle]]] = None,
        num_output_blocks: Optional[int] = None,
    ):
        """Create an InputDataBuffer.

        Args:
            input_data: The list of bundles to output from this operator.
            input_data_factory: The factory to get input data, if input_data is None.
            num_output_blocks: The number of output blocks. If not specified, progress
                bars total will be set based on num output bundles instead.
        """
        super().__init__("Input", [], data_context, target_max_block_size=None)
        if input_data is not None:
            assert input_data_factory is None
            # Copy the input data to avoid mutating the original list.
            self._input_data = input_data[:]
            self._is_input_initialized = True
            self._initialize_metadata()
        else:
            # Initialize input lazily when execution is started.
            assert input_data_factory is not None
            self._input_data_factory = input_data_factory
            self._is_input_initialized = False
        self._input_data_index = 0

    def start(self, options: ExecutionOptions) -> None:
        if not self._is_input_initialized:
            #Call _input_data_factory to obtain the list of ReadTask references
            self._input_data = self._input_data_factory(
                self.actual_target_max_block_size
            )
            self._is_input_initialized = True
            self._initialize_metadata()
        # InputDataBuffer does not take inputs from other operators,
        # so we record input metrics here
        for bundle in self._input_data:
            self._metrics.on_input_received(bundle)
        super().start(options)

    def has_next(self) -> bool:
        return self._input_data_index < len(self._input_data)

    def _get_next_inner(self) -> RefBundle:
        # We can't pop the input data. If we do, Ray might garbage collect the block
        # references, and Ray won't be able to reconstruct downstream objects.
        bundle = self._input_data[self._input_data_index]
        self._input_data_index += 1
        return bundle

    def get_stats(self) -> StatsDict:
        return {}

    def _add_input_inner(self, refs, input_index) -> None:
        raise ValueError("Inputs are not allowed for this operator.")

InputDataBuffer receives input_data_factory at construction time and uses it to obtain the ReadTask references: in start() it calls input_data_factory() and stores the resulting List[RefBundle(ReadTask)] in self._input_data.


Its _get_next_inner implementation returns one ReadTask reference at a time; in other words, InputDataBuffer emits one ReadTask per output.


InputDataBuffer does not implement _add_input_inner, because it is always the first PhysicalOperator in the DAG, the source from which data is read.


After defining the InputDataBuffer, plan_read_op defines do_read, whose input is Iterable[ReadTask] and whose output is Iterable[Block]; it simply invokes each ReadTask to perform the read. do_read is then wrapped into a MapTransformFn, the abstraction for a single transform function, defined as follows:

MapTransformFn

class MapTransformFn:
    """Represents a single transform function in a MapTransformer."""

    def __init__(
        self,
        input_type: MapTransformFnDataType,
        output_type: MapTransformFnDataType,
        category: MapTransformFnCategory,
        is_udf: bool = False,
    ):
        """
        Args:
            callable: the underlying Python callable object.
            input_type: the type of the input data.
            output_type: the type of the output data.
        """
        self._callable = callable
        self._input_type = input_type
        self._output_type = output_type
        self._category = category
        self._output_block_size_option = None
        self._is_udf = is_udf

    #Implemented concretely by subclasses
    @abstractmethod
    def __call__(
        self,
        input: Iterable[MapTransformFnData],
        ctx: TaskContext,
    ) -> Iterable[MapTransformFnData]:
        ...

    @property
    def input_type(self) -> MapTransformFnDataType:
        return self._input_type

    @property
    def output_type(self) -> MapTransformFnDataType:
        return self._output_type

class BlockMapTransformFn(MapTransformFn):
    """A block-to-block MapTransformFn."""

    def __init__(self, block_fn: MapTransformCallable[Block, Block]):
        self._block_fn = block_fn
        super().__init__(
            MapTransformFnDataType.Block,
            MapTransformFnDataType.Block,
            category=MapTransformFnCategory.DataProcess,
        )

    def __call__(self, input: Iterable[Block], ctx: TaskContext) -> Iterable[Block]:
        yield from self._block_fn(input, ctx)

    def __repr__(self) -> str:
        return f"BlockMapTransformFn({self._block_fn})"

    def __eq__(self, other):
        return (
            isinstance(other, BlockMapTransformFn) and self._block_fn == other._block_fn
        )

A MapTransformFn declares the data formats of its input and output, of which there are three: Block, Row, and Batch. The concrete logic lives in subclasses; BlockMapTransformFn, for instance, simply calls the supplied block_fn to turn blocks into other blocks, while other subclasses convert between batches, blocks, and rows.

MapTransformer

class MapTransformer:
    """Encapsulates the data transformation logic of a physical MapOperator.

    A MapTransformer may consist of one or more steps, each of which is represented
    as a MapTransformFn. The first MapTransformFn must take blocks as input, and
    the last MapTransformFn must output blocks. The intermediate data types can
    be blocks, rows, or batches.
    """

    def __init__(
        self,
        transform_fns: List[MapTransformFn],
        init_fn: Optional[Callable[[], None]] = None,
    ):
        """
        Args:
        transform_fns: A list of `MapTransformFn`s that will be executed sequentially
            to transform data.
        init_fn: A function that will be called before transforming data.
            Used for the actor-based map operator.
        """
        self.set_transform_fns(transform_fns)

        self._init_fn = init_fn if init_fn is not None else lambda: None
        self._output_block_size_option = None
        self._udf_time = 0

    def apply_transform(
        self,
        input_blocks: Iterable[Block],
        ctx: TaskContext,
    ) -> Iterable[Block]:
        iter = input_blocks
        # Apply the transform functions sequentially to the input iterable.
        for transform_fn in self._transform_fns:
            iter = transform_fn(iter, ctx)
        return iter

A MapTransformer holds a list of MapTransformFns. Its core method is apply_transform, which applies the MapTransformFns in sequence. Note that a MapTransformer's overall input and output must both be blocks, so the first MapTransformFn must take blocks as input and the last one must produce blocks as output.


In plan_read_op, the resulting MapTransformer has two MapTransformFns. The first is BlockMapTransformFn(do_read), which wraps the do_read function that actually performs the read. The second is BuildOutputBlocksMapTransformFn.for_blocks(), which re-slices the stream into blocks of a standard size; for example, if the system-defined block size is 1024 rows, it emits 1024-row blocks. This MapTransformer is finally passed into MapOperator.create() for initialization. MapOperator is one of the most common PhysicalOperators: almost every data transformation relies on it. It wraps a MapTransformer and ultimately executes the transformation in a distributed fashion via Ray tasks or actors.
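
The effect of chaining these two steps can be mimicked with plain generators (lists stand in for blocks here; this is a toy model, not Ray's MapTransformFn classes): a do_read step turns read tasks into blocks, and a second step re-slices the stream into blocks of a target size, which is exactly how apply_transform chains the functions.

def do_read(read_tasks):
    for read_task in read_tasks:          # same shape as plan_read_op's do_read
        yield from read_task()

def build_output_blocks(blocks, target_rows=2):
    buffer = []
    for block in blocks:                  # a "block" is just a list of rows here
        buffer.extend(block)
        while len(buffer) >= target_rows:
            yield buffer[:target_rows]
            buffer = buffer[target_rows:]
    if buffer:
        yield buffer

read_tasks = [lambda: iter([[1, 2, 3]]), lambda: iter([[4, 5]])]

# apply_transform() chains the steps in the same way: iter = fn(iter, ctx)
stream = build_output_blocks(do_read(read_tasks))
print(list(stream))  # [[1, 2], [3, 4], [5]]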

MapOperator.create()

class MapOperator(OneToOneOperator, ABC):
    @classmethod
    def create(
        cls,
        map_transformer: MapTransformer,
        input_op: PhysicalOperator,
        data_context: DataContext,
        target_max_block_size: Optional[int] = None,
        name: str = "Map",
        # TODO(ekl): slim down ComputeStrategy to only specify the compute
        # config and not contain implementation code.
        compute_strategy: Optional[ComputeStrategy] = None,
        min_rows_per_bundle: Optional[int] = None,
        supports_fusion: bool = True,
        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ) -> "MapOperator":
        """Create a MapOperator.

        This factory creates the MapOperator pool implementation that corresponds to the
        compute argument:
            - If None or TaskPoolStrategy -> TaskPoolMapOperator
            - If ActorPoolStrategy -> ActorPoolMapOperator

        Args:
            transform_fn: The function to apply to each ref bundle input.
            input_op: Operator generating input data for this op.
            init_fn: The callable class to instantiate if using ActorPoolMapOperator.
            name: The name of this operator.
            compute_strategy: Customize the compute strategy for this op.
            target_max_block_size: The target maximum number of bytes to
                include in an output block.
            min_rows_per_bundle: The number of rows to gather per batch passed to the
                transform_fn, or None to use the block size. Setting the batch size is
                important for the performance of GPU-accelerated transform functions.
                The actual rows passed may be less if the dataset is small.
            supports_fusion: Whether this operator supports fusion with other operators.
            ray_remote_args_fn: A function that returns a dictionary of remote args
                passed to each map worker. The purpose of this argument is to generate
                dynamic arguments for each actor/task, and will be called each time
                prior to initializing the worker. Args returned from this dict will
                always override the args in ``ray_remote_args``. Note: this is an
                advanced, experimental feature.
            ray_remote_args: Customize the :func:`ray.remote` args for this op's tasks.
        """
        #默认采用Ray Task方式执行
        if compute_strategy is None:
            compute_strategy = TaskPoolStrategy()

        if isinstance(compute_strategy, TaskPoolStrategy):
            from ray.data._internal.execution.operators.task_pool_map_operator import (
                TaskPoolMapOperator,
            )
            #对于TaskPoolStrategy,采用TaskPoolMapOperator
            return TaskPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                name=name,
                target_max_block_size=target_max_block_size,
                min_rows_per_bundle=min_rows_per_bundle,
                concurrency=compute_strategy.size,
                supports_fusion=supports_fusion,
                ray_remote_args_fn=ray_remote_args_fn,
                ray_remote_args=ray_remote_args,
            )
        elif isinstance(compute_strategy, ActorPoolStrategy):
            from ray.data._internal.execution.operators.actor_pool_map_operator import (
                ActorPoolMapOperator,
            )
            #对于ActorPoolStrategy,采用ActorPoolMapOperator
            return ActorPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                target_max_block_size=target_max_block_size,
                compute_strategy=compute_strategy,
                name=name,
                min_rows_per_bundle=min_rows_per_bundle,
                supports_fusion=supports_fusion,
                ray_remote_args_fn=ray_remote_args_fn,
                ray_remote_args=ray_remote_args,
            )
        else:
            raise ValueError(f"Unsupported execution strategy {compute_strategy}")

在MapOperator.create函数中,会通过compute_strategy来决定初始化MapOperator两个子类中的哪一个。在我们调用map等算子时,如果传入的UDF是类,就会采用ActorPoolMapOperator执行;如果传入的是函数,则会采用TaskPoolMapOperator来执行。
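结合Ray Data的公开API,下面的小例子分别对应这两条执行路径(示例仅为演示,不同Ray版本的参数细节可能略有差异):传入函数时默认使用TaskPoolStrategy,传入可调用类并指定concurrency时使用ActorPoolStrategy。

import ray

ds = ray.data.range(10)

# 传入普通函数:默认TaskPoolStrategy,最终由TaskPoolMapOperator执行
def add_one(row):
    row["id"] = row["id"] + 1
    return row

ds_task = ds.map(add_one)

# 传入可调用类并指定concurrency:使用ActorPoolStrategy,最终由ActorPoolMapOperator执行
class AddOne:
    def __call__(self, row):
        row["id"] = row["id"] + 1
        return row

ds_actor = ds.map(AddOne, concurrency=2)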

下面我们以ActorPoolMapOperator为例进行分析,该类是Ray Data最核心的部分之一。

ActorPoolMapOperator

1)构造方法

class ActorPoolMapOperator(MapOperator):
    """A MapOperator implementation that executes tasks on an actor pool.
    """

    def __init__(
        self,
        map_transformer: MapTransformer,
        input_op: PhysicalOperator,
        data_context: DataContext,
        target_max_block_size: Optional[int],
        compute_strategy: ActorPoolStrategy,
        name: str = "ActorPoolMap",
        min_rows_per_bundle: Optional[int] = None,
        supports_fusion: bool = True,
        ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            map_transformer,
            input_op,
            data_context,
            name,
            target_max_block_size,
            min_rows_per_bundle,
            supports_fusion,
            ray_remote_args_fn,
            ray_remote_args,
        )

        #初始化ActorPool(per_actor_resource_usage为构造方法前文计算出的单个Actor资源用量,本文引用的代码有省略)
        self._actor_pool = _ActorPool(
            compute_strategy,
            self._start_actor,
            per_actor_resource_usage,
            self.data_context._enable_actor_pool_on_exit_hook,
        )
        # A queue of bundles awaiting dispatch to actors.
        self._bundle_queue = create_bundle_queue()
        # Cached actor class.
        self._cls = None
        # Whether no more submittable bundles will be added.
        self._inputs_done = False

在ActorPoolMapOperator的构造方法中,首先初始化了一个_ActorPool,这个_ActorPool维护了Ray Actor的创建、销毁、监控等逻辑,并可根据多种条件进行扩容与缩容。接着初始化了一个_bundle_queue,这是一个队列,保存着所有等待分发处理的输入RefBundle。_ActorPool中具体保存的Actor类是_MapWorker,该类封装了map_transformer,并在submit方法中实现了真正调用执行map_transformer的逻辑。

2)_MapWorker

class _MapWorker:
    """An actor worker for MapOperator."""

    def __init__(
        self,
        ctx: DataContext,
        src_fn_name: str,
        map_transformer: MapTransformer,
    ):
        DataContext._set_current(ctx)
        self.src_fn_name: str = src_fn_name
        self._map_transformer = map_transformer
        # Initialize state for this actor.
        self._map_transformer.init()

    def get_location(self) -> NodeIdStr:
        return ray.get_runtime_context().get_node_id()

    def submit(
        self,
        data_context: DataContext,
        ctx: TaskContext,
        *blocks: Block,
        **kwargs: Dict[str, Any],
    ) -> Iterator[Union[Block, List[BlockMetadata]]]:
        yield from _map_task(
            self._map_transformer,
            data_context,
            ctx,
            *blocks,
            **kwargs,
        )

get_location方法可以用于获取这个Actor所在的节点位置。重点看下submit方法,它通过_map_task函数真正调用执行map_transformer:

3)_map_task

def _map_task(
    map_transformer: MapTransformer,
    data_context: DataContext,
    ctx: TaskContext,
    *blocks: Block,
    **kwargs: Dict[str, Any],
) -> Iterator[Union[Block, List[BlockMetadata]]]:
    """Remote function for a single operator task.

    Args:
        fn: The callable that takes Iterator[Block] as input and returns
            Iterator[Block] as output.
        blocks: The concrete block values from the task ref bundle.

    Returns:
        A generator of blocks, followed by the list of BlockMetadata for the blocks
        as the last generator return.
    """
    DataContext._set_current(data_context)
    ctx.kwargs.update(kwargs)
    TaskContext.set_current(ctx)
    stats = BlockExecStats.builder()
    map_transformer.set_target_max_block_size(ctx.target_max_block_size)
    with MemoryProfiler(data_context.memory_usage_poll_interval_s) as profiler:
        for b_out in map_transformer.apply_transform(iter(blocks), ctx):
            # TODO(Clark): Add input file propagation from input blocks.
            m_out = BlockAccessor.for_block(b_out).get_metadata()
            m_out.exec_stats = stats.build()
            m_out.exec_stats.udf_time_s = map_transformer.udf_time()
            m_out.exec_stats.task_idx = ctx.task_idx
            m_out.exec_stats.max_uss_bytes = profiler.estimate_max_uss()
            yield b_out
            yield m_out
            stats = BlockExecStats.builder()
            profiler.reset()

    TaskContext.reset_current()

_map_task方法调用了MapTransformer.apply_transform处理输入数据,并返回输出Block与BlockMetadata的生成器。生成器第一次输出结果Block,第二次输出与之对应的BlockMetadata,如此交替进行。(注意此处docstring写的是生成器先返回所有Block、再返回List[BlockMetadata],与实际逻辑不符,原因未知。)
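这种"第一次yield Block、第二次yield对应BlockMetadata"的约定,可以用如下纯Python的简化示意理解(非Ray源码,map_task_like与consume均为示意,元数据仅用dict代替):

from typing import Iterator, List

Block = List[dict]

def map_task_like(blocks: List[Block]) -> Iterator[object]:
    # 简化示意:每处理完一个Block,先yield Block本身,再yield它的"元数据"
    for block in blocks:
        out_block = [{**row, "output": "test"} for row in block]
        yield out_block
        yield {"num_rows": len(out_block)}  # 示意的BlockMetadata

def consume(gen: Iterator[object]) -> None:
    # 消费端按"Block、Metadata"成对读取,与后文DataOpTask.on_data_ready的读取方式对应
    while True:
        try:
            block = next(gen)
        except StopIteration:
            break
        meta = next(gen)  # 紧随其后的必然是对应的metadata
        print(len(block), meta)

consume(map_task_like([[{"id": 1}], [{"id": 2}, {"id": 3}]]))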

4)_start_actor

def _start_actor(self):
        """Start a new actor and add it to the actor pool as a pending actor."""
        assert self._cls is not None
        ctx = self.data_context
        if self._ray_remote_args_fn:
            self._refresh_actor_cls()
        actor = self._cls.options(
            _labels={self._OPERATOR_ID_LABEL_KEY: self.id}
        ).remote(
            ctx,
            src_fn_name=self.name,
            map_transformer=self._map_transformer,
        )
        res_ref = actor.get_location.options(name=f"{self.name}.get_location").remote()

        def _task_done_callback(res_ref):
            # res_ref is a future for a now-ready actor; move actor from pending to the
            # active actor pool.
            has_actor = self._actor_pool.pending_to_running(res_ref)
            if not has_actor:
                # Actor has already been killed.
                return
            # A new actor has started, we try to dispatch queued tasks.
            self._dispatch_tasks()

        self._submit_metadata_task(
            res_ref,
            lambda: _task_done_callback(res_ref),
        )
        return actor, res_ref

_start_actor方法定义了启动单个Actor的逻辑。


首先获取到Actor类,即经过ray.remote包装后的_MapWorker,调用其options(...).remote(...)创建一个actor,此时actor处于pending(创建中)状态。


通过调用res_ref = actor.get_location.remote()方法,可以触发get_location方法的远程执行,并返回调用结果的ObjectRef。


这个res_ref其实可以认为是Actor创建是否成功的标志,如果使用ray.get(res_ref)尝试获取该ObjectRef的值,那么只有在actor创建完毕并远程执行完get_location()后,才可以获取到res_ref的真正数据,否则ray.get()会被阻塞。ActorPool会维护一个_pending_actors字段,结构为Dict[ObjectRef, ray.actor.ActorHandle],保存了res_ref到ActorHandle的映射关系。在res_ref计算完成后,ActorPool会调用pending_to_running方法,将这个Actor设置为Running状态,允许向其提交任务。


此处调用了_submit_metadata_task方法,针对res_ref提交了一个MetadataOpTask。该方法定义在父类MapOperator中,主要逻辑是将ObjectRef与对应的task_done_callback回调函数封装到MetadataOpTask中;在Ray Data的调度循环中,会通过ray.wait()判断每个MetadataOpTask的object_ref是否计算完成,并在完成后调用其回调。


也就是说在res_ref计算完成后,会自动调用_task_done_callback,将Actor状态由Pending变为Running,并调用_dispatch_tasks尝试处理新任务,_dispatch_tasks方法将在后面详细解释。

class MapOperator(OneToOneOperator, ABC):
    def _submit_metadata_task(
        self, result_ref: ObjectRef, task_done_callback: Callable[[], None]
    ):
        """Submit a new metadata-handling task."""
        task_index = self._next_metadata_task_idx
        self._next_metadata_task_idx += 1

        def _task_done_callback():
            self._metadata_tasks.pop(task_index)
            task_done_callback()

        #在Operator中维护一个列表,保存所有提交的MetadataOpTask,后续调度系统会遍历该列表
        self._metadata_tasks[task_index] = MetadataOpTask(
            task_index, result_ref, _task_done_callback
        )

class MetadataOpTask(OpTask):
    """Represents an OpTask that only handles metadata, instead of Block data."""

    def __init__(
        self,
        task_index: int,
        object_ref: ray.ObjectRef,
        task_done_callback: Callable[[], None],
        task_resource_bundle: Optional[ExecutionResources] = None,
    ):
        """
        Args:
            object_ref: The ObjectRef of the task.
            task_done_callback: The callback to call when the task is done.
        """
        super().__init__(task_index, task_resource_bundle)
        self._object_ref = object_ref
        self._task_done_callback = task_done_callback

    #如何标志MetadataOpTask已经完成?只需要_object_ref变成ready,即代表ray的异步远程计算已经完成。
    def get_waitable(self) -> ray.ObjectRef:
        return self._object_ref

    def on_task_finished(self):
        """Callback when the task is finished."""
        self._task_done_callback()
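结合上面的流程,可以用Ray公开API写一个最小示例,体会"用一次远程方法调用的ObjectRef来探测Actor是否就绪"的做法(Worker类与ray.init调用仅为演示,并非Ray Data内部代码):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class Worker:
    def get_location(self):
        return ray.get_runtime_context().get_node_id()

actor = Worker.remote()                  # 此时Actor处于创建中(pending)
res_ref = actor.get_location.remote()    # 提交一次远程调用,立即返回ObjectRef

# 只有当Actor真正启动并执行完get_location后,res_ref才会ready
ready, _ = ray.wait([res_ref], timeout=30)
if ready:
    print("actor is running on node:", ray.get(res_ref))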

5)start

def start(self, options: ExecutionOptions):
    self._actor_locality_enabled = options.actor_locality_enabled
    super().start(options)

    # Create the actor workers and add them to the pool.
    self._cls = ray.remote(**self._ray_remote_args)(_MapWorker)
    self._actor_pool.scale_up(self._actor_pool.min_size())
    refs = self._actor_pool.get_pending_actor_refs()

    # We synchronously wait for the initial number of actors to start. This avoids
    # situations where the scheduler is unable to schedule downstream operators
    # due to lack of available actors, causing an initial "pileup" of objects on
    # upstream operators, leading to a spike in memory usage prior to steady state.
    logger.debug(f"{self._name}: Waiting for {len(refs)} pool actors to start...")
    try:
        timeout = self.data_context.wait_for_min_actors_s
        ray.get(refs, timeout=timeout)
    except ray.exceptions.GetTimeoutError:
        raise ray.exceptions.GetTimeoutError(
            "Timed out while starting actors. "
            "This may mean that the cluster does not have "
            "enough resources for the requested actor pool."
        )

在start方法中,定义了ActorPoolMapOperator的启动逻辑,首先调用_actor_pool.scale_up方法,按照用户定义的Actor数量扩容_actor_pool。此时_actor_pool会循环调用_start_actor来启动新的Actor。

接着通过ray.get()等待所有的res_ref,即等待Actor启动成功。如果超过了最大等待时间依然没有启动足够的Actor(通常由于集群资源不足,或Actor初始化方法有问题),则直接抛异常停止任务执行。
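这种"带超时的ray.get"行为可以用下面的最小示例复现(slow_task仅为演示用的假想任务):当refs在超时时间内未就绪时,ray.get会抛出GetTimeoutError。

import time
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def slow_task():
    time.sleep(60)
    return "done"

ref = slow_task.remote()
try:
    ray.get(ref, timeout=1)  # 1秒内未完成,抛出GetTimeoutError
except ray.exceptions.GetTimeoutError:
    print("timed out waiting for the task/actor to be ready")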

6)_add_bundled_input

def _add_bundled_input(self, bundle: RefBundle):
    self._bundle_queue.add(bundle)
    self._metrics.on_input_queued(bundle)
    # Try to dispatch all bundles in the queue, including this new bundle.
    self._dispatch_tasks()

前面已经介绍过,每个PhysicalOperator都要实现_add_bundled_input方法,定义如何处理新的输入数据。此处可以看出,ActorPoolMapOperator会将输入的RefBundle添加到_bundle_queue队列中,并调用_dispatch_tasks方法尝试处理数据。_dispatch_tasks是处理数据的核心逻辑:

7)_dispatch_tasks

def _dispatch_tasks(self):
    """Try to dispatch tasks from the bundle buffer to the actor pool.

    This is called when:
        * a new input bundle is added,
        * a task finishes,
        * a new worker has been created.
    """
    #循环处理_bundle_queue中的所有数据
    while self._bundle_queue:
        # 从ActorPool中挑选一个Running状态的Actor
        if self._actor_locality_enabled:
            actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
        else:
            actor = self._actor_pool.pick_actor()
        #如果没有正在运行的actor,则无法调度任务,等待下次执行
        if actor is None:
            # No actors available for executing the next task.
            break
        #获取一个bundle
        bundle = self._bundle_queue.pop()
        self._metrics.on_input_dequeued(bundle)
        input_blocks = [block for block, _ in bundle.blocks]
        ctx = TaskContext(
            task_idx=self._next_data_task_idx,
            target_max_block_size=self.actual_target_max_block_size,
        )
        #远程调用actor.submit方法,submit方法最终调用的是_map_task。返回的是Block结果的生成器
        gen = actor.submit.options(
            num_returns="streaming",
            name=f"{self.name}.submit",
            **self._ray_actor_task_remote_args,
        ).remote(
            self.data_context,
            ctx,
            *input_blocks,
            **self.get_map_task_kwargs(),
        )
        #定义任务结束的回调函数
        def _task_done_callback(actor_to_return):
            # Return the actor that was running the task to the pool.
            self._actor_pool.return_actor(actor_to_return)
            # Dispatch more tasks.
            self._dispatch_tasks()

        from functools import partial

        #封装任务到DataOpTask中
        self._submit_data_task(
            gen, bundle, partial(_task_done_callback, actor_to_return=actor)
        )

可以看出_dispatch_tasks的核心逻辑如下:

1. 循环_bundle_queue,获取待处理的Block数据;

2. 从Pool中挑选出一个Running状态的actor;

3. 调用actor.submit.remote()方法将任务提交到Ray集群分布式执行;

4. 调用_submit_data_task方法,将submit返回的ObjectRefGenerator封装到DataOpTask中。

此处submit返回的gen是一个ObjectRefGenerator类型的生成器,这是由于submit方法采用yield from _map_task返回了一个Generator,在ray remote转换后返回的即为ObjectRefGenerator,可以通过next方法获取下一个ObjectRef。也可以使用ray.wait方法等待生成器的下一个输出计算完成。参考文档:https://docs.ray.io/en/latest/ray-core/ray-generator.html#generators
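Ray的streaming generator用法可以用下面的最小示例体会(stream_blocks为演示用的假想函数;不同Ray版本对num_returns="streaming"的要求与返回类型名称可能略有差异):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def stream_blocks(n: int):
    # 每次yield的对象都会成为一个独立的ObjectRef,调用方可以边生成边消费
    for i in range(n):
        yield {"block_id": i}

# 与文中actor.submit的用法一致,通过num_returns="streaming"得到一个流式生成器
gen = stream_blocks.options(num_returns="streaming").remote(3)

for obj_ref in gen:          # 迭代得到的是ObjectRef
    print(ray.get(obj_ref))  # 再用ray.get取出真正的数据

与普通remote调用一次性返回结果不同,这里每个输出在生成后就可以立即被消费;StreamingExecutor正是把这样的生成器作为waitable传给ray.wait,从而感知"下一个输出已就绪"。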

回到_dispatch_tasks:submit返回的这个生成器最终通过调用_submit_data_task被封装到了DataOpTask中:

def _submit_data_task(
        self,
        gen: ObjectRefGenerator,
        inputs: RefBundle,
        task_done_callback: Optional[Callable[[], None]] = None,
    ):
        """Submit a new data-handling task."""
        # TODO(hchen):
        # 1. Move this to the base PhysicalOperator class.
        # 2. This method should only take a block-processing function as input,
        #    instead of a streaming generator. The logic of submitting ray tasks
        #    can also be encapsulated in the base class.
        task_index = self._next_data_task_idx
        self._next_data_task_idx += 1
        self._metrics.on_task_submitted(task_index, inputs)

        #每次有新的数据计算完成,则回调该函数
        def _output_ready_callback(task_index, output: RefBundle):
            # Since output is streamed, it should only contain one block.
            assert len(output) == 1
            self._metrics.on_task_output_generated(task_index, output)

            # Notify output queue that the task has produced a new output.
            self._output_queue.notify_task_output_ready(task_index, output)
            self._metrics.on_output_queued(output)
        #所有数据计算完成,则回调该函数
        def _task_done_callback(task_index: int, exception: Optional[Exception]):
            self._data_tasks.pop(task_index)
            # Notify output queue that this task is complete.
            self._output_queue.notify_task_completed(task_index)
            if task_done_callback:
                task_done_callback()

        #封装到DataOpTask中
        self._data_tasks[task_index] = DataOpTask(
            task_index,
            gen,
            lambda output: _output_ready_callback(task_index, output),
            functools.partial(_task_done_callback, task_index),
        )

_submit_data_task是父类MapOperator定义的方法,主要定义了_output_ready_callback、_task_done_callback等回调函数,并和ObjectRefGenerator生成器一起封装到DataOpTask中。DataOpTask代表了PhysicalOperator提交到Ray集群远程执行的一个分布式任务。

8)DataOpTask

class DataOpTask(OpTask):
    """Represents an OpTask that handles Block data."""

    def __init__(
        self,
        task_index: int,
        streaming_gen: ObjectRefGenerator,
        output_ready_callback: Callable[[RefBundle], None],
        task_done_callback: Callable[[Optional[Exception]], None],
        task_resource_bundle: Optional[ExecutionResources] = None,
    ):
        """
        Args:
            streaming_gen: The streaming generator of this task. It should yield blocks.
            output_ready_callback: The callback to call when a new RefBundle is output
                from the generator.
            task_done_callback: The callback to call when the task is done.
        """
        super().__init__(task_index, task_resource_bundle)
        self._streaming_gen = streaming_gen
        self._output_ready_callback = output_ready_callback
        self._task_done_callback = task_done_callback

    def get_waitable(self) -> ObjectRefGenerator:
        return self._streaming_gen

    def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
        """Callback when data is ready to be read from the streaming generator.

        Args:
            max_bytes_to_read: Max bytes of blocks to read. If None, all available
                will be read.
        Returns: The number of blocks read.
        """
        bytes_read = 0
        while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
            try:
                block_ref = self._streaming_gen._next_sync(0)
                if block_ref.is_nil():
                    # The generator currently doesn't have new output.
                    # And it's not stopped yet.
                    break
            except StopIteration:
                self._task_done_callback(None)
                break

            try:
                meta = ray.get(next(self._streaming_gen))
            except StopIteration:
                # The generator should always yield 2 values (block and metadata)
                # each time. If we get a StopIteration here, it means an error
                # happened in the task.
                # And in this case, the block_ref is the exception object.
                # TODO(hchen): Ray Core should have a better interface for
                # detecting and obtaining the exception.
                try:
                    ray.get(block_ref)
                    assert False, "Above ray.get should raise an exception."
                except Exception as ex:
                    self._task_done_callback(ex)
                    raise ex from None
            self._output_ready_callback(
                RefBundle([(block_ref, meta)], owns_blocks=True)
            )
            bytes_read += meta.size_bytes
        return bytes_read

DataOpTask中,封装了由actor.submit方法返回的生成器_streaming_gen。在每次流式执行循环中,会尝试调用ray.wait等待_streaming_gen产生输出,并在输出完成后调用on_data_ready方法,该方法具体逻辑:


1. 调用block_ref = self._streaming_gen._next_sync(0),从_streaming_gen中获取计算好的Block数据;

2. 如果出现StopIteration,则_streaming_gen没有后续输出了,回调_task_done_callback;

3. 调用meta = ray.get(next(self._streaming_gen)),从_streaming_gen中获取计算好的BlockMetadata数据,这里与_map_task的实现对应上了,即第一次yield Block,第二次yield BlockMetadata;

4. 将Block与BlockMetadata封装到RefBundle中,并回调_output_ready_callback。在MapOperator._submit_data_task中定义了该回调函数,会将输出结果放到MapOperator的输出缓存中,后续可以用get_next方法获取结果。


至此,我们已经将ActorPoolMapOperator相关的内容介绍完毕。TaskPoolMapOperator不再展开介绍,它最终也是通过ray.remote提交任务,并封装到DataOpTask中,区别在于TaskPoolMapOperator直接以Ray Task的方式提交_map_task函数,没有使用_MapWorker类进行封装。


再次回顾之前的plan_read_op方法,在最后将do_read方法通过MapTransformer封装到了一个TaskPoolMapOperator或ActorPoolMapOperator中,以实现分布式执行。plan_read_op将Read这一LogicalOperator转换为两个PhysicalOperator,最终生成的DAG为:

InputDataBuffer->TaskPoolMapOperator(ReadCSV)

InputDataBuffer的输出即ReadTask,作为TaskPoolMapOperator(ReadCSV)的输入;TaskPoolMapOperator(ReadCSV)接收ReadTask后,通过ray.remote分布式执行do_read,调用ReadTask读取数据,其输出即为读取到的Block数据,作为后续算子的输入。同理,对于MapRows与MapBatches两个LogicalOperator,也有相对应的plan_udf_map_op将其转换为PhysicalOperator。

plan_udf_map_op方法

def plan_udf_map_op(
    op: AbstractUDFMap,
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> MapOperator:
    """Get the corresponding physical operators DAG for AbstractUDFMap operators.

    Note this method only converts the given `op`, but not its input dependencies.
    See Planner.plan() for more details.
    """
    assert len(physical_children) == 1
    input_physical_dag = physical_children[0]

    compute = get_compute(op._compute)
    fn, init_fn = _parse_op_fn(op)

    if isinstance(op, MapBatches):
        transform_fn = _generate_transform_fn_for_map_batches(fn)
        map_transformer = _create_map_transformer_for_map_batches_op(
            transform_fn,
            op._batch_size,
            op._batch_format,
            op._zero_copy_batch,
            init_fn,
        )
    else:
        if isinstance(op, MapRows):
            transform_fn = _generate_transform_fn_for_map_rows(fn)
        elif isinstance(op, FlatMap):
            transform_fn = _generate_transform_fn_for_flat_map(fn)
        else:
            raise ValueError(f"Found unknown logical operator during planning: {op}")

        map_transformer = _create_map_transformer_for_row_based_map_op(
            transform_fn, init_fn
        )

    return MapOperator.create(
        map_transformer,
        input_physical_dag,
        data_context,
        name=op.name,
        target_max_block_size=None,
        compute_strategy=compute,
        min_rows_per_bundle=op._min_rows_per_bundled_input,
        ray_remote_args_fn=op._ray_remote_args_fn,
        ray_remote_args=op._ray_remote_args,
    )

不难看出,这里使用MapTransformer封装了用户自定义的转换函数,最终调用MapOperator.create。我们示例代码中传入的是一个类,因此最终转换为ActorPoolMapOperator。
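以行级map为例,_generate_transform_fn_for_map_rows所做的事情可以用如下简化示意理解(非Ray源码,generate_transform_fn_for_map_rows为示意函数,真实实现还会处理ctx、异常与校验等逻辑):把用户的row->row函数提升为"行迭代器->行迭代器"的转换函数,再由MapTransformer负责Block与行之间的相互转换。

from typing import Any, Callable, Dict, Iterable

Row = Dict[str, Any]

def generate_transform_fn_for_map_rows(
    user_fn: Callable[[Row], Row]
) -> Callable[[Iterable[Row]], Iterable[Row]]:
    # 简化示意:将用户的row->row函数包装成作用在行迭代器上的转换函数
    def transform(rows: Iterable[Row]) -> Iterable[Row]:
        for row in rows:
            yield user_fn(row)
    return transform

# 用法示意:对应示例代码中CustomMapActor.__call__的效果
transform_fn = generate_transform_fn_for_map_rows(lambda row: {**row, "output": "test"})
print(list(transform_fn([{"id": 1}, {"id": 2}])))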

执行PhysicalPlan

在上一节我们描述了如何通过Planner将LogicalOperator转换为PhysicalOperator,并对最重要的几种PhysicalOperator进行了介绍。我们的示例代码在转换后得到的DAG如下:

InputDataBuffer->TaskPoolMapOperator(ReadCSV)->ActorPoolMapOperator(CustomMapActor)->ActorPoolMapOperator(CustomMapBatchActor)->TaskPoolMapOperator(WriteCSV)

我们回顾下execute_to_legacy_block_list方法,在获取了PhysicalPlan后,接着便可使用StreamingExecutor对该DAG进行流式执行:

def execute_to_legacy_block_list(
    executor: Executor,
    plan: ExecutionPlan,
    dataset_uuid: str,
    preserve_order: bool,
) -> BlockList:
    #Logical Operator到Physical Operator的转换在此处执行
    dag, stats = _get_execution_dag(
        executor,
        plan,
        preserve_order,
    )
    #对DAG流式执行
    bundles = executor.execute(dag, initial_stats=stats)
    
    block_list = _bundles_to_block_list(bundles)
    # Set the stats UUID after execution finishes.
    _set_stats_uuid_recursive(executor.get_stats(), dataset_uuid)
    return block_list

在介绍StreamingExecutor之前,先介绍下OpState类:

OpState

class OpState:
    """The execution state tracked for each PhysicalOperator.

    This tracks state to manage input and output buffering for StreamingExecutor and
    progress bars, which is separate from execution state internal to the operators.

    Note: we use the `deque` data structure here because it is thread-safe, enabling
    operator queues to be shared across threads.
    """

    def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]):
        # Each inqueue is connected to another operator's outqueue.
        assert len(inqueues) == len(op.input_dependencies), (op, inqueues)
        self.inqueues: List[OpBufferQueue] = inqueues
        # The outqueue is connected to another operator's inqueue (they physically
        # share the same Python list reference).
        #
        # Note: this queue is also accessed concurrently from the consumer thread.
        # (in addition to the streaming executor thread). Hence, it must be a
        # thread-safe type such as `deque`.
        self.outqueue: OpBufferQueue = OpBufferQueue()
        self.op = op
        self.progress_bar = None
        self.num_completed_tasks = 0
        self.inputs_done_called = False
        # Tracks whether `input_done` is called for each input op.
        self.input_done_called = [False] * len(op.input_dependencies)
        # Used for StreamingExecutor to signal exception or end of execution
        self._finished: bool = False
        self._exception: Optional[Exception] = None
        self._scheduling_status = OpSchedulingStatus()

    def add_output(self, ref: RefBundle) -> None:
        """Move a bundle produced by the operator to its outqueue."""
        self.outqueue.append(ref)
        self.num_completed_tasks += 1
        if self.progress_bar:
            assert (
                ref.num_rows() is not None
            ), "RefBundle must have a valid number of rows"
            self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())

    def dispatch_next_task(self) -> None:
        """Move a bundle from the operator inqueue to the operator itself."""
        for i, inqueue in enumerate(self.inqueues):
            ref = inqueue.pop()
            if ref is not None:
                self.op.add_input(ref, input_index=i)
                return
        assert False, "Nothing to dispatch"

    def get_output_blocking(self, output_split_idx: Optional[int]) -> RefBundle:
        """Get an item from this node's output queue, blocking as needed.

        Returns:
            The RefBundle from the output queue, or an error / end of stream indicator.

        Raises:
            StopIteration: If all outputs are already consumed.
            Exception: If there was an exception raised during execution.
        """
        while True:
            # Check if StreamingExecutor has caught an exception or is done execution.
            if self._exception is not None:
                raise self._exception
            elif self._finished and not self.outqueue.has_next(output_split_idx):
                raise StopIteration()
            ref = self.outqueue.pop(output_split_idx)
            if ref is not None:
                return ref
            time.sleep(0.01)

OpState与PhysicalOperator是一一对应的关系,OpState追踪了每个PhysicalOperator的执行状态,管理了PhysicalOperator输入与输出缓存。我们在基础概念-流式执行模型中提到,每个Operator的输入队列被设置为其前置Operator的输出队列,而这个队列实际保存在OpState对象中。OpState提供了一些重要方法:


1. add_output:将输出添加到算子的outqueue中(等价于添加到下游算子的inqueue中);


2. dispatch_next_task:从inqueue中获取输入,并调用self.op.add_input方法,显然最终会执行到该op的_add_input_inner方法来消费数据;


3. get_output_blocking:尝试从outqueue中获取一个数据,如果当前还没有数据,且算子没有执行结束,该方法会阻塞。

StreamingExecutor.execute()

class StreamingExecutor(Executor, threading.Thread):
    """A streaming Dataset executor.

    This implementation executes Dataset DAGs in a fully streamed way. It runs
    by setting up the operator topology, and then routing blocks through operators in
    a way that maximizes throughput under resource constraints.
    """

    def execute(
        self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None
    ) -> Iterator[RefBundle]:
        """Executes the DAG using a streaming execution strategy.

        We take an event-loop approach to scheduling. We block on the next scheduling
        event using `ray.wait`, updating operator state and dispatching new tasks.
        """

        #......
        #......
        # Setup the streaming DAG topology and start the runner thread.
        self._topology, _ = build_streaming_topology(dag, self._options)

        #...
        #...

        self._has_op_completed = {op: False for op in self._topology}

        self._output_node: OpState = self._topology[dag]
        
        StatsManager.register_dataset_to_stats_actor(
            self._dataset_tag,
            self._get_operator_tags(),
        )
        for callback in get_execution_callbacks(self._data_context):
            callback.before_execution_starts(self)

        self.start()
        
        self._execution_started = True

        class StreamIterator(OutputIterator):
            def __init__(self, outer: Executor):
                self._outer = outer

            def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle:
                try:
                    item = self._outer._output_node.get_output_blocking(
                        output_split_idx
                    )
                    if self._outer._global_info:
                        self._outer._global_info.update(
                            item.num_rows(), dag.num_output_rows_total()
                        )
                    return item
                # Needs to be BaseException to catch KeyboardInterrupt. Otherwise we
                # can leave dangling progress bars by skipping shutdown.
                except BaseException as e:
                    self._outer.shutdown(
                        e if not isinstance(e, StopIteration) else None
                    )
                    raise

            def __del__(self):
                self._outer.shutdown()

        return StreamIterator(self)

StreamingExecutor的execute方法中,首先调用了build_streaming_topology方法,构建并初始化流式执行拓扑。

def build_streaming_topology(
    dag: PhysicalOperator, options: ExecutionOptions
) -> Tuple[Topology, int]:
    """Instantiate the streaming operator state topology for the given DAG.

    This involves creating the operator state for each operator in the DAG,
    registering it with this class, and wiring up the inqueues/outqueues of
    dependent operator states.

    Args:
        dag: The operator DAG to instantiate.
        options: The execution options to use to start operators.

    Returns:
        The topology dict holding the streaming execution state.
        The number of progress bars initialized so far.
    """

    topology: Topology = {}

    # DFS walk to wire up operator states.
    def setup_state(op: PhysicalOperator) -> OpState:
        if op in topology:
            raise ValueError("An operator can only be present in a topology once.")

        # Wire up the input outqueues to this op's inqueues.
        inqueues = []
        for i, parent in enumerate(op.input_dependencies):
            parent_state = setup_state(parent)
            inqueues.append(parent_state.outqueue)

        # Create state.
        op_state = OpState(op, inqueues)
        topology[op] = op_state
        op.start(options)
        return op_state

    setup_state(dag)

    # ......(此处省略progress bar初始化逻辑,i为已初始化的progress bar数量)
    return (topology, i)

事实上,就是递归遍历DAG中的每一个PhysicalOperator,创建其对应的OpState,并将该OpState的inqueue设置为上游节点的outqueue,最后执行op.start()方法初始化Operator。最终返回的topology中,每一个OpState都通过inqueue、outqueue与上下游算子建立了连接,相当于构建出了流式执行的拓扑图。
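"inqueue与上游outqueue是同一个对象"这一点,可以用下面的简化示意体会(非Ray源码,用deque代替OpBufferQueue,SimpleOpState与build_topology均为示意,且只考虑线性DAG):

from collections import deque
from typing import List

class SimpleOpState:
    def __init__(self, name: str, inqueues: List[deque]):
        self.name = name
        self.inqueues = inqueues      # 每个inqueue就是上游OpState的outqueue
        self.outqueue = deque()

def build_topology(op_names: List[str]) -> List[SimpleOpState]:
    # 简化示意:按线性DAG依次创建OpState,并把上游的outqueue接成下游的inqueue
    states: List[SimpleOpState] = []
    for name in op_names:
        inqueues = [states[-1].outqueue] if states else []
        states.append(SimpleOpState(name, inqueues))
    return states

states = build_topology(["Read", "Map", "Write"])
states[0].outqueue.append("block-0")
# 下游Map算子的inqueue与Read算子的outqueue是同一个deque对象
assert states[1].inqueues[0] is states[0].outqueue
print(states[1].inqueues[0].popleft())  # 输出: block-0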


在构建完拓扑后,execute方法执行了self.start()方法,以启动run()方法中定义的流式执行调度循环。


最后封装并返回一个迭代器,该迭代器的get_next方法调用了DAG中最后一个节点(示例中是Write算子)的get_output_blocking方法,遍历该迭代器可以得到数据集的最终执行结果(Write算子的输出一般是一些统计信息)。在execute_to_legacy_block_list方法中,调用了block_list = _bundles_to_block_list(bundles),迭代该Iterator,并返回最后的输出结果block_list。

下面重点看一下run()方法是如何循环调度整个DAG的执行的:

StreamingExecutor.run()

def run(self):
    """Run the control loop in a helper thread.

    Results are returned via the output node's outqueue.
    """
    try:
        # Run scheduling loop until complete.
        while True:
            t_start = time.process_time()
            # use process_time to avoid timing ray.wait in _scheduling_loop_step
            continue_sched = self._scheduling_loop_step(self._topology)
            if self._initial_stats:
                self._initial_stats.streaming_exec_schedule_s.add(
                    time.process_time() - t_start
                )
            for callback in get_execution_callbacks(self._data_context):
                callback.on_execution_step(self)
            if not continue_sched or self._shutdown:
                break
    except Exception as e:
        # Propagate it to the result iterator.
        self._output_node.mark_finished(e)
    finally:
        # Signal end of results.
        self._output_node.mark_finished()


def _scheduling_loop_step(self, topology: Topology) -> bool:
    """Run one step of the scheduling loop.

    This runs a few general phases:
        1. Waiting for the next task completion using `ray.wait()`.
        2. Pulling completed refs into operator outqueues.
        3. Selecting and dispatching new inputs to operators.

    Returns:
        True if we should continue running the scheduling loop.
    """
    self._resource_manager.update_usages()
    # Note: calling process_completed_tasks() is expensive since it incurs
    # ray.wait() overhead, so make sure to allow multiple dispatch per call for
    # greater parallelism.
    
    #调用ray.wait()来等待DAG中的任务执行完成并进行处理
    num_errored_blocks = process_completed_tasks(
        topology,
        self._resource_manager,
        self._max_errored_blocks,
    )

    #根据策略选择一个Operator来执行
    op = select_operator_to_run(
        topology,
        self._resource_manager,
        self._backpressure_policies,
        self._autoscaler,
        ensure_at_least_one_running=self._consumer_idling(),
    )

    i = 0
    while op is not None:
        #调用OpState的dispatch_next_task,从inqueue中获取输入给Operator处理
        topology[op].dispatch_next_task()
        self._resource_manager.update_usages()
        #循环选择下一个Operator,直到满足一些条件(例如所有Operator都在满负荷运行)
        op = select_operator_to_run(
            topology,
            self._resource_manager,
            self._backpressure_policies,
            self._autoscaler,
            ensure_at_least_one_running=self._consumer_idling(),
        )

    # Log metrics of newly completed operators.
    for op in topology:
        if op.completed() and not self._has_op_completed[op]:
            log_str = (
                f"Operator {op} completed. "
                f"Operator Metrics:\n{op._metrics.as_dict()}"
            )
            logger.debug(log_str)
            self._has_op_completed[op] = True

    # 持续处理,直到所有Operator都处理完成
    return not all(op.completed() for op in topology)

run方法实际上在循环调用_scheduling_loop_step方法,每次执行单步调度。_scheduling_loop_step方法的主要逻辑分为两步:


1. 调用process_completed_tasks,等待下一个任务完成并处理每个Operator的输出;


2. 根据策略选择一个Operator,调用OpState.dispatch_next_task()方法,从inqueue中获取输入交由Operator处理(前面已经介绍过最终调用的是PhysicalOperator._add_input_inner)。


在process_completed_tasks中,实际上对前面介绍的DataOpTask以及MetadataOpTask做了处理,具体逻辑如下:

process_completed_tasks

def process_completed_tasks(
    topology: Topology,
    resource_manager: ResourceManager,
    max_errored_blocks: int,
) -> int:
    """Process any newly completed tasks. To update operator
    states, call `update_operator_states()` afterwards.

    Args:
        topology: The topology of operators.
        backpressure_policies: The backpressure policies to use.
        max_errored_blocks: Max number of errored blocks to allow,
            unlimited if negative.
    Returns:
        The number of errored blocks.
    """

    # All active tasks, keyed by their waitables.
    active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
    #获取所有Operator正在运行的DataOpTask与MetadataOpTask
    for op, state in topology.items():
        for task in op.get_active_tasks():
            #获取所有的waitable,对于DataOpTask来说,就是streaming_gen
            active_tasks[task.get_waitable()] = (state, task)

    # Process completed Ray tasks and notify operators.
    num_errored_blocks = 0
    if active_tasks:
        ready, _ = ray.wait(
            list(active_tasks.keys()),
            num_returns=len(active_tasks),
            fetch_local=False,
            timeout=0.1,
        )

        # Organize tasks by the operator they belong to, and sort them by task index.
        # So that we'll process them in a deterministic order.
        # This is because OpResourceAllocator may limit the number of blocks to read
        # per operator. In this case, we want to have fewer tasks finish quickly and
        # yield resources, instead of having all tasks output blocks together.
        ready_tasks_by_op = defaultdict(list)
        for ref in ready:
            state, task = active_tasks[ref]
            ready_tasks_by_op[state].append(task)

        for state, ready_tasks in ready_tasks_by_op.items():
            ready_tasks = sorted(ready_tasks, key=lambda t: t.task_index())
            for task in ready_tasks:
                if isinstance(task, DataOpTask):
                    try:
                        #回调DataOpTask的on_data_ready方法
                        #(max_bytes_to_read_per_op由resource_manager在循环前计算,用于限制每个算子本轮可读取的字节数,本文引用的代码有省略)
                        bytes_read = task.on_data_ready(
                            max_bytes_to_read_per_op.get(state, None)
                        )
                        if state in max_bytes_to_read_per_op:
                            max_bytes_to_read_per_op[state] -= bytes_read
                    except Exception as e:
                        num_errored_blocks += 1
                        should_ignore = (
                            max_errored_blocks < 0
                            or max_errored_blocks >= num_errored_blocks
                        )
                        error_message = (
                            "An exception was raised from a task of "
                            f'operator"{state.op.name}".'
                        )
                        if should_ignore:
                            remaining = (
                                max_errored_blocks - num_errored_blocks
                                if max_errored_blocks >= 0
                                else"unlimited"
                            )
                            error_message += (
                                " Ignoring this exception with remaining"
                                f" max_errored_blocks={remaining}."
                            )
                            logger.error(error_message, exc_info=e)
                        else:
                            error_message += (
                                " Dataset execution will now abort."
                                " To ignore this exception and continue, set"
                                " DataContext.max_errored_blocks."
                            )
                            logger.error(error_message)
                            raise e from None
                else:
                    assert isinstance(task, MetadataOpTask)
                    task.on_task_finished()

    # Pull any operator outputs into the streaming op state.
    for op, op_state in topology.items():
        while op.has_next():
            #将所有Operator当前的输出移动到outqueue中
            op_state.add_output(op.get_next())

    return num_errored_blocks

1. 首先获取所有PhysicalOperator正在运行的DataOpTask与MetadataOpTask,并得到对应的所有ObjectRef与ObjectRefGenerator。

2. 调用ray.wait(),等待ObjectRef执行完成或ObjectRefGenerator产生下一个输出。

3. wait会返回所有ready的ObjectRef与ObjectRefGenerator,因此可以获得当前输出Ready的DataOpTask与MetadataOpTask列表。

4. 对于Ready的DataOpTask,回调on_data_ready方法。

5. 对于Ready的MetadataOpTask,回调on_task_finished方法。

6. 遍历所有的Operator,如果当前有输出,则将输出移动到对应的outqueue中。

在每一个_scheduling_loop_step中,Ray Data等待下一个输出结果产生并进行回调,将每个Operator的输出移动到outqueue中;接着选择若干Operator,执行dispatch_next_task方法从输入队列中获取数据交由其处理,从而构建出"从inqueue读取数据 -> 启动ray remote task分布式处理 -> 输出到outqueue"的流式循环,如下面的简化示意所示。
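把上述两步调度逻辑抽象出来,可以得到如下纯Python的简化模型(非Ray源码,FakeOp与scheduling_loop均为示意,去掉了ray.wait、反压与资源管理等细节),用于帮助理解"已完成任务的输出流向outqueue、再作为下游inqueue中的输入被分发"这一循环:

from collections import deque
from typing import List, Optional

class FakeOp:
    """极简算子模型:add_input对应提交"远程任务",take完成后的输出对应任务产出。"""
    def __init__(self, name: str, upstream: Optional["FakeOp"] = None):
        self.name = name
        # 与Ray Data一致:inqueue就是上游算子的outqueue(同一个对象)
        self.inqueue: deque = upstream.outqueue if upstream else deque()
        self.outqueue: deque = deque()
        self._pending: List[str] = []   # 模拟已提交、尚未完成的远程任务输出

    def add_input(self, item: str) -> None:
        self._pending.append(f"{self.name}({item})")

    def process_completed(self) -> None:
        # 对应process_completed_tasks:把已完成任务的输出移动到outqueue
        while self._pending:
            self.outqueue.append(self._pending.pop(0))

    def busy(self) -> bool:
        return bool(self._pending)

def scheduling_loop(ops: List[FakeOp], source: List[str]) -> List[str]:
    ops[0].inqueue.extend(source)
    results: List[str] = []
    while True:
        for op in ops:                      # 阶段1:收集已完成任务的输出
            op.process_completed()
        while ops[-1].outqueue:             # 末端算子的输出即最终结果
            results.append(ops[-1].outqueue.popleft())
        dispatched = False
        for op in ops:                      # 阶段2:给有输入的算子分发任务(对应dispatch_next_task)
            if op.inqueue:
                op.add_input(op.inqueue.popleft())
                dispatched = True
        if not dispatched and not any(op.busy() or op.inqueue for op in ops):
            return results

read = FakeOp("Read")
map_op = FakeOp("Map", upstream=read)
write = FakeOp("Write", upstream=map_op)
print(scheduling_loop([read, map_op, write], ["b0", "b1"]))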


总结

虽然Ray Data提供的接口较为简洁,但其内部实现较为复杂且精巧,具体步骤可以分为:

1. 构建LogicalPlan;

2. 将LogicalPlan转换为PhysicalPlan;

3. 使用StreamingExecutor执行PhysicalPlan,PhysicalPlan中的每个PhysicalOperator并行执行,启动ray remote task分布式处理输入,StreamingExecutor在每次调度循环中将PhysicalOperator已有的结果移动到输出队列中,并选择一些PhysicalOperator向其增加更多输入。


虽然Ray在AI的多个领域展现出了较大潜力,有望成为分布式计算的新范式,但目前网上分析Ray Data内部原理的文章较少,问题排查以及二次开发都有一定难度。本文从简单的示例代码出发,系统性地分析了从LogicalPlan构建到PhysicalPlan流式执行的整个过程,希望对读者有所帮助。



来源  |  阿里云开发者公众号

作者  |  琮安
