《C++ Concurrencyin Action》第9章--高级线程管理

简介: 《C++ Concurrencyin Action》第9章--高级线程管理

本章主要内容

· 线程池

· 处理线程池中任务的依赖关系

· 池中线程如何获取任务

· 中断线程

之前的章节中,我们通过创建std::thread对象来对线程进行管理。在一些情况下,这种方式不可行了,因为需要在线程的整个生命周期中对其进行管理,并根据硬件来确定线程数量,等等。理想情况是将代码划分为最小块,再并发执行,之后交给处理器和标准库,进行性能优化

另一种情况是,当使用多线程来解决某个问题时,在某个条件达成的时候,可以提前结束。可能是因为结果已经确定,或者因为产生错误,亦或是用户执行终止操作。无论是哪种原因,线程都需要发送“请停止”请求,放弃任务,清理,然后尽快停止。

本章,我们将了解一下管理线程和任务的机制,从自动管理线程数量和自动管理任务划分开始。

9.1线程池****

很多公司里,雇员通常会在办公室度过他们的办公时光(偶尔也会外出访问客户或供应商),或是参加贸易展会。虽然外出可能很有必要,并且可能需要很多人一起去,不过对于一些特别的雇员来说,一趟可能就是几个月,甚至是几年。公司要给每个雇员都配一辆车,这基本上是不可能的,不过公司可以提供一些共用车辆;这样就会有一定数量车,来让所有雇员使用。当一个员工要去异地旅游时,那么他就可以从共用车辆中预定一辆,并在返回公司的时候将车交还。如果某天没有闲置的共用车辆,雇员就得不延后其旅程了。

线程池就是类似的一种方式,在大多数系统中,将每个任务指定给某个线程是不切实际的,不过可以利用现有的并发性,进行并发执行。线程池就提供了这样的功能,提交到线程池中的任务将并发执行,提交的任务将会挂在任务队列上。队列中的每一个任务都会被池中的工作线程所获取,当任务执行完成后,再回到线程池中获取下一个任务。

创建一个线程池时,会遇到几个关键性的设计问题,比如:可使用的线程数量,高效的任务分配方式,以及是否需要等待一个任务完成。

在本节,我们将看到线程池是如何解决这些问题的,从最简单的线程池开始吧!

9.1.1** 最简单的线程池**

作为最简单的线程池,其拥有固定数量的工作线程(通常工作线程数量与std::thread::hardware_concurrency()相同)。当工作需要完成时,可以调用函数将任务挂在任务队列中。每个工作线程都会从任务队列上获取任务,然后执行这个任务,执行完成后再回来获取新的任务。在最简单的线程池中,线程就不需要等待其他线程完成对应任务了。如果需要等待,就需要对同步进行管理。

下面清单中的代码就展示了一个最简单的线程池实现。

清单9.1 简单的线程池

class thread_pool
{
std::atomic_bool done;
thread_safe_queue > work_queue; // 1
std::vectorstd::threadthreads; // 2
join_threads joiner; //** 3**
void worker_thread()
{
while(!done)  //** 4**
{
  std::function<void()>task;
 if(work_queue.try_pop(task))  //** 5**
  {
    task();  // **6**
  }
  else
  {
   std::this_thread::yield();  // **7**
  }
}
}
public:
thread_pool():
done(false),joiner(threads)

{

unsigned constthread_count=std::thread::hardware_concurrency();  // **8**
try
{
  for(unsignedi=0;i<thread_count;++i)
  {
    threads.push_back( 
     std::thread(&thread_pool::worker_thread,this));  // **9**
  }
}
catch(...)
{
  done=true;  // **10**
  throw;
}
}
~thread_pool()
{
done=true;  // **11**
}
template
void submit(FunctionType f)
{
work_queue.push(std::function(f)); // 12
}
};

实现中有一组工作线程②,并且使用了一个线程安全队列(见第6章)①来管理任务队列。这种情况下,用户不用等待任务,并且任务不需要返回任何值,所以可以使用std::function对任务进行封装。submit()函数会将函数或可调用对象包装成一个std::function实例,并将其推入队列中⑫。

线程始于构造函数:使用std::thread::hardware_concurrency()来获取硬件支持多少个并发线程⑧,这些线程会在worker_thread()成员函数中执行⑨。

当有异常抛出时,线程启动就会失败,所以需要保证任何已启动的线程都能停止,并且能在这种情况下清理干净。当有异常抛出时,通过使用try-catch来设置done标志⑩,还有join_threads类的实例(来自于第8章)③用来汇聚所有线程。当然也需要析构函数:仅设置done标志⑪,并且join_threads确保所有线程在线程池销毁前全部执行完成。注意成员声明的顺序很重要:done标志和worker_queue必须在threads数组之前声明,而数据必须在joiner前声明。这就能确保成员能以正确的顺序销毁;比如,所有线程都停止运行时,队列就可以安全的销毁了。

worker_thread函数很简单:从任务队列上获取任务⑤,以及同时执行这些任务⑥,执行一个循环直到done标志被设置④。如果任务队列上没有任务,函数会调用std::this_thread::yield()让线程休息⑦,并且给予其他线程向任务队列上推送任务的机会。

一些简单的情况,这样线程池就足以满足要求,特别是任务没有返回值,或需要执行一些阻塞操作的时候。不过,在很多情况下,这样简单的线程池完全不够用,其他情况使用这样简单的线程池可能会出现问题,比如:死锁。同样,在简单例子中,使用std::async能提供更好的功能(如第8章中的例子)。

