nanomsg实验——pubsub

简介:

nanomsg实验——pubsub

发布订阅模式是很多消息中间件提供的常见功能。通过消息机制,能够将消息发布者和消息接收(消费)者
进行解耦。pubsub模式也是nanomsg直接支持的一直消息模型之一,因此通过pubsub模式实验,
同时也大致了解了下nanomsg的基础用法。

服务端

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

void usage(const char *name)
{
    fprintf(stderr, "%s [ bind url]n", name);
}

int main(int argc, char **argv)
{
    if(argc != 2) {
        usage(argv[0]);
        exit(-1);
    }

    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_PUB);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        time_t rawtime;
        struct tm * timeinfo;

        time (&rawtime);
        timeinfo = localtime (&rawtime);
        char *text = asctime (timeinfo);
        int textLen = strlen(text);
        text[textLen - 1] = '';

        printf ("SERVER: PUBLISHING DATE %sn", text);
        nn_send(sock, text, textLen, 0);
        sleep(1);
    }

    return 0;
}


nanomsg使用非常简单,只要直接include nanomsg/nn.h,即可使用基本API。使用内置的通信模式,
需要引入对应的头文件,如pubsub模式,引入nonomsg/pubsub.h即可。

pubsub server,需要首先通过nn_socket调用创建socket,这里模仿了POSIX接口,
函数返回一个文件描述符。因此直接通过判断返回值是否大于0,判断是否创建成功。注意第二个参数为协议,
在协议相关头文件中会定义对应的宏。然后所有操作都将基于这个文件描述符。
和berkeley sockets一样,server需要bind一个端口,nanomsg需要bind一个url。目前nanomsg支持的格式有:
* 进程内通信(inproc):url格式为inproc://test
* 进程间同in想(ipc):url格式为ipc:///tmp/test.ipc
* tcp通信:url格式为tcp://*:5555

github上源码貌似已经支持websocket了。

nanomsg的错误和UNIX相同,失败之后会设置errno,可以通过nn_strerror获取对应的错误文本。

bind完了之后,就可以通过nn_send函数向socket发送消息了。这个函数参数和berkeley sockets api接口类似。
这里直接获取当前时间,然后发出给所有订阅者。

客户端

#include <stdio.h>
#include <stdlib.h>

#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

int main(int argc, char **argv)
{
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket (AF_SP, NN_SUB);
    if(sock < 0) {
        fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno));
        exit(-1);
    }

    if (nn_connect(sock, url) < 0) {
        fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno));
        exit(-1);
    }


    while ( 1 ) {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        printf ("CLIENT (%s): RECEIVED %sn", name, buf);
        nn_freemsg (buf);
    }

    nn_shutdown(sock, 0);

    return 0;
}


客户端初始化和服务端差不多,在连接服务端之前,需要通过nn_setsockopt将当前socket设置成消息订阅者。
然后通过nn_connect连接发布者,参数和服务端bind的差不多,也是一个socket、一个url。
这里的url要和服务端bind的url相同。之后就是一个死循环不停的接收发布者的消息。

测试

首先是编译,和普通c程序相同,只是增加链接nanomsg。


gcc -o pubserver pubserver.c -lnanomsg
gcc -o pubclient pubclient.c -lnanomsg

为了方便测试,写了一个简单的shell脚本:


#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
PUB=$BASE/pubserver
SUB=$BASE/pubclient

URL="tcp://127.0.0.1:1234"

echo "start pubserver to bind tcp: $URL"

$PUB tcp://127.0.0.1:1234 &

echo "start to start pubclient"
for((i = 0; i < 10; i++))
    do
    echo "start client$i"
    $SUB client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

脚本很简单,首先启动一个消息发布者,然后每秒启动一个消息接受者。等待20s之后,kill掉所有子进程。

脚本的输出:


