本节开始,逐步实现基于内存的kv存储引擎。
一、项目主要功能和知识点
参照redis,主要实现的功能:
1、数据的插入、查询、删除等操作
1)SET:插入key - value
2)GET:获取key对应的value
3)COUNT:统计已插入多少个key
4)DELETE:删除key以及对应的value
5)EXIST:判断key是否存在
2、实现不同的数据结构存储引擎
不同的数据结构,操作的效率是不一样的。因此我们实现基于数组array、红黑树rbtree、哈希hash、跳表skiptable 这四种数据结构,实现kv存储,并测试相应的性能。
3、测试用例
1)功能测试
2)10w的qps测试
主要涉及的知识点有:
1)基于协程,一个连接对应一个协程
2)tcp网络交互
3)数据结构:数组array、红黑树rbtree、哈希hash、跳表skiptable
二、架构设计
三、具体实现
我们先实现简单的,基于数组array的kv存储引擎。
3.1 流程
以 SET NAME ZXM 为例,大致介绍一下流程:
1)客户端发送 SET NAME ZXM
2)服务器接收 SET NAME ZXM,进入解析流程
3)根据协议进程解析:
∘ 按空格拆分命令,存储到toekns[]。 tokens[0] = SET,tokens[1] = NAME, tokens[2]: ZXM
∘ 根据tokens[0],解析出对应的数据结构存储引擎,即数组。以及对应的操作命令,即插入。
∘ 根据key、value执行相应的操作命令,即插入新的key,value。
∘ 返回操作结果
4)解析完成,将操作结果返回给客户端。
5)客户端根据接收到的结果判断是否操作成功。
3.3 引擎层
typedef struct kvs_array_item_s { char *key; char *value; } kvs_array_item_t; // kvs_array_table 存储插入的 kv kvs_array_item_t kvs_array_table [KVS_ARRAY_ITEM_SIEZ] = {0};
3.4 接口层
// 查找 key 在 kvs_array_table 的位置 kvs_array_item_t *kvs_array_search_item (char *key); // KVS_CMD_EXIST: 判断 key 是否存在,存在返回 1 int kvs_array_exist (char *key); // KVS_CMD_SET:插入 kv int kvs_array_set (char *key, char *value); // KVS_CMD_GET:获取 key 对应的value char *kvs_array_get(char *key); // KVS_CMD_COUNT:统计以及插入多少个 key int kvs_array_count (void) ; // KVS_CMD_DELETE:删除 key int kvs_array_delete(char *key);
3.5 协议层
// 根据msg,解析其具体的命令协议 int kvs_parser_protocol (char *msg, char **tokens, int count) ; /*分割msg,比如 msg为 SET NAME ZXM ,分割为SET,NAME,ZXM,分别存储在tokens[] * tokens[0]: SET --------- 对应的是命令 cmd * tokens[1]: NAME --------- 对应的是命令 key * tokens[2]: ZXM --------- 对应的是命令 value */ int kvs_spilt_tokens (char **tokens, char *msg) ; // 解析协议 int kvs_protocol (char *msg, int length);
3.6 网络层
// 为每一个连接端口创建一个协程 nty_coroutine_create(&co, server, port);
四、具体代码
4.1 kvstore.c
// gcc -o kvstore1 kvstore1.c -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -ldl #include "nty_coroutine.h" #include <arpa/inet.h> #define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) // array: set, get, count, delete, exist // rbtree: rset, rget, rcount, rdelete, rexist // hash: hset, hget, hcount, hdelete, hexist // skiptable: zset, zget, zcount, zdelete, zexist //--------------------------------------------------------------------------------------------- //-------------------------------------- 接口层 ----------------------------------------- typedef enum kvs_cmd_e { KVS_CMD_START = 0, // array KVS_CMD_SET = KVS_CMD_START, KVS_CMD_GET, KVS_CMD_COUNT, KVS_CMD_DELETE, KVS_CMD_EXIST, // rbtree KVS_CMD_RSET, KVS_CMD_RGET, KVS_CMD_RCOUNT, KVS_CMD_RDELETE, KVS_CMD_REXIST, // hash KVS_CMD_HSET, KVS_CMD_HGET, KVS_CMD_HCOUNT, KVS_CMD_HDELETE, KVS_CMD_HEXIST, // skiptable KVS_CMD_ZSET, KVS_CMD_ZGET, KVS_CMD_ZCOUNT, KVS_CMD_ZDELETE, KVS_CMD_ZEXIST, KVS_CMD_END = KVS_CMD_ZEXIST, // 格式出错 KVS_CMD_ERROR, // 断开 KVS_CMD_QUIT, } kvs_cmd_t; // 命令集 const char *commands[] = { "SET", "GET", "COUNT", "DELETE", "EXIST", "RSET", "RGET", "RCOUNT", "RDELETE", "REXIST", "HSET", "HGET", "HCOUNT", "HDELETE", "HEXIST", "ZSET", "ZGET", "ZCOUNT", "ZDELETE", "ZEXIST" }; // 为了以后优化的可能,把内存开辟释放封装 void *kvs_malloc(size_t size) { return malloc(size); } void kvs_free(void *ptr) { return free(ptr); } //-------------------------------------- array ----------------------------------------- #define KVS_ARRAY_ITEM_SIEZ 1024 typedef struct kvs_array_item_s { char *key; char *value; } kvs_array_item_t; // kvs_array_table 存储插入的 kv kvs_array_item_t kvs_array_table [KVS_ARRAY_ITEM_SIEZ] = {0}; // 查找 key 在 kvs_array_table 的位置 kvs_array_item_t *kvs_array_search_item (char *key){ if (!key) return NULL; int i = 0; // 注意由于前面的 key 被删除而出现的 key == NULL , strcmp 不能与 NULL 比较 for (i = 0;i < KVS_ARRAY_ITEM_SIEZ; i++) { if (kvs_array_table[i].key != NULL && strcmp(kvs_array_table[i].key , key) == 0) { return &kvs_array_table[i]; } } return NULL; } // KVS_CMD_EXIST: 判断 key 是否存在,存在返回 1 int kvs_array_exist (char *key) { if (kvs_array_search_item(key) != NULL) return 1; } // KVS_CMD_SET:插入 kv, 成功返回 0 int array_count = 0; // 已插入元素的个数 int kvs_array_set (char *key, char *value) { if (key == NULL || value == NULL || array_count == KVS_ARRAY_ITEM_SIEZ - 1) { return -1; } // key 已存在,不能插入 if (kvs_array_exist(key)) { return -1; } char *kvs_key = kvs_malloc(strlen(key) + 1); if (kvs_key == NULL) return -1; strncpy(kvs_key, key, strlen(key) + 1); char *kvs_value = kvs_malloc(strlen(value) + 1); if (kvs_value == NULL) { free(kvs_key); return -1; } strncpy(kvs_value, value, strlen(value) + 1); int i = 0; for (i = 0;i < KVS_ARRAY_ITEM_SIEZ; i++) { if (kvs_array_table[i].key == NULL && kvs_array_table[i].value == 0) { break; } } kvs_array_table[i].key = kvs_key; kvs_array_table[i].value = kvs_value; array_count++; return 0; } // KVS_CMD_GET:获取 key 对应的value char *kvs_array_get(char *key) { kvs_array_item_t * item = kvs_array_search_item(key); if (item) { return item->value; } return NULL; } // KVS_CMD_COUNT:统计以及插入多少个 key int kvs_array_count (void) { return array_count; } // KVS_CMD_DELETE:删除 key int kvs_array_delete(char *key) { if (key == NULL) return -1; kvs_array_item_t * item = kvs_array_search_item(key); if (item == NULL) { return -1; } if (item->key) { kvs_free(item->key); item->key = NULL; } if (item->value) { kvs_free(item->value); item->value = NULL; } array_count--; return 0; } //--------------------------------------------------------------------------------------------- //-------------------------------------- 协议层 ----------------------------------------- #define MAX_TOKENS 32 #define CLINET_MSG_LENGTH 1024 // client发送msg的最大长度 // 根据msg,解析其具体的命令协议 int kvs_parser_protocol (char *msg, char **tokens, int count) { if (tokens == NULL || tokens[0] == NULL || count == 0) { return KVS_CMD_ERROR; } // 判断命令,即tokens[0],是否在命令集中 int cmd = KVS_CMD_START; for (cmd = KVS_CMD_START; cmd < KVS_CMD_END; cmd++) { if (strcmp(tokens[0], commands[cmd]) == 0) { break; } } // 根据cmd,选择对应的命令 switch (cmd) { case KVS_CMD_SET: { assert(count == 3); // SET NAME ZXM,应该有三个 int ret = 0; int res = kvs_array_set(tokens[1], tokens[2]); if (!res) { memset(msg, 0, CLINET_MSG_LENGTH); ret = snprintf(msg, CLINET_MSG_LENGTH, "SUCCESS\r\n"); } else { memset(msg, 0, CLINET_MSG_LENGTH); ret = snprintf(msg, CLINET_MSG_LENGTH, "FAILED\r\n"); } return ret; } case KVS_CMD_GET: { assert(count == 2); int ret = 0; char *value = kvs_array_get(tokens[1]); if (value) { memset(msg, 0, CLINET_MSG_LENGTH); ret = snprintf(msg, CLINET_MSG_LENGTH, "%s\r\n", value); } else { memset(msg, 0, CLINET_MSG_LENGTH); ret = snprintf(msg, CLINET_MSG_LENGTH, "FAILED: NO EXIST\r\n"); } return ret; } case KVS_CMD_COUNT: { assert(count == 1); int res = kvs_array_count(); memset(msg, 0, CLINET_MSG_LENGTH); int ret = snprintf(msg, CLINET_MSG_LENGTH, "%d\r\n", res); return ret; } case KVS_CMD_DELETE: { assert(count == 2); int ret = 0; int res = kvs_array_delete(tokens[1]); if (!res) { memset(msg, 0, CLINET_MSG_LENGTH); ret = snprintf(msg, CLINET_MSG_LENGTH, "SUCCESS\r\n"); } else { memset(msg, 0, CLINET_MSG_LENGTH); ret = snprintf(msg, CLINET_MSG_LENGTH, "FAILED\r\n"); } return ret; } case KVS_CMD_EXIST: { assert(count == 2); int res = kvs_array_exist(tokens[1]); memset(msg, 0, CLINET_MSG_LENGTH); int ret = snprintf(msg, CLINET_MSG_LENGTH, "%d\r\n", res); return ret; } } return 0; } /*分割msg,比如 msg为 SET NAME ZXM ,分割为SET,NAME,ZXM,分别存储在tokens[] * tokens[0]: SET --------- 对应的是命令 cmd * tokens[1]: NAME --------- 对应的是命令 key * tokens[2]: ZXM --------- 对应的是命令 value */ int kvs_spilt_tokens (char **tokens, char *msg) { char *token = strtok(msg, " "); // 按空格分割 int count = 0; while (token != NULL) { tokens[count++] = token; token = strtok(NULL, " "); } return count; } // 解析协议 int kvs_protocol (char *msg, int length) { char *tokens[MAX_TOKENS] = {0}; // 分割msg int count = kvs_spilt_tokens(tokens, msg); // 根据分割后的 msg ,解析其具体命令协议 // msg:命令 tokens:分割后的 msg return kvs_parser_protocol(msg, tokens, count); } //--------------------------------------------------------------------------------------------- //--------------------------------------NtyCo底层的协程----------------------------------------- void server_reader(void *arg) { int fd = *(int *)arg; int ret = 0; while (1) { char buf[CLINET_MSG_LENGTH] = {0}; // 接收msg,存放到buf中 ret = nty_recv(fd, buf, CLINET_MSG_LENGTH, 0); if (ret > 0) { printf("read from server: %.*s\n", ret, buf); // 根据协议解析msg: rec: SET NAME ZXM // 解析后:SET\r\n NAME\r\n ZXM\r\n ret = kvs_protocol(buf, ret); // 发送解析后的msg ret = nty_send(fd, buf, ret, 0); if (ret == -1) { nty_close(fd); break; } } else if (ret == 0) { nty_close(fd); break; } } } void server(void *arg) { unsigned short port = *(unsigned short *)arg; free(arg); int fd = nty_socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) return ; struct sockaddr_in local, remote; local.sin_family = AF_INET; // 设置地址族为IPv4 local.sin_port = htons(port); // 设置端口号 local.sin_addr.s_addr = INADDR_ANY; // 设置IP地址, INADDR_ANY 是一个常量,表示可以接受来自任意 IP 地址的连接。 bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in)); listen(fd, 20); printf("listen port : %d\n", port); //获取当前的时间和日期信息 struct timeval tv_begin; gettimeofday(&tv_begin, NULL); while (1) { socklen_t len = sizeof(struct sockaddr_in); int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len); if (cli_fd % 1000 == 999) { struct timeval tv_cur; memcpy(&tv_cur, &tv_begin, sizeof(struct timeval)); gettimeofday(&tv_begin, NULL); int time_used = TIME_SUB_MS(tv_begin, tv_cur); printf("client fd : %d, time_used: %d\n", cli_fd, time_used); } printf("new client comming\n"); nty_coroutine *read_co; nty_coroutine_create(&read_co, server_reader, &cli_fd); } } int main(int argc, char *argv[]) { nty_coroutine *co = NULL; unsigned short base_port = 9999; unsigned short *port = calloc(1, sizeof(unsigned short)); *port = base_port ; // 为每一个连接端口创建一个协程 nty_coroutine_create(&co, server, port); nty_schedule_run(); //run return 0; }
4.2 测试案例
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <unistd.h> #include <arpa/inet.h> #define MAX_MSG_LENGTH 1024 int connect_kvstore(const char *ip ,int port){ int connfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in kvs_addr; memset(&kvs_addr, 0, sizeof(struct sockaddr_in)); kvs_addr.sin_family = AF_INET; kvs_addr.sin_addr.s_addr = inet_addr(ip); // inet_addr把ip转为点分十进制 kvs_addr.sin_port = htons(port); int ret = connect(connfd, (struct sockaddr*)&kvs_addr, sizeof(struct sockaddr_in)); // ret = 0,成功 if (ret) { perror("connect\n"); return -1; } return connfd; } // 发送msg int send_msg(int connfd, char *msg) { int res = send(connfd, msg, strlen(msg), 0); if (res < 0) { exit(1); } return res; } // 接收数据并存入msg int recv_msg(int connfd, char *msg) { int res = recv(connfd, msg, MAX_MSG_LENGTH, 0); if (res < 0){ exit(1); } return res; } // 对比接收到的结果 result 与 应返回的结果 pattern 是否一致 void equals (char *pattern, char *result, char *casename) { if (0 == strcmp(pattern, result)) { printf(">> PASS --> %s\n", casename); } else { printf(">> FAILED --> '%s' != '%s'\n", pattern, result); } } // cmd: 命令 pattern: 应返回的结果 casename:测试的名称 // 比如测试 SET NAME ZXM,cmd="SET NAME ZXM", pattern="SUCCESS\r\n", 测试的名称casename void test_case (int connfd, char *cmd, char *pattern, char *casename) { char result[MAX_MSG_LENGTH] = {0}; // 发送命令cmd send_msg(connfd, cmd); // 接收命令处理后的结果,存入result recv_msg(connfd, result); // 对比接收到的结果 result 与 应返回的结果 pattern 是否一致 equals(pattern, result, casename); } void array_testcase(int connfd ){ test_case(connfd, "SET Name zxm", "SUCCESS\r\n", "SetNameCase"); test_case(connfd, "COUNT", "1\r\n", "COUNTCase"); test_case(connfd, "SET Sex man", "SUCCESS\r\n", "SetNameCase"); test_case(connfd, "COUNT", "2\r\n", "COUNT"); test_case(connfd, "SET Score 100", "SUCCESS\r\n", "SetNameCase"); test_case(connfd, "COUNT", "3\r\n", "COUNT"); test_case(connfd, "SET Nationality China", "SUCCESS\r\n", "SetNameCase"); test_case(connfd, "COUNT", "4\r\n", "COUNT"); test_case(connfd, "EXIST Name", "1\r\n", "EXISTCase"); test_case(connfd, "GET Name", "zxm\r\n", "GetNameCase"); test_case(connfd, "DELETE Name", "SUCCESS\r\n", "DELETECase"); test_case(connfd, "COUNT", "3\r\n", "COUNT"); test_case(connfd, "EXIST Name", "0\r\n", "EXISTCase"); test_case(connfd, "EXIST Sex", "1\r\n", "EXISTCase"); test_case(connfd, "GET Sex", "man\r\n", "GetNameCase"); test_case(connfd, "DELETE Sex", "SUCCESS\r\n", "DELETECase"); test_case(connfd, "COUNT", "2\r\n", "COUNT"); test_case(connfd, "EXIST Score", "1\r\n", "EXISTCase"); test_case(connfd, "GET Score", "100\r\n", "GetNameCase"); test_case(connfd, "DELETE Score", "SUCCESS\r\n", "DELETECase"); test_case(connfd, "COUNT", "1\r\n", "COUNT"); test_case(connfd, "EXIST Nationality", "1\r\n", "EXISTCase"); test_case(connfd, "GET Nationality", "China\r\n", "GetNameCase"); test_case(connfd, "DELETE Nationality", "SUCCESS\r\n", "DELETECase"); test_case(connfd, "COUNT", "0\r\n", "COUNT"); } int main(int argc, char *argv[]) { if (argc < 3){ printf("argc < 3\n"); return -1; } const char *ip = argv[1]; int port = atoi(argv[2]); int connfd = connect_kvstore(ip, port); // array printf(" -----> array testcase <-------\n"); array_testcase(connfd); close(connfd); }
4.3 结果展示