MapReduce 原理与实践

简介: MapReduce 原理与实践

MapReduce 简介


MapReduce 核心思想

Hadoop MapReduce 是一个编程框架,它可以轻松地编写应用程序,以可靠的、容错的方式处理大量的数据(数千个节点)。


正如其名,MapReduce 的工作模式主要分为 Map 阶段和 Reduce 阶段。


一个 MapReduce 任务(Job)通常将输入的数据集分割成独立的块,这些块被 map 任务以完全并行的方式处理。框架对映射(map)的输出进行排序,然后将其输入到 reduce 任务中。通常,作业的输入和输出都存储在文件系统中。框架负责调度任务、监视任务并重新执行失败的任务。


在 Hadoop 集群中,计算节点一般和存储节点相同,即 MapReduce 框架和 Hadoop 分布式文件系统均运行在同一组节点上。这种配置允许框架有效地调度已经存在数据的节点上的作业,使得跨集群的带宽具有较高的聚合度,能够有效利用资源。


94621e8f510268ea6945781524c46888.png


MapReduce 框架由单个主节点(Master)的 ResourceManager、每个从节点(Slave) NodeManager 和每个应用程序的 MRAppMaster 组成。


在处于最低程度时,应用程序通过实现适当的接口以及抽象类来指定输入输出的位置、提供 map 和 reduce 函数。在实现过程中还会涉及到作业配置等参数的设置。


在编程框架完善并打包之后,Hadoop 的作业客户端(job client)可以将作业(一般是 jar 包或者可执行文件)和配置项提交给 ResourceManager 。后者负责将作业代码和配置项分发给从节点(Slave),之后负责作业的调度和监视,同时也向作业客户端提供状态和诊断信息。


fcd08ccbb051d88f22d5bbd977672de0.png


  • Client Service: 应用提交、终止、输出信息(应用、队列、集群等的状态信息)
  • Adaminstration Service: 队列、节点、Client 权限管理
  • ApplicationMasterService: 注册、终止 ApplicationMaster, 获取 ApplicationMaster 的资源申请或取消的请求,并将其异步地传给 Scheduler, 单线程处理
  • ApplicationMaster Liveliness Monitor: 接收 ApplicationMaster 的心跳消息,如果某个 ApplicationMaster 在一定时间内没有发送心跳,则被任务失效,其资源将会被回收,然后 ResourceManager 会重新分配一个 ApplicationMaster 运行该应用(默认尝试 2 次)
  • Resource Tracker Service: 注册节点, 接收各注册节点的心跳消息
  • NodeManagers Liveliness Monitor: 监控每个节点的心跳消息,如果长时间没有收到心跳消息,则认为该节点无效, 同时所有在该节点上的 Container 都标记成无效,也不会调度任务到该节点运行
  • ApplicationManager: 管理应用程序,记录和管理已完成的应用
  • ApplicationMaster Launcher: 一个应用提交后,负责与 NodeManager 交互,分配 Container 并加载 ApplicationMaster,也负责终止或销毁
  • YarnScheduler: 资源调度分配, 有 FIFO(with Priority),Fair,Capacity 方式
  • ContainerAllocationExpirer: 管理已分配但没有启用的 Container,超过一定时间则将其回收

MapReduce 编程框架


你可能想知道 MapReduce 究竟处理什么形式的数据。事实上, MapReduce 框架只对 <key, value> 形式的键值对进行处理。


换言之,该框架会将任务的输入当成一组 <key, value> 键值对,最后也会生成一组 <key, value> 键值对作为结果。其中的 key 和 value 可以根据具体问题将其理解为不同的类型。


key 和 value 的类必须由框架来完成序列化,因此我们需要做的就是实现其中的可写接口(Writable)。此外,对于其中的一些关键类还必须实现 WritableComparable 接口,以便于框架对其进行排序。在讲到具体的实现时我们会提到上述知识点。


综上所述,一个 MapReduce 作业从输入到输出的过程中,经历了以下过程。


