程序并发操作中,解决数据同步的四种方法

简介: 程序并发操作中,解决数据同步的四种方法

非预期结果的全局变量


下面这段代码是线程中的函数与中断处理函数对全局变量a进行操作


int a = 0;
/* 中断处理程序 */
void interrupt_handle() {
  a++;
}
/* 线程处理函数 */
void thread_func() {
  a++;
}

a++这段代码转换为汇编代码后的处理过程如下:


把a从内存中加载到某个寄存器(mov eax, ebx)

这个寄存器加一(inc eax)

把这个寄存器写回内存(mov ebx, eax)


当thread_func函数还没有运行完第二条指令时,中断就将其打断,然后进入中断开始执行interrupt_handle函数,当中断函数运行完之后a = 1,然后再回到线程环境继续执行第三条指令,这时a依然为1,这个结果显然是错误的。


为了避免上述问题,可以有两种方法解决:


把a++变为原子操作


控制中断,在执行a++时,关闭中断,执行完之后打开中断


方法一:原子操作


下面以x86平台,使用c语言内联式汇编代码进行说明。


使用代码书实现原子操作


GCC 支持嵌入汇编代码的模板,不同于其它 C 编译器支持嵌入汇编代码的方式,为了优化用户代码,GCC 设计了一种特有的嵌入方式,它规定了汇编代码嵌入的形式和嵌入汇编代码需要由哪几个部分组成,如下面代码所示。

__asm__ __volatile__(代码部分:输出部分列表: 输入部分列表:损坏部分列表);

括号里大致分为 4 个部分:


  1. 汇编代码部分,这里是实际嵌入的汇编代码。
  2. 输出列表部分,让 GCC 能够处理 C 语言左值表达式与汇编代码的结合。
  3. 输入列表部分,也是让 GCC 能够处理 C 语言表达式、变量、常量,让它们能够输入到汇编代码中去。
  4. 损坏列表部分,告诉 GCC 汇编代码中用到了哪些寄存器,以便 GCC 在汇编代码运行前,生成保存它们的代码,并且在生成的汇编代码运行后,恢复它们(寄存器)的代码。它们之间用冒号隔开,如果只有汇编代码部分,后面的冒号可以省略。但是有输入列表部分而没有输出列表部分的时候,输出列表部分的冒号就必须要写,否则 GCC 没办法判断,同样的道理对于其它部分也一样。


gcc手册

asm __volatile__详细说明

汇编指令速查


  • 以atomic_add函数为例进行说明
static inline void atomic_add(int i, atomic_t *v)
{
  __asm__ __volatile__("lock;" "addl %1,%0"
                  : "+m" (v->a_count)
                  : "ir" (i));
}
//"lock;" "addl %1,%0" 是汇编指令部分,%1,%0是占位符,它表示输出、输入列表中变量或表态式,占位符的数字从输出部分开始依次增加,这些变量或者表态式会被GCC处理成寄存器、内存、立即数放在指令中。 
//: "+m" (v->a_count) 是输出列表部分,“+m”表示(v->a_count)和内存地址关联
//: "ir" (i) 是输入列表部分,“ir” 表示i是和立即数或者寄存器关联


实现原子操作的完成代码


//定义一个原子类型
typedef struct s_ATOMIC{
  //在变量前加上volatile,是为了禁止编译器优化,使其每次都从内存中加载变量
  volatile s32_t a_count; 
}atomic_t;
//原子读
static inline s32_t atomic_read(const atomic_t *v)
{        
  //x86平台取地址处是原子
  return (*(volatile u32_t*)&(v)->a_count);
}
//原子写
static inline void atomic_write(atomic_t *v, int i)
{
  //x86平台把一个值写入一个地址处也是原子的 
  v->a_count = i;
}
//原子加上一个整数
static inline void atomic_add(int i, atomic_t *v)
{
  __asm__ __volatile__("lock;" "addl %1,%0"
                  : "+m" (v->a_count)
                  : "ir" (i));
}
//原子减去一个整数
static inline void atomic_sub(int i, atomic_t *v)
{
   __asm__ __volatile__("lock;" "subl %1,%0"
                  : "+m" (v->a_count)
                  : "ir" (i));
}
//原子加1
static inline void atomic_inc(atomic_t *v)
{
  __asm__ __volatile__("lock;" "incl %0"
                  : "+m" (v->a_count));
}
//原子减1
static inline void atomic_dec(atomic_t *v)
{
   __asm__ __volatile__("lock;" "decl %0"
                  : "+m" (v->a_count));
}

