UIUC CS241 讲义:众包系统编程书(4)

简介: UIUC CS241 讲义:众包系统编程书(4)

UIUC CS241 讲义:众包系统编程书(3)https://developer.aliyun.com/article/1427161

如果不调用 pthread_join 会发生什么?

已完成的线程将继续消耗资源。最终,如果创建了足够多的线程,pthread_create 将失败。在实践中,这只是长时间运行进程的问题,但对于简单的短暂进程来说并不是问题,因为当进程退出时,所有线程资源都会被自动释放。

我应该使用 pthread_exit 还是 pthread_join

pthread_exitpthread_join 都会让其他线程自行完成(即使在主线程中调用)。但是,只有 pthread_join 会在指定线程完成时返回。pthread_exit 不会等待,它会立即结束线程,并且不会给你继续执行的机会。

你能把指针从一个线程传递给另一个线程的堆栈变量吗?

是的。但是你需要非常小心关于堆栈变量的生命周期。

pthread_t start_threads() {
  int start = 42;
  pthread_t tid;
  pthread_create(&tid, 0, myfunc, &start); // ERROR!
  return tid;
}

上面的代码是无效的,因为函数start_threads很可能会在myfunc开始之前返回。该函数传递了start的地址,但是当myfunc执行时,start已经不在作用域内,其地址将被重新用于另一个变量。

以下代码是有效的,因为栈变量的生命周期比后台线程长。

void start_threads() {
  int start = 42;
  void *result;
  pthread_t tid;
  pthread_create(&tid, 0, myfunc, &start); // OK - start will be valid!
  pthread_join(tid, &result);
}

竞争条件简介

我怎样才能创建十个具有不同起始值的线程。

以下代码应该启动十个值为 0,1,2,3,…9 的线程,但运行时打印出1 7 8 8 8 8 8 8 8 10!你能看出为什么吗?

#include <pthread.h>
void* myfunc(void* ptr) {
    int i = *((int *) ptr);
    printf("%d ", i);
    return NULL;
}
int main() {
    // Each thread gets a different value of i to process
    int i;
    pthread_t tid;
    for(i =0; i < 10; i++) {
        pthread_create(&tid, NULL, myfunc, &i); // ERROR
    }
    pthread_exit(NULL);
}

上面的代码存在“竞争条件” - i 的值正在改变。新线程稍后启动(在示例输出中,最后一个线程在循环结束后启动)。

为了克服这种竞争条件,我们将为每个线程提供一个指向其自己数据区域的指针。例如,对于每个线程,我们可能希望存储 id、起始值和输出值:

struct T {
  pthread_t id;
  int start;
  char result[100];
};

这些可以存储在数组中 -

struct T *info = calloc(10 , sizeof(struct T)); // reserve enough bytes for ten T structures

并且每个数组元素都传递给每个线程 -

pthread_create(&info[i].id, NULL, func, &info[i]);

为什么有些函数(例如 asctime、getenv、strtok、strerror)不是线程安全的?

为了回答这个问题,让我们看一个简单的函数,它也不是“线程安全”的

char *to_message(int num) {
    char static result [256];
    if (num < 10) sprintf(result, "%d : blah blah" , num);
    else strcpy(result, "Unknown");
    return result;
}

在上面的代码中,结果缓冲区存储在全局内存中。这很好 - 我们不希望返回指向栈上无效地址的指针,但整个内存中只有一个结果缓冲区。如果两个线程同时使用它,那么一个线程将破坏另一个:

时间 线程 1 线程 2 注释
1 to_m(5)
2 to_m(99) 现在两个线程都会看到结果缓冲区中存储的是“未知”

什么是条件变量、信号量、互斥锁?

这些是同步锁,用于防止竞争条件,并确保同一程序中运行的线程之间的正确同步。此外,这些锁在概念上与内核内部使用的原语相同。

使用线程而不是分叉进程有什么优势吗?

是的!在线程之间共享信息很容易,因为线程(同一进程的线程)存在于相同的虚拟内存空间中。此外,创建线程比创建(分叉)进程要快得多。

使用线程而不是分叉进程有什么缺点吗?

是的!没有隔离!因为线程存在于同一个进程中,一个线程可以访问与其他线程相同的虚拟内存。一个线程可以终止整个进程(例如,尝试读取地址零)。

您可以使用多个线程分叉一个进程吗?

是的!但是子进程只有一个线程(这是调用fork的线程的克隆)。我们可以将其视为一个简单的例子,后台线程在子进程中从不打印出第二条消息。

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
static pid_t child = -2;
void *sleepnprint(void *arg) {
  printf("%d:%s starting up...\n", getpid(), (char *) arg);
  while (child == -2) {sleep(1);} /* Later we will use condition variables */
  printf("%d:%s finishing...\n",getpid(), (char*)arg);
  return NULL;  
}
int main() {
  pthread_t tid1, tid2;
  pthread_create(&tid1,NULL, sleepnprint, "New Thread One");
  pthread_create(&tid2,NULL, sleepnprint, "New Thread Two");
  child = fork();
  printf("%d:%s\n",getpid(), "fork()ing complete");
  sleep(3);
  printf("%d:%s\n",getpid(), "Main thread finished");
  pthread_exit(NULL);
  return 0; /* Never executes */
}
8970:New Thread One starting up...
8970:fork()ing complete
8973:fork()ing complete
8970:New Thread Two starting up...
8970:New Thread Two finishing...
8970:New Thread One finishing...
8970:Main thread finished
8973:Main thread finished

实际上,在分叉之前创建线程可能会导致意外错误,因为(如上所示)其他线程在分叉时立即终止。另一个线程可能刚刚锁定了互斥锁(例如通过调用 malloc),并且再也不会解锁。高级用户可能会发现pthread_atfork有用,但我们建议您通常尽量避免在分叉之前创建线程,除非您完全了解这种方法的限制和困难。

还有其他情况下fork可能比创建线程更可取吗。

创建单独的进程很有用

  • 当需要更多安全性时(例如,Chrome 浏览器为不同的标签使用不同的进程)
  • 在运行现有和完整的程序时,需要一个新进程(例如,启动’gcc’)
  • 当您遇到同步原语并且每个进程都在系统中操作某些东西时

我怎样才能找到更多信息?

man 页面中查看完整示例,并在pthread 参考指南中查看。另外:简明的第三方示例代码,解释创建、连接和退出

Pthreads,第三部分:并行问题(奖励)

概述

下一节将讨论当 pthread 发生冲突时会发生什么,但如果每个线程做的事情完全不同,没有重叠呢?

我们找到了最大加速并行问题吗?

尴尬的并行问题

并行算法的研究在过去几年里迅速发展。一个尴尬的并行问题是指需要很少的工作就可以转换为并行的问题。其中很多问题都涉及一些同步概念,但并非总是如此。你已经知道一个可并行化的算法,归并排序!

