发布订阅的简单实现

简介: 发布订阅的简单实现

publisher.lua

文件下载:publisher.lua

local zmq = require("lzmq")
local timer = require("lzmq.timer")
local zassert = zmq.assert
local context = zmq.context()
local address = 'tcp://127.0.0.1:10000'
local zmq_pub, err = context:socket(zmq.PUB, { bind = address })
zassert(zmq_pub, err)
print("Create publisher server: ", address)
-- get the time stamp string wih format: %Y-%m-%d %H:%M:%S.ms
function timeStamp()
    local ms = timer.absolute_time() -- get ms use lzmp.timmer.absolute_time()
    local s = math.floor(ms / 1000) -- second
    local date = os.date("%Y-%m-%d %H:%M:%S.", s) -- second to data
    local sub_ms = ms - s * 1000 -- just ms
    return date .. tostring(sub_ms)
end
-- publish message
while true do
    io.write("Publisher> ") -- prompt keyword: "Publisher> "
    io.flush()
    local cmd = io.read("*line") -- read message from cmd line
    if (cmd and #cmd > 0) then
        local ret, err = zmq_pub:send("101", zmq.SNDMORE) -- zmq.SNDMORE: 表示发送的消息由多个消息帧组成
        local ret, err = zmq_pub:send(cmd .. "\r\n")
        if (ret) then
            print(timeStamp(), "[SEND]:", cmd)
        else
            print("[ERROR]:", err)
        end
    end
end
点击复制复制失败已复制


subscriber.lua

文件下载:subscriber.lua

local zmq = require("lzmq")
local timer = require("lzmq.timer")
local zassert = zmq.assert
local zpoller = require("lzmq.poller")
local context = zmq.context();
local address = 'tcp://127.0.0.1:10000'
local zmq_sub, err = context:socket{zmq.SUB, subscribe = "101"; connect = address; }
zassert(zmq_sub,err);
print("[Subscriber]: ","Create subscriber with address : ",address);
function timeStamp()
    local ms = timer.absolute_time()
    local s = math.floor(ms/1000)
    local date = os.date("%Y-%m-%d %H:%M:%S.", s)
    local sub_ms = ms-s*1000
    return date..tostring(sub_ms)
end
--poller()解决一个线程中有多个sokect同时需要收发数据时,不用在send()或者recv()时阻塞socket
--在recv()端接受信息的用zmq.POLLIN
--在send()端发送消息的用zmq.POLLOUT
local poller = zpoller.new(2)
--此处暂时只能用闭包函数,尝试单独写函数来实现时出错
poller:add(zmq_sub, zmq.POLLIN, function()
    io.write("Receiver> ")
    io.flush()
    print(timeStamp().."\t[REV:]\t"..zmq_sub:recv())
end)
poller:start()
目录
相关文章
|
7月前
|
消息中间件 Cloud Native Kafka
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ致力于打造下一代云原生Kafka系统,解决Kafka痛点。本文深入解析Kafka Consumer与Broker的交互机制,涉及消费者角色、核心组件及常用接口。消费者以group形式工作,包括leader和follower。交互流程涵盖FindCoordinator、JoinGroup、SyncGroup、拉取消息和退出过程。文章还探讨了broker的consumer group状态管理和rebalance原理。AutoMQ团队分享Kafka技术,感兴趣的话可以关注他们。
588 3
一文搞懂 Kafka consumer 与 broker 交互机制与原理
|
消息中间件 存储 Java
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)
64 0
|
消息中间件 Java Maven
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
63 0
|
7月前
|
消息中间件 Java BI
RabbitMQ的四种消息传递模式与演示代码
RabbitMQ的四种消息传递模式与演示代码
92 0
|
安全
MQ的优缺点 及 不同MQ的区别
MQ的优缺点 及 不同MQ的区别
156 0
|
7月前
|
消息中间件 设计模式 前端开发
【面试题】说说你对发布订阅、观察者模式的理解?区别?
【面试题】说说你对发布订阅、观察者模式的理解?区别?
106 0
|
消息中间件 算法 Java
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ灵活运用,怎么理解五种消息模型
153 0
|
设计模式 消息中间件 Java
SpringBoot事件监听机制及观察者/发布订阅模式详解
介绍观察者模式和发布订阅模式的区别。 SpringBoot快速入门事件监听。 什么是观察者模式? 观察者模式是经典行为型设计模式之一。 在GoF的《设计模式》中,观察者模式的定义:在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。如果你觉得比较抽象,接下来这个例子应该会让你有所感觉:
|
消息中间件 设计模式 Java
SpringBoot事件监听机制及观察者模式/发布订阅模式
SpringBoot事件监听机制及观察者模式/发布订阅模式
394 0
|
设计模式
关于观察者模式/发布订阅模式我所知道的
关于观察者模式/发布订阅模式我所知道的
105 0