C++企业项目实战(三)

简介: 教程来源 https://hllft.cn/category/artificial-intelligence.html KivaDB网络层基于epoll实现高性能事件循环,支持文件/时间事件调度与跨线程任务投递;TCP连接管理封装读写缓冲与状态机;持久化模块提供RDB快照功能,支持主进程保存与fork后台保存,具备魔数校验、版本标识及原子替换机制。

第四部分:网络层实现

4.1 事件循环(epoll封装)

// src/network/event_loop.h
#ifndef KIVADB_EVENT_LOOP_H
#define KIVADB_EVENT_LOOP_H

#include <functional>
#include <memory>
#include <vector>
#include <chrono>
#include <mutex>
#include <atomic>

namespace kivadb {

// 事件类型
enum EventType {
    EVENT_NONE = 0,
    EVENT_READABLE = 1,
    EVENT_WRITABLE = 2
};

// 事件回调函数类型
using EventCallback = std::function<void(int fd, int events, void* data)>;

// 时间事件回调函数类型
using TimeEventCallback = std::function<bool()>;

// 事件循环类
class EventLoop : public NonCopyable {
public:
    EventLoop();
    ~EventLoop();

    // 启动事件循环
    void loop();

    // 停止事件循环
    void stop();

    // 添加文件事件
    void addFileEvent(int fd, int events, EventCallback cb, void* data);

    // 修改文件事件
    void modifyFileEvent(int fd, int events);

    // 删除文件事件
    void removeFileEvent(int fd, int events);

    // 添加时间事件(毫秒)
    long long addTimeEvent(int milliseconds, TimeEventCallback cb);

    // 删除时间事件
    void removeTimeEvent(long long id);

    // 是否在当前线程运行
    bool isInLoopThread() const { return thread_id_ == std::this_thread::get_id(); }

    // 断言在当前线程
    void assertInLoopThread() { assert(isInLoopThread()); }

    // 跨线程调用
    void runInLoop(std::function<void()> cb);
    void queueInLoop(std::function<void()> cb);

private:
    void updateFileEvent(int fd, int events);
    void wakeup();
    void handleWakeup();
    void doPendingFunctors();

    int epoll_fd_;
    int wakeup_fd_;
    std::atomic<bool> stop_;
    std::thread::id thread_id_;

    struct FileEvent {
        int fd;
        int events;
        EventCallback read_callback;
        EventCallback write_callback;
        void* data;
    };

    std::vector<FileEvent> file_events_;
    std::vector<struct epoll_event> epoll_events_;

    struct TimeEvent {
        long long id;
        std::chrono::steady_clock::time_point when;
        TimeEventCallback callback;
    };

    std::vector<TimeEvent> time_events_;
    long long next_time_event_id_;
    std::mutex mutex_;
    std::vector<std::function<void()>> pending_functors_;
};

} // namespace kivadb

#endif // KIVADB_EVENT_LOOP_H
// src/network/event_loop.cpp
#include "event_loop.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <algorithm>

namespace kivadb {

EventLoop::EventLoop()
    : stop_(false)
    , thread_id_(std::this_thread::get_id())
    , next_time_event_id_(0) {

    // 创建epoll实例
    epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ == -1) {
        LOG_FATAL("Failed to create epoll: %s", strerror(errno));
    }

    // 创建eventfd用于唤醒事件循环
    wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (wakeup_fd_ == -1) {
        LOG_FATAL("Failed to create eventfd: %s", strerror(errno));
    }

    // 添加wakeup_fd到epoll
    addFileEvent(wakeup_fd_, EVENT_READABLE, 
        [this](int fd, int events, void* data) { this->handleWakeup(); }, nullptr);
}

EventLoop::~EventLoop() {
    close(epoll_fd_);
    close(wakeup_fd_);
}

