ZMQ之多线程编程

简介: ZMQ之多线程编程

ZMQ之多线程编程


使用ZMQ进行多线程编程(MT编程)将会是一种享受。在多线程中使用ZMQ套接字时,你不需要考虑额外的东西,让它们自如地运作就好。

       使用ZMQ进行多线程编程时,不需要考虑互斥、锁、或其他并发程序中要考虑的因素,你唯一要关心的仅仅是线程之间的消息。

       什么叫“完美”的多线程编程,指的是代码易写易读,可以跨系统、跨语言地使用同一种技术,能够在任意颗核心的计算机上运行,没有状态,没有速度的瓶颈。

       如果你有多年的多线程编程经验,知道如何使用锁、信号灯、临界区等机制来使代码运行得正确(尚未考虑快速),那你可能会很沮丧,因为ZMQ将改变这一切。三十多年来,并发式应用程序开发所总结的经验是:不要共享状态。这就好比两个醉汉想要分享一杯啤酒,如果他们不是铁哥们儿,那他们很快就会打起来。当有更多的醉汉加入时,情况就会更糟。多线程编程有时就像醉汉抢夺啤酒那样糟糕。

       进行多线程编程往往是痛苦的,当程序因为压力过大而崩溃时,你会不知所然。有人写过一篇《多线程代码中的11个错误易发点》的文章,在大公司中广为流传,列举其中的几项:没有进行同步、错误的粒度、读写分离、无锁排序、锁传递、优先级冲突等。

       假设某一天的下午三点,当证券市场正交易得如火如荼的时候,突然之间,应用程序因为锁的问题崩溃了,那将会是何等的场景?所以,作为程序员的我们,为解决那些复杂的多线程问题,只能用上更复杂的编程机制。

       有人曾这样比喻,那些多线程程序原本应作为大型公司的核心支柱,但往往又最容易出错;那些想要通过网络不断进行延伸的产品,最后总以失败告终。

       如何用ZMQ进行多线程编程,以下是一些规则:

               1、不要在不同的线程之间访问同一份数据,如果要用到传统编程中的互斥机制,那就有违ZMQ的思想了。唯一的例外是ZMQ上下文对象,它是线程安全的。

               2、必须为进程创建ZMQ上下文,并将其传递给所有你需要使用inproc协议进行通信的线程;

               3、你可以将线程作为单独的任务来对待,使用自己的上下文,但是这些线程之间就不能使用inproc协议进行通信了。这样做的好处是可以在日后方便地将程序拆分为不同的进程来运行。

               4、不要在不同的线程之间传递套接字对象,这些对象不是线程安全的。从技术上来说,你是可以这样做的,但是会用到互斥和锁的机制,这会让你的应用程序变得缓慢和脆弱。唯一合理的情形是,在某些语言的ZMQ类库内部,需要使用垃圾回收机制,这时可能会进行套接字对象的传递。

       当你需要在应用程序中使用两个装置时,可能会将套接字对象从一个线程传递给另一个线程,这样做一开始可能会成功,但最后一定会随机地发生错误。所以说,应在同一个线程中打开和关闭套接字。

       如果你能遵循上面的规则,就会发现多线程程序可以很容易地拆分成多个进程。程序逻辑可以在线程、进程、或是计算机中运行,根据你的需求进行部署即可。

       ZMQ使用的是系统原生的线程机制,而不是某种“绿色线程”。这样做的好处是你不需要学习新的多线程编程API,而且可以和目标操作系统进行很好的结合。你可以使用类似英特尔的ThreadChecker工具来查看线程工作的情况。缺点在于,如果程序创建了太多的线程(如上千个),则可能导致操作系统负载过高。

       下面我们举一个实例,让原来的Hello World服务变得更为强大。原来的服务是单线程的,如果请求较少,自然没有问题。ZMQ的线程可以在一个核心上高速地运行,执行大量的工作。但是,如果有一万次请求同时发送过来会怎么样?因此,现实环境中,我们会启动多个worker线程,他们会尽可能地接收客户端请求,处理并返回应答。

       当然,我们可以使用启动多个worker进程的方式来实现,但是启动一个进程总比启动多个进程要来的方便且易于管理。而且,作为线程启动的worker,所占用的带宽会比较少,延迟也会较低。

       以下是多线程版的Hello World服务:

       mtserver: Multithreaded service in C

