第四部分:网络层实现
4.1 事件循环(epoll封装)
// src/network/event_loop.h
#ifndef KIVADB_EVENT_LOOP_H
#define KIVADB_EVENT_LOOP_H
#include <sys/epoll.h>

#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
namespace kivadb {
// Bit flags describing which kinds of I/O readiness a file event covers.
// The values are combined with bitwise OR into an `int` mask.
enum EventType {
    EVENT_NONE     = 0,       // no readiness bits set
    EVENT_READABLE = 1 << 0,  // translated to/from EPOLLIN by the event loop
    EVENT_WRITABLE = 1 << 1   // translated to/from EPOLLOUT by the event loop
};
// File-event callback. The loop invokes it with the ready descriptor, the
// EVENT_* mask that fired, and the opaque `data` pointer that was supplied
// when the event was registered.
using EventCallback = std::function<void(int fd, int events, void* data)>;
// Time-event callback. The loop keeps the time event registered when the
// callback returns true and erases it when the callback returns false.
using TimeEventCallback = std::function<bool()>;
// 事件循环类
class EventLoop : public NonCopyable {
public:
EventLoop();
~EventLoop();
// 启动事件循环
void loop();
// 停止事件循环
void stop();
// 添加文件事件
void addFileEvent(int fd, int events, EventCallback cb, void* data);
// 修改文件事件
void modifyFileEvent(int fd, int events);
// 删除文件事件
void removeFileEvent(int fd, int events);
// 添加时间事件(毫秒)
long long addTimeEvent(int milliseconds, TimeEventCallback cb);
// 删除时间事件
void removeTimeEvent(long long id);
// 是否在当前线程运行
bool isInLoopThread() const { return thread_id_ == std::this_thread::get_id(); }
// 断言在当前线程
void assertInLoopThread() { assert(isInLoopThread()); }
// 跨线程调用
void runInLoop(std::function<void()> cb);
void queueInLoop(std::function<void()> cb);
private:
void updateFileEvent(int fd, int events);
void wakeup();
void handleWakeup();
void doPendingFunctors();
int epoll_fd_;
int wakeup_fd_;
std::atomic<bool> stop_;
std::thread::id thread_id_;
struct FileEvent {
int fd;
int events;
EventCallback read_callback;
EventCallback write_callback;
void* data;
};
std::vector<FileEvent> file_events_;
std::vector<struct epoll_event> epoll_events_;
struct TimeEvent {
long long id;
std::chrono::steady_clock::time_point when;
TimeEventCallback callback;
};
std::vector<TimeEvent> time_events_;
long long next_time_event_id_;
std::mutex mutex_;
std::vector<std::function<void()>> pending_functors_;
};
} // namespace kivadb
#endif // KIVADB_EVENT_LOOP_H
// src/network/event_loop.cpp
#include "event_loop.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <algorithm>
namespace kivadb {
// Creates the epoll instance and the eventfd used for cross-thread wakeups,
// and binds the loop to the constructing thread.
EventLoop::EventLoop()
    : epoll_fd_(-1)
    , wakeup_fd_(-1)
    , stop_(false)
    , thread_id_(std::this_thread::get_id())
    // epoll_wait() fails with EINVAL when maxevents is not greater than zero,
    // so the output buffer must be sized before loop() ever runs.
    , epoll_events_(64)
    , next_time_event_id_(0) {
    // Create the epoll instance.
    epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ == -1) {
        LOG_FATAL("Failed to create epoll: %s", strerror(errno));
    }
    // Create the eventfd that other threads write to in order to wake the loop.
    wakeup_fd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (wakeup_fd_ == -1) {
        LOG_FATAL("Failed to create eventfd: %s", strerror(errno));
    }
    // Register the eventfd so that a wakeup() write makes epoll_wait return;
    // the handler only drains the counter.
    addFileEvent(wakeup_fd_, EVENT_READABLE,
                 [this](int /*fd*/, int /*events*/, void* /*data*/) { this->handleWakeup(); },
                 nullptr);
}
// Releases the two descriptors owned by the loop: the epoll instance first,
// then the wakeup eventfd.
EventLoop::~EventLoop() {
    const int owned_fds[] = {epoll_fd_, wakeup_fd_};
    for (const int owned : owned_fds) {
        close(owned);
    }
}
void EventLoop::loop() {
assertInLoopThread();
LOG_INFO("EventLoop started");
while (!stop_) {
// 计算下一个时间事件的超时时间
int timeout = -1;
if (!time_events_.empty()) {
auto now = std::chrono::steady_clock::now();
auto next = time_events_[0].when;
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(next - now);
timeout = diff.count() > 0 ? diff.count() : 0;
}
// 等待事件
int num_events = epoll_wait(epoll_fd_, epoll_events_.data(),
epoll_events_.size(), timeout);
if (num_events < 0) {
if (errno != EINTR) {
LOG_ERROR("epoll_wait error: %s", strerror(errno));
}
continue;
}
// 处理文件事件
for (int i = 0; i < num_events; ++i) {
int fd = epoll_events_[i].data.fd;
int events = 0;
if (epoll_events_[i].events & EPOLLIN) events |= EVENT_READABLE;
if (epoll_events_[i].events & EPOLLOUT) events |= EVENT_WRITABLE;
auto& fe = file_events_[fd];
if (fe.events & EVENT_READABLE && events & EVENT_READABLE) {
fe.read_callback(fd, events, fe.data);
}
if (fe.events & EVENT_WRITABLE && events & EVENT_WRITABLE) {
fe.write_callback(fd, events, fe.data);
}
}
// 处理时间事件
auto now = std::chrono::steady_clock::now();
for (auto it = time_events_.begin(); it != time_events_.end(); ) {
if (it->when <= now) {
bool should_continue = it->callback();
if (!should_continue) {
it = time_events_.erase(it);
} else {
++it;
}
} else {
++it;
}
}
// 执行跨线程任务
doPendingFunctors();
}
LOG_INFO("EventLoop stopped");
}
void EventLoop::addFileEvent(int fd, int events, EventCallback cb, void* data) {
assertInLoopThread();
if (fd >= (int)file_events_.size()) {
file_events_.resize(fd + 1);
}
auto& fe = file_events_[fd];
fe.fd = fd;
fe.events |= events;
if (events & EVENT_READABLE) {
fe.read_callback = std::move(cb);
}
if (events & EVENT_WRITABLE) {
fe.write_callback = std::move(cb);
}
fe.data = data;
updateFileEvent(fd, fe.events);
}
void EventLoop::updateFileEvent(int fd, int events) {
struct epoll_event ev;
ev.events = 0;
if (events & EVENT_READABLE) ev.events |= EPOLLIN;
if (events & EVENT_WRITABLE) ev.events |= EPOLLOUT;
ev.data.fd = fd;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev) == -1) {
if (errno == ENOENT) {
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
}
}
}
void EventLoop::removeFileEvent(int fd, int events) {
assertInLoopThread();
auto& fe = file_events_[fd];
fe.events &= ~events;
if (fe.events == 0) {
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
} else {
updateFileEvent(fd, fe.events);
}
}
long long EventLoop::addTimeEvent(int milliseconds, TimeEventCallback cb) {
assertInLoopThread();
long long id = ++next_time_event_id_;
auto when = std::chrono::steady_clock::now() + std::chrono::milliseconds(milliseconds);
time_events_.push_back({id, when, std::move(cb)});
std::sort(time_events_.begin(), time_events_.end(),
[](const TimeEvent& a, const TimeEvent& b) { return a.when < b.when; });
return id;
}
// Appends `cb` to the pending-functor list under the mutex. A caller on a
// different thread also wakes the loop so the functor is picked up promptly.
void EventLoop::queueInLoop(std::function<void()> cb) {
    const bool on_loop_thread = isInLoopThread();
    {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_functors_.emplace_back(std::move(cb));
    }
    if (on_loop_thread) {
        return;  // no wakeup is issued for calls made on the loop thread
    }
    wakeup();
}
// Adds 1 to the eventfd counter, which makes the eventfd readable and so
// causes epoll_wait() in the loop to return.
void EventLoop::wakeup() {
    const uint64_t increment = 1;
    const ssize_t written = write(wakeup_fd_, &increment, sizeof(increment));
    static_cast<void>(written);  // result intentionally ignored
}
// Reads the 8-byte eventfd counter so the wakeup descriptor stops reporting
// readable. The value and the read result are not used.
void EventLoop::handleWakeup() {
    uint64_t counter;
    const ssize_t consumed = read(wakeup_fd_, &counter, sizeof(counter));
    static_cast<void>(consumed);
}
void EventLoop::doPendingFunctors() {
std::vector<std::function<void()>> functors;
{
std::lock_guard<std::mutex> lock(mutex_);
functors.swap(pending_functors_);
}
for (auto& f : functors) {
f();
}
}
} // namespace kivadb
4.2 TCP连接管理
// src/network/connection.h
#ifndef KIVADB_CONNECTION_H
#define KIVADB_CONNECTION_H
#include <memory>
#include <string>
#include <vector>
#include <functional>
namespace kivadb {
class EventLoop;
// Lifecycle states reported by Connection::state().
enum ConnectionState {
    CONN_STATE_CONNECTING    = 0,
    CONN_STATE_CONNECTED     = 1,
    CONN_STATE_DISCONNECTING = 2,
    CONN_STATE_DISCONNECTED  = 3
};
// One TCP connection attached to an EventLoop.
//
// Instances are handled through std::shared_ptr (see `Ptr`), and the class
// derives from enable_shared_from_this. The object holds a raw descriptor
// and a raw loop pointer, so it is an identity type: copying is disabled to
// rule out two objects referring to the same descriptor.
class Connection : public std::enable_shared_from_this<Connection> {
public:
    using Ptr = std::shared_ptr<Connection>;
    // Receives the connection being closed.
    using CloseCallback = std::function<void(const Ptr&)>;
    // Receives the connection plus a pointer/length pair describing the data.
    using MessageCallback = std::function<void(const Ptr&, const char* data, size_t len)>;

