发布订阅的简单实现

简介: 发布订阅的简单实现

publisher.lua

文件下载:publisher.lua

local zmq = require("lzmq")
local timer = require("lzmq.timer")
local zassert = zmq.assert
local context = zmq.context()
local address = 'tcp://127.0.0.1:10000'
local zmq_pub, err = context:socket(zmq.PUB, { bind = address })
zassert(zmq_pub, err)
print("Create publisher server: ", address)
-- get the time stamp string wih format: %Y-%m-%d %H:%M:%S.ms
function timeStamp()
    local ms = timer.absolute_time() -- get ms use lzmp.timmer.absolute_time()
    local s = math.floor(ms / 1000) -- second
    local date = os.date("%Y-%m-%d %H:%M:%S.", s) -- second to data
    local sub_ms = ms - s * 1000 -- just ms
    return date .. tostring(sub_ms)
end
-- publish message
while true do
    io.write("Publisher> ") -- prompt keyword: "Publisher> "
    io.flush()
    local cmd = io.read("*line") -- read message from cmd line
    if (cmd and #cmd > 0) then
        local ret, err = zmq_pub:send("101", zmq.SNDMORE) -- zmq.SNDMORE: 表示发送的消息由多个消息帧组成
        local ret, err = zmq_pub:send(cmd .. "\r\n")
        if (ret) then
            print(timeStamp(), "[SEND]:", cmd)
        else
            print("[ERROR]:", err)
        end
    end
end
点击复制复制失败已复制


subscriber.lua

文件下载:subscriber.lua

local zmq = require("lzmq")
local timer = require("lzmq.timer")
local zassert = zmq.assert
local zpoller = require("lzmq.poller")
local context = zmq.context();
local address = 'tcp://127.0.0.1:10000'
local zmq_sub, err = context:socket{zmq.SUB, subscribe = "101"; connect = address; }
zassert(zmq_sub,err);
print("[Subscriber]: ","Create subscriber with address : ",address);
function timeStamp()
    local ms = timer.absolute_time()
    local s = math.floor(ms/1000)
    local date = os.date("%Y-%m-%d %H:%M:%S.", s)
    local sub_ms = ms-s*1000
    return date..tostring(sub_ms)
end
--poller()解决一个线程中有多个sokect同时需要收发数据时,不用在send()或者recv()时阻塞socket
--在recv()端接受信息的用zmq.POLLIN
--在send()端发送消息的用zmq.POLLOUT
local poller = zpoller.new(2)
--此处暂时只能用闭包函数,尝试单独写函数来实现时出错
poller:add(zmq_sub, zmq.POLLIN, function()
    io.write("Receiver> ")
    io.flush()
    print(timeStamp().."\t[REV:]\t"..zmq_sub:recv())
end)
poller:start()
目录
相关文章
|
关系型数据库 数据库 PostgreSQL
云数据库RDS PostgreSQL 快速入门(二)
云数据库RDS PostgreSQL 快速入门(二)
220 1
|
Java Go 调度
Golang中的协程(goroutine)
Golang中的协程(goroutine)
127 0
|
存储 监控 网络协议
Consul简介和安装
Consul 是一套开源的分布式服务发现和配置管理系统,由 HashiCorp 公司用 Go 语言开发。
Consul简介和安装
|
JavaScript 前端开发 API
12 个 GitHub 上超火的 JavaScript 奇技淫巧项目,找到写 JavaScript 的灵感!(上)
12 个 GitHub 上超火的 JavaScript 奇技淫巧项目,找到写 JavaScript 的灵感!(上)
197 0
12 个 GitHub 上超火的 JavaScript 奇技淫巧项目,找到写 JavaScript 的灵感!(上)
|
7天前
|
人工智能 运维 安全
|
5天前
|
人工智能 异构计算
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
敬请锁定《C位面对面》,洞察通用计算如何在AI时代持续赋能企业创新,助力业务发展!
|
6天前
|
机器学习/深度学习 人工智能 自然语言处理
B站开源IndexTTS2,用极致表现力颠覆听觉体验
在语音合成技术不断演进的背景下,早期版本的IndexTTS虽然在多场景应用中展现出良好的表现,但在情感表达的细腻度与时长控制的精准性方面仍存在提升空间。为了解决这些问题,并进一步推动零样本语音合成在实际场景中的落地能力,B站语音团队对模型架构与训练策略进行了深度优化,推出了全新一代语音合成模型——IndexTTS2 。
607 21