在本章中,我们将了解一下更加复杂的线程池实现,通过添加特性满足用户需求,或减少问题的发生几率。

首先,从已经提交的任务开始说起。

9.1.2** 等待提交到线程池中的任务**

第8章中的例子中,线程间的任务划分完成后,代码会显式生成新线程,主线程通常就是等待新线程在返回调用之前结束,确保所有任务都完成。使用线程池,就需要等待任务提交到线程池中,而非直接提交给单个线程。这与基于std::async的方法(第8章等待future的例子)类似,使用清单9.1中的简单线程池,使用第4章中提到的工具:条件变量和future。虽然,会增加代码的复杂度,不过,要比直接对任务进行等待的方式好很多。

通过增加线程池的复杂度,可以直接等待任务完成。使用submit()函数返回一个对任务描述的句柄,用来等待任务的完成。任务句柄会用条件变量或future进行包装,这样能使用线程池来简化代码。

一种特殊的情况是,执行任务的线程需要返回一个结果到主线程上进行处理。你已经在本书中看到多个这样的例子,比如:parallel_accumulate()(第2章)。这种情况下,需要用future对最终的结果进行转移。清单9.2展示了对简单线程池的修改,通过修改就能等待任务完成,以及在工作线程完成后,返回一个结果到等待线程中去,不过std::packaged_task<>实例是不可拷贝的,仅是可移动的,所以不能再使用std::function<>来实现任务队列,因为std::function<>需要存储可复制构造的函数对象。包装一个自定义函数,用来处理只可移动的类型。这就是一个带有函数操作符的类型擦除类。只需要处理那些没有函数和无返回的函数,所以这是一个简单的虚函数调用。

清单9.2 可等待任务的线程池

class function_wrapper
{
struct impl_base
{
virtual void call()=0;
virtual ~impl_base() {}
};
std::unique_ptrimpl;
template
struct impl_type: impl_base
{
F f;
impl_type(F&& f_): f(std::move(f_)){}
void call() { f(); }
};
public:
template
function_wrapper(F&&f):impl(new impl_type(std::move(f))) {}
void operator()() {impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&&other):impl(std::move(other.impl)) {}
function_wrapper&operator=(function_wrapper&& other)
{
impl=std::move(other.impl);
return *this;
}
function_wrapper(constfunction_wrapper&)=delete;
function_wrapper(function_wrapper&)=delete;
function_wrapper&operator=(const function_wrapper&)=delete;
};
class thread_pool
{
thread_safe_queue work_queue; // 使用function_wrapper,而非使用std::function
void worker_thread()
{
while(!done)
{
  function_wrapper task;
  if(work_queue.try_pop(task))
  {
    task();
  }
  else
  {
    std::this_thread::yield();
  }
}
}
public:
template
std::future::type> submit(FunctionType f) //** 1**
{
typedef typenamestd::result_of<FunctionType()>::type result_type;  // 2

std::packaged_task task(std::move(f)); // 3

std::future<result_type>res(task.get_future());  // **4**
work_queue.push(std::move(task));  //** 5**

return res;  // **6**
}
// 休息一下
};

首先,修改的是submit()函数①返回一个std::future<>保存任务的返回值,并且允许调用者等待任务完全结束。因为需要知道提供函数f的返回类型,所以使用std::result_of<>:std::result_of::type是FunctionType类型的引用实例(如,f),并且没有参数。同样,函数中可以对result_type typedef②使用std::result_of<>。

然后,将f包装入std::packaged_task③,因为f是一个无参数的函数或是可调用对象,能够返回result_type类型的实例。向任务队列推送任务⑤和返回future⑥前,就可以从std::packaged_task<>中获取future④。注意,要将任务推送到任务队列中时,只能使用std::move(),因为std::packaged_task<>是不可拷贝的。为了对任务进行处理,队列里面存的就是function_wrapper对象,而非std::function对象。

现在线程池允许等待任务,并且返回任务后的结果。下面的清单就展示了,如何让parallel_accumuate函数使用线程池。

清单9.3 parallel_accumulate使用一个可等待任务的线程池

template
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long constlength=std::distance(first,last);
if(!length)
return init;
unsigned long const block_size=25;
unsigned long constnum_blocks=(length+block_size-1)/block_size; // 1
std::vector futures(num_blocks-1);
thread_pool pool;
Iterator block_start=first;
for(unsigned longi=0;i<(num_blocks-1);++i)
{
Iterator block_end=block_start;

std::advance(block_end,block_size);

futures[i]=pool.submit(accumulate_block()); // 2

block_start=block_end;
}
Tlast_result=accumulate_block()(block_start,last);
T result=init;
for(unsigned longi=0;i<(num_blocks-1);++i)
{
result+=futures[i].get();
}
result += last_result;
return result;
}

与清单8.4相比,有几个点需要注意一下。首先,工作量是依据使用的块数(num_blocks①),而不是线程的数量。为了利用线程池的最大化可扩展性,需要将工作块划分为最小工作块。当线程池中线程不多时,每个线程将会处理多个工作块,不过随着硬件可用线程数量的增长,会有越来越多的工作块并发执行。

当你选择“因为能并发执行,最小工作块值的一试”时,就需要谨慎了。向线程池提交一个任务有一定的开销;让工作线程执行这个任务,并且将返回值保存在std::future<>中,对于太小的任务,这样的开销不划算。如果任务块太小,使用线程池的速度可能都不及单线程。

