关于C#线程,线程池和并行运算的简单使用和对比

简介:

参考:

http://msdn.microsoft.com/zh-cn/library/system.threading.threadpool(VS.80).aspx

http://www.codeproject.com/KB/threads/threadtests.aspx

http://www.codeproject.com/KB/threads/smartthreadpool.aspx

http://blog.zhaojie.me/2009/07/thread-pool-1-the-goal-and-the-clr-thread-pool.html  (老赵的浅谈线程池上中下三篇)

Jeffrey Richter <<CLR via C#>> 3rd Edition

 

先大概看一下控制台应用程序的Main方法的主要代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
static  bool  done = false ;
static  decimal  count2 = 0;
static  int  threadDone = 0; //标志启用线程数?
static  System.Timers.Timer timer = new  System.Timers.Timer(1000);
 
static  decimal [] threadPoolCounters = new  decimal [10];
static  Thread[] threads = new  Thread[10];
static  System.Timers.Timer[] threadTimers = new  System.Timers.Timer[10];
 
static  void  Main( string [] args)
{
     timer.Stop();
     /*当 AutoReset 设置为 false 时,Timer 只在第一个 Interval 过后引发一次 Elapsed 事件。
      若要保持以 Interval 时间间隔引发 Elapsed 事件,请将 AutoReset 设置为 true。*/
     timer.AutoReset = false ;
     timer.Elapsed += new  ElapsedEventHandler(OnTimerEvent); //当timer.Start()时,触发事件
     decimal  total = 0;
 
     // raw test
     decimal  count1 = SingleThreadTest(); //单一线程,一跑到底
     Console.WriteLine( "Single thread count = "  + count1.ToString());
 
     // create one thread, increment counter, destroy thread, repeat
     Console.WriteLine();
     CreateAndDestroyTest(); //创建一个线程,运算,然后销毁该线程 重复前面的动作
     Console.WriteLine( "Create and destroy per count = "  + count2.ToString());
 
     // Create 10 threads and run them simultaneously
     //一次性创建10个线程,然后遍历使线程执行运算
     Console.WriteLine();
     InitThreadPoolCounters();
     InitThreads();
     StartThreads();
     while  (threadDone != 10) { };
     Console.WriteLine( "10 simultaneous threads:" );
     for  ( int  i = 0; i < 10; i++)
     {
         Console.WriteLine( "T"  + i.ToString() + " = "  + threadPoolCounters[i].ToString() + "   " );
         total += threadPoolCounters[i];
     }
     Console.WriteLine( "Total = "  + total.ToString());
     Console.WriteLine();
 
     Console.WriteLine( "///////////////////////////////////////////////////" );
 
     // using ThreadPool
     //直接通过线程池的QueueUserWorkItem方法,按队列执行10个任务
     Console.WriteLine();
     Console.WriteLine( "ThreadPool:" );
     InitThreadPoolCounters();
     QueueThreadPoolThreads();
     while  (threadDone != 10) { };
     Console.WriteLine( "ThreadPool: 10 simultaneous threads:" );
     total = 0;
     for  ( int  i = 0; i < 10; i++)
     {
         //              threadTimers[i].Stop();
         //              threadTimers[i].Dispose();
         Console.WriteLine( "T"  + i.ToString() + " = "  + threadPoolCounters[i].ToString() + "   " );
         total += threadPoolCounters[i];
     }
     Console.WriteLine( "Total = "  + total.ToString());
 
     // using SmartThreadPool
     //通过Amir Bar的SmartThreadPool线程池,利用QueueUserWorkItem方法,按队列执行10个任务
     Console.WriteLine();
     Console.WriteLine( "SmartThreadPool:" );
     InitThreadPoolCounters();
     QueueSmartThreadPoolThreads();
     while  (threadDone != 10) { };
     Console.WriteLine( "SmartThreadPool: 10 simultaneous threads:" );
     total = 0;
     for  ( int  i = 0; i < 10; i++)
     {
         Console.WriteLine( "T"  + i.ToString() + " = "  + threadPoolCounters[i].ToString() + "   " );
         total += threadPoolCounters[i];
     }
     Console.WriteLine( "Total = "  + total.ToString());
 
     // using ManagedThreadPool
     //通过Stephen Toub改进后的线程池,利用QueueUserWorkItem方法,按队列执行10个任务
     Console.WriteLine();
     Console.WriteLine( "ManagedThreadPool:" );
     InitThreadPoolCounters();
     QueueManagedThreadPoolThreads();
     while  (threadDone != 10) { };
     Console.WriteLine( "ManagedThreadPool: 10 simultaneous threads:" );
     total = 0;
     for  ( int  i = 0; i < 10; i++)
     {
         Console.WriteLine( "T"  + i.ToString() + " = "  + threadPoolCounters[i].ToString() + "   " );
         total += threadPoolCounters[i];
     }
     Console.WriteLine( "Total = "  + total.ToString());
 
     // using C#4.0 Parallel
     //通过Tasks.Parallel.For进行并行运算
     Console.WriteLine();
     Console.WriteLine( "Parallel:" );
     InitThreadPoolCounters();
     UseParallelTasks();
     while  (threadDone != 10) { };
     Console.WriteLine( "Parallel: 10 simultaneous threads:" );
     total = 0;
     for  ( int  i = 0; i < 10; i++)
     {
         Console.WriteLine( "T"  + i.ToString() + " = "  + threadPoolCounters[i].ToString() + "   " );
         total += threadPoolCounters[i];
     }
     Console.WriteLine( "Total = "  + total.ToString());
}