加上 lock 前缀的 addl、subl、incl、decl 指令都是原子操作

lock 前缀表示锁定总线


此时,上述应用场景中的代码就变成下面这样,无论是否有中断都不会出错

atomic_t a = {0};
void interrupt_handle()
{
  atomic_inc(&a);
}
void thread_func()
{
   atomic_inc(&a);
}


方法二:控制中断


原子操作只适合于单体变量,例如整数。而我们实际在应用时使用到的数据结构是各种各样的,这时就无法使用原子操作进行解决。


x86平台有专门的开关中断的指令,cli、sti 指令,它们主要是对 CPU 的 eflags 寄存器的 IF 位(第 9 位)进行清除和设置,CPU 正是通过此位来决定是否响应中断信号。这两条指令只能 Ring0 权限才能执行。

//关闭中断
void hal_cli()
{
  __asm__ __volatile__("cli": : :"memory");
}
//开启中断
void hal_sti()
{
  __asm__ __volatile__("sti": : :"memory");
}
//使用场景
void foo()
{
  hal_cli();
  //操作数据……
  hal_sti();
}
void bar()
{
  hal_cli();
  //操作数据……
  hal_sti();
}

上述代码在中断嵌套的情况下是不能使用的,例如:

void foo()
{
  hal_cli();
  //操作数据第一步……
  hal_sti();
}
void bar()
{
  hal_cli();
  foo();
  //操作数据第二步……
  hal_sti();
}


解决中断无法嵌套问题:

  • 在关闭中断函数中先保存 eflags 寄存器
  • 然后执行 cli 指令
  • 在开启中断函数中直接恢复之前保存的 eflags 寄存器


代码如下

typedef u32_t cpuflg_t;
static inline void hal_save_flags_cli(cpuflg_t* flags)
{
  __asm__ __volatile__(
            "pushfl \t\n" //把eflags寄存器压入当前栈顶
            "cli    \t\n" //关闭中断
            "popl %0 \t\n"//把当前栈顶弹出到eflags为地址的内存中        
            : "=m"(*flags)
            :
            : "memory"
          );
}
static inline void hal_restore_flags_sti(cpuflg_t* flags)
{
  __asm__ __volatile__(
              "pushl %0 \t\n"//把flags为地址处的值寄存器压入当前栈顶
              "popfl \t\n"   //把当前栈顶弹出到eflags寄存器中
              :
              : "m"(*flags)
              : "memory"
              );
}
void foo()
{
  hal_save_flags_cli();
  //操作数据第一步……
  hal_restore_flags_sti();
}
void bar()
{
  hal_save_flags_cli();
  foo();
  //操作数据第二步……
  hal_restore_flags_sti();
}
  • pushfl 指令把 eflags 寄存器压入当前栈顶
  • popfl 把当前栈顶的数据弹出到 eflags 寄存器中


方法三、自旋锁


当CPU变为多核心或者板卡上具有多个CPU,此时控制中断只能控制本地的CPU,无法控制其他CPU的中断。因此,上述通过开关中断的方式便不起作用了。


自旋锁定义


当一个线程尝试去获取某一把锁的时候,如果这个锁此时已经被别人获取(占用),那么此线程就无法获取到这把锁,该线程将会等待,间隔一段时间后会再次尝试获取。这种采用循环加锁 -> 等待的机制被称为自旋锁(spinlock)。


自旋锁原理


首先读取锁变量,判断其值是否已经加锁,如果未加锁则执行加锁,然后返回,表示加锁成功;如果已经加锁了,就要返回第一步继续执行后续步骤,因而得名自旋锁。自旋锁示意图如下:



37f2c835cb41459290ac364c7b6165fe.png

必须保证读取锁变量和判断并加锁的操作是原子执行的。否则,CPU0 在读取了锁变量之后,CPU1 读取锁变量判断未加锁执行加锁,然后 CPU0 也判断未加锁执行加锁,这时就会发现两个 CPU 都加锁成功,因此这个算法出错了。


这里可以使用原子交换指令去解决这个问题:

如果锁变量没加锁(值为0),可以用1跟它原子交换,从而实现读取+判断+加锁的原子操作;如果锁变量已加锁(值为1),用1跟它交换,相当于什么也没做,表示锁已被占用了只能进行自旋(重复判断锁变量是否为0)等待锁变量解锁。

//自旋锁结构
typedef struct
{
  volatile u32_t lock;//volatile可以防止编译器优化,保证其它代码始终从内存加载lock变量的值 
} spinlock_t;
//锁初始化函数
static inline void x86_spin_lock_init(spinlock_t * lock)
{
  lock->lock = 0;//锁值初始化为0是未加锁状态
}
//加锁函数
static inline void x86_spin_lock_disable_irq(spinlock_t * lock,cpuflg_t* flags)
{
  __asm__ __volatile__(
    "pushfq                 \n\t"
    "cli                    \n\t"
    "popq %0                \n\t"
    "1:         \n\t"
    "lock; xchg  %1, %2 \n\t" //把值为1的寄存器和lock内存中的值进行交换
    "cmpl   $0,%1       \n\t" //用0和交换回来的值进行比较
    "jnz    2f      \n\t"  //不等于0则跳转后面2标号处运行
    "jmp    3f      \n"   //若等于0则跳转后面3标号处返回
    "2:         \n\t"
    "cmpl   $0,%2       \n\t" //用0和lock内存中的值进行比较
    "jne    2b      \n\t" //若不等于0则跳转到前面2标号处运行继续比较
    "jmp    1b      \n\t" //若等于0则跳转到前面1标号处运行,交换并加锁
    "3:     \n"     
     :"=m"(*flags)
    : "r"(1), "m"(*lock));
}
//解锁函数
static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock,cpuflg_t* flags)
{
   __asm__ __volatile__(
    "movl   $0, %0\n\t"
    "pushq %1 \n\t"
    "popfq \n\t"
    :
    : "m"(*lock), "m"(*flags));
}

%1 对应 “r”(1),表示由编译器自动分配一个通用寄存器,并填入值 1,例如 mov eax,1

%2 对应"m"(*lock),表示 lock 是内存地址

把 1 和内存中的值进行交换,若内存中是 1,则不会影响;因为本身写入就是 1,若内存中是 0,一交换,内存中就变成了 1,即加锁成功。


流程说明:


分配一个寄存器r值为1,交换r和lock的值;

判断换回来的r值是否为0,是表示加锁成功,可以直接返回,否则继续执行;

判断lock的值是否为0(为0说明已经解锁),是则重新进行交换加锁的逻辑,否则进行不断自旋判断。


方法四、信号量


原子锁,原子操作,这两种方式不适用于对时间有要求的操作。这时候就需要使用信号量进行同步操作。


信号量既能对资源数据进行保护(同一时刻只有一个代码执行流访问),又能在资源无法满足的情况下,让 CPU 可以执行其它任务。


信号量的三个关键词:


  • 等待(数据被锁住了)
  • 互斥(数据释放,各个CPU的进程进行抢夺)
  • 唤醒(重新激活等待的代码执行流)


信号量的实现


信号量的数据结构,至少需要一个变量来表示互斥,比如大于 0 则代码执行流可以继续运行,等于 0 则让代码执行流进入等待状态。还需要一个等待链,用于保存等待的代码执行流。


#define SEM_FLG_MUTEX 0
#define SEM_FLG_MULTI 1
#define SEM_MUTEX_ONE_LOCK 1
#define SEM_MULTI_LOCK 0
//等待链数据结构,用于挂载等待代码执行流(线程)的结构,里面有用于挂载代码执行流的链表和计数器变量,这里我们先不深入研究这个数据结构。
typedef struct s_KWLST
{   
  spinlock_t wl_lock;
  uint_t   wl_tdnr;
  list_h_t wl_list;
}kwlst_t;
//信号量数据结构
typedef struct s_SEM
{
  spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
  uint_t sem_flg;//信号量相关的标志
  sint_t sem_count;//信号量计数值
  kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
}sem_t;

信号量的使用步骤


1. 信号量初始化


sem_count 初始化为 1,sem_waitlst 等待链初始化为空。


2. 获取信号量


