C++企业项目实战(三)

简介: 教程来源 https://hllft.cn/category/artificial-intelligence.html KivaDB网络层基于epoll实现高性能事件循环,支持文件/时间事件调度与跨线程任务投递;TCP连接管理封装读写缓冲与状态机;持久化模块提供RDB快照功能,支持主进程保存与fork后台保存,具备魔数校验、版本标识及原子替换机制。

第四部分:网络层实现

4.1 事件循环(epoll封装)

// src/network/event_loop.h
#ifndef KIVADB_EVENT_LOOP_H
#define KIVADB_EVENT_LOOP_H

#include <functional>
#include <memory>
#include <vector>
#include <chrono>
#include <mutex>
#include <atomic>

namespace kivadb {

// 事件类型
enum EventType {
    EVENT_NONE = 0,
    EVENT_READABLE = 1,
    EVENT_WRITABLE = 2
};

// 事件回调函数类型
using EventCallback = std::function<void(int fd, int events, void* data)>;

// 时间事件回调函数类型
using TimeEventCallback = std::function<bool()>;

// 事件循环类
class EventLoop : public NonCopyable {
public:
    EventLoop();
    ~EventLoop();

    // 启动事件循环
    void loop();

    // 停止事件循环
    void stop();

    // 添加文件事件
    void addFileEvent(int fd, int events, EventCallback cb, void* data);

    // 修改文件事件
    void modifyFileEvent(int fd, int events);

    // 删除文件事件
    void removeFileEvent(int fd, int events);

    // 添加时间事件(毫秒)
    long long addTimeEvent(int milliseconds, TimeEventCallback cb);

    // 删除时间事件
    void removeTimeEvent(long long id);

    // 是否在当前线程运行
    bool isInLoopThread() const { return thread_id_ == std::this_thread::get_id(); }

    // 断言在当前线程
    void assertInLoopThread() { assert(isInLoopThread()); }

    // 跨线程调用
    void runInLoop(std::function<void()> cb);
    void queueInLoop(std::function<void()> cb);

private:
    void updateFileEvent(int fd, int events);
    void wakeup();
    void handleWakeup();
    void doPendingFunctors();

    int epoll_fd_;
    int wakeup_fd_;
    std::atomic<bool> stop_;
    std::thread::id thread_id_;

    struct FileEvent {
        int fd;
        int events;
        EventCallback read_callback;
        EventCallback write_callback;
        void* data;
    };

    std::vector<FileEvent> file_events_;
    std::vector<struct epoll_event> epoll_events_;

    struct TimeEvent {
        long long id;
        std::chrono::steady_clock::time_point when;
        TimeEventCallback callback;
    };

    std::vector<TimeEvent> time_events_;
    long long next_time_event_id_;
    std::mutex mutex_;
    std::vector<std::function<void()>> pending_functors_;
};

} // namespace kivadb

#endif // KIVADB_EVENT_LOOP_H
// src/network/event_loop.cpp
#include "event_loop.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <algorithm>

namespace kivadb {

EventLoop::EventLoop()
    : stop_(false)
    , thread_id_(std::this_thread::get_id())
    , next_time_event_id_(0) {

    // 创建epoll实例
    epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ == -1) {
        LOG_FATAL("Failed to create epoll: %s", strerror(errno));
    }

    // 创建eventfd用于唤醒事件循环
    wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (wakeup_fd_ == -1) {
        LOG_FATAL("Failed to create eventfd: %s", strerror(errno));
    }

    // 添加wakeup_fd到epoll
    addFileEvent(wakeup_fd_, EVENT_READABLE, 
        [this](int fd, int events, void* data) { this->handleWakeup(); }, nullptr);
}

EventLoop::~EventLoop() {
    close(epoll_fd_);
    close(wakeup_fd_);
}

void EventLoop::loop() {
    assertInLoopThread();
    LOG_INFO("EventLoop started");

    while (!stop_) {
        // 计算下一个时间事件的超时时间
        int timeout = -1;
        if (!time_events_.empty()) {
            auto now = std::chrono::steady_clock::now();
            auto next = time_events_[0].when;
            auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(next - now);
            timeout = diff.count() > 0 ? diff.count() : 0;
        }

        // 等待事件
        int num_events = epoll_wait(epoll_fd_, epoll_events_.data(), 
                                     epoll_events_.size(), timeout);

        if (num_events < 0) {
            if (errno != EINTR) {
                LOG_ERROR("epoll_wait error: %s", strerror(errno));
            }
            continue;
        }

        // 处理文件事件
        for (int i = 0; i < num_events; ++i) {
            int fd = epoll_events_[i].data.fd;
            int events = 0;
            if (epoll_events_[i].events & EPOLLIN) events |= EVENT_READABLE;
            if (epoll_events_[i].events & EPOLLOUT) events |= EVENT_WRITABLE;

            auto& fe = file_events_[fd];
            if (fe.events & EVENT_READABLE && events & EVENT_READABLE) {
                fe.read_callback(fd, events, fe.data);
            }
            if (fe.events & EVENT_WRITABLE && events & EVENT_WRITABLE) {
                fe.write_callback(fd, events, fe.data);
            }
        }

        // 处理时间事件
        auto now = std::chrono::steady_clock::now();
        for (auto it = time_events_.begin(); it != time_events_.end(); ) {
            if (it->when <= now) {
                bool should_continue = it->callback();
                if (!should_continue) {
                    it = time_events_.erase(it);
                } else {
                    ++it;
                }
            } else {
                ++it;
            }
        }

        // 执行跨线程任务
        doPendingFunctors();
    }

    LOG_INFO("EventLoop stopped");
}

