ZMQ之多线程编程

简介: ZMQ之多线程编程

ZMQ之多线程编程


使用ZMQ进行多线程编程(MT编程)将会是一种享受。在多线程中使用ZMQ套接字时,你不需要考虑额外的东西,让它们自如地运作就好。

       使用ZMQ进行多线程编程时,不需要考虑互斥、锁、或其他并发程序中要考虑的因素,你唯一要关心的仅仅是线程之间的消息。

       什么叫“完美”的多线程编程,指的是代码易写易读,可以跨系统、跨语言地使用同一种技术,能够在任意颗核心的计算机上运行,没有状态,没有速度的瓶颈。

       如果你有多年的多线程编程经验,知道如何使用锁、信号灯、临界区等机制来使代码运行得正确(尚未考虑快速),那你可能会很沮丧,因为ZMQ将改变这一切。三十多年来,并发式应用程序开发所总结的经验是:不要共享状态。这就好比两个醉汉想要分享一杯啤酒,如果他们不是铁哥们儿,那他们很快就会打起来。当有更多的醉汉加入时,情况就会更糟。多线程编程有时就像醉汉抢夺啤酒那样糟糕。

       进行多线程编程往往是痛苦的,当程序因为压力过大而崩溃时,你会不知所然。有人写过一篇《多线程代码中的11个错误易发点》的文章,在大公司中广为流传,列举其中的几项:没有进行同步、错误的粒度、读写分离、无锁排序、锁传递、优先级冲突等。

       假设某一天的下午三点,当证券市场正交易得如火如荼的时候,突然之间,应用程序因为锁的问题崩溃了,那将会是何等的场景?所以,作为程序员的我们,为解决那些复杂的多线程问题,只能用上更复杂的编程机制。

       有人曾这样比喻,那些多线程程序原本应作为大型公司的核心支柱,但往往又最容易出错;那些想要通过网络不断进行延伸的产品,最后总以失败告终。

       如何用ZMQ进行多线程编程,以下是一些规则:

               1、不要在不同的线程之间访问同一份数据,如果要用到传统编程中的互斥机制,那就有违ZMQ的思想了。唯一的例外是ZMQ上下文对象,它是线程安全的。

               2、必须为进程创建ZMQ上下文,并将其传递给所有你需要使用inproc协议进行通信的线程;

               3、你可以将线程作为单独的任务来对待,使用自己的上下文,但是这些线程之间就不能使用inproc协议进行通信了。这样做的好处是可以在日后方便地将程序拆分为不同的进程来运行。

               4、不要在不同的线程之间传递套接字对象,这些对象不是线程安全的。从技术上来说,你是可以这样做的,但是会用到互斥和锁的机制,这会让你的应用程序变得缓慢和脆弱。唯一合理的情形是,在某些语言的ZMQ类库内部,需要使用垃圾回收机制,这时可能会进行套接字对象的传递。

       当你需要在应用程序中使用两个装置时,可能会将套接字对象从一个线程传递给另一个线程,这样做一开始可能会成功,但最后一定会随机地发生错误。所以说,应在同一个线程中打开和关闭套接字。

       如果你能遵循上面的规则,就会发现多线程程序可以很容易地拆分成多个进程。程序逻辑可以在线程、进程、或是计算机中运行,根据你的需求进行部署即可。

       ZMQ使用的是系统原生的线程机制,而不是某种“绿色线程”。这样做的好处是你不需要学习新的多线程编程API,而且可以和目标操作系统进行很好的结合。你可以使用类似英特尔的ThreadChecker工具来查看线程工作的情况。缺点在于,如果程序创建了太多的线程(如上千个),则可能导致操作系统负载过高。

       下面我们举一个实例,让原来的Hello World服务变得更为强大。原来的服务是单线程的,如果请求较少,自然没有问题。ZMQ的线程可以在一个核心上高速地运行,执行大量的工作。但是,如果有一万次请求同时发送过来会怎么样?因此,现实环境中,我们会启动多个worker线程,他们会尽可能地接收客户端请求,处理并返回应答。

       当然,我们可以使用启动多个worker进程的方式来实现,但是启动一个进程总比启动多个进程要来的方便且易于管理。而且,作为线程启动的worker,所占用的带宽会比较少,延迟也会较低。

       以下是多线程版的Hello World服务:

       mtserver: Multithreaded service in C