我们可以先熟悉一下大致思路。代码中,我们主要依靠输出的数字count或者total来判断哪个方法执行效率更高(原文是How Hign Can I Count?),通常输出的数字越大,我们就认为它”干的活越多“,效率越高。主要实现过程就是通过一个静态的System.Timers.Timer对象的timer实例,设置它的Interval属性和ElapsedEventHandler事件:

1
2
3
4
5
         static  System.Timers.Timer timer = new  System.Timers.Timer(1000);
/*当 AutoReset 设置为 false 时,Timer 只在第一个 Interval 过后引发一次 Elapsed 事件。
              若要保持以 Interval 时间间隔引发 Elapsed 事件,请将 AutoReset 设置为 true。*/
timer.AutoReset = false ;
timer.Elapsed += new  ElapsedEventHandler(OnTimerEvent); //当timer.Start()时,触发事件

其中,timer的事件触发的函数:

1
2
3
4
static  void  OnTimerEvent( object  src, ElapsedEventArgs e)
    {
        done = true ;
    }

每次timer.Start执行的时候,一次测试就将开始,这样可以确保测试的不同方法都在1000毫秒内跑完。

下面开始具体介绍几个方法:

A、线程

这个非常简单,就是通过主线程计算在1000毫秒内,count从0递增加到了多少:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/// <summary>
/// 单一线程,一跑到底
/// </summary>
/// <returns></returns>
static  decimal  SingleThreadTest()
{
     done = false ;
     decimal  counter = 0;
     timer.Start();
     while  (!done)
     {
         ++counter;
     }
     return  counter;
}

while判断可以保证方法在1000毫秒内执行完成。

 

B、多线程

这个多线程方法比较折腾,先创建线程,然后运行,最后销毁线程,这就是一个线程执行单元,重复10次这个线程执行单元。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/// <summary>
   /// 创建一个线程,运算,然后销毁该线程 重复前面的动作
   /// </summary>
   static  void  CreateAndDestroyTest()
   {
       done = false ;
       timer.Start();
       while  (!done)
       {
           Thread counterThread = new  Thread( new  ThreadStart(Count1Thread));
           counterThread.IsBackground = true ; //后台线程
           counterThread.Start();
           while  (counterThread.IsAlive) { };
       }
   }

那个ThreadStart委托对应的方法Count1Thread如下:

1
2
3
4
static  void  Count1Thread()
{
     ++count2; //静态字段count2自增
}

从表面上看,大家估计都可以猜到,效果可能不佳。

 

C、还是多线程

这个方法不判断线程的执行状态,不用等到一个线程销毁后再创建一个线程,然后执行线程方法。线程执行的方法就是根据线程的Name找到一个指定数组的某一索引,并累加改变数组的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/// <summary>
     /// 将数组和线程数标志threadDone回到初始状态
     /// </summary>
     static  void  InitThreadPoolCounters()
     {
         threadDone = 0;
         for  ( int  i = 0; i < 10; i++)
         {
             threadPoolCounters[i] = 0;
         }
     }
 
     /// <summary>
     /// 初始化10个线程
     /// </summary>
     static  void  InitThreads()
     {
         for  ( int  i = 0; i < 10; i++)
         {
             threads[i] = new  Thread( new  ThreadStart(Count2Thread));
             threads[i].IsBackground = true ;
             threads[i].Name = i.ToString(); //将当前线程的Name赋值为数组索引,在Count2Thread方法中获取对应数组
         }
     }
 
     /// <summary>
     /// 开始多线程运算
     /// </summary>
     static  void  StartThreads()
     {
         done = false ;
         timer.Start();
         for  ( int  i = 0; i < 10; i++)
         {
             threads[i].Start();
         }
     }

其中,每一个线程需要执行的委托方法

1
2
3
4
5
6
7
8
9
static  void  Count2Thread()
{
     int  n = Convert.ToInt32(Thread.CurrentThread.Name); //取数组索引
     while  (!done)
     {
         ++threadPoolCounters[n];
     }
     Interlocked.Increment( ref  threadDone); //以原子操作的形式保证threadDone递增
}

