泛函编程(20)-泛函库设计-Further Into Parallelism

简介:

    上两节我们建了一个并行运算组件库,实现了一些基本的并行运算功能。到现在这个阶段,编写并行运算函数已经可以和数学代数解题相近了:我们了解了问题需求,然后从类型匹配入手逐步产生题解。下面我们再多做几个练习吧。

在上节我们介绍了asyncF,它的类型款式是这样的:asyncF(f: A => B): A => Par[B],从类型款式(type signature)分析,asyncF函数的功能是把一个普通的函数 A => B转成A => Par[B],Par[B]是一个并行运算。也就是说asyncF可以把一个输入参数A的函数变成一个同样输入参数A的并行运算。asyncF函数可以把List[A],一串A值,按照函数A => B变成List[Par[A]],即一串并行运算。

例:函数f: (a: A) => a + 10:List(1,2,3).map(asyncF(f))=List(Par(1+10),Par(2+10),Par(3+10)),这些Par是并行运算的。但它们的运算结果需要另一个函数sequence来读取。我们从以上分析可以得出sequence的类型款式:


1 def sequence[A](lp: List[Par[A]]): Par[List[A]]

用sequence把List[Par[A]]转成Par[List[A]]后我们就可以用Par.map对List[A]进行操作了。List有map,我们可以再用map对A进行操作。在上一节我们做了个练习:


1 def parMap[A,B](l: List[A])(f: A => B): Par[List[B]]

parMap按List[A]产生了一串并行运算的函数f。我们可以从类型匹配着手一步一步推导:

1、lp: List[Par[B]] = l.map(asyncF(f))

2、pl: Par[List[B]] = sequence(lp) >>> parMap

再做个新的习题:用并行运算方式Filter List:


1 def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]]

 我们还是从类型匹配着手一步步推导:

1、asyncF( a => if(f(a)) List(a) else List() )  >>> Par[List[A]]

2、lpl: List[Par[List[A]]] = as.map( asyncF( a => if(f(a)) List(a) else List()))

3、pll: Par[List[List[A]]] = sequence(lpl)

4、map(pll){ a => a.flatten } >>> Par[List{A]]


1   def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] = {
2        val pars: List[Par[List[A]]] = as.map(asyncF( (a: A) => if (f(a)) List(a) else List() ))
3        map(sequence(pars)){ a => a.flatten }
4   }                                               //> parFilter: [A](as: List[A])(f: A => Boolean)ch71.Par.Par[List[A]]

  测试结果:


 1 parFilter(List(10,29,13,3,6,48)){_ > 10}(es).get//> pool-1-thread-1
 2                                                   //| pool-1-thread-2
 3                                                   //| pool-1-thread-3
 4                                                   //| pool-1-thread-4
 5                                                   //| pool-1-thread-5
 6                                                   //| pool-1-thread-6
 7                                                   //| pool-1-thread-7
 8                                                   //| pool-1-thread-8
 9                                                   //| pool-1-thread-9
10                                                   //| pool-1-thread-10
11                                                   //| pool-1-thread-11
12                                                   //| pool-1-thread-12
13                                                   //| pool-1-thread-14
14                                                   //| pool-1-thread-16
15                                                   //| pool-1-thread-13
16                                                   //| pool-1-thread-15
17                                                   //| pool-1-thread-17
18                                                   //| res0: List[Int] = List(29, 13, 48)

再做一个计算字数的练习:用并行运算方式来计算List里的文字数。我们尽量用共性的方法来通用化解答。如果文字是以List装载的活,类型就是:List[String],举个实例:List("the quick fox","is running","so fast")。我们可以分两步解决:

1、"the quick fox".split(' ').size >>> 把字符串分解成文字并计算数量

2、List(A,B,C) >>> A.size + B.size + C.size >>> 把List里的文字数积合。

这两步可以分两个函数来实现:

1. f: A => B >>> 我们需要把这个函数转成并行运算:List[Par[B]]

