泛函编程(36)-泛函Stream IO:IO数据源-IO Source & Sink

简介:

 上期我们讨论了IO处理过程:Process[I,O]。我们说Process就像电视信号盒子一样有输入端和输出端两头。Process之间可以用一个Process的输出端与另一个Process的输入端连接起来形成一串具备多项数据处理功能的完整IO过程。但合成的IO过程两头输入端则需要接到一个数据源,而另外一端则可能会接到一个数据接收设备如文件、显示屏等。我们在这篇简单地先介绍一下IO数据源Source和IO数据接收端Sink。

我们先用一个独立的数据类型来代表数据源Source进行简单的示范说明,这个类型与Process类型没有任何关系:


 1 import ProcessLib._
 2 object SourceSink {
 3 trait Source[O] {  //以下helper function都是把Source当作O类的List处理
 4   def |>[O2](p: Process[O,O2]): Source[O2]   //粘接一个Process p. 向其输入O
 5   def filter(f: O => Boolean): Source[O] = this |> Process.filter(f) //向p输入O
 6   def map[O2](f: O => O2): Source[O2] = this |> Process.lift(f)
 7   def take(n: Int): Source[O] = this |> Process.take(n)  //截取前n个O
 8   def takeWhile(f: O => Boolean): Source[O] = this |> Process.takeWhile(f)
 9   def drop(n: Int): Source[O] = this |> Process.drop(n) //跳过前n个O
10   def dropWhile(f: O => Boolean): Source[O] = this |> Process.dropWhile(f) 
11 }

从以上trait可以看到:Source的工作原理就是把一个Process的输入黏贴到Source的输出端。我们可以用这个 |> 把一串Process粘到Source的输出,如:Src.proc1.proc2.proc3。不过我们得先把proc1,proc2,proc3定义成Source组件函数,因为Source是一个完全独立的类型。

我们再来看看一个Source特殊案例:


1 case class ResourceR[R,I,O](   //Source的一个只读资源案例
2  acquire: IO[R],   //资源使用门户  resource handle
3  release: R => IO[Unit], //完成使用资源后的清理函数
4  step: R => IO[Option[I]], //资源内容读取函数
5  trans: Process[I,O]  //输出方式
6  ) extends Source[O] {
7      def |>[O2](p: Process[O,O2]): Source[O2] =  //实现抽象函数
8        ResourceR(acquire,release,step,trans |> p) //每次输入都产生一个ResourceR.它的trans与p进行管道对接
9  }

这是个只读的数据源。我们看到所有的动作都被包嵌在IO类型里,这样可以把副作用的产生延后到一些Source Interpreter来运算。这里我们只要用最简单的IO来说明就可以了:


 1 trait IO[A] { self =>
 2     def run: A
 3     def map[B](f: A => B): IO[B] =
 4       new IO[B] { def run = f(self.run) }
 5     def flatMap[B](f: A => IO[B]): IO[B] =
 6       new IO[B] { def run = f(self.run).run }
 7 }
 8 object IO {
 9     def unit[A](a: => A): IO[A] = new IO[A] { def run = a }
10     def flatMap[A,B](fa: IO[A])(f: A => IO[B]) = fa flatMap f
11     def apply[A](a: => A): IO[A] = unit(a) // syntax for IO { .. }
12 }

这个IO类型我们在前面的讨论里曾经练习过。

现在我们来看看一个文件读取的ResourceR例子:


 1 object Source {
 2 import java.io._
 3     def lines(fileName: String): Source[String] =  //从文件fileName里读取String
 4       ResourceR(   //创建一个Source的实例
 5         IO {io.Source.fromFile(fileName) },  //资源
 6         (src: io.Source) => IO { src.close },  //清理
 7         (src: io.Source) => IO {    //读取
 8             lazy val iterator = src.getLines
 9             if (iterator.hasNext) Some(iterator.next) else None //读完返回None
10         },
11         Process.passUnchanged) //Process[I,I],读什么输入什么
12 }

现在我们可以这样写一段程序了:


1  Source.lines("input.txt").count.exists{_ >= 40000 }
2                                                   //> res0: ch15.SourceSink.Source[Boolean] = ResourceR(ch15.SourceSink$IO$$anon$
3                                                   //| 3@762efe5d,<function1>,<function1>,Await(<function1>))

噢,记住把count和exists放到Source trait里:


1     def exists(f: O => Boolean): Source[Boolean] = this |> Process.exists(f)
2     def count: Source[Int] = this |> Process.count

上面的表达式可以说还只是IO过程的描述。实际副作用产生是在interpreter里:


1     def collect: IO[IndexedSeq[O]] = {  //读取数据源返回IO[IndexedSeq[O]], 用IO.run来实际运算
 2          def tryOr[A](a: => A)(cleanup: IO[Unit]): A =  //运算表达式a, 出现异常立即清理现场
 3            try a catch {case e: Exception => cleanup.run; throw e}
 4          @annotation.tailrec  //这是个尾递归算法,根据trans状态
 5          def go(acc: IndexedSeq[O], cleanup: IO[Unit], step: IO[Option[I]], trans: Process[I,O]): IndexedSeq[O] =
 6            trans match {
 7                case Halt() => cleanup.run; acc  //停止状态,清理现场
 8                case Emit(out,next) => go(tryOr(out +: acc)(cleanup), cleanup, step, next) //积累acc
 9                case Await(iproc) => tryOr(step.run)(cleanup) match {
10                    case None => cleanup.run; acc  //读完了清理现场
11                    case si => go(acc,cleanup,step,iproc(si))  //读入元素作为Process输入来改变Process状态
12                }
13            }
14          acquire map {res => go(IndexedSeq(),release(res),step(res),trans)} //开始读取
15      }

注意:无论读取完成或中途失败退出都会导致现场清理以防止资源漏出。可以推断这个interpreter还是很安全的。

与Source同样,我们还是用一个独立的类型Sink来代表数据接收端进行简单说明:


1 trait Sink[I] {
2      def <|[I2](p: Process[I2,I]): Sink[I2] //p的输出接到Sink的输入
3      def filter(f: I => Boolean): Sink[I] = this <| Process.filter(f)  //从p接收I
4      def map[I2](f: I2 => I): Sink[I2] = this <| Process.lift(f) //将接收的I2变成I
5      def take(n: Int): Sink[I] = this <| Process.take(n)  //从p接收前n个I
6      def takeWhile(f: I => Boolean): Sink[I] = this <| Process.takeWhile(f)
7      def drop(n: Int): Sink[I] = this <| Process.drop(n) //过滤掉首n个I
8      def dropWhile(f: I => Boolean): Sink[I] = this <| Process.dropWhile(f)
9  }

这和Source trait及其相似。注意和Process连接是反向的:由p指向Sink。

同样,一个只写的资源实例如下:


1 case class ResourceW[R,I,I2](  //只写资源
2    acquire: IO[R],   //资源使用门户, resource handle
3    release: R => IO[Unit],  //清理函数
4    rcvf: R => (I2 => IO[Unit]), //接收方式
5    trans: Process[I,I2]  //处理过程
6    ) extends Sink[I] {
7        def <|[I2](p: Process[I2,I]): Sink[I2] =
8          ResourceW(acquire,release,rcvf,p |> trans)    //制造一个ResourceW实例,由p到trans
9    }

这个也和ResourceR相似。还是与Process连接方式是反方向的:由p到trans。

以下是一个向文件写入的Sink组件:


 1 object Sink {
 2  import java.io._
 3      def file(fileName: String, append: Boolean = false): Sink[String] = //结果是Sink[String]。必须用interpreter来运算
 4        ResourceW(   //是一个ResourceW实例
 5        IO {new FileWriter(fileName,append) }, //创建FileWriter
 6        (w: FileWriter) => IO {w.close},  //释放FileWriter
 7        (w: FileWriter) => (s: String) => IO {w.write(s)},  //写入
 8        Process.passUnchanged    //不处理写入数据
 9        )
10  }

在学习过程中发现,独立于Process类型的Source,Sink类型使IO算法的表达式类型的集成很困难。这也限制了组件的功能。我们无法实现泛函编程简洁高雅的表达形式。在下面的讨论中我们会集中精力分析具备数据源功能的Process,希望在表达方式上能有所进步。



相关文章
|
10月前
|
Linux 测试技术 API
linux系统编程 文件io
linux系统编程 文件io
132 0
|
11月前
|
数据采集 并行计算 Java
【文末送书】Python高并发编程:探索异步IO和多线程并发
【文末送书】Python高并发编程:探索异步IO和多线程并发
215 0
|
4月前
|
Java Unix Windows
|
4月前
|
数据采集 异构计算
LabVIEW编程LabVIEW开发高级数据采集技术 操作数字IO 例程与相关资料
LabVIEW编程LabVIEW开发高级数据采集技术 操作数字IO 例程与相关资料
75 22
|
20天前
|
Java 数据处理
Java IO 接口(Input)究竟隐藏着怎样的神秘用法?快来一探究竟,解锁高效编程新境界!
【8月更文挑战第22天】Java的输入输出(IO)操作至关重要,它支持从多种来源读取数据,如文件、网络等。常用输入流包括`FileInputStream`,适用于按字节读取文件;结合`BufferedInputStream`可提升读取效率。此外,通过`Socket`和相关输入流,还能实现网络数据读取。合理选用这些流能有效支持程序的数据处理需求。
23 2
|
2月前
|
缓存 网络协议 算法
【Linux系统编程】深入剖析:四大IO模型机制与应用(阻塞、非阻塞、多路复用、信号驱动IO 全解读)
在Linux环境下,主要存在四种IO模型,它们分别是阻塞IO(Blocking IO)、非阻塞IO(Non-blocking IO)、IO多路复用(I/O Multiplexing)和异步IO(Asynchronous IO)。下面我将逐一介绍这些模型的定义:
111 1
|
3月前
|
Java 数据库连接
提升编程效率的利器: 解析Google Guava库之IO工具类(九)
提升编程效率的利器: 解析Google Guava库之IO工具类(九)
|
4月前
|
存储 Java API
Java语言IO(输入/输出)编程技术深度解析
Java语言IO(输入/输出)编程技术深度解析
269 1
|
3月前
|
SQL 数据处理 API
实时计算 Flink版产品使用问题之holo的io以及cpu使用较为稳定,sink端busy一直在20%左右,有时候50%,该如何优化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
调度 数据库 开发者
在Python编程中,并发编程和异步IO是两个重要的概念,它们对于提高程序性能和响应速度具有至关重要的作用
【6月更文挑战第10天】本文介绍了Python并发编程和异步IO,包括并发编程的基本概念如多线程、多进程和协程。线程和进程可通过threading及multiprocessing模块管理,但多线程受限于GIL。协程利用asyncio模块实现非阻塞IO,适合处理IO密集型任务。异步IO基于事件循环,能提高服务器并发处理能力,适用于网络编程和文件操作等场景。异步IO与多线程、多进程在不同任务中有各自优势,开发者应根据需求选择合适的技术。
28 0