void EventLoop::loop() {
    assertInLoopThread();
    LOG_INFO("EventLoop started");

    while (!stop_) {
        // 计算下一个时间事件的超时时间
        int timeout = -1;
        if (!time_events_.empty()) {
            auto now = std::chrono::steady_clock::now();
            auto next = time_events_[0].when;
            auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(next - now);
            timeout = diff.count() > 0 ? diff.count() : 0;
        }

        // 等待事件
        int num_events = epoll_wait(epoll_fd_, epoll_events_.data(), 
                                     epoll_events_.size(), timeout);

        if (num_events < 0) {
            if (errno != EINTR) {
                LOG_ERROR("epoll_wait error: %s", strerror(errno));
            }
            continue;
        }

        // 处理文件事件
        for (int i = 0; i < num_events; ++i) {
            int fd = epoll_events_[i].data.fd;
            int events = 0;
            if (epoll_events_[i].events & EPOLLIN) events |= EVENT_READABLE;
            if (epoll_events_[i].events & EPOLLOUT) events |= EVENT_WRITABLE;

            auto& fe = file_events_[fd];
            if (fe.events & EVENT_READABLE && events & EVENT_READABLE) {
                fe.read_callback(fd, events, fe.data);
            }
            if (fe.events & EVENT_WRITABLE && events & EVENT_WRITABLE) {
                fe.write_callback(fd, events, fe.data);
            }
        }

        // 处理时间事件
        auto now = std::chrono::steady_clock::now();
        for (auto it = time_events_.begin(); it != time_events_.end(); ) {
            if (it->when <= now) {
                bool should_continue = it->callback();
                if (!should_continue) {
                    it = time_events_.erase(it);
                } else {
                    ++it;
                }
            } else {
                ++it;
            }
        }

        // 执行跨线程任务
        doPendingFunctors();
    }

    LOG_INFO("EventLoop stopped");
}

void EventLoop::addFileEvent(int fd, int events, EventCallback cb, void* data) {
    assertInLoopThread();

    if (fd >= (int)file_events_.size()) {
        file_events_.resize(fd + 1);
    }

    auto& fe = file_events_[fd];
    fe.fd = fd;
    fe.events |= events;
    if (events & EVENT_READABLE) {
        fe.read_callback = std::move(cb);
    }
    if (events & EVENT_WRITABLE) {
        fe.write_callback = std::move(cb);
    }
    fe.data = data;

    updateFileEvent(fd, fe.events);
}

void EventLoop::updateFileEvent(int fd, int events) {
    struct epoll_event ev;
    ev.events = 0;
    if (events & EVENT_READABLE) ev.events |= EPOLLIN;
    if (events & EVENT_WRITABLE) ev.events |= EPOLLOUT;
    ev.data.fd = fd;

    if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
        if (errno == ENOENT) {
            epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
        }
    }
}

void EventLoop::removeFileEvent(int fd, int events) {
    assertInLoopThread();

    auto& fe = file_events_[fd];
    fe.events &= ~events;

    if (fe.events == 0) {
        epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
    } else {
        updateFileEvent(fd, fe.events);
    }
}

long long EventLoop::addTimeEvent(int milliseconds, TimeEventCallback cb) {
    assertInLoopThread();

    long long id = ++next_time_event_id_;
    auto when = std::chrono::steady_clock::now() + std::chrono::milliseconds(milliseconds);

    time_events_.push_back({id, when, std::move(cb)});
    std::sort(time_events_.begin(), time_events_.end(),
        [](const TimeEvent& a, const TimeEvent& b) { return a.when < b.when; });

    return id;
}

void EventLoop::queueInLoop(std::function<void()> cb) {
    {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_functors_.push_back(std::move(cb));
    }

    // 如果不是当前线程,唤醒事件循环
    if (!isInLoopThread()) {
        wakeup();
    }
}

void EventLoop::wakeup() {
    uint64_t one = 1;
    ssize_t n = write(wakeup_fd_, &one, sizeof(one));
    (void)n; // 忽略返回值
}

void EventLoop::handleWakeup() {
    uint64_t one;
    ssize_t n = read(wakeup_fd_, &one, sizeof(one));
    (void)n;
}