2. g: List[B] => B


 1   def generalWordCount[A,B](as: List[A])(f: A => B)(g: List[B] => B): Par[B] = {
 2       val lp: List[Par[B]] = as.map(asyncF(f))
 3       val pl: Par[List[B]] = sequence(lp)
 4       map(pl)(g)
 5   }                                               //> generalWordCount: [A, B](as: List[A])(f: A => B)(g: List[B] => B)ch71.Par.P
 6                                                   //| ar[B]
 7   def wordCount(as: List[String]): Par[Int] = {
 8       generalWordCount(as)(_.split(' ').size)(_.sum)
 9   }                                               //> wordCount: (as: List[String])ch71.Par.Par[Int]
10   val lw = List("the quick silver fox", "is running","the one legged fog", "is hopping")
11                                                   //> lw  : List[String] = List(the quick silver fox, is running, the one legged 
12                                                   //| fog, is hopping)
13   wordCount(lw)(es).get                           //> pool-1-thread-1
14                                                   //| pool-1-thread-3
15                                                   //| pool-1-thread-2
16                                                   //| pool-1-thread-15
17                                                   //| pool-1-thread-16
18                                                   //| pool-1-thread-7
19                                                   //| pool-1-thread-10
20                                                   //| pool-1-thread-14
21                                                   //| pool-1-thread-6
22                                                   //| pool-1-thread-13
23                                                   //| pool-1-thread-9
24                                                   //| res7: Int = 12

相信大家对泛函编程的这种数学解题模式已经有了一定的了解。

在前面我们曾经提过现在的fork实现方式如果使用固定数量线程池的话有可能造成锁死:


1   val es = Executors.newFixedThreadPool(1)
2   val a = fork(async(40+2))
3   run(es)(a).get

我们再回顾一下fork的实现:


1     def fork[A](pa: => Par[A]): Par[A] = {
2         es => {
3             es.submit(new Callable[A] {
4               def call: A = run(es)(pa).get
5             })
6         }
7     }

可以看出我们提交的callable内部是一个run par,这个run会再提交一个callable然后锁定get。外面的callable必须等待内部callable的get锁定完成。所以这种fork实现是需要两个线程的。如果线程池无法再为内部callable提供线程的话,那么外面的callable就会处于永远等待中形成死锁。上面的parMap函数会按照List的长度分解出同等数量的并行运算,运行时会造成死锁吗?如果线程池不是固定数量线程的话,答案就是否定的:如果并行运算数量大于线程数,那么运算会分批进行:后面的运算可以等待前面的运算完成后释放出线程后继续运行,这里重点是前面的运算始终是可以完成的,所以不会造成死锁。

我们再看看现在所有的组件函数是否足够应付所有问题,还需不需要增加一些基本组件,这也是开发一个函数库必须走的过程;这就是一个不断更新的过程。

现在有个新问题:如果一个并行运算的运行依赖另一个并行运算的结果,应该怎样解决?先看看问题的类型款式:


1   def choice[A](pa: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A]

我们可能马上想到用map: map(pa){b => if(b) ifTrue else ifFalse}, 不过这样做的结果类型是:Par[Par[A]], 是代表我们需要新的组件函数来解决这个问题吗?我们先试着解这个题:


1   def choice[A](pa: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] = {
2       es => if(run(es)(pa).get) run(es)(ifTrue) else run(es)(ifFalse)
3   }

我们可以看到现在choice是个最基本组件了。为了解决一个问题就创造一个新的组件不是泛函编程的风格。应该是用一些更基本的组件组合成一个描述这个问题的函数,那才是我们要采用的风格。我们应该试着用一个函数能把Par[Par[A]]变成Par[A],可能就可以用map了:

 

ppa: Par[Par[A]], 如果 run(es)(ppa).get 得到 pa: Par[A], 再run(es)(pa) >>> Future[A]。 Par[A] = es => Future[A],不就解决问题了嘛:


1   def join[A](ppa: Par[Par[A]]): Par[A] = {
2       es => {
3           run(es)(run(es)(ppa).get())
4       }
5   }

现在可以用map来实现choice了吧。但是,map是针对元素A来操作的,ifTrue和ifFalse都是Par[A],还无法使用map。那就先放放吧。

既然我们能在两个并行运算中选择,那么能在N个并行运算中选择不是能更抽象吗?


1   def choiceN[A](pb: Par[Int])(choices: List[Par[A]]): Par[A]

run(es)(pb).get 得出指数(index), choices(index)就是选择的运算了:


