深入浅出多线程系列之十一:生产者/消费者队列

简介:

上次我们使用AutoResetEvent实现了一个生产/消费者队列。这一次我们要使用WaitPulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行。

 

我们使用数组来跟踪线程。

Thread[] _workers;

 

通过跟踪线程可以让我们在所有的线程都结束后再结束我们的队列任务。 

每一个消费者线程都执行一个叫做Consume的方法,在一个for循环中,我们可以创建和启动线程。例如:

复制代码
       public  PCQueue( int  workerCount)
        {
            _workers 
=   new  Thread[workerCount];
            
for  ( int  i  =   0 ; i  <  workerCount; i ++ )
                (_workers[i] 
=   new  Thread(Consume)).Start();
        }
复制代码

 

上次我们使用的是一个字符串来代表任务,这次我们使用Action委托,它的定义如下:

Public delegate void Action();

为了表示一系列的任务,我们使用Queue<T> 集合,例如:

Queue<Action> _itemQ = new Queue<Action>();

 

在我们调用生产(EnqueueItem)和消费(Consume)方法前,还是完整的看一看代码吧:

 

复制代码
class  PCQueue
    {
        
readonly   object  _locker  =   new   object ();
        Thread[] _workers;
        Queue
< Action >  _itemQ  =   new  Queue < Action > ();  // 保存任务的队列
         public  PCQueue( int  workerCount)
        {
            _workers 
=   new  Thread[workerCount];
            
for  ( int  i  =   0 ; i  <  workerCount; i ++ )
                (_workers[i] 
=   new  Thread(Consume)).Start();
        }

        
public   void  Shutdown( bool  waitForWorkers)
        {
           
// 为每一个线程插入一个null item,可以是每一个worker 退出
             foreach  (Thread worker  in  _workers)
                EnqueueItem(
null );

           
// 等待所有的线程退出。
             if  (waitForWorkers)
                
foreach  (Thread worker  in  _workers)
                    worker.Join();
        }

        
public   void  EnqueueItem(Action item)
        {
            
lock  (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); 
// 通知等待队列中的线程
            }
        }

        
void  Consume()
        {
            
while  ( true )
            {
                Action item;
                
lock  (_locker)
                {
                    
while  (_itemQ.Count  ==   0 )
                    {
                        Monitor.Wait(_locker); 
// 释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                    item  =  _itemQ.Dequeue();
                }

                
if  (item  ==   null return // 退出的信号
                item();
            }
        }
    }
复制代码

 

我们可以有一个退出策略,插入一个null item作为consumer退出的信号。如果我们想要快速的退出,可以使用一个独立的”cancel” 标记,因为我们支持多个consumers,所以我们必须为每一个consumer插入一个null item

 

下面是Main方法。使用两个consumer线程,然后让这两个consumers执行10个委托。

 

复制代码
        public   static   void  Main()
        {
            PCQueue q 
=   new  PCQueue( 2 );
            Console.WriteLine(
" Enqueuing 10 items... " );

            
for  ( int  i  =   0 ; i  <   10 ; i ++ )
            {
                
int  itemNumber  =  i;
                q.EnqueueItem(() 
=>
                    {
                        Thread.Sleep(
1000 );  // 模拟耗时的工作
                        Console.WriteLine( "  Task  "   +  itemNumber);
                    });
            }

            q.Shutdown(
true ); //等待关闭
            Console.WriteLine();
            Console.WriteLine(
" Workers complete! " );
        }
复制代码

 

 

下面让我们细致的看一看EnqueueItem方法:

复制代码
public   void  EnqueueItem(Action item)
        {
            
lock  (_locker)
            {
                _itemQ.Enqueue(item);
                Monitor.Pulse(_locker); 
// 通知等待队列中的线程
            }
        }
复制代码

因为我们的队列_itemQ被多线程环境使用,因此在对_itemQ进行读取的时候需要加锁lock.

因为我们插入了一个新的任务,我们必须修改阻塞条件,也就是调用pulse方法,来唤醒调用了wait方法的线程。

 

出于对效率的考虑,当插入一个Item的时候使用Pulse来代替PulseAll方法,因为大部分时候每一个Item只需要一个consumer来执行。如果你有一个冰淇淋,你不可能叫30个睡眠的孩子都起来吃它,同样,对于一个item,同时唤醒30consumers一点好处都没有。

 

让我们再看看Consumer方法。

我们希望当没什么事情做的时候,线程阻塞就可以了,换句话说,队列中没有item的时候,线程就应该阻塞。因此我们的阻塞条件是_itemQ.Count ==0;

 

复制代码
                Action item;
                
lock  (_locker)
                {
                    
while  (_itemQ.Count  ==   0 )
                    {
                        Monitor.Wait(_locker); 
// 释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                    item  =  _itemQ.Dequeue();
                }

                
if  (item  ==   null return // 退出的信号
                item();
复制代码

 

while循环退出的时候也意味着_itemQ 至少有一个item。我们必须在释放锁之前调用你哦个dequeue方法来获取item,考虑下下面的代码:

复制代码
                lock  (_locker)
                {
                    
while  (_itemQ.Count  ==   0 )
                    {
                        Monitor.Wait(_locker); 
// 释放锁,并阻止当前线程,直到其他线程发送pulse信号。                    }
                }
                
// 现在在这里可能被抢占,_itemQ可能被修改
                 lock  (_locker)
                {
                    item 
=  _itemQ.Dequeue();
                }
复制代码

 

itemDequeued后,我们就应该立即释放锁了,如果我们在执行task的时候,一直持有锁,则会没有必要的阻塞其他线程来获取任务。

 

Wait Timeouts

在调用Wait方法的时候可以传递一个毫秒或Timespan的时间来设置超时。如果Wait超时了,那么Wait方法就会返回false

带有超时功能的Wait方法的主要步骤:

  1. 释放锁。
  2. 阻塞 直到 pulsed 或者超时。
  3. 重新获取锁。

超时就好像CLR 在超时到了的时候自动的调用了 pulse方法一样。

 

下面是使用超时的Wait的主要代码:

         lock(_locker)

         while(<阻塞条件>)

                   Monitor.Wait(_locker,<超时时间>);

 

Monitor.Wait 方法返回一个bool值来代表是调用了pulse还是已经超时了。

如果是true 代表调用了pulse

如果是false:代表超时了。

这对记录日志很有用。

 

 






本文转自LoveJenny博客园博客,原文链接:http://www.cnblogs.com/LoveJenny/archive/2011/06/01/2060857.html,如需转载请自行联系原作者
目录
相关文章
|
29天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
38 7
|
29天前
|
消息中间件 存储 安全
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
20 1
|
2月前
|
存储 运维 API
源码解密协程队列和线程队列的实现原理(一)
源码解密协程队列和线程队列的实现原理(一)
35 1
|
2月前
|
存储 安全 API
源码解密协程队列和线程队列的实现原理(二)
源码解密协程队列和线程队列的实现原理(二)
35 1
|
2月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
25 0
|
4月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
4月前
|
存储 监控 Java
|
4月前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
94 2
|
4月前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。