publisher.lua
文件下载:publisher.lua
local zmq = require("lzmq") local timer = require("lzmq.timer") local zassert = zmq.assert local context = zmq.context() local address = 'tcp://127.0.0.1:10000' local zmq_pub, err = context:socket(zmq.PUB, { bind = address }) zassert(zmq_pub, err) print("Create publisher server: ", address) -- get the time stamp string wih format: %Y-%m-%d %H:%M:%S.ms function timeStamp() local ms = timer.absolute_time() -- get ms use lzmp.timmer.absolute_time() local s = math.floor(ms / 1000) -- second local date = os.date("%Y-%m-%d %H:%M:%S.", s) -- second to data local sub_ms = ms - s * 1000 -- just ms return date .. tostring(sub_ms) end -- publish message while true do io.write("Publisher> ") -- prompt keyword: "Publisher> " io.flush() local cmd = io.read("*line") -- read message from cmd line if (cmd and #cmd > 0) then local ret, err = zmq_pub:send("101", zmq.SNDMORE) -- zmq.SNDMORE: 表示发送的消息由多个消息帧组成 local ret, err = zmq_pub:send(cmd .. "\r\n") if (ret) then print(timeStamp(), "[SEND]:", cmd) else print("[ERROR]:", err) end end end 点击复制复制失败已复制
subscriber.lua
文件下载:subscriber.lua
local zmq = require("lzmq") local timer = require("lzmq.timer") local zassert = zmq.assert local zpoller = require("lzmq.poller") local context = zmq.context(); local address = 'tcp://127.0.0.1:10000' local zmq_sub, err = context:socket{zmq.SUB, subscribe = "101"; connect = address; } zassert(zmq_sub,err); print("[Subscriber]: ","Create subscriber with address : ",address); function timeStamp() local ms = timer.absolute_time() local s = math.floor(ms/1000) local date = os.date("%Y-%m-%d %H:%M:%S.", s) local sub_ms = ms-s*1000 return date..tostring(sub_ms) end --poller()解决一个线程中有多个sokect同时需要收发数据时,不用在send()或者recv()时阻塞socket --在recv()端接受信息的用zmq.POLLIN --在send()端发送消息的用zmq.POLLOUT local poller = zpoller.new(2) --此处暂时只能用闭包函数,尝试单独写函数来实现时出错 poller:add(zmq_sub, zmq.POLLIN, function() io.write("Receiver> ") io.flush() print(timeStamp().."\t[REV:]\t"..zmq_sub:recv()) end) poller:start()