生产者:
#include <iostream> #include <string> #include "mqtt/async_client.h" const std::string SERVER_ADDRESS = "tcp://broker.example.com:1883"; const std::string CLIENT_ID = "publish_client"; const std::string TOPIC = "test/topic"; const int QOS = 1; class mqtt_callback : public virtual mqtt::callback { public: void connection_lost(const std::string& cause) override { std::cout << "\nConnection lost." << std::endl; if (!cause.empty()) std::cout << "Cause: " << cause << std::endl; } void delivery_complete(mqtt::delivery_token_ptr token) override {} }; int main(int argc, char* argv[]) { mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID); mqtt_callback callback; client.set_callback(callback); mqtt::connect_options connOpts; connOpts.set_clean_session(true); try { client.connect(connOpts)->wait(); for (int i = 0; i < 100000; i++) { std::string message = std::to_string(i); mqtt::message_ptr pubmsg = mqtt::make_message(TOPIC, message); pubmsg->set_qos(QOS); client.publish(pubmsg)->wait(); } client.disconnect()->wait(); } catch (const mqtt::exception& exc) { std::cerr << "\nError: " << exc.what() << std::endl; return 1; } return 0; }
测试发现推送数据量比较大的时候,消费端消费的数据不及时。分析:wait()函数是个阻塞函数,会等待服务器返回ack消息后生产者才会往下继续执行,发送下一条指令。使用wireshark抓包可以看出。
临时解决方案:删除wait()函数,不需要等待服务器的ack返回。