DIOCP开源项目-Delphi高性能无锁队列(lock-free)

简介: 最近想在DIOCP中加入任务调度线程,DIOCP的工作线程作为生产者(producer)将接受到的数据对象,投递到任务调度线程中,然后统一进行分配。然而这一切都需要一个队列, 这几天都在关注无锁队列。   [队列] 首先是一个队列,简单的队列就是,生产者把数据压入队列(push), 消费者通过队列Pop出数据进行处理。

最近想在DIOCP中加入任务调度线程,DIOCP的工作线程作为生产者(producer)将接受到的数据对象,投递到任务调度线程中,然后统一进行分配。然而这一切都需要一个队列, 这几天都在关注无锁队列。

 

[队列]

首先是一个队列,简单的队列就是,生产者把数据压入队列(push), 消费者通过队列Pop出数据进行处理。

简单的队列就是提供Push,和Pop函数。

我们用一个链表来存储数据。Head ->data01->data02...data_n->Tail, 每个数据块的结构如下

type
  PVarQueueBlock = ^TVarQueueBlock;
  TVarQueueBlock = packed record
    value:Variant;
    next:PVarQueueBlock;
  end;

 

1.在进行Push压入数据时压入将Tail.next指向新压入的数据块, 然后用新的数据块做Tail

procedure TSimpleQueue.pushQueue(pvData: PVarQueueBlock);
begin
  if FHead = nil then
  begin
    FHead = pvData;
    FTail := FHead;
  end else
  begin
    FTail.next := pvData;
    FTail := pvData;
  end;
  Inc(FCount);
end;

 

 

2.在进行Pop数据时把Head数据块取出,然后用Head数据块指向的下一块当作Head.

function TSimpleQueue.popQueue: PVarQueueBlock;
var
  lvTemp, lvRet:PVarQueueBlock;
begin
  lvTemp := FHead;
  if (lvTemp = nil) then
  begin        //没有任何可以Pop出的值
    Result := nil;
    exit;
  end;

  //
  FHead := FHead.next;

  Dec(FCount);
  Result :=lvTemp;
end;

 

上面就是简单的队列

 

[无锁队列]

上面的实现的队列在多线程情况下是不安全的。如果要在多并发下队列要进行加锁,在push和pop时加锁也是一种办法。可以直接用临界就可以了,但是我们要做的是无锁队列

 

首先记住多并发设计规则:决不要假设任何代码会连续执行

 

上面的push操作

FTail.next := pvData;
FTail := pvData;

也许执行了FTail.next:=pvData后,会被另外的线程抢走,然后FTail进行了新的赋值,这样在进行FTail := pvData;这样整个数据链条就会被破坏。

如果这两行我们能一次行完成,这样就可以实现无锁操作了,这样我们需要引入原子操作.Interlocked中的函数。

 

说无锁其实不太确切,只是锁的粒度小了。我们是使用api的InterlockedCompareExchange函数来实现的。

查一下MSDN

http://msdn.microsoft.com/en-us/library/windows/desktop/ms683560(v=vs.85).aspx

LONG __cdecl InterlockedCompareExchange(
  _Inout_  LONG volatile *Destination,
  _In_     LONG Exchange,
  _In_     LONG Comparand
);

Parameters

Destination [in, out]

A pointer to the destination value.

Exchange [in]

The exchange value.

Comparand [in]

The value to compare to Destination.

Return value

The function returns the initial value of the Destination parameter.

 

大概解释一下。这个函数是比较后进行交换。第一个参数是要存放目的的数据,第二个是交换数据,第三个是比较数据(与第一个比较), 如果交换返回的数据和第三个参数一样。

 

 

这样就可以在push和pop一步完成。

这里贴出push的pop操作

procedure TVarQueue.pushQueue(pvData: PVarQueueBlock);
var
  lvTemp:PVarQueueBlock;
  lvPointer:Pointer;
begin
  while True do
  begin
     lvTemp := FTail;
     while lvTemp.next <> nil do lvTemp := lvTemp.next;
     if InterlockedCompareExchangePointer(Pointer(lvTemp.next), Pointer(pvData), nil) = nil then
     begin
       break;
     end;
  end;
  FTail := pvData;
  Inc(FCount);