void EventLoop::doPendingFunctors() {
    std::vector<std::function<void()>> functors;
    {
        std::lock_guard<std::mutex> lock(mutex_);
        functors.swap(pending_functors_);
    }

    for (auto& f : functors) {
        f();
    }
}

} // namespace kivadb

4.2 TCP连接管理

// src/network/connection.h
#ifndef KIVADB_CONNECTION_H
#define KIVADB_CONNECTION_H

#include <memory>
#include <string>
#include <vector>
#include <functional>

namespace kivadb {

class EventLoop;

// 连接状态
enum ConnectionState {
    CONN_STATE_CONNECTING,
    CONN_STATE_CONNECTED,
    CONN_STATE_DISCONNECTING,
    CONN_STATE_DISCONNECTED
};

// 连接类
class Connection : public std::enable_shared_from_this<Connection> {
public:
    using Ptr = std::shared_ptr<Connection>;
    using CloseCallback = std::function<void(const Ptr&)>;
    using MessageCallback = std::function<void(const Ptr&, const char* data, size_t len)>;

    Connection(EventLoop* loop, int fd, const std::string& peer_addr);
    ~Connection();

    // 连接操作
    void send(const std::string& data);
    void send(const char* data, size_t len);
    void shutdown();
    void close();

    // 获取信息
    int fd() const { return fd_; }
    const std::string& peerAddr() const { return peer_addr_; }
    ConnectionState state() const { return state_; }

    // 设置回调
    void setCloseCallback(CloseCallback cb) { close_callback_ = std::move(cb); }
    void setMessageCallback(MessageCallback cb) { message_callback_ = std::move(cb); }

private:
    void handleRead();
    void handleWrite();
    void handleError();
    void sendInLoop(const char* data, size_t len);

    EventLoop* loop_;
    int fd_;
    std::string peer_addr_;
    ConnectionState state_;

    std::vector<char> input_buffer_;
    std::vector<char> output_buffer_;

    CloseCallback close_callback_;
    MessageCallback message_callback_;
};

} // namespace kivadb

#endif // KIVADB_CONNECTION_H

第五部分:持久化模块

5.1 RDB快照实现

// src/persistence/rdb.h
#ifndef KIVADB_RDB_H
#define KIVADB_RDB_H

#include <string>

namespace kivadb {

class Dict;

// RDB保存结果
enum RdbSaveResult {
    RDB_OK = 0,
    RDB_ERR = -1
};

// RDB加载结果
enum RdbLoadResult {
    RDB_LOAD_OK = 0,
    RDB_LOAD_ERR = -1,
    RDB_LOAD_EOF = -2
};

// 保存RDB快照
int rdbSave(const std::string& filename, Dict* dict);

// 加载RDB快照
int rdbLoad(const std::string& filename, Dict* dict);

// 后台保存RDB
int rdbSaveBackground(const std::string& filename, Dict* dict);

} // namespace kivadb

#endif // KIVADB_RDB_H
// src/persistence/rdb.cpp
#include "rdb.h"
#include "../datastore/dict.h"
#include "../utils/logger.h"
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>