#include "zhelpers.h"
#include <pthread.h>
static void *
worker_routine (void *context) {
    //  连接至代理的套接字
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");
    while (1) {
        char *string = s_recv (receiver);
        printf ("Received request: [%s]\n", string);
        free (string);
        //  工作
        sleep (1);
        //  返回应答
        s_send (receiver, "World");
    }
    zmq_close (receiver);
    return NULL;
}
int main (void)
{
    void *context = zmq_init (1);
    //  用于和client进行通信的套接字
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");
    //  用于和worker进行通信的套接字
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");
    //  启动一个worker池
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  启动队列装置
    zmq_device (ZMQ_QUEUE, clients, workers);
    //  程序不会运行到这里,但仍进行清理工作
    zmq_close (clients);
    zmq_close (workers);
    zmq_term (context);
    return 0;
}

所有的代码应该都已经很熟悉了:

               1、服务端启动一组worker线程,每个worker创建一个REP套接字,并处理收到的请求。worker线程就像是一个单线程的服务,唯一的区别是使用了inproc而非tcp协议,以及绑定-连接的方向调换了。

               2、服务端创建ROUTER套接字用以和client通信,因此提供了一个TCP协议的外部接口。

               3、服务端创建DEALER套接字用以和worker通信,使用了内部接口(inproc)。

               4、服务端启动了QUEUE内部装置,连接两个端点上的套接字。QUEUE装置会将收到的请求分发给连接上的worker,并将应答路由给请求方。

       需要注意的是,在某些编程语言中,创建线程并不是特别方便,POSIX提供的类库是pthreads,但Windows中就需要使用不同的API了。我们会在第三章中讲述如何包装一个多线程编程的API。

       示例中的“工作”仅仅是1秒钟的停留,我们可以在worker中进行任意的操作,包括与其他节点进行通信。消息的流向是这样的:REQ-ROUTER-queue-DEALER-REP。

线程间的信号传输

       当你刚开始使用ZMQ进行多线程编程时,你可能会问:要如何协调两个线程的工作呢?可能会想要使用sleep()这样的方法,或者使用诸如信号、互斥等机制。事实上,你唯一要用的就是ZMQ本身。回忆一下那个醉汉抢啤酒的例子吧。

       下面的示例演示了三个线程之间需要如何进行同步:

我们使用PAIR套接字和inproc协议。

       mtrelay: Multithreaded relay in C

#include "zhelpers.h"
#include <pthread.h>
static void *
step1 (void *context) {
    //  连接至步骤2,告知我已就绪
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step2");
    printf ("步骤1就绪,正在通知步骤2……\n");
    s_send (xmitter, "READY");
    zmq_close (xmitter);
    return NULL;
}
static void *
step2 (void *context) {
    //  启动步骤1前先绑定至inproc套接字
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step2");
    pthread_t thread;
    pthread_create (&thread, NULL, step1, context);
    //  等待信号
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);
    //  连接至步骤3,告知我已就绪
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step3");
    printf ("步骤2就绪,正在通知步骤3……\n");
    s_send (xmitter, "READY");
    zmq_close (xmitter);
    return NULL;
}
int main (void)
{
    void *context = zmq_init (1);
    //  启动步骤2前先绑定至inproc套接字
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step3");
    pthread_t thread;
    pthread_create (&thread, NULL, step2, context);
    //  等待信号
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);
    printf ("测试成功!\n");
    zmq_term (context);
    return 0;
}