void EventLoop::addFileEvent(int fd, int events, EventCallback cb, void* data) {
    assertInLoopThread();

    if (fd >= (int)file_events_.size()) {
        file_events_.resize(fd + 1);
    }

    auto& fe = file_events_[fd];
    fe.fd = fd;
    fe.events |= events;
    if (events & EVENT_READABLE) {
        fe.read_callback = std::move(cb);
    }
    if (events & EVENT_WRITABLE) {
        fe.write_callback = std::move(cb);
    }
    fe.data = data;

    updateFileEvent(fd, fe.events);
}

void EventLoop::updateFileEvent(int fd, int events) {
    struct epoll_event ev;
    ev.events = 0;
    if (events & EVENT_READABLE) ev.events |= EPOLLIN;
    if (events & EVENT_WRITABLE) ev.events |= EPOLLOUT;
    ev.data.fd = fd;

    if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
        if (errno == ENOENT) {
            epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
        }
    }
}

void EventLoop::removeFileEvent(int fd, int events) {
    assertInLoopThread();

    auto& fe = file_events_[fd];
    fe.events &= ~events;

    if (fe.events == 0) {
        epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
    } else {
        updateFileEvent(fd, fe.events);
    }
}

long long EventLoop::addTimeEvent(int milliseconds, TimeEventCallback cb) {
    assertInLoopThread();

    long long id = ++next_time_event_id_;
    auto when = std::chrono::steady_clock::now() + std::chrono::milliseconds(milliseconds);

    time_events_.push_back({id, when, std::move(cb)});
    std::sort(time_events_.begin(), time_events_.end(),
        [](const TimeEvent& a, const TimeEvent& b) { return a.when < b.when; });

    return id;
}

void EventLoop::queueInLoop(std::function<void()> cb) {
    {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_functors_.push_back(std::move(cb));
    }

    // 如果不是当前线程,唤醒事件循环
    if (!isInLoopThread()) {
        wakeup();
    }
}

void EventLoop::wakeup() {
    uint64_t one = 1;
    ssize_t n = write(wakeup_fd_, &one, sizeof(one));
    (void)n; // 忽略返回值
}

void EventLoop::handleWakeup() {
    uint64_t one;
    ssize_t n = read(wakeup_fd_, &one, sizeof(one));
    (void)n;
}

void EventLoop::doPendingFunctors() {
    std::vector<std::function<void()>> functors;
    {
        std::lock_guard<std::mutex> lock(mutex_);
        functors.swap(pending_functors_);
    }

    for (auto& f : functors) {
        f();
    }
}

} // namespace kivadb

4.2 TCP连接管理

// src/network/connection.h
#ifndef KIVADB_CONNECTION_H
#define KIVADB_CONNECTION_H

#include <memory>
#include <string>
#include <vector>
#include <functional>

namespace kivadb {

class EventLoop;

// 连接状态
enum ConnectionState {
    CONN_STATE_CONNECTING,
    CONN_STATE_CONNECTED,
    CONN_STATE_DISCONNECTING,
    CONN_STATE_DISCONNECTED
};

// 连接类
class Connection : public std::enable_shared_from_this<Connection> {
public:
    using Ptr = std::shared_ptr<Connection>;
    using CloseCallback = std::function<void(const Ptr&)>;
    using MessageCallback = std::function<void(const Ptr&, const char* data, size_t len)>;

    Connection(EventLoop* loop, int fd, const std::string& peer_addr);
    ~Connection();

    // 连接操作
    void send(const std::string& data);
    void send(const char* data, size_t len);
    void shutdown();
    void close();

    // 获取信息
    int fd() const { return fd_; }
    const std::string& peerAddr() const { return peer_addr_; }
    ConnectionState state() const { return state_; }

    // 设置回调
    void setCloseCallback(CloseCallback cb) { close_callback_ = std::move(cb); }
    void setMessageCallback(MessageCallback cb) { message_callback_ = std::move(cb); }

private:
    void handleRead();
    void handleWrite();
    void handleError();
    void sendInLoop(const char* data, size_t len);

