浅谈.NET下的多线程和并行计算(六)线程池基础下

简介: 这节我们按照线程池的核心思想来自定义一个简单的线程池: 1) 池中使用的线程不少于一定数量,不多于一定数量 2) 池中线程不够的时候创建,富裕的时候收回 3) 任务排队,没有可用线程时,任务等待 我们的目的只是实现这些“需求”,不去考虑性能(比如等待一段时间再去创建新的线程等策略)以及特殊的处理(异常),在实现这个需求的过程中我们也回顾了线程以及线程同步的基本概念。

这节我们按照线程池的核心思想来自定义一个简单的线程池:

1) 池中使用的线程不少于一定数量,不多于一定数量

2) 池中线程不够的时候创建,富裕的时候收回

3) 任务排队,没有可用线程时,任务等待

我们的目的只是实现这些“需求”,不去考虑性能(比如等待一段时间再去创建新的线程等策略)以及特殊的处理(异常),在实现这个需求的过程中我们也回顾了线程以及线程同步的基本概念。

首先,把任务委托和任务需要的状态数据封装一个对象:

public class WorkItem
{
    public WaitCallback Action { get; set; }
    public object State { get; set; }

    public WorkItem(WaitCallback action, object state)
    {
        this.Action = action;
        this.State = state;
    }
}

然后来创建一个对象作为线程池中的一个线程:

public class SimpleThreadPoolThread
{
    private object locker = new object();
    private AutoResetEvent are = new AutoResetEvent(false);
    private WorkItem wi;
    private Thread t;
    private bool b = true;
    private bool isWorking;

    public bool IsWorking
    {
        get
        {
            lock (locker)
            {
                return isWorking;
            }
        }
    }
    public event Action<SimpleThreadPoolThread> WorkComplete;

    public SimpleThreadPoolThread()
    {
        lock (locker)
        {
            // 当前没有实际任务
            isWorking = false;
        }
        t = new Thread(Work) { IsBackground = true };
        t.Start();
    }

    public void SetWork(WorkItem wi)
    {
        this.wi = wi;
    }

    public void StartWork()
    {
        // 发出信号
        are.Set();
    }

    public void StopWork()
    {
        // 空任务
        wi = null;
        // 停止线程循环
        b = false;
        // 发出信号结束线程
        are.Set();
    }

    private void Work()
    {
        while (b)
        {
            // 没任务,等待信号
            are.WaitOne();
            if (wi != null)
            {
                lock (locker)
                {
                    // 开始
                    isWorking = true;
                }
                // 执行任务
                wi.Action(wi.State);
                lock (locker)
                {
                    // 结束
                    isWorking = false;
                }
                // 结束事件
                WorkComplete(this);
            }
        }
    }

代码的细节可以看注释,对这段代码的整体结构作一个说明:

1) 由于这个线程是被线程池中任务复用的,所以线程的任务处于循环中,除非线程池打算回收这个线程,否则不会退出循环结束任务

2) 使用自动信号量让线程没任务的时候等待,由线程池在外部设置任务后发出信号来执行实际的任务,执行完毕后继续等待

3) 线程公开一个完成的事件,线程池可以挂接处理方法,在任务完成后更新线程池状态

4) 线程池中的所有线程都是后台线程

下面再来实现线程池:

public class SimpleThreadPool : IDisposable
{
    private object locker = new object();
    private bool b = true;
    private int minThreads;
    private int maxThreads;
    private int currentActiveThreadCount;
    private List<SimpleThreadPoolThread> simpleThreadPoolThreadList = new List<SimpleThreadPoolThread>();
    private Queue<WorkItem> workItemQueue = new Queue<WorkItem>();

    public int CurrentActiveThreadCount
    {
        get
        {
            lock (locker)
            {
                return currentActiveThreadCount;
            }
        }

    }

    public int CurrentThreadCount
    {
        get
        {
            lock (locker)
            {
                return simpleThreadPoolThreadList.Count;
            }
        }
    }

    public int CurrentQueuedWorkCount
    {
        get
        {
            lock (locker)
            {
                return workItemQueue.Count;
            }
        }
    }

    public SimpleThreadPool()
    {
        minThreads = 4;
        maxThreads = 25;
        Init();
    }

    public SimpleThreadPool(int minThreads, int maxThreads)
    {
        if (minThreads > maxThreads)
            throw new ArgumentException("minThreads > maxThreads", "minThreads,maxThreads");
        this.minThreads = minThreads;
        this.maxThreads = maxThreads;
        Init();
    }

    public void QueueUserWorkItem(WorkItem wi)
    {
        lock (locker)
        {
            // 任务入列
            workItemQueue.Enqueue(wi);
        }
    }

    private void Init()
    {
        lock (locker)
        {
            // 一开始创建最小线程
            for (int i = 0; i < minThreads; i++)
            {
                CreateThread();
            }
            currentActiveThreadCount = 0;
        }
        new Thread(Work) { IsBackground = true }.Start();
    }