start pubserver to bind tcp: tcp://127.0.0.1:1234
start to start pubclient
start client0
SERVER: PUBLISHING DATE Tue Feb 17 15:12:11 2015
start client1
SERVER: PUBLISHING DATE Tue Feb 17 15:12:12 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:12 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:12 2015
start client2
SERVER: PUBLISHING DATE Tue Feb 17 15:12:13 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:13 2015
start client3
SERVER: PUBLISHING DATE Tue Feb 17 15:12:14 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:14 2015
...
SERVER: PUBLISHING DATE Tue Feb 17 15:12:41 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client3): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client4): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client5): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client6): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client7): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client8): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client9): RECEIVED Tue Feb 17 15:12:41 2015
kill all process and exit

可以看见每次启动一个新的订阅者,每个订阅者都能够收到发布者发布的当前时间。

目录
相关文章
|
消息中间件 存储 缓存
Linux下kafka之C/C++客户端库librdkafka的编译,安装以及函数介绍(1)
Linux下kafka之C/C++客户端库librdkafka的编译,安装以及函数介绍
4013 0
|
Ubuntu Linux
在Linux中如何解压 .xz 和 tar.xz 文件?
【4月更文挑战第17天】
15215 6
在Linux中如何解压 .xz 和 tar.xz 文件?
|
缓存
银河麒麟server-V10配置镜像源
银河麒麟server-V10配置镜像源
20243 1
|
数据采集 机器学习/深度学习 数据挖掘
python数据分析——数据预处理
数据预处理是数据分析过程中不可或缺的一环,它的目的是为了使原始数据更加规整、清晰,以便于后续的数据分析和建模工作。在Python数据分析中,数据预处理通常包括数据清洗、数据转换和数据特征工程等步骤。
751 0
|
6月前
|
机器学习/深度学习 测试技术
先SFT后RL但是效果不佳?你可能没用好“离线专家数据”!
通义实验室Trinity-RFT团队提出CHORD框架,通过动态融合SFT与RL,解决大模型训练中“越学越差”“顾此失彼”等问题。该框架引入细粒度Token级权重与软过渡机制,实现从模仿到超越的高效学习,在数学推理与通用任务上均显著提升性能,相关代码已开源。
833 0
|
网络协议 Linux 网络安全
文件共享同步5种方式:NFS、NAS、rsync、scp、ftp
谈到文件同步,我们最直接的同步方式是采用rsync的同步软件,rsync同步可以保持server和client的强一致(server中的增删改都会同步client),但在实际场景中rsync可能并不能被采纳。
19210 0
|
设计模式 前端开发 C语言
【设计模式】 观察者模式介绍及C代码实现
观察者模式(Observer Pattern)是一种常用的设计模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,当主题对象发生变化时,它的所有观察者都会收到通知并更新自己的状态。观察者模式又称为发布-订阅模式。Subject(主题):被观察的对象,它将所有观察者对象的引用保存在一个集合中,并提供了添加和删除观察者对象的方法。Observer(观察者):观察者接口,定义了更新自己的状态的方法,以便主题在状态发生变化时通知观察者。ConcreteSubject(具体主题)
878 0
【设计模式】 观察者模式介绍及C代码实现
|
物联网 API 数据库
一文带你认识蓝牙 GATT 协议
正所谓磨刀不误砍柴工,我们有必要先深入的学习一下 GATT 以及 GATT 相关的一些知识。 本文我们就来了解一下 蓝牙 GATT 到底是什么?同时了解下我们使用的 ESP32-C3 GATT示例的工程的代码结构。
9450 5
一文带你认识蓝牙 GATT 协议
|
数据采集 XML 运维
什么是主数据管理?企业主数据管理方法论
主数据又被称为黄金数据,其价值高也非常重要。对企业来说,主数据的重要性如何强调都不为过,主数据治理是企业数据治理中最为重要的一环。主数据管理的内容包括 主数据管理标准、主数据应用标准 和 主数据集成服务标准 三大类。
|
Ubuntu 安全 网络协议