1   def choiceN[A](pb: Par[Int])(choices: List[Par[A]]): Par[A] = {
2       es => {
3           run(es)(choices(run(es)(pb).get))
4       }
5   }

从choiceN中我们可以发现一个共性模式:是一个选择函数:Int => Par[A]。再抽象一步我们把选择函数变成:A => Par[B]。这个函数就像之前接触过的flatMap函数的传入参数函数f一样的。我们先看看flatMap的类型款式:


1   def flatMap[A,B](pa: Par[A])(f: A => Par[B]): Par[B]

我们只要flatMap pb 传入 A => Par[B]就可以实现choiceN了: 


1   def flatMap[A,B](pa: Par[A])(f: A => Par[B]): Par[B] = {
2       es => {
3           run(es)(f(run(es)(pa).get))
4       }
5   }

有了flatMap,我们可以用它来实现choice,choiceN了:


1   def choiceByFlatMap[A](pb: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] ={
2     flatMap(pb){a => if (a) ifTrue else ifFalse }
3   }
4   def choiceNByFlatMap[A](pb: Par[Int])(choices: List[Par[A]]): Par[A] = {
5       flatMap(pb){choices(_)}
6   }

在前面我们无法用map来实现choice,因为类型不匹配。加了一个join函数,又因为map元素类型不匹配,又不行。现在看来flatMap恰恰是我们需要解决choice的组件,而且flatMap能更抽象一层,连choiceN都一并解决了。值得注意的是我们在以上解决问题的过程中一再提及类型匹配,这恰恰体现了泛函编程就是函数解题的过程。

那么flatMap,join,map之间有没有什么数学关系呢?


1   def joinByFlatMap[A](ppa: Par[Par[A]]): Par[A] = {
2       flatMap(ppa){(x: Par[A]) => x}
3   }
4   def flatMapByJoin[A,B](pa: Par[A])(f: A => Par[B]): Par[B] = {
5       join(map(pa)(f))
6   }
7   def mapByFlatMap[A,B](pa: Par[A])(f: A => B): Par[B] = {
8       flatMap(pa) { a => unit(f(a)) }
9   }

它们之间的确可以用数学公式来表达。


相关文章
|
20天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
34899 55
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
15天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
13820 42
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
3天前
|
缓存 人工智能 自然语言处理
我对比了8个Claude API中转站,踩了不少坑,总结给你
本文是个人开发者耗时1周实测的8大Claude中转平台横向评测,聚焦Claude Code真实体验:以加权均价(¥/M token)、内部汇率、缓存支持、模型真实性及稳定性为核心指标。
|
10天前
|
人工智能 JavaScript Ubuntu
低成本搭建AIP自动化写作系统:Hermes保姆级使用教程,长文和逐步实操贴图
我带着怀疑的态度,深度使用了几天,聚焦微信公众号AIP自动化写作场景,写出来的几篇文章,几乎没有什么修改,至少合乎我本人的意愿,而且排版风格,也越来越完善,同样是起码过得了我自己这一关。 这个其实OpenClaw早可以实现了,但是目前我觉得最大的区别是,Hermes会自主总结提炼,并更新你的写作技能。 相信就冲这一点,就值得一试。 这篇帖子主要就Hermes部署使用,作一个非常详细的介绍,几乎一步一贴图。 关于Hermes,无论你赞成哪种声音,我希望都是你自己动手行动过,发自内心的选择!
2782 28
|
1月前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
45813 158
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
5天前
|
弹性计算 人工智能 自然语言处理
阿里云Qwen3.6全新开源,三步完成专有版部署!
Qwen3.6是阿里云全新MoE架构大模型系列,稀疏激活显著降低推理成本,兼顾顶尖性能与高性价比;支持多规格、FP8量化、原生Agent及100+语言,开箱即用。
|
8天前
|
人工智能 弹性计算 安全
Hermes Agent是什么?怎么部署?超详细实操教程
Hermes Agent 是 Nous Research 于2026年2月开源的自进化AI智能体,支持跨会话持久记忆、自动提炼可复用技能、多平台接入与200+模型切换,真正实现“越用越懂你”。MIT协议,部署灵活,隐私可控。
2115 4