    private SimpleThreadPoolThread CreateThread()
    {
        SimpleThreadPoolThread t = new SimpleThreadPoolThread();
        // 挂接任务结束事件
        t.WorkComplete += new Action<SimpleThreadPoolThread>(t_WorkComplete);
        // 线程入列
        simpleThreadPoolThreadList.Add(t);
        return t;
    }

    private void Work()
    {
        // 线程池主循环
        while (b)
        {
            Thread.Sleep(100);
            lock (locker)
            {
                // 如果队列中有任务并且当前线程小于最大线程
                if (workItemQueue.Count > 0 && CurrentActiveThreadCount < maxThreads)
                {
                    WorkItem wi = workItemQueue.Dequeue();
                    // 寻找闲置线程
                    SimpleThreadPoolThread availableThread = simpleThreadPoolThreadList.FirstOrDefault(t => t.IsWorking == false);
                    // 无则创建
                    if (availableThread == null)
                        availableThread = CreateThread();
                    // 设置任务
                    availableThread.SetWork(wi);
                    // 开始任务
                    availableThread.StartWork();
                    // 增加个活动线程
                    currentActiveThreadCount++;
                }
            }
        }
    }

    private void t_WorkComplete(SimpleThreadPoolThread t)
    {
        lock (locker)
        {
            // 减少个活动线程
            currentActiveThreadCount--;
            // 如果当前线程数有所富裕并且比最小线程多
            if ((workItemQueue.Count + currentActiveThreadCount) < minThreads && CurrentThreadCount > minThreads)
            {
                // 停止已完成的线程
                t.StopWork();
                // 从线程池删除线程
                simpleThreadPoolThreadList.Remove(t);
            }
        }
    }

    public void Dispose()
    {
        // 所有线程停止
        foreach (var t in simpleThreadPoolThreadList)
        {
            t.StopWork();
        }
        // 线程池主循环停止
        b = false;
    }
}

线程池的结构如下:

1) 在构造方法中可以设置线程池最小和最大线程

2) 维护一个任务队列和一个线程池中线程的列表

3) 初始化线程池的时候就创建最小线程数量定义的线程

4) 线程池主循环每20毫秒就去处理一次,如果有任务并且线程池还可以处理任务的话,先是找闲置线程,找不到则创建一个

5) 通过设置任务委托以及发出信号量来开始任务

6) 线程池提供了三个属性来查看当前活动线程数,当前总线程数和当前队列中的任务数

7) 任务完成的回调事件中我们判断如果当前线程有富裕并且比最小线程多则回收线程

8) 线程池是IDispose对象,在Dispose()方法中停止所有线程后停止线程池主循环

写一段代码来测试线程池:

using (SimpleThreadPool t = new SimpleThreadPool(2, 4))
{
    Stopwatch sw2 = Stopwatch.StartNew();
    for (int i = 0; i < 10; i++)
    {
        t.QueueUserWorkItem(new WorkItem((index =>
        {
            Console.WriteLine(string.Format("#{0} : {1} / {2}", Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("mm:ss"), index));
            Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));
            Thread.Sleep(1000);
        }), i));
    }
    while (t.CurrentQueuedWorkCount > 0 || t.CurrentActiveThreadCount > 0)
    {
        Thread.Sleep(10);
    }
    Console.WriteLine("All work completed");
    Console.WriteLine(string.Format("CurrentActiveThread: {0} / CurrentThread: {1} / CurrentQueuedWork: {2}", t.CurrentActiveThreadCount, t.CurrentThreadCount, t.CurrentQueuedWorkCount));
    Console.WriteLine(sw2.ElapsedMilliseconds);
} 

代码中我们向线程池推入10个任务,每个任务需要1秒执行,任务执行前输出当前任务的所属线程的Id,当前时间以及状态值。然后再输出线程池的几个状态属性。主线程循环等待所有任务完成后再次输出线程池状态属性以及所有任务完成耗费的时间:

image

我们可以看到:

1) 线程池中的线程总数从2到4到2

2) 线程池中活动的线程数从2到4到0

3) 线程池中排队的任务数从9到0

4) 所有线程完成一共使用了3秒时间

相比.NET内置的线程池,性能虽然有0.5秒的提高(可以见前文,.NET线程池在创建新的线程之前会等待0.5秒左右的时间),但其实一个好的线程池的实现需要考虑很多策略(什么时候去创建新线程,什么时候去回收老线程),.NET的ThreadPool在整体性能上做的很好,所以不建议随便去使用自定义的线程池。本例更只能作为实验和演示。

作者: lovecindywang
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
11月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
458 0
|
8月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
558 2
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
745 60
【Java并发】【线程池】带你从0-1入门线程池
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
11月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
824 5
|
Java
线程池是什么?线程池在实际工作中的应用
总的来说,线程池是一种有效的多线程处理方式,它可以提高系统的性能和稳定性。在实际工作中,我们需要根据任务的特性和系统的硬件能力来合理设置线程池的大小,以达到最佳的效果。
356 18
|
Python
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
593 20
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
345 1