    EventLoop* loop_;
    int fd_;
    std::string peer_addr_;
    ConnectionState state_;

    std::vector<char> input_buffer_;
    std::vector<char> output_buffer_;

    CloseCallback close_callback_;
    MessageCallback message_callback_;
};

} // namespace kivadb

#endif // KIVADB_CONNECTION_H

第五部分:持久化模块

5.1 RDB快照实现

// src/persistence/rdb.h
#ifndef KIVADB_RDB_H
#define KIVADB_RDB_H

#include <string>

namespace kivadb {

class Dict;

// RDB保存结果
enum RdbSaveResult {
    RDB_OK = 0,
    RDB_ERR = -1
};

// RDB加载结果
enum RdbLoadResult {
    RDB_LOAD_OK = 0,
    RDB_LOAD_ERR = -1,
    RDB_LOAD_EOF = -2
};

// 保存RDB快照
int rdbSave(const std::string& filename, Dict* dict);

// 加载RDB快照
int rdbLoad(const std::string& filename, Dict* dict);

// 后台保存RDB
int rdbSaveBackground(const std::string& filename, Dict* dict);

} // namespace kivadb

#endif // KIVADB_RDB_H
// src/persistence/rdb.cpp
#include "rdb.h"
#include "../datastore/dict.h"
#include "../utils/logger.h"
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>

namespace kivadb {

#define RDB_VERSION 9
#define RDB_OPCODE_SELECTDB 0xFE
#define RDB_OPCODE_EOF 0xFF
#define RDB_OPCODE_EXPIRETIME 0xFD
#define RDB_OPCODE_EXPIRETIME_MS 0xFC

// 写入魔数
static int rdbWriteMagic(FILE* fp) {
    if (fwrite("KIVADB", 6, 1, fp) != 1) return RDB_ERR;
    return RDB_OK;
}

// 写入版本号
static int rdbWriteVersion(FILE* fp) {
    char version[4];
    version[0] = (RDB_VERSION & 0xFF);
    version[1] = ((RDB_VERSION >> 8) & 0xFF);
    version[2] = ((RDB_VERSION >> 16) & 0xFF);
    version[3] = ((RDB_VERSION >> 24) & 0xFF);
    if (fwrite(version, 4, 1, fp) != 1) return RDB_ERR;
    return RDB_OK;
}

// 保存RDB文件
int rdbSave(const std::string& filename, Dict* dict) {
    std::string tmpfile = filename + ".tmp";
    FILE* fp = fopen(tmpfile.c_str(), "wb");
    if (!fp) {
        LOG_ERROR("Failed to open RDB file: %s", tmpfile.c_str());
        return RDB_ERR;
    }

    // 写入魔数
    if (rdbWriteMagic(fp) != RDB_OK) goto error;

    // 写入版本号
    if (rdbWriteVersion(fp) != RDB_OK) goto error;

    // 写入数据库编号(这里简化,只写一个数据库)
    fputc(RDB_OPCODE_SELECTDB, fp);
    fputc(0, fp);  // DB 0

    // 遍历字典,写入所有键值对
    DictIterator* iter = dictGetIterator(dict);
    DictEntry* entry;
    while ((entry = dictNext(iter)) != nullptr) {
        // 这里简化,实际需要写入键值对数据
        // rdbSaveObject(fp, key, val);
    }
    dictReleaseIterator(iter);

    // 写入EOF标记
    fputc(RDB_OPCODE_EOF, fp);

    // 写入校验和(这里简化)
    fwrite("12345678", 8, 1, fp);

    fclose(fp);

    // 原子替换
    if (rename(tmpfile.c_str(), filename.c_str()) != 0) {
        LOG_ERROR("Failed to rename RDB file");
        return RDB_ERR;
    }

    LOG_INFO("RDB saved to %s", filename.c_str());
    return RDB_OK;

error:
    fclose(fp);
    unlink(tmpfile.c_str());
    return RDB_ERR;
}

// 后台保存(fork子进程)
int rdbSaveBackground(const std::string& filename, Dict* dict) {
    pid_t child_pid = fork();

    if (child_pid < 0) {
        LOG_ERROR("fork failed for RDB save");
        return RDB_ERR;
    }

    if (child_pid == 0) {
        // 子进程
        int ret = rdbSave(filename, dict);
        exit(ret == RDB_OK ? 0 : 1);
    }

    // 父进程
    LOG_INFO("RDB save forked with pid %d", child_pid);
    return RDB_OK;
}

} // namespace kivadb

来源:
https://hllft.cn/

