NUS CS1101S:SICP JavaScript 描述:三、模块化、对象和状态(3)https://developer.aliyun.com/article/1427726
我们可以通过使用两个账户的序列化器来实现这一点,以序列化整个exchange
函数。为此,我们将安排访问账户的序列化器。请注意,我们故意打破了银行账户对象的模块化,通过消息传递来暴露序列化器。下面的make_account
版本与第 3.1.1 节中给出的原始版本相同,只是提供了一个序列化器来保护余额变量,并且通过消息传递导出了序列化器:
function make_account_and_serializer(balance) { function withdraw(amount) { if (balance > amount) { balance = balance - amount; return balance; } else { return "Insufficient funds"; } } function deposit(amount) { balance = balance + amount; return balance; } const balance_serializer = make_serializer(); return m => m === "withdraw" ? withdraw : m === "deposit" ? deposit : m === "balance" ? balance : m === "serializer" ? balance_serializer : error(m, "unknown request – make_account"); }
我们可以使用这个来进行序列化的存款和取款。然而,与我们之前的序列化账户不同,现在每个银行账户对象的用户都有责任显式地管理序列化,例如:
function deposit(account, amount) { const s = account("serializer"); const d = account("deposit"); s(d(amount)); }
以这种方式导出序列化器为我们提供了足够的灵活性来实现一个序列化的交换程序。我们只需使用两个账户的序列化器对原始的exchange
函数进行序列化:
function serialized_exchange(account1, account2) { const serializer1 = account1("serializer"); const serializer2 = account2("serializer"); serializer1(serializer2(exchange))(account1, account2); }
练习 3.43
假设三个账户的余额初始为 10、10、20 和 30,并且多个线程运行,交换账户中的余额。论证如果线程按顺序运行,在任意数量的并发交换之后,账户余额应该以某种顺序为30,并且多个线程运行,交换账户中的余额。论证如果线程按顺序运行,在任意数量的并发交换之后,账户余额应该以某种顺序为 10、20和20 和 30。绘制一个类似于图 3.29 中的时间图,以展示如果使用本节中账户交换程序的第一个版本,这个条件如何被违反。另外,论证即使使用这个exchange
程序,账户余额的总和也会被保留。绘制一个时间图,以展示如果我们没有对各个账户上的交易进行序列化,即使这个条件也会被违反。
练习 3.44
考虑从一个账户转账到另一个账户的问题。本·比特迪德尔声称,即使有多个人同时在多个账户之间转账,使用任何序列化存款和取款交易的账户机制,例如上文中的make_account
版本,也可以通过以下函数实现。
function transfer(from_account, to_account, amount) { from_account("withdraw")(amount); to_account("deposit")(amount); }
路易斯·里森纳声称这里存在问题,我们需要使用更复杂的方法,比如处理交换问题所需的方法。路易斯是对的吗?如果不是,转账问题和交换问题之间的本质区别是什么?(假设from_account
中的余额至少为amount
。)
练习 3.45
路易斯·里森纳认为我们的银行账户系统现在过于复杂和容易出错,因为存款和取款不再自动序列化。他建议make_account_and_serializer
应该导出序列化器(供serialized_exchange
等函数使用),而不是像make_account
一样使用它来序列化账户和存款。他建议重新定义账户如下:
function make_account_and_serializer(balance) { function withdraw(amount) { if (balance > amount) { balance = balance - amount; return balance; } else { return "Insufficient funds"; } } function deposit(amount) { balance = balance + amount; return balance; } const balance_serializer = make_serializer(); return m => m === "withdraw" ? balance_serializer(withdraw) : m === "deposit" ? balance_serializer(deposit) : m === "balance" ? balance : m === "serializer" ? balance_serializer : error(m, "unknown request – make_account"); }
然后存款的处理方式与原始的make_account
相同:
function deposit(account, amount) { account("deposit")(amount); }
解释路易斯的推理有什么问题。特别是考虑serialized_exchange
被调用时会发生什么。
实现序列化器
我们根据一种称为互斥体的更原始的同步机制来实现序列化器。互斥体是支持两种操作的对象——可以获取互斥体,也可以释放互斥体。一旦互斥体被获取,那么在互斥体被释放之前,该互斥体上的其他获取操作都无法进行。在我们的实现中,每个序列化器都有一个关联的互斥体。给定一个函数f
,序列化器返回一个函数,该函数获取互斥体,运行f
,然后释放互斥体。这确保了由序列化器产生的函数中只有一个可以同时运行,这正是我们需要保证的序列化属性。为了将序列化器应用于接受任意数量参数的函数,我们使用 JavaScript 的剩余参数和展开语法。参数args
前面的…
收集函数的任何调用中的所有参数(这里是全部参数)到一个向量数据结构中。在应用f(…args)
中args
前面的…
将args
的元素展开,使它们成为f
的单独参数。
function make_serializer() { const mutex = make_mutex(); return f => { function serialized_f(…args) { mutex("acquire"); const val = f(…args); mutex("release"); return val; } return serialized_f; }; }
互斥体是一个可变对象(这里我们将使用一个一元列表,称为单元格),它可以保存true
或false
的值。当值为false
时,互斥体可用于获取。当值为true
时,互斥体不可用,任何试图获取互斥体的线程都必须等待。
我们的互斥锁构造函数make_mutex
首先将单元内容初始化为假。要获取互斥锁,我们测试单元。如果互斥锁可用,我们将单元内容设置为真并继续。否则,我们在一个循环中等待,一遍又一遍地尝试获取,直到我们发现互斥锁可用。⁵⁰ 要释放互斥锁,我们将单元内容设置为假。
function make_mutex() { const cell = list(false); function the_mutex(m) { return m === "acquire" ? test_and_set(cell) ? the_mutex("acquire") // retry : true : m === "release" ? clear(cell) : error(m, "unknown request – mutex"); } return the_mutex; } function clear(cell) { set_head(cell, false); }
函数test_and_set
测试单元并返回测试结果。此外,如果测试结果为假,test_and_set
在返回假之前将单元内容设置为真。我们可以将这种行为表达为以下函数:
function test_and_set(cell) { if (head(cell)) { return true; } else { set_head(cell, true); return false; } }
然而,这种test_and_set
的实现并不足以满足要求。这里有一个关键的微妙之处,这是并发控制进入系统的基本地方:test_and_set
操作必须被原子化执行。也就是说,我们必须保证,一旦一个线程测试了单元并发现它为假,单元内容实际上会在任何其他线程测试单元之前被设置为真。如果我们不能做到这一点,那么互斥锁可能会以类似于图 3.29 中的银行账户失败的方式失败。(参见练习 3.46。)
test_and_set
的实际实现取决于我们的系统如何运行并发线程的细节。例如,我们可能正在使用时间片轮转机制在顺序处理器上执行并发线程,该机制循环遍历线程,允许每个线程在中断之前运行一小段时间。在这种情况下,test_and_set
可以通过在测试和设置期间禁用时间片轮转来工作。另外,多处理计算机提供了直接在硬件中支持原子操作的指令。⁵¹
练习 3.46
假设我们使用文本中所示的普通函数来实现test_and_set
,而不尝试使操作原子化。绘制一个类似于图 3.29 中的时序图,以演示互斥锁实现如何通过允许两个线程同时获取互斥锁而失败。
练习 3.47
信号量(大小为n
)是互斥锁的一种泛化。像互斥锁一样,信号量支持获取和释放操作,但它更一般,最多n
个线程可以同时获取它。尝试获取信号量的其他线程必须等待释放操作。给出信号量的实现
- a. 以互斥锁为条件
- b. 以原子
test_and_set
操作为条件。
死锁
现在我们已经看到了如何实现串行化器,我们可以看到即使在上面的serialized_exchange
函数中,账户交换仍然存在问题。假设 Peter 试图将a[1]
与a[2]
交换,同时 Paul 尝试将a[2]
与a[1]
交换。假设 Peter 的线程到达了进入保护a[1]
的串行化函数的点,就在那之后,Paul 的线程进入了保护a[2]
的串行化函数。现在 Peter 无法继续(进入保护a[2]
的串行化函数)直到 Paul 退出保护a[2]
的串行化函数。同样,Paul 在 Peter 退出保护a[1]
的串行化函数之前也无法继续。每个线程都永远被阻塞,等待另一个线程。这种情况被称为死锁。在提供对多个共享资源的并发访问的系统中,死锁总是一个危险。
在这种情况下避免死锁的一种方法是给每个账户分配一个唯一的标识号,并重写serialized_exchange
,使得一个线程总是尝试首先进入保护最低编号账户的函数。虽然这种方法对于交换问题效果很好,但还有其他需要更复杂的死锁避免技术的情况,或者根本无法避免死锁。(参见练习 3.48 和 3.49。)⁵²
练习 3.48
详细解释上述避免死锁的方法(即,账户编号,并且每个线程尝试先获取编号较小的账户)在交换问题中避免死锁的原因。重写serialized_exchange
以纳入这个想法。(您还需要修改make_account
,以便每个账户都带有一个可以通过发送适当消息访问的编号。)
练习 3.49
给出一个情景,说明上述避免死锁的机制不起作用的情况。(提示:在交换问题中,每个线程事先知道它将需要访问的账户。考虑一个情况,一个线程必须在知道它将需要访问哪些额外的共享资源之前获得对一些共享资源的访问。)
并发、时间和通信
我们已经看到,编写并发系统需要控制不同线程访问共享状态时事件的顺序,并且我们已经看到如何通过合理使用串行器来实现这种控制。但并发的问题并不仅仅如此,因为从根本上来看,“共享状态”并不总是清楚是什么意思。
诸如test_and_set
之类的机制要求线程在任意时间检查全局共享标志。这在现代高速处理器中实现起来是有问题且低效的,因为由于流水线和缓存内存等优化技术,内存的内容可能不会在每一时刻处于一致状态。因此,在一些多处理系统中,串行器范式正在被其他并发控制方法所取代。
共享状态的问题也出现在大型分布式系统中。例如,想象一个分布式银行系统,其中各个分行维护银行余额的本地值,并定期将其与其他分行维护的值进行比较。在这样的系统中,“账户余额”的价值在同步之后才会确定。如果 Peter 向他与 Paul 共同持有的账户存钱,我们应该在何时说账户余额已经改变——当本地分行的余额改变时,还是直到同步之后?如果 Paul 从不同的分行访问账户,对于银行系统来说应该放置什么合理的约束条件,以使行为“正确”?对于正确性来说,唯一重要的可能是 Peter 和 Paul 个别观察到的行为以及同步后账户的“状态”。关于“真实”账户余额或同步之间事件顺序的问题可能是无关或无意义的。
这里的基本现象是,同步不同的线程,建立共享状态,或对事件进行排序都需要线程之间的通信。实质上,并发控制中的任何时间概念都必须与通信紧密联系在一起。引人入胜的是,在相对论中也存在时间和通信之间的类似联系,光速(可以用来同步事件的最快信号)是一个将时间和空间联系起来的基本常数。我们在处理计算模型中的时间和状态时遇到的复杂性,实际上可能反映了物理宇宙的基本复杂性。
3.5 流
我们已经对作为建模工具的赋值有了很好的理解,也对赋值引发的复杂问题有了认识。现在是时候问问我们是否可以以不同的方式进行事情,以避免其中一些问题。在本节中,我们将探讨一种基于称为“流”的数据结构的状态建模的替代方法。正如我们将看到的,流可以减轻一些状态建模的复杂性。
让我们退一步,回顾一下这种复杂性的根源。为了模拟现实世界的现象,我们做出了一些看似合理的决定:我们用具有局部变量的计算对象来模拟具有局部状态的现实世界对象。我们将现实世界中的时间变化与计算机中的时间变化相对应。我们用模型对象的局部变量的赋值来实现计算机中模型对象状态的时间变化。
还有其他方法吗?我们能否避免将计算机中的时间与模拟世界中的时间相对应?我们必须使模型随时间变化以模拟变化中的世界现象吗?从数学函数的角度来思考这个问题。我们可以将数量x
的随时间变化描述为时间的函数x(t)
。如果我们一瞬间地专注于x
,我们会认为它是一个变化的数量。然而,如果我们专注于值的整个时间历史,我们并不强调变化——函数本身并不改变。⁵⁶
如果时间以离散步骤来衡量,那么我们可以将时间函数建模为(可能是无限的)序列。在本节中,我们将看到如何以代表被建模系统的时间历史的序列来建模变化。为了实现这一点,我们引入了称为流的新数据结构。从抽象的角度来看,流只是一个序列。然而,我们会发现,将流的直接实现作为列表(如 2.2.1 节中所示)并不能充分展现流处理的威力。作为替代,我们引入了延迟求值技术,这使我们能够将非常大(甚至是无限的)序列表示为流。
流处理让我们能够建模具有状态的系统,而无需使用赋值或可变数据。这对于理论和实践都有重要的影响,因为我们可以构建避免引入赋值固有缺点的模型。另一方面,流框架本身也带来了困难,以及哪种建模技术能够导致更模块化和更易维护的系统的问题仍然是开放的。
3.5.1 流是延迟列表
正如我们在 2.2.3 节中看到的,序列可以作为组合程序模块的标准接口。我们为操作序列制定了强大的抽象,如map
、filter
和accumulate
,以简洁而优雅的方式捕捉了各种操作。
不幸的是,如果我们将序列表示为列表,这种优雅是以计算效率严重不足的代价换来的,无论是在时间还是空间上。当我们将对序列的操作表示为列表的转换时,我们的程序必须在过程的每一步构造和复制数据结构(可能非常庞大)。
为了理解这一点,让我们比较两个计算区间内所有质数之和的程序。第一个程序是用标准的迭代风格编写的:⁵⁷
function sum_primes(a, b) { function iter(count, accum) { return count > b ? accum : is_prime(count) ? iter(count + 1, count + accum) : iter(count + 1, accum); } return iter(a, 0); }
第二个程序使用 2.2.3 节的序列操作执行相同的计算:
function sum_primes(a, b) { return accumulate((x, y) => x + y, 0, filter(is_prime, enumerate_interval(a, b))); }
在进行计算时,第一个程序只需要存储正在累积的总和。相比之下,第二个程序中的过滤器在enumerate_interval
构建完整的区间数字列表之前无法进行任何测试。过滤器生成另一个列表,然后传递给accumulate
,然后被折叠以形成总和。第一个程序不需要这样大的中间存储,我们可以将其视为逐步枚举区间,将每个质数添加到生成的总和中。
如果我们使用序列范例来计算从 10,000 到 1,000,000 的区间中第二个质数,那么使用列表的低效性就会变得非常明显,通过求值表达式
head(tail(filter(is_prime, enumerate_interval(10000, 1000000))));
这个表达式确实找到了第二个素数,但计算开销是过分的。我们构造了一个接近一百万的整数列表,通过测试每个元素的素性来过滤这个列表,然后忽略了几乎所有的结果。在更传统的编程风格中,我们会交错枚举和过滤,并在达到第二个素数时停止。
流是一个巧妙的想法,它允许我们使用序列操作而不会产生列表操作的成本。有了流,我们可以实现两全其美:我们可以优雅地将程序构建为序列操作,同时获得增量计算的效率。基本思想是只部分构造流,并将部分构造传递给消费流的程序。如果消费者尝试访问尚未构造的流的一部分,流将自动构造足够的自身来产生所需的部分,从而保持整个流存在的幻觉。换句话说,尽管我们将编写程序,就好像我们正在处理完整的序列,但我们的流实现被设计为自动透明地交错流的构造和使用。
为了实现这一点,我们将使用对构造流,流的第一项在对的头部。然而,我们不是将流的其余值放入对的尾部,而是在那里放置一个“承诺”,以计算其余部分(如果有的话)。如果我们有一个数据项h
和一个流t
,我们通过求值pair(h, () => t)
来构造一个流,其头部是h
,尾部是t
—流的尾部t
被包装在一个没有参数的函数中,因此其求值将被延迟。空流是null
,与空列表相同。
要访问非空流的第一个数据项,我们只需选择一对的head
,就像列表一样。但是要访问流的尾部,我们需要求值延迟的表达式。为了方便起见,我们定义
function stream_tail(stream) { return tail(stream)(); }
这选择了一对的尾部,并应用在那里找到的函数来获得流的下一对(或者如果流的尾部为空则为null
)—实际上,强制了对尾部的函数来实现其承诺。
我们可以制作和使用流,就像我们可以制作和使用列表一样,来表示按顺序排列的聚合数据。特别是,我们可以构建章节 2 中的列表操作的流模拟,例如list_ref
、map
和for_each
:⁵⁸
function stream_ref(s, n) { return n === 0 ? head(s) : stream_ref(stream_tail(s), n - 1); } function stream_map(f, s) { return is_null(s) ? null : pair(f(head(s)), () => stream_map(f, stream_tail(s))); } function stream_for_each(fun, s) { if (is_null(s)) { return true; } else { fun(head(s)); return stream_for_each(fun, stream_tail(s)); } }
stream_for_each
函数对于查看流是有用的。
function display_stream(s) { return stream_for_each(display, s); }
为了使流的实现自动透明地交错流的构造和使用,我们安排了流的尾部在被stream_tail
函数访问时进行求值,而不是在pair
构造流时进行求值。这种实现选择让人想起了我们在 2.1.2 节中讨论有理数时所看到的情况,那里我们看到我们可以选择实现有理数,使得分子和分母的约分在构造时或选择时进行。这两种有理数实现产生相同的数据抽象,但选择对效率有影响。流和普通列表之间存在类似的关系。作为数据抽象,流和列表是相同的。不同之处在于元素的求值时间。对于普通列表,head
和tail
都在构造时进行求值。对于流,tail
在选择时进行求值。
流的实际应用
为了看到这种数据结构的行为,让我们分析上面看到的“过分”的素数计算,以流的术语重新表述:
head(stream_tail(stream_filter( is_prime, stream_enumerate_interval(10000, 1000000))));
我们将看到它确实有效地工作。
我们首先使用参数 10,000 和 1,000,000 调用stream_enumerate_interval
函数。函数stream_enumerate_interval
是enumerate_interval
(2.2.3 节)的流模拟:
function stream_enumerate_interval(low, high) { return low > high ? null : pair(low, () => stream_enumerate_interval(low + 1, high)); }
因此,由stream_enumerate_interval
返回的结果,由pair
组成,是⁵⁹
pair(10000, () => stream_enumerate_interval(10001, 1000000));
也就是说,stream_enumerate_interval
返回一个表示为pair
的流,其head
为 10,000,tail
是一个承诺,如果需要的话会枚举更多的间隔。现在,使用filter
函数的流模拟对素数进行过滤。
function stream_filter(pred, stream) { return is_null(stream) ? null : pred(head(stream)) ? pair(head(stream), () => stream_filter(pred, stream_tail(stream))) : stream_filter(pred, stream_tail(stream)); }
函数stream_filter
测试流的head
(即 10,000)。由于这不是素数,stream_filter
检查其输入流的尾部。调用stream_tail
迫使延迟的stream_enumerate_interval
,现在返回
pair(10001, () => stream_enumerate_interval(10002, 1000000));
函数stream_filter
现在查看这个流的head
,10,001,看到这也不是素数,强制另一个stream_tail
,依此类推,直到stream_enumerate_interval
产生素数 10,007,然后根据其定义,stream_filter
返回
pair(head(stream), stream_filter(pred, stream_tail(stream)));
在这种情况下是
pair(10007, () => stream_filter( is_prime, pair(10008, () => stream_enumerate_interval(10009, 1000000))));
这个结果现在传递给了我们原始表达式中的stream_tail
。这迫使延迟的stream_filter
,进而不断迫使延迟的stream_enumerate_interval
,直到找到下一个素数,即 10,009。最后,结果传递给了我们原始表达式中的head
。
pair(10009, () => stream_filter( is_prime, pair(10010, () => stream_enumerate_interval(10011, 1000000))));
函数head
返回 10,009,计算完成。只有测试了必要数量的整数以确定素数,只有在必要时才枚举了间隔以提供素数过滤器。
一般来说,我们可以将延迟求值看作是“需求驱动”的编程,即流处理中的每个阶段只激活足够满足下一个阶段的部分。我们所做的是将计算中的实际事件顺序与函数的表面结构分离。我们编写函数,就好像流“一次性”存在一样,而实际上,计算是逐步进行的,就像传统的编程风格一样。
一个优化
当我们构造流对时,我们通过将这些表达式包装在函数中来延迟求值它们的尾部表达式。我们在需要时通过应用函数来强制执行它们的求值。
这个实现足以使流按照广告宣传的方式工作,但是在需要时我们将考虑一个重要的优化。在许多应用中,我们最终会多次强制执行相同的延迟对象。这可能导致涉及流的递归程序严重低效。(见练习 3.57。)解决方案是构建延迟对象,使得第一次强制执行时,它们存储计算的值。后续的强制执行将简单地返回存储的值,而不重复计算。换句话说,我们实现了流对的构造,作为类似于练习 3.27 中描述的记忆化函数的一种方式。实现这一点的一种方法是使用以下函数,它以一个函数(无参数)作为参数,并返回函数的记忆化版本。第一次运行记忆化函数时,它保存计算结果。在后续的求值中,它只是返回结果。⁶⁰
function memo(fun) { let already_run = false; let result = undefined; return () => { if (!already_run) { result = fun(); already_run = true; return result; } else { return result; } }; }
我们可以在构造流对时使用memo
。例如,而不是
function stream_map(f, s) { return is_null(s) ? null : pair(f(head(s)), () => stream_map(f, stream_tail(s))); }
我们可以定义一个优化的函数stream_map
如下:
function stream_map_optimized(f, s) { return is_null(s) ? null : pair(f(head(s)), memo(() => stream_map_optimized(f, stream_tail(s)))); }
练习 3.50
声明一个函数stream_map_2
,它接受一个二元函数和两个流作为参数,并返回一个流,其元素是将函数成对应用于参数流的相应元素的结果。
function stream_map_2(f, s1, s2) { … }
类似于stream_map_optimized
,通过修改stream_map_2
声明一个函数stream_map_2_optimized
,使结果流使用记忆化。
练习 3.51
请注意,我们的原始函数display
在显示后返回其参数。解释器在求值以下序列中的每个语句时打印什么?
let x = stream_map(display, stream_enumerate_interval(0, 10)); stream_ref(x, 5); stream_ref(x, 7);
如果使用stream_map_optimized
而不是stream_map
,解释器会打印什么?
let x = stream_map_optimized(display, stream_enumerate_interval(0, 10)); stream_ref(x, 5); stream_ref(x, 7);
练习 3.52
考虑以下语句序列
let sum = 0; function accum(x) { sum = x + sum; return sum; } const seq = stream_map(accum, stream_enumerate_interval(1, 20)); const y = stream_filter(is_even, seq); const z = stream_filter(x => x % 5 === 0, seq); stream_ref(y, 7); display_stream(z);
在上述每个语句被求值后,sum
的值是多少?求值stream_ref
和display_stream
表达式的打印响应是什么?如果我们在每个构造的流对的每个尾部应用了函数memo
,如上面的优化建议,这些响应会有所不同吗?请解释。
3.5.2 无限流
我们已经看到如何支持操作流的幻觉,即使在实际上,我们只计算我们需要访问的流的部分。我们可以利用这种技术来有效地表示序列作为流,即使序列非常长。更重要的是,我们可以使用流来表示无限长的序列。例如,考虑以下正整数流的定义:
function integers_starting_from(n) { return pair(n, () => integers_starting_from(n + 1)); } const integers = integers_starting_from(1);
这是有道理的,因为integers
将是一个对,其head
是 1,tail
是一个承诺去产生从 2 开始的整数。这是一个无限长的流,但在任何给定的时间,我们只能检查其中的有限部分。因此,我们的程序永远不会知道整个无限流不存在。
使用integers
,我们可以定义其他无限流,例如不能被 7 整除的整数流:
function is_divisible(x, y) { return x % y === 0; } const no_sevens = stream_filter(x => ! is_divisible(x, 7), integers);
然后我们可以通过访问该流的元素来找到不能被 7 整除的整数:
stream_ref(no_sevens, 100); 117
类似于integers
,我们可以定义斐波那契数的无限流:
function fibgen(a, b) { return pair(a, () => fibgen(b, a + b)); } const fibs = fibgen(0, 1);
常量fibs
是一个对的head
是 0,tail
是一个承诺去求值fibgen(1, 1)
。当我们求值这个延迟的fibgen(1, 1)
时,它将产生一个对,其head
是 1,tail
是一个承诺去求值fibgen(1, 2)
,依此类推。
要查看更激动人心的无限流,我们可以将no_sevens
示例推广到使用称为厄拉托斯特尼筛法的方法构造质数的无限流。我们从从 2 开始的整数开始,这是第一个质数。为了得到其余的质数,我们首先从其余的整数中过滤出 2 的倍数。这留下了一个以 3 开始的流,这是下一个质数。现在我们从这个流的其余部分中过滤出 3 的倍数。这留下了一个以 5 开始的流,这是下一个质数,依此类推。换句话说,我们通过筛选过程构造质数,描述如下:对流 S 进行筛选,形成一个流,其第一个元素是 S 的第一个元素,其余部分是通过从 S 的其余部分中过滤出 S 的第一个元素的所有倍数并进行筛选得到的。这个过程可以很容易地用流操作来描述:
function sieve(stream) { return pair(head(stream), () => sieve(stream_filter( x => ! is_divisible(x, head(stream)), stream_tail(stream)))); } const primes = sieve(integers_starting_from(2));
现在,要找到特定的质数,我们只需要询问:
stream_ref(primes, 50); 233
思考一下由sieve
建立的信号处理系统,如图 3.31 中的“亨德森图”所示。输入流馈入一个unpairer
,将流的第一个元素与其余部分分开。第一个元素用于构造一个可被整除的过滤器,通过该过滤器传递其余部分,并将过滤器的输出馈送到另一个筛子箱中。然后将原始的第一个元素与内部筛子的输出相连,形成输出流。因此,流不仅是无限的,信号处理器也是无限的,因为筛子中包含一个筛子。
图 3.31 将素数筛视为信号处理系统。每条实线代表正在传输的值流。从head
到pair
和filter
的虚线表示这是一个单个值,而不是一个流。
隐式定义流
上述integers
和fibs
流是通过指定显式计算流元素的“生成”函数来定义的。指定流的另一种方法是利用延迟求值来隐式定义流。例如,以下语句定义了流ones
为无限流的 1:
const ones = pair(1, () => ones);
这与递归函数的声明非常相似:ones
是一个head
为 1 且tail
是一个承诺来求值ones
的对,求值tail
再次给我们一个 1 和一个承诺来求值ones
,依此类推。
我们可以通过使用add_streams
等操作来操作流,从而做更有趣的事情,该操作产生两个给定流的逐元素和。⁶⁴
function add_streams(s1, s2) { return stream_map_2((x1, x2) => x1 + x2, s1, s2); }
现在我们可以如下定义整数:
const integers = pair(1, () => add_streams(ones, integers));
这定义了integers
为一个流,其第一个元素为 1,其余部分是ones
和integers
的和。因此,integers
的第二个元素是integers
的第一个元素加 1,或 2;integers
的第三个元素是integers
的第二个元素加 1,或 3;依此类推。这个定义之所以有效,是因为在任何时候,integers
流的足够部分已经生成,以便我们可以将其反馈到定义中以产生下一个整数。
我们可以以相同的方式定义斐波那契数:
const fibs = pair(0, () => pair(1, () => add_streams(stream_tail(fibs), fibs)));
这个定义表示fibs
是一个以 0 和 1 开头的流,这样流的其余部分可以通过将fibs
加到自身移位一个位置来生成:
1 1 2 3 5 8 13 21 ... = stream_tail(fibs) 0 1 1 2 3 5 8 13 ... = fibs 0 1 1 2 3 5 8 13 21 34 ... = fibs
函数scale_stream
在制定这种流定义时也很有用。这将流中的每个项目乘以给定的常数:
function scale_stream(stream, factor) { return stream_map(x => x * factor, stream); }
例如,
const double = pair(1, () => scale_stream(double, 2));
产生 2 的幂的流:1, 2, 4, 8, 16, 32, ....
可以通过从整数开始并通过测试素数性进行过滤来给出素数流的另一种定义。我们需要第一个素数 2 来开始:
const primes = pair(2, () => stream_filter(is_prime, integers_starting_from(3)));
这个定义并不像看起来那么简单,因为我们将测试一个数n
是否为素数,方法是检查是否可以被小于或等于的素数(而不是任意整数)整除:
function is_prime(n) { function iter(ps) { return square(head(ps)) > n ? true : is_divisible(n, head(ps)) ? false : iter(stream_tail(ps)); } return iter(primes); }
这是一个递归定义,因为primes
是根据is_prime
谓词定义的,而is_prime
谓词本身使用primes
流。这个函数之所以有效,是因为在任何时候,primes
流的足够部分已经生成,以便我们可以测试下一个需要检查的数的素数性。也就是说,对于每个n
,我们测试其是否为素数,要么n
不是素数(在这种情况下,已经生成了一个可以整除它的素数),要么n
是素数(在这种情况下,已经生成了一个素数,即小于n
的素数,大于的素数)。
练习 3.53
不运行程序的情况下,描述由以下定义的流的元素
const s = pair(1, () => add_streams(s, s));
练习 3.54
定义一个函数mul_streams
,类似于add_streams
,它产生其两个输入流的逐元素乘积。与integers
流一起使用,完成以下流的定义,其第n
个元素(从 0 开始计数)是n + 1
的阶乘:
const factorials = pair(1, () => mul_streams(〈??〉, 〈??〉));
练习 3.55
定义一个名为partial_sums
的函数,该函数以流S
作为参数,并返回其元素为S[0]
,S[0]
+ S[1]
,S[0]
+ S[1]
+ S[2]
,…的流。例如,partial_sums(integers)
应该是流1, 3, 6, 10, 15, ...
。
练习 3.56
一个著名的问题,首次由 R. Hamming 提出,是按升序枚举所有没有除了 2、3 或 5 之外的质因数的正整数,而且没有重复。一个明显的方法是简单地依次测试每个整数,看它是否有除 2、3 和 5 之外的因子。但这非常低效,因为随着整数变大,符合要求的整数越来越少。作为替代方案,让我们称所需的数字流为S
,并注意关于它的以下事实。
S
以 1 开始。scale_stream(S, 2)
的元素也是S
的元素。scale_stream(S, 3)
和scale_stream(S, 5)
也是如此。- 这些都是
S
的元素。
现在我们只需要从这些来源中组合元素。为此,我们定义一个函数merge
,它将两个有序流合并成一个有序结果流,消除重复项:
function merge(s1, s2) { if (is_null(s1)) { return s2; } else if (is_null(s2)) { return s1; } else { const s1head = head(s1); const s2head = head(s2); return s1head < s2head ? pair(s1head, () => merge(stream_tail(s1), s2)) : s1head > s2head ? pair(s2head, () => merge(s1, stream_tail(s2))) : pair(s1head, () => merge(stream_tail(s1), stream_tail(s2))); } }
然后,可以使用merge
构造所需的流,如下所示:
const S = pair(1, () => merge((??), (??)));
在上面标有(??)
的地方填写缺失的表达式。
练习 3.57
使用基于add_streams
函数的 fibs 声明计算第 n 个斐波那契数时执行了多少次加法?证明如果add_streams
使用练习 3.50 中描述的stream_map_2_optimized
函数,这个数字呈指数增长。
练习 3.58
给出函数计算的流的解释
function expand(num, den, radix) { return pair(math_trunc((num * radix) / den), () => expand((num * radix) % den, den, radix)); }
其中math_trunc
丢弃其参数的小数部分,即除法的余数。expand(1, 7, 10)
产生的连续元素是什么?expand(3, 8, 10)
产生什么?
练习 3.59
在 2.5.3 节中,我们看到如何实现多项式算术系统,将多项式表示为项的列表。类似地,我们可以处理幂级数,例如
表示为无限流。我们将级数a[0] + a[1]x + a[2]x² + a[3]x³ + ...
表示为其元素为系数a[0]
,a[1], a[2], a[3], ...
的流。
- a. 级数
a[0] + a[1]x + a[2]x² + a[3]x³ + ...
的积分是级数
c + a[0]x + 1/2 a[1]x² + 1/3 a[2]x³ + 1/4 a[3]x⁴ + ...
- 定义一个函数
integrate_series
,它以流a[0], a[1], a[2], ...
作为输入,表示幂级数,并返回非常数项积分的系数流a[0], 1/2 a[1], 1/3 a[2], ...
。(由于结果没有常数项,它不表示幂级数;当我们使用integrate_series
时,我们将使用pair
将适当的常数添加到流的开头。) - b. 函数
x -> eˣ
是它自己的导数。这意味着eˣ
和eˣ
的积分是相同的级数,除了常数项,它是e⁰= 1
。因此,我们可以生成eˣ
的级数为
const exp_series = pair(1, () => integrate_series(exp_series));
- 展示如何从正弦的导数是余弦和余弦的导数是负正弦这两个事实开始生成正弦和余弦的级数:
const cosine_series = pair(1, 〈??〉); const sine_series = pair(0, 〈??〉);
练习 3.60
使用练习 3.59 中级数表示为系数流的方式,通过add-streams
实现级数相加。完成以下函数的声明以实现级数相乘:
function mul_series(s1, s2) { pair(〈??〉, () => add_streams(〈??〉, 〈??〉)); }
您可以通过验证sin²x + cos²x = 1
,使用练习 3.59 中的级数来测试您的函数。
练习 3.61
设S
是一个幂级数(练习 3.59),其常数项为 1。假设我们想找到幂级数1/S
,即级数X
,使得S X = 1
。将S
写为1 + S[R]
,其中S[R]
是常数项之后的S
的部分。然后我们可以按如下方式解出X
:
S · X = 1 (1 + S[R]) · X = 1 X + S[R] · X = 1 X = 1 – S[R] · X
换句话说,X
是常数项为 1 的幂级数,其高阶项由S[R]
的负数乘以X
给出。使用这个想法编写一个名为invert_unit_series
的函数,该函数计算常数项为 1 的幂级数S
的1/S
。您需要使用练习 3.60 中的mul_series
。
练习 3.62
使用练习 3.60 和 3.61 的结果定义一个名为div_series
的函数,该函数可以将两个幂级数相除。div_series
函数应适用于任何两个级数,只要分母级数以非零常数项开头。(如果分母有零常数项,则div_series
应发出错误信号。)展示如何使用div_series
与练习 3.59 的结果一起生成正切的幂级数。
3.5.3 利用流范式
具有延迟求值的流可以是一个强大的建模工具,提供了许多局部状态和赋值的好处。此外,它们避免了引入赋值到编程语言中时伴随的一些理论上的纠缠。
流方法可以提供启发,因为它允许我们构建具有不同模块边界的系统,而不是围绕对状态变量的赋值组织的系统。例如,我们可以将整个时间序列(或信号)视为关注的焦点,而不是单个时刻的状态变量的值。这使得方便地组合和比较来自不同时刻的状态组件。
将迭代公式表述为流过程
在 1.2.1 节中,我们介绍了迭代过程,通过更新状态变量进行。我们现在知道,我们可以将状态表示为“无时间”的值流,而不是一组要更新的变量。让我们在重新访问 1.1.7 节中的求平方根函数时采用这种观点。回想一下,这个想法是通过反复应用改进猜测的函数来生成越来越好的x
的平方根的序列:
function sqrt_improve(guess, x) { return average(guess, x / guess); }
在我们原始的sqrt
函数中,我们让这些猜测成为状态变量的连续值。相反,我们可以生成无限的猜测流,从初始猜测值 1 开始:
function sqrt_stream(x) { return pair(1, () => stream_map(guess => sqrt_improve(guess, x), sqrt_stream(x))); } display_stream(sqrt_stream(2)); `1` 1.5 1.4166666666666665 1.4142156862745097 1.4142135623746899 ...
我们可以生成越来越多的流项,以获得越来越好的猜测。如果愿意,我们可以编写一个函数,直到答案足够好为止一直生成项。(参见练习 3.64。)
我们可以以相同的方式处理的另一个迭代是基于我们在 1.3.1 节中看到的交替级数生成π
的近似值:
首先生成级数的和项流(奇整数的倒数,交替符号)。然后我们取越来越多项的和的流(使用练习 3.55 的partial_sums
函数)并将结果缩放 4 倍:
function pi_summands(n) { return pair(1 / n, () => stream_map(x => - x, pi_summands(n + 2))); } const pi_stream = scale_stream(partial_sums(pi_summands(1)), 4); display_stream(pi_stream); `4` 2.666666666666667 3.466666666666667 2.8952380952380956 3.3396825396825403 2.9760461760461765 3.2837384837384844 3.017071817071818 ...
这给我们提供了一个越来越好的π
的近似流,尽管这些近似值收敛得相当慢。序列的八个项将π
的值限制在 3.284 和 3.017 之间。
到目前为止,我们对状态流方法的使用与更新状态变量并没有太大不同。但是流使我们有机会做一些有趣的技巧。例如,我们可以使用序列加速器转换流,将近似值序列转换为收敛到与原始值相同的新序列,只是更快。
其中一种加速器,由十八世纪瑞士数学家 Leonhard Euler 提出,对于偏和交错级数的序列效果很好(交错符号的项的级数)。在欧拉的技术中,如果S[n]
是原始和序列的第n
项,则加速的序列具有项
因此,如果原始序列表示为值的流,则变换后的序列由
function euler_transform(s) { const s0 = stream_ref(s, 0); // S[n][–1] const s1 = stream_ref(s, 1); // S[n] const s2 = stream_ref(s, 2); // S[n][+1] return pair(s2 - square(s2 - s1) / (s0 + (-2) * s1 + s2), memo(() => euler_transform(stream_tail(s)))); }
请注意,我们利用了第 3.5.1 节的记忆化优化,因为在接下来的内容中,我们将依赖于对生成的流的重复求值。
我们可以用我们对π
的逼近序列来演示欧拉加速:
display_stream(euler_transform(pi_stream)); 3.166666666666667 3.1333333333333337 3.1452380952380956 3.13968253968254 3.1427128427128435 3.1408813408813416 3.142071817071818 3.1412548236077655 ...
更好的是,我们可以加速加速的序列,然后递归加速,依此类推。也就是说,我们创建了一个流的流(我们将其称为表格的结构),其中每个流都是前一个流的变换:
function make_tableau(transform, s) { return pair(s, () => make_tableau(transform, transform(s))); }
表格的形式
s[00] s[01] s[02] s[03] s[04] ... s[10] s[11] s[12] s[13] ... s[20] s[21] s[22] ... ...
最后,我们通过取表格的每一行的第一个项来形成一个序列:
function accelerated_sequence(transform, s) { return stream_map(head, make_tableau(transform, s)); }
我们可以演示这种“超加速”π
序列:
display_stream(accelerated_sequence(euler_transform, pi_stream)); `4` 3.166666666666667 3.142105263157895 3.141599357319005 3.1415927140337785 3.1415926539752927 3.1415926535911765 3.141592653589778 ...
结果令人印象深刻。取序列的八个项可以得到π
的正确值,精确到小数点后 14 位。如果我们只使用原始的π
序列,我们需要计算大约10¹³
个项(即,扩展系列直到单个项小于10^(–13)
)才能获得这么高的精度!
我们本可以在不使用流的情况下实现这些加速技术。但是流的表述特别优雅和方便,因为整个状态序列作为数据结构对我们可用,并且可以使用统一的一组操作进行操作。
练习 3.63
Louis Reasoner 对sqrt_stream
函数生成的流的性能不满意,并尝试使用记忆化来优化它:
function sqrt_stream_optimized(x) { return pair(1, memo(() => stream_map(guess => sqrt_improve(guess, x), sqrt_stream_optimized(x)))); }
Alyssa P. Hacker 提出
function sqrt_stream_optimized_2(x) { const guesses = pair(1, memo(() => stream_map(guess => sqrt_improve(guess, x), guesses))); return guesses; }
并声称 Louis 的版本比她的要低效得多,因为它执行了冗余计算。解释 Alyssa 的答案。Alyssa 的方法如果没有记忆化,是否比原始的sqrt_stream
更有效?
练习 3.64
编写一个名为stream_limit
的函数,它接受一个流和一个数字(容差)作为参数。它应该检查流,直到找到两个连续的元素,它们的绝对值之差小于容差,并返回这两个元素中的第二个。使用这个函数,我们可以通过
function sqrt(x, tolerance) { return stream_limit(sqrt_stream(x), tolerance); }
练习 3.65
使用级数
计算三个逼近自然对数 2 的序列,方式与我们上面对π
所做的方式相同。这些序列收敛得有多快?
无限流的对
在第 2.2.3 节中,我们看到序列范式如何处理传统的嵌套循环,作为对成对序列定义的过程。如果我们将这种技术推广到无限流,那么我们可以编写不容易表示为循环的程序,因为“循环”必须在无限集合上进行。
例如,假设我们想要将第 2.2.3 节的prime_sum_pairs
函数推广为生成所有整数(i, j)
对的流,其中i ≤ j
,使得i + j
是素数。如果int_pairs
是所有整数(i, j)
对的序列,其中i ≤ j
,那么我们所需的流就是简单的⁶⁷
stream_filter(pair => is_prime(head(pair) + head(tail(pair))), int_pairs);
因此,我们的问题是生成int_pairs
流。更一般地说,假设我们有两个流S = (S[i])
和T = (T[j])
,并想象一个无限的矩形数组
(S[0], T[0]) (S[0], T[1]) (S[0], T[2]) ... (S[1], T[0]) (S[1], T[1]) (S[1], T[2]) ... (S[2], T[0]) (S[2], T[1]) (S[2], T[2]) ... ...
我们希望生成一个包含数组中所有位于对角线上方或对角线上的成对的流,即成对
(S[0], T[0]) (S[0], T[1]) (S[0], T[2]) ... (S[1], T[1]) (S[1], T[2]) ... (S[2], T[2]) ... ...
(如果我们将S
和T
都作为整数流,那么这将是我们期望的流int_pairs
。)
将成对的一般流称为pairs(S, T)
,并将其视为由三部分组成:对(S[0]
, T[0]
),第一行中其余的对,以及剩余的对。
(S[0], T[0]) (S[0], T[1]) (S[0], T[2]) ... (S[1], T[1]) (S[1], T[2]) ... (S[2], T[2]) ... ...
观察到这种分解中的第三部分(不在第一行中的对)是(递归地)由stream_tail(S)
和stream_tail(T)
形成的对。还要注意第二部分(第一行的其余部分)是
stream_map(x => list(head(s), x), stream_tail(t));
因此,我们可以按以下方式形成我们的成对流:
function pairs(s, t) { return pair(list(head(s), head(t)), () => combine-in-some-way( stream_map(x => list(head(s), x), stream_tail(t)), pairs(stream_tail(s), stream_tail(t)))); }
为了完成函数,我们必须选择一种组合两个内部流的方法。一个想法是使用第 2.2.1 节中的append
函数的流模拟:
function stream_append(s1, s2) { return is_null(s1) ? s2 : pair(head(s1), () => stream_append(stream_tail(s1), s2)); }
然而,这对于无限流来说是不合适的,因为它在合并第二个流之前从第一个流中取出所有元素。特别是,如果我们尝试使用以下方式生成所有正整数的成对:
pairs(integers, integers);
我们的结果流将首先尝试运行所有第一个整数等于 1 的对,因此永远不会产生任何其他第一个整数值的对。
为了处理无限流,我们需要设计一种组合顺序,以确保如果我们让程序运行足够长的时间,每个元素最终都会被访问到。实现这一点的一种优雅方法是使用以下interleave
函数:
function interleave(s1, s2) { return is_null(s1) ? s2 : pair(head(s1), () => interleave(s2, stream_tail(s1))); }
由于interleave
从两个流中交替获取元素,因此第二个流的每个元素最终都会进入交错流中,即使第一个流是无限的。
因此,我们可以生成所需的成对流如下:
function pairs(s, t) { return pair(list(head(s), head(t)), () => interleave(stream_map(x => list(head(s), x), stream_tail(t)), pairs(stream_tail(s), stream_tail(t)))); }
练习 3.66
检查流pairs(integers, integers)
。您能对成对放入流中的顺序做出一般性评论吗?例如,大约有多少对在(1,100)之前?对(99,100)之前?对(100,100)之前?(如果您能在这里做出精确的数学陈述,那就更好了。但是,如果您发现自己陷入困境,请随时给出更多的定性答案。)
练习 3.67
修改pairs
函数,使得pairs(integers, integers)
将生成所有整数对(i, j)
的流(不带条件i ≤ j
)。提示:您需要混合另一个流。
练习 3.68
Louis Reasoner 认为从三个部分构建成对流是不必要复杂的。他建议不将对(S[0]
, T[0]
)与第一行中其余的对分开,而是建议使用整个第一行,如下所示:
function pairs(s, t) { return interleave(stream_map(x => list(head(s), x), t), pair(stream_tail(s), stream_tail(t))); }
这样行得通吗?考虑一下,如果我们使用 Louis 对pairs
的定义来求值pairs(integers, integers)
会发生什么。
练习 3.69
编写一个名为triples
的函数,该函数接受三个无限流S
、T
和U
,并生成三元组(S[i], T[j], U[k])
的流,其中i ≤ j ≤ k
。使用triples
生成所有正整数的勾股三元组的流,即三元组(i, j, k)
,使得i ≤ j
且i² + j² = k²
。
练习 3.70
生成流时,以某种有用的顺序出现的整数对会更好,而不是通过特设的交错过程得到的顺序。如果我们定义一种方法来表明一个整数对“小于”另一个整数对,我们可以使用类似于练习 3.56 的merge
函数的技术。这样做的一种方法是定义一个“加权函数”W(i, j)
,并规定如果W(i[1], j[1]) < W(i[2], j[2])
,则(i[1], j[1])
小于(i[2], j[2])
。编写一个名为merge_weighted
的函数,它类似于merge
,但merge_weighted
接受一个额外的参数weight
,这是一个计算一对整数的权重的函数,并用于确定结果合并流中元素应该出现的顺序。使用这个方法,将pairs
推广为一个名为weighted_pairs
的函数,它接受两个流,以及一个计算加权函数的函数,并生成整数对的流,根据权重排序。使用你的函数生成
- a. 所有正整数对
(i, j)
的流,其中i ≤ j
,根据和i + j
进行排序 - b. 所有正整数对
(i, j)
的流,其中i ≤ j
,i
和j
都不能被 2、3 或 5 整除,并且这些对根据和2i + 3j + 5ij
进行排序。
练习 3.71
有时称为拉马努金数的数字可以用两种以上的方式表示为两个立方数的和,以纪念数学家斯里尼瓦萨·拉马努金。有序的整数对流为计算这些数字提供了一种优雅的解决方案。要找到一个可以用两种不同方式写成两个立方数的和的数字,我们只需要生成根据和i³ + j³
(参见练习 3.70)加权的整数对流,然后在流中搜索具有相同权重的两个连续整数对。编写一个函数来生成拉马努金数。第一个这样的数字是 1,729。接下来的五个是什么?
练习 3.72
类似于练习 3.71,生成一个流,其中包含所有可以用三种不同方式写成两个平方和的数字(显示它们可以这样写成的方式)。
流作为信号
我们通过将流描述为信号处理系统中的“信号”的计算模拟来开始我们对流的讨论。实际上,我们可以直接使用流来模拟信号处理系统,将信号在连续时间间隔的值表示为流的连续元素。例如,我们可以实现一个积分器或求和器,对于输入流x = (x[i])
,初始值C
和小增量dt
,累积和。
并返回值流S = (S[i])
。以下的integral
函数类似于整数流的“隐式样式”定义(第 3.5.2 节):
function integral(integrand, initial_value, dt) { const integ = pair(initial_value, () => add_streams(scale_stream(integrand, dt), integ)); return integ; }
图 3.32 是一个与integral
函数对应的信号处理系统的图片。输入流通过dt
进行缩放,并通过加法器,其输出再次通过相同的加法器传递。integ
的定义中的自引用在图中通过将加法器的输出连接到其中一个输入的反馈环中得到反映。
图 3.32 integral
函数视为信号处理系统。
练习 3.73
我们可以使用流来模拟电路,以表示一系列时间点上的电流或电压值。例如,假设我们有一个由电阻R
和电容C
串联组成的 RC 电路。电路对注入电流i
的电压响应v
由图 3.33 中的公式确定,其结构由附带的信号流图所示。
图 3.33 一个 RC 电路和相关的信号流图。
编写一个模拟这个电路的函数RC
。RC
应该以R
、C
和dt
的值作为输入,并应该返回一个函数,该函数以表示当前i
的流和电容器电压v[0]
的初始值作为输入,并产生电压v
的流作为输出。例如,您应该能够通过求值const RC1 = RC(5, 1, 0.5)
来使用RC
来模拟一个R=5
欧姆、C=1
法拉和 0.5 秒时间步长的 RC 电路。这将定义RC1
作为一个函数,它接受表示电流时间序列的流和初始电容器电压,并产生电压的输出流。
练习 3.74
艾莉莎·P·黑客正在设计一个系统,用于处理来自物理传感器的信号。她希望产生的一个重要特性是描述输入信号的零交叉的信号。也就是说,结果信号应该在输入信号从负变为正时为+1,在输入信号从正变为负时为-1,否则为 0。(假设 0 输入的符号为正。)例如,具有其相关零交叉信号的典型输入信号可能是
在艾莉莎的系统中,传感器的信号表示为一个流sense_data
,而流zero_crossings
是相应的零交叉流。艾莉莎首先编写了一个名为sign_change_detector
的函数,该函数将两个值作为参数并比较这些值的符号以产生适当的 0、1 或-1。然后她按照以下方式构造了她的零交叉流:
function make_zero_crossings(input_stream, last_value) { return pair(sign_change_detector(head(input_stream), last_value), () => make_zero_crossings(stream_tail(input_stream), head(input_stream))); } const zero_crossings = make_zero_crossings(sense_data, 0);
艾莉莎的老板伊娃·卢·阿特走过来,建议这个程序大致等同于以下使用练习 3.50 中的stream_map_2
函数的程序:
const zero_crossings = stream_map_2(sign_change_detector, sense_data, expression);
通过提供指定的expression
来完成程序。
练习 3.75
很遗憾,艾莉莎在练习 3.74 中的零交叉检测器证明是不够的,因为传感器的嘈杂信号导致了虚假的零交叉。硬件专家莱姆·E·特维基建议艾莉莎在提取零交叉之前平滑信号以滤除噪音。艾莉莎接受了他的建议,并决定从通过将感应数据的每个值与前一个值进行平均构造的信号中提取零交叉。她向助手路易斯·里森纳解释了问题,后者试图实施这个想法,修改了艾莉莎的程序如下:
function make_zero_crossings(input_stream, last_value) { const avpt = (head(input_stream) + last_value) / 2; return pair(sign_change_detector(avpt, last_value), () => make_zero_crossings(stream_tail(input_stream), avpt)); }
这并没有正确实现艾莉莎的计划。找到路易斯安装的错误并修复它,而不改变程序的结构。(提示:您需要增加make_zero_crossings
的参数数量。)
练习 3.76
伊娃·卢·阿特对路易斯在练习 3.75 中的方法提出了批评。他写的程序不是模块化的,因为它混合了平滑操作和零交叉提取。例如,如果艾莉莎找到了更好的方法来调节她的输入信号,提取器就不应该被改变。通过编写一个名为smooth
的函数来帮助路易斯,该函数以流作为输入并产生一个流,其中每个元素都是两个连续输入流元素的平均值。然后使用smooth
作为组件以更模块化的方式实现零交叉检测器。
3.5.4 流和延迟求值
在前一节的最后,integral
函数展示了我们如何使用流来模拟包含反馈环的信号处理系统。图 3.32 中所示的加法器的反馈环是通过integral
的内部流integ
是根据自身定义的事实来建模的:
const integ = pair(initial_value, () => add_streams(scale_stream(integrand, dt), integ));
解释器处理这种隐式定义的能力取决于将对add_streams
的调用包装在 lambda 表达式中所产生的延迟。没有这种延迟,解释器无法在求值对add_streams
的调用之前构造integ
,这将要求integ
已经被定义。一般来说,这种延迟对于使用流来模拟包含循环的信号处理系统至关重要。没有延迟,我们的模型必须被制定为信号处理组件的任何输入在输出产生之前必须被完全求值。这将禁止循环。
很不幸,带有循环的系统的流模型可能需要超出迄今为止所见的流编程模式的延迟。例如,图 3.34 显示了一个信号处理系统,用于解决微分方程dy/dt = f(y)
,其中f
是一个给定的函数。图中显示了一个映射组件,它将f
应用于其输入信号,并以一种非常类似于实际用于解决这类方程的模拟计算机电路的反馈环路连接到积分器。
图 3.34 一个解方程dy/dt = f(y)
的“模拟计算机电路”。
假设我们对y
有一个初始值y[0]
,我们可以尝试使用以下函数来模拟这个系统
function solve(f, y0, dt) { const y = integral(dy, y0, dt); const dy = stream_map(f, y); return y; }
这个函数不起作用,因为在solve
的第一行中,对integral
的调用要求定义输入dy
,而这直到solve
的第二行才发生。
另一方面,我们的定义意图是有意义的,因为原则上我们可以开始生成y
流而不知道dy
。实际上,integral
和许多其他流操作可以在只有关于参数的部分信息时生成部分答案。对于integral
,输出流的第一个元素是指定的initial_value
。因此,我们可以在不求值被积函数dy
的情况下生成输出流的第一个元素。一旦我们知道y
的第一个元素,solve
的第二行中的stream_map
就可以开始工作来生成dy
的第一个元素,这将产生y
的下一个元素,依此类推。
为了利用这个想法,我们将重新定义integral
,以期望积分流作为延迟参数。函数integral
将强制积分在需要生成输出流的第一个元素时才被求值:
function integral(delayed_integrand, initial_value, dt) { const integ = pair(initial_value, () => { const integrand = delayed_integrand(); return add_streams(scale_stream(integrand, dt), integ); }); return integ; }
现在我们可以通过延迟y
的声明中dy
的求值来实现我们的solve
函数:
function solve(f, y0, dt) { const y = integral(() => dy, y0, dt); const dy = stream_map(f, y); return y; }
一般来说,integral
的每个调用者现在都必须延迟被积函数的参数。我们可以通过计算微分方程dy/dt = y
的解在y = 1
处的值来证明solve
函数的工作:
stream_ref(solve(y => y, 1, 0.001), 1000); 2.716923932235896
练习 3.77
上面使用的integral
函数类似于第 3.5.2 节中整数无限流的“隐式”定义。或者,我们可以给出更像integers-starting-from
(也在第 3.5.2 节中)的integral
的定义:
function integral(integrand, initial_value, dt) { return pair(initial_value, is_null(integrand) ? null : integral(stream_tail(integrand), dt * head(integrand) + initial_value, dt)); }
在带有循环的系统中使用时,这个函数与我们原始版本的integral
一样存在问题。修改函数,以便它期望integrand
作为延迟参数,因此可以在上面显示的solve
函数中使用。
练习 3.78
考虑设计一个信号处理系统来研究齐次二阶线性微分方程
输出流,对y
进行建模,是由一个包含循环的网络生成的。这是因为d²y/dt²
的值取决于y
和dy/dt
的值,而这两者都是通过对d²y/dt²
进行积分来确定的。我们想要编码的图表如图 3.35 所示。编写一个名为solve_2nd
的函数,该函数以常数a
、b
和dt
以及y
和dy/dt
的初始值y[0]
和dy[0]
作为参数,并生成y
的连续值流。
图 3.35 用于解决二阶线性微分方程的信号流图。
练习 3.79
将练习 3.78 的solve_2nd
函数泛化,以便用于解决一般的二阶微分方程d²y/dt² = f (dy/dt, y)
。
练习 3.80
串联 RLC 电路由一个电阻、一个电容和一个电感器串联而成,如图 3.36 所示。如果R
、L
和C
分别是电阻、电感和电容,那么这三个元件的电压(v
)和电流(i
)之间的关系由以下方程描述
电路连接规定了关系
i[R]
= i[L]
= –i[C]
v[C]
= v[L]
+ v[R]
结合这些方程表明电路的状态(由电容器两端的电压v[C]
和电感器中的电流i[L]
总结)由一对微分方程描述
表示这个微分方程系统的信号流图如图 3.37 所示。
图 3.36 一个串联 RLC 电路。
图 3.37 一个用于解决串联 RLC 电路的信号流图。
编写一个名为RLC
的函数,该函数以电路的参数R
、L
和C
以及时间增量dt
作为参数。类似于练习 3.73 中的RC
函数,RLC
应该生成一个函数,该函数接受状态变量的初始值,5v[C0]
和i[L0]
,并生成状态v[C]
和i[L]
的流的一对(使用pair
)。使用RLC
,生成一对流,模拟具有R = 1
欧姆、C = 0.2
法拉、L = 1
亨利、dt = 0.1
秒和初始值i[L0] = 0
安培和v[C0] = 10
伏特的串联 RLC 电路的行为。
正常顺序求值
本节中的示例说明了延迟求值如何提供很大的编程灵活性,但这些示例也表明了这如何使我们的程序变得更加复杂。例如,我们的新integral
函数赋予我们建模具有循环的系统的能力,但现在我们必须记住应该使用延迟的被积函数来调用integral
,并且使用integral
的每个函数都必须意识到这一点。实际上,我们创建了两类函数:普通函数和接受延迟参数的函数。通常情况下,创建不同类别的函数会迫使我们创建不同类别的高阶函数。⁷³
避免需要两种不同类别的函数的一种方法是使所有函数都采用延迟参数。我们可以采用一种求值模型,其中所有函数的参数都自动延迟,并且只有在实际需要时才强制参数(例如,当原始操作需要时)。这将使我们的语言转换为使用正则序求值,我们在 1.1.5 节介绍求值替换模型时首次描述了这一点。转换为正则序求值提供了一种统一而优雅的方式来简化延迟求值的使用,如果我们只关注流处理,这将是一种自然的策略。在 4.2 节中,我们在研究了求值器之后,将看到如何以这种方式转换我们的语言。不幸的是,在函数调用中包含延迟会破坏我们设计依赖事件顺序的程序的能力,例如使用赋值、改变数据或执行输入或输出的程序。即使在一对的尾部延迟也会造成很大的混乱,正如练习 3.51 和 3.52 所示。据人所知,可变性和延迟求值在编程语言中并不相容。
3.5.5 函数式程序的模块化和对象的模块化
正如我们在 3.1.2 节中看到的,引入赋值的主要好处之一是,我们可以通过将大系统的部分状态封装或“隐藏”在局部变量中来增加系统的模块化。流模型可以在不使用赋值的情况下提供等效的模块化。举例来说,我们可以从流处理的角度重新实现我们在 3.1.2 节中研究的π
的蒙特卡洛估计。
关键的模块化问题是,我们希望隐藏随机数生成器的内部状态,不让使用随机数的程序知道。我们从一个名为rand_update
的函数开始,它的连续值提供了我们的随机数供应,并用它来生成一个随机数生成器:
function make_rand() { let x = random_init; return () => { x = rand_update(x); return x; }; } const rand = make_rand();
在流的表述中,没有随机数生成器per se,只是通过连续调用rand_update
产生的随机数流:
const random_numbers = pair(random_init, () => stream_map(rand_update, random_numbers));
我们用这个来构建在random_numbers
流中对连续对执行的 Cesàro 实验结果的流:
function map_successive_pairs(f, s) { return pair(f(head(s), head(stream_tail(s))), () => map_successive_pairs( f, stream_tail(stream_tail(s)))); } const dirichlet_stream = map_successive_pairs((r1, r2) => gcd(r1, r2) === 1, random_numbers);
现在dirichlet_stream
被输入到monte_carlo
函数中,它产生一个概率估计的流。然后将结果转换为π
的估计流。这个程序的版本不需要一个告诉要执行多少次试验的参数。通过查看pi
流的更远处,可以获得更好的π
估计(通过进行更多的实验):
function monte_carlo(experiment_stream, passed, failed) { function next(passed, failed) { return pair(passed / (passed + failed), () => monte_carlo(stream_tail(experiment_stream), passed, failed)); } return head(experiment_stream) ? next(passed + 1, failed) : next(passed, failed + 1); } const pi = stream_map(p => math_sqrt(6 / p), monte_carlo(dirichlet_stream, 0, 0));
这种方法具有相当的模块化,因为我们仍然可以制定一个通用的monte_carlo
函数,可以处理任意的实验。但是没有赋值或局部状态。
练习 3.81
练习 3.6 讨论了将随机数生成器泛化,以允许重新设置随机数序列,从而产生可重复的“随机”数列。以此相同生成器的流形式进行一个流的表述,它在一个请求输入流上操作,请求是"generate"
一个新的随机数或"reset"
序列到指定值,并产生所需的随机数流。在你的解决方案中不要使用赋值。
练习 3.82
重新进行练习 3.5,使用流的术语进行蒙特卡洛积分。estimate_integral
的流版本不会有一个告诉要执行多少次试验的参数。相反,它将产生基于越来越多试验的估计流。
时间的函数式编程视图
现在让我们回到本章开头提出的对象和状态问题,并从一个新的角度来审视它们。我们引入了赋值和可变对象,以提供一种模块化构建具有状态的系统的程序的机制。我们使用本地状态变量构建了计算对象,并使用赋值来修改这些变量。我们通过相应的计算对象的时间行为来模拟对象在世界中的时间行为。
现在我们已经看到,流提供了一种用本地状态模拟对象的替代方式。我们可以使用表示连续状态的时间历史的流来模拟变化的数量,例如某个对象的本地状态。实质上,我们使用流明确表示时间,这样我们就可以将模拟世界中的时间与求值过程中发生的事件序列分离开来。实际上,由于延迟求值的存在,模型中模拟的时间与求值过程中事件的顺序可能几乎没有关系。
为了对比这两种建模方法,让我们重新考虑一下监视银行账户余额的“取款处理器”的实现。在 3.1.3 节中,我们实现了一个简化版本的这样一个处理器:
function make_simplified_withdraw(balance) { return amount => { balance = balance - amount; return balance; }; }
对make_simplified_withdraw
的调用会产生计算对象,每个对象都有一个名为balance
的本地状态变量,该变量会随着对对象的连续调用而递减。该对象接受一个amount
作为参数,并返回新的余额。我们可以想象银行账户的用户输入一系列输入到该对象中,并观察显示屏上显示的返回值序列。
或者,我们可以将取款处理器建模为一个函数,该函数以余额和要取款的金额流作为输入,并产生账户中连续余额的流:
function stream_withdraw(balance, amount_stream) { return pair(balance, () => stream_withdraw(balance - head(amount_stream), stream_tail(amount_stream))); }
函数stream_withdraw
实现了一个明确定义的数学函数,其输出完全由其输入确定。然而,假设输入amount_stream
是用户输入的连续值的流,而结果余额的流被显示。那么,从输入值和观察结果的用户的角度来看,流程过程与make_simplified_withdraw
创建的对象具有相同的行为。然而,使用流版本,没有赋值,没有本地状态变量,因此也没有我们在 3.1.3 节中遇到的理论困难。然而,系统具有状态!
这真是非常了不起。即使stream_withdraw
实现了一个行为不会改变的明确定义的数学函数,用户在这里的感知是在与一个具有变化状态的系统进行交互。解决这个悖论的一种方法是意识到是用户的时间存在给系统带来了状态。如果用户能够从交互中退出,并考虑余额流而不是单独的交易,系统将显得没有状态。⁷⁴
从复杂过程的某一部分的角度来看,其他部分似乎随时间变化。它们具有隐藏的时间变化的本地状态。如果我们希望编写模拟我们世界中这种自然分解的程序(从我们作为该世界一部分的视角来看),并在我们的计算机中使用结构,我们将创建不是功能性的计算对象——它们必须随时间变化。我们使用本地状态变量来模拟状态,并使用对这些变量的赋值来模拟状态的变化。通过这样做,我们使计算模型的执行时间成为我们所在世界的时间,因此我们在计算机中得到了“对象”。
用对象建模是强大且直观的,主要是因为这与我们与之交互的世界的感知相匹配。然而,正如我们在本章中反复看到的那样,这些模型引发了关于约束事件顺序和同步多个进程的棘手问题。避免这些问题的可能性刺激了函数式编程语言的发展,这些语言不包括任何关于赋值或可变数据的规定。在这样的语言中,所有函数都实现其参数的明确定义的数学函数,其行为不会改变。函数式方法对处理并发系统非常有吸引力。
另一方面,如果我们仔细观察,我们也可以看到与时间相关的问题潜入了函数式模型。当我们希望设计交互式系统,特别是模拟独立实体之间的交互时,一个特别棘手的领域就会出现。例如,再次考虑允许联合银行账户的银行系统的实现。在使用赋值和对象的传统系统中,我们将彼得和保罗共享一个账户的事实建模为彼得和保罗都将其交易请求发送到同一个银行账户对象,正如我们在 3.1.3 节中所看到的。从流的角度来看,本质上没有“对象”,我们已经指出银行账户可以被建模为一个处理交易请求流以产生响应流的过程。因此,我们可以通过合并彼得的交易请求流和保罗的请求流,并将结果馈送到银行账户流程,来建模彼得和保罗共有一个联合银行账户,如图 3.38 所示。
图 3.38 一个联合银行账户,通过合并两个交易请求流来建模。
这种表述的问题在于合并的概念。不能简单地通过交替从彼得和保罗那里获取交易请求来合并这两个流。假设保罗很少访问账户。我们几乎无法强迫彼得等待保罗访问账户,然后才能发出第二笔交易。无论如何实现这样的合并,它都必须以某种方式交错这两个交易流,这种方式受到“彼得和保罗感知的”真实时间的约束,即如果彼得和保罗相遇,他们可以同意某些交易在会面之前被处理,而其他交易在会面之后被处理。这正是我们在 3.4.1 节中需要处理的约束,我们发现需要引入显式同步来确保并发处理具有状态对象的事件的“正确”顺序。因此,在支持函数式风格的尝试中,合并来自不同代理的输入重新引入了函数式风格旨在消除的相同问题。
我们开始本章的目标是构建计算模型,其结构与我们试图建模的真实世界的感知相匹配。我们可以将世界建模为一组独立的、有时间限制的、相互作用的具有状态的对象,或者我们可以将世界建模为一个单一的、无时间的、无状态的统一体。每种观点都有强大的优势,但单独的观点都不完全令人满意。一个宏伟的统一尚未出现。