首先是一些说明:


  • <k1, v1>:输入的原始数据,即使不是 K/V 形式的,你也可以在代码中对其进行格式上的调整。
  • <k3, v3>:输出的计算结果。

因此,这个过程主要经历了以下几步:


<k1, v1> -> Map -> <k2, v2> -> Combine -> <k2, v2> -> Reduce -> <k3, v3>


下面我们将对这个过程中涉及到的关键概念进行讲解。同时,也请阅读 Google 发表的关于 MapReduce 原理的论文。这篇文章对 MapReduce 的工作原理进行了深入的讲解,可以帮助理解各个流程之间的关系。


扩展链接:


  • MapReduce: Simplified Data Processing on Large Clusters


Mapper


Map 阶段是一个独立的任务,由 Mapper 负责将输入的键值对(以下简称为“输入对”)映射到一组中间键值对。转换后的中间键值对(以下简称为“中间对”)不需要与原始的输入有相同的类型。给定的输入键值对可以映射到零或多个输出键值对(以下简称为“输出对”)。


例如,<A, B> 这个键值对可以在 Map 阶段分别映射为 <A, 1> 和 <B, 1> 两个键值对。


总体而言,Mapper 的实现过程主要是向 Job.setmapperclass(Class) 方法传递具体的作业代码。输入的数据被划分为等长的小数据块,称为输入分片(InputSplit)。然后框架为该作业中 InputSplit 所包含的每个键值对调用 map(WritableComparable,Writable, Context) 方法。


当然,我们在应用程序的开发中可以重写 cleanup(Context) 方法来执行一些清理工作。


刚刚提到输出对不需要与输入对具有相同的类型,给定的输入对可以映射到零或多个输出对。输出对将会在 context.write(WritableComparable, Writable) 方法调用时被收集在一起。


此外,应用程序可以使用 Counter (计数器)来报告一些统计数据。


所有与给定输出键有关的中间值都将被框架分组,并传递给 Reducer 以产生最终的输出结果。我们可以通过 Job.setGroupingComparatorClass(Class) 方法来指定 Comparator (比较器)以控制分组过程。这稍微有些复杂,在初次学习时可以仅作了解。


随后,Mapper 的输出将在 Reducer 中进行排序和分区。**分区的总数量与作业的 Reducer 任务数相同。**用户可以通过实现自定义 Partitioner (分区器)来控制哪些键(及其相关的键值对)由哪些 Reducer 处理。


用户可以通过 Job.setCombinerClass(Class) 方法来指定 Combiner (组合器)。这个 Combiner 用于在本地执行中间输出的聚合操作,这将有利于减少从 Mapper 传输到 Reducer 的数据量。本地执行是指在单个节点范围内进行处理,减少数据量是指跨节点的数据传输量变少。


在上述过程中产生的中间输出总是以 (key-len, key, value-len, value) 的格式存储,并且这些中间输出是已经排过序的。我们可以在应用程序中通过设置 Configuration 对象的参数来利用 CompressionCodec 工具控制是否要压缩中间输出,以及如何压缩中间输出。


面试时容易问到一个问题是如何计算 Map 的数量:一个简单的回答是其数量通常由输入数据的总大小决定,即输入文件块的总数。


对于 Map 来说,正常情况下的并行计算量是每个节点大约 10-100 个 Map 。对于某些计算量较小的 Map 任务,这个数量可以设置为 300 个。任务的设置过程也是需要耗费时间的,所以在海量数据背景下,最好让 Map 的处理时间不低于一分钟,以最大化地利用集群资源。


因此,如果预计输入数据有 10TB ,而块大小设置为 128MB ,那默认情况下将会产生 82000 个 map 。如果我们通过 Configuration.set(MRJobConfig。NUM_MAPS, int) 设置了更高的值,则这个数量将会根据设置再次改变。


Reducer


Map 过程(包括了 Combine)之后就是 Reduce 过程。Reduce 过程利用 Reducer 共享键来减少中间值集合的规模。