首先对用于保护信号量自身的自旋锁 sem_lock 进行加锁

检查信号值 sem_count 是否不小于1,如果大于等于1则执行“减 1”操作。

上步中检查 sem_count 如果小于 0,就让进程进入等待状态并且将其挂入 sem_waitlst中,然后调度其它进程运行。否则表示获取信号量成功。

最后对自旋锁sem_lock 进行解锁。


3. 代码执行流开始执行相关操作


4. 释放信号量


首先对用于保护信号量自身的自旋锁 sem_lock 进行加锁。

对信号值 sem_count 执行“加 1”操作,并检查其值是否大于 0。

上步中检查 sem_count 值如果大于 0,就执行唤醒 sem_waitlst 中进程的操作,并且需要调度进程时就执行进程调度操作,不管 sem_count 是否大于 0(通常会大于 0)都标记信号量释放成功。

最后对自旋锁 sem_lock 进行解锁。


代码实现

//获取信号量
void krlsem_down(sem_t* sem)
{
    cpuflg_t cpufg;
start_step:    
    krlspinlock_cli(&sem->sem_lock,&cpufg);
    if(sem->sem_count<1)
    {//如果信号量值小于1,则让代码执行流(线程)睡眠
        krlwlst_wait(&sem->sem_waitlst);
        krlspinunlock_sti(&sem->sem_lock,&cpufg);
        krlschedul();//切换代码执行流,下次恢复执行时依然从下一行开始执行,所以要goto开始处重新获取信号量
        goto start_step; 
    }
    sem->sem_count--;//信号量值减1,表示成功获取信号量
    krlspinunlock_sti(&sem->sem_lock,&cpufg);
    return;
}
//释放信号量
void krlsem_up(sem_t* sem)
{
    cpuflg_t cpufg;
    krlspinlock_cli(&sem->sem_lock,&cpufg);
    sem->sem_count++;//释放信号量
    if(sem->sem_count<1)
    {//如果小于1,则说数据结构出错了,挂起系统
        krlspinunlock_sti(&sem->sem_lock,&cpufg);
        hal_sysdie("sem up err");
    }
    //唤醒该信号量上所有等待的代码执行流(线程)
    krlwlst_allup(&sem->sem_waitlst);
    krlspinunlock_sti(&sem->sem_lock,&cpufg);
    krlsched_set_schedflgs();
    return;
}


相关文章
|
5月前
|
分布式数据库 数据库
数据同步并发控制与数据一致性
数据同步并发控制与数据一致性
78 3
|
4月前
|
关系型数据库 数据管理 数据库
数据管理DMS操作报错合集之在数据同步时遇到报错,该如何排查
数据管理DMS(Data Management Service)是阿里云提供的数据库管理和运维服务,它支持多种数据库类型,包括RDS、PolarDB、MongoDB等。在使用DMS进行数据库操作时,可能会遇到各种报错情况。以下是一些常见的DMS操作报错及其可能的原因与解决措施的合集。
108 2
|
3月前
|
SQL 数据管理 关系型数据库
数据管理DMS使用问题之DTs同实例同库前的表数据同步,该如何操作
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
|
2月前
|
安全 C# 开发者
【C# 多线程编程陷阱揭秘】:小心!那些让你的程序瞬间崩溃的多线程数据同步异常问题,看完这篇你就能轻松应对!
【8月更文挑战第18天】多线程编程对现代软件开发至关重要,特别是在追求高性能和响应性方面。然而,它也带来了数据同步异常等挑战。本文通过一个简单的计数器示例展示了当多个线程无序地访问共享资源时可能出现的问题,并介绍了如何使用 `lock` 语句来确保线程安全。此外,还提到了其他同步工具如 `Monitor` 和 `Semaphore`,帮助开发者实现更高效的数据同步策略,以达到既保证数据一致性又维持良好性能的目标。
40 0
|
4月前
|
SQL DataWorks 数据管理
DataWorks操作报错合集之数据同步时遇到资源包报错,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
23 1
|
4月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之DataWorks中使用Lindorm冷数据同步至MaxCompute,该如何操作
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
NoSQL MongoDB 数据库
实时计算 Flink版操作报错之在使用Flink CDC进行数据同步时遇到了全量同步不完全的问题,同时有任务偶尔报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
344 0

热门文章

最新文章