#include "zhelpers.h"
#include <pthread.h>
static void *
worker_routine (void *context) {
    //  连接至代理的套接字
    void *receiver = zmq_socket (context, ZMQ_REP);
    zmq_connect (receiver, "inproc://workers");
    while (1) {
        char *string = s_recv (receiver);
        printf ("Received request: [%s]\n", string);
        free (string);
        //  工作
        sleep (1);
        //  返回应答
        s_send (receiver, "World");
    }
    zmq_close (receiver);
    return NULL;
}
int main (void)
{
    void *context = zmq_init (1);
    //  用于和client进行通信的套接字
    void *clients = zmq_socket (context, ZMQ_ROUTER);
    zmq_bind (clients, "tcp://*:5555");
    //  用于和worker进行通信的套接字
    void *workers = zmq_socket (context, ZMQ_DEALER);
    zmq_bind (workers, "inproc://workers");
    //  启动一个worker池
    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }
    //  启动队列装置
    zmq_device (ZMQ_QUEUE, clients, workers);
    //  程序不会运行到这里,但仍进行清理工作
    zmq_close (clients);
    zmq_close (workers);
    zmq_term (context);
    return 0;
}

所有的代码应该都已经很熟悉了:

               1、服务端启动一组worker线程,每个worker创建一个REP套接字,并处理收到的请求。worker线程就像是一个单线程的服务,唯一的区别是使用了inproc而非tcp协议,以及绑定-连接的方向调换了。

               2、服务端创建ROUTER套接字用以和client通信,因此提供了一个TCP协议的外部接口。

               3、服务端创建DEALER套接字用以和worker通信,使用了内部接口(inproc)。

               4、服务端启动了QUEUE内部装置,连接两个端点上的套接字。QUEUE装置会将收到的请求分发给连接上的worker,并将应答路由给请求方。

       需要注意的是,在某些编程语言中,创建线程并不是特别方便,POSIX提供的类库是pthreads,但Windows中就需要使用不同的API了。我们会在第三章中讲述如何包装一个多线程编程的API。

       示例中的“工作”仅仅是1秒钟的停留,我们可以在worker中进行任意的操作,包括与其他节点进行通信。消息的流向是这样的:REQ-ROUTER-queue-DEALER-REP。

线程间的信号传输

       当你刚开始使用ZMQ进行多线程编程时,你可能会问:要如何协调两个线程的工作呢?可能会想要使用sleep()这样的方法,或者使用诸如信号、互斥等机制。事实上,你唯一要用的就是ZMQ本身。回忆一下那个醉汉抢啤酒的例子吧。

       下面的示例演示了三个线程之间需要如何进行同步:

我们使用PAIR套接字和inproc协议。

       mtrelay: Multithreaded relay in C

#include "zhelpers.h"
#include <pthread.h>
static void *
step1 (void *context) {
    //  连接至步骤2,告知我已就绪
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step2");
    printf ("步骤1就绪,正在通知步骤2……\n");
    s_send (xmitter, "READY");
    zmq_close (xmitter);
    return NULL;
}
static void *
step2 (void *context) {
    //  启动步骤1前先绑定至inproc套接字
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step2");
    pthread_t thread;
    pthread_create (&thread, NULL, step1, context);
    //  等待信号
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);
    //  连接至步骤3,告知我已就绪
    void *xmitter = zmq_socket (context, ZMQ_PAIR);
    zmq_connect (xmitter, "inproc://step3");
    printf ("步骤2就绪,正在通知步骤3……\n");
    s_send (xmitter, "READY");
    zmq_close (xmitter);
    return NULL;
}
int main (void)
{
    void *context = zmq_init (1);
    //  启动步骤2前先绑定至inproc套接字
    void *receiver = zmq_socket (context, ZMQ_PAIR);
    zmq_bind (receiver, "inproc://step3");
    pthread_t thread;
    pthread_create (&thread, NULL, step2, context);
    //  等待信号
    char *string = s_recv (receiver);
    free (string);
    zmq_close (receiver);
    printf ("测试成功!\n");
    zmq_term (context);
    return 0;
}

