Adhesive框架系列文章--分布式组件客户端模块实现

简介: Adhesive框架中是分布式组件客户端首先实现的是基于Json序列化+二进制协议的Memcached客户端。在本文中会介绍其中的实现细节。 我们先来看一下项目结构: 从这个结构大致可以看出: 1)Memcached只是其中的一个具体实现,这个组件期望提供一个ClientSocket-Cl...

Adhesive框架中是分布式组件客户端首先实现的是基于Json序列化+二进制协议的Memcached客户端。在本文中会介绍其中的实现细节。

我们先来看一下项目结构:

image

从这个结构大致可以看出:

1)Memcached只是其中的一个具体实现,这个组件期望提供一个ClientSocket-ClientNode-ClientCluster的基础实现,以后可以有各种客户端基于这种结构来实现

2)对于Memcached的实现,其中把协议部分放在的Protocol文件夹中,并且根据协议为每一个请求和响应封装类型,也就是使用面向对象的方式而不是拼数据包的方式来封装协议

那么现在首先来介绍基础结构。从最底部的层次开始,最底部应该是对Socket进行一个封装,在这里我们实现了一个ClientSocket,主要完成下面功能:

1)封装Read、Write、Connect、Reset(因为我们实现的是Socket池,所以在Socket使用之后,归还池之前需要重置)操作

2)封装Socket基本状态,包括创建时间、忙碌时间、闲置时间、发生错误时的回调方法

 

在ClientSocket之上的一层是ClientNode,也就是一个节点的客户端,很明显,这里需要做的是Socket连接池,具体完成的工作有:

1)进行连接池的维护,包括移除空闲超时的Socket、强制结束忙碌时间过长的Socket、补充新的Socket到连接池的下限

2)初始化池、结束池、从池获取Socket、把使用后的Socket返回池、创建非池Socket

在正常使用的时候,所有Socket都从池中获取,如果整个Node不可用,那么我们定时创建非池Socket来测试Node是否恢复

 

在ClientNode之上的是ClientCluster,也就是集群,对于需要客户端进行一致性哈希分发节点的分布式组件来说,这层就很必要了,完成的功能主要有:

1)初始化集群、使用一致性哈希从集群获得节点、直接获得ClientSocket

2)在节点出错的时候进行重新节点分配、尝试恢复出错的节点

 

ClientCluster是使用ClientNodeLocator来分配节点的,其中的算法也就是一致性哈希算法。

image

之前说过节点有权重的概念,在这里也就是通过虚拟节点的数量来设置节点权重,权重越高分配到Key的数量也就会越多。

 

在ClientCluster之上还封装了一层AbstractClient,也就是直接面向用户的API入口。

public abstract class AbstractClient<T> where T : AbstractClient<T>, new()

完成的功能有:

1)保存所有的Cluster,初始化Cluster

2)获取具体的XXXClient的实现,比如MemcachedClient

 

很明显,我们的第一个实现MemcachedClient是继承了AbstractClient:

public partial class MemcachedClient : AbstractClient<MemcachedClient>

在这里使用了部分类,内部的实现都放在了MemcachedClient_Internal.cs中,而对外的API都放在了MemcachedClient.cs中。

 

对于Memcached的二进制协议,我们首先是实现一个头的格式包:

    [StructLayout(LayoutKind.Sequential, Pack = 1)]
    internal struct Header
    {
        internal byte Magic;

        internal byte Opcode;

        internal ushort KeyLength;

        internal byte ExtraLength;

        internal byte DataType;

        internal ushort Reserved;

        internal uint TotalBodyLength;

        internal uint Opaque;

        internal ulong Version;
    }

由于我们会直接把结构打包为字节数组,所以这里声明了结构的内存布局。在Protocol.cs中,我们有一些实用的方法,比如结构和字节数组双向转换的实现:

        internal static T BytesToStruct<T>(this byte[] rawData)
        {
            T result = default(T);
            RespectEndianness(typeof(T), rawData);
            GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);
            try
            {
                IntPtr rawDataPtr = handle.AddrOfPinnedObject();
                result = (T)Marshal.PtrToStructure(rawDataPtr, typeof(T));
            }
            finally
            {
                handle.Free();
            }
            return result;
        }

        internal static byte[] StructToBytes<T>(this T data)
        {
            byte[] rawData = new byte[Marshal.SizeOf(data)];
            GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);
            try
            {
                IntPtr rawDataPtr = handle.AddrOfPinnedObject();
                Marshal.StructureToPtr(data, rawDataPtr, false);
            }
            finally
            {
                handle.Free();
            }
            RespectEndianness(typeof(T), rawData);
            return rawData;
        }

        private static void RespectEndianness(Type type, byte[] data)
        {
            var fields = type.GetFields(BindingFlags.NonPublic | BindingFlags.Instance).Select(field => new
            {
                Field = field,
                Offset = Marshal.OffsetOf(type, field.Name).ToInt32()
            }).ToList();

            fields.ForEach(item => Array.Reverse(data, item.Offset, Marshal.SizeOf(item.Field.FieldType)));
        }

