从零实现kv存储V1.0:array初版

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 从零实现kv存储V1.0:array初版

本节开始,逐步实现基于内存的kv存储引擎。


一、项目主要功能和知识点


参照redis,主要实现的功能:

1、数据的插入、查询、删除等操作

1)SET:插入key - value

2)GET:获取key对应的value

3)COUNT:统计已插入多少个key

4)DELETE:删除key以及对应的value

5)EXIST:判断key是否存在


2、实现不同的数据结构存储引擎

不同的数据结构,操作的效率是不一样的。因此我们实现基于数组array、红黑树rbtree、哈希hash、跳表skiptable 这四种数据结构,实现kv存储,并测试相应的性能。


3、测试用例

1)功能测试

2)10w的qps测试


主要涉及的知识点有:

1)基于协程,一个连接对应一个协程

2)tcp网络交互

3)数据结构:数组array、红黑树rbtree、哈希hash、跳表skiptable


二、架构设计


b5d36c39d1d93bbab41085f224e6fd56_78bd471bb3244c11970132eda46fdbbc.png

acedacc27d8acc53ecc9604456ac8960_0acaa0a62509487d8d9785b97679f8a4.png


三、具体实现


我们先实现简单的,基于数组array的kv存储引擎。


3.1 流程

以 SET NAME ZXM 为例,大致介绍一下流程:


1)客户端发送 SET NAME ZXM

2)服务器接收 SET NAME ZXM,进入解析流程

3)根据协议进程解析:

按空格拆分命令,存储到toekns[]。 tokens[0] = SET,tokens[1] = NAME, tokens[2]: ZXM

根据tokens[0],解析出对应的数据结构存储引擎,即数组。以及对应的操作命令,即插入。

根据key、value执行相应的操作命令,即插入新的key,value。

返回操作结果

4)解析完成,将操作结果返回给客户端。

5)客户端根据接收到的结果判断是否操作成功。


3.3 引擎层

typedef struct kvs_array_item_s {
  char *key;
  char *value;
} kvs_array_item_t;
// kvs_array_table 存储插入的 kv
kvs_array_item_t kvs_array_table [KVS_ARRAY_ITEM_SIEZ] = {0};

3.4 接口层

// 查找 key 在 kvs_array_table 的位置
kvs_array_item_t *kvs_array_search_item (char *key);
// KVS_CMD_EXIST: 判断 key 是否存在,存在返回 1 
int kvs_array_exist (char *key);
// KVS_CMD_SET:插入 kv
int kvs_array_set (char *key, char *value);
// KVS_CMD_GET:获取 key 对应的value
char *kvs_array_get(char *key);
// KVS_CMD_COUNT:统计以及插入多少个 key
int kvs_array_count (void) ;
// KVS_CMD_DELETE:删除 key
int kvs_array_delete(char *key);

3.5 协议层

// 根据msg,解析其具体的命令协议
int kvs_parser_protocol (char *msg, char **tokens, int count) ;
/*分割msg,比如 msg为 SET NAME ZXM ,分割为SET,NAME,ZXM,分别存储在tokens[]
 * tokens[0]: SET --------- 对应的是命令 cmd
 * tokens[1]: NAME  --------- 对应的是命令 key
 * tokens[2]: ZXM --------- 对应的是命令 value
 */
int kvs_spilt_tokens (char **tokens, char *msg) ;
// 解析协议
int kvs_protocol (char *msg, int length); 

3.6 网络层

Github:NtyCo


// 为每一个连接端口创建一个协程
nty_coroutine_create(&co, server, port); 


四、具体代码


4.1 kvstore.c