假设,任务块的大小合理,就不用为这些事而担心:打包任务、获取future或存储之后要汇入的std::thread对象;使用线程池的时候,这些都需要注意。之后,就是调用submit()来提交任务②。

线程池也需要注意异常安全。任何异常都会通过submit()返回给future,并在获取future的结果时,抛出异常。如果函数因为异常退出,线程池的析构函数会丢掉那些没有完成的任务,等待线程池中的工作线程完成工作。

在简单的例子中,这个线程池工作的还算不错,因为这里的任务都是相互独立的。不过,当任务队列中的任务有依赖关系时,这个线程池就不能胜任了。

9.1.3** 等待依赖任务**

快速排序算法为例,原理很简单:数据与中轴数据项比较,在中轴项两侧分为大于和小于的两个序列,然后再对这两组序列进行排序。这两组序列会递归排序,最后会整合成一个全排序序列。要将这个算法写成并发模式,需要保证递归调用能够使用硬件的并发能力。

回到第4章,第一次接触这个例子,我们使用std::async来执行每一层的调用,让标准库来选择,是在新线程上执行这个任务,还是当对应get()调用时,进行同步执行。运行起来很不错,因为每一个任务都在其自己的线程上执行,或当需要的时候进行调用。

当回顾第8章时,使用了一个固定线程数量(根据硬件可用并发线程数)的结构体。在这样的情况下,使用了栈来挂起要排序的数据块。当每个线程在为一个数据块排序前,会向数据栈上添加一组要排序的数据,然后对当前数据块排序结束后,接着对另一块进行排序。这里,等待其他线程完成排序,可能会造成死锁,因为这会消耗有限的线程。有一种情况很可能会出现,就是所有线程都在等某一个数据块被排序,不过没有线程在做排序。通过拉取栈上数据块的线程,对数据块进行排序,来解决这个问题;因为,已处理的指定数据块,就是其他线程都在等待排序的数据块。

如果只用简单的线程池进行替换,例如:第4章替换std::async的线程池。只有固定数量的线程,因为线程池中没有空闲的线程,线程会等待没有被安排的任务。因此,需要和第8章中类似的解决方案:当等待某个数据块完成时,去处理未完成的数据块。如果使用线程池来管理任务列表和相关线程——使用线程池的主要原因——就不用再去访问任务列表了。可以对线程池做一些改动,自动完成这些事情。

最简单的方法就是在thread_pool中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。高级线程池的实现可能会在等待函数中添加逻辑,或等待其他函数来处理这个任务,优先的任务会让其他的任务进行等待。下面清单中的实现,就展示了一个新run_pending_task()函数,对于快速排序的修改将会在清单9.5中展示。

清单9.4 run_pending_task()函数实现

void thread_pool::run_pending_task()
{
function_wrapper task;
if(work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();

}

}

run_pending_task()的实现去掉了在worker_thread()函数的主循环。函数任务队列中有任务的时候,执行任务;要是没有的话,就会让操作系统对线程进行重新分配。

下面快速排序算法的实现要比清单8.1中版本简单许多,因为所有线程管理逻辑都被移入到线程池。

清单9.5 基于线程池的快速排序实现

template
struct sorter // 1
{
thread_pool pool; //** 2**
std::listdo_sort(std::list& chunk_data)
{
if(chunk_data.empty())
{
  return chunk_data;
}
std::list<T> result;

result.splice(result.begin(),chunk_data,chunk_data.begin());

T const&partition_val=*result.begin();
typenamestd::list<T>::iterator divide_point=
 std::partition(chunk_data.begin(),chunk_data.end(),[&](T const&val){return val<partition_val;});
std::list<T>new_lower_chunk;

new_lower_chunk.splice(new_lower_chunk.end(),chunk_data,chunk_data.begin(),divide_point);

std::future new_lower=

pool.submit(std::bind(&sorter::do_sort,this,std::move(new_lower_chunk)));// **3**
std::list<T>new_higher(do_sort(chunk_data));

result.splice(result.end(),new_higher);

while(!new_lower.wait_for(std::chrono::seconds(0))== std::future_status::timeout)
{
  pool.run_pending_task();  // **4**
}

result.splice(result.begin(),new_lower.get());

return result;
}
};
template
std::list parallel_quick_sort(std::list input)
{
if(input.empty())
{
return input;
}
sorter s;
return s.do_sort(input);
}

与清单8.1相比,这里将实际工作放在sorter类模板的do_sort()成员函数中执行①,即使例子中仅对thread_pool实例进行包装②。

线程和任务管理,在线程等待的时候,就会少向线程池中提交一个任务③,并且执行任务队列上未完成的任务④。需要显式的管理线程和栈上要排序的数据块。当有任务提交到线程池中,可以使用std::bind()绑定this指针到do_sort()上,绑定是为了让数据块进行排序。这种情况下,需要对new_lower_chunk使用std::move()将其传入函数,数据移动要比拷贝的方式开销少。

虽然,使用等待其他任务的方式,解决了死锁问题,这个线程池距离理想的线程池很远。

首先,每次对submit()的调用和对run_pending_task()的调用,访问的都是同一个队列。在第8章中,当多线程去修改一组数据,就会对性能有所影响,所以需要解决这个问题。

9.1.4** 避免队列中的任务竞争**

线程每次调用线程池的submit()函数,都会推送一个任务到工作队列中。就像工作线程为了执行任务,从任务队列中获取任务一样。这意味着随着处理器的增加,在任务队列上就会有很多的竞争,这会让性能下降。使用无锁队列会让任务没有明显的等待,但是乒乓缓存会消耗大量的时间。

