使用协程实现多线程

简介: 使用协程实现多线程

协程能够实现一种协作式多线程。每个协程都等价于一个线程。一对 yield-resume 可以将执行权在不同线程之间切换。不过,与普通的多线程不同,协程是非抢占的。当一个协程正在运行时,是无法从外部停止它的。只有当协程显示地要求时(通过调用函数 yield )它才会挂起执行。对于有些应用而言,这并没有问题,而对于另外一些应用则不行。当不存在抢占时,编程简单很多。由于在程序中所有的线程间同步都是显式的,所以我们无需为线程同步问题抓狂,只需要确保一个协程只在它的临界区之外调用 yield 即可。


不过,对于非抢占式多线程来说,只要有一个线程调用了阻塞操作,整个程序在该操作完成前都会阻塞。对于很多应用程序来说,这种行为是无法接受的,而这也正是导致许多程序员不把协程看作传统多线程的一种实现的原因。接下来,我们会用一个有趣(且显而易见)的方法来解决这个问题。


让我们假设一个典型的多线程场景:我们希望通过 HTTP 下载多个远程文件。为了下载多个远程文件,我们必须先知道如何下载一个远程文件。在本例中,我们将使用 LuaSocket 标准库。要下载一个文件,必须先打开一个到对应站点的连接,然后发送下载文件的请求,接收文件(按块),最后关闭连接。在 Lua 语言中,可以按以下步骤来完成这项任务。首先,加载 LuaSocket 库:

local socket = require "socket"点击复制复制失败已复制


然后,定义主机和要下载的文件。在本例中,我们从 Lua 语言官网下载 Lua5.3 的手册:

host = "www.lua.org"
file = "/manual/5.3/manual.html"点击复制复制失败已复制


接下来,打开一个 TCP 连接,连接到该站点的 80 端口( HTTP 协议的默认端口):

c = assert(socket.connect(host, 80))点击复制复制失败已复制


这步操作返回一个连接对象,可以用它来发送下载文件的请求:

local request = string.format("GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
c:send(request)点击复制复制失败已复制


接下来,以 1KB 为一块读取文件,并将每块写入到标准输出中:

repeat
  local s, status, partial = c:receive(2^10)
  io.write(s or partial)
until status == "closed"点击复制复制失败已复制


函数 receive 要么返回它读取到的字符串,要么在发生错误时返回 nil 外加错误码及出错前读取到的内容。当主机关闭连接时,把输入流中剩余的内容打印出来,然后退出接受循环。


下载完文件后,关闭连接:

c:close()点击复制复制失败已复制


既然我们知道了如何下载一个文件,那么再回到下载多个文件的问题上。最简单的做法是逐个地下载文件。不过,这种串行的做法太慢了,它只能在下载一个文件后再下载一个文件。当读取一个远程文件时,程序把大部分的时间耗费在了等待数据到大上。更准确的说,程序将时间耗费在了多 receive 的阻塞调用上。因此,如果一个程序能够同时并行下载所有文件的话,就会快很多。当一个连接没有可用数据时,程序便可以从其他连接读取数据。很明显,协程为构造这种并发下载的代码结构提供了一种简便的方式。我们可以为每个下载任务创建一个新线程,当一个线程无可用数据时,它就可以将控制权传递给一个简单的调度器,这个调度器再去调用其他的线程。


在用协程重写程序前,我们先把之前的代码重写成一个函数。如下所示:

function download(host,file)
  local c = assert(socket.connect(host, 80))
  local count = 0     -- 计算读取的字节数
  local request = string.format("GET %s HTTP/1.0\r\nhost: %s\r\n\r\n", file, host)
  c:send(request)
  while true do
    local s, status = receive(c)
    count = count + #s
    if status == "closed" then break end
  end
  c:close()
  print(file, count)
end点击复制复制失败已复制


由于我们对远程文件的内容并不感兴趣,所以不需要将文件内容写入到标准输出中,只要计算并输出文件大小即可。


提示

多个线程同时读取多个文件时,输出的结果也是乱的


在新版代码中,我们使用一个辅助函数 receive 从连接接收数据。在串行的下载方式中, receive 的代码如下:

function receive (connection)
  local s, status, partial = conenction:receive(2^10)
  return s or partial, status
end点击复制复制失败已复制


在并行的实现中,这个函数在接收数据时不能阻塞。因此,在没有足够的可用数据时,该函数会挂起,如下所示:

function receive(connection)
  connection:settimeout(0)    -- 不阻塞
  local s, status, partial = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)
  end
  return s or partial, status
end点击复制复制失败已复制


调用 settimeout(0) 使得后续所有对连接进行的操作不会阻塞。如果返回状态为 "timeout" ,就表示该操作在返回时还未完成。此时,线程就会挂起。传递给 yield 的非假参数通知调度器线程仍在执行任务中。


注意

即使在超时的情况下,连接也会返回超时前已读取到的内容,也就是变量 partial 中的内容。

tasks = {}    -- 所有活跃任务的列表
function get (host, file)
  -- 为任务创建协程
  local co = coroutine.wrap(function ()
    download(host, file)
  end)
  -- 将其插入列表
  table.insert(tasks, co)
end
function dispatch ()
  local i = 1
  while true do
    if tasks[i] == nil then   -- 没有其他任务了?
      if tasks[1] == nil then   -- 列表为空?
        break   -- 从循环中退出
      end
      i = 1
    end
    local res = tasks[i]()    -- 运行一个任务
    if not res then           -- 任务结束?
      table.remove(tasks, i)
    else
      i = i + 1   -- 处理下一个任务
    end
  end
end点击复制复制失败已复制


