继上文,继续完善我们的匹配系统
完善 match_client
直接上代码:
... # 1.从终端读取输入 from sys import stdin def operate(op, user_id, username, score): transport = TSocket.TSocket('localhost', 9090) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Match.Client(protocol) transport.open() # 2.远程调用 user = User(user_id, username, score) if op == "add": client.add_user(user, "") elif op == "remove": client.remove_user(user, "") transport.close() def main(): # 3.读取用空格分开的输入 for line in stdin: op, user_id, username, score = line.split(" ") operate(op, int(user_id), username, int(score)) if __name__ == "__main__": main() 复制代码
读取输入,远程调用:
只需要简单修改几句 thrift 帮我们生成的代码,我们的客户端就大体完成了。
完善 match_server
关于匹配机制,有以下几点:
- 定义一个消息队列,处理客户端传过来的
add_user
和remove_user
的命令 - 定义一个用户池,将用户放到同一个用户池中来进行匹配
- 添加和移除不能同时进行,要有锁
- 经典的生产者-消费者问题
上代码,引入一些必要的头文件,然后定义消息队列的结构:
// match_server/main.cpp ... #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <unistd.h> ... struct Task { User user; string type; }; struct MsgQueue { queue<Task> q; mutex m; condition_variable cv; }msg_queue; 复制代码
处理队列,消费者,注意锁的运用,没有消息时阻塞线程,等待唤醒:
void consume_task() { while(true) { unique_lock<mutex> lck(msg_queue.m); if (msg_queue.q.empty()) { msg_queue.cv.wait(lck); } else { auto task = msg_queue.q.front(); msg_queue.q.pop(); // 别忘了解锁 lck.unlock(); // 在用户池里添加或删除 if (task.type == "add") pool.add(task.user); else if (task.type == "remove") pool.remove(task.user); // 每次处理完消息都在用户池进行一次匹配 pool.match(); } } } int main(int argc, char **argv) { ... cout << "Start match server 😎" << endl; // 开一个单独的线程给消费者 thread matching_thread(consume_task); server.serve(); return 0; } 复制代码
添加以及移除用户方法,每次上锁只能有一个方法操作队列,然后唤醒消费线程:
class MatchHandler : virtual public MatchIf { public: MatchHandler() { } int32_t add_user(const User& user, const std::string& info) { printf("add_user\n"); unique_lock<mutex> lck(msg_queue.m); msg_queue.q.push({user, "add"}); msg_queue.cv.notify_all(); return 0; } int32_t remove_user(const User& user, const std::string& info) { printf("remove_user\n"); unique_lock<mutex> lck(msg_queue.m); msg_queue.q.push({user, "remove"}); msg_queue.cv.notify_all(); return 0; } }; 复制代码
新建用户池,用于匹配,这里采取了最简单的匹配方法,只要用户池里有两个人就匹配(后续可以优化):
class Pool { public: void save_result(int a, int b) { // 打印匹配结果 printf("Match Result: %d %d\n", a, b); } void match() { while (users.size() > 1) { // 两个人就匹配 auto a = users[0], b = users[1]; users.erase(users.begin()); users.erase(users.begin()); save_result(a.id, b.id); } } void add(User user) { users.push_back(user); } void remove(User user) { for (uint32_t i = 0; i < users.size(); i++){ if (users[i].id == user.id) { users.erase(users.begin()+i); break; } } } private: // 用户池 vector<User> users; }pool; 复制代码
编译运行,可以看到已经跑通了:
就到这里吧,具体代码可以查看github,后续优化手段有很多,我在这里抛砖引玉:
- 分差
- 时间
- 多线程
- ...