这是一个ZMQ多线程编程的典型示例:

               1、两个线程通过inproc协议进行通信,使用同一个上下文。

               2、父线程创建一个套接字,绑定至inproc://端点,然后再启动子线程,将上下文对象传递给它。

               3、子线程创建第二个套接字,连接至inproc://端点,然后发送已就绪信号给父线程。

       需要注意的是,这段代码无法扩展到多个进程之间的协调。如果你使用inproc协议,只能建立结构非常紧密的应用程序。在延迟时间必须严格控制的情况下可以使用这种方法。对其他应用程序来说,每个线程使用同一个上下文,协议选用ipc或tcp。然后,你就可以自由地将应用程序拆分为多个进程甚至是多台计算机了。

       这是我们第一次使用PAIR套接字。为什么要使用PAIR?其他类型的套接字也可以使用,但都有一些缺点会影响到线程间的通信:

               1、你可以让信号发送方使用PUSH,接收方使用PULL,这看上去可能可以,但是需要注意的是,PUSH套接字发送消息时会进行负载均衡,如果你不小心开启了两个接收方,就会“丢失”一半的信号。而PAIR套接字建立的是一对一的连接,具有排他性。

               2、可以让发送方使用DEALER,接收方使用ROUTER。但是,ROUTER套接字会在消息的外层包裹一个来源地址,这样一来原本零字节的信号就可能要成为一个多段消息了。如果你不在乎这个问题,并且不会重复读取那个套接字,自然可以使用这种方法。但是,如果你想要使用这个套接字接收真正的数据,你就会发现ROUTER提供的消息是错误的。至于DEALER套接字,它同样有负载均衡的机制,和PUSH套接字有相同的风险。

               3、可以让发送方使用PUB,接收方使用SUB。一来消息可以照原样发送,二来PUB套接字不会进行负载均衡。但是,你需要对SUB套接字设置一个空的订阅信息(用以接收所有消息);而且,如果SUB套接字没有及时和PUB建立连接,消息很有可能会丢失。

       综上,使用PAIR套接字进行线程间的协调是最合适的。

相关文章
|
11天前
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
3天前
|
安全 程序员 API
|
2天前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
8 1
|
6天前
|
缓存 Java 调度
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文旨在为读者提供一个关于Java多线程编程的全面指南。我们将从多线程的基本概念开始,逐步深入到Java中实现多线程的方法,包括继承Thread类、实现Runnable接口以及使用Executor框架。此外,我们还将探讨多线程编程中的常见问题和最佳实践,帮助读者在实际项目中更好地应用多线程技术。
12 3
|
7天前
|
监控 安全 Java
Java多线程编程的艺术与实践
【10月更文挑战第22天】 在现代软件开发中,多线程编程是一项不可或缺的技能。本文将深入探讨Java多线程编程的核心概念、常见问题以及最佳实践,帮助开发者掌握这一强大的工具。我们将从基础概念入手,逐步深入到高级主题,包括线程的创建与管理、同步机制、线程池的使用等。通过实际案例分析,本文旨在提供一种系统化的学习方法,使读者能够在实际项目中灵活运用多线程技术。
|
6天前
|
缓存 安全 Java
Java中的多线程编程:从基础到实践
【10月更文挑战第24天】 本文将深入探讨Java中的多线程编程,包括其基本原理、实现方式以及常见问题。我们将从简单的线程创建开始,逐步深入了解线程的生命周期、同步机制、并发工具类等高级主题。通过实际案例和代码示例,帮助读者掌握多线程编程的核心概念和技术,提高程序的性能和可靠性。
8 2
|
6天前
|
Java
Java中的多线程编程:从基础到实践
本文深入探讨Java多线程编程,首先介绍多线程的基本概念和重要性,接着详细讲解如何在Java中创建和管理线程,最后通过实例演示多线程的实际应用。文章旨在帮助读者理解多线程的核心原理,掌握基本的多线程操作,并能够在实际项目中灵活运用多线程技术。
|
8天前
|
Java 数据处理 开发者
Java多线程编程的艺术:从入门到精通####
【10月更文挑战第21天】 本文将深入探讨Java多线程编程的核心概念,通过生动实例和实用技巧,引导读者从基础认知迈向高效并发编程的殿堂。我们将一起揭开线程管理的神秘面纱,掌握同步机制的精髓,并学习如何在实际项目中灵活运用这些知识,以提升应用性能与响应速度。 ####
30 3
|
11天前
|
Java API 调度
Java中的多线程编程:理解与实践
本文旨在为读者提供对Java多线程编程的深入理解,包括其基本概念、实现方式以及常见问题的解决方案。通过阅读本文,读者将能够掌握Java多线程编程的核心知识,提高自己在并发编程方面的技能。
|
9天前
|
Java
Java中的多线程编程:从入门到精通
本文将带你深入了解Java中的多线程编程。我们将从基础概念开始,逐步深入探讨线程的创建、启动、同步和通信等关键知识点。通过阅读本文,你将能够掌握Java多线程编程的基本技能,为进一步学习和应用打下坚实的基础。