相关文章
|
2月前
|
SQL JSON 缓存
别再用过时的地区数据了!闸北区都消失了,教你一次性搞定省市区同步更新!(附实战源码)
本文记录了更新地区表至最新行政区划的完整实践:从权威数据源获取2024年省市区数据,通过Python脚本实现新旧表(省-市-区三级)精准比对,支持新增、软删除及层级关系维护,并附详细代码与分步缓存策略,兼顾准确性、安全性和可追溯性。(239字)
187 5
|
20天前
|
人工智能 弹性计算 安全
2026年阿里云入门级云服务器特惠价格:2核2G38元1年、99元1年,2核4G9.9元1个月、199元1年
阿里云2026年推出四款特惠云服务器,覆盖从个人到企业的多元需求。轻量应用服务器2核2G抢购价仅38元/年,内置WordPress、OpenClaw等镜像,开箱即用,适合个人开发者与学生;2核4G版本9.9元/月起,可一键部署AI助理。经济型e实例99元/年、通用算力型u1实例199元/年,均享"新购续费同价"政策,有效解决后续涨价顾虑。其中e实例不限新老用户,u1实例面向企业用户,活动持续至2027年3月31日。此外还有u2a实例2.5折、九代实例6.4折及百炼Token Plan、JVS Claw等AI产品优惠。
|
2月前
|
人工智能 JavaScript 安全
阿里云部署 Hermes Agent / OpenClaw 及Skill技能选型生态完全指南,打造专属AI操作系统
很多用户部署OpenClaw(原Clawdbot)后都会有这样的困惑:"这款工具看似强大,实际用起来却没发挥出预期价值"。其实答案很简单——OpenClaw的核心潜力不在本体,而在其Skills生态。就像操作系统的价值源于丰富的应用程序,OpenClaw作为"本地AI操作系统",需要通过安装Skills(功能插件)才能解锁自动化办公、开发辅助、生活服务等多元场景。本文将从生态认知、阿里云快速部署、Skills获取渠道、精选技能推荐、安装管理等维度,全方位解析OpenClaw Skills的使用逻辑,搭配可直接复制的代码命令与避坑指南,帮助用户快速构建专属AI能力矩阵
363 3
|
26天前
|
缓存 网络协议 测试技术
【免费CDN】阿里云ESA免费版配置,10分钟搞定
阿里云ESA免费版0元开通!含CDN加速、DDoS防护、WAF拦截、Bot管理及HTTPS支持,适合个人站与测试环境。6步完成:领额度→加站点→选免费版→配源站→改DNS→验证生效,全程无需付费。
【免费CDN】阿里云ESA免费版配置,10分钟搞定
|
2月前
|
前端开发 开发者
前端组件库——Radix UI知识点大全(二)
教程来源 http://yvyus.cn/ Radix UI提供50+无样式、高可访问性React原语组件,如Dialog、DropdownMenu、Popover等,内置ARIA支持、键盘导航与焦点管理,专注交互逻辑,样式完全由开发者掌控。
|
23天前
|
人工智能 自然语言处理 BI
用办公Agent接管Excel苦力活:跨表匹配、格式清洗、自动图表生成
本文揭秘如何用AI办公Agent自动化处理Excel月度报表:15分钟搞定跨表匹配(模糊+精确双策略)、智能清洗(日期/数字/空白全覆盖)、自动绘图(配色+标题+标签)。告别VLOOKUP、分列、手动调图,让重复劳动归零——真正的效率革命,始于教会机器做脏活。
193 4
|
2月前
|
人工智能 安全 Linux
阿里云OpenClaw镜像太香了!开箱即用、内置Skills、分钟级部署AI龙虾助理
阿里云OpenClaw镜像基于Alibaba Cloud Linux 3 LTS,开箱即用,阿里云龙虾官方部署链接:https://t.aliyun.com/U/vFiCUH 预装Docker、VS Code等工具及agent-browser等增强Skills,支持钉钉/飞书等多渠道集成与qwen3.5-plus等多模态模型,分钟级部署本地AI助理,安全稳定、计费可控。
261 4
|
16天前
|
人工智能 Linux API
全平台零门槛:Win11、Mac、Linux 通用 Hermes Agent 安装教程
Hermes Agent是Nous Research开源的自进化AI助手(MIT协议),越用越懂你。支持多工具并行、自动记忆习惯,Python编写,v0.13.0版。兼容Win/macOS/Linux/Docker,国内用户可配清华镜像快速部署,需API密钥(如Kimi)。
|
2月前
|
Java 测试技术 C++
C++企业项目实战(四)
教程来源 https://hllft.cn/category/software-apps.html 本文详述KivaDB分布式KV存储系统的C++企业级实现:涵盖线程池(基于任务队列与条件变量)、主程序入口(信号处理与配置加载)、Google Test单元测试、Docker容器化部署及运维编排,突出内存安全、并发控制与性能优化实践。

热门文章

最新文章