这是一个ZMQ多线程编程的典型示例:

               1、两个线程通过inproc协议进行通信,使用同一个上下文。

               2、父线程创建一个套接字,绑定至inproc://端点,然后再启动子线程,将上下文对象传递给它。

               3、子线程创建第二个套接字,连接至inproc://端点,然后发送已就绪信号给父线程。

       需要注意的是,这段代码无法扩展到多个进程之间的协调。如果你使用inproc协议,只能建立结构非常紧密的应用程序。在延迟时间必须严格控制的情况下可以使用这种方法。对其他应用程序来说,每个线程使用同一个上下文,协议选用ipc或tcp。然后,你就可以自由地将应用程序拆分为多个进程甚至是多台计算机了。

       这是我们第一次使用PAIR套接字。为什么要使用PAIR?其他类型的套接字也可以使用,但都有一些缺点会影响到线程间的通信:

               1、你可以让信号发送方使用PUSH,接收方使用PULL,这看上去可能可以,但是需要注意的是,PUSH套接字发送消息时会进行负载均衡,如果你不小心开启了两个接收方,就会“丢失”一半的信号。而PAIR套接字建立的是一对一的连接,具有排他性。

               2、可以让发送方使用DEALER,接收方使用ROUTER。但是,ROUTER套接字会在消息的外层包裹一个来源地址,这样一来原本零字节的信号就可能要成为一个多段消息了。如果你不在乎这个问题,并且不会重复读取那个套接字,自然可以使用这种方法。但是,如果你想要使用这个套接字接收真正的数据,你就会发现ROUTER提供的消息是错误的。至于DEALER套接字,它同样有负载均衡的机制,和PUSH套接字有相同的风险。

               3、可以让发送方使用PUB,接收方使用SUB。一来消息可以照原样发送,二来PUB套接字不会进行负载均衡。但是,你需要对SUB套接字设置一个空的订阅信息(用以接收所有消息);而且,如果SUB套接字没有及时和PUB建立连接,消息很有可能会丢失。

       综上,使用PAIR套接字进行线程间的协调是最合适的。

相关文章
|
3月前
|
安全 API
muduo源码剖析之EventLoop事件循环类
EventLoop.cc就相当于一个reactor,多线程之间的函数调用(用eventfd唤醒),epoll处理,超时队列处理,对channel的处理。运行loop的进程被称为IO线程,EventLoop提供了一些API确保相应函数在IO线程中调用,确保没有用互斥量保护的变量只能在IO线程中使用,也封装了超时队列的基本操作。
38 0
|
8月前
|
Linux
网络编程之阻塞与非阻塞的理解
网络编程之阻塞与非阻塞的理解
71 0
|
NoSQL 搜索推荐 网络协议
Java NIO、BIO、 AIO 与 同步、阻塞、非阻塞、异步IO 简析
我相信大部分人看到这些名词,都是一头雾水的,如果你去搜索引擎搜索,那么恭喜你,你又会被各种文章中的高大上的名词搞得云里雾里。那么,我们应该怎么理清这么名词之间的关系呢? 所谓 同步/异步/阻塞/非阻塞 IO ,是指操作系统中的对 IO 处理的不同方法,而 Java 对这些不同操作方法做了一些包装,由此有了 BIO / NIO / AIO 几种操作接口。 我不想复制一些高大上的概念,只是想尽量好好说话,说清楚他们之间的关系。 需求 有 A、B、C、D 四个线程可以生产文件,假设他们的返回的文件是一样的,对应我们的服务端 有 E、F、G、H 四个线程在随机时间向服务端上传一个文本,并且要求
|
Java Windows 容器
IOCP详解
IOCP(I/O Completion Port,I/O完成端口)是性能最好的一种I/O模型。它是应用程序使用线程池处理异步I/O请求的一种机制。在处理多个并发的异步I/O请求时,以往的模型都是在接收请求是创建一个线程来应答请求。这样就有很多的线程并行地运行在系统中。而这些线程都是可运行的,Windows内核花费大量的时间在进行线程的上下文切换,并没有多少时间花在线程运行上。再加上创建新线程的开销比较大,所以造成了效率的低下。
245 0
IOCP详解
|
Java Unix Linux
关于Socket 多线程 的一篇好文章
http://www.kegel.com/c10k.html#topIt's time for web servers to handle ten thousand clients simultaneously, don't you think? After all, the web is a big place now.
1434 0
|
前端开发 算法 C++
IOCP编程小结(中)
上一篇主要谈了一些基本理念,本篇将谈谈我个人总结的一些IOCP编程技巧。   网络游戏前端服务器的需求和设计   首先介绍一下这个服务器的技术背景。在分布式网络游戏服务器中,前端连接服务器是一种很常见的设计。
1080 0
|
消息中间件 算法 安全