linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】

简介:

转自:http://blog.chinaunix.net/uid-28458801-id-4262445.html

操作系统:ubuntu10.04

前言:
    在嵌入式开发中,只要是带操作系统的,在其上开发产品应用,基本都需要用到多线程。
    为了提高效率,尽可能的提高并发率。因此,线程之间的通信就是问题的核心。
    根据当前产品需要,使用 环形缓冲区 解决。

一,环形缓冲区的实现
    1,cbuf.h

点击(此处)折叠或打开

  1. #ifndef __CBUF_H__
  2. #define __CBUF_H__

  3. #ifdef __cplusplus
  4. extern "C" {
  5. #endif

  6. /* Define to prevent recursive inclusion 
  7. -------------------------------------*/
  8. #include "types.h"
  9. #include "thread.h"


  10. typedef    struct _cbuf
  11. {
  12.     int32_t        size;            /* 当前缓冲区中存放的数据的个数 */
  13.     int32_t        next_in;        /* 缓冲区中下一个保存数据的位置 */
  14.     int32_t        next_out;        /* 从缓冲区中取出下一个数据的位置 */
  15.     int32_t        capacity;        /* 这个缓冲区的可保存的数据的总个数 */
  16.     mutex_t        mutex;            /* Lock the structure */
  17.     cond_t        not_full;        /* Full -not full condition */
  18.     cond_t        not_empty;        /Empty -not empty condition */
  19.     void        *data[CBUF_MAX];/* 缓冲区中保存的数据指针 */
  20. }cbuf_t;


  21. /* 初始化环形缓冲区 */
  22. extern    int32_t        cbuf_init(cbuf_t *c);

  23. /* 销毁环形缓冲区 */
  24. extern    void        cbuf_destroy(cbuf_t    *c);

  25. /* 压入数据 */
  26. extern    int32_t        cbuf_enqueue(cbuf_t *c,void *data);

  27. /* 取出数据 */
  28. extern    void*        cbuf_dequeue(cbuf_t *c);


  29. /* 判断缓冲区是否为满 */
  30. extern    bool        cbuf_full(cbuf_t    *c);

  31. /* 判断缓冲区是否为空 */
  32. extern    bool        cbuf_empty(cbuf_t *c);

  33. /* 获取缓冲区可存放的元素的总个数 */
  34. extern    int32_t        cbuf_capacity(cbuf_t *c);


  35. #ifdef __cplusplus
  36. }
  37. #endif

  38. #endif
  39. /END OF FILE 
  40. ---------------------------------------------------------------*/


    2,cbuf.c