在定义了头之后,我们就可以封装一个抽象的请求包了:

image

只要实现这个包,然后调用其GetBytes方法就可以直接获得需要发送的请求数据包,它会在内部处理Header和Body数据的打包。

比如,我们来看一个Set操作的包实现:

    internal class SetRequestPackage : AbstractRequestPackage
    {
        private TimeSpan expireSpan;
        private byte[] valueBytes;
        private ulong version;

        public override Opcode Opcode
        {
            get { return Opcode.Set; }
        }

        internal SetRequestPackage(string key, byte[] valueBytes, TimeSpan expireSpan, ulong version)
            : base(key)
        {
            if (expireSpan > TimeSpan.FromDays(30))
                throw new ArgumentOutOfRangeException("过期时间不能超过30天!");
            this.expireSpan = expireSpan;
            this.valueBytes = valueBytes;
            this.version = version;
        }

        internal SetRequestPackage(string key, string value, TimeSpan expireSpan, ulong version)
            : this(key, Encoding.UTF8.GetBytes(value), expireSpan, version)
        {
        }

        internal SetRequestPackage(string key, string value, ulong version)
            : this(key, Encoding.UTF8.GetBytes(value), TimeSpan.FromDays(30), version)
        {
        }

        internal SetRequestPackage(string key, byte[] valueBytes, ulong version)
            : this(key, valueBytes, TimeSpan.FromDays(30), version)
        {
        }

        protected override ulong GetVersion()
        {
            return version;
        }

        protected override byte[] GetExtraBytes()
        {
            var extraBytes = new List<byte>();
            uint flag = 0xdeadbeef;
            extraBytes.AddRange(flag.GetBigEndianBytes());
            uint expire = Convert.ToUInt32(expireSpan.TotalSeconds);
            extraBytes.AddRange(expire.GetBigEndianBytes());
            return extraBytes.ToArray();
        }

        protected override byte[] GetValueBytes()
        {
            return valueBytes;
        }
    }

在这里,我们只是实现了抽象方法来为基类提供没有的数据,并不需要关心数据是如何打包的。那么,之后发送Set请求的操作就很简单了:

        private bool InternalSet(string key, string value, TimeSpan expire, ulong version)
        {
            using (var socket = GetCluster().AcquireSocket(key))
            {
                if (socket != null)
                {
                    AbstractRequestPackage requestPackage = expire == TimeSpan.MaxValue ? new SetRequestPackage(key, value, version)
                            : new SetRequestPackage(key, value, expire, version);
                    var requestData = requestPackage.GetBytes();
                    if (requestData != null)
                    {
                        socket.Write(requestData);
                        var responsePackage = ResponsePackageCreator.GetPackage(socket);
                        if (responsePackage != null)
                        {
                            if (responsePackage.ResponseStatus == ResponseStatus.NoError)
                            {
                                return true;
                            }
                            else if (responsePackage.ResponseStatus != ResponseStatus.KeyExists
                                    && responsePackage.ResponseStatus != ResponseStatus.KeyNotFound)
                            {
                                LocalLoggingService.Warning("在 {0} 上执行操作 {1} 得到了不正确的回复 Key : {2} -> {3}",
                                            socket.Endpoint.ToString(),
                                            requestPackage.Opcode,
                                            key,
                                            responsePackage.ResponseStatus);
                            }
                        }
                        else
                        {
                            LocalLoggingService.Error("在 {0} 上执行操作 {1} 没有得到回复 Key : {2}",
                                       socket.Endpoint.ToString(),
                                       requestPackage.Opcode,
                                       key);
                        }
                    }
                }
            }
            return false;
        }

1)首先是获取到Cluster,再获取到池中的Socket

2)然后初始化一个SetRequestPackage,再通过GetBytes获得数据

3)直接把数据写入Socket

4)通过ResponsePackageCreator来获得返回的数据包

 

很明显,ResponsePackageCreator和AbstractRequestPackage的意图差不多,用来把响应的数据包封装成我们需要的数据,其中有一个:

internal static GeneralResponsePackage GetPackage(ClientSocket socket)

获得的是一个通用的响应数据包:

    internal class GeneralResponsePackage
    {
        internal Opcode Opcode { get; set; }

        internal ResponseStatus ResponseStatus { get; set; }

        internal string Key { get; set; }

        internal byte[] ValueBytes { get; set; }

        internal ulong Version { get; set; }

        internal string Value
        {
            get
            {
                if (ValueBytes != null)
                {
                    return Encoding.UTF8.GetString(ValueBytes);
                }
                else
                {
                    return null;
                }
            }
        }
    }