// gcc -o kvstore1 kvstore1.c -I ./NtyCo/core/ -L ./NtyCo/ -lntyco -lpthread -ldl 
#include "nty_coroutine.h"
#include <arpa/inet.h>
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
// array: set, get, count, delete, exist
// rbtree: rset, rget, rcount, rdelete, rexist
// hash: hset, hget, hcount, hdelete, hexist
// skiptable: zset, zget, zcount, zdelete, zexist
//---------------------------------------------------------------------------------------------
//-------------------------------------- 接口层 -----------------------------------------
typedef enum kvs_cmd_e {
  KVS_CMD_START = 0,
  // array
  KVS_CMD_SET = KVS_CMD_START,
  KVS_CMD_GET,
  KVS_CMD_COUNT,
  KVS_CMD_DELETE,
  KVS_CMD_EXIST,
  // rbtree
  KVS_CMD_RSET,
  KVS_CMD_RGET,
  KVS_CMD_RCOUNT,
  KVS_CMD_RDELETE,
  KVS_CMD_REXIST,
  // hash
  KVS_CMD_HSET,
  KVS_CMD_HGET,
  KVS_CMD_HCOUNT,
  KVS_CMD_HDELETE,
  KVS_CMD_HEXIST,
  // skiptable
  KVS_CMD_ZSET,
  KVS_CMD_ZGET,
  KVS_CMD_ZCOUNT,
  KVS_CMD_ZDELETE,
  KVS_CMD_ZEXIST,
  KVS_CMD_END = KVS_CMD_ZEXIST,
  // 格式出错
  KVS_CMD_ERROR,
  // 断开
  KVS_CMD_QUIT,
} kvs_cmd_t;
// 命令集
const char *commands[] = {
  "SET", "GET", "COUNT", "DELETE", "EXIST",
  "RSET", "RGET", "RCOUNT", "RDELETE", "REXIST",
  "HSET", "HGET", "HCOUNT", "HDELETE", "HEXIST",
  "ZSET", "ZGET", "ZCOUNT", "ZDELETE", "ZEXIST"
};
// 为了以后优化的可能,把内存开辟释放封装
void *kvs_malloc(size_t size) {
  return malloc(size);
}
void kvs_free(void *ptr) {
  return free(ptr);
}
//-------------------------------------- array -----------------------------------------
#define KVS_ARRAY_ITEM_SIEZ     1024
typedef struct kvs_array_item_s {
  char *key;
  char *value;
} kvs_array_item_t;
// kvs_array_table 存储插入的 kv
kvs_array_item_t kvs_array_table [KVS_ARRAY_ITEM_SIEZ] = {0};
// 查找 key 在 kvs_array_table 的位置
kvs_array_item_t *kvs_array_search_item (char *key){
  if (!key) return NULL;
  int i = 0;
  // 注意由于前面的 key 被删除而出现的 key == NULL , strcmp 不能与 NULL 比较
  for (i = 0;i < KVS_ARRAY_ITEM_SIEZ; i++) {
    if (kvs_array_table[i].key != NULL && strcmp(kvs_array_table[i].key , key) == 0) {
      return &kvs_array_table[i];
    }
  }
  return NULL;
}
// KVS_CMD_EXIST: 判断 key 是否存在,存在返回 1 
int kvs_array_exist (char *key) {
  if (kvs_array_search_item(key) != NULL) return 1;
}
// KVS_CMD_SET:插入 kv, 成功返回 0
int array_count = 0;  // 已插入元素的个数
int kvs_array_set (char *key, char *value) {
  if (key == NULL || value == NULL || array_count == KVS_ARRAY_ITEM_SIEZ - 1) {
    return -1;
  }
  // key 已存在,不能插入
  if (kvs_array_exist(key)) {
    return -1;
  }
  char *kvs_key = kvs_malloc(strlen(key) + 1);
  if (kvs_key == NULL) return -1;
  strncpy(kvs_key, key, strlen(key) + 1);
  char *kvs_value = kvs_malloc(strlen(value) + 1);
  if (kvs_value == NULL) {
    free(kvs_key);
    return -1;
  }
  strncpy(kvs_value, value, strlen(value) + 1);
  int i = 0;
  for (i = 0;i < KVS_ARRAY_ITEM_SIEZ; i++) {
    if (kvs_array_table[i].key == NULL && kvs_array_table[i].value == 0) {
      break;
    }
  }
  kvs_array_table[i].key = kvs_key;
  kvs_array_table[i].value = kvs_value;
  array_count++;
  return 0;
}
// KVS_CMD_GET:获取 key 对应的value
char *kvs_array_get(char *key) {
  kvs_array_item_t * item = kvs_array_search_item(key);
  if (item) {
    return item->value;
  }
  return NULL;
}
// KVS_CMD_COUNT:统计以及插入多少个 key
int kvs_array_count (void) {
  return array_count;
}
// KVS_CMD_DELETE:删除 key
int kvs_array_delete(char *key) {
  if (key == NULL) return -1;
  kvs_array_item_t * item = kvs_array_search_item(key);
  if (item == NULL) {
    return -1;
  }
  if (item->key) {
    kvs_free(item->key);
    item->key = NULL;
  }
  if (item->value) {
    kvs_free(item->value);
    item->value = NULL;
  }
  array_count--;
  return 0;
}
//---------------------------------------------------------------------------------------------
//-------------------------------------- 协议层 -----------------------------------------
#define MAX_TOKENS    32
#define CLINET_MSG_LENGTH   1024    // client发送msg的最大长度
// 根据msg,解析其具体的命令协议
int kvs_parser_protocol (char *msg, char **tokens, int count) {
  if (tokens == NULL || tokens[0] == NULL || count == 0) {
    return KVS_CMD_ERROR;
  }
  // 判断命令,即tokens[0],是否在命令集中
  int cmd = KVS_CMD_START;
  for (cmd = KVS_CMD_START; cmd < KVS_CMD_END; cmd++) {
    if (strcmp(tokens[0], commands[cmd]) == 0) {
      break;
    }
  }
  // 根据cmd,选择对应的命令
  switch (cmd) {
  case KVS_CMD_SET: {
    assert(count == 3);   // SET NAME ZXM,应该有三个
    int ret = 0;
    int res = kvs_array_set(tokens[1], tokens[2]);
    if (!res) {
      memset(msg, 0, CLINET_MSG_LENGTH);
      ret = snprintf(msg, CLINET_MSG_LENGTH, "SUCCESS\r\n");
    } else {
      memset(msg, 0, CLINET_MSG_LENGTH);
      ret = snprintf(msg, CLINET_MSG_LENGTH, "FAILED\r\n");     
    }
    return ret;
  }
  case KVS_CMD_GET: {
    assert(count == 2);
    int ret = 0;
    char *value = kvs_array_get(tokens[1]);
    if (value) {
      memset(msg, 0, CLINET_MSG_LENGTH);
      ret = snprintf(msg, CLINET_MSG_LENGTH, "%s\r\n", value);
    } else {
      memset(msg, 0, CLINET_MSG_LENGTH);
      ret = snprintf(msg, CLINET_MSG_LENGTH, "FAILED: NO EXIST\r\n");
    }
    return ret;
  }
  case KVS_CMD_COUNT: {
    assert(count == 1);
    int res = kvs_array_count();
    memset(msg, 0, CLINET_MSG_LENGTH);
    int ret = snprintf(msg, CLINET_MSG_LENGTH, "%d\r\n", res);
    return ret;
  }
  case KVS_CMD_DELETE: {
    assert(count == 2);
    int ret = 0;
    int res = kvs_array_delete(tokens[1]);
    if (!res) {
      memset(msg, 0, CLINET_MSG_LENGTH);
      ret = snprintf(msg, CLINET_MSG_LENGTH, "SUCCESS\r\n");
    } else {
      memset(msg, 0, CLINET_MSG_LENGTH);
      ret = snprintf(msg, CLINET_MSG_LENGTH, "FAILED\r\n");
    }
    return ret;
  }
  case KVS_CMD_EXIST: {
    assert(count == 2);
    int res = kvs_array_exist(tokens[1]);
    memset(msg, 0, CLINET_MSG_LENGTH);
    int ret = snprintf(msg, CLINET_MSG_LENGTH, "%d\r\n", res);
    return ret;
  }
  }
  return 0;
}
/*分割msg,比如 msg为 SET NAME ZXM ,分割为SET,NAME,ZXM,分别存储在tokens[]
 * tokens[0]: SET --------- 对应的是命令 cmd
 * tokens[1]: NAME  --------- 对应的是命令 key
 * tokens[2]: ZXM --------- 对应的是命令 value
 */