end;
function TVarQueue.popQueue: PVarQueueBlock;
var
  lvTemp, lvRet:PVarQueueBlock;
  lvPointer:Pointer;
begin
  ///为了方便 队列中始终保留一个FHead数据块
  ///  也就是说FHead指向的下一个数据块才是第一个数据块
  ///

  while True do
  begin
    lvTemp := FHead;
    if (lvTemp = nil) or (lvTemp.next = nil) then
    begin        //没有任何可以Pop出的值
      Result := nil;
      exit;
    end;
    if InterlockedCompareExchangePointer(Pointer(FHead), lvTemp.next, lvTemp) = lvTemp then
    begin
      break;
    end;
  end;
  Dec(FCount);
  lvRet := lvTemp.next;
  Result := lvRet;
  lvTemp.next := nil;
  Dispose(lvTemp);
  //返回的是head.next
end;

 

后续我会上传完整的代码到DIOCP项目中。

如有漏洞,敬请指出。欢迎假如DIOCP群讨论

 

 

 

目录
相关文章
|
5月前
|
程序员 C++
malloc与free的内存管理奥秘:技术分享
【8月更文挑战第22天】在软件开发过程中,内存管理是一个至关重要的环节。特别是在使用C或C++这类语言时,程序员需要手动管理内存的分配与释放。malloc和free函数是这一过程中的核心工具。本文将深入探讨malloc如何分配内存,以及free如何知道释放多少内存,帮助你在工作学习中更好地掌握这一技术干货。
121 4
|
8月前
|
算法 调度
FreeRTOS入门教程(互斥锁的概念和函数使用)
FreeRTOS入门教程(互斥锁的概念和函数使用)
403 0
|
Java Linux API
[笔记]c++基础实践《一》std::thread以及多线程相关概念
[笔记]c++基础实践《一》std::thread以及多线程相关概念
|
Java Go 调度
Go语言核心手册-12.sync.Pool
pool在掌握基础用法的同时,需要知道Get和Push方法的实现逻辑,其中最重要的一点,是需要将pool和GMP的调度原理结合起来,其中两者的P的原理其实是一样的,只是对于资源抢占这一块,GMP抢占的是G,pool抢占的是pool数据,对于这块,其实是自己个人的理解,如果理解的不对,还请大家帮忙指出。
183 0
Go语言核心手册-12.sync.Pool
|
安全 Go Android开发
Go语言核心手册-8.sync.WaitGroup
WaitGroup是开箱即用和并发安全的,可以通过它很方便地实现一对多goroutine协作流程,即:一个分发子任务的goroutine,和多个执行子任务的goroutine,共同来完成一个较大的任务。在使用WaitGroup值的时候,我们一定要注意,千万不要让其中的计数器的值小于0,否则就会引发 panic。另外,我们最好用“先统一Add,再并发Done,最后Wait”这种标准方式,来使用WaitGroup值, 尤其不要在调用Wait方法的同时,并发地通过调用Add方法去增加其计数器的值,因为这也有可能引发 panic。
157 0
Go语言核心手册-8.sync.WaitGroup
Go语言核心手册-9.互斥锁
本章的内容不多,主要需要注意互斥锁和读写锁的几条注意事项,读写锁其实就是更细粒度的锁划分,为了能让程序更好并发,上面已经讲述的非常清楚,这里就不再啰嗦。唯一再强调的一点,无论是互斥锁还是读写锁,我们都不要试图去解锁未锁定的锁,因为这样会引发不可恢复的 panic。
121 0
|
消息中间件 Java C++
开源项目推荐:多进程和多线程的高性能消息队列(无锁队列),lock-free queue
开源项目推荐:多进程和多线程的高性能消息队列(无锁队列),lock-free queue
1190 0
|
存储 Rust 自然语言处理
Rust的Future、GO的Goroutine、Linux的Epoll高并发背后的殊途同归
今天我们继续高并发的话题,在上次的博客中我们有提到,Rust的Future机制非常有助于程序员按照更为自然、简洁的逻辑去设计系统,我们必须要知道高并发系统的关键在于立交桥的分流与导流构造而非信号灯的限流。因此把精力放在设计锁、互斥系这些信号系统上是非常事倍功半的。