基于嵌入式操作系统VxWorks的多任务并发程序设计(4)――任务间通信A

简介:
基于嵌入式操作系统 VxWorks 的多任务并发程序设计( 4
――任务间通信
作者:宋宝华  e-mail:[email]21cnbao@21cn.com[/email] 出处:软件报
VxWorks 提供了多种任务间通信方式,主要有:
1 )共享内存 (Shared Memory) ,用于简单的数据共享;
由于 VxWorks 的所有任务存在于单一的线性地址空间,所以任务间可共享数据,全局变量、线性队列、环形队列、链表、指针都可被具有不同上下文的任务访问。
2 )信号量 (Semaphore) ,用于互斥和同步;
信号量是 VxWorks 所提供的最快速的任务间通信机制,它主要用于解决任务间共享资源的互斥访问和同步。针对问题的不同类型, VxWorks 提供了三种信号量:二进制 (binary) 信号量、互斥 (mutual exclusion) 信号量、计数 (counting) 信号量。二进制信号量和互斥信号量只能处于 0 1 两种状态,可以看作计数值为 1 的计数信号量。二进制信号量可用于同步与互斥,而互斥信号量只能用于互斥,但其支持优先级继承机制
3 )消息队列 (Message queues) 和管道 (Pipe) ,单 CPU 内任务间的信息传送;
消息机制使用一个被各有关进程共享的消息队列,任务之间经由这个消息队列发送和接收消息。管道则是受驱动器 pipeDrv 管理的虚拟 I/O 设备,它提供了一种灵活的消息传送机制。任务能调用标准的 I/O 函数打开、读出、写入管道。
4 )套接字 (Socket) 和远程过程调用 (RPC) ,用于网络间任务消息传送;
与其它操作系统一样, Vxworks 和网络协议的接口靠套接字来实现,套接字可实现运行在 VxWorks 系统或其它系统之间任务的信息传送。远程过程调用允许任务调用另一运行 VxWorks 或其它系统的主机上的过程。
5 )信号 (Signals) ,用于异常处理 (Exception handling)

7.任务间通信

7.1 信号量

VxWorks 主要提供如下 API 进行信号量的创建、获取和释放:
1 semBCreate( ) :分配并初始化一个二进制信号量,函数原型为:
SEM_ID      semBCreate
    (
      int         options,      /* 信号量选项 */
      SEM_B_STATE initialState  /* 信号量初始化状态值 */
    ) ;
2 semMCreate( ) :分配并初始化一个互斥信号量,函数原型为:
SEM_ID         semBCreate
       (
         int                     options,         /* 信号量选项 */
         SEM_B_STATE      initialState      /* 信号量初始化状态值 */
       );
3 semCCreate( ) :分配并初始化一个计数信号量,函数原型为:
SEM_ID         semCCreate
       (
         int              options,         /* 信号量选项 */
         int              initialCount    /* 信号量初始计数值 */
       ) ;
当一个信号量被创建时,它的队列 (queue) 类型需要被确定。等待信号量的任务队列可以以优先级顺序  (SEM_Q_PRIORITY) 或者先到先得方式 (SEM_Q_FIFO) 排列。
4 semDelete( ) :删除一个自由的信号量,函数原型为:
STATUS         semDelete
       (
        SEM_ID     semId     /* 要删除的信号量 ID */
       );
5 semTake( ) :占有一个信号量,函数原型为:
STATUS         semTake
       (
        SEM_ID     semId     /* 所要得到的信号量 ID */
        int         timeout /* 等待时间 */
       );
6 semGive( ) :释放一个信号量,函数原型为:
STATUS         semGive
       (
        SEM_ID     semId     /* 所给出的信号量 ID */
       );
7 semFlush( ) :解锁所有等待信号量的任务,函数原型为:
STATUS         semFlush
       (
        SEM_ID     semId     /* 要解锁的信号量 ID */
       );

7.1.1 二进制信号量

1 :二进制信号量
/* includes */
#include "vxWorks.h"
#include "taskLib.h"
#include "semLib.h"
#include "stdio.h"
 
/* function prototypes */
void taskOne(void);
void taskTwo(void);
 
/* globals */
#define ITER 10
SEM_ID semBinary;
int global = 0;
 