在测试过程中,我们看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Create 10 threads and run them simultaneously
       //一次性创建10个线程,然后遍历使线程执行运算
       Console.WriteLine();
       InitThreadPoolCounters();
       InitThreads();
       StartThreads();
       while  (threadDone != 10) { };
       Console.WriteLine( "10 simultaneous threads:" );
       for  ( int  i = 0; i < 10; i++)
       {
           Console.WriteLine( "T"  + i.ToString() + " = "  + threadPoolCounters[i].ToString() + "   " );
           total += threadPoolCounters[i];
       }
       Console.WriteLine( "Total = "  + total.ToString());
       Console.WriteLine();

最后算出这个数组的所有元素的总和,就是这10个线程在1000毫秒内所做的事情。其中, while (threadDone != 10) { };这个判断非常重要。这个方法看上去没心没肺,线程创建好就不管它的死活了(还是管活不管死?),所以效率应该不低。

实际上,我在本地测试并看了一下输出,表面看来,按count大小逆序排列:C>A>B,这就说明多线程并不一定比单线程运行效率高。其实B之所以效率不佳,主要是由于这个方法大部分的”精力“花在线程的执行状态和销毁处理上。

注意,其实C和A、B都没有可比性,因为C计算的是数组的总和,而A和B只是简单的对一个数字进行自加。

ps:C这一块说的没有中心,想到哪写到哪,所以看起来写得很乱,如果看到这里您还觉着不知所云,建议先下载最后的demo,先看代码,再对照这篇文章。

好了,到这里,我们对线程的创建和使用应该有了初步的了解。细心的人可能会发现,我们new一个Thread,然后给线程实例设置属性,比如是否后台线程等等,其实这部分工作可以交给下面介绍的线程池ThreadPool来做(D、E和F主要介绍线程池)。

 

D、线程池ThreadPool

在实际的项目中大家可能使用最多最熟悉的就是这个类了,所以没什么可说的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/// <summary>
/// ThreadPool测试
/// </summary>
static  void  QueueThreadPoolThreads()
{
     done = false ;
     for  ( int  i = 0; i < 10; i++)
     {
         ThreadPool.QueueUserWorkItem( new  WaitCallback(Count3Thread), i);
     }
 
     timer.Start();
}
 
static  void  Count3Thread( object  state)
{
     int  n = ( int )state;
     while  (!done)
     {
         ++threadPoolCounters[n];
     }
     Interlocked.Increment( ref  threadDone);
}

我们知道线程池里的线程默认都是后台线程,所以它实际上简化了线程的属性设置,更方便异步编程。

