极客时间 Architect Training Camp - week5 - Assignment 1

Summary: 极客时间 Architect Training Camp - week5 - Assignment 1

  • Implement the consistent hashing algorithm in a programming language you are familiar with.
  • Write a test case for the algorithm: with 1,000,000 KV entries and 10 server nodes, compute the standard deviation of the number of KV entries placed on each server, to evaluate how unevenly the algorithm distributes the storage load.

   This week's assignment was fairly demanding for me. I searched through several 极客时间 courses and some WeChat official-account articles before I managed a working implementation, which is why I am submitting it this late. The idea of the algorithm itself is clear enough: on the hash ring, when a node is added or removed, only the cache entries between it and its neighbor are moved, in order, to the next node. I found 陶老师's diagram of this quite intuitive.


The code follows:

  • Cache server entity class:
public class CacheServerNode {
    private final String socketAddress;
    public CacheServerNode(String socketAddress) {
        this.socketAddress = socketAddress;
    }
    public String getSocketAddress() {
        return socketAddress;
    }
}


  • Hash algorithm interface:
public interface HashAlgorithm {
    long hash(final String k);
}


  • Hash algorithms:
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;
public enum DefaultHashAlgorithm implements HashAlgorithm {
    /**
     * Native hash (String.hashCode()).
     */
    NATIVE_HASH,
    /**
     * CRC_HASH as used by the perl API. This will be more consistent both
     * across multiple API users as well as java versions, but is mostly likely
     * significantly slower.
     */
    CRC_HASH,
    /**
     * FNV hashes are designed to be fast while maintaining a low collision rate.
     * The FNV speed allows one to quickly hash lots of data while maintaining a
     * reasonable collision rate.
     *
     * @see <a href="http://www.isthe.com/chongo/tech/comp/fnv/">fnv
     *      comparisons</a>
     * @see <a href="http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash">fnv at
     *      wikipedia</a>
     */
    FNV1_64_HASH,
    /**
     * Variation of FNV.
     */
    FNV1A_64_HASH,
    /**
     * 32-bit FNV1.
     */
    FNV1_32_HASH,
    /**
     * 32-bit FNV1a.
     */
    FNV1A_32_HASH,
    /**
     * MD5-based hash algorithm used by ketama.
     */
    KETAMA_HASH,
    MURMUR_HASH;
    private static final long FNV_64_INIT = 0xcbf29ce484222325L;
    private static final long FNV_64_PRIME = 0x100000001b3L;
    private static final long FNV_32_INIT = 2166136261L;
    private static final long FNV_32_PRIME = 16777619;
    private static MessageDigest md5Digest = null;
    static {
        try {
            md5Digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
    }
    /**
     * Compute the hash for the given key.
     *
     * @return a positive integer hash
     */
    @Override
    public long hash(final String k) {
        long rv = 0;
        int len = k.length();
        switch (this) {
            case NATIVE_HASH:
                rv = k.hashCode();
                break;
            case CRC_HASH:
                // return (crc32(shift) >> 16) & 0x7fff;
                CRC32 crc32 = new CRC32();
                crc32.update(KeyUtil.getKeyBytes(k));
                rv = (crc32.getValue() >> 16) & 0x7fff;
                break;
            case FNV1_64_HASH:
                // Thanks to pierre@demartines.com for the pointer
                rv = FNV_64_INIT;
                for (int i = 0; i < len; i++) {
                    rv *= FNV_64_PRIME;
                    rv ^= k.charAt(i);
                }
                break;
            case FNV1A_64_HASH:
                rv = FNV_64_INIT;
                for (int i = 0; i < len; i++) {
                    rv ^= k.charAt(i);
                    rv *= FNV_64_PRIME;
                }
                break;
            case FNV1_32_HASH:
                rv = FNV_32_INIT;
                for (int i = 0; i < len; i++) {
                    rv *= FNV_32_PRIME;
                    rv ^= k.charAt(i);
                }
                break;
            case FNV1A_32_HASH:
                rv = FNV_32_INIT;
                for (int i = 0; i < len; i++) {
                    rv ^= k.charAt(i);
                    rv *= FNV_32_PRIME;
                }
                break;
            case KETAMA_HASH:
                byte[] bKey = computeMd5(k);
                rv = ((long) (bKey[3] & 0xFF) << 24)
                        | ((long) (bKey[2] & 0xFF) << 16)
                        | ((long) (bKey[1] & 0xFF) << 8)
                        | (bKey[0] & 0xFF);
                break;
            case MURMUR_HASH:
                rv = murmurHash(k);
                break;
            default:
                assert false;
        }
        return rv & 0xffffffffL; /* Truncate to 32-bits */
    }
    private long murmurHash(String key) {
        ByteBuffer buf = ByteBuffer.wrap(KeyUtil.getKeyBytes(key));
        int seed = 0x1234ABCD;
        ByteOrder byteOrder = buf.order();
        buf.order(ByteOrder.LITTLE_ENDIAN);
        long m = 0xc6a4a7935bd1e995L;
        int r = 47;
        long h = seed ^ (buf.remaining() * m);
        long k;
        while (buf.remaining() >= 8) {
            k = buf.getLong();
            k *= m;
            k ^= k >>> r;
            k *= m;
            h ^= k;
            h *= m;
        }
        if (buf.remaining() > 0) {
            ByteBuffer finish = ByteBuffer.allocate(8).order(
                    ByteOrder.LITTLE_ENDIAN);
            // for big-endian version, do this first:
            // finish.position(8-buf.remaining());
            finish.put(buf).rewind();
            h ^= finish.getLong();
            h *= m;
        }
        h ^= h >>> r;
        h *= m;
        h ^= h >>> r;
        buf.order(byteOrder);
        return h;
    }
    /**
     * Get the md5 of the given key.
     */
    public static byte[] computeMd5(String k) {
        MessageDigest md5;
        try {
            md5 = (MessageDigest) md5Digest.clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("clone of MD5 not supported", e);
        }
        md5.update(KeyUtil.getKeyBytes(k));
        return md5.digest();
    }
}
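
The hash implementations above call `KeyUtil.getKeyBytes`, which is not shown in the post. In memcached-client code this helper typically just encodes the key as UTF-8 bytes; the sketch below is my assumption of its body, kept minimal so the CRC32, MD5, and murmur branches compile:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical KeyUtil: assumes keys are turned into UTF-8 bytes,
// which is what the CRC32 / MD5 / murmur branches above expect.
public final class KeyUtil {
    private KeyUtil() {
    }

    public static byte[] getKeyBytes(String k) {
        return k.getBytes(StandardCharsets.UTF_8);
    }
}
```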


  • Hash ring:
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class ConsistentHashNodeLocator implements NodeLocator {
    private final static int VIRTUAL_NODE_SIZE = 12;
    private final static String VIRTUAL_NODE_SUFFIX = "-";
    private volatile TreeMap<Long, CacheServerNode> hashRing;
    private final HashAlgorithm hashAlg;
    public ConsistentHashNodeLocator(List<CacheServerNode> nodes, HashAlgorithm hashAlg) {
        this.hashAlg = hashAlg;
        this.hashRing = buildConsistentHashRing(hashAlg, nodes);
    }
    @Override
    public CacheServerNode getPrimary(String k) {
        long hash = hashAlg.hash(k);
        return getNodeForKey(hashRing, hash);
    }
    private CacheServerNode getNodeForKey(TreeMap<Long, CacheServerNode> hashRing, long hash) {
        /* find the first key clockwise (to the right) of the hash */
        Map.Entry<Long, CacheServerNode> locatedNode = hashRing.ceilingEntry(hash);
        /* treat the map as a ring: past the tail, wrap around to the first entry */
        if (locatedNode == null) {
            locatedNode = hashRing.firstEntry();
        }
        return locatedNode.getValue();
    }
    private TreeMap<Long, CacheServerNode> buildConsistentHashRing(HashAlgorithm hashAlgorithm, List<CacheServerNode> nodes) {
        TreeMap<Long, CacheServerNode> virtualNodeRing = new TreeMap<>();
        for (CacheServerNode node : nodes) {
            for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                // if the naming scheme for virtual nodes ever matters, it could be extracted into a class that expands a physical node into its virtual nodes
                virtualNodeRing.put(hashAlgorithm.hash(node.getSocketAddress().toString() + VIRTUAL_NODE_SUFFIX + i), node);
            }
        }
        return virtualNodeRing;
    }
}
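
The key lookup above hinges on `TreeMap.ceilingEntry` plus the wraparound to `firstEntry`. A standalone toy ring (positions and node names chosen arbitrarily for illustration) makes the behavior easy to verify:

```java
import java.util.Map;
import java.util.TreeMap;

public class RingLookupDemo {
    // Locate the first node clockwise from `hash`, wrapping past the largest position.
    static String locate(TreeMap<Long, String> ring, long hash) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash); // first position >= hash
        if (e == null) {
            e = ring.firstEntry(); // past the tail: wrap around to the start of the ring
        }
        return e.getValue();
    }

    static TreeMap<Long, String> demoRing() {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(100L, "A");
        ring.put(200L, "B");
        ring.put(300L, "C");
        return ring;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = demoRing();
        System.out.println(locate(ring, 150)); // prints B (first node clockwise of 150)
        System.out.println(locate(ring, 350)); // prints A (wraps around the ring)
    }
}
```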


  • Node locator interface:
public interface NodeLocator {
    /**
     * Get the primary location for the given key.
     *
     * @param k the object key
     * @return the QueueAttachment containing the primary storage for a key
     */
    CacheServerNode getPrimary(String k);
}



package cn.remcarpediem.consistenthash;
public class StatisticsUtil {
    // variance: s^2 = [(x1 - mean)^2 + ... + (xn - mean)^2] / n
    public static double variance(Long[] x) {
        int m = x.length;
        double sum = 0;
        for (int i = 0; i < m; i++) { // sum the values
            sum += x[i];
        }
        double dAve = sum / m; // mean
        double dVar = 0;
        for (int i = 0; i < m; i++) { // sum of squared deviations
            dVar += (x[i] - dAve) * (x[i] - dAve);
        }
        return dVar / m;
    }
    // standard deviation: σ = sqrt(s^2)
    public static double standardDeviation(Long[] x) {
        return Math.sqrt(variance(x));
    }
}
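
As a quick sanity check of the formula, the classic dataset {2, 4, 4, 4, 5, 5, 7, 9} has mean 5 and a population standard deviation of exactly 2. A standalone sketch (using a primitive array to avoid boxing; not the article's class):

```java
public class StdDevCheck {
    // population standard deviation: sqrt(sum((x - mean)^2) / n)
    static double standardDeviation(long[] x) {
        double mean = 0;
        for (long v : x) mean += v;
        mean /= x.length;
        double var = 0;
        for (long v : x) var += (v - mean) * (v - mean);
        return Math.sqrt(var / x.length);
    }

    public static void main(String[] args) {
        // mean = 40/8 = 5; squared deviations sum to 32; 32/8 = 4; sqrt(4) = 2
        System.out.println(standardDeviation(new long[]{2, 4, 4, 4, 5, 5, 7, 9})); // prints 2.0
    }
}
```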



import com.google.common.util.concurrent.AtomicLongMap;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class NodeLocatorTest {
    private static List<String> ips = new ArrayList<>(50);
    static {
        for (int i = 1; i <= 50; i++) {
            ips.add(String.format("192.168.0.%s", i));
        }
    }
    /**
     * Measure how the key distribution changes as nodes are added or removed.
     */
    @Test
    public void testNodeAddAndRemove() {
        List<CacheServerNode> servers = new ArrayList<>();
        for (String ip : ips) {
            servers.add(new CacheServerNode(ip));
        }
        // build 1,000,000 cache keys
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            keys.add(UUID.randomUUID().toString());
        }
        for (int index = 10; index <= servers.size(); index++) {
            AtomicLongMap<CacheServerNode> atomicLongMap = AtomicLongMap.create();
            List<CacheServerNode> serverChanged = servers.subList(0, index);
            NodeLocator nodeLocator = new ConsistentHashNodeLocator(serverChanged, DefaultHashAlgorithm.KETAMA_HASH);
            for (String key : keys) {
                CacheServerNode node = nodeLocator.getPrimary(key);
                atomicLongMap.getAndIncrement(node);
            }
            System.out.println(String.format("servers: %d, standard deviation: %s", serverChanged.size(), StatisticsUtil.standardDeviation(atomicLongMap.asMap().values().toArray(new Long[]{}))));
        }
    }
}


Experiment results:

servers: 10, standard deviation: 20281.492706406007
servers: 11, standard deviation: 18614.617684421835
servers: 12, standard deviation: 17859.70210806689
servers: 13, standard deviation: 16765.78199539645
servers: 14, standard deviation: 14417.187791532944
servers: 15, standard deviation: 14167.77614008478
servers: 16, standard deviation: 14855.251743911982
servers: 17, standard deviation: 13433.657892278708
servers: 18, standard deviation: 13288.260571823965
servers: 19, standard deviation: 14300.142293424507
servers: 20, standard deviation: 14261.868727484487
servers: 21, standard deviation: 14661.00541110326
servers: 22, standard deviation: 14967.524383044274
servers: 23, standard deviation: 13407.09552638774
servers: 24, standard deviation: 11908.900724761384
servers: 25, standard deviation: 11754.436530944391
servers: 26, standard deviation: 11354.83654060706
servers: 27, standard deviation: 11155.564944047006
servers: 28, standard deviation: 11088.63582636728
servers: 29, standard deviation: 10772.03044176388
servers: 30, standard deviation: 10567.32368619836
servers: 31, standard deviation: 9979.726791578381
servers: 32, standard deviation: 9159.913959748748
servers: 33, standard deviation: 9299.08377853936
servers: 34, standard deviation: 7982.205795167233
servers: 35, standard deviation: 7840.779142354648
servers: 36, standard deviation: 7079.957525418383
servers: 37, standard deviation: 7079.6900431712165
servers: 38, standard deviation: 6900.269774123231
servers: 39, standard deviation: 6467.352466227541
servers: 40, standard deviation: 6516.695938126928
servers: 41, standard deviation: 6407.049942438552
servers: 42, standard deviation: 6037.666114201412
servers: 43, standard deviation: 6315.702547069574
servers: 44, standard deviation: 6390.455142289932
servers: 45, standard deviation: 6372.179916162962
servers: 46, standard deviation: 6270.7003097174975
servers: 47, standard deviation: 6003.942253324767
servers: 48, standard deviation: 6208.585649235169
servers: 49, standard deviation: 6185.0661103718985
servers: 50, standard deviation: 5659.331229041113




As the results show, adding cache servers does improve the evenness of the distribution within a certain range, but once the server count reaches a certain level the standard deviation no longer falls linearly. So in my view, the sensible choice in a real business scenario is to pick a number of cache servers appropriate to the workload.
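
To put the 10-server figure in perspective: the mean load is 1,000,000 / 10 = 100,000 keys per server, so a standard deviation of about 20,281 is a coefficient of variation of roughly 20%. Much of this imbalance likely comes from the low virtual-node count above (VIRTUAL_NODE_SIZE = 12); ketama-style implementations commonly use on the order of 100 to 160 points per physical node. The arithmetic, as a quick check:

```java
public class ImbalanceRatio {
    // coefficient of variation = stddev / mean, expressed as a percentage
    static double coefficientOfVariationPercent(double stdDev, double mean) {
        return 100.0 * stdDev / mean;
    }

    public static void main(String[] args) {
        double mean = 1_000_000.0 / 10; // 100,000 keys per server on average
        double stdDev = 20281.49;       // measured at 10 servers in the table above
        System.out.printf("coefficient of variation: %.1f%%%n",
                coefficientOfVariationPercent(stdDev, mean)); // about 20%
    }
}
```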