在作业中,reducer 的数量可以通过 job.setNumReduceTasks(int) 方法设置。


具体而言,Reducer 在实现时需要我们通过 Job.setReducerClass(Class) 方法来传递具体的作业代码,并可以通过重写覆盖该方法来完成初始化。框架将对分组后的输入中的每个 <key, (含有多个value的列表)> 调用 reduce(WritableComparable, Iterable<Writable>, Context) 方法。同样的,我们可以在应用程序中重写 cleanup(Context) 方法来执行清理工作。


Reducer 有三个主要阶段,分别是:shuffle 、 sort 和 reduce 。


  • Shuffle:也称之为混洗,mapper 的排序输出是 Reducer 的输入。在这个阶段,MapReduce 框架通过 HTTP 协议获取所有 Mapper 输出的相关分区。
  • Sort:排序。由于不同的 Mapper 可能输出相同的键,因此在此阶段,框架将对 Reducer 的输入通过键进行分组。需要特别说明的是,Shuffle(混洗)和 Sort (排序)阶段可能会同时发生。当获取到 map 的输出时,这两个阶段就会被合并进行。
  • Reduce:规约。在这个阶段将会对每一个 <key, (含有多个value的列表)> 记录都调用 reduce(WritableComparable, Iterable<Writable>, Context) 方法进行处理。作为最后一个阶段,Reduce 任务的输出通常都通过 Context.write(WritableComparable, Writable) 方法写入到文件系统(无论是 HDFS 还是本地文件系统)。在这个过程中,Reducer 的输出是无序的,我们也可以使用 Counter 来统计一些数据。

Partitioner

Partitioner 用于对键的空间进行分区。


具体来说,它控制了中间 Map 输出的键的分区,其目的是为了优化并减少计算量。键(或键的子集)用于派生分区,通常由散列函数派生。分区的总数与作业的 Reduce 任务数相同。HashPartitioner 是 MapReduce 使用的默认分区程序。


Counter

Counter (计数器)是 MapReduce 应用程序报告统计数据的一种工具。在 Mapper 和 Reducer 的具体实现中,可以利用 Counter 来报告统计信息。


除了需要我们自己实现的部分,MapReduce 也自带了非常实用的 Mapper 、 Reducer 和 Partitioner 库,你可以在官方文档的 Package Summary 章节了解他们的具体用法。

WordCount 实验

在了解了 MapReduce 的工作原理之后,我们将再次使用 WordCount 来帮助理解它的工作过程。只是,这一次将由我们自己编写 WordCount 的代码来实现相关的逻辑。


MapReduce 作业的编译与执行


准备

准备输入文件

在集群启动完成后,我们上传一些数据作为稍后词频统计的“原料”。


请在终端中输入以下命令来完成上传,这一次上传的文件是 Linux 系统 /etc 目录下的 protocols 协议文件。

hdfs dfs -put /etc/protocols /user/hadoop

创建文件

数据准备好之后,在终端中输入以下命令,新建一个名为 WordCount.java 的文件。


# 请在 hadoop 用户的主目录新建该文件
cd /home/hadoop
touch WordCount.java

使用 Vim 编辑器打开并编辑该文件。你也可以使用其他的编辑器。

vim /home/hadoop/WordCount.java

在 WordCount.java 文件内先输入以下代码,完成整个 WordCount 程序的大体框架。


//  以下是本程序会用到的相关的包,先引入
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// 注意类名要与文件名的大小写一致
public class WordCount {
    // 自定义的 TokenizerMapper 类将继承自 Mapper 类,以实现相关的接口和方法
    // 在 Map 阶段将会执行其中的作业逻辑
    public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
  }
    // 自定义的 IntSumReducer 类将继承自 Reducer 类,以实现相关的接口和方法
    // 在 Reduce 阶段将会执行其中的作业逻辑
    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
  }
    // main 方法是整个程序的入口,在这里涉及到作业(Job)的各项设置
    public static void main(String[] args) throws Exception {
  }
}