int kvs_spilt_tokens (char **tokens, char *msg) {
  char *token = strtok(msg, " ");   // 按空格分割
  int count  = 0;
  while (token != NULL) {
    tokens[count++] = token;
    token  = strtok(NULL, " "); 
  }
  return count;
}
// 解析协议
int kvs_protocol (char *msg, int length) {
  char *tokens[MAX_TOKENS] = {0};
  // 分割msg
  int count = kvs_spilt_tokens(tokens, msg);
  // 根据分割后的 msg ,解析其具体命令协议
  // msg:命令    tokens:分割后的 msg   
  return kvs_parser_protocol(msg, tokens, count);
}
//---------------------------------------------------------------------------------------------
//--------------------------------------NtyCo底层的协程-----------------------------------------
void server_reader(void *arg) {
  int fd = *(int *)arg;
  int ret = 0;
  while (1) {
    char buf[CLINET_MSG_LENGTH] = {0};
    // 接收msg,存放到buf中
    ret = nty_recv(fd, buf, CLINET_MSG_LENGTH, 0);
    if (ret > 0) {
      printf("read from server: %.*s\n", ret, buf);
      // 根据协议解析msg: rec: SET NAME ZXM  
      // 解析后:SET\r\n NAME\r\n ZXM\r\n
      ret = kvs_protocol(buf, ret);
      // 发送解析后的msg
      ret = nty_send(fd, buf, ret, 0);
      if (ret == -1) {
        nty_close(fd);
        break;
      }
    } else if (ret == 0) {  
      nty_close(fd);
      break;
    }
  }
}
void server(void *arg) {
  unsigned short port = *(unsigned short *)arg;
  free(arg);
  int fd = nty_socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return ;
  struct sockaddr_in local, remote;
  local.sin_family = AF_INET;       // 设置地址族为IPv4
  local.sin_port = htons(port);     // 设置端口号
  local.sin_addr.s_addr = INADDR_ANY;   // 设置IP地址, INADDR_ANY 是一个常量,表示可以接受来自任意 IP 地址的连接。
  bind(fd, (struct sockaddr*)&local, sizeof(struct sockaddr_in));
  listen(fd, 20);
  printf("listen port : %d\n", port);
  //获取当前的时间和日期信息
  struct timeval tv_begin;
  gettimeofday(&tv_begin, NULL);
  while (1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);
    if (cli_fd % 1000 == 999) {
      struct timeval tv_cur;
      memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
      gettimeofday(&tv_begin, NULL);
      int time_used = TIME_SUB_MS(tv_begin, tv_cur);
      printf("client fd : %d, time_used: %d\n", cli_fd, time_used);
    }
    printf("new client comming\n");
    nty_coroutine *read_co;
    nty_coroutine_create(&read_co, server_reader, &cli_fd);
  }
}
int main(int argc, char *argv[]) {
  nty_coroutine *co = NULL;
  unsigned short base_port = 9999;
  unsigned short *port = calloc(1, sizeof(unsigned short));
  *port = base_port ;
  // 为每一个连接端口创建一个协程
  nty_coroutine_create(&co, server, port); 
  nty_schedule_run(); //run
  return 0;
}