点击(此处)折叠或打开

  1. #include "cbuf.h"



  2. /* 初始化环形缓冲区 */
  3. int32_t        cbuf_init(cbuf_t *c)
  4. {
  5.     int32_t    ret = OPER_OK;

  6.     if((ret = mutex_init(&c->mutex)!= OPER_OK)    
  7.     {
  8. #ifdef DEBUG_CBUF
  9.     debug("cbuf init fail ! mutex init fail !\n");
  10. #endif
  11.         return ret;
  12.     }

  13.     if((ret = cond_init(&c->not_full)!= OPER_OK)    
  14.     {
  15. #ifdef DEBUG_CBUF
  16.     debug("cbuf init fail ! cond not full init fail !\n");
  17. #endif
  18.         mutex_destroy(&c->mutex);
  19.         return ret;
  20.     }

  21.     if((ret = cond_init(&c->not_empty)!= OPER_OK)
  22.     {
  23. #ifdef DEBUG_CBUF
  24.     debug("cbuf init fail ! cond not empty init fail !\n");
  25. #endif
  26.         cond_destroy(&c->not_full);
  27.         mutex_destroy(&c->mutex);
  28.         return ret;
  29.     }

  30.     c->size     = 0;
  31.     c->next_in    = 0;
  32.     c->next_out = 0;
  33.     c->capacity    = CBUF_MAX;

  34. #ifdef DEBUG_CBUF
  35.     debug("cbuf init success !\n");
  36. #endif

  37.     return ret;
  38. }


  39. /* 销毁环形缓冲区 */
  40. void        cbuf_destroy(cbuf_t    *c)
  41. {
  42.     cond_destroy(&c->not_empty);
  43.     cond_destroy(&c->not_full);
  44.     mutex_destroy(&c->mutex);

  45. #ifdef DEBUG_CBUF
  46.     debug("cbuf destroy success \n");
  47. #endif
  48. }



  49. /* 压入数据 */
  50. int32_t        cbuf_enqueue(cbuf_t *c,void *data)
  51. {
  52.     int32_t    ret = OPER_OK;

  53.     if((ret = mutex_lock(&c->mutex)!= OPER_OK)    return ret;

  54.     /*
  55.      * Wait while the buffer is full.
  56.      */
  57.     while(cbuf_full(c))
  58.     {
  59. #ifdef DEBUG_CBUF
  60.     debug("cbuf is full !!!\n");
  61. #endif
  62.         cond_wait(&c->not_full,&c->mutex);
  63.     }

  64.     c->data[c->next_in++= data;
  65.     c->size++;
  66.     c->next_in %= c->capacity;

  67.     mutex_unlock(&c->mutex);

  68.     /*
  69.      Let a waiting consumer know there is data.
  70.      */
  71.     cond_signal(&c->not_empty);

  72. #ifdef DEBUG_CBUF
  73. //    debug("cbuf enqueue success ,data : %p\n",data);
  74.     debug("enqueue\n");
  75. #endif

  76.     return ret;
  77. }



  78. /* 取出数据 */
  79. void*        cbuf_dequeue(cbuf_t *c)
  80. {
  81.     void     *data     NULL;
  82.     int32_t    ret     = OPER_OK;

  83.     if((ret = mutex_lock(&c->mutex)!= OPER_OK)    return NULL;

  84.        /*
  85.      * Wait while there is nothing in the buffer
  86.      */
  87.     while(cbuf_empty(c))
  88.     {
  89. #ifdef DEBUG_CBUF
  90.     debug("cbuf is empty!!!\n");
  91. #endif
  92.         cond_wait(&c->not_empty,&c->mutex);
  93.     }

  94.     data = c->data[c->next_out++];
  95.     c->size--;
  96.     c->next_out %= c->capacity;

  97.     mutex_unlock(&c->mutex);


  98.     /*
  99.      Let a waiting producer know there is room.
  100.      * 取出了一个元素,又有空间来保存接下来需要存储的元素
  101.      */
  102.     cond_signal(&c->not_full);

  103. #ifdef DEBUG_CBUF
  104. //    debug("cbuf dequeue success ,data : %p\n",data);
  105.     debug("dequeue\n");
  106. #endif

  107.     return data;
  108. }


  109. /* 判断缓冲区是否为满 */
  110. bool        cbuf_full(cbuf_t    *c)
  111. {
  112.     return (c->size == c->capacity);
  113. }

  114. /* 判断缓冲区是否为空 */
  115. bool        cbuf_empty(cbuf_t *c)
  116. {
  117.     return (c->size == 0);
  118. }

  119. /* 获取缓冲区可存放的元素的总个数 */
  120. int32_t        cbuf_capacity(cbuf_t *c)
  121. {
  122.     return c->capacity;
  123. }



二,辅助文件
    为了提高程序的移植性,对线程相关进行封装。
    1,thread.h

点击(此处)折叠或打开

  1. #ifndef __THREAD_H__
  2. #define __THREAD_H__

  3. #ifdef __cplusplus
  4. extern "C" {
  5. #endif

  6. /* Define to prevent recursive inclusion 
  7. -------------------------------------*/
  8. #include "types.h"





  9. typedef    struct _mutex
  10. {
  11.     pthread_mutex_t        mutex;
  12. }mutex_t;


  13. typedef    struct _cond
  14. {
  15.     pthread_cond_t        cond;
  16. }cond_t;


  17. typedef    pthread_t        tid_t;
  18. typedef    pthread_attr_t    attr_t;
  19. typedef    void*    (* thread_fun_t)(void*);


  20. typedef    struct _thread
  21. {
  22.     tid_t            tid;
  23.     cond_t            *cv;
  24.     int32_t            state;
  25.     int32_t            stack_size;
  26.     attr_t         attr;
  27.     thread_fun_t    fun;
  28. }thread_t;



  29. /* mutex */
  30. extern    int32_t        mutex_init(mutex_t    *m);
  31. extern    int32_t        mutex_destroy(mutex_t    *m);
  32. extern    int32_t        mutex_lock(mutex_t    *m);
  33. extern    int32_t        mutex_unlock(mutex_t    *m);


  34. /* cond */
  35. extern    int32_t        cond_init(cond_t    *c);
  36. extern    int32_t        cond_destroy(cond_t    *c);
  37. extern    int32_t        cond_signal(cond_t *c);
  38. extern    int32_t        cond_wait(cond_t    *c,mutex_t *m);



  39. /* thread */
  40. /* 线程的创建,其属性的设置等都封装在里面 */
  41. extern    int32_t        thread_create(thread_t *t);
  42. //extern    int32_t        thread_init(thread_t    *t);

  43. #define    thread_join(t, p)     pthread_join(t, p)
  44. #define    thread_self()        pthread_self()
  45. #define    thread_sigmask        pthread_sigmask


  46. #ifdef __cplusplus
  47. }
  48. #endif

  49. #endif
  50. /END OF FILE 
  51. ---------------------------------------------------------------*/


    2,thread.c

点击(此处)折叠或打开

  1. #include "thread.h"




  2. /* mutex */
  3. int32_t        mutex_init(mutex_t    *m)
  4. {
  5.     int32_t        ret = OPER_OK;

  6.     if((ret = pthread_mutex_init(&m->mutexNULL)!= 0)
  7.         ret -THREAD_MUTEX_INIT_ERROR;

  8.     return ret;
  9. }


  10. int32_t        mutex_destroy(mutex_t    *m)
  11. {
  12.     int32_t        ret = OPER_OK;

  13.     if((ret = pthread_mutex_destroy(&m->mutex)!= 0)
  14.         ret -MUTEX_DESTROY_ERROR;

  15.     return ret;
  16. }



  17. int32_t        mutex_lock(mutex_t    *m)
  18. {
  19.     int32_t        ret = OPER_OK;

  20.     if((ret = pthread_mutex_lock(&m->mutex)!= 0)
  21.         ret -THREAD_MUTEX_LOCK_ERROR;

  22.     return ret;
  23. }



  24. int32_t        mutex_unlock(mutex_t    *m)
  25. {
  26.     int32_t        ret = OPER_OK;

  27.     if((ret = pthread_mutex_unlock(&m->mutex)!= 0)
  28.         ret -THREAD_MUTEX_UNLOCK_ERROR;
  29.     
  30.     return ret;
  31. }






  32. /* cond */
  33. int32_t        cond_init(cond_t    *c)
  34. {
  35.     int32_t        ret = OPER_OK;

  36.     if((ret = pthread_cond_init(&c->condNULL)!= 0)
  37.         ret -THREAD_COND_INIT_ERROR;

  38.     return ret;
  39. }



  40. int32_t        cond_destroy(cond_t    *c)
  41. {
  42.     int32_t        ret = OPER_OK;

  43.     if((ret = pthread_cond_destroy(&c->cond)!= 0)
  44.         ret -COND_DESTROY_ERROR;
  45.     
  46.     return ret;
  47. }



  48. int32_t        cond_signal(cond_t *c)
  49. {
  50.     int32_t        ret = OPER_OK;


  51.     if((ret = pthread_cond_signal(&c->cond)!= 0)
  52.         ret -COND_SIGNAL_ERROR;
  53.     
  54.     return ret;
  55. }




  56. int32_t        cond_wait(cond_t    *c,mutex_t *m)
  57. {
  58.     int32_t        ret = OPER_OK;

  59.     if((ret = pthread_cond_wait(&c->cond&m->mutex)!= 0)
  60.         ret -COND_WAIT_ERROR;    
  61.     
  62.     return ret;
  63. }



三,测试
    1,测试代码

点击(此处)折叠或打开

  1. /
  2.  * cbuf begin
  3.  */
  4. #define        OVER    (-1)

  5. static        cbuf_t    cmd;
  6. static        int        line_1[200];
  7. static        int        line_2[200];
  8. //static        int        temp = 0;

  9. static        bool    line1_finish false;
  10. static        bool    line2_finish false;

  11. void*    producer_1(void *data)
  12. {
  13.     int32_t    i = 0;

  14.     for(i = 0; i < 200; i++)
  15.     {
  16.         line_1[i= i+1000;
  17.         cbuf_enqueue(&cmd&line_1[i]);

  18.         if(0 =(i % 9)) sleep(1);
  19.     }

  20.     line1_finish true;

  21.     return NULL;
  22. }

  23. void*    producer_2(void *data)
  24. {
  25.     int32_t    i = 0;

  26.     for(i = 0; i < 200; i++)
  27.     {
  28.         line_2[i= i+20000;
  29.         cbuf_enqueue(&cmd&line_2[i]);

  30.         if(0 =(i % 9)) sleep(1);
  31.     }

  32.     line2_finish true;

  33.     return NULL;
  34. }


  35. void*    consumer(void *data)
  36. {
  37.     int32_t        *ptr NULL;

  38.     while(1)
  39.     {
  40.         ptr = cbuf_dequeue(&cmd);
  41.         printf("%d\n",*ptr);

  42.         if(cbuf_empty(&cmd&& line2_finish && line1_finish)
  43.         {
  44.             printf("quit\n");
  45.             break;
  46.         }
  47.     }

  48.     return NULL;
  49. }


  50. void    test_cbuf_oper(void)
  51. {
  52.     pthread_t    l_1;
  53.     pthread_t    l_2;
  54.     pthread_t    c;
  55.     
  56.     cbuf_init(&cmd);

  57.     pthread_create(&l_1,NULL,producer_1,0);
  58.     pthread_create(&l_2,NULL,producer_2,0);
  59.     pthread_create(&c,NULL,consumer,0);

  60.     pthread_join(l_1,NULL);
  61.     pthread_join(l_2,NULL);
  62.     pthread_join(c,NULL);

  63.     cbuf_destroy(&cmd);
  64. }


  65. void    test_cbuf(void)
  66. {
  67.     test_cbuf_oper();
  68. }


  69. /
  70.  * cbuf end
  71.  */


    2,测试结果



四,参考文件
1,《bareos-master》源码
2,《nginx》源码

相关文章
|
2月前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
41 1
[Java]线程生命周期与线程通信
|
1月前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
38 3
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
24 1
|
2月前
|
安全 Java 开发者
Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用
本文深入解析了Java多线程中的`wait()`、`notify()`和`notifyAll()`方法,探讨了它们在实现线程间通信和同步中的关键作用。通过示例代码展示了如何正确使用这些方法,并分享了最佳实践,帮助开发者避免常见陷阱,提高多线程程序的稳定性和效率。
48 1
|
2月前
|
Java
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件成立时被唤醒,从而有效解决数据一致性和同步问题。本文通过对比其他通信机制,展示了 `wait()` 和 `notify()` 的优势,并通过生产者-消费者模型的示例代码,详细说明了其使用方法和重要性。
31 1
|
1月前
|
Linux 网络安全 数据安全/隐私保护
Linux 超级强大的十六进制 dump 工具:XXD 命令,我教你应该如何使用!
在 Linux 系统中,xxd 命令是一个强大的十六进制 dump 工具,可以将文件或数据以十六进制和 ASCII 字符形式显示,帮助用户深入了解和分析数据。本文详细介绍了 xxd 命令的基本用法、高级功能及实际应用案例,包括查看文件内容、指定输出格式、写入文件、数据比较、数据提取、数据转换和数据加密解密等。通过掌握这些技巧,用户可以更高效地处理各种数据问题。
92 8
|
1月前
|
监控 Linux
如何检查 Linux 内存使用量是否耗尽?这 5 个命令堪称绝了!
本文介绍了在Linux系统中检查内存使用情况的5个常用命令:`free`、`top`、`vmstat`、`pidstat` 和 `/proc/meminfo` 文件,帮助用户准确监控内存状态,确保系统稳定运行。
252 6
|
1月前
|
Linux
在 Linux 系统中,“cd”命令用于切换当前工作目录
在 Linux 系统中,“cd”命令用于切换当前工作目录。本文详细介绍了“cd”命令的基本用法和常见技巧,包括使用“.”、“..”、“~”、绝对路径和相对路径,以及快速切换到上一次工作目录等。此外,还探讨了高级技巧,如使用通配符、结合其他命令、在脚本中使用,以及实际应用案例,帮助读者提高工作效率。
80 3
|
1月前
|
监控 安全 Linux
在 Linux 系统中,网络管理是重要任务。本文介绍了常用的网络命令及其适用场景
在 Linux 系统中,网络管理是重要任务。本文介绍了常用的网络命令及其适用场景,包括 ping(测试连通性)、traceroute(跟踪路由路径)、netstat(显示网络连接信息)、nmap(网络扫描)、ifconfig 和 ip(网络接口配置)。掌握这些命令有助于高效诊断和解决网络问题,保障网络稳定运行。
71 2
|
14天前
|
Linux Shell
Linux 10 个“who”命令示例
Linux 10 个“who”命令示例
43 14
Linux 10 个“who”命令示例
下一篇
DataWorks