编写 Mapper

map 方法是 Mapper 的具体实现。在下面这段代码中,Mappter 通过 map 方法一次只处理读入的一行数据。数据是由指定的 TextInputFormat 提供的。然后,它通过 StringTokenizer 将每一行的内容分割为由空格分隔的单词,形成 <word, 1> 形式的键值对。即每个单词作为键,对应一个计数的值。

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
    }
}


举个例子,如果我们在文件中读入了一行数据:


AAA BBB uiuing AAA

那么在 map 方法的处理后,将形成如下所示的键值对:


<AAA, 1>
<BBB, 1>
<uiuing, 1>
<AAA, 1>


理解了 map 的工作机制之后,现在我们来完善 Mapper 类的代码。


请在 /home/hadoop/WordCount.java 文件中将 TokenizerMapper 的代码按照如下的样例补充完整。相关的讲解将以注释给出。


public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
    // 在 MapReduce 框架中,基本数据类型都封装成了 Writable 类型
    // 因此 int 类型对应于 IntWritable 类型,在初始化时将其声明为静态常量是为了方便地使用 1 的值
    private final static IntWritable one = new IntWritable(1);
    // 声明一个 Text 类型的私有成员变量 word
    private Text word = new Text();
    // map 方法的写法是标准格式,可以参考官方文档理解各个参数的含义
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      // 从 value 中读入数据并按照空格分隔
      StringTokenizer itr = new StringTokenizer(value.toString());
      // 将每个分隔形成的单词组装成键值对
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }


编写 Reduce

reduce 方法是 Reducer 的具体实现。在 WordCount 的例子中,通过如下所示的 reduce 方法可以实现汇总键值对中的值,从而达到统计每个键出现次数的目的。这也是词频统计的核心所在。


public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }


上一步 map 阶段我们得到了如下所示的键值对:


<AAA, 1>
<BBB, 1>
<uiuing, 1>
<AAA, 1>


那么在 reduce 阶段,这些键值对中键相同的值将会被累加,聚合成如下所示的键值对:


<AAA, 2>
<BBB, 1>
<uiuing, 1>


理解了以上逻辑之后,请在 /home/hadoop/WordCount.java 文件中将 IntSumReducer 类的代码补充完整:

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    // 声明一个 IntWritable 类型值用于存放累加结果
    private IntWritable result = new IntWritable();
    // reduce 方法的写法也是参考官方文档进行的,相关的参数可以查阅官方文档进行理解
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      // 进行值的累加操作
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      // 将 int 基本类型通过 set 方法赋予到结果中
      result.set(sum);
      // 写入上下文中进行保存
      context.write(key, result);
    }
  }


编写任务驱动

我们知道任何编程框架都会有一个主入口,就像 Java 程序以 main 方法作为主入口一样。程序执行的第一步就是执行 main 方法中的逻辑。


因此,我们将 main 方法理解为任务驱动,即设置和发起任务的角色。


此外,在 WordCount 中,我们还可以复用 Reducer 的实现逻辑,并且指定一个 Combiner (组合器)。因此,在对键进行排序之后,每个 map 的输出都通过本地的 Combiner (根据作业配置与还原器相同)传递给本地的聚合过程,这样做可以减少后续 reduce 过程跨节点的数据传输量。


请在 /home/hadoop/WordCount.java 文件中将 main 方法的逻辑代码补充完整。相关的介绍将以注释形式给出。


