首页> 标签> 并行计算
"并行计算"
共 1903 条结果
全部 问答 文章 公开课 课程 电子书 技术圈 体验
大数据计算的基石——MapReduce
摘要MapReduce 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 Map 函数处理一个基于 key/value pair 的数据集合,输出中间的基于 key/value pair 的数据集合;然后再创建一个 Reduce 函数用来合并所有的具有相同中间 key 值的中间 value 值。现实世界中有很多满足上述处理模型的例子,本论文将详细描述这个模型。MapReduce 架构的程序能够在大量的普通配置的计算机上实现并行化处理。这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,管理集群中计算机之间必要的通信。采用 MapReduce 架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。我们的 MapReduce 实现运行在规模可以灵活调整的由普通机器组成的集群上:一个典型的 MapReduce计算往往由几千台机器组成、处理以 TB 计算的数据。程序员发现这个系统非常好用:已经实现了数以百计的 MapReduce 程序,在 Google 的集群上,每天都有 1000 多个 MapReduce 程序在执行。1 介绍在过去的 5 年里,包括本文作者在内的 Google 的很多程序员,为了处理海量的原始数据,已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量的原始数据,比如,文档抓取(类似网络爬虫的程序)、Web 请求日志等等;也为了计算处理各种类型的衍生数据,比如倒排索引、Web 文档的图结构的各种表示形势、每台主机上网络爬虫抓取的页面数量的汇总、每天被请求的最多的查询的集合等等。大多数这样的数据处理运算在概念上很容易理解。然而由于输入的数据量巨大,因此要想在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。如何处理并行计算、如何分发数据、如何处理错误?所有这些问题综合在一起,需要大量的代码处理,因此也使得原本简单的运算变得难以处理。为了解决上述复杂的问题,我们设计一个新的抽象模型,使用这个抽象模型,我们只要表述我们想要执行的简单运算即可,而不必关心并行计算、容错、数据分布、负载均衡等复杂的细节,这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自 Lisp 和许多其他函数式语言的 Map 和 Reduce 的原语。我们意识到我们大多数的运算都包含这样的操作:在输入数据的“逻辑”记录上应用 Map 操作得出一个中间 key/value pair 集合,然后在所有具有相同 key 值的 value 值上应用 Reduce 操作,从而达到合并中间的数据,得到一个想要的结果的目的。使用 MapReduce 模型,再结合用户实现的 Map 和 Reduce 函数,我们就可以非常容易的实现大规模并行化计算;通过 MapReduce 模型自带的“再次执行”(re-execution)功能,也提供了初级的容灾实现方案。这个工作(实现一个 MapReduce 框架模型)的主要贡献是通过简单的接口来实现自动的并行化和大规模的分布式计算,通过使用 MapReduce 模型接口实现在大量普通的 PC 机上高性能计算。第二部分描述基本的编程模型和一些使用案例。第三部分描述了一个经过裁剪的、适合我们的基于集群的计算环境MapReduce 实现。第四部分描述我们认为在 MapReduce 编程模型中一些实用的技巧。第五部分对于各种不同的任务,测量我们 MapReduce 实现的性能。第六部分揭示了在 Google 内部如何使用 MapReduce 作为基础重写我们的索引系统产品,包括其它一些使用 MapReduce 的经验。第七部分讨论相关的和未来的工作。2 编程模型MapReduce 编程模型的原理是:利用一个输入 key/value pair 集合来产生一个输出的 key/value pair 集合。MapReduce 库的用户用两个函数表达这个计算:Map 和 Reduce。用户自定义的 Map 函数接受一个输入的 key/value pair 值,然后产生一个中间 key/value pair 值的集合。MapReduce 库把所有具有相同中间 key 值 I 的中间 value 值集合在一起后传递给 reduce 函数。用户自定义的 Reduce 函数接受一个中间 key 的值 I 和相关的一个 value 值的集合。Reduce 函数合并这些value 值,形成一个较小的 value 值的集合。一般的,每次 Reduce 函数调用只产生 0 或 1 个输出 value 值。通常我们通过一个迭代器把中间 value 值提供给 Reduce 函数,这样我们就可以处理无法全部放入内存中的大量 的 value 值的集合。2.1 例子例如,计算一个大的文档集合中每个单词出现的次数,下面是伪代码段:map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, “1″); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));Map 函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是 1)。Reduce 函数把 Map函数产生的每一个特定的词的计数累加起来。另外,用户编写代码,使用输入和输出文件的名字、可选的调节参数来完成一个符合 MapReduce 模型规范的对象,然后调用 MapReduce 函数,并把这个规范对象传递给它。用户的代码和 MapReduce 库链接在一起(用 C++实现)。附录 A 包含了这个实例的全部程序代码。2.2 类型尽管在前面例子的伪代码中使用了以字符串表示的输入输出值,但是在概念上,用户定义的Map和Reduce函数都有相关联的类型:map(k1,v1) ->list(k2,v2) reduce(k2,list(v2)) ->list(v2)比如,输入的 key 和 value 值与输出的 key 和 value 值在类型上推导的域不同。此外,中间 key 和 value 值与输出 key 和 value 值在类型上推导的域相同。2 我们的 C++中使用字符串类型作为用户自定义函数的输入输出,用户在自己的代码中对字符串进行适当 的类型转换。2.3 更多的例子这里还有一些有趣的简单例子,可以很容易的使用 MapReduce 模型来表示:分布式的 Grep:Map 函数输出匹配某个模式的一行,Reduce 函数是一个恒等函数,即把中间数据复制到输出。计算 URL 访问频率:Map 函数处理日志中 web 页面请求的记录,然后输出(URL,1)。Reduce 函数把相同URL 的 value 值都累加起来,产生(URL,记录总数)结果。倒转网络链接图:Map 函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。Reduce 函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map 函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的 URL。Reduce 函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。倒排索引:Map 函数分析每个文档输出一个(词,文档号)的列表,Reduce 函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。分布式排序:Map 函数从每个记录提取 key,输出(key,record)。Reduce 函数不改变任何的值。这个运算依赖分区机制(在 4.1 描述)和排序属性(在 4.2 描述)。3 实现MapReduce 模型可以有多种不同的实现方式。如何正确选择取决于具体的环境。例如,一种实现方式适用于小型的共享内存方式的机器,另外一种实现方式则适用于大型 NUMA 架构的多处理器的主机,而有的实现方式更适合大型的网络连接集群。本章节描述一个适用于 Google 内部广泛使用的运算环境的实现:用以太网交换机连接、由普通 PC 机组成的大型集群。在我们的环境里包括:x86 架构、运行 Linux 操作系统、双处理器、2-4GB 内存的机器。普通的网络硬件设备,每个机器的带宽为百兆或者千兆,但是远小于网络的平均带宽的一半。集群中包含成百上千的机器,因此,机器故障是常态。存储为廉价的内置 IDE 硬盘。一个内部分布式文件系统用来管理存储在这些磁盘上的数据。文件系 统通过数据复制来在不可靠的硬件上保证数据的可靠性和有效性。用户提交工作(job)给调度系统。每个工作(job)都包含一系列的任务(task),调度系统将这些任 务调度到集群中多台可用的机器上。3.1 执行概括通过将 Map 调用的输入数据自动分割为 M 个数据片段的集合,Map 调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将 Map 调用产生的中间 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。图 1 展示了我们的 MapReduce 实现中操作的全部流程。当用户调用 MapReduce 函数时,将发生下面的一系列动作(下面的序号和图 1 中的序号一一对应):用户程序首先调用的 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。被分配了 map 任务的 worker 程序读取相关的输入数据片段,从输入的数据片段中解析出 key/valuepair,然后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/valuepair,并缓存在内存中。缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给Reduce worker。当 Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序 后使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。当所有的 Map 和 Reduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对MapReduce 调用才返回。在成功完成任务之后,MapReduce 的输出存放在 R 个输出文件中(对应每个 Reduce 任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这 R 个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce 的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。3.2 Master 数据结构Master 持有一些数据结构,它存储每一个 Map 和 Reduce 任务的状态(空闲、工作中或完成),以及 Worker机器(非空闲任务的机器)的标识。Master 就像一个数据管道,中间文件存储区域的位置信息通过这个管道从 Map 传递到 Reduce。因此,对于每个已经完成的 Map 任务,master 存储了 Map 任务产生的 R 个中间文件存储区域的大小和位置。当 Map任务完成时,Master 接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的 Reduce 任务。3.3 容错因为 MapReduce 库的设计初衷是使用由成百上千的机器组成的集群来处理超大规模的数据,所以,这个库必须要能很好的处理机器故障。3.3.1 worker 故障master 周期性的 ping 每个 worker。如果在一个约定的时间范围内没有收到 worker 返回的信息,master 将把这个 worker 标记为失效。所有由这个失效的 worker 完成的 Map 任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的 worker。同样的,worker 失效时正在运行的 Map 或 Reduce 任务也将被重新置为空闲状态,等待重新调度。当 worker 故障时,由于已经完成的 Map 任务的输出存储在这台机器上,Map 任务的输出已不可访问了,因此必须重新执行。而已经完成的 Reduce 任务的输出存储在全局文件系统上,因此不需要再次执行。当一个 Map 任务首先被 worker A 执行,之后由于 worker A 失效了又被调度到 worker B 执行,这个“重新执行”的动作会被通知给所有执行 Reduce 任务的 worker。任何还没有从 worker A 读取数据的 Reduce 任务将从 worker B 读取数据。MapReduce 可以处理大规模 worker 失效的情况。比如,在一个 MapReduce 操作执行期间,在正在运行的集群上进行网络维护引起 80 台机器在几分钟内不可访问了,MapReduce master 只需要简单的再次执行那些不可访问的worker 完成的工作,之后继续执行未完成的任务,直到最终完成这个 MapReduce 操作。3.3.2 master 失败一个简单的解决办法是让 master 周期性的将上面描述的数据结构的写入磁盘,即检查点(checkpoint)。如果这个 master 任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master 进程。然而,由于只有一个 master 进程,master 失效后再恢复是比较麻烦的,因此我们现在的实现是如果 master 失效,就中止 MapReduce 运算。客户可以检查到这个状态,并且可以根据需要重新执行 MapReduce操作。3.3.3 在失效方面的处理机制当用户提供的 Map 和 Reduce 操作是输入确定性函数(即相同的输入产生相同的输出)时,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。我们依赖对 Map 和 Reduce 任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中。每个 Reduce 任务生成一个这样的文件,而每个 Map 任务则生成 R 个这样的文件(一个 Reduce 任务对应一个文件)。当一个 Map 任务完成的时,worker 发送一个包含 R 个临时文件名的完成消息给 master。如果 master 从一个已经完成的 Map 任务再次接收到到一个完成消息,master 将忽略这个消息;否则,master 将这 R 个文件的名字记录在数据结构里。当 Reduce 任务完成时,Reduce worker 进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个 Reduce 任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个 Reduce 任务产生的数据。使用 MapReduce 模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的 Map 和 Reduce操作是确定性的,而且存在这样的一个事实:我们的失效处理机制等价于一个顺序的执行的操作。当 Map 和 Reduce 操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。当使用非确定操作的时候, 一个 Reduce 任务 R1 的输出等价于一个非确定性程序顺序执行产生时的输出。但是,另一个 Reduce 任务 R2的输出也许符合一个不同的非确定顺序程序执行产生的 R2 的输出。考虑 Map 任务 M 和 Reduce 任务 R1、R2 的情况。我们设定 e(Ri)是 Ri 已经提交的执行过程(有且仅有一个这样的执行过程)。当 e(R1)读取了由 M 一次执行产生的输出,而 e(R2)读取了由 M 的另一次执行产生的输出,导致了较弱的失效处理。3.4 存储位置在我们的计算运行环境中,网络带宽是一个相当匮乏的资源。我们通过尽量把输入数据(由 GFS 管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS 把每个文件按 64MB 一个 Block 分隔,每个 Block 保存在多台机器上,环境中就存放了多份拷贝(一般是 3 个拷贝)。MapReduce 的 master 在调度 Map 任务时会考虑输入文件的位置信息,尽量将一个 Map 任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master 将尝试在保存有输入数据拷贝的机器附近的机器上执行 Map 任务(例如,分配到一个和包含输入数 据的机器在一个 switch 里的 worker 机器上执行)。当在一个足够大的 cluster 集群上运行大型 MapReduce 操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。3.5 任务粒度如前所述,我们把 Map 拆分成了 M 个片段、把 Reduce 拆分成 R 个片段执行。理想情况下,M 和 R 应当比集群中 worker 的机器数量要多得多。在每台 worker 机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量 Map 任务都可以分布到所有其他的 worker机器上去执行。但是实际上,在我们的具体实现中对 M 和 R 的取值都有一定的客观限制,因为 master 必须执行 O(M+R)次调度,并且在内存中保存 O(MR)个状态(对影响内存使用的因素还是比较小的:O(MR)块状态,大概每对 Map 任务/Reduce 任务 1 个字节就可以了)。更进一步,R 值通常是由用户指定的,因为每个 Reduce 任务最终都会生成一个独立的输出文件。实际使用时我们也倾向于选择合适的 M 值,以使得每一个独立任务都是处理大约 16M 到 64M 的输入数据(这样, 上面描写的输入数据本地存储优化策略才最有效),另外,我们把 R 值设置为我们想使用的 worker 机器数量的小的倍数。我们通常会用这样的比例来执行 MapReduce:M=200000,R=5000,使用 2000 台 worker 机器。3.6 备用任务影响一个 MapReduce 的总执行时间最通常的因素是“落伍者”:在运算过程中,如果有一台机器花了很长的时间才完成最后几个 Map 或 Reduce 任务,导致 MapReduce 操作总的执行时间超过预期。出现“落伍者”的原因非常多。比如:如果一个机器的硬盘出了问题,在读取的时候要经常的进行读取纠错操作,导致读取数据的速度从 30M/s 降低到 1M/s。如果 cluster 的调度系统在这台机器上又调度了其他的任务,由于 CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行 MapReduce 代码的执行效率更加缓慢。我们最近遇到的一个问题是由于机器的初始化代码有 bug,导致关闭了的处理器的缓存:在这些机器上执行任务的性能和正常情况相差上百倍。我们有一个通用的机制来减少“落伍者”出现的情况。当一个 MapReduce 操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大 MapReduce 操作的总处理时间效果显著。例如,在 5.3 节描述的排序任务,在关闭掉备用任务的情况下要多花 44%的时间完成排序任务。4 技巧虽然简单的 Map 和 Reduce 函数提供的基本功能已经能够满足大部分的计算需要,我们还是发掘出了一些有价值的扩展功能。本节将描述这些扩展功能。4.1 分区函数MapReduce 的使用者通常会指定 Reduce 任务和 Reduce 任务输出文件的数量(R)。我们在中间 key 上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。一个缺省的分区函数是使用 hash 方法(比如,hash(key) mod R)进行分区。hash 方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对 key值进行的分区将非常有用。比如,输出的 key 值是 URLs,我们希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。例如使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的 URLs 保存在同一个输出文件中。4.2 顺序保证我们确保在给定的分区中,中间 key/value pair 数据的处理顺序是按照 key 值增量顺序处理的。这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按 key 值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。4.3 Combiner 函数在某些情况下,Map 函数产生的中间 key 值的重复数据会占很大的比重,并且,用户自定义的 Reduce 函数满足结合律和交换律。在 2.1 节的词数统计程序是个很好的例子。由于词频率倾向于一个 zipf 分布(齐夫分布),每个 Map 任务将产生成千上万个这样的记录。所有的这些记录将通过网络被发送到一个单独的Reduce 任务,然后由这个 Reduce 任务把所有这些记录累加起来产生一个数字。我们允许用户指定一个可选的 combiner 函数,combiner 函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。Combiner 函数在每台执行 Map 任务的机器上都会被执行一次。一般情况下,Combiner 和 Reduce 函数是一样的。Combiner 函数和 Reduce 函数之间唯一的区别是 MapReduce 库怎样控制函数的输出。Reduce 函数的输出被保存在最终的输出文件里,而 Combiner 函数的输出被写到中间文件里,然后被发送给 Reduce 任务。部分的合并中间结果可以显著的提高一些 MapReduce 操作的速度。附录 A 包含一个使用 combiner 函数的例子。4.4 输入和输出的类型MapReduce 库支持几种不同的格式的输入数据。比如,文本模式的输入数据的每一行被视为是一个key/value pair。key 是文件的偏移量,value 是那一行的内容。另外一种常见的格式是以 key 进行排序来存储的 key/value pair 的序列。每种输入类型的实现都必须能够把输入数据分割成数据片段,该数据片段能够由单独的 Map 任务来进行后续处理(例如,文本模式的范围分割必须确保仅仅在每行的边界进行范围分割)。虽然大多数 MapReduce 的使用者仅仅使用很少的预定义输入类型就满足要求了,但是使用者依然可以通过提供一 个简单的 Reader 接口实现就能够支持一个新的输入类型。Reader 并非一定要从文件中读取数据,比如,我们可以很容易的实现一个从数据库里读记录的 Reader,或者从内存中的数据结构读取数据的 Reader。类似的,我们提供了一些预定义的输出数据的类型,通过这些预定义类型能够产生不同格式的数据。用户采用类似添加新的输入数据类型的方式增加新的输出类型。4.5 副作用在某些情况下,MapReduce 的使用者发现,如果在 Map 和/或 Reduce 操作过程中增加辅助的输出文件会比较省事。我们依靠程序 writer 把这种“副作用”变成原子的和幂等的 3 。通常应用程序首先把输出结果写到一个临时文件中,在输出全部数据之后,在使用系统级的原子操作 rename 重新命名这个临时文件。如果一个任务产生了多个输出文件,我们没有提供类似两阶段提交的原子操作支持这种情况。因此,对于会产生多个输出文件、并且对于跨文件有一致性要求的任务,都必须是确定性的任务。但是在实际应用过程中,这个限制还没有给我们带来过麻烦。4.6 跳过损坏的记录有时候,用户程序中的 bug 导致 Map 或者 Reduce 函数在处理某些记录的时候 crash 掉,MapReduce 操作无法顺利完成。惯常的做法是修复 bug 后再次执行 MapReduce 操作,但是,有时候找出这些 bug 并修复它们不是一件容易的事情;这些 bug 也许是在第三方库里边,而我们手头没有这些库的源代码。而且在很多时候,忽略一些有问题的记录也是可以接受的,比如在一个巨大的数据集上进行统计分析的时候。我们提供了一种执行模式,在这种模式下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理。每个 worker 进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。在执行 Map 或者 Reduce 操作之前,MapReduce 库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过 UDP 包向 master 发送处理的最后一条记录的序号。当 master看到在处理某条特定记录不止失败一次时,master 就标志着条记录需要被跳过,并且在下次重新执行相关的Map 或者 Reduce 任务的时候跳过这条记录。4.7 本地执行调试 Map 和 Reduce 函数的 bug 是非常困难的,因为实际执行操作时不但是分布在系统中执行的,而且通常是在好几千台计算机上执行,具体的执行位置是由 master 进行动态调度的,这又大大增加了调试的难度。为了简化调试、profile 和小规模测试,我们开发了一套 MapReduce 库的本地实现版本,通过使用本地版本的MapReduce 库,MapReduce 操作在本地计算机上顺序的执行。用户可以控制 MapReduce 操作的执行,可以把操作限制到特定的 Map 任务上。用户通过设定特别的标志来在本地执行他们的程序,之后就可以很容易的使用本地调试和测试工具(比如 gdb)。4.8 状态信息master 使用嵌入式的 HTTP 服务器(如 Jetty)显示一组状态信息页面,用户可以监控各种执行状态。状态信息页面显示了包括计算执行的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。页面还包含了指向每个任务的 stderr 和 stdout 文件的链接。用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。这些页面也可以用来分析什么时候计算执行的比预期的要慢。另外,处于最顶层的状态页面显示了哪些 worker 失效了,以及他们失效的时候正在运行的 Map 和 Reduce任务。这些信息对于调试用户代码中的 bug 很有帮助。4.9 计数器MapReduce 库使用计数器统计不同事件发生次数。比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇 German 文档等等。为了使用这个特性,用户在程序中创建一个命名的计数器对象,在 Map 和 Reduce 函数中相应的增加计数器的值。例如:Counter* uppercase; uppercase = GetCounter(“uppercase”); map(String name, String contents): for each word w in contents: if (IsCapitalized(w)): uppercase->Increment(); EmitIntermediate(w, “1″);这些计数器的值周期性的从各个单独的worker机器上传递给master (附加在ping的应答包中传递)。master把执行成功的 Map 和 Reduce 任务的计数器值进行累计,当 MapReduce 操作完成之后,返回给用户代码。计数器当前的值也会显示在 master 的状态页面上,这样用户就可以看到当前计算的进度。当累加计数器的值的时候,master 要检查重复运行的 Map 或者 Reduce 任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)。有些计数器的值是由 MapReduce 库自动维持的,比如已经处理的输入的 key/value pair 的数量、输出的key/value pair 的数量等等。计数器机制对于 MapReduce 操作的完整性检查非常有用。比如,在某些 MapReduce 操作中,用户需要确保输出的 key value pair 精确的等于输入的 key value pair,或者处理的 German 文档数量在处理的整个文档数量中属于合理范围。5 性能本节我们用在一个大型集群上运行的两个计算来衡量 MapReduce 的性能。一个计算在大约 1TB 的数据中进行特定的模式匹配,另一个计算对大约 1TB 的数据进行排序。这两个程序在大量的使用 MapReduce 的实际应用中是非常典型的 — 一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据。5.1 集群配置所有这些程序都运行在一个大约由 1800 台机器构成的集群上。每台机器配置 2 个 2G 主频、支持超线程的 Intel Xeon 处理器,4GB 的物理内存,两个 160GB 的 IDE 硬盘和一个千兆以太网卡。这些机器部署在一个两层的树形交换网络中,在 root 节点大概有 100-200GBPS 的传输带宽。所有这些机器都采用相同的部署(对等部署),因此任意两点之间的网络来回时间小于 1 毫秒。在 4GB 内存里,大概有 1-1.5G 用于运行在集群上的其他任务。测试程序在周末下午开始执行,这时主机的 CPU、磁盘和网络基本上处于空闲状态。5.2 GREP这个分布式的 grep 程序需要扫描大概 10 的 10 次方个由 100 个字节组成的记录,查找出现概率较小的 3个字符的模式(这个模式在 92337 个记录中出现)。输入数据被拆分成大约 64M 的 Block(M=15000),整个输出数据存放在一个文件中(R=1)。图 2 显示了这个运算随时间的处理过程。其中 Y 轴表示输入数据的处理速度。处理速度随着参与MapReduce 计算的机器数量的增加而增加,当 1764 台 worker 参与计算的时,处理速度达到了 30GB/s。当Map 任务结束的时候,即在计算开始后 80 秒,输入的处理速度降到 0。整个计算过程从开始到结束一共花了大概 150 秒。这包括了大约一分钟的初始启动阶段。初始启动阶段消耗的时间包括了是把这个程序传送到各个 worker 机器上的时间、等待 GFS 文件系统打开 1000 个输入文件集合的时间、获取相关的文件本地位置优化信息的时间。5.3 排序排序程序处理 10 的 10 次方个 100 个字节组成的记录(大概 1TB 的数据)。这个程序模仿 TeraSort benchmark[10]。排序程序由不到 50 行代码组成。只有三行的 Map 函数从文本行中解析出 10 个字节的 key 值作为排序的key,并且把这个 key 和原始文本行作为中间的 key/value pair 值输出。我们使用了一个内置的恒等函数作为Reduce 操作函数。这个函数把中间的 key/value pair 值不作任何改变输出。最终排序结果输出到两路复制的GFS 文件系统(也就是说,程序输出 2TB 的数据)。如前所述,输入数据被分成 64MB 的 Block(M=15000)。我们把排序后的输出结果分区后存储到 4000个文件(R=4000)。分区函数使用 key 的原始字节来把数据分区到 R 个片段中。在这个 benchmark 测试中,我们使用的分区函数知道 key 的分区情况。通常对于排序程序来说,我们会增加一个预处理的 MapReduce 操作用于采样 key 值的分布情况,通过采样的数据来计算对最终排序处理的分区点。图三(a)显示了这个排序程序的正常执行过程。左上的图显示了输入数据读取的速度。数据读取速度峰值会达到 13GB/s,并且所有 Map 任务完成之后,即大约 200 秒之后迅速滑落到 0。值得注意的是,排序程序输入数据读取速度小于分布式 grep 程序。这是因为排序程序的 Map 任务花了大约一半的处理时间和 I/O 带宽把中间输出结果写到本地硬盘。相应的分布式 grep 程序的中间结果输出几乎可以忽略不计。左边中间的图显示了中间数据从 Map 任务发送到 Reduce 任务的网络速度。这个过程从第一个 Map 任务完成之后就开始缓慢启动了。图示的第一个高峰是启动了第一批大概 1700 个 Reduce 任务(整个 MapReduce分布到大概 1700 台机器上,每台机器 1 次最多执行 1 个 Reduce 任务)。排序程序运行大约 300 秒后,第一批启动的 Reduce 任务有些完成了,我们开始执行剩下的 Reduce 任务。所有的处理在大约 600 秒后结束。左下图表示 Reduce 任务把排序后的数据写到最终的输出文件的速度。在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时,这是因为 worker 机器正在忙于排序中间数据。磁盘写入速度在 2-4GB/s 持续一段时间。输出数据写入磁盘大约持续 850 秒。计入初始启动部分的时间,整个运算消耗了 891 秒。这个速度和 TeraSort benchmark[18]的最高纪录 1057 秒相差不多。还有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少,这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份(我们使用了 2 路的 GFS 文件系统,写入复制节点的原因是为了保证数据可靠性和可用性)。我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。如果底层文件系统使用类似容错编码14的方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候,就可以降低网络带宽的使用。5.4 高效的 backup 任务图三(b)显示了关闭了备用任务后排序程序执行情况。执行的过程和图 3(a)很相似,除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。在 960 秒后,只有5 个 Reduce 任务没有完成。这些拖后腿的任务又执行了 300 秒才完成。整个计算消耗了 1283 秒,多了 44%的执行时间。5.5 失效的机器在图三(c)中演示的排序程序执行的过程中,我们在程序开始后几分钟有意的 kill 了 1746 个 worker 中的 200 个。集群底层的调度立刻在这些机器上重新开始新的 worker 处理进程(因为只是 worker 机器上的处理进程被 kill 了,机器本身还在工作)。图三(c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的 Map 任务丢失了(由于相应的执行 Map 任务的 worker 进程被 kill 了),需要重新执行这些任务。相关 Map 任务很快就被重新执行了。整个运算在 933 秒内完成,包括了初始启动时间(只比正常执行多消耗了 5%的时间)。6 经验我们在 2003 年 1 月完成了第一个版本的 MapReduce 库,在 2003 年 8 月的版本有了显著的增强,这包括了输入数据本地优化、worker 机器之间的动态负载均衡等等。从那以后,我们惊喜的发现,MapReduce 库能广泛应用于我们日常工作中遇到的各类问题。它现在在 Google 内部各个领域得到广泛应用,包括:大规模机器学习问题Google News 和 Froogle 产品的集群问题从公众查询产品(比如 Google 的 Zeitgeist)的报告中抽取数据。从大量的新应用和新产品的网页中提取有用信息(比如,从大量的位置搜索网页中抽取地理位置信 息)。大规模的图形计算。图四显示了在我们的源代码管理系统中,随着时间推移,独立的 MapReduce 程序数量的显著增加。从 2003年早些时候的0个增长到2004年9月份的差不多900个不同的程序。MapReduce的成功取决于采用MapReduce 库能够在不到半个小时时间内写出一个简单的程序,这个简单的程序能够在上千台机器的组成的集群上做大规模并发处理,这极大的加快了开发和原形设计的周期。另外,采用 MapReduce 库,可以让完全没有分布式和/或并行系统开发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用。在每个任务结束的时候,MapReduce 库统计计算资源的使用状况。在表 1,我们列出了 2004 年 8 月份 MapReduce 运行的任务所占用的相关资源。6.1 大规模索引到目前为止,MapReduce 最成功的应用就是重写了 Google 网络搜索服务所使用到的 index 系统。索引系统的输入数据是网络爬虫抓取回来的海量的文档,这些文档数据都保存在 GFS 文件系统里。这些文档原始内容 4 的大小超过了 20TB。索引程序是通过一系列的 MapReduce 操作(大约 5 到 10 次)来建立索引。使用MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:实现索引部分的代码简单、小巧、容易理解,因为对于容错、分布式以及并行计算的处理都是 MapReduce库提供的。比如,使用 MapReduce 库,计算的代码行数从原来的 3800 行 C++代码减少到大概 700 行代码。MapReduce 库的性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开处理,而不是混在一起以期减少数据传递的额外消耗。概念上不相关的计算步骤的隔离也使得我们可以很容易改变索引处理方式。比如,对之前的索引系统的一个小更改可能要耗费好几个月的时间,但是在使用 MapReduce 的新系统上,这样的更改只需要花几天时间就可以了。索引系统的操作管理更容易了。因为由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引起的绝大部分问题都已经由 MapReduce 库解决了,不再需要操作人员的介入了。另外,我们可以通过在索引系统集群中增加机器的简单方法提高整体处理性能。7 相关工作很多系统都提供了严格的编程模式,并且通过对编程的严格限制来实现并行计算。例如,一个结合函数可以通过把 N 个元素的数组的前缀在 N 个处理器上使用并行前缀算法,在 log N 的时间内计算完[6,9,13] 5 。MapReduce 可以看作是我们结合在真实环境下处理海量数据的经验,对这些经典模型进行简化和萃取的成果。更加值得骄傲的是,我们还实现了基于上千台处理器的集群的容错处理。相比而言,大部分并发处理系统都只在小规模的集群上实现,并且把容错处理交给了程序员。Bulk Synchronous Programming[17]和一些 MPI 原语[11]提供了更高级别的并行处理抽象,可以更容易写出并行处理的程序。MapReduce 和这些系统的关键不同之处在于,MapReduce 利用限制性编程模式实现了用户程序的自动并发处理,并且提供了透明的容错处理。我们数据本地优化策略的灵感来源于 active disks[12,15]等技术,在 active disks 中,计算任务是尽量推送到数据存储的节点处理 6 ,这样就减少了网络和 IO 子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行我们的运算,而不是在磁盘处理器上执行我们的工作,但是达到的目的一样的。我们的备用任务机制和 Charlotte System[3]提出的 eager 调度机制比较类似。Eager 调度机制的一个缺点是如果一个任务反复失效,那么整个计算就不能完成。我们通过忽略引起故障的记录的方式在某种程度上解决了这个问题。MapReduce 的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务。虽然这个不是本论文的重点,但是有必要提一下,这个集群管理系统在理念上和其它系统,如 Condor[16]是一样。MapReduce 库的排序机制和 NOW-Sort[1]的操作上很类似。读取输入源的机器(map workers)把待排序的数据进行分区后,发送到 R 个 Reduce worker 中的一个进行处理。每个 Reduce worker 在本地对数据进行排序(尽可能在内存中排序)。当然,NOW-Sort 没有给用户自定义的 Map 和 Reduce 函数的机会,因此不具备MapReduce 库广泛的实用性。River[2]提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯。和 MapReduce类似,River 系统尝试在不对等的硬件环境下,或者在系统颠簸的情况下也能提供近似平均的性能。River 是通过精心调度硬盘和网络的通讯来平衡任务的完成时间。MapReduce 库采用了其它的方法。通过对编程模型进行限制,MapReduce 框架把问题分解成为大量的“小”任务。这些任务在可用的 worker 集群上动态的调度,这样快速的 worker 就可以执行更多的任务。通过对编程模型进行限制,我们可用在工作接近完成的时候调度备用任务,缩短在硬件配置不均衡的情况下缩小整个操作完成的时间(比如有的机器性能差、或者机器被某些操作阻塞了)。BAD-FS[5]采用了和 MapReduce 完全不同的编程模式,它是面向广域网的。不过,这两个系统有两个基础功能很类似。(1)两个系统采用重新执行的方式来防止由于失效导致的数据丢失。(2)两个都使用数据本地化调度策略,减少网络通讯的数据量。TACC[7]是一个用于简化构造高可用性网络服务的系统。和 MapReduce 一样,它也依靠重新执行机制来实现的容错处理。8 结束语MapReduce 编程模型在 Google 内部成功应用于多个领域。我们把这种成功归结为几个方面:首先,由于MapReduce 封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得 MapReduce库易于使用。即便对于完全没有并行或者分布式系统开发经验的程序员而言;其次,大量不同类型的问题都可以通过 MapReduce 简单的解决。比如,MapReduce 用于生成 Google 的网络搜索服务所需要的数据、用来 排序、用来数据挖掘、用于机器学习,以及很多其它的系统;第三,我们实现了一个在数千台计算机组成的大型集群上灵活部署运行的 MapReduce。这个实现使得有效利用这些丰富的计算资源变得非常简单,因此也适合用来解决 Google 遇到的其他很多需要大量计算的问题。我们也从 MapReduce 开发过程中学到了不少东西。首先,约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境;其次,网络带宽是稀有资源。大量的系统优化是针对减少网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽;第三,多次执行相同的任务可以减少性能缓慢的机器带来的负面影响同时解决了由于机器失效导致的数据丢失问题。
文章
存储  ·  缓存  ·  分布式计算  ·  并行计算  ·  负载均衡  ·  大数据  ·  程序员  ·  调度  ·  C++  ·  索引
2022-05-27
Panda处理文本和时序数据?首选向量化
Pandas中的向量化,就像6个Pandas一样说起Pandas中的属性接口,首先要从数据类型谈起。在任何一门编程语言中,虽然各自的数据类型有很多,比如数值型有int、long、double,字符串有str或者char类型,还有时间数据类型以及布尔数据类型等,可以说这数值型、字符串型、时间型以及布尔型基本覆盖了所有基本的数据类型。而像其他的数组、列表、字典等则都是集合类的数据结构,不属于基本数据类型。数值型操作是所有数据处理的主体,支持程度自不必说,布尔型数据在Pandas中其实也有较好的体现,即通过&、|、~三种位运算符也相当于是实现了向量化的并行操作,那么对于字符串和时间格式呢?其实这就是本文今天要分享的重点内容:属性接口——str、dt,两类接口均用几个小例子简单粗暴的进行示范,即学即用!严格意义上讲,Pandas中的属性接口除了str和dt外,还有枚举类型cat接口,但其实用法很小众,所以本文不予提及。01 字符串接口——str在Pandas中,当一列数据类型均为字符串类型时,则可对该列执行属性接口操作,即通过调用.str属性可调用一系列的字符串方法函数,比如split、strim等,还实现了正则表达式的绝大部分功能,包括查找、匹配和替换等、这对于Pandas处理文本数据来说简直是开挂一般的存在。举个例子,例如构造如下虚拟DataFrame数据,其中所有列都用到了字符串类型:df = pd.DataFrame({ "name":['GuanYu', 'zhangFei', 'zhao-yun', 'machao', 'huangzhong#'], "city":['湖北省荆州市', '四川省汉中市', '四川省成都市', '甘肃省西凉区', '四川省成都市'], "salary":['30-50K', '30-50k', '30-45k', '30-40k', '30-40k'], "helpers":['关平 周仓 廖化 马良', '张苞 魏延', '马云禄', '马岱 庞德', '严颜'] })对应数据表如下:观察数据可见,name列字符串格式不是很统一,既有大小写混乱,也有-、#等其他无用字符,city列相对规整,但马超所在列不是xx省xx市结构,而salary均有薪资上下限组成,最后helpers列则是一个复合类型,各部下之间用空格进行区分。针对这一数据,需要完成如下处理需求:规整姓名列,均变为小写形式且过滤无用字符提取所在城市信息计算平均薪资提取部下人数信息对于以上需求,用Pandas实现都非常之容易:姓名列统一小写,然后过滤掉非字母的字符,其中lower是Python字符串内置的通用方法,replace虽然是Pandas中的全局方法,但嵌套了一层str属性接口后即执行正则匹配的替换,这里即用到了正则表达式的匹配原则,即对a-z字母以外的其他字符替换为空字符:根据正则表达式,提取省市之间的城市信息,特别地,第二个关键词还可能是区,所以可用正则表达式中的findall提取功能,还需注意提取的限定关键字为前面以"省"开头、后面以"市"或"区"结束的中间字符,即是城市信息:计算平均薪资。由于这里的薪资字段其实还是比较规整的,即都是以K结尾(虽然可能有大小写之别),薪资上下限用-连接,所有其实有多种方法可以实现,这里举例其中的两种,其中第一种用到了字符串的切分函数,第二种方法仍然是正则匹配查找。两种方法均实现了两个数字的提取,进而可以完成上下限的均值计算。最后是提取下属信息,注意到这里的下属由一个字符串组成,且下属之间由空格间隔。针对这一需求,也可轻松实现两种解决方案,其中之一是进行拆分然后获取拆分后列表的长度、第二种是直接对字符串中空格进行计数,而后+1即为总的部下人数。两种方案结果是一致的:最后,给出str中的所有属性接口函数:02 时间属性接口——dt与str用法极其类似、对时间类型的数据处理极为友好的另一个属性接口是dt,即datetime的简称,要求适用于格式是时间类型的数据。由于时间类型在某些特定应用场景还是非常常用的,所以灵活运用dt属性接口也可实现非常便捷的数据处理操作。这里首先仍然给出示例数据:以上述时间序列数据为例,通过dt时间属性接口可以很容易的实现各类时间信息的提取,例如提取年份、日期和时间信息即可分别调用year、date和time属性即可。这里需要注意的是,在前述str属性接口中,多数dt后面接的都是函数,而这里获取的year、date和time等都是属性(因为无需参数),二者的区别体现为函数以()结尾,而属性则无需括号。但dt属性接口基本上都是这种属性接口,但也有一些是函数,例如指定类型的格式化完整的接口清单如下:基本上,时间格式中想得到的、想不到的基本都给予了实现,用来提取个时间信息简直是太方便了。03 小结一门编程语言中的基本数据类型无非就是数值型、字符串型、时间型以及布尔型,Pandas为了应对各种数据格式的向量化操作,针对字符串和时间格式数据专门提供了str和dt两个属性接口(数值型数据天然支持向量化操作,而布尔型也可通过位运算符&、|、~实现并行计算),通过调用属性接口后的系列方法,可以实现丰富的API以及高效的计算能力。另外,除了str和dt两个属性接口外还有一个枚举属性接口cat(即category缩写),但实际上用处较为局限。至此,Pandas应用小技巧系列文章已经推出了大部分,后续将视情整理一篇合集,敬请期待。
文章
并行计算  ·  数据处理  ·  API  ·  数据格式  ·  Python
2022-05-27
伙计,提高自己的并发技能,从锁优化开始!
锁是最常用的同步方法之一。在高并发的环境下,激烈的锁竞争会导致程序的性能下降。对于单任务或者单线程的应用而言,其主要资源消耗都花在任务本身,它既不需要维护并行数据结构间的一致性状态,也不需要为线程的切换和调度花费时间。对于多线程应用来说,系统除了处理功能需求外,还需要额外维护多线程环境的特有信息,如线程本身的元数据、线程的调度、线程上下文的切换等。并行计算之所以能提高系统的性能,并不是因为它"少干活"了,而是因为并行计算可以更合理地进行任务调度,充分利用各个CPU资源。如何提高锁性能减少锁持有时间对于使用锁进行并发控制的应用程序而言,在锁竞争过程中,单个线程对锁的持有时间与系统性能有着直接的关系。如果锁的持有锁时间越长,那么锁的竞争程度也就越激烈。简单来讲就是:要100个人填写信息表,但是只有一根笔,每个人如果没想好怎么填,那么每个人持有笔的时间就会很长,那么总的时间就会变长。因此减少对某个锁的持有时间,以减少线程间互斥。例如下面这段代码:public synchronized void synMethod(){ method1(); mainMethod(); method2(); }上面那段代码中,只有mainMethod()方法需要做同步控制,而method1()和method2()不需要做同步控制,那么上面那段在高并发的情况下对整个方法都进行了同步控制,如果method1()和method2()两个方法的耗时长,那么会导致整个程序的执行时间变长。因此我们可以选择下面这样优化:public void synMethod(){ method1(); synchronized(this){ mainMethod(); } method2(); }这样做的好处就是,只针对mainMethod()方法做了同步控制,锁占用的时间相对较短,因此能够有较高的并发度。较少锁的持有时间有助于降低锁冲突的可能性,进而提升系统的并发能力。减小锁粒度减小锁粒度也是一种削弱多线程锁竞争的有效手段。这种技术典型的使用场景就是ConcurrentHashMap类的实现。对ConcurrentHashMap有所了解的小伙伴应该知道,传统的HashTable之所以是线程安全的就是因为它是对整个方法加锁。而ConcurrentHashMap的性能比较高是因为它内部细分了若干个小的HashMap,称之为段(SEGMENT)。在默认情况下,一个ConcurrentHashMap类可以细分为16个端,性能相当于提升了16倍。在ConcurrentHashMap中增加一个数据,并不是对整个HashMap加锁,而是首先根据hashcode得出应该被存放在哪个段中,然后对该段加锁,并完成put()操作。当多个线程进行put()操作的时候,如果锁的不是同一个段,那么就可以实现并行操作。但是,减小锁粒度会带来一个新的问题:当系统需要取得全局锁时,其消耗的资源会比较多。例如:当ConcurrentHashMap调用size()方法时,需要或者所有子段的锁。虽然事实上,size()方法会先使用无锁的方式求和,如果失败才会尝试这种方式,但是在高并发的情况下,ConcurrentHashMap的性能依然要弱于同步的HashMap。减小锁粒度,就是指缩小锁定对象的范围,从而降低锁冲突的可能性,进而提高系统的并发能力用读写锁来替换独占锁读写分离锁可以有效地帮助减少锁竞争,提高系统性能。比如:A1、A2、A3三个线程进行写操作,B1、B2、B3三个线程进行读操作,如果使用重入锁或者内部锁,那么所有读之间,读与写之间,写之间都是串行操作。但是因为读操作并不会造成数据的完整性破坏,因此这种等待是不合理的。因此可以使用读写分离锁ReadWriteLock来提高系统的性能。使用示例如下:锁粗化通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,在使用完公共资源后,应该立即释放锁,只有这样,等待在这个锁上的其他线程才能尽早地获得资源执行任务。错误示例:public void synMethod(){ synchronized(this){ method1(); } synchronized(this){ method2(); } }优化后:public void synMethod(){ synchronized(this){ method1(); method2(); } }尤其是在循环中要注意锁的粗化错误示例:public void synMethod(){ for (int i = 1; i < n; i++) { synchronized(lock){ //do sth ... } } }优化后:synchronized(lock){ for (int i = 1; i < n; i++) { //do sth ... } }JVM进行的锁优化偏向锁锁偏向是一种针对加锁操作的优化手段。核心思想:如果一个线程获得了一个锁,那么这个锁就进入了***偏向模式***,当这个线程释放完这个锁后,下次同其他线程再次请求时,无须在做任何同步操作。这样就节省了大量的锁申请相关操作。但是在锁竞争比较激烈的场合,效果不佳,因为在竞争激烈的场合,最有可能的情况就是每次都是不同的线程来请求,这样偏向模式会失效,因此还不如不启用偏向锁。可以通过 JVM参数 -XX:+UseBiasedLocking开启偏向锁。轻量级锁如果偏向锁失败,那么虚拟机并不会立即挂起线程,它还会使用一种称为轻量级锁的优化手段。轻量级锁的操作也很方便,它只是简单地将对象头部作为指针指向持有锁的线程堆栈的头部,来判断一个线程是否持有对象锁。如果线程获得轻量级锁成功,则可以顺利进入临界区,如果轻量级锁加锁失败,则表示其他线程抢先争夺到了锁,那么当前线程的锁请求就会膨胀为重量级锁。自旋锁锁膨胀后,为了避免线程真实地在操作系统层面挂起,虚拟机还会做最后的努力——自旋锁。当前线程暂时获取不到锁,但是如果简单粗暴地将这个线程挂起是一种得不偿失的操作,因此虚拟机会让当前线程做几个空循环,在经过若干次循环后,如果可以得到锁,那么就顺利进入临界区。重量级锁如果经过自旋还不能获得锁,才会真的将线程在操作系统层面挂起,升级为 重量级锁 **锁消除Java虚拟机在 JIT 编译时,会通过对运行上下文进行扫描,去除不可能存在共享资源竞争的锁,通过锁消除,可以节省毫无意义的请求锁时间。public String[] createArrays() { Vector<Integer> vector = new Vector<>(); for (int i = 1; i < 100; i++) { vector.add(i); } return vector.toArray(new String[]{}); }上面一段代码中,因为 vector 这个变量是定义在createArrays()这个方法中,是一个局部变量,在线程栈中分配的,属于线程私有的数据,因此不存在资源竞争的情况。而Vector内部所有加锁同步都是没有必要的,如果虚拟机检测到这种情况,就会将这些无用的锁操作去除。锁消除涉及的一项关键技术为逃逸分析,所谓逃逸分析就是观察某一个变量是否会逃出某一个作用域。在上面例子中,变量vector 没有逃出createArrays()这个函数的方位,因此虚拟机才会就将这个变量的加锁操作去除。如果 createArrays()返回的不是 String数组,而是 vector 本身,那么就认为变量 vector 逃出了当前函数,会被其他线程所访问到。例如下面代码:public Vector<Integer> createList() { Vector<Integer> vector = new Vector<>(); for (int i = 1; i < 100; i++) { vector.add(i); } return vector; }ThreadLocal除了控制资源的访问外,我们还可以通过增加资源来保证所有对象的线程安全。简单来讲就是:要100个人填写信息表,我们可以分配100根笔给他们填写,人手一根,那么填写的速度也将大大增加。上面这个代码,如果没有同步控制则会出现java.lang.NumberFormatException: multiple points和java.lang.NumberFormatException: For input string: ""异常,因为SimpleDateFormat不是线程安全的,除非加锁控制。但是除了加锁我们还有没有其他方法呢,答案是有的,那就是使用ThreadLocal,每个线程分配一个SimpleDateFormat。为每一个线程分配不同的对象,需要在应用层面保证 ThreadLocal 只起到了简单的容器作用ThreadLocal的实现原理set()方法:public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }先获取当前线程对象,然后通过getMap()方法拿到线程的ThreadLocalMap,并将值存入ThreadLocalMap中。可以简单把ThreadLocalMap理解为一个Map,其中key为当前线程对象,value便是我们所需要的值。get()方法:public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }先获取到当前线程的ThreadLocalMap,然后通过将自己作为key取得内部的实际数据如果希望及时回收对象,我们应该使用ThreadLocal.remove()方法将这个变量移除,否则如果将一些大的对象设置到 ThreadLocal中,没有及时回收,会造成内存泄漏的可能。无锁锁分为乐观锁和悲观锁,而无锁就是一种乐观的策略,它是使用一种叫比较并交换(CAS,Compare And Swap)的技术来鉴别线程冲突,一旦检测到冲突发生,就重试当前操作直到没有冲突为止。比较并交换CAS的算法过程是:包含三个参数 CAS(V,E,N),其中V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V值设置为N值。最后返回当前V的真实值。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其他均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。线程安全整数(AtomicInteger)AtomicInteger是在 JDK并发包中的atomic 中的,可以把它看作一个整数,与Integer不同的是,它是可变的,并且是线程安全的。对其进行修改等任何操作都是用 CAS 指令进行的。下面是AtomicInteger 的常用方法:public final int get() //取得当前值 public final void set(int newValue) //设置当前值 public final int getAndSet(int newValue) //设置新值,并返回旧值 public final boolean compareAndSet(int expect,int u) //如果当前值为expect,则设置为u public final int getAndIncrement() //当前值加1,返回旧值 public final int getAndDecrement() //当前值减1,返回旧值 public final int getAndAdd(int delta) //当前值增加delta。返回旧值 public final int incrementAndSet() //当前值加1,返回新值 public final int decrementAndSet() //当前值减1,返回新值 public final int addAndGet(int delta) //当前值增加delta,返回新值就内部实现上来说,AtomicInteger中保存了一个核心字段:private volatile int value;使用示例:可以看出,在多线程的情况下,AtomicInteger是保证线程安全的。无锁的对象引用(AtomicReference)AtomicReference 和AtomicInteger非常相似,不同之处就在于AtomicInteger是对整数的封装,而AtomicReference是对普通对象的引用,也就是它可以保证你在修改对象引用时的线程安全性。通常情况下线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致是正确的。但是有一种特殊的情况就是:当你获取对象当前数据后,在准备修改被新值前,对象的值被其他线程连续修改了两次,最后一次修改为旧值,这个时候线程在不知情的情况下,又对这个数据重新赋值。下图说明为例:带有时间戳的对象引用(AtomicStampedReference)AtomicReference无法解决上面的问题是因为,对象在修改成成中丢失了状态信息,因此我们只要能够记录对象在修改过程中的状态值,就可以很好的解决对象被反复修改的导致线程无法正确判断对象状态的问题。AtomicStampedReference它内部不经维护了对象值,还维护了一个时间戳,当AtomicStampedReference被修改的时候,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedReference设置对象值时,对象值及时间戳都必须满足期望值,写入才会成功,因此,即使对象之被反复读写,写回原值,只要时间戳发生变化,就不能正确写入。
文章
并行计算  ·  安全  ·  算法  ·  Java  ·  调度  ·  容器
2022-05-27
简简单单:看Java8如何优雅的帮你成为开发业务大神
因为家人得关系,正好,最近也有时间,再梳理自己的知识点,打算明年找个技术试一下,遂整理再日常得开发工作中,常用得一些开发小技巧,开发环境就是我们的常用得Java8以及junit,个人觉得平时在联系得时候,junit还是非常好用得希望看到这份文章得朋友,能够自己动手实践一下,实践出真知,好啦,话不多说,我们开始正题吧函数式编程匿名函数λ演算流式编程基本原理在Java中流式编程的基本原理有两点。构建流数据流转(流水线)规约IntStream.rangeClosed(1, 100) // 1\. 构建流 .mapToObj(String::valueOf)// 2\. 数据流转(流水线) .collect(joining()); // 3\. 规约案例英雄的主位置一共有几类,分别是什么@Test fun t1() { // 英雄的主位置一共有几类,分别是什么 // 映射 val roleMains = heroes.map(Hero::getRoleMain) // 过滤为空的数据 .filter(Objects::nonNull) // 去重 .distinct() println(roleMains.size) println(roleMains) }@Test public void t1() { // 英雄的主位置一共有几类,分别是什么 List<String> roleMains = heroes.stream() // 映射 .map(Hero::getRoleMain) // 过滤为空的数据 .filter(Objects::nonNull) // 去重 .distinct() // 收集列表 .collect(toList()); System.out.println(roleMains.size()); System.out.println(roleMains); }英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位@Test fun t2() { // 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位 // 主次位置分组的英雄数量 val groupHeroCount = heroes.groupingBy { Pair.of(it.roleMain, it.roleAssist) }.eachCount() // 主次分组后,再按攻击范围分组的英雄数量 val groupThenGroupCount = heroes.groupBy { Pair.of(it.roleMain, it.roleAssist) }.map { val value = it.value.groupingBy(Hero::getAttackRange).eachCount() Pair.of(it.key, value) }.associateBy({ it.left }, { it.value }) // 遍历输出 groupThenGroupCount.forEach { (groupKey, groupValue) -> val groupingCount = groupHeroCount[groupKey] print("英雄分组key为:$groupKey;英雄数量:$groupingCount;") groupValue.forEach { (countKey, countValue) -> print("英雄攻击范围:$countKey;英雄数量:$countValue;") } println() } }@Test public void t2() { // 英雄按主次位置分组后,输出每个分组有多少英雄,其中:近战英雄有多少位,远程英雄有多少位 // 主次位置分组的英雄数量 Map<Pair<String, String>, Long> groupHeroCount = heroes.stream() .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting())); // 主次分组后,再按攻击范围分组的英雄数量 Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream() .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), groupingBy(Hero::getAttackRange, counting()))); // 遍历输出 groupThenGroupCount.forEach((groupKey, groupValue) -> { Long groupingCount = groupHeroCount.get(groupKey); System.out.print("英雄分组key为:" + groupKey + ";英雄数量:" + groupingCount + ";"); groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻击范围:" + countKey + ";英雄数量:" + countValue + ";")); System.out.println(); }); }求近战英雄HP初始值的加总@Test fun t3() { // 求近战英雄HP初始值的加总 val sum = heroes.filter { "近战" == it.attackRange } .map(Hero::getHpStart) .filter(Objects::nonNull) .reduce(BigDecimal::add) println("近战英雄HP初始值的加总为:$sum") }@Test public void t3() { // 求近战英雄HP初始值的加总 BigDecimal sum = heroes.stream() .filter(hero -> "近战".equals(hero.getAttackRange())) .map(Hero::getHpStart) .filter(Objects::nonNull) .reduce(BigDecimal.ZERO, BigDecimal::add); System.out.println("近战英雄HP初始值的加总为:" + sum); }通过最小列表收集器获取最小列表@Test public void t4() { // 通过最小列表收集器获取最小列表 List<BigDecimal> minAttackGrowth = heroes.stream() .map(Hero::getAttackGrowth) .collect(new MinListCollector<>()); System.out.println(minAttackGrowth); List<Hero> minHero = heroes.stream() .collect(new MinListCollector<>()); System.out.println(minHero); }import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.Collectors; import static java.util.stream.Collector.Characteristics.*; /** * 最小列表收集器 * * @author switch * @since 2020/8/18 */ public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> { /** * 收集器的特性 * * @see Characteristics */ private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH)); private final static int ZERO = 0; /** * 最小值 */ private final AtomicReference<T> min = new AtomicReference<>(); @Override public Supplier<List<T>> supplier() { // supplier参数用于生成结果容器,容器类型为A return ArrayList::new; } @Override public BiConsumer<List<T>, T> accumulator() { // accumulator用于消费元素,也就是归纳元素,这里的T就是元素,它会将流中的元素一个一个与结果容器A发生操作 return (list, element) -> { // 获取最小值 T minValue = min.get(); if (Objects.isNull(minValue)) { // 第一次比较 list.add(element); min.set(element); } else if (element.compareTo(minValue) < ZERO) { // 发现更小的值 list.clear(); list.add(element); min.compareAndSet(minValue, element); } else if (element.compareTo(minValue) == ZERO) { // 与最小值相等 list.add(element); } }; } @Override public BinaryOperator<List<T>> combiner() { // combiner用于两个两个合并并行执行的线程的执行结果,将其合并为一个最终结果A return (left, right) -> { // 最小值列表合并 List<T> leftList = getMinList(left); List<T> rightList = getMinList(right); leftList.addAll(rightList); return leftList; }; } private List<T> getMinList(List<T> list) { return list.stream() .filter(element -> element.compareTo(min.get()) == ZERO) .collect(Collectors.toList()); } @Override public Function<List<T>, List<T>> finisher() { // finisher用于将之前整合完的结果R转换成为A return Function.identity(); } @Override public Set<Characteristics> characteristics() { // characteristics表示当前Collector的特征值,这是个不可变Set return CHARACTERISTICS; } }优雅的空处理import org.junit.Test; import java.util.Optional; /** * @author switch * @since 2020/8/18 */ public class OptionalTests { @Test public void t1() { // orElse System.out.println(Optional.ofNullable(null).orElse("张三")); System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四")); System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new)); } @Test public void t2() { // isPresent Optional<String> name = Optional.ofNullable("张三"); if (name.isPresent()) { System.out.println(name.get()); } } @Test public void t3() { // map Optional<Integer> number = Optional.of("123456").map(Integer::valueOf); if (number.isPresent()) { System.out.println(number.get()); } } @Test public void t4() { // flatMap Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s))); if (number.isPresent()) { System.out.println(number.get()); } } @Test public void t5() { // 过滤 String number = "123456"; String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321"); System.out.println(filterNumber); } }新的并发工具类CompletableFuture单机批处理多线程执行模型该模型适用于百万级量级的任务。超过千万数据,可以考虑分组,多机器并行执行。基本流程:从数据库获取Id列表拆分成n个子Id列表通过子Id列表获取关联数据(注意:都需要提供批量查询接口)映射到需要处理的Model(提交到CompletableFuture)->处理数据->收集成list)(java 8流式处理)收集的list进行join操作收集list模型模型原理:Stream+CompletableFuture+lambda简要解释:CompletableFuture是java8提供的一个工具类,主要是用于异步处理流程编排的。Stream是java8提供的一个集合流式处理工具类,主要用于数据的流水线处理。lambda在java中是基于内部匿名类实现的,可以大幅减少重复代码。总结:在该模型中Stream用于集合流水线处理、CompletableFuture解决异步编排问题(非阻塞)、lambda简化代码。数据流动List<List<String>> -> Stream<List<String>> -> Stream<List<Model>> -> Stream<CompletableFuture<List<Model>>> -> Stream<CompletableFuture<List<映射类型>>> -> List<CompletableFuture<Void>>案例ThreadPoolUtilimport org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; public final class ThreadPoolUtil { public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setAllowCoreThreadTimeOut(true); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }ThreadPoolConfigimport org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class ThreadPoolConfig { /** * 计算规则:N(thread) = N(cpu) * U(cpu) * (1 + w/c) * N(thread):线程池大小 * N(cpu):处理器核数 * U(cpu):期望CPU利用率(该值应该介于0和1之间) * w/c:是等待时间与计算时间的比率,比如说IO操作即为等待时间,计算处理即为计算时间 */ private static final Integer TASK_POOL_SIZE = 50; private static final Integer TASK_MAX_POOL_SIZE = 100; private static final Integer TASK_QUEUE_CAPACITY = 1000; @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY); } }getFuturesStreampublic Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) { return idSubLists.stream() .map(ids -> CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor) ); }standardisationpublic void standardisation() { List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists) .map(future -> future.thenApply(this::listByNormalize)) .map(future -> future.thenAccept(modelService::batchUpdateData)) .collect(Collectors.toList()); List<Void> results = batchFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); }调整线程池的大小《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:其中:是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到是期望的CPU利用率(该值应该介于0和1之间) 是等待时间与计算时间的比率,比如说IO操作即为等待时间,计算处理即为计算时间并行——使用流还是CompletableFutures?对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,可以调整线程池的大小,而这能帮助确保整体的计算不会因为线程都在等待I/O而发生阻塞。使用这些API的建议如下:如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以依据等待/计算,或者的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性很难判断到底什么时候触发了等待。日期和时间API
文章
并行计算  ·  Java  ·  API  ·  数据库
2022-05-18
Python 脚本一个要注意的点
我发现有不少朋友写 Python 脚本非常随意,要么不用函数,要么函数随处定义,反正第一眼看不出要执行的第一行代码位于何处,这样的脚本可读性很差,而且容易隐藏 bug,解决这个问题很简单,当我们写 Python 脚本时,一定要加上这个:def main():    # do something    print("do something.")if __name__ == "__main__":    main()你可能要反对了:我怎么爽就怎么写,凭什么听你的,多写个 if __name__...?别急,让我说三个原因。第一,它让 Python 文件的作用更加明确首先需要明白 __name__ 的作用,当脚本直接被 Python 解释器执行时,其值就是 "__main__",当其被其他 Python 程序 import 的时候,其值就是对应的 Python 脚本文件名,可以在 Python 解释器验证下,假定有个 some_script.py 其内容如下:print("some_script.py")print(__name__)在 Python 解释器导入一下:❯ vim some_script.py❯ pythonPython 3.8.5 (v3.8.5:580fbb018f, Jul 20 2020, 12:11:27)[Clang 6.0 (clang-600.0.57)] on darwinType "help", "copyright", "credits" or "license" for more information.>>> import some_scriptsome_script.pysome_script>>>可以看到,__name__ 的值就是 Python 脚本的文件名 some_script。也就是说 if __name__ == "__main__": 后面的代码在 import 的时候是不会运行的。明白了这一点,if __name__ == "__main__": 就可以做为区分脚本和库的一个标志,当我们看到 if __name__ == "__main__": 时,就认为这一个可以直接运行的脚本,当没有看到这行代码时,就认为这是一个库,可以被其他程序引用,Explicit is better than implicit.,不是吗?再举个例子:假如你写了一个不带if __name__ == "__main__": 的脚本,叫 bad_script.py,内容如下:def useful_function(x):    return x * xclass UsefulClass:    def __init__(self, x):        self.x = x#你自己测试了一吧,没毛病for i in range(7):    print(useful_function(i))别人写了个 useful.py,引用了你的 useful_function:from bad_script import useful_functiondef main():    print(f'{useful_function(3)=}')if __name__ == '__main__':    main()一运行,发现打印了不可预期的内容查了半天原因,发现是你的脚本输出的,你说别人会不会骂你?假如你在自己脚本里定义了全局变量,别人如果在不合适的位置导入了 *,就会把你这个全局变量也导入,导致变量覆盖,很容易会出现 bug。第二,它让 Python 文件更加易读,对 IDE 友好有了 if __name__ == "__main__": 相当于 Python 程序也有了一个入口函数,所有的变量都从这里开始定义和使用,我们可以清晰的知道程序的逻辑开始于何处(当然还需要我们自觉的把程序的开始逻辑都放在这里)其实,这也是 PyCharm 推荐的做法,当你新建一个项目的时候,它默认创建的 main.py 就是长这样的:在if __name__ == "__main__": 的那一行的最左边也有一个绿色的运行按钮,点击一下,程序就从这一行开始运行了。为什么很多优秀的编程语言,比如 C、Java、Golang、C++ 都有一个 main 入口函数呢?我想很重要的一个原因就是就是程序入口统一,容易阅读。第三、多进程场景下,必须用 if main比如说你用多进程搞并行计算,写了这样的代码:import multiprocessing as mpdef useful_function(x):    return x * xprint("processing in parallel")with mp.Pool() as p:    results = p.map(useful_function, [1, 2, 3, 4])    print(results)当你运行的时候,会发现程序不停的在创建进程,同时也在不停的报错 RuntimeError,即使你 Ctrl C 也无法终止程序。而加上了 if __name__ == "__main__": 程序就会按照预期的进行:import multiprocessing as mpdef useful_function(x):    return x * xif __name__ == '__main__':    print("processing in parallel")    with mp.Pool() as p:        results = p.map(useful_function, [1, 2, 3, 4])        print(results)这是为什么呢?其实我是这样理解的,Python 的多程序就是启动了多个 Python 解释器,每个 Python 解释器都会导入你这个脚本,复制一份全局变量和函数给子进程用,如果有了if __name__ == "__main__":,那它后面的代码就不会被 import,也就不会被重复执行。否则,这个创建多进程的代码就会被 import,就会被执行,从而无限递归的去创建子进程,Python3 会报 RuntimeError,顺序是先创建进程,然后报错的,因此就会出现不停的创建进程,不停的报错,Ctrl C 也无法终止的现象,只能 kill 掉整个终端。这里有个官方解释[1]最后的话if __name__ == "__main__": 虽然不是强制的,但是基于上述三点原因,我强烈推荐你这么做,它是 Python 社区的约定,对应Python 之禅:明确优于隐晦。正如 _ 作为变量名的意思就是告诉读代码的人:这个变量不重要,后面也不会用到它。当你看到 Python 脚本有 if __name__ == "__main__": 时,就会意识到,这是一个可执行的脚本,当被其他程序导入时,这部分代码不会被执行,而多进程的程序中,这是必须的。参考资料官方解释: https://docs.python.org/zh-cn/3/library/multiprocessing.html#programming-guidelines
文章
开发框架  ·  并行计算  ·  IDE  ·  .NET  ·  Java  ·  Go  ·  开发工具  ·  C++  ·  Python
2022-05-27
重新学习 JavaScript 中 Async (异步)
异步事实上,程序中现在 运行的部分和将来 运行的部分之间的关系就是异步编程的核心。实际上,所有重要的程序(特别是 JavaScript 程序)都需要通过这样或那样的方法来管理这段时间间隙,这时可能是在等待用户输入、从数据库或文件系统中请求数据、通过网络发送数据并等待响应,或者是在以固定时间间隔执行重复任务(比如动画)。分块的程序可以把 JavaScript 程序写在单个 .js 文件中,但是这个程序几乎一定是由多个块构成的。这些块中只有一个是现在 执行,其余的则会在将来 执行。最常见的块 单位是函数。程序中将来 执行的部分并不一定在现在 运行的部分执行完之后就立即执行。换句话说,现在 无法完成的任务将会异步完成,因此并不会出现人们本能地认为会出现的或希望出现的阻塞行为。从现在 到将来 的“等待”,最简单的方法(但绝对不是唯一的,甚至也不是最好的!)是使用一个通常称为回调函数 的函数:ajax('http://example.com/', function(response) { console.log(response); });任何时候,只要把一段代码包装成一个函数,并指定它在响应某个事件(定时器、鼠标点击、Ajax 响应等)时执行,就是在代码中创建了一个将来 执行的块,也由此在这个程序中引入了异步机制。异步控制台并没有什么规范或一组需求指定 console.* 方法族如何工作——它们并不是 JavaScript 正式的一部分,而是由宿主环境添加到 JavaScript 中的。因此,不同的浏览器和 JavaScript 环境可以按照自己的意愿来实现,有时候这会引起混淆。在某些条件下,某些浏览器的 console.log() 并不会把传入的内容立即输出。出现这种情况的主要原因是,在许多程序(不只是 JavaScript)中,I/O 是非常低速的阻塞部分。所以,(从页面 /UI 的角度来说)浏览器在后台异步处理控制台 I/O 能够提高性能,这时用户甚至可能根本意识不到其发生。:::tip如果遇到这种少见的情况,最好的选择是在 JavaScript 调试器中使用断点,而不要依赖控制台输出。次优的方案是把对象序列化到一个字符串中,以强制执行一次“快照”,比如通过 JSON.stringify() 。:::事件循环JavaScript 的宿主环境提供了一种机制来处理程序中多个块的执行,且执行每块时调用 JavaScript 引擎,这种机制被称为事件循环 。换句话说,JavaScript 引擎本身并没有时间的概念,只是一个按需执行 JavaScript 任意代码片段的环境。“事件”(JavaScript 代码执行)调度总是由包含它的环境进行。// eventLoop 是一个队列 var eventLoop = []; var event; while (true) { // 一次 tick if (eventLoop.length > 0) { event = eventLoop.shift(); try { event(); } catch (error) { reportError(error); } } }事件循环,可以简单看做,有一个用 while 循环实现的持续运行的循环,循环的每一轮称为一个 tick。对每个 tick 而言,如果在队列中有等待事件,那么就会从队列中摘下一个事件并执行。这些事件就是回调函数。::: tip注意!setTimeout() 并没有把回调函数挂在事件循环队列中。它所做的是设定一个定时器。当定时器到时后,环境会把回调函数放在事件循环中,这样,在未来某个时刻的 tick 会摘下并执行这个回调。::: 所以换句话说就是,程序通常分成了很多小块,在事件循环队列中一个接一个地执行。严格地说,和你的程序不直接相关的其他事件也可能会插入到队列中。并行线程异步是关于现在 和将来 的时间间隙,而并行是关于能够同时发生的事情。并行计算最常见的工具就是进程 和线程 。进程和线程独立运行,并可能同时运行:在不同的处理器,甚至不同的计算机上,但多个线程能够共享单个进程的内存。事件循环把自身的工作分成一个个任务并顺序执行,不允许对共享内存的并行访问和修改。通过分立线程中彼此合作的事件循环,并行和顺序执行可以共存。并行线程的交替执行和异步事件的交替调度,其粒度是完全不同的。JavaScript 从不跨线程共享数据,这意味着不需要考虑这一层次的不确定性。但是这并不意味着 JavaScript 总是确定性的。// 尽管 later() 的所有内容被看作单独的一个事件循环队列表项, // 但如果考虑到这段代码是运行在一个线程中, // 实际上可能有很多个不同的底层运算。 function later () { answer = answer * 2; console.log(answer); }在单线程环境中,线程队列中的这些项目是底层运算确实是无所谓的,因为线程本身不会被中断。但如果是在并行系统中,同一个程序中可能有两个不同的线程在运转,这时很可能就会得到不确定的结果。var a = 20; function foo () { a = a + 1; } function bar () { a = a * 2; } ajax('/foo', foo); ajax('/bar', bar);所以,多线程编程是非常复杂的。因为如果不通过特殊的步骤来防止中断和交错运行的话,可能会得到出乎意料的、不确定的行为,通常这很让人头疼。由于 JavaScript 的单线程特性,foo() (以及 bar() )中的代码具有原子性。也就是说,一旦 foo() 开始运行,它的所有代码都会在 bar() 中的任意代码运行之前完成,或者相反。这称为完整运行 (run-to-completion)特性。在 JavaScript 的特性 中,函数顺序的不确定性就是通常所说的竞态条件 (race condition),foo() 和 bar() 相互竞争,看谁先运行。具体来说,因为无法可靠预测 a 和 b 的最终结果,所以才是竞态条件。并发设想一个展示状态更新列表(比如社交网络新闻种子)的网站,其随着用户向下滚动列表而逐渐加载更多内容。要正确地实现这一特性,需要(至少)两个独立的“进程”同时运行(也就是说,是在同一段时间内,并不需要在同一时刻)。第一个“进程”在用户向下滚动页面触发 onscroll 事件时响应这些事件(发起 Ajax 请求要求新的内容)。第二个“进程”接收 Ajax 响应(把内容展示到页面)。::: warning“进程”之所以打上引号,是因为这并不是计算机科学意义上的真正操作系统级进程。这是虚拟进程,或者任务,表示一个逻辑上相关的运算序列。之所以使用“进程”而不是“任务”,是因为从概念上来讲,“进程”的定义更符合这里使用的意义。:::两个或多个“进程”同时执行就出现了并发,不管组成它们的单个运算是否并行 执行(在独立的处理器或处理器核心上同时运行)。可以把并发看作“进程”级(或者任务级)的并行,与运算级的并行(不同处理器上的线程)相对。非交互两个或多个“进程”在同一个程序内并发地交替运行它们的步骤 / 事件时,如果这些任务彼此不相关,就不一定需要交互。如果进程间没有相互影响的话,不确定性是完全可以接受的 。交互更常见的情况是,并发的“进程”需要相互交流,通过作用域或 DOM 间接交互。如果出现这样的交互,就需要对它们的交互进行协调以避免竞态的出现。var res = []; function response (data) { res.push(data); } ajax('/foo', response); ajax('/bar', response);这里的并发“进程”是这两个用来处理 Ajax 响应的 response() 调用。它们可能以任意顺序运行。这种不确定性很有可能就是一个竞态条件 bug。所以,可以协调交互顺序来处理这样的竞态条件:var res = []; function response(data) { if (data.url == '/foo') { res[0] = data; } else if (data.url == '/bar') { res[1] = data; } } ajax('/foo', response); ajax('/bar', response);通过简单的协调,就避免了竞态条件引起的不确定性。协作还有一种并发合作方式,称为并发协作 (cooperative concurrency)。这里的重点不再是通过共享作用域中的值进行交互(尽管显然这也是允许的!)。这里的目标是取到一个长期运行的“进程”,并将其分割成多个步骤或多批任务,使得其他并发“进程”有机会将自己的运算插入到事件循环队列中交替运行。比如,遍历一个很长的列表进行值转换的 Ajax 响应处理函数,会使用 Array.prototype.map() 让代码更简洁:var res = []; function response(data) { res = res.concat( data.map(function (val) { return val * 2; }) ); } ajax('/foo', response); ajax('/bar', response);如果列表记录有上千万条,可能需要运行很久。这样的“进程”运行时,页面上的其他代码都不能运行,包括不能有其他的 response() 调用或 UI 刷新,甚至是像滚动、输入、按钮点击这样的用户事件。所以,要创建一个协作性更强更友好且不会霸占事件循环队列的并发系统,可以异步地批处理这些结果。每次处理之后返回事件循环,让其他等待事件有机会运行。一种简单的处理方法:var res = []; function response(data) { var chunk = data.splice(0,, 1000); res = res.concat( chunk.map(function (val) { return val * 2; }) ); if (data.length > 0) { // 异步调度下一次批处理 setTimeout(function () { response(data); }, 0); } } ajax('/foo', response); ajax('/bar', response);这里使用 setTimeout(fn, 0) (hack)进行异步调度,基本上它的意思就是“把这个函数插入到当前事件循环队列的结尾处”。不能保证会严格按照调用顺序处理,所以各种情况都有可能出现,比如定时 器漂移,在这种情况下,这些事件的顺序就不可预测。任务在 ES6 中,有一个新的概念建立在事件循环队列之上,叫作任务队列 (job queue)。任务队列可以理解为,是挂在事件循环队列的每个 tick 之后的一个队列。在事件循环的每个tick 中,可能出现的异步动作不会导致一个完整的新事件添加到事件循环队列中,而会在当前 tick 的任务队列末尾添加一个项目(一个任务)。像是在说:“这里还有一件事将来 要做,但要确保在其他任何事情发生之前就完成它。”一个任务可能引起更多任务被添加到同一个队列末尾。所以,理论上说,任务循环 (job loop)可能无限循环(一个任务总是添加另一个任务,以此类推),进而导致程序的饿死,无法转移到下一个事件循环 tick。任务和 setTimeout(fn, 0) hack 的思路类似,但是其实现方式的定义更加良好,对顺序的保证性更强:尽可能早的将来。语句顺序代码中语句的顺序和 JavaScript 引擎执行语句的顺序并不一定要一致。回调continuation// A ajax('/foo', function (data) { // C }); // B// C,会在未来的某个时刻被执行。回调函数包裹或者说封装了程序的延续(continuation)。嵌套回调与链式回调嵌套回调常常被称为回调地狱 (callback hell),有时也被称为毁灭金字塔 (pyramid of doom)。可以通过手工硬编码更好地线性(追踪)代码。但一旦你指定(也就是预先计划)了所有的可能事件和路径,代码就会变得非常复杂,以至于无法维护和更新。这才是回调地狱的真正问题所在!我们的顺序阻塞式的大脑计划行为无法很好地映射到面向回调的异步代码。这就是回调方式最主要的缺陷:对于它们在代码中表达异步的方式,我们的大脑需要努力才能同步得上。信任问题// A ajax('/foo', function (data) { // C }); // B// C 会延迟到 将来 发生,可能是在第三方的控制下。从根本上来说,这种控制的转移通常不会给程序带来很多问题。但是,请不要被这个小概率迷惑而认为这种控制切换不是什么大问题。实际上,这是回调驱动设计最严重(也是最微妙)的问题。它以这样一个思路为中心:有时候 ajax() (也就是你交付回调 continuation 的第三方)不是你编写的代码,也不在你的直接控制下。多数情况下,它是某个第三方提供的工具。把这称为控制反转 (inversion of control),也就是把自己程序一部分的执行控制交给某个第三方。在你的代码和第三方工具(一组你希望有人维护的东西)之间有一份并没有明确表达的契约。:::tip不管你怎么做,类型的检查 / 规范化的过程对于函数输入是很常见的,即使是对于理论上完全可以信任的代码。大体上说,这等价于那条地缘政治原则:“信任,但要核实。”:::但是,回调并没有为我们提供任何东西来支持核实检查行为。我们不得不自己构建全部的机制,而且通常为每个异步回调重复这样的工作最后都成了负担。回调最大的问题是控制反转,它会导致信任链的完全断裂。回调变体分离回调在这种设计下,API 的出错处理函数 failure() 常常是可选的,如果没有提供的话,就是假定这个错误可以吞掉。function success (data) { console.log(data); } function error (data) { console.error(data); } ajax('/foo', success, error);“error-first 风格”有时候也称为“Node 风格”,因为几乎所有 Node.js API 都采用这种风格,其中回调的第一个参数保留用作错误对象(如果有的话)。如果成功的话,这个参数就会被清空 / 置假(后续的参数就是成功数据)。不过,如果产生了错误结果,那么第一个参数就会被置起 / 置真(通常就不会再传递其他结果):function response(err, data) { if (err) { console.error(err); } else { console.log(data); } } ajax('/foo', response);不过这样还是存在一些问题,首先,这并没有像表面看上去那样真正解决主要的信任问题。这并没有涉及阻止或过滤不想要的重复调用回调的问题。另外,不要忽略这个事实:尽管这是一种你可以采用的标准模式,但是它肯定更加冗长和模式化,可复用性不高,所以你还得不厌其烦地给应用中的每个回调添加这样的代码。可以通过设置超时来取消事件,来应对完全不调用这个信任问题。function timeoutify (fn, delay) { var intv = setTimeout(function () { intv = null; fn(new Error('timeout')); }, delay); return function () { if (intv) { clearTimeout(intv); fn.apply(null, arguments); } }; } function foo (err, data) { if (err) { console.error(err); } else { console.log(data); } } ajax('/foo', timeoutify(foo, 1000));
文章
并行计算  ·  前端开发  ·  JavaScript  ·  API  ·  调度  ·  数据库
2022-05-26
英特尔开源 SYCLomatic 迁移工具,让跨架构开发变得更容易
近日,英特尔发布了一项开源工具,该工具可通过名为 SYCLomatic 的项目,将代码迁移至 SYCL,这有助于开发者更轻松地将 CUDA 代码迁移到 SYCL 和 C++,从而加速面向异构架构的跨架构编程。这个开源项目能让开发社区共同协作,以推动 SYCL 标准的采用,这是将开发者从单个厂商的封闭生态系统中解放出来的关键一步。SYCLomatic 项目地址:https://github.com/oneapi-src/SYCLomaticSYCL 是一种基于 C++ 的 Khronos Group 标准,它扩展了 C++ 功能以支持多种架构和非共享内存的配置。为启动该项目,英特尔开源了其 DPC++ 兼容性工具背后的技术,以进一步提升迁移能力,生成更多基于 SYCL 的应用。跨架构重复利用代码简化了开发工作,减少了持续维护代码所需的时间和成本。英特尔 oneAPI 专家 James Reinders 表示:“使用 SYCL 将代码迁移至 C++,为代码提供了更强的 ISO C++ 一致性,支持多家供应商以缓解供应商锁定问题,并且支持多种架构,为充分运用新的硬件创新提供灵活性。SYCLomatic 是一个可实现大部分工作自动化的宝藏工具,让开发者能更多地关注定制化调优,而不是迁移代码。”在硬件创新为计算带来多样化异构架构的同时,软件开发也已经变得越来越复杂,很难充分释放 CPU 和加速器的价值。如今的开发者及其团队普遍缺乏时间、金钱和资源,以适应代码的重写和测试,为这些不同的架构提升应用的性能。开发者正在寻求开放替代方案,以提升时间价值。英特尔正在提供一种更简单、更快捷的方式,让开发者进行硬件选择。值得一提的是,利用带有 LLVM 差异化的 Apache 2.0 许可证,托管在 GitHub 上的 SYCLomatic 项目为开发者提供了一个社区,让他们能做出贡献并提供反馈,以进一步开放跨 CPU、GPU 和 FPGA 的异构开发。SYCLomatic 能够协助开发者将 CUDA 代码迁移到 SYCL,通常可以将 90-95% 的 CUDA 代码自动迁移到 SYCL 代码。开发人员仅需手动编程剩余的工作,然后对特定架构进行定制化调优,达到所需的性能级别,便可完成这一过程。在将 SYCLomatic 项目开源后,众多开发者和机构表示已经从这项技术中受益。HACC(硬件 / 混合加速宇宙学代码)、宇宙物理学和先进计算(anl.gov)的 Steve(Esteban)Rangel 表示:“CRK-HACC 是一个正在开发的 N 体宇宙学模拟代码。为了给极光(Aurora)超级计算机做好准备,英特尔 DPC++ 兼容性工具能让我们将 20 多个核心快速迁移到 SYCL。由于当前版本的代码迁移工具不支持迁移到仿函数,我们编写了一个简单的 clang 工具来重构生成的 SYCL 源代码,以满足我们的需求。通过开源 SYCLomatic 项目,我们计划整合此前的工作,以获得更强大的解决方案,并助力让仿函数成为可用迁移选项的一部分。”SYCLomatic 项目地址:https://github.com/oneapi-src/SYCLomatic
文章
并行计算  ·  Apache  ·  开发者  ·  C++  ·  异构计算
2022-05-26
❤️Android 进程与线程 ❤️不好不要钱(下)
3、Android中的线程线程分为两种:UI/Main Thread (主线程)Worker Thread(工作线程)        一个线程总是由另一个线程启动,所以总有一个特殊的线程,叫做主线程。它是应用启动并执行的第一个线程。每次启动一个新工作线程,都会从主线程分出一条独立的线。3.1 UI/Main Thread (主线程)       启动应用程序时,系统会为应用程序创建一个执行线程,称为 "main"。该线程非常重要,因为它负责将事件发送到适当的用户界面小部件,包括绘图事件。与Android UI toolkit (来自Android.widget和Android.view包的组件)交互的线程。        因此,主线程有时也称为UI线程。但是,在特殊情况下,应用程序的主线程可能不是它的UI线程。注意:构建工具将@MainThread和 @UiThread注释视为可互换的,因此你可以@UiThread 从@MainThread方法中调用方法,反之亦然。但是,在系统应用程序在不同线程上具有多个视图的情况下,UI 线程可能与主线程不同。因此,你应该 @UiThread 使用 @MainThread.       在同一进程中运行的所有组件都在UI线程中实例化。        此外,Android UI toolkit不是线程安全的。因此,你不能从工作线程操作UI—你必须从UI线程对用户界面执行所有操作。因此,Android的单线程模型只有两条规则:不要阻塞UI线程;不要在非UI线程访问 UI 。3.1.1 阻塞UI线程        如果所有事情都发生在UI线程中,那么执行长时间操作(如网络访问或数据库查询)将阻塞整个UI。发生ANR的原因:Activity超过5秒无响应;BroadcastReceiver超过10秒无响应。3.1.2 Worker Thread操作UI@Override protected void onCreate(@Nullable @org.jetbrains.annotations.Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_thread); //Worker Thread(工作线程) new Thread(new Runnable() { @Override public void run() { //操作UI线程 Toast.makeText(ThreadActivity.this,"我是Worker Thread",Toast.LENGTH_SHORT).show(); } }).start(); }运行后直接报错:2021-10-12 14:47:47.495 4122-4247/com.scc.demo E/AndroidRuntime: FATAL EXCEPTION: Thread-7 Process: com.scc.demo, PID: 4122 java.lang.RuntimeException: Can't toast on a thread that has not called Looper.prepare() at android.widget.Toast$TN.<init>(Toast.java:895) at android.widget.Toast.<init>(Toast.java:205) at android.widget.Toast.makeText(Toast.java:597) at android.widget.Toast.makeText(Toast.java:566) at com.scc.demo.actvitiy.ThreadActivity$1.run(ThreadActivity.java:18) at java.lang.Thread.run(Thread.java:919)3.2 Worker Thread(工作线程)        因不能阻塞主线程,但是有些耗时操作(如加载图片、网络请求等)非即时相应的则可以通过工作线程来执行。注意,你不能从UI线程或"主"线程以外的任何线程更新UI。为了解决这个问题,Android提供了几种从其他线程访问UI线程的方法:Activity.runOnUiThread(Runnable)View.post(Runnable)View.postDelayed(Runnable, long)3.2.1 样例:子线程访问UI线程public class ThreadActivity extends ActivityBase{ TextView tvName; @Override protected void onCreate(@Nullable @org.jetbrains.annotations.Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_thread); tvName = findViewById(R.id.tv_name); tvName.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { csThread(); startThread(); } }); } private void csThread(){ //Worker Thread(工作线程) new Thread(new Runnable() { @Override public void run() { //这样写直接报错 tvName.setText("我是Worker Thread---行路难!行路难!"); // ------强大的分割线------ // 下面几种方式都没问题 //第一种:Activity.runOnUiThread(Runnable) runOnUiThread(new Runnable() { @Override public void run() { tvName.setText("我是Worker Thread---行路难!行路难!"); Toast.makeText(ThreadActivity.this,"我是Worker Thread",Toast.LENGTH_SHORT).show(); } }); //第二种:View.post(Runnable) tvName.post(new Runnable() { @Override public void run() { tvName.setText("我是Worker Thread---行路难!行路难!"); Toast.makeText(ThreadActivity.this,"我是Worker Thread",Toast.LENGTH_SHORT).show(); } }); //第三种:View.postDelayed(Runnable, long) tvName.postDelayed(new Runnable() { @Override public void run() { tvName.setText("我是Worker Thread---行路难!行路难!"); Toast.makeText(ThreadActivity.this,"我是Worker Thread",Toast.LENGTH_SHORT).show(); } },1000); //第四种:Handler new Handler(Looper.getMainLooper()).post(new Runnable() { @Override public void run() { tvName.setText("我是Worker Thread---行路难!行路难!"); Toast.makeText(ThreadActivity.this,"我是Worker Thread",Toast.LENGTH_SHORT).show(); } }); } }).start(); } }子线程直接操作主线程报错信息:        理论上应该拿 3.1.2 Worker Thread 操作UI 时的报错信息。既然都能通过这种方式解决,就多举一个。2021-10-12 16:02:51.754 8635-8676/com.scc.demo E/AndroidRuntime: FATAL EXCEPTION: Thread-2 Process: com.scc.demo, PID: 8635 android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views. at android.view.ViewRootImpl.checkThread(ViewRootImpl.java:8798) at android.view.ViewRootImpl.requestLayout(ViewRootImpl.java:1606) at android.view.View.requestLayout(View.java:25390) ... at android.widget.TextView.checkForRelayout(TextView.java:9719) at android.widget.TextView.setText(TextView.java:6311) ... at com.scc.demo.actvitiy.ThreadActivity$2.run(ThreadActivity.java:31) at java.lang.Thread.run(Thread.java:923)3.2.2 Android提供了几种从其他线程访问UI线程的方法源码 Activity.runOnUiThread(Runnable) public final void runOnUiThread(Runnable action) { if (Thread.currentThread() != mUiThread) { mHandler.post(action); } else { action.run(); } } View.post(Runnable) public boolean post(Runnable action) { final AttachInfo attachInfo = mAttachInfo; if (attachInfo != null) { return attachInfo.mHandler.post(action); } getRunQueue().post(action); return true; } View.postDelayed(Runnable, long) public boolean postDelayed(Runnable action, long delayMillis) { final AttachInfo attachInfo = mAttachInfo; if (attachInfo != null) { return attachInfo.mHandler.postDelayed(action, delayMillis); } getRunQueue().postDelayed(action, delayMillis); return true; } 你会发现他们都是使用 Handler 来完成的。所以在 3.2.1 的样例中咱可以使用 new Handler() 来完成更新 UI。3.3 线程的状态new:新建状态,new出来,还没有调用start。Runnable:可运行状态,调用start进入可运行状态,可能运行也可能没有运行,取决于操作系统的调度。Blocked:阻塞状态,被锁阻塞,暂时不活动,阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态。Waiting:等待状态,不活动,不运行任何代码,等待线程调度器调度,wait sleep。Timed Waiting:超时等待,在指定时间自行返回。Terminated:终止状态,包括正常终止和异常终止。3.4 开启线程的三种方式1:继承Thread重写run方法。2:实现Runnable重写run方法。3:实现Callable重写call方法。 private void startThread(){ //第一种:继承Thread重写run方法 new MyThread().start(); //第二种:实现Runnable重写run方法 new Thread(new MyRunanble()).start(); //第三种:实现Callable重写call方法 FutureTask<Integer> ft = new FutureTask<Integer>(new MyCallable()); new Thread(ft).start(); } class MyThread extends Thread{ @Override public void run() { MLog.e(this.getClass().getName()); } } class MyRunanble implements Runnable{ @Override public void run() { MLog.e(this.getClass().getName()); } } class MyCallable implements Callable { @Override public Object call() throws Exception { MLog.e(this.getClass().getName()); return null; } }小结实现Callable和实现Runnable类似,但是功能更强大,具体表现在:可以在任务结束后提供一个返回值,Runnable不行。call方法可以抛出异常,Runnable的run方法不行。可以通过运行Callable得到的Fulture对象监听目标线程调用call方法的结果,得到返回值,(fulture.get(),调用后会阻塞,直到获取到返回值)。3.5 run()和start()方法区别run():方法只是线程的主体方法,和普通方法一样,不会创建新的线程。start():只有调用start()方法,才会启动一个新的线程,新线程才会调用run()方法,线程才会开始执行。3.6 wait、notify、notifyAllwait():释放obj的锁,导致当前的线程等待,直接其他线程调用此对象的notify()或notifyAll()方法。notify(): 唤醒在此对象监视器上等待的单个线程notifyAll(): 通知所有等待该竞争资源的线程注意:当要调用wait()或notify()/notifyAll()方法时,一定要放到synchronized(obj)代码中,否则会报错java.lang.IllegalMonitorStateException。当调用obj.notify/notifyAll后,调用线程依旧持有obj锁,因此等待线程虽被唤醒,但仍无法获得obj锁,直到调用线程退出synchronized块,释放obj锁后,其他等待线程才有机会获得锁继续执行。3.7 join、sleep、waitjoin()方法在等待的过程中释放对象锁。sleep()方法在睡眠时不释放对象锁,wait():释放对象锁3.8 线程阻塞1:线程执行了Thread.sleep(int millsecond)方法,放弃CPU,睡眠一段时间,一段时间过后恢复执行;2:线程执行一段同步代码,但无法获得相关的同步锁,只能进入阻塞状态,等到获取到同步锁,才能恢复执行;3:线程执行了一个对象的wait()方法,直接进入阻塞态,等待其他线程执行notify()/notifyAll()操作;4:线程执行某些IO操作,因为等待相关资源而进入了阻塞态,如System.in,但没有收到键盘的输入,则进入阻塞态。5:线程礼让,Thread.yield()方法,暂停当前正在执行的线程对象,把执行机会让给相同或更高优先级的线程,但并不会使线程进入阻塞态,线程仍处于可执行态,随时可能再次分得CPU时间。6:线程自闭,join()方法,在当前线程调用另一个线程的join()方法,则当前线程进入阻塞态,直到另一个线程运行结束,当前线程再由阻塞转为就绪态。7:线程执行suspend()使线程进入阻塞态,必须resume()方法被调用,才能使线程重新进入可执行状态。3.9 线程中断        使用 interrupt()中断,但调用interrupt()方法只是传递中断请求消息,并不代表要立马停止目标线程。然后通过抛出InterruptedException来唤醒它。public class Thread { // 中断当前线程 public void interrupt(); // 判断当前线程是否被中断 public boolen isInterrupt(); // 清除当前线程的中断状态,并返回之前的值 public static boolen interrupted(); }3.10 线程池ThreadPoolExecutor        线程池的工作原理:线程池可以减少创建和销毁线程的次数,从而减少系统资源的消耗。当一个任务提交到线程池时:1:首先判断核心线程池中的线程是否已经满了,如果没满,则创建一个核心线程执行任务,否则进入下一步。2:判断工作队列是否已满,没有满则加入工作队列,否则执行下一步。3:判断线程数是否达到了最大值,如果不是,则创建非核心线程执行任务,否则执行饱和策略,默认抛出异常。3.11 线程池的种类       FixedThreadPool:可重用固定线程数的线程池,只有核心线程,没有非核心线程,核心线程不会被回收,有任务时,有空闲的核心线程就用核心线程执行,没有则加入队列排队。       SingleThreadExecutor:单线程线程池,只有一个核心线程,没有非核心线程,当任务到达时,如果没有运行线程,则创建一个线程执行,如果正在运行则加入队列等待,可以保证所有任务在一个线程中按照顺序执行,和FixedThreadPool的区别只有数量。       CachedThreadPool:按需创建的线程池,没有核心线程,非核心线程有Integer.MAX_VALUE个,每次提交任务如果有空闲线程则由空闲线程执行,没有空闲线程则创建新的线程执行,适用于大量的需要立即处理的并且耗时较短的任务。       ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,用于延时执行任务或定期执行任务,核心线程数固定,线程总数为Integer.MAX_VALUE。3.12 保证线程安全        线程安全性体现在三个方法:       原子性:提供互斥访问,同一时刻只能有一个线和至数据进行操作。        JDK中提供了很多atomic类,如AtomicInteger\AtomicBoolean\AtomicLong,它们是通过CAS完成原子性。 JDK提供锁分为两种:synchronized依赖JVM实现锁,该关键字作用对象的作用范围内同一时刻只能有一个线程进行操作。另一种是LOCK,是JDK提供的代码层面的锁,依赖CPU指令,代表性是ReentrantLock。       可见性:一个线程对主内存的修改及时被其他线程看到。        JVM提供了synchronized和volatile,volatile的可见性是通过内存屏障和禁止重排序实现的,volatile会在写操作时,在写操作后加一条store屏障指令,将本地内存中的共享变量值刷新到主内存;会在读操作时,在读操作前加一条load指令,从内存中读取共享变量。       有序性:指令没有被编译器重排序。        可通过volatile、synchronized、Lock保证有序性。3.12 volatile、synchronized、Lock、ReentrantLock       volatile:解决变量在多个线程间的可见性,但不能保证原子性,只能用于修饰变量,不会发生阻塞。volatile能屏蔽编译指令重排,不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面。多用于并行计算的单例模式。volatile规定CPU每次都必须从内存读取数据,不能从CPU缓存中读取,保证了多线程在多CPU计算中永远拿到的都是最新的值。       synchronized:互斥锁,操作互斥,并发线程过来,串行获得锁,串行执行代码。解决的是多个线程间访问共享资源的同步性,可保证原子性,也可间接保证可见性,因为它会将私有内存和公有内存中的数据做同步。可用来修饰方法、代码块。会出现阻塞。synchronized发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生。非公平锁,每次都是相互争抢资源。       lock:一个接口,lock可以让等待锁的线程响应中断。在发生异常时,如果没有主动通过unLock()去释放锁,则可能造成死锁现象,因此使用Lock时需要在finally块中释放锁。       ReentrantLoc:可重入锁,锁的分配机制是基于线程的分配,而不是基于方法调用的分配。ReentrantLock有tryLock方法,如果锁被其他线程持有,返回false,可避免形成死锁。对代码加锁的颗粒会更小,更节省资源,提高代码性能。ReentrantLock可实现公平锁和非公平锁,公平锁就是先来的先获取资源。ReentrantReadWriteLock用于读多写少的场合,且读不需要互斥场景。相关推荐❤️Android Runtime (ART) 和 Dalvik❤️❤️Android Apk 的打包过程 ❤️❤️Android Apk的启动过程❤️❤️startActivity源码分析(含启动新应用) ❤️
文章
缓存  ·  并行计算  ·  安全  ·  Java  ·  调度  ·  数据库  ·  Android开发
2022-05-23
一文掌握物体检测库TensorFlow 2.x Object Detection安装
这是机器未来的第4篇文章写在前面:• 博客简介:专注AIoT领域,追逐未来时代的脉搏,记录路途中的技术成长!• 专栏简介:记录博主从0到1掌握物体检测工作流的过程,具备自定义物体检测器的能力• 面向人群:具备深度学习理论基础的学生或初级开发者• 专栏计划:接下来会逐步发布跨入人工智能的系列博文,敬请期待• Python零基础快速入门系列• 快速入门Python数据科学系列• 人工智能开发环境搭建系列• 机器学习系列• 物体检测快速入门系列• 自动驾驶物体检测系列• ......@[toc]1. 概述tensorflow object detection api一个框架,它可以很容易地构建、训练和部署对象检测模型,并且是一个提供了众多基于COCO数据集、Kitti数据集、Open Images数据集、AVA v2.1数据集和iNaturalist物种检测数据集上提供预先训练的对象检测模型集合。tensorflow object detection api是目前最主流的目标检测框架之一,主流的目标检测模型如图所示:snipaste20220513_0948282. 预置条件为了顺利按照本手册安装tensroflow object detection api,请参考Windows部署Docker GPU深度学习开发环境安装必备的工具。若自行创建安装条件,请确保已经满足以下条件• 支持python3.8以上版本• 支持cuda、cudnn(可选)• 支持git本手册使用docker运行环境。3. 安装步骤3.1 Docker环境3.1.1 启动docker启动docker桌面客户端,如图所示:13.1.2 启动容器在windows平台可以启动命令行工具或者windows terminal工具(App Store下载),这里使用terminal工具。输入以下命令,查看当前存在的images列表PS C:\Users\xxxxx> docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker/getting-started latest bd9a9f733898 5 weeks ago 28.8MB tensorflow/tensorflow 2.8.0-gpu-jupyter cc9a9ae2a5af 6 weeks ago 5.99GB可以看到之前安装的tensorflow-2.8.0-gpu-jupyter镜像,现在基于这个镜像启动容器docker run --gpus all -itd -v e:/dockerdir/docker_work/:/home/zhou/ -p 8888:8888 -p 6006:6006 --ipc=host cc9a9ae2a5af jupyter notebook --no-browser --ip=0.0.0.0 --allow-root --NotebookApp.token= --notebook-dir='/home/zhou/'命令释义: docker run:表示基于镜像启动容器 --gpus all:不加此选项,nvidia-smi命令会不可用 -i: 交互式操作。 -t: 终端。 -d:后台运行,需要使用【docker exec -it 容器id /bin/bash】进入容器 -v e:/dockerdir/docker_work/:/home/zhou/:将windows平台的e:/dockerdir/docker_work/目录映射到docker的ubuntu系统的/home/zhou/目录下,实现windows平台和docker系统的文件共享 -p 8888:8888 -p 6006:6006:表示将windows系统的8888、6006端口映射到docker的8888、6006端口,这两个端口分别为jupyter notebook和tensorboard的访问端口 --ipc=host:用于多个容器之间的通讯 cc9a9ae2a5af:tensorflow-2.8.0-gpu-jupyter镜像的IMAGE ID jupyter notebook --no-browser --ip=0.0.0.0 --allow-root --NotebookApp.token= --notebook-dir='/home/zhou/': docker开机启动命令,这里启动jupyter3.1.3 使用vscode访问docker container启动vscode后,选择docker工具栏,在启动的容器上,右键选择附着到VsCode23.1.4 更换docker容器ubuntu系统的安装源为国内源在vscode软件界面上,选择【文件】-【打开文件夹】,选择根目录【/】,找到【/etc/apt/sources.list】,将ubuntu的安装源全部切换为aliyun源,具体操作为:将【archive.ubuntu.com】修改为【mirrors.aliyun.com】即可,修改后如下:# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to # newer versions of the distribution. deb http://mirrors.aliyun.com/ubuntu/ focal main restricted # deb-src http://mirrors.aliyun.com/ubuntu/ focal main restricted ## Major bug fix updates produced after the final release of the ## distribution. deb http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted # deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates main restricted ## N.B. software from this repository is ENTIRELY UNSUPPORTED by the Ubuntu ## team. Also, please note that software in universe WILL NOT receive any ## review or updates from the Ubuntu security team. deb http://mirrors.aliyun.com/ubuntu/ focal universe # deb-src http://mirrors.aliyun.com/ubuntu/ focal universe deb http://mirrors.aliyun.com/ubuntu/ focal-updates universe # deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates universe ## N.B. software from this repository is ENTIRELY UNSUPPORTED by the Ubuntu ## team, and may not be under a free licence. Please satisfy yourself as to ## your rights to use the software. Also, please note that software in ## multiverse WILL NOT receive any review or updates from the Ubuntu ## security team. deb http://mirrors.aliyun.com/ubuntu/ focal multiverse # deb-src http://mirrors.aliyun.com/ubuntu/ focal multiverse deb http://mirrors.aliyun.com/ubuntu/ focal-updates multiverse # deb-src http://mirrors.aliyun.com/ubuntu/ focal-updates multiverse ## N.B. software from this repository may not have been tested as ## extensively as that contained in the main release, although it includes ## newer versions of some applications which may provide useful features. ## Also, please note that software in backports WILL NOT receive any review ## or updates from the Ubuntu security team. deb http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse # deb-src http://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse ## Uncomment the following two lines to add software from Canonical's ## 'partner' repository. ## This software is not part of Ubuntu, but is offered by Canonical and the ## respective vendors as a service to Ubuntu users. # deb http://archive.canonical.com/ubuntu focal partner # deb-src http://archive.canonical.com/ubuntu focal partner deb http://security.ubuntu.com/ubuntu/ focal-security main restricted # deb-src http://security.ubuntu.com/ubuntu/ focal-security main restricted deb http://security.ubuntu.com/ubuntu/ focal-security universe # deb-src http://security.ubuntu.com/ubuntu/ focal-security universe deb http://security.ubuntu.com/ubuntu/ focal-security multiverse # deb-src http://security.ubuntu.com/ubuntu/ focal-security multiverse• 执行如下命令,更新配置apt-get update;apt-get -f install; apt-get upgrade• 更多aliyun的源配置访问:阿里云安装源传送门3.1.5 验证GPU是否加载成功(在电脑有Nvidia显卡的情况下)• 输入nvidia-smi查看GPU使用情况,nvcc -V查询cuda版本root@cc58e655b170:/home/zhou# nvidia-smi Tue Mar 22 15:08:57 2022 +-----------------------------------------------------------------------------+ | NVIDIA-SMI 470.85 Driver Version: 472.47 CUDA Version: 11.4 | |-------------------------------+----------------------+----------------------+ | GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | | Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | | | | MIG M. | |===============================+======================+======================| | 0 NVIDIA GeForce ... Off | 00000000:01:00.0 Off | N/A | | N/A 48C P8 9W / N/A | 153MiB / 6144MiB | ERR! Default | | | | N/A | +-------------------------------+----------------------+----------------------+ +-----------------------------------------------------------------------------+ | Processes: | | GPU GI CI PID Type Process name GPU Memory | | ID ID Usage | |=============================================================================| | No running processes found | +-----------------------------------------------------------------------------+ root@cc58e655b170:/home/zhou# nvcc -V nvcc: NVIDIA (R) Cuda compiler driver Copyright (c) 2005-2021 NVIDIA Corporation Built on Sun_Feb_14_21:12:58_PST_2021 Cuda compilation tools, release 11.2, V11.2.152 Build cuda_11.2.r11.2/compiler.29618528_0从nvcc -V的日志,可以看出cuda版本为11.2• 输入以下命令,查询cuDNN版本python -c "import tensorflow as tf;print(tf.reduce_sum(tf.random.normal([1000, 1000])))"输出结果如下:root@cc58e655b170:/usr# python -c "import tensorflow as tf;print(tf.reduce_sum(tf.random.normal([1000, 1000])))" 2022-03-22 15:26:13.281719: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 3951 MB memory: -> device: 0, name: NVIDIA GeForce GTX 1660 Ti, pci bus id: 0000:01:00.0, compute capability: 7.5 tf.Tensor(-2613.715, shape=(), dtype=float32)从输出日志,可以看到GPU:NVIDIA GeForce GTX 1660 Ti已经加载到docker,cuDNN版本为7.53.2 Windows开发环境同Docker环境,验证cuda和cuDNN安装情况。3.3 下载tensorflow object detection api项目源码• 在home/zhou目录下创建tensorflow的目录cd /home/zhou; mkdir tensorflow; cd tensorflow• 下载源码git clone https://github.com/tensorflow/models.git下载完毕后,默认文件名名称为models-master, 将文件名重命名为models,保持文件名和平台一致mv models-matser models如果网速不好,直接下载zip压缩包吧3下载完毕后的文档结构如图所示:tensorflow/ └─ models/ ├─ community/ ├─ official/ ├─ orbit/ ├─ research/ └── ...3.4 安装配置protobufTensorflow对象检测API使用Protobufs来配置模型和训练参数。在使用框架之前,必须下载并编译Protobuf库。• 回到用户目录cd /home/zhou• 下载protobuf 这里下载的已经预编译好的protobufwget -c https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protoc-3.19.4-linux-x86_64.zip• 解压 先执行mkdir protoc-3.19.4创建目录,然后执行unzip protoc-3.19.4-linux-x86_64.zip -d protoc-3.19.4/解压到制定目录protoc-3.19.4root@cc58e655b170:/home/zhou# mkdir protoc-3.19.4 root@cc58e655b170:/home/zhou# unzip protoc-3.19.4-linux-x86_64.zip -d protoc-3.19.4/ Archive: protoc-3.19.4-linux-x86_64.zip creating: protoc-3.19.4/include/ creating: protoc-3.19.4/include/google/ creating: protoc-3.19.4/include/google/protobuf/ inflating: protoc-3.19.4/include/google/protobuf/wrappers.proto inflating: protoc-3.19.4/include/google/protobuf/source_context.proto inflating: protoc-3.19.4/include/google/protobuf/struct.proto inflating: protoc-3.19.4/include/google/protobuf/any.proto inflating: protoc-3.19.4/include/google/protobuf/api.proto inflating: protoc-3.19.4/include/google/protobuf/descriptor.proto creating: protoc-3.19.4/include/google/protobuf/compiler/ inflating: protoc-3.19.4/include/google/protobuf/compiler/plugin.proto inflating: protoc-3.19.4/include/google/protobuf/timestamp.proto inflating: protoc-3.19.4/include/google/protobuf/field_mask.proto inflating: protoc-3.19.4/include/google/protobuf/empty.proto inflating: protoc-3.19.4/include/google/protobuf/duration.proto inflating: protoc-3.19.4/include/google/protobuf/type.proto creating: protoc-3.19.4/bin/ inflating: protoc-3.19.4/bin/protoc inflating: protoc-3.19.4/readme.txt• 配置protoc 在~/.bashrc文件的末尾添加如下代码export PATH=$PATH:/home/zhou/protoc-3.19.4/bin执行如下命令,使其生效source ~/.bashrc执行echo $PATH查看是否生效root@cc58e655b170:/home/zhou/protoc-3.19.4/bin# echo $PATH /home/zhou/protoc-3.19.4/bin:/home/zhou/protoc-3.19.4/bin:/home/zhou/protoc-3.19.4/bin:/root/.vscode-server/bin/c722ca6c7eed3d7987c0d5c3df5c45f6b15e77d1/bin/remote-cli:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/home/zhou/protoc-3.19.4/bin可以看到protoc的安装目录/home/zhou/protoc-3.19.4/bin已经添加到PATH了。3.5 将proto后缀文件转换为python可识别格式• 切换目录cd /home/zhou/tensorflow/models/research/• 查看转换前的目录文件列表ls object_detection/protos/•4转换proto文件格式为python可识别序列化文件protoc object_detection/protos/*.proto --python_out=.• 转换后,如下所示ls object_detection/protos/53.6 安装coco api从TensorFlow 2.x开始, pycocotools包被列为对象检测API的依赖项。理想情况下,这个包应该在安装对象检测API时安装,如下面安装对象检测API一节所述,但是由于各种原因,安装可能会失败,因此更简单的方法是提前安装这个包,在这种情况下,稍后的安装将被跳过。pip install cython pip install git+https://github.com/philferriere/cocoapi.git#subdirectory=PythonAPI默认指标是基于Pascal VOC评估中使用的那些指标。要使用COCO对象检测指标,在配置文件的eval_config消息中添加metrics_set: "coco_detection_metrics"。要使用COCO实例分割度量,在配置文件的eval_config消息中添加metrics_set: "coco_mask_metrics"。3.7 安装object detection api• 当前的工作路径应为root@cc58e655b170:/home/zhou/tensorflow/models/research# pwd /home/zhou/tensorflow/models/research• 安装object detection apicp object_detection/packages/tf2/setup.py . python -m pip install --use-feature=2020-resolver .安装过程会持续一段时间,安装完毕后,可以执行如下代码,测试安装是否完成。python object_detection/builders/model_builder_tf2_test.py输出如下:...... I0322 16:48:09.677789 140205126002496 efficientnet_model.py:144] round_filter input=192 output=384 I0322 16:48:10.876914 140205126002496 efficientnet_model.py:144] round_filter input=192 output=384 I0322 16:48:10.877072 140205126002496 efficientnet_model.py:144] round_filter input=320 output=640 I0322 16:48:11.294571 140205126002496 efficientnet_model.py:144] round_filter input=1280 output=2560 I0322 16:48:11.337533 140205126002496 efficientnet_model.py:454] Building model efficientnet with params ModelConfig(width_coefficient=2.0, depth_coefficient=3.1, resolution=600, dropout_rate=0.5, blocks=(BlockConfig(input_filters=32, output_filters=16, kernel_size=3, num_repeat=1, expand_ratio=1, strides=(1, 1), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise'), BlockConfig(input_filters=16, output_filters=24, kernel_size=3, num_repeat=2, expand_ratio=6, strides=(2, 2), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise'), BlockConfig(input_filters=24, output_filters=40, kernel_size=5, num_repeat=2, expand_ratio=6, strides=(2, 2), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise'), BlockConfig(input_filters=40, output_filters=80, kernel_size=3, num_repeat=3, expand_ratio=6, strides=(2, 2), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise'), BlockConfig(input_filters=80, output_filters=112, kernel_size=5, num_repeat=3, expand_ratio=6, strides=(1, 1), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise'), BlockConfig(input_filters=112, output_filters=192, kernel_size=5, num_repeat=4, expand_ratio=6, strides=(2, 2), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise'), BlockConfig(input_filters=192, output_filters=320, kernel_size=3, num_repeat=1, expand_ratio=6, strides=(1, 1), se_ratio=0.25, id_skip=True, fused_conv=False, conv_type='depthwise')), stem_base_filters=32, top_base_filters=1280, activation='simple_swish', batch_norm='default', bn_momentum=0.99, bn_epsilon=0.001, weight_decay=5e-06, drop_connect_rate=0.2, depth_divisor=8, min_depth=None, use_se=True, input_channels=3, num_classes=1000, model_name='efficientnet', rescale_input=False, data_format='channels_last', dtype='float32') INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_create_ssd_models_from_config): 33.12s I0322 16:48:11.521103 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_create_ssd_models_from_config): 33.12s [ OK ] ModelBuilderTF2Test.test_create_ssd_models_from_config [ RUN ] ModelBuilderTF2Test.test_invalid_faster_rcnn_batchnorm_update INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_invalid_faster_rcnn_batchnorm_update): 0.0s I0322 16:48:11.532667 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_invalid_faster_rcnn_batchnorm_update): 0.0s [ OK ] ModelBuilderTF2Test.test_invalid_faster_rcnn_batchnorm_update [ RUN ] ModelBuilderTF2Test.test_invalid_first_stage_nms_iou_threshold INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_invalid_first_stage_nms_iou_threshold): 0.0s I0322 16:48:11.535152 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_invalid_first_stage_nms_iou_threshold): 0.0s [ OK ] ModelBuilderTF2Test.test_invalid_first_stage_nms_iou_threshold [ RUN ] ModelBuilderTF2Test.test_invalid_model_config_proto INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_invalid_model_config_proto): 0.0s I0322 16:48:11.535965 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_invalid_model_config_proto): 0.0s [ OK ] ModelBuilderTF2Test.test_invalid_model_config_proto [ RUN ] ModelBuilderTF2Test.test_invalid_second_stage_batch_size INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_invalid_second_stage_batch_size): 0.0s I0322 16:48:11.539124 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_invalid_second_stage_batch_size): 0.0s [ OK ] ModelBuilderTF2Test.test_invalid_second_stage_batch_size [ RUN ] ModelBuilderTF2Test.test_session [ SKIPPED ] ModelBuilderTF2Test.test_session [ RUN ] ModelBuilderTF2Test.test_unknown_faster_rcnn_feature_extractor INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_unknown_faster_rcnn_feature_extractor): 0.0s I0322 16:48:11.542018 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_unknown_faster_rcnn_feature_extractor): 0.0s [ OK ] ModelBuilderTF2Test.test_unknown_faster_rcnn_feature_extractor [ RUN ] ModelBuilderTF2Test.test_unknown_meta_architecture INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_unknown_meta_architecture): 0.0s I0322 16:48:11.543226 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_unknown_meta_architecture): 0.0s [ OK ] ModelBuilderTF2Test.test_unknown_meta_architecture [ RUN ] ModelBuilderTF2Test.test_unknown_ssd_feature_extractor INFO:tensorflow:time(__main__.ModelBuilderTF2Test.test_unknown_ssd_feature_extractor): 0.0s I0322 16:48:11.545147 140205126002496 test_util.py:2373] time(__main__.ModelBuilderTF2Test.test_unknown_ssd_feature_extractor): 0.0s [ OK ] ModelBuilderTF2Test.test_unknown_ssd_feature_extractor ---------------------------------------------------------------------- Ran 24 tests in 42.982s OK (skipped=1)看到结果为OK,则表示安装成功,接下来就可以开始物体检测之旅了。• 《物体检测快速入门系列》快速导航:• 物体检测快速入门系列(1)-基于Tensorflow2.x Object Detection API构建自定义物体检测器• 物体检测快速入门系列(2)-Windows部署GPU深度学习开发环境• 物体检测快速入门系列(3)-Windows部署Docker GPU深度学习开发环境• 物体检测快速入门系列(4)-TensorFlow 2.x Object Detection API快速安装手册
文章
机器学习/深度学习  ·  并行计算  ·  API  ·  TensorFlow  ·  算法框架/工具  ·  Docker  ·  异构计算  ·  Windows  ·  Python  ·  容器
2022-05-25
原理解析:如何让 Join 跑得更快?
前言JOIN 一直是数据库性能优化的老大难问题,本来挺快的查询,一旦涉及了几个 JOIN,性能就会陡降。而且,参与 JOIN 的表越大越多,性能就越难提上来。其实,让 JOIN 跑得快的关键是要对 JOIN 分类,分类之后,就能利用各种类型 JOIN 的特征来做性能优化了。正文一、JOIN 分类有 SQL 开发经验的同学都知道,绝大多数 JOIN 都是等值 JOIN,也就是关联条件为等式的 JOIN。非等值 JOIN 要少见得多,而且多数情况也可以转换成等值 JOIN 来处理,所以我们可以只讨论等值 JOIN。等值 JOIN 主要又可以分为两大类:外键关联和主键关联。外键关联是指用一个表的非主键字段,去关联另一个表的主键,前者称为事实表,后者为维表。比如下图中,订单表是事实表,客户表、产品表、雇员表是维表。imagepng外键表是多对一关系,而且是不对称的,事实表和维表的位置不能互换。需要说明的是,这里说的主键是指逻辑上的主键,也就是在表中取值唯一、可以用于唯一确定某条记录的字段(或字段组),不一定在数据库表上建立过主键。主键关联是指用一个表的主键关联另一个表的主键或部分主键。比如下图中客户和 VIP 客户、订单表和订单明细表的关联。imagepng客户和 VIP 客户按照主键关联,这两个表互为同维表。订单则是用主键去关联明细的部分主键,我们称订单表是主表,明细表是子表。同维表是一对一关系。且同维表之间是对称的,两个表的地位相同。主子表则是一对多关系,而且是不对称的,有明确的方向。仔细观察会发现,这两类 JOIN 都涉及到主键了。而不涉及主键的 JOIN 会导致多对多关系,大多数情况都没有业务意义。换句话说,上述这两大类 JOIN 涵盖了几乎全部有业务意义的 JOIN。如果我们能利用 JOIN 总会涉及主键这个特征做性能优化,能解决掉这两大类 JOIN,其实也就意味着解决了大部分 JOIN 性能问题。但是,SQL 对 JOIN 的定义并不涉及主键,只是两个表做笛卡尔积后再按某种条件过滤。这个定义很简单也很宽泛,几乎可以描述一切。但是,如果严格按这个定义去实现 JOIN,也就没办法在性能优化时利用主键的特征了。SPL 改变了 JOIN 的定义,专门针对这两大类 JOIN 分别处理,利用了主键的特征减少运算量,从而实现性能优化的目标。下面我们来看看 SPL 具体是怎么做的。二、外键关联如果事实表和维表都不太大,可以全部装入内存,SPL 提供了外键地址化方法:先把事实表中的外键字段值转换为对应维表记录的地址,之后引用维表字段时,就可以用地址直接取出了。以前面的订单表、雇员表为例,假定这两个表已经被读入内存。外键地址化的工作机制是这样的:对于订单表某记录 r 的 eid 字段,到雇员表中找到这个 eid 字段值对应的记录,得到其内存地址 a,再将 r 的 eid 字段值替换成 a。对订单表的所有记录都做好这样的转换,就完成了外键地址化。这时候,订单表记录 r 要引用雇员表字段时,直接用 eid 字段存储的地址 a 取出雇员表记录和字段就可以了,相当于常数时间内就能取得雇员表的字段,不需要再到雇员表做查找。可以在系统启动时把事实表和维表读入内存,并一次性做好外键地址化,即预关联。这样,在后续关联计算时就能直接用事实表外键字段中的地址去取维表记录,完成高性能的 JOIN 计算。外键地址化和预关联的详细原理请参考:【性能优化】6.1 [外键关联] 外键地址化SQL 通常使用 HASH 算法来做内存连接,需要计算 HASH 值和比对,性能会比直接用地址读取差很多。SPL 之所以能实现外键地址化,是利用了维表的关联字段是主键这一特征。上面例子中,关联字段 eid 是雇员表的主键,具有唯一性。订单表中的每个 eid 只会唯一对应一条雇员记录,所以才能把每个 eid 转换成它唯一对应的那条雇员记录的地址。而 SQL 对 JOIN 的定义中没有主键的约定,就不能认定与事实表中外键关联的维表记录有唯一性,有可能发生与多条记录关联的情况。对于订单表的记录来讲,eid 值没有办法唯一对应一条雇员记录,就无法做到外键地址化了。而且 SQL 也没有记录地址这种数据类型,结果会导致每次关联时还是要计算 HASH 值并比对。只是两个表 JOIN 时,外键地址化和 HASH 关联的差别还不是非常明显。这是因为 JOIN 并不是最终目的,JOIN 之后还会有其它很多运算,JOIN 本身运算消耗时间的占比相对不大。但事实表常常会有多个维表,甚至维表还会有很多层。比如订单关联产品,产品关联供应商,供应商关联城市,城市关联国家等等。在关联表很多时,外键地址化的性能优势会更明显。下面的测试,在关联表个数不同的情况下对比 SPL 与 Oracle 的性能差异,可以看出在表很多时,外键地址化的优势相当明显:imagepng对于只有维表能装入内存,而事实表很大需要外存的情况,SPL 提供了外键序号化方法:预先将事实表中的外键字段值转换为维表对应记录的序号。关联计算时,分批读入新事实表记录,再用序号取出对应维表记录。以上述订单表、产品表为例,假定产品表已经装入内存,订单表存储在外存中。外键序号化的过程是这样:先读入一批订单数据,设其中某记录 r 中的 pid 对应的是内存中产品表的第 i 条记录。我们要将 r 中的 pid 字段值转换为 i。对这批订单记录都完成这样的转换后,再做关联计算时,从外存中分批读入订单数据。对于其中的记录 r,就可以直接根据 pid 值,去内存中的产品表里用位置取出相应的记录,也避免了查找动作。数据库通常会把小表读入内存,再分批读入大表数据,用哈希算法做内存连接,需要计算哈希值和比对。而 SPL 使用序号定位是直接读取,不需要进行任何比对,性能优势比较明显。虽然预先把事实表的外键字段转换成序号需要一定成本,但这个预计算只需要做一次,而且可以在多次外键关联中得到复用。SPL 外键序号化同样利用了维表关联字段是主键的特征。如前所述,SQL 对 JOIN 的定义没有主键的约定,无法利用这一特征做到外键序号化。另外,SQL 使用无序集合的概念,即使我们事先把外键序号化了,数据库也无法利用这个特点,不能在无序集合上使用序号快速定位的机制,最快也就是用索引查找。而且,数据库并不知道外键被序号化了,仍然会去计算 HASH 值和比对。下面这个测试,在不同并行数情况下,对比 SPL 和 Oracle 完成大事实表、小维表关联计算的速度,SPL 跑的比 Oracle 快 3 到 8 倍。测试结果见下图:imagepng这个测试更详细的信息请参考:性能优化技巧:外键序号化。如果维表很大也需要外存,而事实表较小能装入内存,SPL 则提供了大维表查找机制。如果维表和事实表都很大,SPL 则使用单边分堆算法。对于维表过滤后再关联的情况,SPL 提供了索引复用方法及对位序列等方法。数据量大到需要分布式计算时,如果维表较小,SPL 采用复写维表机制,将维表在集群节点上复制多份;如果维表很大,则采用集群维表方法以保证随机访问。这两种方法都可以有效的避免 Shuffle 动作。相比而言,SQL 体系下不能区分出维表,HASH 拆分方法要将两个表都做 Shuffle 动作,网络传输量要大得多。三、主键关联主键关联涉及的表一般都比较大,需要存储在外存中。SPL 为此提供了有序归并方法:预先将外存表按照主键有序存储,关联时顺序取出数据做归并计算。以客户和 VIP 客户两个表做内连接为例,假设已经预先将两个表按照主键 cid 有序存储在外存中。关联时,从两个表的游标中读取记录,逐条比较 cid 值。如果 cid 相等,则将两表的记录合并成结果游标的一条记录返回。如果不相等,则 cid 小的那个游标再读取记录,继续判断。重复这些动作直到任何一个表的数据被取完,返回的游标就是 JOIN 的结果。对于两个大表关联,数据库通常使用哈希分堆算法,复杂度是乘法级的。而有序归并算法复杂度是加法级,性能会好很多。而且,数据库做大数据的外存运算时,哈希分堆会产生缓存文件的读写动作。有序归并算法则只需要对两个表依次遍历,不必借助外存缓存,可以大幅降低 IO 量,有巨大的性能优势。预先按照主键排序的成本虽高,但是一次性做好即可,以后就总能使用归并算法实现 JOIN,性能可以提高很多。同时,SPL 也提供了在有追加数据时仍然保持数据整体有序的方案。这类 JOIN 的特征在于关联字段是主键或部分主键,有序归并算法正是根据这个特征来设计的。因为不管是同维表还是主子表,关联字段都不会是主键之外的其他字段,所以我们将关联表按照主键有序这一种方式排序存储就可以了,不会出现冗余。而外键关联就不具备这个特征,不能使用有序归并。具体来说,是因为事实表的关联字段不是主键,会存在多个要参与关联的外键字段,我们不可能让同一个事实表同时按多个字段都有序。SQL 对 JOIN 的定义不区分 JOIN 类型,不假定某些 JOIN 总是针对主键的,就没办法从算法层面上利用主键关联的特征。而且,前面说过 SQL 基于无序集合概念,数据库不会刻意保证数据的物理有序性,很难实施有序归并算法。有序归并算法的优势还在于易于分段并行。以订单和订单明细按 oid 关联为例,假如将两表都按照记录数大致平均分为 4 段,订单第 2 段的 oid 有可能会出现在明细第 3 段,类似的错位会导致错误的计算结果。SPL 再次利用主键 oid 的有序性,提供同步分段机制,解决了这个问题:先将有序的订单表分为 4 段,再找到每一段起止记录的 oid 值形成 4 个区间,将明细表也分成同步的 4 段。这样,在并行计算时两表对应分段就不会出现错位了。由于明细表也对 oid 有序,可以迅速地按照起止 oid 定位,不会降低有序归并的性能。有序归并和同步分段并行的原理,详见:SPL 有序归并关联。传统的 HASH 分堆技术实现并行就比较困难了,多线程做 HASH 分堆时需要同时向某个分堆写出数据,造成共享资源冲突;而下一步实现某组分堆关联时又会消费大量内存,无法实施较大的并行数量。实际测试证明,在相同情况下,我们对两个大表做主键关联测试,结果是 SPL 比 Oracle 快了近 3 倍:除了有序归并,SPL 还提供了很多高性能算法,全面提高主键关联 JOIN 的计算速度。包括:附表机制,可以将多表一体化存储,减少存储数据量的同时,还相当于预先完成了关联,不需要再比对了;关联定位算法,实现先过滤再关联,可以避免全表遍历,获得更好的性能等等。当数据量继续增加,需要多台服务器集群时,SPL 提供复组表机制,将需要关联的大表按照主键分布到集群节点上。相同主键的数据在同一节点,避免分机之间的数据传输,也不会出现 Shuffle 动作。四、回顾与总结回顾上面两大类、各场景 JOIN,采用 SPL 分情况提供的高性能算法,可以利用不同类型 JOIN 的特征提速,让 JOIN 跑得更快。SQL 对上述这么多种 JOIN 场景笼统的处理,就没办法针对不同 JOIN 的特征来实施这些高性能算法。比如:事实表和维表都装入内存时,SQL 只能按照键值计算 HASH 和比对,无法利用地址直接对应;SQL 数据表无序,在大表按照主键关联时无法做到有序归并,只能使用 HASH 分堆,有可能会出现多次缓存的现象,性能有一定的不可控性。并行计算方面,SQL 单表计算时还容易做到分段并行,多表关联运算时一般就只能事先做好固定分段,很难做到同步动态分段,这就难以根据机器的负载临时决定并行数量。对于集群运算也是这样,SQL 在理论上不区分维表和事实表,要实现大表 JOIN 就会不可避免地产生占用大量网络资源的 HASH Shuffle 动作,在集群节点数太多时,网络传输造成的延迟会超过节点多带来的好处。SPL 设计并应用了新的运算和存储模型,可以在原理和实现上解决 SQL 的这些问题。对于 JOIN 的不同分类和场景,程序员有针对性的采取上述高性能算法,就能获得更快的计算速度,让 JOIN 跑得更快。SPL资料SPL官网SPL下载SPL源代码
文章
SQL  ·  存储  ·  缓存  ·  并行计算  ·  算法  ·  Oracle  ·  关系型数据库  ·  程序员  ·  数据库  ·  索引
2022-05-25
1 2 3 4 5 6 7 8 9
...
20
跳转至:
开发与运维
5195 人关注 | 125228 讨论 | 187671 内容
+ 订阅
  • Pandas用了这么久,有觉得哪里不好的地方吗?
  • 二十四、MapReduce工作机制
  • Linux 不懂权限管理,怎么玩骚操作?
查看更多 >
数据库
248843 人关注 | 44343 讨论 | 56810 内容
+ 订阅
  • 二十四、MapReduce工作机制
  • 阿里云ECS续领文章
  • SQL用了两年多,我最常用的2个小技巧
查看更多 >
云计算
21601 人关注 | 57834 讨论 | 36750 内容
+ 订阅
  • 详解Pandas读取csv文件时2个有趣的参数设置
  • 阿里云ECS续领文章
  • 全链路跟踪(压测)必备基础组件之线程上下文“三剑客”
查看更多 >
人工智能
2599 人关注 | 9268 讨论 | 62634 内容
+ 订阅
  • Pandas用了这么久,有觉得哪里不好的地方吗?
  • 二十四、MapReduce工作机制
  • 详解Pandas读取csv文件时2个有趣的参数设置
查看更多 >
大数据
184034 人关注 | 22893 讨论 | 52269 内容
+ 订阅
  • Pandas用了这么久,有觉得哪里不好的地方吗?
  • 二十四、MapReduce工作机制
  • 详解Pandas读取csv文件时2个有趣的参数设置
查看更多 >