void merge_sort(int *arr, size_t len){
     if(len > 1){
     //Mergesort the left half
     //Mergesort the right half
     //Merge the two halves
     }

有了对线程的新理解,你只需要为左半部分创建一个线程,为右半部分创建一个线程。鉴于你的 CPU 有多个真实核心,你将看到与Amdahl’s Law相符的加速。时间复杂度分析在这里也变得有趣。并行算法的运行时间为 O(log^3(n))(因为我们假设有很多核心)。

然而在实践中,我们通常会做两个改变。一是,一旦数组变得足够小,我们就会放弃并行归并排序算法,转而使用快速排序或其他在小数组上运行快速的算法(某种缓存一致性)。另一件我们知道的事情是,CPU 并不拥有无限的核心。为了解决这个问题,我们通常会保留一个工作池。

工作池

我们知道 CPU 的核心数量是有限的。很多时候我们会启动一些线程,并在它们空闲时给它们任务。

另一个问题,Parallel Map

假设我们想要对整个数组应用一个函数,一次处理一个元素。

int *map(int (*func)(int), int *arr, size_t len){
    int *ret = malloc(len*sizeof(*arr));
    for(size_t i = 0; i < len; ++i) 
        ret[i] = func(arr[i]);
    return ret;
}

由于没有任何元素依赖于其他元素,你会如何并行化这个问题?你认为在线程之间如何分配工作最好?

调度

有几种方法可以分解工作。

  • 静态调度:将问题分解成固定大小的块(预先确定的),并让每个线程处理其中的每个块。当每个子问题花费的时间大致相同时,这种方法效果很好,因为没有额外的开销。你只需要编写一个循环,并将 map 函数分配给每个子数组。
  • 动态调度:当一个新问题可用时,让一个线程处理它。当你不知道调度需要多长时间时,这是很有用的。
  • 引导调度:这是上述两种方法的混合,具有各自的优点和权衡。你可以从静态调度开始,如果需要的话慢慢转向动态调度。
  • 运行时调度:你完全不知道问题需要多长时间。与其自己决定,不如让程序决定该做什么!

来源,但不需要记住。

一些缺点

你不会立即看到加速,因为缓存一致性和调度额外的线程等原因。

其他问题

Wikipedia

  • 在 Web 服务器上为多个用户提供静态文件。
  • 曼德勃罗集、Perlin 噪声和类似的图像,其中每个点都是独立计算的。
  • 计算机图形的渲染。在计算机动画中,每一帧可能是独立渲染的(参见并行渲染)。
  • 在密码学中的暴力搜索。值得注意的现实世界例子包括 distributed.net 和加密货币中使用的工作证明系统。
  • 生物信息学中用于多个查询的 BLAST 搜索(但不适用于单个大查询)[9]
  • 大规模人脸识别系统将数千个任意获取的人脸(例如,通过闭路电视的安全或监控视频)与同样大量的先前存储的人脸(例如,罪犯库或类似的观察名单)进行比较。
  • 比较许多独立场景的计算机模拟,例如气候模型。
  • 进化计算元启发式,如遗传算法。
  • 数值天气预报的集合计算。
  • 粒子物理中的事件模拟和重建。
  • Marching squares 算法
  • 二次筛和数域筛的筛选步骤。
  • 随机森林机器学习技术中的树生长步骤。
  • 离散傅立叶变换,其中每个谐波都是独立计算的。

Pthread 复习问题

主题

  • pthread 生命周期
  • 每个线程都有一个堆栈
  • 从线程中捕获返回值
  • 使用pthread_join
  • 使用pthread_create
  • 使用pthread_exit
  • 在什么条件下进程会退出

问题

  • 当创建一个 pthread 时会发生什么?(你不需要进入超级细节)
  • 每个线程的堆栈在哪里?
  • 如何在给定pthread_t的情况下获得返回值?线程可以如何设置返回值?如果丢弃返回值会发生什么?
  • 为什么pthread_join很重要(考虑堆栈空间、寄存器、返回值)?
  • 在正常情况下pthread_exit做什么(即你不是最后一个线程)?调用 pthread_exit 时会调用哪些其他函数?
  • 给我三个多线程进程将退出的条件。你还能想到其他条件吗?
  • 什么是尴尬并行问题?

五、同步

同步,第一部分:互斥锁

解决临界区

什么是临界区?

临界区是一段代码,只能由一个线程同时执行,如果程序要正确运行。如果两个线程(或进程)同时在临界区内执行代码,那么可能程序可能不再具有正确的行为。

仅仅递增一个变量是否是临界区?

可能。递增变量(i++)是通过三个单独的步骤执行的:将内存内容复制到 CPU 寄存器。增加 CPU 中的值。将新值存储在内存中。如果内存位置只能由一个线程访问(例如下面的自动变量i),则不可能发生竞争条件,也没有与i相关的临界区。但是,sum变量是全局变量,并且被两个线程访问。可能两个线程可能同时尝试递增变量。

#include <stdio.h>
#include <pthread.h>
// Compile with -pthread
int sum = 0; //shared
void *countgold(void *param) {
    int i; //local to each thread
    for (i = 0; i < 10000000; i++) {
        sum += 1;
    }
    return NULL;
}
int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, countgold, NULL);
    pthread_create(&tid2, NULL, countgold, NULL);
    //Wait for both threads to finish:
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("ARRRRG sum is %d\n", sum);
    return 0;
}

上述代码的典型输出是ARGGGH sum is 8140268每次运行程序时都会打印不同的总和,因为存在竞争条件;代码无法阻止两个线程同时读写sum。例如,两个线程都将当前的 sum 值复制到运行每个线程的 CPU 中(假设为 123)。两个线程都将其自己的副本增加一。两个线程写回该值(124)。如果线程在不同时间访问了 sum,则计数将为 125。

如何确保一次只有一个线程可以访问全局变量?

你的意思是,“帮助 - 我需要一个互斥体!”如果一个线程当前正在临界区内,我们希望另一个线程等到第一个线程完成。为此,我们可以使用互斥体(Mutual Exclusion 的缩写)。

对于简单的示例,我们需要添加的代码最少只有三行:

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; // global variable
pthread_mutex_lock(&m); // start of Critical Section
pthread_mutex_unlock(&m); //end of Critical Section

一旦我们完成了互斥体,我们还应该调用pthread_mutex_destroy(&m)。请注意,您只能销毁未锁定的互斥体。对已销毁的锁调用 destroy,初始化已初始化的锁,锁定已锁定的锁,解锁未锁定的锁等都是不受支持的(至少对于默认的互斥体),通常会导致未定义的行为。

如果我锁定了互斥体,是否会阻止所有其他线程?

不,其他线程将继续。只有当一个线程尝试锁定已经锁定的互斥体时,线程才必须等待。一旦原始线程解锁互斥体,第二个(等待的)线程将获取锁并能够继续。

还有其他创建互斥体的方法吗?

可以。您可以仅对全局(“静态”)变量使用宏 PTHREAD_MUTEX_INITIALIZER。m = PTHREAD_MUTEX_INITIALIZER 等同于更通用的pthread_mutex_init(&m,NULL)。init 版本包括用于在性能和额外错误检查以及高级共享选项之间进行权衡的选项。

pthread_mutex_t *lock = malloc(sizeof(pthread_mutex_t)); 
pthread_mutex_init(lock, NULL);
//later
pthread_mutex_destroy(lock);
free(lock);

关于“init”和“destroy”需要记住的事情:

  • 多个线程的初始化/销毁具有未定义的行为
  • 销毁锁定的互斥体具有未定义的行为
  • 基本上尝试遵循一个线程初始化一个互斥体,而且只有一个线程初始化一个互斥体的模式。

互斥体陷阱

所以 pthread_mutex_lock在其他线程读取相同变量时会停止吗?

不,互斥体不是那么聪明 - 它与代码(线程)一起工作,而不是数据。只有当另一个线程在锁定的互斥体上调用lock时,第二个线程才需要等待,直到互斥体被解锁。

考虑

int a;
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER,
                 m2 = = PTHREAD_MUTEX_INITIALIZER;
// later
// Thread 1
pthread_mutex_lock(&m1);
a++;
pthread_mutex_unlock(&m1);
// Thread 2
pthread_mutex_lock(&m2);
a++;
pthread_mutex_unlock(&m2);

仍会导致竞争条件。

我可以在 fork 之前创建互斥体吗?

是的 - 但是子进程和父进程将不共享虚拟内存,并且每个进程都将拥有独立于其他进程的互斥体。

(高级说明:使用共享内存有高级选项,允许子进程和父进程共享互斥体,如果使用正确的选项并使用共享内存段。请参阅stackoverflow 示例

如果一个线程锁定了一个互斥锁,另一个线程能解锁它吗?

不行。同一个线程必须解锁它。

我可以使用两个或更多的互斥锁吗?

是的!事实上,通常每个需要更新的数据结构都有一个锁。

如果你只有一个锁,那么两个线程之间可能会对锁有显著的争用,这是不必要的。例如,如果两个线程正在更新两个不同的计数器,可能不需要使用相同的锁。

然而,简单地创建许多锁是不够的:重要的是能够推理关于临界区的问题,例如,一个线程不能在更新期间读取两个数据结构,而这两个数据结构暂时处于不一致的状态。

调用 lock 和 unlock 会有任何开销吗?

调用pthread_mutex_lock_unlock会有一些开销;然而这是你为了程序正确运行所付出的代价!

最简单的完整示例?

下面显示了一个完整的示例

#include <stdio.h>
#include <pthread.h>
// Compile with -pthread
// Create a mutex this ready to be locked!
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
int sum = 0;
void *countgold(void *param) {
    int i;
    //Same thread that locks the mutex must unlock it
    //Critical section is just 'sum += 1'
    //However locking and unlocking a million times
    //has significant overhead in this simple answer
    pthread_mutex_lock(&m);
    // Other threads that call lock will have to wait until we call unlock
    for (i = 0; i < 10000000; i++) {
    sum += 1;
    }
    pthread_mutex_unlock(&m);
    return NULL;
}
int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, countgold, NULL);
    pthread_create(&tid2, NULL, countgold, NULL);
    //Wait for both threads to finish:
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    printf("ARRRRG sum is %d\n", sum);
    return 0;
}