void binary(void)
{
  int taskIdOne, taskIdTwo;
 
  /* create semaphore with semaphore available and queue tasks on FIFO basis */
  semBinary = semBCreate(SEM_Q_FIFO, SEM_FULL);
 
  /* Note 1: lock the semaphore for scheduling purposes */
  semTake(semBinary, WAIT_FOREVER);
 
  /* spawn the two tasks */
  taskIdOne = taskSpawn("t1", 90, 0x100, 2000, (FUNCPTR)taskOne, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
  taskIdTwo = taskSpawn("t2", 90, 0x100, 2000, (FUNCPTR)taskTwo, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
}
 
void taskOne(void)
{
  int i;
  for (i = 0; i < ITER; i++)
  {
    semTake(semBinary, WAIT_FOREVER); /* wait indefinitely for semaphore */
    printf("I am taskOne and global = %d......................\n", ++global);
    semGive(semBinary); /* give up semaphore */
  }
}
 
void taskTwo(void)
{
  int i;
  semGive(semBinary); /* Note 2: give up semaphore(a scheduling fix) */
  for (i = 0; i < ITER; i++)
  {
    semTake(semBinary, WAIT_FOREVER); /* wait indefinitely for semaphore */
    printf("I am taskTwo and global = %d----------------------\n", --global);
    semGive (semBinary); /* give up semaphore */
  }
}
上述程序通过 semTake semGive 保护每次使用 printf 输出一串信息时不被间断。
要创建一个发挥互斥作用的二进制信号量一般使用 semBCreat(xxx, SEM_FULL) 调用,其中的 SEM_FULL 暗示该信号量用于任务间的互斥(最开始二进制信号量可获得)。对临界区域( critical region )的访问需以 semTake semGive 加以保护:
semTake (semMutex, WAIT_FOREVER);
. .  /*critical region, only accessible by a single task at a time*/
semGive (semMutex);
要创建一个发挥同步作用的二进制信号量一般使用 semBCreat(xxx, SEM_EMPTY)  调用,其中的 SEM_EMPTY  暗示该信号量用于任务间的同步(即最开始二进制信号量不可获得)。
二进制信号量使用最广泛的一种情况是中断与任务间通信。中断服务程序一般以二进制信号量“通知”对应的任务进行中断后的处理工作,例如:
SEM_ID syncSem;/* ID of sync semaphore */
myTask(void)
{
  semTake(syncSem, WAIT_FOREVER); /* wait for event to occur */
  printf("my Task got the semaphore\n");
  ... /* process event */
}
 
eventInterruptSvcRout(void)
{
  semGive(syncSem); /* let my Task process event */
  ...
}

7.1.2 互斥信号量

互斥信号量可以看作一种特殊的二进制信号量,其支持普通二进制信号量不支持的一些特性,提供优先级继承、安全删除和回归能力。互斥信号量的使用方法和二进制信号量基本类似,但有如下区别:
1 )仅仅被用做互斥,不能提供同步机制;
2 )只能被使用它的任务释放;
3 )中断服务程序( ISR )不能释放它;
4 )不能使用函数 semFlush( )
5 )支持使用二进制信号量进行互斥时所不支持的优先级“翻转”。
任务的优先级翻转是指高优先级任务因等待低优先级任务占用的互斥资源而被较低优先级(高于低优先级但低于高优先级)的任务不断抢占的情况。 VxWorks 操作系统提供优先级继承机制对优先级翻转进行预防。占用互斥资源但优先级较低的任务被暂时地提高到等待该资源的最高优先级任务的优先级。这样,中等优先级的任务将无法抢占原本低优先级的任务,使得低优先级任务能尽快执行,释放出优先级较高的任务所需要的资源。
为了使互斥信号量支持优先级继承支持,我们在调用 semMCreate 时应使用 SEM_Q_PRIORITY SEM_INVERSION_SAFE 选项。互斥信号量提供互斥也需要对临界区域进行保护:
semTake (semMutex, WAIT_FOREVER);
. .  //critical region, only accessible by a single task at a time .
semGive (semMutex);

8.消息队列和管道

VxWorks 主要提供如下 API 进行消息队列的创建、读取和传递:
msgQCreate( ) :创建斌初始化一个消息队列,函数原型为:
MSG_Q_ID  msgQCreate
   (
       int    maxMsgs,             /* 队列所能容纳的最大消息数目 */
       int    maxMsgLength,    /* 每一消息的最大长度 */
       int    options          /* 消息入列方式 */
   );
msgQDelete( ) :终止并释放一个消息队列,函数原型为:
STATUS       msgQDelete
    (
       MSG_Q_ID  msgQId  /* 要删除的消息队列 ID */
    );
msgQSend( ) :发送一个消息到消息队列,函数原型为:
STATUS       msgQSend
    (
       MSG_Q_ID  msgQId, /* 所发向的消息队列名 */
       char *     buffer,     /* 消息包所在缓冲区指针 */
       UINT     nBytes,   /* 消息包长度 */
       int           timeout,  /* 等待的时间长度 */
       int           priority   /* 优先级 */
   );
msgQReceive( ) :从消息队列接受一个消息,函数原型为:
int msgQReceive
   (
       MSG_Q_ID  msgQId, /* 接收消息的消息队列 ID */
       char *     buffer,     /* 接收消息的缓冲区指针 */
       UINT            maxNBytes,  /* 缓冲区长度 */
       int           timeout   /* 等待时间 */
   );