namespace kivadb {

#define RDB_VERSION 9
#define RDB_OPCODE_SELECTDB 0xFE
#define RDB_OPCODE_EOF 0xFF
#define RDB_OPCODE_EXPIRETIME 0xFD
#define RDB_OPCODE_EXPIRETIME_MS 0xFC

// 写入魔数
static int rdbWriteMagic(FILE* fp) {
    if (fwrite("KIVADB", 6, 1, fp) != 1) return RDB_ERR;
    return RDB_OK;
}

// 写入版本号
static int rdbWriteVersion(FILE* fp) {
    char version[4];
    version[0] = (RDB_VERSION & 0xFF);
    version[1] = ((RDB_VERSION >> 8) & 0xFF);
    version[2] = ((RDB_VERSION >> 16) & 0xFF);
    version[3] = ((RDB_VERSION >> 24) & 0xFF);
    if (fwrite(version, 4, 1, fp) != 1) return RDB_ERR;
    return RDB_OK;
}

// 保存RDB文件
int rdbSave(const std::string& filename, Dict* dict) {
    std::string tmpfile = filename + ".tmp";
    FILE* fp = fopen(tmpfile.c_str(), "wb");
    if (!fp) {
        LOG_ERROR("Failed to open RDB file: %s", tmpfile.c_str());
        return RDB_ERR;
    }

    // 写入魔数
    if (rdbWriteMagic(fp) != RDB_OK) goto error;

    // 写入版本号
    if (rdbWriteVersion(fp) != RDB_OK) goto error;

    // 写入数据库编号(这里简化,只写一个数据库)
    fputc(RDB_OPCODE_SELECTDB, fp);
    fputc(0, fp);  // DB 0

    // 遍历字典,写入所有键值对
    DictIterator* iter = dictGetIterator(dict);
    DictEntry* entry;
    while ((entry = dictNext(iter)) != nullptr) {
        // 这里简化,实际需要写入键值对数据
        // rdbSaveObject(fp, key, val);
    }
    dictReleaseIterator(iter);

    // 写入EOF标记
    fputc(RDB_OPCODE_EOF, fp);

    // 写入校验和(这里简化)
    fwrite("12345678", 8, 1, fp);

    fclose(fp);

    // 原子替换
    if (rename(tmpfile.c_str(), filename.c_str()) != 0) {
        LOG_ERROR("Failed to rename RDB file");
        return RDB_ERR;
    }

    LOG_INFO("RDB saved to %s", filename.c_str());
    return RDB_OK;

error:
    fclose(fp);
    unlink(tmpfile.c_str());
    return RDB_ERR;
}

// 后台保存(fork子进程)
int rdbSaveBackground(const std::string& filename, Dict* dict) {
    pid_t child_pid = fork();

    if (child_pid < 0) {
        LOG_ERROR("fork failed for RDB save");
        return RDB_ERR;
    }

    if (child_pid == 0) {
        // 子进程
        int ret = rdbSave(filename, dict);
        exit(ret == RDB_OK ? 0 : 1);
    }

    // 父进程
    LOG_INFO("RDB save forked with pid %d", child_pid);
    return RDB_OK;
}

} // namespace kivadb

来源:
https://hllft.cn/

相关文章
|
7天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
34455 17
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
18天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
45284 142
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
8天前
|
人工智能 JSON 监控
Claude Code 源码泄露:一份价值亿元的 AI 工程公开课
我以为顶级 AI 产品的护城河是模型。读完这 51.2 万行泄露的源码,我发现自己错了。
4834 20
|
1天前
|
人工智能 自然语言处理 安全
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
本文介绍了Claude Code终端AI助手的使用指南,主要内容包括:1)常用命令如版本查看、项目启动和更新;2)三种工作模式切换及界面说明;3)核心功能指令速查表,包含初始化、压缩对话、清除历史等操作;4)详细解析了/init、/help、/clear、/compact、/memory等关键命令的使用场景和语法。文章通过丰富的界面截图和场景示例,帮助开发者快速掌握如何通过命令行和交互界面高效使用Claude Code进行项目开发,特别强调了CLAUDE.md文件作为项目知识库的核心作用。
1670 5
Claude Code 全攻略:命令大全 + 实战工作流(建议收藏)
|
7天前
|
人工智能 API 开发者
阿里云百炼 Coding Plan 售罄、Lite 停售、Pro 抢不到?最新解决方案
阿里云百炼Coding Plan Lite已停售,Pro版每日9:30限量抢购难度大。本文解析原因,并提供两大方案:①掌握技巧抢购Pro版;②直接使用百炼平台按量付费——新用户赠100万Tokens,支持Qwen3.5-Max等满血模型,灵活低成本。
1734 5
阿里云百炼 Coding Plan 售罄、Lite 停售、Pro 抢不到?最新解决方案