可epoll队列

简介: 可epoll队列.pdf 什么是可epoll队列? 就可以使用epoll来监控队列中是否有数据的队列,当然也支持select和poll。 应用场景 一个线程,需要将队列(共享内存队列或普通队列均可)中的数据取出来,然后通过网络发送出去。

img_e25d4fb2f8de1caf41a735ec53088516.png可epoll队列.pdf

什么是可epoll队列?

就可以使用epoll来监控队列中是否有数据的队列,当然也支持selectpoll

应用场景

一个线程,需要将队列(共享内存队列或普通队列均可)中的数据取出来,然后通过网络发送出去。如果没有可epoll队列,这个问题处理起来就比较麻烦。

代码实现

实现基于pipe,但pipe可能会产生毛刺。新的内存(2.6.22)引入了eventfd(相关的还有timerfdsignalfd),基于它的实现,不会有毛刺。

 

 

/** 可以放入Epoll监控的队列

  * RawQueueClass为原始队列类名,如util::CArrayQueue

  * 为线程安全类

  */

template class RawQueueClass>

class CEpollableQueue: public CEpollable

{

    typedef typename RawQueueClass::_DataType DataType;

    

public:

    /** 构造一个可Epoll的队列,注意只可监控读事件,也就是队列中是否有数据

      * @queue_max: 队列最大可容纳的元素个数

      * @exception: 如果出错,则抛出CSyscallException异常

      */

    CEpollableQueue(uint32_t queue_max)

        :_raw_queue(queue_max)

        ,_push_waiter_number(0)

    {

        if (-1 == pipe(_pipefd)) throw sys::CSyscallException(errno, __FILE__, __LINE__);

        set_fd(_pipefd[0]);

    }

    

    ~CEpollableQueue()

    {

        close();

    }

 

    /** 关闭队列 */

    virtual void close()

    {

        sys::LockHelper<:clock> lock_helper(_lock);

        if (_pipefd[0] != -1)

        {     

            // 让CEpollable来关闭_pipefd[0],在CEpollable::close()中将会调用

            // before_close,以保持语义总是相同的

            CEpollable::close();

            //close_fd(_pipefd[0]);

            _pipefd[0] = -1;            

        }

        if (_pipefd[1] != -1)

        {        

            close_fd(_pipefd[1]);

            _pipefd[1] = -1;

        }

    }

 

    /** 判断队列是否已满 */

    bool is_full() const 

{

        sys::LockHelper<:clock> lock_helper(_lock);

        return _raw_queue.is_full();

    }

    

    /** 判断队列是否为空 */

    bool is_empty() const 

{

        sys::LockHelper<:clock> lock_helper(_lock);

        return _raw_queue.is_empty();

    }

 

    /***

      * 取队首元素

      * @elem: 存储取到的队首元素

      * @return: 如果队列为空,则返回false,否则返回true

      */

    bool front(DataType& elem) const 

{

        sys::LockHelper<:clock> lock_helper(_lock);

        if (_raw_queue.is_empty()) return false;

 

        elem = _raw_queue.front();

        return true;

    }

    

/***

      * 弹出队首元素

      * @elem: 存储弹出的队首元素

      * @return: 如果队列为空,则返回false,否则取到元素并返回true

      * @exception: 如果出错,则抛出CSyscallException异常

      */

    bool pop_front(DataType& elem) 

{

        sys::LockHelper<:clock> lock_helper(_lock);

        return do_pop_front(elem);

    }

 

    void pop_front()

    {

        DataType elem;

        (void)pop_front(elem);

    }

 

    /***

      * 从队首依次弹出多个元素

      * @elem_array: 存储弹出的队首元素数组

      * @array_size: 输入和输出参数,存储实际弹出的元素个数

      * @exception: 如果出错,则抛出CSyscallException异常

      */

    void pop_front(DataType* elem_array, uint32_t& array_size)

    {

        uint32_t i = 0;

        sys::LockHelper<:clock> lock_helper(_lock);

 

        for (;;)

        {            

            if (!do_pop_front(elem_array[i])) break;

            if (++i == array_size) break;

        }

 

        array_size = i;

    }

    

/***

      * 向队尾插入一元素

      * @elem: 待插入到队尾的元素

      * @millisecond: 如果队列满,等待队列非满的毫秒数,如果为0则不等待,直接返回false

      * @return: 如果队列已经满,则返回false,否则插入成功并返回true

      * @exception: 如果出错,则抛出CSyscallException异常

      */