    Connection(EventLoop* loop, int fd, const std::string& peer_addr);
    ~Connection();

    Connection(const Connection&) = delete;
    Connection& operator=(const Connection&) = delete;

    // Connection operations (definitions are not part of this excerpt).
    void send(const std::string& data);
    void send(const char* data, size_t len);
    void shutdown();
    void close();

    // Accessors.
    int fd() const { return fd_; }
    const std::string& peerAddr() const { return peer_addr_; }
    ConnectionState state() const { return state_; }

    // Callback registration; each setter replaces the stored callback.
    void setCloseCallback(CloseCallback cb) { close_callback_ = std::move(cb); }
    void setMessageCallback(MessageCallback cb) { message_callback_ = std::move(cb); }

private:
    void handleRead();
    void handleWrite();
    void handleError();
    void sendInLoop(const char* data, size_t len);

    EventLoop* loop_ = nullptr;  // not owned
    int fd_ = -1;
    std::string peer_addr_;
    ConnectionState state_;
    std::vector<char> input_buffer_;
    std::vector<char> output_buffer_;
    CloseCallback close_callback_;
    MessageCallback message_callback_;
};
} // namespace kivadb
#endif // KIVADB_CONNECTION_H
第五部分:持久化模块
5.1 RDB快照实现
// src/persistence/rdb.h
#ifndef KIVADB_RDB_H
#define KIVADB_RDB_H
#include <string>
namespace kivadb {
class Dict;
// Status codes returned by the RDB save functions.
enum RdbSaveResult {
    RDB_OK  = 0,   // success
    RDB_ERR = -1   // failure
};
// Status codes for loading an RDB snapshot.
enum RdbLoadResult {
    RDB_LOAD_OK  = 0,
    RDB_LOAD_ERR = -1,
    RDB_LOAD_EOF = -2
};
// Writes a snapshot of `dict` to `filename`. The data goes to
// "<filename>.tmp" first and is then renamed over the target.
// Returns RDB_OK on success and RDB_ERR on failure.
int rdbSave(const std::string& filename, Dict* dict);
// Loads a snapshot from `filename` into `dict`.
// NOTE(review): the definition is not part of this excerpt; the return value
// is presumably one of the RdbLoadResult codes — confirm.
int rdbLoad(const std::string& filename, Dict* dict);
// Forks a child process that runs rdbSave(filename, dict).
// Returns RDB_OK in the parent once the fork succeeded (not when the save
// has finished) and RDB_ERR when fork() fails.
int rdbSaveBackground(const std::string& filename, Dict* dict);
} // namespace kivadb
#endif // KIVADB_RDB_H
// src/persistence/rdb.cpp
#include "rdb.h"
#include "../datastore/dict.h"
#include "../utils/logger.h"
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
namespace kivadb {
// Format version; rdbWriteVersion() stores it as 4 bytes, low byte first.
#define RDB_VERSION 9
// One-byte opcodes. rdbSave() emits SELECTDB before the database number and
// EOF after the (currently empty) key/value section.
#define RDB_OPCODE_SELECTDB 0xFE
#define RDB_OPCODE_EOF 0xFF
// Expire-time opcodes; not referenced by the code visible in this excerpt.
#define RDB_OPCODE_EXPIRETIME 0xFD
#define RDB_OPCODE_EXPIRETIME_MS 0xFC
// Writes the 6-byte file signature "KIVADB" (no terminating NUL) to `fp`.
// Returns RDB_OK when the bytes were written, RDB_ERR otherwise.
static int rdbWriteMagic(FILE* fp) {
    static const char kMagic[] = "KIVADB";
    const size_t items_written = fwrite(kMagic, sizeof(kMagic) - 1, 1, fp);
    if (items_written == 1) {
        return RDB_OK;
    }
    return RDB_ERR;
}
// Writes RDB_VERSION to `fp` as four bytes, least-significant byte first.
// Returns RDB_OK when the bytes were written, RDB_ERR otherwise.
static int rdbWriteVersion(FILE* fp) {
    char encoded[4];
    for (int byte_index = 0; byte_index < 4; ++byte_index) {
        encoded[byte_index] = ((RDB_VERSION >> (8 * byte_index)) & 0xFF);
    }
    const bool written = fwrite(encoded, sizeof(encoded), 1, fp) == 1;
    return written ? RDB_OK : RDB_ERR;
}
// Writes the snapshot body to an already opened stream: magic, version,
// database selector, key/value section (still a placeholder), EOF opcode and
// the placeholder checksum. Returns RDB_OK, or RDB_ERR on the first failed
// write.
static int rdbWriteContents(FILE* fp, Dict* dict) {
    if (rdbWriteMagic(fp) != RDB_OK) return RDB_ERR;
    if (rdbWriteVersion(fp) != RDB_OK) return RDB_ERR;

    // Database selector (simplified: a single database, number 0).
    if (fputc(RDB_OPCODE_SELECTDB, fp) == EOF) return RDB_ERR;
    if (fputc(0, fp) == EOF) return RDB_ERR;

    // Walk the dictionary. Serialising the entries is not implemented yet;
    // the real code would call something like rdbSaveObject(fp, key, val).
    DictIterator* iter = dictGetIterator(dict);
    while (dictNext(iter) != nullptr) {
    }
    dictReleaseIterator(iter);

    if (fputc(RDB_OPCODE_EOF, fp) == EOF) return RDB_ERR;
    // Checksum placeholder (simplified).
    if (fwrite("12345678", 8, 1, fp) != 1) return RDB_ERR;
    return RDB_OK;
}

// Saves `dict` as an RDB snapshot at `filename`.
//
// The snapshot is written to "<filename>.tmp", flushed and synced to disk,
// and then renamed over `filename`, so readers never observe a partially
// written file. On any failure the temporary file is removed.
//
// Returns RDB_OK on success, RDB_ERR on failure.
int rdbSave(const std::string& filename, Dict* dict) {
    const std::string tmp_path = filename + ".tmp";
    FILE* fp = fopen(tmp_path.c_str(), "wb");
    if (!fp) {
        LOG_ERROR("Failed to open RDB file: %s", tmp_path.c_str());
        return RDB_ERR;
    }

    bool ok = rdbWriteContents(fp, dict) == RDB_OK;
    // Buffered write errors only surface on flush/close, and the data must
    // reach the disk before the rename publishes the file.
    if (ok && (fflush(fp) != 0 || fsync(fileno(fp)) != 0)) {
        ok = false;
    }
    if (fclose(fp) != 0) {
        ok = false;
    }
    if (!ok) {
        LOG_ERROR("Failed to write RDB file: %s", tmp_path.c_str());
        unlink(tmp_path.c_str());
        return RDB_ERR;
    }

    // Atomic replace.
    if (rename(tmp_path.c_str(), filename.c_str()) != 0) {
        LOG_ERROR("Failed to rename RDB file");
        unlink(tmp_path.c_str());
        return RDB_ERR;
    }
    LOG_INFO("RDB saved to %s", filename.c_str());
    return RDB_OK;
}
// Starts an RDB save in a forked child process.
//
// The child runs rdbSave(filename, dict) and terminates with status 0 on
// success or 1 on failure. The parent returns right after the fork.
//
// Returns RDB_OK in the parent when the fork succeeded (this says nothing
// about the outcome of the save) and RDB_ERR when fork() failed.
//
// NOTE(review): nothing in this excerpt waits for the child, so it stays a
// zombie until it is reaped elsewhere (e.g. waitpid with WNOHANG) — confirm.
int rdbSaveBackground(const std::string& filename, Dict* dict) {
    const pid_t child_pid = fork();
    if (child_pid < 0) {
        LOG_ERROR("fork failed for RDB save");
        return RDB_ERR;
    }
    if (child_pid == 0) {
        // Child process. Leave with _exit(): exit() would run the parent's
        // atexit handlers and flush stdio buffers inherited across fork(),
        // duplicating output and touching state that belongs to the parent.
        const int ret = rdbSave(filename, dict);
        _exit(ret == RDB_OK ? 0 : 1);
    }
    // Parent process.
    LOG_INFO("RDB save forked with pid %d", static_cast<int>(child_pid));
    return RDB_OK;
}
} // namespace kivadb