下面我们以著名的生产者 / 消费者问题作为例子来说明消息队列的用途:
2 :消息队列
/* includes */
#include "vxWorks.h"
#include "taskLib.h"
#include "msgQLib.h"
#include "sysLib.h"
#include "stdio.h"
/* globals */
#define  CONSUMER_TASK_PRI   99   /* Priority of the consumer task */
#define  PRODUCER_TASK_PRI  98   /* Priority of the producer task */
#define  TASK_STACK_SIZE    5000   /* stack size for spawned tasks */
struct msg {                         /* data structure for msg passing */
         int tid;                         /* task id */        
         int value;                       /* msg value */
        };
 
LOCAL MSG_Q_ID msgQId;                   /* message queue id */
LOCAL int numMsg = 8;                    /* number of messages */
LOCAL BOOL notDone;           /* Flag to indicate the completion */
 
/* function prototypes */
LOCAL void producerTask(); /* producer task */
LOCAL void consumerTask(); /* consumer task */
 
/* user entry */
void msgQDemo()
{
  notDone = TRUE; /* initialize the global flag */
 
  /* Create the message queue*/
  if ((msgQId = msgQCreate(numMsg, sizeof(struct msg), MSG_Q_FIFO)) == NULL)
  {
    perror("Error in creating msgQ");
  }
 
  /* Spawn the producerTask task */
  if (taskSpawn("tProducerTask", PRODUCER_TASK_PRI, 0, TASK_STACK_SIZE, (FUNCPTR)producerTask, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) == ERROR)
  {
    perror("producerTask: Error in spawning demoTask");
  }
  /* Spawn the consumerTask task */
  if (taskSpawn("tConsumerTask", CONSUMER_TASK_PRI, 0, TASK_STACK_SIZE, (FUNCPTR)consumerTask, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) == ERROR)
  {
    perror("consumerTask: Error in spawning demoTask");
  }
 
  /* polling is not recommended. But used to make this demonstration simple*/
  while (notDone)
    taskDelay(sysClkRateGet());
 
  if (msgQDelete(msgQId) == ERROR)
  {
    perror("Error in deleting msgQ");
  }
}
 
/* producerTask  :发送消息给 consumerTask */
void producerTask(void)
{
  int count;
  int value;
  struct msg producedItem; /* producer item - produced data */
 
  printf("producerTask started: task id = %#x \n", taskIdSelf());
 
  /* Produce numMsg number of messages and send these messages */
 
  for (count = 1; count <= numMsg; count++)
  {
    value = count * 10; /* produce a value */
 
    /* Fill in the data structure for message passing */
    producedItem.tid = taskIdSelf();
    producedItem.value = value;
 
    /* Send Messages */
    if ((msgQSend(msgQId, (char*) &producedItem, sizeof(producedItem),
      WAIT_FOREVER, MSG_PRI_NORMAL)) == ERROR)
    {
      perror("Error in sending the message");
    }
    else
      printf("ProducerTask: tid = %#x, produced value = %d \n", taskIdSelf(),
        value);
  }
}
 
/* consumerTask :获取(消费)消息  */
void consumerTask(void)
{
  int count;
  struct msg consumedItem; /* consumer item - consumed data */
 
  printf("\n\nConsumerTask: Started -  task id = %#x\n", taskIdSelf());
 
  /* consume numMsg number of messages */
  for (count = 1; count <= numMsg; count++)
  {
    /* Receive messages */
    if ((msgQReceive(msgQId, (char*) &consumedItem, sizeof(consumedItem),
      WAIT_FOREVER)) == ERROR)
    {
      perror("Error in receiving the message");
    }
    else
      printf("ConsumerTask: Consuming msg of value %d from tid = %#x\n",
        consumedItem.value, consumedItem.tid);
  }
 
  notDone = FALSE; /* set the global flag to FALSE to indicate completion*/
}
程序运行输出:
producerTask started: task id = 0x1010f20
ProducerTask: tid = 0x1010f20, produced value = 10
ProducerTask: tid = 0x1010f20, produced value = 20
ProducerTask: tid = 0x1010f20, produced value = 30
ProducerTask: tid = 0x1010f20, produced value = 40
ProducerTask: tid = 0x1010f20, produced value = 50
ProducerTask: tid = 0x1010f20, produced value = 60
ProducerTask: tid = 0x1010f20, produced value = 70
ProducerTask: tid = 0x1010f20, produced value = 80
 
ConsumerTask: Started -  task id = 0x100c840
ConsumerTask: Consuming msg of value 10 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 20 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 30 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 40 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 50 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 60 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 70 from tid = 0x1010f20
ConsumerTask: Consuming msg of value 80 from tid = 0x1010f20
我们以 WINDVIEW 捕获上述程序运行时的状态,   表示发送消息,而   表示任务接收消息:
从程序运行的输出结果和上图可以看出,生产者“生产”了 numMsg 个(即 8 个)消息,消费者也“消费”了 numMsg 个消息。



 本文转自 21cnbao 51CTO博客,原文链接:http://blog.51cto.com/21cnbao/120324,如需转载请自行联系原作者