4.2 测试案例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#define MAX_MSG_LENGTH    1024
int connect_kvstore(const char *ip ,int port){
  int connfd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in kvs_addr;
  memset(&kvs_addr, 0, sizeof(struct sockaddr_in)); 
  kvs_addr.sin_family = AF_INET;
  kvs_addr.sin_addr.s_addr = inet_addr(ip); // inet_addr把ip转为点分十进制
  kvs_addr.sin_port = htons(port);
  int ret = connect(connfd, (struct sockaddr*)&kvs_addr, sizeof(struct sockaddr_in));
  // ret = 0,成功
  if (ret) { 
    perror("connect\n");
    return -1;
  }
  return connfd;
}
// 发送msg
int send_msg(int connfd, char *msg) {
  int res = send(connfd, msg, strlen(msg), 0);
  if (res < 0) {
    exit(1);
  }
  return res;
}
// 接收数据并存入msg
int recv_msg(int connfd, char *msg) {
  int res = recv(connfd, msg, MAX_MSG_LENGTH, 0);
  if (res < 0){
    exit(1);
  }
  return res;
}
// 对比接收到的结果 result 与 应返回的结果 pattern 是否一致
void equals (char *pattern, char *result, char *casename) {
  if (0 == strcmp(pattern, result)) {
    printf(">> PASS --> %s\n", casename);
  } else {
    printf(">> FAILED --> '%s' != '%s'\n", pattern, result);
  }
}
// cmd: 命令      pattern: 应返回的结果   casename:测试的名称
// 比如测试 SET NAME ZXM,cmd="SET NAME ZXM", pattern="SUCCESS\r\n", 测试的名称casename
void test_case (int connfd, char *cmd, char *pattern, char *casename) {
  char result[MAX_MSG_LENGTH] = {0};
  // 发送命令cmd
  send_msg(connfd, cmd);
  // 接收命令处理后的结果,存入result
  recv_msg(connfd, result);
  // 对比接收到的结果 result 与 应返回的结果 pattern 是否一致
  equals(pattern, result, casename);
}
void array_testcase(int connfd ){
  test_case(connfd, "SET Name zxm", "SUCCESS\r\n", "SetNameCase");
  test_case(connfd, "COUNT", "1\r\n", "COUNTCase");
  test_case(connfd, "SET Sex man", "SUCCESS\r\n", "SetNameCase");
  test_case(connfd, "COUNT", "2\r\n", "COUNT");
  test_case(connfd, "SET Score 100", "SUCCESS\r\n", "SetNameCase");
  test_case(connfd, "COUNT", "3\r\n", "COUNT");
  test_case(connfd, "SET Nationality China", "SUCCESS\r\n", "SetNameCase");
  test_case(connfd, "COUNT", "4\r\n", "COUNT");
  test_case(connfd, "EXIST Name", "1\r\n", "EXISTCase");
  test_case(connfd, "GET Name", "zxm\r\n", "GetNameCase");
  test_case(connfd, "DELETE Name", "SUCCESS\r\n", "DELETECase");
  test_case(connfd, "COUNT", "3\r\n", "COUNT");
  test_case(connfd, "EXIST Name", "0\r\n", "EXISTCase");
  test_case(connfd, "EXIST Sex", "1\r\n", "EXISTCase");
  test_case(connfd, "GET Sex", "man\r\n", "GetNameCase");
  test_case(connfd, "DELETE Sex", "SUCCESS\r\n", "DELETECase");
  test_case(connfd, "COUNT", "2\r\n", "COUNT");
  test_case(connfd, "EXIST Score", "1\r\n", "EXISTCase");
  test_case(connfd, "GET Score", "100\r\n", "GetNameCase");
  test_case(connfd, "DELETE Score", "SUCCESS\r\n", "DELETECase");
  test_case(connfd, "COUNT", "1\r\n", "COUNT");
  test_case(connfd, "EXIST Nationality", "1\r\n", "EXISTCase");
  test_case(connfd, "GET Nationality", "China\r\n", "GetNameCase");
  test_case(connfd, "DELETE Nationality", "SUCCESS\r\n", "DELETECase");
  test_case(connfd, "COUNT", "0\r\n", "COUNT");
}
int main(int argc, char *argv[]) {
  if (argc < 3){
    printf("argc < 3\n");
    return -1;
  }
  const char *ip = argv[1];
  int port = atoi(argv[2]);
  int connfd = connect_kvstore(ip, port);
  // array
  printf(" -----> array testcase <-------\n");
  array_testcase(connfd);
  close(connfd);
} 