为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。下面列表中的实现,使用了一个thread_local变量,来保证每个线程都拥有自己的任务列表(如全局列表那样)。

清单9.6 线程池——线程具有本地任务队列

class thread_pool
{
thread_safe_queue pool_work_queue;
typedefstd::queue local_queue_type; // 1
static thread_localstd::unique_ptr local_work_queue; // 2
void worker_thread()
{
local_work_queue.reset(newlocal_queue_type);  // **3**
while(!done)
{
  run_pending_task();
}
}
public:
template
std::future::type> submit(FunctionType f)
{
typedef typenamestd::result_of<FunctionType()>::type result_type;

std::packaged_task task(f);

std::future<result_type>res(task.get_future());
if(local_work_queue)  //** 4**
{
  local_work_queue->push(std::move(task));
}
else
{
 pool_work_queue.push(std::move(task)); //** 5**
}
return res;
}
void run_pending_task()
{
function_wrapper task;
if(local_work_queue &&!local_work_queue->empty())  // **6**
{
  task=std::move(local_work_queue->front());
  local_work_queue->pop();
  task();
}
elseif(pool_work_queue.try_pop(task))  // **7**
{
  task();
}
else
{
  std::this_thread::yield();
}
}
// rest as before
};

因为不希望非线程池中的线程也拥有一个任务队列,使用std::unique_ptr<>指向线程本地的工作队列②;这个指针在worker_thread()中进行初始化③。std:unique_ptr<>的析构函数会保证在线程退出的时候,工作队列被销毁。

submit()会检查当前线程是否具有一个工作队列④。如果有,就是线程池中的线程,可以将任务放入线程的本地队列中;否者,就像之前一样将这个任务放在线程池中的全局队列中⑤。

run_pending_task()⑥中的检查和之前类似,只是要对是否存在本地任务队列进行检查。如果存在,就会从队列中的第一个任务开始处理;注意本地任务队列可以是一个普通的std::queue<>①,因为这个队列只能被一个线程所访问,就不存在竞争。如果本地线程上没有任务,就会从全局工作列表上获取任务⑦。

这样就能有效避免竞争,不过当任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。例如:举一个快速排序的例子,只有一开始的数据块能在线程池上被处理,因为剩余部分会放在工作线程的本地队列上进行处理,这样的使用方式也违背使用线程池的初衷。

幸好,这个问题是有解:本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。

9.1.5** 窃取任务**

为了让没有任务的线程能从其他线程的任务队列中获取任务,就需要本地任务列表可以进行访问,这样才能让run_pending_tasks()窃取任务。需要每个线程在线程池队列上进行注册,或由线程池指定一个线程。同样,还需要保证数据队列中的任务适当的被同步和保护,这样队列的不变量就不会被破坏。

实现一个无锁队列,让其拥有线程在其他线程窃取任务的时候,能够推送和弹出一个任务是可能的;不过,这个队列的实现就超出了本书的讨论范围。为了证明这种方法的可行性,将使用一个互斥量来保护队列中的数据。我们希望任务窃取是一个不常见的现象,这样就会减少对互斥量的竞争,并且使得简单队列的开销最小。下面,实现了一个简单的基于锁的任务窃取队列。

清单9.7 基于锁的任务窃取队列

class work_stealing_queue
{
private:
typedef function_wrapperdata_type;
std::dequethe_queue; //** 1**
mutable std::mutex the_mutex;
public:
work_stealing_queue(){}
work_stealing_queue(constwork_stealing_queue& other)=delete;
work_stealing_queue&operator=(const work_stealing_queue& other)=delete;
void push(data_type data) // 2
{
std::lock_guardstd::mutex lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const
{
std::lock_guardstd::mutex lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type&res) //** 3**
{
std::lock_guardstd::mutex lock(the_mutex);
if(the_queue.empty())
{
  return false;
}

res=std::move(the_queue.front());

the_queue.pop_front();
return true;
}
bool try_steal(data_type&res) // 4
{
std::lock_guardstd::mutex lock(the_mutex);
if(the_queue.empty())
{
  return false;
}
res=std::move(the_queue.back());
the_queue.pop_back();
return true;

}

};

这个队列对std::deque进行了简单的包装①,就能通过一个互斥锁来对所有访问进行控制了。push()②和try_pop()③对队列的前端进行操作,try_steal()④对队列的后端进行操作。

这就说明每个线程中的“队列”是一个后进先出的栈,最新推入的任务将会第一个执行。从缓存角度来看,这将对性能有所提升,因为任务相关的数据一直存于缓存中,要比提前将任务相关数据推送到栈上好。同样,这种方式很好的映射到某个算法上,例如:快速排序。之前的实现中,每次调用do_sort()都会推送一个任务到栈上,并且等待这个任务执行完毕。通过对最新推入任务的处理,就可以保证在将当前所需数据块处理完成前,其他任务是否需要这些数据块,从而可以减少活动任务的数量和栈的使用次数。try_steal()从队列末尾获取任务,为了减少与try_pop()之间的竞争;使用在第6、7章中的所讨论的技术来让try_pop()和try_steal()并发执行。

OK,现在拥有了一个很不错的任务队列,并且支持窃取;那这个队列将如何在线程池中使用呢?这里简单的展示一下。

清单9.8 使用任务窃取的线程池

class thread_pool
{
typedef function_wrappertask_type;
std::atomic_bool done;
thread_safe_queuepool_work_queue;
std::vector> queues; // 1
std::vectorstd::threadthreads;
join_threads joiner;
static thread_localwork_stealing_queue* local_work_queue; // 2
static thread_local unsignedmy_index;
void worker_thread(unsigned my_index_)
{
my_index=my_index_;

local_work_queue=queues[my_index].get(); // 3

while(!done)
{
  run_pending_task();
}
}
boolpop_task_from_local_queue(task_type& task)
{
return local_work_queue&& local_work_queue->try_pop(task);
}
boolpop_task_from_pool_queue(task_type& task)
{
returnpool_work_queue.try_pop(task);
}
boolpop_task_from_other_thread_queue(task_type& task) // 4
{
for(unsignedi=0;i<queues.size();++i)
{
  unsigned const index=(my_index+i+1)%queues.size();  //** 5**
 if(queues[index]->try_steal(task))
  {
    return true;
  }
}
return false;
}
public:
thread_pool(): done(false),joiner(threads)
{
unsigned constthread_count=std::thread::hardware_concurrency();
try
{
  for(unsignedi=0;i<thread_count;++i)
  {
    queues.push_back(std::unique_ptr<work_stealing_queue>(newwork_stealing_queue)); // 6
   threads.push_back(std::thread(&thread_pool::worker_thread,this,i));
  }
}
catch(...)
{
  done=true;
  throw;
}
}
~thread_pool()
{
done=true;
}
template
std::future::type> submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::typeresult_type;

std::packaged_task task(f);

std::future<result_type>res(task.get_future());
if(local_work_queue)
{
 local_work_queue->push(std::move(task));
}
else
{
  pool_work_queue.push(std::move(task));
}
return res;
}
void run_pending_task()
{
task_type task;

if(pop_task_from_local_queue(task) || // 7

pop_task_from_pool_queue(task) || // **8**
  pop_task_from_other_thread_queue(task)) // **9**
{
  task();
}
else
{
  std::this_thread::yield();
}

}

};