相关文章
|
1月前
|
安全 Unix Linux
Unix是一个多用户、多任务的操作系统
Unix是一个多用户、多任务的操作系统
86 3
|
18天前
|
存储 iOS开发 MacOS
MacOS环境-手写操作系统-33-多任务多窗口
MacOS环境-手写操作系统-33-多任务多窗口
16 0
|
1月前
|
Web App开发 Linux iOS开发
操作系统的演变:从单任务到多核并发
在数字时代的浪潮中,操作系统作为计算机硬件与应用程序之间的桥梁,其发展历史充满了创新与变革。本文将带领读者穿越时空,探索操作系统如何从简单的单任务处理演化为今天能够高效管理多核处理器的复杂系统。我们将一窥各个时代下操作系统的设计哲学,以及它们是如何影响现代计算的方方面面。加入我们的旅程,一起见证技术的力量如何在每次迭代中重塑世界。
37 7
|
1月前
|
机器学习/深度学习 人工智能 算法
操作系统的未来:从多任务到深度学习的演变之路
本文将探讨操作系统如何从处理简单多任务发展到支持复杂的深度学习任务。我们将分析现代操作系统面临的新挑战,以及它们如何适应人工智能和大数据时代的要求。文章不仅回顾过去,也展望未来,思考操作系统在技术演进中的角色和方向。
48 3
|
1月前
|
人工智能 算法 数据挖掘
操作系统的演变:从单任务到多任务的旅程
操作系统(OS)是计算机系统的核心,它管理硬件资源、提供用户界面并运行应用程序。本文将探讨操作系统如何从单任务环境演变为支持多任务的环境,包括这一过程中的技术挑战和解决方案。我们将看到,随着计算需求的增长,操作系统必须适应更复杂的任务管理和资源分配策略,以提高效率和用户体验。通过这个旅程,我们不仅能够理解操作系统的发展,还能洞察未来可能的趋势。
47 5
|
2月前
|
机器学习/深度学习 人工智能 安全
操作系统的未来:从多任务处理到人工智能
【8月更文挑战第23天】本文将探讨操作系统的发展历程及其未来趋势,特别是人工智能在操作系统中的应用。我们将看到如何通过引入人工智能技术,操作系统能够更加智能化地管理资源,提高系统性能和用户体验。
|
2月前
|
缓存 安全 数据库
探索后端开发的核心原则与实践操作系统的未来:从多任务处理到智能优化
【8月更文挑战第23天】在数字化时代的浪潮中,后端开发作为技术架构的支柱,承载着数据处理、业务逻辑实现和系统性能优化的关键任务。本文将深入探讨后端开发的几大核心原则,包括模块化设计、性能优化、安全性强化及可维护性提升,旨在为读者揭示如何构建一个健壮、高效且安全的后端系统。通过分析这些原则背后的理念及其在实际开发中的应用,本文意在启发读者思考如何在不断变化的技术环境中,持续优化后端开发实践,以适应新的挑战和需求。
|
2月前
|
调度 UED
操作系统中的多任务处理机制
【8月更文挑战第23天】在数字时代,操作系统的核心功能之一是多任务处理。它允许用户同时运行多个程序,优化资源使用,并提高生产效率。本文将深入探讨操作系统如何实现多任务处理,以及这一机制对用户体验和系统性能的影响。通过理解多任务处理的工作原理,用户可以更好地管理计算资源,提升个人和组织的工作效率。
|
2月前
|
人工智能 算法 物联网
操作系统的演变:从单任务到多任务的旅程
【8月更文挑战第20天】在数字时代的浪潮中,操作系统作为计算机系统的核心,经历了从简单的单任务处理到复杂的多任务并行处理的转变。本文将探讨这一转变过程中的关键里程碑,包括早期的批处理系统、多道程序设计的出现、分时系统的创新以及现代操作系统中的多任务处理机制。通过这一旅程,我们将了解操作系统如何适应不断增长的计算需求,并预见未来可能的发展方向。
|
2月前
|
机器学习/深度学习 人工智能 自动驾驶
操作系统的演化之路:从单任务到多任务处理
【8月更文挑战第16天】 本文将探索操作系统(OS)的演进历程,聚焦于它们如何从处理单一任务的简单系统,发展成为能够同时处理多个任务的复杂系统。我们将分析这一转变背后的技术驱动因素,以及它对用户体验和系统性能的影响。文章还将探讨现代操作系统在面对日益增长的计算需求时所面临的挑战,以及未来的发展方向。