实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议

简介: 实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议

废话不多说,本文带你实现一个简单的 memcached 客户端。



01
集群 & 一致性哈希


memcached 本身并不支持集群,为了使用集群,我们可以自己在客户端实现路由分发,将相同的 key 路由到同一台 memcached 上去即可。


路由算法有很多,这里我们使用一致性哈希算法,其原理如下图所示:

一致性哈希算法已经有开源库 hashring 实现了,基本用法:

const HashRing = require('hashring');
// 输入集群地址构造 hash ring
const ring = new HashRing(['127.0.0.1:11211', '127.0.0.2:11211']);
// 输入 key 获取指定节点
const host = ring.get(key);

02


TCP 编程


包括 memcached 在内的许多系统对外都是通过 TCP 通信。在 Node.js 中建立一个 TCP 连接并进行数据的收发是很简单的:

const net = require('net');
const socket = new net.Socket();
socket.connect({
    host: host,                     // 目标主机
    port: port,                     // 目标端口
    // localAddress: localAddress,  // 本地地址
    // localPort: localPort,        // 本地端口
});
socket.setKeepAlive(true);      // 保活
// 连接相关
socket.on('connect', () => {
    console.log(`socket connected`);
});
socket.on('error', error => {
    console.log(`socket error: ${error}`);
});
socket.on('close', hadError => {
    console.log(`socket closed, transmission error: ${hadError}`);
});
socket.on('data', data => {
    // 接受数据
});
socket.write(data); // 发送数据

一条连接由唯一的五元组确定,所谓的五元组就是:协议(TCP / UDP)、本地地址、本地端口、远程地址、远程端口。


系统正是通过五元组去区分不同的连接,其中本地地址和本地端口由于在缺省的情况下会自动生成,常常会被我们忽视。


03


连接池


一次完整的 TCP 通信过程为:握手,建立连接  -->  数据传输  -->  挥手,关闭连接。


我们都应该知道 TCP 建立连接的过程是非常消耗资源的,而连接池就是为了解决这个问题,连接池是一个通用的模型,它包括:

  • 建立连接,将连接放入池中。
  • 需要使用连接时(进行数据收发),从连接池中取出连接。
  • 连接使用完毕后,将连接放回到池中。
  • 其它。


可以看到所谓的连接池其实就是在连接使用完成后并不是立即关闭连接,而是让连接保活,等待下一次使用,从而避免反复建立连接的过程。


正如上文所述,连接池是一个通用的模块,我们这里直接使用开源库 generic-pool


池化 TCP 连接及使用示例:

const net = require('net');
const genericPool = require('generic-pool');
// 自定义创建连接池的函数
function _buildPool(remote_server) {
    const factory = {
        create: function () {
            return new Promise((resolve, reject) => {
                const host = remote_server.split(':')[0];
                const port = remote_server.split(':')[1];
                const socket = new net.Socket();
                socket.connect({
                    host: host, // 目标主机
                    port: port, // 目标端口
                });
                socket.setKeepAlive(true);
                socket.on('connect', () => {
                    console.log(`socket connected: ${remote_server} , local: ${socket.localAddress}:${socket.localPort}`);
                    resolve(socket);
                });
                socket.on('error', error => {
                    console.log(`socket error: ${remote_server} , ${error}`);
                    reject(error);
                });
                socket.on('close', hadError => {
                    console.log(`socket closed: ${remote_server} , transmission error: ${hadError}`);
                });
            });
        },
        destroy: function (socket) {
            return new Promise((resolve) => {
                socket.destroy();
                resolve();
            });
        },
        validate: function (socket) { // validate socket
            return new Promise((resolve) => {
                if (socket.connecting || socket.destroyed || !socket.readable || !socket.writable) {
                    return resolve(false);
                } else {
                    return resolve(true);
                }
            });
        }
    };
    const pool = genericPool.createPool(factory, {
        max: 10,            // 最大连接数
        min: 0,             // 最小连接数
        testOnBorrow: true, // 从池中取连接时进行 validate 函数验证
    });
    return pool;
}
// 连接池基本使用
const pool = _buildPool('127.0.0.1:11211'); // 构建连接池
const s = await pool.acquire();             // 从连接池中取连接
await pool.release(s);                      // 使用完成后释放连接

04


自定义协议


包括 memcached 在内的许多系统都自定义了一套协议用于对外通信,为了实现 memcached 客户端当然就要遵守它的协议内容。


这里实现一个最简单的 get 方法:


发送的数据格式:

get <key>\r\n

接受的数据格式:

VALUE <key> <flags> <bytes>\r\n
<data block>\r\n

实现示例:

// 定义一个请求方法并返回响应数据
function _request(command) {
    return new Promise(async (resolve, reject) => {
        try {
            // ...这里省略了连接池构建相关部分
            const s = await pool.acquire(); // 取连接
            const bufs = [];
            s.on('data', async buf => { // 监听 data 事件接受响应数据
                bufs.push(buf);
                const END_BUF = Buffer.from('\r\n'); // 数据接受完成的结束位
                if (END_BUF.equals(buf.slice(-2))) {
                    s.removeAllListeners('data'); // 移除监听
                    try {
                        await pool.release(s); // 释放连接
                    } catch (error) { }
                    const data = Buffer.concat(bufs).toString();
                    return resolve(data);
                }
            });
            s.write(command);
        } catch (error) {
            return reject(error);
        }
    });
}
// get
function get(key) {
    return new Promise(async (resolve, reject) => {
        try {
            const command = `get ${key}\r\n`;
            const data = await _request(key, command);
            // ...响应数据的处理,注意有省略
            // key not exist
            if (data === 'END\r\n') {
                return resolve(undefined);
            }
            /*
                VALUE <key> <flags> <bytesLength>\r\n
                <data block>\r\n
            */
            const data_arr = data.split('\r\n');
            const response_line = data_arr[0].split(' ');
            const value_flag = response_line[2];
            const value_length = Number(response_line[3]);
            let value = data_arr.slice(1, -2).join('');
            value = unescapeValue(value); // unescape \r\n
            // ...有省略
            return resolve(value);
        } catch (error) {
            return reject(error);
        }
    });
}

以上示例都单独拿出来了,其实都是整合在一个 class 中的:

class Memcached {
    constructor(serverLocations, options) {
        this._configs = {
            ...{
                pool: {
                    max: 1,
                    min: 0,
                    idle: 30000,             // 30000 ms.
                },
                timeout: 5000,              // timeout for every command, 5000 ms.
                retries: 5,                 // max retry times for failed request.
                maxWaitingClients: 10000,   // maximum number of queued requests allowed
            }, ...options
        };
        this._hashring = new HashRing(serverLocations);
        this._pools = {}; // 通过 k-v 的形式存储具体的地址及它的连接池
    }
    _buildPool(remote_server) {
        // ...
    }
    _request(key, command) {
        // ...
    }
    // get
    async get(key) {
        // ...
    }
    // ... 其他方法
}
// 使用实例
const memcached = new Memcached(['127.0.0.1:11211'], {
    pool: {
        max: 10,
        min: 0
    }
});
const key = 'testkey';
const result = await memcached.get(key);


目录
相关文章
|
存储 移动开发 缓存
艾伟:自己实现memcached客户端库
What's memcached ? memcached是一个以key-value的形式缓存数据的缓存系统。通过将数据缓存到内存中,从而提高数据的获取速度。memcached以key-value的形式来保存数据,你可以为你每一段数据关联一个key,然后以后可以通过这个key获取这段数据。
819 0
|
测试技术 Memcache
艾伟_转载:.NET平台上的Memcached客户端介绍
早上接到一个任务,需要对Linux服务器的Memcached的update操作进行性能测试,我发现我是一个典型的“手里拿着锤子,就把所有问题都当成钉子”的人。我第一个念头就是,上Memcached的官网找.NET的客户端。
872 0
|
缓存 网络协议 Linux
|
存储 移动开发 Unix
|
存储 移动开发 数据库
看看mina和memcached的联姻(适合不同语言客户端,高并发?)
[size=medium]/** * 作者:张荣华 * 日期:2008-07-21 **/ 看看mina和memcached的联姻 先来解释一下这两个东东的身世 Mina,是什么? Minan是一个network 应用框架,她能很方便的帮助用户开发出高性能和高可扩展性的网络应用程
2135 0
|
缓存 Java API
Memcached学习笔记 — 第四部分:Memcached Java 客户端-gwhalin(1)-介绍及使用
 介绍 Memcached java client是官方推荐的最早的memcached java客户端。最新版本:java_memcached-release_2.6.1。 官方下载地址:https://github.com/gwhalin/Memcached-Java-Client采用阻塞式SOCKET通讯,据说目前版本进行了很多优化,性能有所提高(只看过1.5的源代码,还没来及看
1202 0