这段代码与清单9.6很相似。第一个不同在于,每个线程都有一个work_stealing_queue,而非只是普通的std::queue<>②。当每个线程被创建,就创建了一个属于自己的工作队列⑥,每个线程自己的工作队列将存储在线程池的全局工作队列中①。列表中队列的序号,会传递给线程函数,然后使用序号来索引对应队列③。这就意味着线程池可以访问任意线程中的队列,为了给闲置线程窃取任务。run_pending_task()将会从线程的任务队列中取出一个任务来执行⑦,或从线程池队列中获取一个任务⑧,亦或从其他线程的队列中获取一个任务⑨。

pop_task_from_other_thread_queue()④会遍历池中所有线程的任务队列,然后尝试窃取任务。为了避免每个线程都尝试从列表中的第一个线程上窃取任务,每一个线程都会从下一个线程开始遍历,通过自身的线程序号来确定开始遍历的线程序号。

使用线程池有很多好处,还有很多很多的方式能为某些特殊用法提升性能,不过这就留给读者作为作业吧。

特别是还没有探究动态变换大小的线程池,即使线程被阻塞的时候(例如:I/O或互斥锁),程序都能保证CPU最优的使用率。

下面,我们将来了解一下线程管理的“高级”用法——中断线程。

9.2中断线程****

很多情况下,使用信号来终止一个长时间运行的线程是合理的。这种线程的存在,可能是因为工作线程所在的线程池被销毁,或是用户显式的取消了这个任务,亦或其他各种原因。不管是什么原因,原理都一样:需要使用信号来让未结束线程停止运行。这里需要一种合适的方式让线程主动的停下来,而非让线程戛然而止。

你可能会给每种情况制定一个独立的机制,这样做的意义不大。不仅因为用统一的机制会更容易在之后的场景中实现,而且写出来的中断代码不用担心在哪里使用。C++11标准没有提供这样的机制,不过实现这样的机制也并不困难。

在了解一下应该如何实现这种机制前,先来了解一下启动和中断线程的接口。

9.2.1** 启动和中断线程**

先看一下外部接口,需要从可中断线程上获取些什么?最起码需要和std::thread相同的接口,还要多加一个interrupt()函数:

class interruptible_thread
{
public:
template
interruptible_thread(FunctionTypef);
void join();
void detach();
bool joinable() const;
void interrupt();
};

类内部可以使用std::thread来管理线程,并且使用一些自定义数据结构来处理中断。现在,从线程的角度能看到什么呢?“能用这个类来中断线程”——需要一个断点(interruption point)。在不添加多余的数据的前提下,为了使断点能够正常使用,就需要使用一个没有参数的函数:interruption_point()。这意味着中断数据结构可以访问thread_local变量,并在线程运行时,对变量进行设置,因此当线程调用interruption_point()函数时,就会去检查当前运行线程的数据结构。我们将在后面看到interruption_point()的具体实现。

thread_local标志是不能使用普通的std::thread管理线程的主要原因;需要使用一种方法分配出一个可访问的interruptible_thread实例,就像新启动一个线程一样。在使用已提供函数来做这件事情前,需要将interruptible_thread实例传递给std::thread的构造函数,创建一个能够执行的线程,就像下面的代码清单所实现。

清单9.9 interruptible_thread的基本实现

class interrupt_flag
{
public:
void set();
bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag; // 1
class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;
public:
template
interruptible_thread(FunctionTypef)
{
std::promise p; //** 2**
internal_thread=std::thread([f,&p]{  // **3**
 p.set_value(&this_thread_interrupt_flag);
  f();  //** 4**
});
flag=p.get_future().get();  //** 5**
}
void interrupt()
{
if(flag)
{
  flag->set();  //** 6**
}

}

};