// throws Exception 可以合理地抛出 MapReduce 任务中产生的异常,便于我们进行调试
public static void main(String[] args) throws Exception {
    // 程序的第一步是声明并初始化 Configuration 对象用于设置作业的相关运行参数
    Configuration conf = new Configuration();
    // 设置作业的配置参数和名称
    Job job = Job.getInstance(conf, "word count");
    // 将 WordCount 类作为运行的入口
    job.setJarByClass(WordCount.class);
    // 通过 setMapperClass 方法告诉集群应当在 map 阶段执行哪些逻辑
    job.setMapperClass(TokenizerMapper.class);
    // 通过 setCombinerClass 方法告诉集群应当在 combine 阶段执行哪些逻辑,此处复用了 Reducer 的逻辑,用于在本地进行部分结果的累加
    // 这个步骤不是必须的
    job.setCombinerClass(IntSumReducer.class);
    // 通过 setReducerClass 方法告诉集群应当在 combine 阶段执行哪些逻辑
    job.setReducerClass(IntSumReducer.class);
    // 设置输出结果中键的数据类型
    job.setOutputKeyClass(Text.class);
    // 设置输出结果中值的数据类型
    job.setOutputValueClass(IntWritable.class);
    // 利用 main 函数的第 1 个输入参数获取输入数据的路径
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // 利用 main 函数的第 2 个输入参数获取输出数据的路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // job.waitForCompletion(true) 相当于开启执行任务的开关,执行到此处时一个 MapReduce 应用才会真正地开始计算
    // 使用 System.exit 方法来告知程序运行的状态
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }


MapReduce 作业的编译和执行

编译 Jar 包

完成所有代码的编写之后,我们需要将 WordCount.java 文件编译为 Jar 包。


在终端中输入以下命令来完成编译:

cd ~
# 导入生效
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
# 编译
hadoop com.sun.tools.javac.Main /home/hadoop/WordCount.java
# 打包
jar cf wc.jar WordCount*.class


执行作业

Jar 包编译完成之后,就可以开始执行作业了。使用 hadoop jar 命令,并附带上 jar 包的路径、程序主类名称和输入输出文件的路径。


请在终端中输入以下命令来提交作业执行。

hadoop jar /home/hadoop/wc.jar WordCount /user/hadoop/protocols /user/hadoop/wordcount_output

观察作业进度

在 MapReduce 的工作过程中,我们可以在终端中的输出看到作业的进度:

a822c73c766926888b5e7987f3cc95be.png



如果看到 map 100% reduce 100% 的提示之后,则表明计算已经完成,在输出的提示中还包含了 Counter 、 MapReduce 框架以及 Shuffle 的一些统计信息,这些信息可以帮助我们分析作业的执行质量,也可以定位问题。


查看结果

最后,我们来查看计算的结果。


在终端中输入以下命令来查看计算结果:

hdfs dfs -cat /user/hadoop/wordcount_output/part-r-*

可以看到当前文件中出现最多的单词(字符)是 #:

1.png


作为改进,你可以在框架实现过程中通过正则表达式来过滤掉非单词的字符。


学习 MapReduce 方法

尽管前文较为详细地介绍了 MapReduce 的工作原理,并且以 WordCount 为例实现了一个 MapReduce 应用程序,但对初学者而言,如何将这样的编程思想应用在解决实际问题之中呢?


先不要忙着上手写自己的代码,学习一个编程框架最好的途径就是熟读文档和样例代码。


目前 Hadoop 在不同版本的官方文档(Documentation)中均提供了 MapReduce 框架的介绍,

cd ~
wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1-src.tar.gz
tar -zxvf hadoop-2.6.1-src.tar.gz
tree hadoop-2.6.1-src/hadoop-mapreduce-project/hadoop-mapreduce-examples/

2.png