在上面的代码中,线程在进入计数室之前获取了锁。关键部分只有sum+=1,所以下一个版本也是正确的但更慢 -

for (i = 0; i < 10000000; i++) {
        pthread_mutex_lock(&m);
        sum += 1;
        pthread_mutex_unlock(&m);
    }
    return NULL;
}

这个过程运行得更慢,因为我们一百万次锁定和解锁互斥锁,这是昂贵的 - 至少与递增一个变量相比是昂贵的。(在这个简单的例子中,我们并不真正需要线程 - 我们可以加两次!)一个更快的多线程示例是使用一个自动(本地)变量添加一百万,然后在计算循环结束后将其添加到共享总数中:

int local = 0;
    for (i = 0; i < 10000000; i++) {
       local += 1;
    }
    pthread_mutex_lock(&m);
    sum += local;
    pthread_mutex_unlock(&m);
    return NULL;
}

如果我忘记解锁会发生什么?

死锁!我们稍后会谈论死锁,但如果多个线程调用这个循环会有什么问题。

while(not_stop){
    //stdin may not be thread safe
    pthread_mutex_lock(&m);
    char *line = getline(...);
    if(rand() % 2) { /* randomly skip lines */
         continue;
    }
    pthread_mutex_unlock(&m);
    process_line(line);
}

我什么时候可以销毁互斥锁?

你只能销毁一个未锁定的互斥锁

我可以将 pthread_mutex_t 复制到新的内存位置吗?

不行,将互斥锁的字节复制到新的内存位置,然后使用副本是支持的。

互斥锁的简单实现会是什么样的?

下面显示了一个简单(但不正确!)的建议。unlock函数只是解锁互斥锁并返回。lock 函数首先检查锁是否已经被锁定。如果当前已经被锁定,它将继续检查,直到另一个线程解锁互斥锁。

// Version 1 (Incorrect!)
void lock(mutex_t *m) {
  while(m->locked) { /*Locked? Nevermind - just loop and check again!*/ }
  m->locked = 1;
}
void unlock(mutex_t *m) {
  m->locked = 0;
}

版本 1 使用了“忙等待”(不必要地浪费 CPU 资源),但更严重的问题是:我们有一个竞争条件!

如果两个线程同时调用lock,有可能两个线程都会将’m_locked’读取为零。因此,两个线程都会认为它们对锁有独占访问权,然后两个线程都会继续。哎呀!

我们可以尝试通过在循环内调用pthread_yield()来减少一点 CPU 开销 - pthread_yield 建议操作系统暂时不使用 CPU,因此 CPU 可能被分配给等待运行的线程。但这并不能解决竞争条件。我们需要一个更好的实现 - 你能想出如何防止竞争条件吗?

我怎样才能了解更多?

玩! 阅读 man page!

同步,第二部分:计数信号量

什么是计数信号量?

计数信号量包含一个值,并支持两个操作“等待”和“发布”。发布增加信号量并立即返回。“等待”将在计数为零时等待。如果计数不为零,则信号量将减少计数并立即返回。

一个类比是饼干罐中的饼干数量(或者宝箱中的金币数量)。在拿饼干之前,调用“等待”。如果没有剩下饼干,那么等待将不会返回:它将等待,直到另一个线程通过调用 post 增加信号量。

简而言之,“发布”增加并立即返回,而“等待”将在计数为零时等待。在返回之前,它将减少计数。

我如何创建一个信号量?

本页介绍了未命名信号量。不幸的是,Mac OS X 目前还不支持这些。

首先决定初始值是零还是其他值(例如数组中剩余空间的数量)。与 pthread 互斥锁不同,创建信号量没有捷径 - 使用sem_init

#include <semaphore.h>
sem_t s;
int main() {
  sem_init(&s, 0, 10); // returns -1 (=FAILED) on OS X
  sem_wait(&s); // Could do this 10 times without blocking
  sem_post(&s); // Announce that we've finished (and one more resource item is available; increment count)
  sem_destroy(&s); // release resources of the semaphore
}

我可以从不同的线程调用 wait 和 post 吗?

可以!与互斥锁不同,增量和减量可以来自不同的线程。

可以使用信号量代替互斥锁吗?

是的 - 虽然信号量的开销更大。要使用信号量:

  • 用计数为一初始化信号量。
  • ...lock替换sem_wait
  • ...unlock替换sem_post

互斥锁是一个在“发布”之前始终“等待”的信号量

sem_t s;
sem_init(&s, 0, 1);
sem_wait(&s);
// Critical Section
sem_post(&s);

我可以在信号处理程序中使用 sem_post 吗?

是的!sem_post是少数几个可以在信号处理程序中正确使用的函数之一。这意味着我们可以释放一个等待的线程,该线程现在可以进行所有我们不允许在信号处理程序本身内调用的调用(例如printf)。

#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <semaphore.h>
#include <unistd.h>
sem_t s;
void handler(int signal)
{
    sem_post(&s); /* Release the Kraken! */
}
void *singsong(void *param)
{
    sem_wait(&s);
    printf("I had to wait until your signal released me!\n");
}
int main()
{
    int ok = sem_init(&s, 0, 0 /* Initial value of zero*/); 
    if (ok == -1) {
       perror("Could not create unnamed semaphore");
       return 1;
    }
    signal(SIGINT, handler); // Too simple! See note below
    pthread_t tid;
    pthread_create(&tid, NULL, singsong, NULL);
    pthread_exit(NULL); /* Process will exit when there are no more threads */
}

请注意,健壮的程序不会在多线程程序中使用signal()(“在多线程进程中使用 signal()的效果是未指定的。”- 信号手册页);一个更正确的程序将需要使用sigaction

我如何找到更多信息?

阅读手册页:

同步,第三部分:使用互斥锁和信号量

线程安全的堆栈

什么是原子操作?

用维基百科的话来说,

如果一个操作(或一组操作)在系统的其他部分看起来是瞬间发生的,那么它就是原子的或不可中断的。没有锁,只有简单的 CPU 指令(“从内存中读取这个字节”)是原子的(不可分割的)。在单 CPU 系统中,可以暂时禁用中断(这样一系列操作就不能被中断),但实际上原子性是通过使用同步原语来实现的,通常是互斥锁。