在这里基本的信息都有了,比如操作代码、响应状态、Key、Value、版本号。正因为Memcached的协议比较简单,所有的响应包都是这么一个格式,所以我们并没有实现特殊的响应包。如果要实现的话,只需要在类头部标记OpCode并且继承GeneralResponsePackage,ResponsePackageCreator会自动返回相应的子类:

    [AttributeUsage(AttributeTargets.Class)]
    internal class ResponsePackageAttribute : Attribute
    {
        internal Opcode Opcode { get; private set; }

        internal ResponsePackageAttribute(Opcode opcode)
        {
            this.Opcode = opcode;
        }
    }

在获得了响应之后,通过判断ResponseStatus来知道响应是否正确,并且记录相关日志即可。这么一来,数据一去一回以及协议如何实现的整个过程就介绍完了。下面,我们再介绍一下客户端中几个特色功能的实现。

 

1)获取一组Key功能。由于一个集群会有多个节点,所以要获取一组Key,我们首先需要把Key按照节点分类,然后对于不同的节点,采用并行的方式同时获取,这样速度会很快,代码片段如下:

            var nodeCache = new Dictionary<ClientNode, List<string>>();
            foreach (var key in keys)
            {
                var node = GetCluster().AcquireNode(key);
                if (!nodeCache.ContainsKey(node))
                    nodeCache.Add(node, new List<string> { key });
                else if (!nodeCache[node].Contains(key))
                    nodeCache[node].Add(key);
            }

            var data = new Dictionary<string, string>();
            Parallel.ForEach(nodeCache, node =>

2)List功能。Memcached只提供了Key、Value的存储,有的时候我们的Value是一个列表,那么我们可以有两种方式完成这个功能。第一种就是直接把列表序列化作为一个Value保存,优点是简单,缺点是如果以后需要修改的话需要整个列表取出,修改后再把整个列表保存进去,并且由于Memcached Value大小的限制,这么做也不能保存大列表;第二种方式是一个Value保存列表中的一个项,再使用一个KeyValue来保存其中每一项的ID,这么优点是修改方便,获取的数据可以是列表中的一部分,缺点是实现麻烦,要考虑并发问题、要维护另外一个KeyValue来保存所有的ID。在这里,我们封装了后一种方式的实现。

3)Locker功能。使用Memcached完成锁的功能其实很简单,我们只需要在获取锁的时候判断Add一个空值是否成功,如果不成功则表示占有,等待一段时间尝试获取,一直到超时,在返回锁的时候删除这个项即可。在这里,我们封装了MemcachedLocker来完成这个功能。

作者: lovecindywang
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
3月前
|
存储 监控 数据可视化
常见的分布式定时任务调度框架
分布式定时任务调度框架用于在分布式系统中管理和调度定时任务,确保任务按预定时间和频率执行。其核心概念包括Job(任务)、Trigger(触发器)、Executor(执行器)和Scheduler(调度器)。这类框架应具备任务管理、任务监控、良好的可扩展性和高可用性等功能。常用的Java生态中的分布式任务调度框架有Quartz Scheduler、ElasticJob和XXL-JOB。
1018 66
|
2月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
113 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
2月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
112 7
|
3月前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
135 2
|
4月前
|
消息中间件 运维 数据库
Seata框架和其他分布式事务框架有什么区别
Seata框架和其他分布式事务框架有什么区别
67 1
|
22天前
|
NoSQL Java 中间件
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
467 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
|
25天前
|
NoSQL Java Redis
Springboot使用Redis实现分布式锁
通过这些步骤和示例,您可以系统地了解如何在Spring Boot中使用Redis实现分布式锁,并在实际项目中应用。希望这些内容对您的学习和工作有所帮助。
157 83
|
5月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
21天前
|
缓存 NoSQL 搜索推荐
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
51 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
|
1月前
|
缓存 NoSQL 中间件
Redis,分布式缓存演化之路
本文介绍了基于Redis的分布式缓存演化,探讨了分布式锁和缓存一致性问题及其解决方案。首先分析了本地缓存和分布式缓存的区别与优劣,接着深入讲解了分布式远程缓存带来的并发、缓存失效(穿透、雪崩、击穿)等问题及应对策略。文章还详细描述了如何使用Redis实现分布式锁,确保高并发场景下的数据一致性和系统稳定性。最后,通过双写模式和失效模式讨论了缓存一致性问题,并提出了多种解决方案,如引入Canal中间件等。希望这些内容能为读者在设计分布式缓存系统时提供有价值的参考。感谢您的阅读!
130 6
Redis,分布式缓存演化之路

热门文章

最新文章