实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议

简介: 实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议

废话不多说,本文带你实现一个简单的 memcached 客户端。



01
集群 & 一致性哈希


memcached 本身并不支持集群,为了使用集群,我们可以自己在客户端实现路由分发,将相同的 key 路由到同一台 memcached 上去即可。


路由算法有很多,这里我们使用一致性哈希算法,其原理如下图所示:

一致性哈希算法已经有开源库 hashring 实现了,基本用法:

const HashRing = require('hashring');
// 输入集群地址构造 hash ring
const ring = new HashRing(['127.0.0.1:11211', '127.0.0.2:11211']);
// 输入 key 获取指定节点
const host = ring.get(key);

02


TCP 编程


包括 memcached 在内的许多系统对外都是通过 TCP 通信。在 Node.js 中建立一个 TCP 连接并进行数据的收发是很简单的:

const net = require('net');
const socket = new net.Socket();
socket.connect({
    host: host,                     // 目标主机
    port: port,                     // 目标端口
    // localAddress: localAddress,  // 本地地址
    // localPort: localPort,        // 本地端口
});
socket.setKeepAlive(true);      // 保活
// 连接相关
socket.on('connect', () => {
    console.log(`socket connected`);
});
socket.on('error', error => {
    console.log(`socket error: ${error}`);
});
socket.on('close', hadError => {
    console.log(`socket closed, transmission error: ${hadError}`);
});
socket.on('data', data => {
    // 接受数据
});
socket.write(data); // 发送数据

一条连接由唯一的五元组确定,所谓的五元组就是:协议(TCP / UDP)、本地地址、本地端口、远程地址、远程端口。


系统正是通过五元组去区分不同的连接,其中本地地址和本地端口由于在缺省的情况下会自动生成,常常会被我们忽视。


03


连接池


一次完整的 TCP 通信过程为:握手,建立连接  -->  数据传输  -->  挥手,关闭连接。


我们都应该知道 TCP 建立连接的过程是非常消耗资源的,而连接池就是为了解决这个问题,连接池是一个通用的模型,它包括:

  • 建立连接,将连接放入池中。
  • 需要使用连接时(进行数据收发),从连接池中取出连接。
  • 连接使用完毕后,将连接放回到池中。
  • 其它。


可以看到所谓的连接池其实就是在连接使用完成后并不是立即关闭连接,而是让连接保活,等待下一次使用,从而避免反复建立连接的过程。


正如上文所述,连接池是一个通用的模块,我们这里直接使用开源库 generic-pool


池化 TCP 连接及使用示例:

const net = require('net');
const genericPool = require('generic-pool');
// 自定义创建连接池的函数
function _buildPool(remote_server) {
    const factory = {
        create: function () {
            return new Promise((resolve, reject) => {
                const host = remote_server.split(':')[0];
                const port = remote_server.split(':')[1];
                const socket = new net.Socket();
                socket.connect({
                    host: host, // 目标主机
                    port: port, // 目标端口
                });
                socket.setKeepAlive(true);
                socket.on('connect', () => {
                    console.log(`socket connected: ${remote_server} , local: ${socket.localAddress}:${socket.localPort}`);
                    resolve(socket);
                });
                socket.on('error', error => {
                    console.log(`socket error: ${remote_server} , ${error}`);
                    reject(error);
                });
                socket.on('close', hadError => {
                    console.log(`socket closed: ${remote_server} , transmission error: ${hadError}`);
                });
            });
        },
        destroy: function (socket) {
            return new Promise((resolve) => {
                socket.destroy();
                resolve();
            });
        },
        validate: function (socket) { // validate socket
            return new Promise((resolve) => {
                if (socket.connecting || socket.destroyed || !socket.readable || !socket.writable) {
                    return resolve(false);
                } else {
                    return resolve(true);
                }
            });
        }
    };
    const pool = genericPool.createPool(factory, {
        max: 10,            // 最大连接数
        min: 0,             // 最小连接数
        testOnBorrow: true, // 从池中取连接时进行 validate 函数验证
    });
    return pool;
}
// 连接池基本使用
const pool = _buildPool('127.0.0.1:11211'); // 构建连接池
const s = await pool.acquire();             // 从连接池中取连接
await pool.release(s);                      // 使用完成后释放连接

04


自定义协议


包括 memcached 在内的许多系统都自定义了一套协议用于对外通信,为了实现 memcached 客户端当然就要遵守它的协议内容。


这里实现一个最简单的 get 方法:


发送的数据格式:

get <key>\r\n

接受的数据格式:

VALUE <key> <flags> <bytes>\r\n
<data block>\r\n

实现示例:

// 定义一个请求方法并返回响应数据
function _request(command) {
    return new Promise(async (resolve, reject) => {
        try {
            // ...这里省略了连接池构建相关部分
            const s = await pool.acquire(); // 取连接
            const bufs = [];
            s.on('data', async buf => { // 监听 data 事件接受响应数据
                bufs.push(buf);
                const END_BUF = Buffer.from('\r\n'); // 数据接受完成的结束位
                if (END_BUF.equals(buf.slice(-2))) {
                    s.removeAllListeners('data'); // 移除监听
                    try {
                        await pool.release(s); // 释放连接
                    } catch (error) { }
                    const data = Buffer.concat(bufs).toString();
                    return resolve(data);
                }
            });
            s.write(command);
        } catch (error) {
            return reject(error);
        }
    });
}
// get
function get(key) {
    return new Promise(async (resolve, reject) => {
        try {
            const command = `get ${key}\r\n`;
            const data = await _request(key, command);
            // ...响应数据的处理,注意有省略
            // key not exist
            if (data === 'END\r\n') {
                return resolve(undefined);
            }
            /*
                VALUE <key> <flags> <bytesLength>\r\n
                <data block>\r\n
            */
            const data_arr = data.split('\r\n');
            const response_line = data_arr[0].split(' ');
            const value_flag = response_line[2];
            const value_length = Number(response_line[3]);
            let value = data_arr.slice(1, -2).join('');
            value = unescapeValue(value); // unescape \r\n
            // ...有省略
            return resolve(value);
        } catch (error) {
            return reject(error);
        }
    });
}

以上示例都单独拿出来了,其实都是整合在一个 class 中的:

class Memcached {
    constructor(serverLocations, options) {
        this._configs = {
            ...{
                pool: {
                    max: 1,
                    min: 0,
                    idle: 30000,             // 30000 ms.
                },
                timeout: 5000,              // timeout for every command, 5000 ms.
                retries: 5,                 // max retry times for failed request.
                maxWaitingClients: 10000,   // maximum number of queued requests allowed
            }, ...options
        };
        this._hashring = new HashRing(serverLocations);
        this._pools = {}; // 通过 k-v 的形式存储具体的地址及它的连接池
    }
    _buildPool(remote_server) {
        // ...
    }
    _request(key, command) {
        // ...
    }
    // get
    async get(key) {
        // ...
    }
    // ... 其他方法
}
// 使用实例
const memcached = new Memcached(['127.0.0.1:11211'], {
    pool: {
        max: 10,
        min: 0
    }
});
const key = 'testkey';
const result = await memcached.get(key);


目录
相关文章
|
3月前
|
存储 缓存 监控
Memcached玩转Web性能:一致性哈希、数据持久化,一文全掌握!
【8月更文挑战第24天】Memcached是一款高性能的分布式内存对象缓存系统,它通过在网络中存储数据并使用简单的键值对机制来提高动态Web应用的性能。它可以显著减少数据库查询次数,进而减轻数据库负载并加快响应时间。为了最大化利用Memcached的优势,建议合理配置内存使用、采用一致性哈希策略、实施数据持久化措施,并持续监控系统健康状况。提供的示例代码展示了如何使用Java创建客户端、添加和获取数据。
40 1
|
3月前
|
物联网 C# 智能硬件
智能家居新篇章:WPF与物联网的智慧碰撞——通过MQTT协议连接与控制智能设备,打造现代科技生活的完美体验
【8月更文挑战第31天】物联网(IoT)技术的发展使智能家居设备成为现代家庭的一部分。通过物联网,家用电器和传感器可以互联互通,实现远程控制和状态监测等功能。本文将探讨如何在Windows Presentation Foundation(WPF)应用中集成物联网技术,通过具体示例代码展示其实现过程。文章首先介绍了MQTT协议及其在智能家居中的应用,并详细描述了使用Wi-Fi连接方式的原因。随后,通过安装Paho MQTT客户端库并创建MQTT客户端实例,演示了如何编写一个简单的WPF应用程序来控制智能灯泡。
116 0
|
存储 移动开发 缓存
艾伟:自己实现memcached客户端库
What's memcached ? memcached是一个以key-value的形式缓存数据的缓存系统。通过将数据缓存到内存中,从而提高数据的获取速度。memcached以key-value的形式来保存数据,你可以为你每一段数据关联一个key,然后以后可以通过这个key获取这段数据。
837 0
|
测试技术 Memcache
艾伟_转载:.NET平台上的Memcached客户端介绍
早上接到一个任务,需要对Linux服务器的Memcached的update操作进行性能测试,我发现我是一个典型的“手里拿着锤子,就把所有问题都当成钉子”的人。我第一个念头就是,上Memcached的官网找.NET的客户端。
894 0
|
缓存 网络协议 Linux
|
存储 移动开发 Unix
|
缓存 Java API
Memcached学习笔记 — 第四部分:Memcached Java 客户端-gwhalin(1)-介绍及使用
 介绍 Memcached java client是官方推荐的最早的memcached java客户端。最新版本:java_memcached-release_2.6.1。 官方下载地址:https://github.com/gwhalin/Memcached-Java-Client采用阻塞式SOCKET通讯,据说目前版本进行了很多优化,性能有所提高(只看过1.5的源代码,还没来及看
1270 0
|
移动开发 双11 存储
Memcached 二进制协议(BinaryProtocol) incr指令泄露内存数据的bug
缘起 最近有个分布式限速的需求。支付宝的接口双11只允许每秒调用10次。 单机的限速,自然是用google guava的RateLimiter。
1127 0