4.3 结果展示

61c497beac0c6948b197b4472aff944a_308b7706511d4fa0913350e42ef76578.png

目录
相关文章
|
存储 XML NoSQL
KV 存储那些事儿
开发中,我们总会需要存储些 KV 数据,虽然看上去简单,但考虑因素也是很多的,实现手段也就各有差异。今天,我们就来看看 Android 目前有哪些 KV 库可以供我们使用,以及其有哪些优缺点。
371 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
25 2
|
6月前
|
存储 搜索推荐 C++
【C++高阶(二)】熟悉STL中的map和set --了解KV模型和pair结构
【C++高阶(二)】熟悉STL中的map和set --了解KV模型和pair结构
|
6月前
toParamStr支持数组
toParamStr支持数组
30 0
|
存储 测试技术
从零实现kv存储V2.0
从零实现kv存储V2.0
166 1
|
存储 SQL 分布式数据库
记录一次 Hbase 线上问题的分析和解决,并分析总结下背后的知识点 - KeyValue size too large
记录一次 Hbase 线上问题的分析和解决,并分析总结下背后的知识点 - KeyValue size too large
|
JSON 安全 程序员
GoFrame的gmap相比Go原生的map,天然支持排序和有序遍历
这篇文章就是给初学的小伙伴们答疑解惑的,会为大家介绍: 为什么Go语言中的map是无序的,如何自定义实现map的排序?
226 0
GoFrame的gmap相比Go原生的map,天然支持排序和有序遍历
|
前端开发
两个数组数据的高效合并方案
作为一个前端,服务器返回的数据易用,能极大的提升开发效率。 能一个接口提供的数据,就不要用去调用两次或者更多网络请求,然后进行数据合并。 然而,理想和现实两者,现实总是找我,感觉不到理想对的温暖。
265 0
两个数组数据的高效合并方案
|
SQL 算法 关系型数据库
开发指南—函数—拆分函数—STR_HASH
开发指南—函数—拆分函数—STR_HASH
109 0