递增变量(i++是原子的,因为它需要三个不同的步骤:将位模式从内存复制到 CPU;使用 CPU 的寄存器进行计算;将位模式复制回内存。在这个递增序列期间,另一个线程或进程仍然可以读取旧值,并且当递增序列完成时,对同一内存的其他写入也会被覆盖。

我如何使用互斥锁使我的数据结构线程安全?

请注意,这只是一个介绍 - 编写高性能的线程安全数据结构需要自己的书!这是一个简单的数据结构(堆栈),它不是线程安全的:

// A simple fixed-sized stack (version 1)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];
void push(double v) { 
    values[count++] = v; 
}
double pop() {
    return values[--count];
}
int is_empty() {
    return count == 0;
}

堆栈的版本 1 不是线程安全的,因为如果两个线程同时调用 push 或 pop,那么结果或堆栈可能是不一致的。例如,想象一下,如果两个线程同时调用 pop,那么两个线程可能读取相同的值,两个线程可能读取原始计数值。

要将其转换为线程安全的数据结构,我们需要确定我们代码的关键部分,即哪些部分的代码必须一次只有一个线程。在上面的例子中,pushpopis_empty函数访问相同的变量(即内存),并且堆栈的所有关键部分。

push(和pop)正在执行时,数据结构处于不一致状态(例如计数可能尚未写入,因此可能仍然包含原始值)。通过用互斥锁包装这些方法,我们可以确保一次只有一个线程可以更新(或读取)堆栈。

以下是一个候选的“解决方案”。它正确吗?如果不是,它将如何失败?

// An attempt at a thread-safe stack (version 2)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
void push(double v) { 
    pthread_mutex_lock(&m1);
    values[count++] = v;
    pthread_mutex_unlock(&m1);
}
double pop() {
    pthread_mutex_lock(&m2);
    double v = values[--count];
    pthread_mutex_unlock(&m2);
    return v;
}
int is_empty() {
    pthread_mutex_lock(&m1);
    return count == 0;
    pthread_mutex_unlock(&m1);
}

上面的代码(“版本 2”)至少包含一个错误。花点时间看看你能不能找到错误,并弄清楚后果。

如果三个线程同时调用push(),锁m1确保只有一个线程在操作堆栈(两个线程将需要等待,直到第一个线程完成(调用解锁),然后第二个线程将被允许继续进入临界区,最后第三个线程将在第二个线程完成后被允许继续)。

类似的论点也适用于并发调用(同时调用)pop。然而,版本 2 不会阻止pushpop同时运行,因为pushpop使用两个不同的互斥锁。

在这种情况下,修复很简单 - 对 push 和 pop 函数都使用相同的互斥锁。

代码还有第二个错误;is_empty在比较后返回,不会解锁互斥锁。然而,错误不会立即被发现。例如,假设一个线程调用is_empty,稍后第二个线程调用push。这个线程会神秘地停止。使用调试器,你可以发现线程在push方法内的 lock()方法处被卡住,因为之前的is_empty调用没有解锁。因此,一个线程的疏忽导致了任意其他线程在以后的时间出现问题。

以下是更好的版本 -

// An attempt at a thread-safe stack (version 3)
int count;
double values[count];
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
void push(double v) { 
  pthread_mutex_lock(&m); 
  values[count++] = v;
  pthread_mutex_unlock(&m);
}
double pop() {
  pthread_mutex_lock(&m);
  double v = values[--count];
  pthread_mutex_unlock(&m);
  return v;
}
int is_empty() {
  pthread_mutex_lock(&m);
  int result= count == 0;
  pthread_mutex_unlock(&m);
  return result;
}

版本 3 是线程安全的(我们已经确保了所有关键部分的互斥),但有两点需要注意:

  • is_empty是线程安全的,但它的结果可能已经过时,即在线程得到结果时,堆栈可能不再为空!
  • 没有保护免受下溢(在空堆栈上弹出)或上溢(在已满堆栈上推入)

后一点可以使用计数信号量来修复。

该实现假定为单个堆栈。更通用的版本可能会将互斥锁作为内存结构的一部分,并使用 pthread_mutex_init 来初始化互斥锁。例如,

// Support for multiple stacks (each one has a mutex)
typedef struct stack {
  int count;
  pthread_mutex_t m; 
  double *values;
} stack_t;
stack_t* stack_create(int capacity) {
  stack_t *result = malloc(sizeof(stack_t));
  result->count = 0;
  result->values = malloc(sizeof(double) * capacity);
  pthread_mutex_init(&result->m, NULL);
  return result;
}
void stack_destroy(stack_t *s) {
  free(s->values);
  pthread_mutex_destroy(&s->m);
  free(s);
}
// Warning no underflow or overflow checks!
void push(stack_t *s, double v) { 
  pthread_mutex_lock(&s->m); 
  s->values[(s->count)++] = v; 
  pthread_mutex_unlock(&s->m); }
double pop(stack_t *s) { 
  pthread_mutex_lock(&s->m); 
  double v = s->values[--(s->count)]; 
  pthread_mutex_unlock(&s->m); 
  return v;
}
int is_empty(stack_t *s) { 
  pthread_mutex_lock(&s->m); 
  int result = s->count == 0; 
  pthread_mutex_unlock(&s->m);
  return result;
}

示例用法:

int main() {
    stack_t *s1 = stack_create(10 /* Max capacity*/);
    stack_t *s2 = stack_create(10);
    push(s1, 3.141);
    push(s2, pop(s1));
    stack_destroy(s2);
    stack_destroy(s1);
}

堆栈信号量

如果堆栈为空或已满,我如何强制我的线程等待?

使用计数信号量!使用计数信号量来跟踪剩余空间的数量,另一个信号量来跟踪堆栈中项目的数量。我们将称这两个信号量为’sremain’和’sitems’。记住,sem_wait会在信号量的计数被另一个线程调用sem_post减少到零时等待。

// Sketch #1
sem_t sitems;
sem_t sremain;
void stack_init(){
  sem_init(&sitems, 0, 0);
  sem_init(&sremain, 0, 10);
}
double pop() {
  // Wait until there's at least one item
  sem_wait(&sitems);
  ...
void push(double v) {
  // Wait until there's at least one space
  sem_wait(&sremain);
  ...

草图#2 已经实现了太早的post。在 push 中等待的另一个线程可能会错误地尝试写入一个已满的堆栈(同样,等待 pop()的线程可能会过早地继续)。

// Sketch #2 (Error!)
double pop() {
  // Wait until there's at least one item
  sem_wait(&sitems);
  sem_post(&sremain); // error! wakes up pushing() thread too early
  return values[--count];
}
void push(double v) {
  // Wait until there's at least one space
  sem_wait(&sremain);
  sem_post(&sitems); // error! wakes up a popping() thread too early
  values[count++] = v;
}

草图 3 实现了正确的信号量逻辑,但你能发现错误吗?

// Sketch #3 (Error!)
double pop() {
  // Wait until there's at least one item
  sem_wait(&sitems);
  double v= values[--count];
  sem_post(&sremain);
  return v;
}
void push(double v) {
  // Wait until there's at least one space
  sem_wait(&sremain);
  values[count++] = v;
  sem_post(&sitems); 
}

草图 3 正确地使用信号量强制执行了缓冲区满和缓冲区空的条件。然而,没有互斥:两个线程可以同时处于临界区,这将破坏数据结构(或至少导致数据丢失)。修复方法是在临界区周围包装一个互斥锁:

// Simple single stack - see above example on how to convert this into a multiple stacks.
// Also a robust POSIX implementation would check for EINTR and error codes of sem_wait.
// PTHREAD_MUTEX_INITIALIZER for statics (use pthread_mutex_init() for stack/heap memory)
pthread_mutex_t m= PTHREAD_MUTEX_INITIALIZER; 
int count = 0;
double values[10];
sem_t sitems, sremain;
void init() {
  sem_init(&sitems, 0, 0);
  sem_init(&sremains, 0, 10); // 10 spaces
}
double pop() {
  // Wait until there's at least one item
  sem_wait(&sitems);
  pthread_mutex_lock(&m); // CRITICAL SECTION
  double v= values[--count];
  pthread_mutex_unlock(&m);
  sem_post(&sremain); // Hey world, there's at least one space
  return v;
}
void push(double v) {
  // Wait until there's at least one space
  sem_wait(&sremain);
  pthread_mutex_lock(&m); // CRITICAL SECTION
  values[count++] = v;
  pthread_mutex_unlock(&m);
  sem_post(&sitems); // Hey world, there's at least one item
}
// Note a robust solution will need to check sem_wait's result for EINTR (more about this later)

常见的互斥锁陷阱是什么?

  • 由于愚蠢的拼写错误而锁定/解锁错误的互斥锁
  • 未解锁互斥锁(由于在错误条件下提前返回)
  • 资源泄漏(未调用pthread_mutex_destroy
  • 使用未初始化的互斥锁(或使用已被销毁的互斥锁)
  • 在线程上两次锁定互斥锁(未首先解锁)
  • 死锁和优先级反转(我们稍后会讨论这些)

同步,第四部分:关键部分问题

候选解决方案

什么是关键部分问题?

如已在Synchronization, Part 3: Working with Mutexes And Semaphores中讨论的,我们的代码中有一些关键部分只能由一个线程同时执行。我们将这种要求描述为“互斥排他”;只有一个线程(或进程)可以访问共享资源。

在多线程程序中,我们可以使用互斥锁和解锁调用来包装关键部分:

pthread_mutex_lock() - one thread allowed at a time! (others will have to wait here)
... Do Critical Section stuff here!
pthread_mutex_unlock() - let other waiting threads continue

我们如何实现这些锁定和解锁调用?我们能创建一个保证互斥的算法吗?下面显示了一个不正确的实现,

pthread_mutex_lock(p_mutex_t *m)     { while(m->lock) {}; m->lock = 1;}
pthread_mutex_unlock(p_mutex_t *m)   { m->lock = 0; }

乍一看,代码似乎是有效的;如果一个线程尝试锁定互斥量,稍后的线程必须等到锁被释放。然而,这种实现不能满足互斥。让我们从两个大致同时运行的线程的角度仔细观察这个“实现”。在下表中,时间从上到下依次进行-

时间 线程 1 线程 2
1 while(lock) {}
2 while(lock) {}
3 lock = 1 lock = 1

哎呀!存在竞争条件。不幸的是,两个线程都检查了锁并读取了一个错误的值,因此能够继续执行。

关键部分问题的候选解决方案。

为了简化讨论,我们只考虑两个线程。请注意,这些论点适用于线程和进程,经典的 CS 文献讨论了这些问题,涉及到需要对关键部分或共享资源进行独占访问(即互斥)的两个进程。

提高标志表示线程/进程进入关键部分的意图。

请记住,下面概述的伪代码是较大程序的一部分;线程或进程通常需要在进程的生命周期中多次进入关键部分。因此,想象每个示例都包裹在一个循环中,在循环中线程或进程在其他事务上工作了一段随机时间。

下面描述的候选解决方案有什么问题吗?

// Candidate #1
wait until your flag is lowered
raise my flag
// Do Critical Section stuff
lower my flag

答案:候选解决方案#1 也存在竞争条件,即它不能满足互斥排他,因为两个线程/进程都可以读取对方的标志值(=降低)并继续。

这表明我们应该在检查其他线程的标志之前提高标志 - 这是下面的候选解决方案#2。

// Candidate #2
raise my flag
wait until your flag is lowered
// Do Critical Section stuff
lower my flag

候选方案#2 满足互斥 - 不可能同时有两个线程在关键部分内。然而,这段代码存在死锁问题!假设两个线程希望同时进入关键部分:

时间 线程 1 线程 2
1 raise my flag
2 raise my flag

| 3 | wait... | wait... |

哎呀,现在两个线程/进程都在等待对方降低他们的标志。现在两者都将永远无法进入关键部分!

这表明我们应该使用轮流变量来尝试解决谁应该继续的问题。

轮流解决方案

以下候选解决方案#3 使用轮流变量礼貌地允许一个线程,然后另一个线程继续

// Candidate #3
wait until my turn is myid
// Do Critical Section stuff
turn = yourid

候选方案#3 满足互斥(每个线程或进程都可以独占访问关键部分),但是两个线程/进程必须采取严格的轮流方式来使用关键部分;即它们被迫进入交替的关键部分访问模式。例如,如果线程 1 希望每毫秒读取一个哈希表,但另一个线程每秒写入一个哈希表,那么读取线程必须再等待 999 毫秒才能再次从哈希表中读取。这种“解决方案”是不有效的,因为我们的线程应该能够取得进展并在没有其他线程当前在关键部分时进入关键部分。

对关键部分问题的解决方案的期望属性?

在解决关键部分问题中,我们希望的有三个主要的理想属性

  • 互斥 - 线程/进程获得独占访问权;其他线程/进程必须等待,直到它退出关键部分。
  • 有界等待 - 如果线程/进程必须等待,那么它只能等待有限的时间(不允许无限等待时间!)。有界等待的确切定义是,在给定进程进入之前,任何其他进程可以进入其关键部分的次数有一个上限(非无限)。
  • 进度 - 如果没有线程/进程在关键部分内,那么线程/进程应该能够继续进行(取得进展)而无需等待。

在考虑这些想法的基础上,让我们检查另一个候选解决方案,只有在两个线程同时需要访问时才使用基于轮换的标志。

轮换和标志解决方案

以下是 CSP 的正确解决方案吗?

\\ Candidate #4
raise my flag
if your flag is raised, wait until my turn
// Do Critical Section stuff
turn = yourid
lower my flag

一位教师和另一位 CS 教师最初也是这样认为的!然而,分析这些解决方案是棘手的。甚至关于这个特定主题的同行评审论文中也包含不正确的解决方案!乍一看,它似乎满足互斥、有界等待和进度:基于轮换的标志仅在出现平局时使用(因此允许进度和有界等待),并且似乎满足互斥。然而…也许你可以找到一个反例?

候选#4 失败,因为一个线程不会等到另一个线程降低他们的标志。经过一番思考(或灵感),可以创建以下场景来演示互斥不满足。

想象第一个线程运行这段代码两次(所以轮换标志现在指向第二个线程)。当第一个线程仍然在关键部分内时,第二个线程到达。第二个线程可以立即继续进入关键部分!

时间 轮换 线程#1 线程#2
1 2 raise my flag
2 2 if your flag is raised, wait until my turn raise my flag
3 2 // Do Critical Section stuff if your flag is raised, wait until my turn(真的!)
4 2 // Do Critical Section stuff // Do Critical Section stuff - 糟糕

有效的解决方案

Peterson 的解决方案是什么?

Peterson 在 1981 年的一篇两页论文中发表了他的小说和令人惊讶的简单解决方案。下面显示了他算法的一个版本,使用了共享变量turn

\\ Candidate #5
raise my flag
turn = your_id
wait until your flag is lowered and turn is yourid
// Do Critical Section stuff
lower my flag

该解决方案满足互斥、有界等待和进度。如果线程#2 将轮换设置为 2 并且当前在关键部分内。线程#1 到达,将轮换设置回 1,现在等待直到线程 2 降低标志。

Peterson 原始文章 pdf 的链接:G. L. Peterson: “关于互斥问题的神话”,信息处理通讯 12(3) 1981, 115–116

Peterson 的解决方案是第一个解决方案吗?

不,Dekkers 算法(1962 年)是第一个可以证明正确的解决方案。以下是该算法的一个版本。

raise my flag
while(your flag is raised) :
   if it's your turn to win :
     lower my flag
     wait while your turn
     raise my flag
// Do Critical Section stuff
set your turn to win
lower my flag

请注意,无论循环迭代零次、一次还是多次,进程的标志在关键部分始终被提升。此外,该标志可以被解释为立即意图进入关键部分。只有在另一个进程也提升了标志时,一个进程才会推迟,降低他们的意图标志并等待。

我可以只在 C 或汇编中实现 Peterson 的(或 Dekkers)算法吗?

是的 - 通过一点搜索,甚至今天也可以在特定简单的移动处理器上找到它的生产应用:Peterson 的算法用于实现 Tegra 移动处理器的低级 Linux 内核锁(由 Nvidia 的系统级芯片 ARM 处理器和 GPU 核心)android.googlesource.com/kernel/tegra.git/+/android-tegra-3.10/arch/arm/mach-tegra/sleep.S#58

然而,一般来说,CPU 和 C 编译器可以重新排序 CPU 指令或使用 CPU 核心特定的本地缓存值,如果另一个核心更新共享变量,则这些值可能是过时的。因此,对于大多数平台来说,简单的伪代码到 C 的实现太天真了。你现在可以停止阅读了。

哦… 你决定继续阅读。好吧,这里有龙!别说我们没警告过你。考虑这是一个高级和棘手的话题,但(剧透)有一个美好的结局。

考虑以下代码,

while(flag2 ) { /* busy loop - go around again */

一个高效的编译器会推断flag2变量在循环内部永远不会改变,因此测试可以优化为while(true)。使用volatile可以在一定程度上防止这种类型的编译器优化。

独立指令可以被优化编译器重新排序,或者在运行时由 CPU 的乱序执行优化重新排序。如果代码需要变量被修改和检查以及精确的顺序,这些复杂的优化。

一个相关的挑战是 CPU 核心包括数据缓存,用于存储最近读取或修改的主内存值。修改后的值可能不会立即写回主内存或重新从内存中读取。因此,数据更改,例如上面示例中的标志和转换变量的状态,可能不会在两个 CPU 核心之间共享。

但是有一个美好的结局。幸运的是,现代硬件使用“内存栅栏”(也称为内存屏障)CPU 指令来解决这些问题,以确保主内存和 CPU 缓存处于合理和一致的状态。更高级别的同步原语,如pthread_mutex_lock,将调用这些 CPU 指令作为其实现的一部分。因此,在实践中,使用互斥锁的临界区周围的锁定和解锁调用足以忽略这些低级问题。

进一步阅读:我们建议阅读以下网帖,讨论在 x86 进程上实现 Peterson 算法以及关于内存屏障的 Linux 文档。

bartoszmilewski.com/2008/11/05/who-ordered-memory-fences-on-an-x86/lxr.free-electrons.com/source/Documentation/memory-barriers.txt

硬件解决方案

我们如何在硬件上实现临界区问题?

我们可以使用 C11 原子操作来完美地做到这一点!完整的解决方案在这里详细说明(这是一个自旋锁互斥体,futex的实现可以在网上找到)。

typedef struct mutex_{
    atomic_int_least8_t lock;
    pthread_t owner;
} mutex;
#define UNLOCKED 0
#define LOCKED 1
#define UNASSIGNED_OWNER 0
int mutex_init(mutex* mtx){
    if(!mtx){
        return 0;
    }
    atomic_init(&mtx->lock, UNLOCKED); // Not thread safe the user has to take care of this
    mtx->owner = UNASSIGNED_OWNER;
    return 1;
}

这是初始化代码,这里没有什么花哨的。我们将互斥体的状态设置为未锁定,并将所有者设置为已锁定。

int mutex_lock(mutex* mtx){
    int_least8_t zero = UNLOCKED;
    while(!atomic_compare_exchange_weak_explicit
            (&mtx->lock, 
             &zero, 
             LOCKED,
             memory_order_relaxed,
             memory_order_relaxed)){
        zero = UNLOCKED;
        sched_yield(); //Use system calls for scheduling speed
    }
    //We have the lock now!!!!
    mtx->owner = pthread_self();
    return 1;
}

天啊!这段代码是做什么的?首先,它初始化一个变量,我们将保持为未锁定状态。原子比较和交换是大多数现代架构支持的指令(在 x86 上是lock cmpxchg)。这个操作的伪代码看起来像这样

int atomic_compare_exchange_pseudo(int* addr1, int* addr2, int val){
    if(*addr1 == *addr2){
        *addr1 = val;
        return 1;
    }else{
        *addr2 = *addr1;
        return 0;
    }
}

除了它是原子完成的,意味着在一个不可中断的操作中完成。部分是什么意思?原子指令也容易出现虚假失败,这意味着这些原子函数有两个版本,一个和一个,强保证成功或失败,而弱可能失败。我们使用弱部分是因为弱部分更快,而且我们在一个循环中!这意味着如果它失败得更频繁,我们也没关系,因为我们会继续旋转。

这个内存顺序是什么?我们之前讨论过内存栅栏,这就是它!我们不会详细讨论,因为这超出了本课程的范围,但不超出本文的范围。

在 while 循环内,我们未能获取到锁!我们将零重置为 unlocked 并睡一会儿。当我们醒来时,我们再次尝试获取锁。一旦成功交换,我们就进入了临界区!我们为解锁方法设置了互斥体的所有者,并返回成功。

在使用原子操作时,这如何保证互斥性,我们并不完全确定!但在这个简单的例子中,我们可以,因为能够成功期望锁处于 UNLOCKED(0)状态并将其交换到 LOCKED(1)状态的线程被认为是赢家。我们如何实现解锁?

int mutex_unlock(mutex* mtx){
    if(unlikely(pthread_self() != mtx->owner)){
        return 0; //You can't unlock a mutex if you aren't the owner
    }
    int_least8_t one = 1;
    //Critical section ends after this atomic
    mtx->owner = UNASSIGNED_OWNER;
    if(!atomic_compare_exchange_strong_explicit(
                &mtx->lock, 
                &one, 
                UNLOCKED,
                memory_order_relaxed,
                memory_order_relaxed)){
        //The mutex was never locked in the first place
        return 0;
    }
    return 1;
}

为了满足 API,除非你是拥有它的人,否则你不能解锁互斥体。然后我们取消互斥体所有者,因为在原子操作之后临界区已经结束。我们希望进行强交换,因为我们不想阻塞(pthread_mutex_unlock 不会阻塞)。我们期望互斥体被锁住,然后将其交换到解锁状态。如果交换成功,我们就解锁了互斥体。如果交换失败,这意味着互斥体是 UNLOCKED,我们试图将其从 UNLOCKED 切换到 UNLOCKED,保持解锁的非阻塞。

同步,第五部分:条件变量

条件变量简介

热身

给这些属性命名!

  • “一次只有一个进程(/线程)可以进入 CS”
  • “如果等待,那么另一个进程只能进入 CS 有限次数”
  • “如果没有其他进程在 CS 中,那么进程可以立即进入 CS”

参见Synchronization, Part 4: The Critical Section Problem获取答案。

什么是条件变量?如何使用它们?什么是虚假唤醒?

  • 条件变量允许一组线程睡眠,直到被唤醒!您可以唤醒一个线程或所有正在睡眠的线程。如果只唤醒一个线程,那么操作系统将决定唤醒哪个线程。您不直接唤醒线程,而是’信号’条件变量,然后将唤醒一个(或所有)正在条件变量内睡眠的线程。
  • 条件变量与互斥锁和循环一起使用(用于检查条件)。
  • 偶尔,一个等待的线程可能会出现无缘无故地唤醒(这称为虚假唤醒)!这不是问题,因为您总是在循环内使用wait,该循环测试必须为真才能继续。
  • 在条件变量中睡眠的线程通过调用pthread_cond_broadcast(唤醒所有)或pthread_cond_signal(唤醒一个)来唤醒。请注意,尽管函数名中有"signal",但这与 POSIX 的signal无关!

pthread_cond_wait做什么?

调用pthread_cond_wait执行三个动作:

  • 解锁互斥锁
  • 等待(睡眠,直到在同一条件变量上调用pthread_cond_signal
  • 在返回之前,锁定互斥锁

(高级话题)为什么条件变量也需要互斥锁?

条件变量需要互斥锁有三个原因。最容易理解的是,它可以防止早期唤醒消息(signalbroadcast函数)被“丢失”。想象一下以下事件序列(时间向下运行页面),其中条件在调用pthread_cond_wait之前 _ 刚好 _ 满足。在这个例子中,唤醒信号丢失了!

线程 1 线程 2
while( answer < 42) {
answer++
p_cond_signal(cv)
p_cond_wait(cv,m)

如果两个线程都锁定了互斥锁,则在调用pthread_cond_wait(cv, m)之后(然后在内部解锁互斥锁)之后才能发送信号

第二个常见的原因是,更新程序状态(answer变量)通常需要互斥锁 - 例如,多个线程可能正在更新answer的值。

第三个微妙的原因是满足实时调度的考虑,我们在这里只概述:在时间关键的应用程序中,等待的线程应该允许具有最高优先级的线程先继续。为了满足这个要求,调用pthread_cond_signalpthread_cond_broadcast之前也必须锁定互斥锁。对于好奇的人,可以在这里找到更长的历史讨论。

为什么会存在虚假唤醒?

出于性能考虑。在多 CPU 系统上,可能会发生竞争条件,导致唤醒(信号)请求被忽略。内核可能不会检测到这个丢失的唤醒调用,但可以检测到可能发生的情况。为了避免潜在的丢失信号,唤醒线程以便程序代码可以再次测试条件。

例子

条件变量总是与互斥锁一起使用。

在调用wait之前,必须锁定互斥锁,并且wait必须用循环包装。

pthread_cond_t cv;
pthread_mutex_t m;
int count;
// Initialize
pthread_cond_init(&cv, NULL);
pthread_mutex_init(&m, NULL);
count = 0;
pthread_mutex_lock(&m);
while (count < 10) {
   pthread_cond_wait(&cv, &m); 
/* Remember that cond_wait unlocks the mutex before blocking (waiting)! */
/* After unlocking, other threads can claim the mutex. */
/* When this thread is later woken it will */
/* re-lock the mutex before returning */
}
pthread_mutex_unlock(&m);
//later clean up with pthread_cond_destroy(&cv); and mutex_destroy 
// In another thread increment count:
while (1) {
  pthread_mutex_lock(&m);
  count++;
  pthread_cond_signal(&cv);
  /* Even though the other thread is woken up it cannot not return */
  /* from pthread_cond_wait until we have unlocked the mutex. This is */
  /* a good thing! In fact, it is usually the best practice to call */
  /* cond_signal or cond_broadcast before unlocking the mutex */
  pthread_mutex_unlock(&m);
}

实现计数信号量

  • 我们可以使用条件变量实现计数信号量。
  • 每个信号量都需要一个计数、一个条件变量和一个互斥锁
typedef struct sem_t {
  int count; 
  pthread_mutex_t m;
  pthread_condition_t cv;
} sem_t;

实现sem_init以初始化互斥锁和条件变量

int sem_init(sem_t *s, int pshared, int value) {
  if (pshared) { errno = ENOSYS /* 'Not implemented'*/; return -1;}
  s->count = value;
  pthread_mutex_init(&s->m, NULL);
  pthread_cond_init(&s->cv, NULL);
  return 0;
}

我们的sem_post实现需要增加计数。我们还将唤醒任何在条件变量内睡眠的线程。请注意,我们锁定并解锁互斥锁,因此一次只有一个线程可以在临界区内。

sem_post(sem_t *s) {
  pthread_mutex_lock(&s->m);
  s->count++;
  pthread_cond_signal(&s->cv); /* See note */
  /* A woken thread must acquire the lock, so it will also have to wait until we call unlock*/
  pthread_mutex_unlock(&s->m);
}

我们的sem_wait实现可能需要睡眠,如果信号量的计数为零。就像sem_post一样,我们使用锁来包装临界区(这样一次只有一个线程可以执行我们的代码)。请注意,如果线程确实需要等待,那么互斥锁将被解锁,允许另一个线程进入sem_post并唤醒我们的睡眠!

请注意,即使线程被唤醒,在从pthread_cond_wait返回之前,它必须重新获取锁,因此它将不得不等待一小段时间(例如,直到sem_post完成)。

sem_wait(sem_t *s) {
  pthread_mutex_lock(&s->m);
  while (s->count == 0) {
      pthread_cond_wait(&s->cv, &s->m); /*unlock mutex, wait, relock mutex*/
  }
  s->count--;
  pthread_mutex_unlock(&s->m);
}

等待sem_post不断调用pthread_cond_signal会不会破坏sem_wait 答案:不会!在计数非零之前,我们无法跳出循环。实际上,这意味着sem_post即使没有等待的线程,也会不必要地调用pthread_cond_signal。更高效的实现只会在必要时调用pthread_cond_signal,即:

/* Did we increment from zero to one- time to signal a thread sleeping inside sem_post */
  if (s->count == 1) /* Wake up one waiting thread!*/
     pthread_cond_signal(&s->cv);

其他信号量考虑

  • 真正的信号量实现包括队列和调度问题,以确保公平性和优先级,例如唤醒最高优先级的最长睡眠线程。
  • 另外,sem_init的高级用法允许信号量在进程之间共享。我们的实现仅适用于同一进程内的线程。

同步,第六部分:实现屏障

如何等待 N 个线程在继续下一步之前到达某一点?

假设我们想要执行一个多线程计算,它有两个阶段,但我们不想在第一阶段完成之前进入第二阶段。

我们可以使用一种称为屏障的同步方法。当一个线程到达屏障时,它将在屏障处等待,直到所有线程到达屏障,然后它们将一起继续。

想象一下,就像和一些朋友一起去远足。你们约定在每个山顶等待彼此(并且你心里记下了你的团队有多少人)。假设你是第一个到达第一个山顶的人。你会在山顶等待你的朋友。他们一个接一个地到达山顶,但直到你的团队中的最后一个人到达之前,没有人会继续前进。一旦他们到达,你们就会一起继续。

Pthreads 有一个实现这一点的函数pthread_barrier_wait()。您需要声明一个pthread_barrier_t变量,并使用pthread_barrier_init()对其进行初始化。pthread_barrier_init()将参与屏障的线程数作为参数。这里有一个例子。

现在让我们实现自己的屏障,并使用它在大型计算中同步所有线程。

double data[256][8192]
1 Threads do first calculation (use and change values in data)
2 Barrier! Wait for all threads to finish first calculation before continuing
3 Threads do second calculation (use and change values in data)

线程函数有四个主要部分-

void *calc(void *arg) {
  /* Do my part of the first calculation */
  /* Am I the last thread to finish? If so wake up all the other threads! */
  /* Otherwise wait until the other threads has finished part one */
  /* Do my part of the second calculation */
}

我们的主线程将创建 16 个线程,并将每个计算分成 16 个单独的部分。每个线程将被赋予一个唯一的值(0,1,2,…15),以便它可以处理自己的块。由于(void*)类型可以保存小整数,我们将通过将其转换为 void 指针来传递i的值。

#define N (16)
double data[256][8192] ;
int main() {
    pthread_t ids[N];
    for(int i = 0; i < N; i++)  
        pthread_create(&ids[i], NULL, calc, (void *) i);

请注意,我们永远不会将此指针值解引用为实际的内存位置-我们只会将其直接转换回整数:

void *calc(void *ptr) {
// Thread 0 will work on rows 0..15, thread 1 on rows 16..31
  int x, y, start = N * (int) ptr;
  int end = start + N; 
  for(x = start; x < end; x++) for (y = 0; y < 8192; y++) { /* do calc #1 */ }

第 1 个计算完成后,我们需要等待较慢的线程(除非我们是最后一个线程!)。因此,跟踪已经到达我们的屏障(也称为“检查点”)的线程数量:

// Global: 
int remain = N;
// After calc #1 code:
remain--; // We finished
if (remain ==0) {/*I'm last!  -  Time for everyone to wake up! */ }
else {
  while (remain != 0) { /* spin spin spin*/ }
}

然而,上述代码存在竞争条件(两个线程可能尝试递减remain),并且循环是一个忙循环。我们可以做得更好!让我们使用条件变量,然后我们将使用广播/信号函数唤醒睡眠的线程。

提醒一下,条件变量类似于一个房子!线程在那里睡觉(pthread_cond_wait)。您可以选择唤醒一个线程(pthread_cond_signal)或所有线程(pthread_cond_broadcast)。如果当前没有线程在等待,那么这两个调用将不起作用。

条件变量版本通常与忙循环不正确的解决方案非常相似-接下来我们将展示。首先,让我们添加一个互斥锁和条件全局变量,不要忘记在main中初始化它们…

//global variables
pthread_mutex_t m;
pthread_cond_t cv;
main() {
  pthread_mutex_init(&m, NULL);
  pthread_cond_init(&cv, NULL);

我们将使用互斥锁来确保只有一个线程在一次修改remain。最后到达的线程需要唤醒所有睡眠的线程-因此我们将使用pthread_cond_broadcast(&cv)而不是pthread_cond_signal

pthread_mutex_lock(&m);
remain--; 
if (remain ==0) { pthread_cond_broadcast(&cv); }
else {
  while(remain != 0) { pthread_cond_wait(&cv, &m); }
}
pthread_mutex_unlock(&m);

当线程进入pthread_cond_wait时,它释放互斥锁并进入睡眠状态。在将来的某个时刻,它将被唤醒。一旦我们将线程从睡眠中唤醒,它在返回之前必须等待直到可以锁定互斥锁。请注意,即使一个睡眠的线程提前醒来,它也会检查 while 循环条件并在必要时重新进入等待。

上述屏障不可重用这意味着如果我们将其放入任何旧的计算循环中,代码很可能会遇到屏障死锁或线程比一个迭代更快的情况。思考一下如何使上述屏障可重用,这意味着如果多个线程在循环中调用barrier_wait,则可以保证它们处于相同的迭代。

同步,第七部分:读者写者问题

读者写者问题是什么?

想象一下,您有一个键值映射数据结构,被许多线程使用。只要数据结构没有被写入,多个线程应该能够同时查找(读取)值。写者不那么合群-为了避免数据损坏,一次只有一个线程可以修改(write)数据结构(此时不能有读者正在读取)。

这是读者写者问题的一个例子。也就是说,我们如何有效地同步多个读者和写者,以便多个读者可以一起阅读,但写者可以获得独占访问?

下面显示了一个不正确的尝试(“锁”是pthread_mutex_lock的简写):

尝试#1

read() {
  lock(&m)
  // do read stuff
  unlock(&m)
}
write() {
  lock(&m)
  // do write stuff
  unlock(&m)
}

至少我们的第一次尝试不会遭受数据损坏(读者必须在写者写作时等待,反之亦然)!但是读者也必须等待其他读者。所以让我们尝试另一种实现…

尝试#2:

read() {
  while(writing) {/*spin*/}
  reading = 1
  // do read stuff
  reading = 0
}
write() {
  while(reading &#124;&#124; writing) {/*spin*/}
  writing = 1
  // do write stuff
  writing = 0
}

我们的第二次尝试遭受了竞争条件的影响-想象一下,如果两个线程同时调用readwrite(或同时调用 write)。两个线程都将能够继续进行!其次,我们可以有多个读者和多个写者,因此让我们跟踪读者或写者的总数。这就是我们尝试#3,

尝试#3

请记住,pthread_cond_wait执行个动作。首先,它会原子解锁互斥锁,然后休眠(直到被pthread_cond_signalpthread_cond_broadcast唤醒)。第三,唤醒的线程必须在返回之前重新获取互斥锁。因此,只有一个线程实际上可以在由 lock 和 unlock()方法定义的临界区域内运行。

下面的实现#3 确保如果有任何写者在写作,读者将进入 cond_wait。

read() {
    lock(&m)
    while (writing)
        cond_wait(&cv, &m)
    reading++;
/* Read here! */
    reading--
    cond_signal(&cv)
    unlock(&m)
}

但是因为候选#3 在读取之前没有解锁互斥锁,所以一次只能有一个读者读取。更好的版本在读取之前解锁:

read() {
    lock(&m);
    while (writing)
        cond_wait(&cv, &m)
    reading++;
    unlock(&m)
/* Read here! */
    lock(&m)
    reading--
    cond_signal(&cv)
    unlock(&m)
}

这是否意味着写者和读者可以同时读和写?不!首先,记住 cond_wait 要求线程在返回之前重新获取互斥锁。因此,只有一个线程可以在临界区域(用**标记)内执行代码!

read() {
    lock(&m);
**  while (writing)
**      cond_wait(&cv, &m)
**  reading++;
    unlock(&m)
/* Read here! */
    lock(&m)
**  reading--
**  cond_signal(&cv)
    unlock(&m)
}

写者必须等待所有人。互斥由锁来保证。

write() {
    lock(&m);
**  while (reading || writing)
**      cond_wait(&cv, &m);
**  writing++;
**
** /* Write here! */
**  writing--;
**  cond_signal(&cv);
    unlock(&m);
}

上述候选#3 还使用pthread_cond_signal;这只会唤醒一个线程。例如,如果许多读者正在等待写者完成,那么只有一个正在睡眠的读者将被唤醒。读者和写者应该使用cond_broadcast,以便所有线程都应该唤醒并检查它们的 while 循环条件。

饥饿的写者

上述候选#3 遭受饥饿。如果读者不断到来,那么写者将永远无法继续进行(“读取”计数永远不会减少到零)。这被称为饥饿,并且在重负载下会被发现。我们的修复方法是为写者实现有界等待。如果写者到达,他们仍然需要等待现有的读者,但是未来的读者必须被放置在“等待区”中等待写者完成。可以使用变量和条件变量来实现“等待区”(以便我们可以在写者完成后唤醒线程)。

我们的计划是,当写者到达并在等待当前读者完成之前,注册我们的写入意图(通过增加计数器’writer’)。下面是草图-

write() {
    lock()
    writer++
    while (reading || writing)
    cond_wait
    unlock()
  ...
}

并且当写者为非零时,传入的读者将不被允许继续。请注意,“写者”表示写者已到达,而“读取”和“写入”计数器表示有活动读者或写者。

read() {
    lock()
    // readers that arrive *after* the writer arrived will have to wait here!
    while(writer)
    cond_wait(&cv,&m)
    // readers that arrive while there is an active writer
    // will also wait.
    while (writing) 
        cond_wait(&cv,&m)
    reading++
    unlock
  ...
}

尝试#4

以下是我们对读者-写者问题的第一个工作解决方案。请注意,如果你继续阅读关于“读者写者问题”的内容,你会发现我们通过给予写者对锁的优先访问来解决了“第二个读者写者问题”。这个解决方案并不是最佳的。然而,它满足了我们最初的问题(N 个活跃读者,单个活跃写者,避免了如果有持续的读者流的话写者饥饿)。

你能识别出任何改进吗?例如,你会如何改进代码,以便我们只唤醒读者或一个写者?

int writers; // Number writer threads that want to enter the critical section (some or all of these may be blocked)
int writing; // Number of threads that are actually writing inside the C.S. (can only be zero or one)
int reading; // Number of threads that are actually reading inside the C.S.
// if writing !=0 then reading must be zero (and vice versa)
reader() {
    lock(&m)
    while (writers)
        cond_wait(&turn, &m)
    // No need to wait while(writing here) because we can only exit the above loop
    // when writing is zero
    reading++
    unlock(&m)
  // perform reading here
    lock(&m)
    reading--
    cond_broadcast(&turn)
    unlock(&m)
}
writer() {
    lock(&m)  
    writers++  
    while (reading || writing)   
        cond_wait(&turn, &m)  
    writing++  
    unlock(&m)  
    // perform writing here 
    lock(&m)  
    writing--  
    writers--  
    cond_broadcast(&turn)  
    unlock(&m)  
}

UIUC CS241 讲义:众包系统编程书(5)https://developer.aliyun.com/article/1427163

相关文章
|
3月前
|
数据可视化 Swift 开发者
零一万物开源Yi系列“理科状元”Yi-9B,消费级显卡可跑,魔搭社区最佳实践
零一万物发布并开源了Yi系列中的“理科状元”——Yi-9B,可在魔搭体验
|
3月前
|
存储 安全 网络协议
UIUC CS241 讲义:众包系统编程书(8)
UIUC CS241 讲义:众包系统编程书(8)
200 0
|
3月前
|
存储 NoSQL 编译器
UIUC CS241 讲义:众包系统编程书(1)
UIUC CS241 讲义:众包系统编程书(1)
71 0
|
3月前
|
XML 数据采集 JSON
XPath 技术介绍
XPath 技术介绍
|
3月前
|
存储 安全 NoSQL
UIUC CS241 讲义:众包系统编程书(2)
UIUC CS241 讲义:众包系统编程书(2)
122 0
|
3月前
|
网络协议 算法 安全
UIUC CS241 讲义:众包系统编程书(6)
UIUC CS241 讲义:众包系统编程书(6)
116 0
|
1月前
|
机器学习/深度学习 人工智能 自然语言处理
|
3月前
|
人工智能 搜索推荐 算法
个性化图像生成时代来了!六大顶尖高校联手发布全新Gen4Gen框架
【5月更文挑战第7天】六大顶尖高校联合发布的Gen4Gen框架引领个性化图像生成新纪元。该框架通过创新数据处理,实现半自动化数据集创建,提高文本到图像扩散模型性能,尤其在多概念个性化生成方面取得突破。Gen4Gen使用CP-CLIP和TI-CLIP指标评估性能,并基于MyCanvas数据集验证有效性。尽管面临挑战,如大型语言模型的局限性,但研究将继续探索优化数据集质量和使用多模态模型提升图像生成效果。论文链接:https://arxiv.org/abs/2402.15504
33 2
|
3月前
|
机器学习/深度学习 语音技术 数据库
ICLR 2024:为音视频分离提供新视角,清华大学胡晓林团队推出RTFS-Net
【2月更文挑战第17天】ICLR 2024:为音视频分离提供新视角,清华大学胡晓林团队推出RTFS-Net
74 1
ICLR 2024:为音视频分离提供新视角,清华大学胡晓林团队推出RTFS-Net
|
3月前
|
人工智能 自然语言处理
浙大联合微软等提出全新视频编辑统一框架UniEdit
【2月更文挑战第13天】浙大联合微软等提出全新视频编辑统一框架UniEdit
39 2
浙大联合微软等提出全新视频编辑统一框架UniEdit