提供函数f是包装了一个lambda函数③,线程将会持有f副本和本地promise变量§的引用②。在新线程中,lambda函数设置promise变量的值到this_thread_interrupt_flag(在thread_local①中声明)的地址中,为的是让线程能够调用提供函数的副本④。调用线程会等待与其future相关的promise就绪,并且将结果存入到flag成员变量中⑤。注意,即使lambda函数在新线程上执行,对本地变量p进行悬空引用,都没有问题,因为在新线程返回之前,interruptible_thread构造函数会等待变量p,直到变量p不被引用。实现没有考虑处理汇入线程,或分离线程。所以,需要flag变量在线程退出或分离前已经声明,这样就能避免悬空问题。

interrupt()函数相对简单:需要一个线程去做中断时,需要一个合法指针作为一个中断标志,所以可以仅对标志进行设置⑥。

9.2.2** 检查线程是否中断**

现在就可以设置中断标志了,不过不检查线程是否被中断,这样的意义就不大了。使用interruption_point()函数最简单的情况;可以在一个安全的地方调用这个函数,如果标志已经设置,就可以抛出一个thread_interrupted异常:

void interruption_point()
{
if(this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();

}

}

代码中可以在适当的地方使用这个函数:

void foo()
{
while(!done)
{
interruption_point();
process_next_item();

}

}

虽然也能工作,但不理想。最好实在线程等待或阻塞的时候中断线程,因为这时的线程不能运行,也就不能调用interruption_point()函数!在线程等待的时候,什么方式才能去中断线程呢?

9.2.3** 中断等待——条件变量**

OK,需要仔细选择中断的位置,并通过显式调用interruption_point()进行中断,不过在线程阻塞等待的时候,这种办法就显得苍白无力了,例如:等待条件变量的通知。就需要一个新函数——interruptible_wait()——就可以运行各种需要等待的任务,并且可以知道如何中断等待。之前提到,可能会等待一个条件变量,所以就从它开始:如何做才能中断一个等待的条件变量呢?最简单的方式是,当设置中断标志时,需要提醒条件变量,并在等待后立即设置断点。为了让其工作,需要提醒所有等待对应条件变量的线程,就能确保感谢兴趣的线程能够苏醒。伪苏醒是无论如何都要处理的,所以其他线程(非感兴趣线程)将会被当作伪苏醒处理——两者之间没什么区别。interrupt_flag结构需要存储一个指针指向一个条件变量,所以用set()函数对其进行提醒。为条件变量实现的interruptible_wait()可能会看起来像下面清单中所示。

清单9.10 为std::condition_variable实现的interruptible_wait有问题版

void interruptible_wait(std::condition_variable& cv,
std::unique_lockstd::mutex& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv); //** 1**
cv.wait(lk); //** 2**
this_thread_interrupt_flag.clear_condition_variable(); //** 3**
interruption_point();
}

假设函数能够设置和清除相关条件变量上的中断标志,代码会检查中断,通过interrupt_flag为当前线程关联条件变量①,等待条件变量②,清理相关条件变量③,并且再次检查中断。如果线程在等待期间被条件变量所中断,中断线程将广播条件变量,并唤醒等待该条件变量的线程,所以这里就可以检查中断。不幸的是,代码有两个问题。第一个问题比较明显,如果想要线程安全:std::condition_variable::wait()可以抛出异常,所以这里会直接退出,而没有通过条件变量删除相关的中断标志。这个问题很容易修复,就是在析构函数中添加相关删除操作即可。

第二个问题就不大明显了,这段代码存在条件竞争。虽然,线程可以通过调用interruption_point()被中断,不过在调用wait()后,条件变量和相关中断标志就没有什么系了,因为线程不是等待状态,所以不能通过条件变量的方式唤醒。就需要确保线程不会在最后一次中断检查和调用wait()间被唤醒。这里,不对std::condition_variable的内部结构进行研究;不过,可通过一种方法来解决这个问题:使用lk上的互斥量对线程进行保护,这就需要将lk传递到set_condition_variable()函数中去。不幸的是,这将产生两个新问题:需要传递一个互斥量的引用到一个不知道生命周期的线程中去(这个线程做中断操作)为该线程上锁(调用interrupt()的时候)。这里可能会死锁,并且可能访问到一个已经销毁的互斥量,所以这种方法不可取。当不能完全确定能中断条件变量等待——没有interruptible_wait()情况下也可以时(可能有些严格),那有没有其他选择呢?一个选择就是放置超时等待,使用wait_for()并带有一个简单的超时量(比如,1ms)。在线程被中断前,算是给了线程一个等待的上限(以时钟刻度为基准)。如果这样做了,等待线程将会看到更多因为超时而“伪”苏醒的线程,不过超时也不轻易的就帮助到我们。与interrupt_flag相关的实现的一个实现放在下面的清单中展示。

清单9.11 为std::condition_variable在interruptible_wait中使用超时