需要说明的是,线程池使用过程中会有这样那样的缺陷(虽然本文的几个线程池任务都不会受这种缺陷影响)。比如,我们一次性向线程池中加入100个任务,但是当前的系统可能只支持25个线程,并且每个线程正处于”忙碌“状态,如果一次性加入池中系统会处理不过来,那么多余的任务必须等待,这就造成等待的时间过长,系统无法响应。还好,ThreadPool提供了GetAvailableThreads方法,可以让你知道当前可用的工作线程数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static  void  QueueThreadPoolThreads()
{
     done = false ;
     for  ( int  i = 0; i < 10; i++)
     {
         //ThreadPool.QueueUserWorkItem(new WaitCallback(Count3Thread), i); //直接给程序池添加任务有时是很草率的
 
         WaitCallback wcb = new  WaitCallback(Count3Thread);
         int  workerThreads, availabeThreads;
         ThreadPool.GetAvailableThreads( out  workerThreads, out  availabeThreads);
         if  (workerThreads > 0) //可用线程数>0
         {
             ThreadPool.QueueUserWorkItem(wcb, i);
         }
         else
         {
             //to do 可以采取一种策略,让这个任务合理地分配给线程
         }
     }

如果没有可用的工作线程数,必须设计一定的策略,让这个任务合理地分配给线程。

也许就是类似于上面那样的限制,很多开发者都自己创建自己的线程池,同时也就有了后面的SmartThreadPoolManagedThreadPool大展身手的机会。

 

E、线程池SmartThreadPool

大名鼎鼎的SmartThreadPool,但是我从来没在项目中使用过,所以只是找了一段简单的代码测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/// <summary>
  /// SmartThreadPool测试
  /// </summary>
  static  void  QueueSmartThreadPoolThreads()
  {
      SmartThreadPool smartThreadPool = new  SmartThreadPool();
      // Create a work items group that processes
      // one work item at a time
      IWorkItemsGroup wig = smartThreadPool.CreateWorkItemsGroup(1);
 
      done = false ;
      timer.Start();
      for  ( int  i = 0; i < 10; i++)
      {
          wig.QueueWorkItem( new  WorkItemCallback(Count4Thread), i);
      }
      // Wait for the completion of all work items in the work items group
      wig.WaitForIdle();
      smartThreadPool.Shutdown();
  }
 
static  object  Count4Thread( object  state)
  {
      int  n = ( int )state;
      while  (!done)
      {
          ++threadPoolCounters[n];
      }
      Interlocked.Increment( ref  threadDone);
      return  null ;
  }

自从收藏这个SmartThreadPool.dll后,我还从没有在项目中使用过。查看它的源码注释挺少也挺乱的,不知道有没有高人知道它的一个效率更好的方法。您也可以看看英文原文,自己尝试体验一下。如果您熟悉使用SmartThreadPool,欢迎讨论。

 

F、线程池ManagedThreadPool

Stephen Toub这个完全用C#托管代码实现的线程池也非常有名,在Marc Clifton的英文原文中,作者也不吝溢美之词,赞它“quite excellent”,用当前异军突起的一个词汇形容就是太给力了,于我心有戚戚焉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/// <summary>
    /// ManagedThreadPool测试
    /// </summary>
    static  void  QueueManagedThreadPoolThreads()
    {
        done = false ;
        timer.Start();
        for  ( int  i = 0; i < 10; i++)
        {
            Toub.Threading.ManagedThreadPool.QueueUserWorkItem( new  WaitCallback(Count5Thread), i);
        }
    }
    static  void  Count5Thread( object  state)
    {
        int  n = ( int )state;
        while  (!done)
        {
            ++threadPoolCounters[n];
        }
        Interlocked.Increment( ref  threadDone);
    }

 

对于这个托管的线程池,我个人的理解,就是它在管理线程的时候,这个池里还有一个缓存线程的池,即一个ArrayList对象。它一开始就初始化了一定数量的线程,并通过ProcessQueuedItems方法保证异步执行进入池中的队列任务(那个死循环有时可能导致CPU过分忙碌),这样在分配异步任务的时候,就省去了频繁去创建(new)一个线程。同时它在实现信号量(Semaphore)的同步和线程出入队列的设计上都可圈可点,非常巧妙,强烈推荐您阅读它的源码。

 

G、并行运算

下面的示例,我只使用了简单的System.Threading.Tasks.Parallel.For 对应的for 循环的并行运算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/// <summary>
/// 并行运算测试
/// </summary>
static  void  UseParallelTasks()
{
     done = false ;
     timer.Start();
     // System.Threading.Tasks.Parallel.For - for 循环的并行运算
     System.Threading.Tasks.Parallel.For(0, 10, (i) => { Count6Thread(i); });
}
static  void  Count6Thread( object  state)
{
     int  n = ( int )state;
     while  (!done)
     {
         ++threadPoolCounters[n];
     }
     Interlocked.Increment( ref  threadDone);
}

没有什么要特殊说明的,就是新类库的使用。看代码,好像比使用线程或线程池更加简单直接,有机会争取多用一用。我在本地测试的时候,在Release版本下,按照count的大小逆序排列,总体上G>D>F>E。需要注意到一件事,就是SmartThreadPool中排入队列的任务是一个返回值为Object的委托类型,这和其他的几个没有返回的(void类型)不同。SmartThreadPool口碑还是不错的,也许是我没有正确使用它。

 

最后小结一下:本文主要列举了C#中我所知道的几种常见的异步处理的方法,欢迎大家纠错或补充。






本文转自JeffWong博客园博客,原文链接:http://www.cnblogs.com/jeffwongishandsome/archive/2010/11/12/1876137.html,如需转载请自行联系原作者



目录
相关文章
|
1月前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
112 38
|
29天前
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
38 4
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
3月前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
152 29
|
1月前
|
Java
线程池内部机制:线程的保活与回收策略
【10月更文挑战第24天】 线程池是现代并发编程中管理线程资源的一种高效机制。它不仅能够复用线程,减少创建和销毁线程的开销,还能有效控制并发线程的数量,提高系统资源的利用率。本文将深入探讨线程池中线程的保活和回收机制,帮助你更好地理解和使用线程池。
87 2
|
1月前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
101 4
|
1月前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
275 2
|
2月前
|
Dubbo Java 应用服务中间件
剖析Tomcat线程池与JDK线程池的区别和联系!
剖析Tomcat线程池与JDK线程池的区别和联系!
154 0
剖析Tomcat线程池与JDK线程池的区别和联系!
|
2月前
|
SQL 传感器 开发框架
今天我们聊聊C#的并发和并行
今天我们聊聊C#的并发和并行
71 1
|
2月前
|
并行计算 安全 Java
Python 多线程并行执行详解
Python 多线程并行执行详解
78 3