表tasks为调度器保存着所有正在运行中的线程的列表。函数 get 保证每个下载任务运行在一个独立的线程中。调度器本身主要就是一个循环,它遍历所有的线程,逐个唤醒他们。调度器还必须在线程完成任务后,将该线程从列表中删除。在所有线程都完成运行后,调度器停止循环。


最后,主程序创建所有需要的线程并调起调度器。例如,如果要从 Lua 官网上下载几个发行包,主程序可能如下:

get("www.lua.org", "/ftp/lua-5.3.2.tar.gz")
get("www.lua.org", "/ftp/lua-5.3.1.tar.gz")
get("www.lua.org", "/ftp/lua-5.3.0.tar.gz")
get("www.lua.org", "/ftp/lua-5.2.4.tar.gz")
get("www.lua.org", "/ftp/lua-5.2.3.tar.gz")
dispatch()    -- 主循环点击复制复制失败已复制


尽管速度提高了,但是最后一种实现还有很大的优化空间。当至少有一个线程有数据可读取时不会有问题。然而,如果所有的线程都没有数据可读,调度程序就会陷入忙等待busy wait ),不断地从一个线程切换到另一个线程来检查是否有数据可读。这样,会导致协程版的实现比串行版的实现耗费多达 3 倍的 CPU 时间。


为了避免这样的情况,可以使用 LuaSocket 中的函数 select ,该函数允许程序阻塞直到一组套接字的状态发生改变。要实现这种改动,只需要修改调度器即可,如下所示:

function dispatch ()
  local i = 1
  local timeout = {}
  while true do
    if tasks[i] == nil then
      if tasks[1] == nil then
        break
      end
      i = 1
      timeout = {}
    end
    local res = tasks[i]()
    if not res then
      table.remove(tasks, i)
    else
      i = i + 1
      timeout[#timeout + 1] = res
      if #timeout == #tasks then
        socket.select(timeout)
      end
    end
  end
end点击复制复制失败已复制


在循环中,新的调度器将所有超时的连接收集到表 timeout 中。请记住,函数 receive 将这种超时的连接传递给 yield ,然后由 resume 返回。如果所有的连接均超时,那么调度器调用 select 等待这些连接的状态就会发生改变。这个最终的实现与上一个使用协程的实现一样快。另外,由于它不会有忙等待,所以串行实现耗费的 CPU 资源一样多。

目录
相关文章
|
7月前
|
Java 数据库 Android开发
【专栏】Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理
【4月更文挑战第27天】本文探讨了Kotlin在Android开发中的多线程优化,包括线程池、协程的使用,任务分解、避免阻塞操作以及资源管理。通过案例分析展示了网络请求、图像处理和数据库操作的优化实践。同时,文章指出并发编程的挑战,如性能评估、调试及兼容性问题,并强调了多线程优化对提升应用性能的重要性。开发者应持续学习和探索新的优化策略,以适应移动应用市场的竞争需求。
201 5
|
7月前
|
数据采集 数据库 C++
python并发编程:并发编程中是选择多线程呢?还是多进程呢?还是多协程呢?
python并发编程:并发编程中是选择多线程呢?还是多进程呢?还是多协程呢?
76 0
|
6月前
|
分布式计算 JavaScript 前端开发
多线程、多进程、协程的概念、区别与联系
多线程、多进程、协程的概念、区别与联系
103 1
|
2月前
|
运维 API 计算机视觉
深度解密协程锁、信号量以及线程锁的实现原理
深度解密协程锁、信号量以及线程锁的实现原理
49 2
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
44 1
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
37 1
|
7月前
|
安全 调度 Python
探索Python中的并发编程:协程与多线程的比较
本文将深入探讨Python中的并发编程技术,重点比较协程与多线程的特点和应用场景。通过对协程和多线程的原理解析,以及在实际项目中的应用案例分析,读者将能够更好地理解两种并发编程模型的异同,并在实践中选择合适的方案来提升Python程序的性能和效率。
|
4月前
|
调度 Android开发 开发者
【颠覆传统!】Kotlin协程魔法:解锁Android应用极速体验,带你领略多线程优化的无限魅力!
【8月更文挑战第12天】多线程对现代Android应用至关重要,能显著提升性能与体验。本文探讨Kotlin中的高效多线程实践。首先,理解主线程(UI线程)的角色,避免阻塞它。Kotlin协程作为轻量级线程,简化异步编程。示例展示了如何使用`kotlinx.coroutines`库创建协程,执行后台任务而不影响UI。此外,通过协程与Retrofit结合,实现了网络数据的异步加载,并安全地更新UI。协程不仅提高代码可读性,还能确保程序高效运行,不阻塞主线程,是构建高性能Android应用的关键。
66 4
|
7月前
|
并行计算 Java Linux
工作2年,有些人竟然还不懂进程、线程、协程之间的关系!
我们都知道计算机的核心是CPU,它承担了所有的计算任务;而操作系统是计算机的管理者,它负责任务的调度、资源的分配和管理,统领整个计算机硬件;应用程序则是具有某种功能的程序,程序是运行于操作系统之上的。
53 0
|
5月前
|
消息中间件 算法 Java
(十四)深入并发之线程、进程、纤程、协程、管程与死锁、活锁、锁饥饿详解
本文深入探讨了并发编程的关键概念和技术挑战。首先介绍了进程、线程、纤程、协程、管程等概念,强调了这些概念是如何随多核时代的到来而演变的,以满足高性能计算的需求。随后,文章详细解释了死锁、活锁与锁饥饿等问题,通过生动的例子帮助理解这些现象,并提供了预防和解决这些问题的方法。最后,通过一个具体的死锁示例代码展示了如何在实践中遇到并发问题,并提供了几种常用的工具和技术来诊断和解决这些问题。本文旨在为并发编程的实践者提供一个全面的理解框架,帮助他们在开发过程中更好地处理并发问题。