class interrupt_flag
{
std::atomic flag;
std::condition_variable*thread_cond;
std::mutex set_clear_mutex;
public:
interrupt_flag(): thread_cond(0){}
void set()
{
flag.store(true,std::memory_order_relaxed);
std::lock_guardstd::mutex lk(set_clear_mutex);
if(thread_cond)
{
  thread_cond->notify_all();
}
}
bool is_set() const
{
returnflag.load(std::memory_order_relaxed);
}
voidset_condition_variable(std::condition_variable& cv)
{
std::lock_guardstd::mutex lk(set_clear_mutex);
thread_cond=&cv;
}
void clear_condition_variable()
{
std::lock_guardstd::mutex lk(set_clear_mutex);
thread_cond=0;
}
struct clear_cv_on_destruct
{
~clear_cv_on_destruct()
{
  this_thread_interrupt_flag.clear_condition_variable();
}
};
};
void interruptible_wait(std::condition_variable&cv,std::unique_lockstd::mutex& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk,std::chrono::milliseconds(1));
interruption_point();
}
如果有谓词(相关函数)进行等待,1ms的超时将会完全在谓词循环中完全隐藏:
template
void interruptible_wait(std::condition_variable&cv,std::unique_lockstd::mutex& lk,Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
while(!this_thread_interrupt_flag.is_set() && !pred())
{
cv.wait_for(lk,std::chrono::milliseconds(1));
}
interruption_point();
}

这会让谓词被检查的次数增加许多,不过对于简单调用wait()这套实现还是很好用的。超时变量很容易实现:通过制定时间,比如:1ms或更短。OK,对于std::condition_variable的等待,就需要小心应对了;std::condition_variable_any呢?还是能做的更好吗?

9.2.4** 使用std::condition_variable_any中断等待**

std::condition_variable_any与std::condition_variable的不同在于,std::condition_variable_any可以使用任意类型的锁,而不仅有std::unique_lockstd::mutex。可以让事情做起来更加简单,并且std::condition_variable_any可以比std::condition_variable做的更好。因为能与任意类型的锁一起工作,就可以设计自己的锁,上锁/解锁interrupt_flag的内部互斥量set_clear_mutex,并且锁也支持等待调用,就像下面的代码。

清单9.12 为std::condition_variable_any设计的interruptible_wait