    bool push_back(DataType elem, uint32_t millisecond=0) 

{

        sys::LockHelper<:clock> lock_helper(_lock);

        while (_raw_queue.is_full())

        {

            // 立即返回

            if (0 == millisecond) return false;

 

            // 超时等待

            util::CountHelpervolatile int32_t> ch(_push_waiter_number);            

            if (!_event.timed_wait(_lock, millisecond)) 

            {

                return false;

            }

        }        

 

        char c = 'x';

        _raw_queue.push_back(elem);

        // write还有相当于signal的作用        

        while (-1 == write(_pipefd[1], &c, sizeof(c)))

        {

            if (errno != EINTR)

                throw sys::CSyscallException(errno, __FILE__, __LINE__);

        }

 

        return true;

    }

 

    /** 得到队列中当前存储的元素个数 */

    uint32_t size() const 

        sys::LockHelper<:clock> lock_helper(_lock);

        return _raw_queue.size(); 

}

 

private:

    bool do_pop_front(DataType& elem)

    {            

        // 没有数据,也不阻塞,如果需要阻塞,应当使用事件队列CEventQueue

        if (_raw_queue.is_empty()) return false;

 

        char c;

        // read还有相当于CEvent::wait的作用        

        while (-1 == read(_pipefd[0], &c, sizeof(c)))

        {

            if (errno != EINTR)

                throw sys::CSyscallException(errno, __FILE__, __LINE__);

        }

 

        elem = _raw_queue.pop_front();

        // 如果有等待着,则唤醒其中一个

        if (_push_waiter_number > 0) _event.signal();

        

        return true;

    }

 

private:

    int _pipefd[2]; /** 管道句柄 */    

    sys::CEvent _event;

    mutable sys::CLock _lock;    

    RawQueueClass _raw_queue; /** 普通队列实例 */

    volatile int32_t _push_waiter_number; /** 等待队列非满的线程个数 */

};

 

相关文章
浅谈select,poll和epoll的区别
云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! select,poll和epoll其实都是操作系统中IO多路复用实现的方法。 select select方法本质其实就是维护了一个文件描述符(fd)数组,以此为基础,实现IO多路复用的功能。
浅谈select,poll和epoll的区别
|
2月前
|
消息中间件 Kubernetes NoSQL
多路复用I/O-epoll
多路复用I/O-epoll
38 0
|
2月前
|
前端开发 Java 索引
pollLast() 和poll啥区别
pollLast() 和poll啥区别
|
2月前
|
Unix
poll 函数 I/O 多路复用的技术
【4月更文挑战第14天】poll 是另一种在各种 UNIX 系统上被广泛支持的 I/O 多路复用技术,虽然名声没有 select 那么响,能力一点不比 select 差,而且因为可以突破 select 文件描述符的个数限制,在高并发的场景下尤其占优势。
|
2月前
|
Linux
|
2月前
|
缓存 Linux NoSQL
epoll与reactor浅析
epoll与reactor浅析
|
Linux Windows
poll&&epoll实现分析(二)——epoll实现
Epoll实现分析——作者:lvyilong316 通过上一章分析,poll运行效率的两个瓶颈已经找出,现在的问题是怎么改进。首先,如果要监听1000个fd,每次poll都要把1000个fd 拷入内核,太不科学了,内核干嘛不自己保存已经拷入的fd呢?答对了,epoll就是自己保存拷入的fd,它的API就已经说明了这一点——不是 epoll_wait的时候才传入fd,而是通过epoll_ctl把所有fd传入内核再一起"wait",这就省掉了不必要的重复拷贝。
1016 0
|
Linux 调度 网络协议
poll&&epoll实现分析(一)——poll实现
0.等待队列 在Linux内核中等待队列有很多用途,可用于中断处理、进程同步及定时。我们在这里只说,进程经常必须等待某些事件的发生。等待队列实现了在事件上的条件等待: 希望等待特定事件的进程把自己放进合适的等待队列,并放弃控制全。
1045 0
|
存储 Linux 程序员
epoll的使用实例
  在网络编程中通常需要处理很多个连接,可以用select和poll来处理多个连接。但是select都受进程能打开的最大文件描述符个数的限制。并且select和poll效率会随着监听fd的数目增多而下降。
1236 0