这其中既有我们学习过的 WordCount 案例,也还有一些其他的示例程序。下面选择其中一些进行介绍:


  • **AggregateWordCount:**基于聚合的 MapReduce 程序,用于对输入文件中的单词进行计数。
  • **AggregateWordHistogram:**基于聚合的计算输入文件中单词的直方图的 MapReduce 程序。
  • **BaileyBorweinPlouffe:**使用 Bailey-Borwein-Plouffe 算法来计算精确的圆周率数字的 MapReduce 程序。
  • **Grep:**在输入中用于统计符合正则表达式匹配数量的 MapReduce 程序。
  • **Join:**对已排序并处于相同分区的数据集进行连接的 MapReduce 程序。
  • **MultiFileWordCount:**从多个文件中计算单词词频的 MapReduce 程序。
  • **Pentomino:**模拟瓷砖铺设的 MapReduce 程序,可用来寻找解决方法。
  • **Pi:**使用拟蒙特卡罗方法估计圆周率的 MapReduce 程序。
  • **RandomTextWriter:**该程序可向每个节点写入 10GB 的随机文本数据。
  • **RandomWriter:**该程序可向每个节点写入 10GB 的随机数据。
  • **SecondarySort:**一个二级排序的示例。
  • **Sort:**对数据进行排序的示例。
  • **Sudoku:**求解数独问题的程序。
  • **TeraGen:**为 Terasort 生成数据。
  • **TeraSort:**运行 Terasort 测试。这是一个非常出名的测试,用于验证分布式计算框架对 1TB 数据排序的速度,你可以访问 Sort Benchmark 了解更多信息。
  • **TeraValidate:**查看 Terasort 的结果。
  • **WordMean:**用于计算输入文件中单词的平均长度。
  • **WordMedian:**计算输入文件中单词的中位数长度的 MapReduce 程序。
  • **WordStandardDeviation:**一个 MapReduce 程序,它计算输入文件中单词长度的标准偏差。

在以上示例代码文件中,每一个代码文件都详尽地注释了其工作原理。通过阅读计算框架或示例程序的源代码,将能够在短时间内迅速加深理解。你可以选择以上任意感兴趣的源代码进行阅读,此处不再详细展开。



目录
相关文章
|
2月前
|
SQL 分布式计算 关系型数据库
阿里云E-MapReduce Trino专属集群外连引擎及权限控制踩坑实践
本文以云厂商售后技术支持的角度,从客户的需求出发,对于阿里云EMR-Trino集群的选型,外连多引擎的场景、Ldap以及Kerberos鉴权等问题进行了简要的实践和记录,模拟客户已有的业务场景,满足客户需求的同时对过程中的问题点进行解决、记录和分析,包括但不限于Mysql、ODPS、Hive connector的配置,Hive、Delta及Hudi等不同表格式读取的兼容,aws s3、阿里云 oss协议访问异常的解决等。
|
2月前
|
存储 分布式计算 关系型数据库
bigdata-08-MapReduce原理到实战
bigdata-08-MapReduce原理到实战
27 0
|
6月前
|
机器学习/深度学习 分布式计算 大数据
大数据 - MapReduce:从原理到实战的全面指南
大数据 - MapReduce:从原理到实战的全面指南
380 0
|
9月前
|
分布式计算 Hadoop 数据处理
Hadoop基础学习---6、MapReduce框架原理(二)
Hadoop基础学习---6、MapReduce框架原理(二)
|
9月前
|
存储 分布式计算 Hadoop
Hadoop基础学习---6、MapReduce框架原理(一)
Hadoop基础学习---6、MapReduce框架原理(一)
|
10月前
|
分布式计算
MapReduce 的原理、流程【重要】
MapReduce 的原理、流程【重要】
87 0
|
10月前
|
机器学习/深度学习 分布式计算 监控
Hadoop生态系统中的数据处理技术:MapReduce的原理与应用
Hadoop生态系统中的数据处理技术:MapReduce的原理与应用
|
消息中间件 分布式计算 关系型数据库
Kafka-HBase-MapReduce-Mysql 连接实践 通话记录
Kafka-HBase-MapReduce-Mysql 连接实践 通话记录
88 0
Kafka-HBase-MapReduce-Mysql 连接实践 通话记录
|
分布式计算 Java
Mapreduce执行机制之提交任务和切片原理
Mapreduce执行机制之提交任务和切片原理
|
存储 分布式计算 关系型数据库
大数据基础-MapReduce原理及核心编程思想
MapReduce原理及核心编程思想
163 0