class interrupt_flag
{
std::atomic flag;
std::condition_variable*thread_cond;
std::condition_variable_any*thread_cond_any;
std::mutex set_clear_mutex;
public:
interrupt_flag(): thread_cond(0),thread_cond_any(0){}
void set()
{
flag.store(true,std::memory_order_relaxed);
std::lock_guardstd::mutex lk(set_clear_mutex);
if(thread_cond)
{
  thread_cond->notify_all();
}
else if(thread_cond_any)
{
  thread_cond_any->notify_all();
}
}
template
voidwait(std::condition_variable_any& cv,Lockable& lk)
{
struct custom_lock
{
  interrupt_flag* self;
  Lockable& lk;
  custom_lock(interrupt_flag*self_, std::condition_variable_any& cond, Lockable& lk_):
    self(self_),lk(lk_)
  {
   self->set_clear_mutex.lock(); // **1**
   self->thread_cond_any=&cond; // **2**
  }
  void unlock()  //** 3**
  {
    lk.unlock();
    self->set_clear_mutex.unlock();
  }
  void lock()
  {
   std::lock(self->set_clear_mutex,lk); // **4**
  }
  ~custom_lock()
  {
   self->thread_cond_any=0;  // 5
   self->set_clear_mutex.unlock();
  }
};
custom_lock cl(this,cv,lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
// rest as before
};
template
void interruptible_wait(std::condition_variable_any& cv,Lockable&lk)
{
this_thread_interrupt_flag.wait(cv,lk);
}

自定义的锁类型在构造的时候,需要所锁住内部set_clear_mutex①,对thread_cond_any指针进行设置,并引用std::condition_variable_any传入锁的构造函数中②。Lockable引用将会在之后进行存储,其变量必须被锁住。现在可以安心的检查中断,不用担心竞争了。如果这时中断标志已经设置,那么标志一定是在锁住set_clear_mutex时设置的。当条件变量调用自定义锁的unlock()函数中的wait()时,就会对Lockable对象和set_clear_mutex进行解锁③。这就允许线程可以尝试中断其他线程获取set_clear_mutex锁;以及在内部wait()调用之后,检查thread_cond_any指针。这就是在替换std::condition_variable后,所拥有的功能(不包括管理)。当wait()结束等待(因为等待,或因为伪苏醒),因为线程将会调用lock()函数,这里依旧要求锁住内部set_clear_mutex,并且锁住Lockable对象④。现在,在wait()调用时,custom_lock的析构函数中⑤清理thread_cond_any指针(同样会解锁set_clear_mutex)之前,可以再次对中断进行检查。

9.2.5** 中断其他阻塞调用**

这次轮到中断条件变量的等待了,不过其他阻塞情况,比如:互斥锁,等待future等等,该怎么办呢?通常情况下,可以使用std::condition_variable的超时选项,因为在实际运行中不可能很快的将条件变量的等待终止(不访问内部互斥量或future的话)。不过,在某些情况下,你知道知道你在等待什么,这样就可以让循环在interruptible_wait()函数中运行。作为一个例子,这里为std::future<>重载了interruptible_wait()的实现:

template
void interruptible_wait(std::future& uf)
{
while(!this_thread_interrupt_flag.is_set())
{
if(uf.wait_for(lk,std::chrono::milliseconds(1)==std::future_status::ready)
break;
}
interruption_point();
}

等待会在中断标志设置好的时候,或future准备就绪的时候停止,不过实现中每次等待future的时间只有1ms。这就意味着,中断请求被确定前,平均等待的时间为0.5ms(这里假设存在一个高精度的时钟)。通常wait_for至少会等待一个时钟周期,所以如果时钟周期为15ms,那么结束等待的时间将会是15ms,而不是1ms。接受与不接受这种情况,都得视情况而定。如果这必要,且时钟支持的话,可以持续削减超时时间。这种方式将会让线程苏醒很多次,来检查标志,并且增加线程切换的开销。

OK,我们已经了解如何使用interruption_point()和interruptible_wait()函数检查中断。

当中断被检查出来了,要如何处理它呢?

9.2.6** 处理中断**

从中断线程的角度看,中断就是thread_interrupted异常,因此能像处理其他异常那样进行处理。

特别是使用标准catch块对其进行捕获:

try
{
do_something();
}
catch(thread_interrupted&)
{
handle_interruption();
}

捕获中断,进行处理。其他线程再次调用interrupt()时,线程将会再次被中断,这就被称为断点(interruption point)。如果线程执行的是一系列独立的任务,就会需要断点;中断一个任务,就意味着这个任务被丢弃,并且该线程就会执行任务列表中的其他任务。

因为thread_interrupted是一个异常,在能够被中断的代码中,之前线程安全的注意事项都是适用的,就是为了确保资源不会泄露,并在数据结构中留下对应的退出状态。通常,让线程中断是可行的,所以只需要让异常传播即可。不过,当异常传入std::thread的析构函数时,std::terminate()将会调用,并且整个程序将会终止。为了避免这种情况,需要在每个将interruptible_thread变量作为参数传入的函数中放置catch(thread_interrupted)处理块,可以将catch块包装进interrupt_flag的初始化过程中。因为异常将会终止独立进程,就能保证未处理的中断是异常安全的。interruptible_thread构造函数中对线程的初始化,实现如下:

internal_thread=std::thread([f,&p]{

p.set_value(&this_thread_interrupt_flag);
    try
    {
      f();
    }
    catch(thread_interruptedconst&)
    {}
  });

下面,我们来看个更加复杂的例子。

9.2.7** 应用退出时中断后台任务**

试想,在桌面上查找一个应用。这就需要与用户互动,应用的状态需要能在显示器上显示,就能看出应用有什么改变。为了避免影响GUI的响应时间,通常会将处理线程放在后台运行。后台进程需要一直执行,直到应用退出;后台线程会作为应用启动的一部分被启动,并且在应用终止的时候停止运行。通常这样的应用只有在机器关闭时,才会退出,因为应用需要更新应用最新的状态,就需要全时间运行。在某些情况下,当应用被关闭,需要使用有序的方式将后台线程关闭,其中一种方式就是中断。

下面清单中为一个系统实现了简单的线程管理部分。

清单9.13 在后台监视文件系统

std::mutex config_mutex;
std::vector background_threads;
void background_thread(int disk_id)
{
while(true)
{
interruption_point();  // **1**
fs_changefsc=get_fs_changes(disk_id);  // **2**
if(fsc.has_changes())
{
  update_index(fsc);  // **3**
}
}
}
void start_background_processing()
{
background_threads.push_back(interruptible_thread(background_thread,disk_1));
background_threads.push_back(interruptible_thread(background_thread,disk_2));
}
int main()
{
start_background_processing(); //** 4**
process_gui_until_exit(); //** 5**
std::unique_lockstd::mutexlk(config_mutex);
for(unsignedi=0;i
{
background_threads[i].interrupt(); //** 6**
}
for(unsignedi=0;i
{
background_threads[i].join(); //** 7**

}

}

启动时,后台线程就已经启动④。之后,对应线程将会处理GUI⑤。当用户要求进程退出时,后台进程将会被中断⑥,并且主线程会等待每一个后台线程结束后才退出⑦。后台线程运行在一个循环中,并时刻检查磁盘的变化②,对其序号进行更新③。调用interruption_point()函数,可以在循环中对中断进行检查。

为什么中断线程前,对线程进行等待?为什么不中断每个线程,让它们执行下一个任务?答案就是“并发”。线程被中断后,不会马上结束,因为需要对下一个断点进行处理,并且在退出前执行析构函数和代码异常处理部分。因为需要汇聚每个线程,所以就会让中断线程等待,即使线程还在做着有用的工作——中断其他线程。只有当没有工作时(所有线程都被中断),不需要等待。这就允许中断线程并行的处理自己的中断,并更快的完成中断。

中断机制很容易扩展到更深层次的中断调用,或在特定的代码块中禁用中断,这就当做留给读者的作业吧。

9.3总结****

在本章中,我们了解各种“高级”线程管理技术:线程池和中断线程。也了解了如何使用本地任务队列,使用任务窃取的方式减小同步开销,提高线程池的吞吐量;等待子任务完成的同时执行队列中其他任务,从而来避免死锁。

也了解了使用线程去中断另一个处理线程的各种方式,比如:使用特定的断点和函数执行中断,要不就是使用某种方法,对阻塞等待进行中断。


相关文章
|
16天前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
30 7
|
16天前
|
消息中间件 存储 安全
|
1月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
43 1
C++ 多线程之初识多线程
|
22天前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
1月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
45 6
|
1月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
76 5
|
1月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
23 0
C++ 多线程之线程管理函数
|
1月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
29 0
Linux C/C++之线程基础
|
3月前
|
Java 调度
基于C++11的线程池
基于C++11的线程池
|
4月前
|
算法 编译器 C++
开发与运维线程问题之在C++的原子操作中memory_order如何解决
开发与运维线程问题之在C++的原子操作中memory_order如何解决
43 2