- 用你熟悉的编程语言实现一致性 hash 算法。
- 编写测试用例测试这个算法,测试 100 万 KV 数据,10 个服务器节点的情况下,计算这些 KV 数据在服务器上分布数量的标准差,以评估算法的存储负载不均衡性。
本周作业对于我而言比较吃力,搜索了极客时间的多门课程,还有微信公众号的文章,最终才得以勉强实现,并在这么晚的时间提交作业。不过算法的思路还算清晰,在整个哈希环上增加或减少节点时,只需把受影响区间内的缓存按顺时针次序挪到下一个节点处即可。个人觉得陶老师的这个图示很直观。
接下来是代码:
- 缓存服务器实体类:
/**
 * Value holder for the address of a single cache server.
 */
public class CacheServerNode {

    /** Address string identifying the server; never {@code null}. */
    private final String socketAddress;

    /**
     * Creates a node for the given server address.
     *
     * @param socketAddress address string of the cache server
     * @throws NullPointerException if {@code socketAddress} is {@code null}
     */
    public CacheServerNode(String socketAddress) {
        // Fail fast so a missing address is reported at construction time
        // rather than at first use.
        if (socketAddress == null) {
            throw new NullPointerException("socketAddress must not be null");
        }
        this.socketAddress = socketAddress;
    }

    /**
     * Returns the address this node was created with.
     *
     * @return the server address string, never {@code null}
     */
    public String getSocketAddress() {
        return socketAddress;
    }

    /**
     * Returns a readable representation that includes the server address.
     */
    @Override
    public String toString() {
        return "CacheServerNode{socketAddress='" + socketAddress + "'}";
    }
}
- 哈希算法接口:
/**
 * Strategy for mapping a string key to a numeric hash value.
 */
public interface HashAlgorithm {

    /**
     * Computes the hash of the given key.
     *
     * @param k the key to hash
     * @return the hash value for {@code k}
     */
    long hash(String k);
}
- 哈希算法:
import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.zip.CRC32; public enum DefaultHashAlgorithm implements HashAlgorithm { /** * Native hash (String.hashCode()). */ NATIVE_HASH, /** * CRC_HASH as used by the perl API. This will be more consistent both * across multiple API users as well as java versions, but is mostly likely * significantly slower. */ CRC_HASH, /** * FNV hashes are designed to be fast while maintaining a low collision rate. * The FNV speed allows one to quickly hash lots of data while maintaining a * reasonable collision rate. * * @see <a href="http://www.isthe.com/chongo/tech/comp/fnv/">fnv * comparisons</a> * @see <a href="http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash">fnv at * wikipedia</a> */ FNV1_64_HASH, /** * Variation of FNV. */ FNV1A_64_HASH, /** * 32-bit FNV1. */ FNV1_32_HASH, /** * 32-bit FNV1a. */ FNV1A_32_HASH, /** * MD5-based hash algorithm used by ketama. */ KETAMA_HASH, MURMUR_HASH; private static final long FNV_64_INIT = 0xcbf29ce484222325L; private static final long FNV_64_PRIME = 0x100000001b3L; private static final long FNV_32_INIT = 2166136261L; private static final long FNV_32_PRIME = 16777619; private static MessageDigest md5Digest = null; static { try { md5Digest = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new RuntimeException("MD5 not supported", e); } } /** * Compute the hash for the given key. 
* * @return a positive integer hash */ @Override public long hash(final String k) { long rv = 0; int len = k.length(); switch (this) { case NATIVE_HASH: rv = k.hashCode(); break; case CRC_HASH: // return (crc32(shift) >> 16) & 0x7fff; CRC32 crc32 = new CRC32(); crc32.update(KeyUtil.getKeyBytes(k)); rv = (crc32.getValue() >> 16) & 0x7fff; break; case FNV1_64_HASH: // Thanks to pierre@demartines.com for the pointer rv = FNV_64_INIT; for (int i = 0; i < len; i++) { rv *= FNV_64_PRIME; rv ^= k.charAt(i); } break; case FNV1A_64_HASH: rv = FNV_64_INIT; for (int i = 0; i < len; i++) { rv ^= k.charAt(i); rv *= FNV_64_PRIME; } break; case FNV1_32_HASH: rv = FNV_32_INIT; for (int i = 0; i < len; i++) { rv *= FNV_32_PRIME; rv ^= k.charAt(i); } break; case FNV1A_32_HASH: rv = FNV_32_INIT; for (int i = 0; i < len; i++) { rv ^= k.charAt(i); rv *= FNV_32_PRIME; } break; case KETAMA_HASH: byte[] bKey = computeMd5(k); rv = ((long) (bKey[3] & 0xFF) << 24) | ((long) (bKey[2] & 0xFF) << 16) | ((long) (bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF); break; case MURMUR_HASH: rv = murmurHash(k); break; default: assert false; } return rv & 0xffffffffL; /* Truncate to 32-bits */ } private long murmurHash(String key) { ByteBuffer buf = ByteBuffer.wrap(KeyUtil.getKeyBytes(key)); int seed = 0x1234ABCD; ByteOrder byteOrder = buf.order(); buf.order(ByteOrder.LITTLE_ENDIAN); long m = 0xc6a4a7935bd1e995L; int r = 47; long h = seed ^ (buf.remaining() * m); long k; while (buf.remaining() >= 8) { k = buf.getLong(); k *= m; k ^= k >>> r; k *= m; h ^= k; h *= m; } if (buf.remaining() > 0) { ByteBuffer finish = ByteBuffer.allocate(8).order( ByteOrder.LITTLE_ENDIAN); // for big-endian version, do this first: // finish.position(8-buf.remaining()); finish.put(buf).rewind(); h ^= finish.getLong(); h *= m; } h ^= h >>> r; h *= m; h ^= h >>> r; buf.order(byteOrder); return h; } /** * Get the md5 of the given key. 
*/ public static byte[] computeMd5(String k) { MessageDigest md5; try { md5 = (MessageDigest) md5Digest.clone(); } catch (CloneNotSupportedException e) { throw new RuntimeException("clone of MD5 not supported", e); } md5.update(KeyUtil.getKeyBytes(k)); return md5.digest(); } }
- 哈希环
import java.util.List; import java.util.Map; import java.util.TreeMap; public class ConsistentHashNodeLocator implements NodeLocator { private final static int VIRTUAL_NODE_SIZE = 12; private final static String VIRTUAL_NODE_SUFFIX = "-"; private volatile TreeMap<Long, CacheServerNode> hashRing; private final HashAlgorithm hashAlg; public ConsistentHashNodeLocator(List<CacheServerNode> nodes, HashAlgorithm hashAlg) { this.hashAlg = hashAlg; this.hashRing = buildConsistentHashRing(hashAlg, nodes); } @Override public CacheServerNode getPrimary(String k) { long hash = hashAlg.hash(k); return getNodeForKey(hashRing, hash); } private CacheServerNode getNodeForKey(TreeMap<Long, CacheServerNode> hashRing, long hash) { /* 向右找到第一个key */ Map.Entry<Long, CacheServerNode> locatedNode = hashRing.ceilingEntry(hash); /* 想象成为一个环,超出尾部取出第一个 */ if (locatedNode == null) { locatedNode = hashRing.firstEntry(); } return locatedNode.getValue(); } private TreeMap<Long, CacheServerNode> buildConsistentHashRing(HashAlgorithm hashAlgorithm, List<CacheServerNode> nodes) { TreeMap<Long, CacheServerNode> virtualNodeRing = new TreeMap<>(); for (CacheServerNode node : nodes) { for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) { // 新增虚拟节点的方式如果有影响,也可以抽象出一个由物理节点扩展虚拟节点的类 virtualNodeRing.put(hashAlgorithm.hash(node.getSocketAddress().toString() + VIRTUAL_NODE_SUFFIX + i), node); } } return virtualNodeRing; } }
- 节点定位接口
public interface NodeLocator {
    /**
     * Get the primary location for the given key.
     *
     * @param k the object key
     * @return the cache server node chosen as the primary storage for the key
     */
    CacheServerNode getPrimary(String k);
}
// NOTE: the package declaration (cn.remcarpediem.consistenthash) is omitted here to match
// the other listings, which are all shown without one.

/**
 * Population statistics over arrays of boxed longs.
 */
public class StatisticsUtil {

    /**
     * Population variance: s^2 = [(x1 - mean)^2 + ... + (xn - mean)^2] / n.
     *
     * @param x the samples; elements must not be {@code null}
     * @return the population variance, or {@code NaN} when {@code x} is empty
     */
    public static double variance(Long[] x) {
        int count = x.length;

        double total = 0;
        for (Long sample : x) {
            total += sample;
        }
        double mean = total / count;

        double squaredDeviations = 0;
        for (Long sample : x) {
            squaredDeviations += (sample - mean) * (sample - mean);
        }
        return squaredDeviations / count;
    }

    /**
     * Population standard deviation: sigma = sqrt(s^2).
     *
     * @param x the samples; elements must not be {@code null}
     * @return the square root of {@link #variance(Long[])}, or {@code NaN} when {@code x} is empty
     */
    public static double standardDeviation(Long[] x) {
        // Delegate instead of repeating the variance computation.
        return Math.sqrt(variance(x));
    }
}
mport com.google.common.util.concurrent.AtomicLongMap; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.UUID; public class NodeLocatorTest { private static List<String> ips = new ArrayList<>(50); static { for (int i = 1; i <= 50; i++) { ips.add(String.format("192.168.0.%s", i)); } } /** * 测试节点新增删除后的变化程度 */ @Test public void testNodeAddAndRemove() { List<CacheServerNode> servers = new ArrayList<>(); for (String ip : ips) { servers.add(new CacheServerNode(ip)); } // 构造 100_0000 缓存 List<String> keys = new ArrayList<>(); for (int i = 0; i < 100_0000; i++) { keys.add(UUID.randomUUID().toString()); } for (int index = 10; index <= servers.size(); index++) { AtomicLongMap<CacheServerNode> atomicLongMap = AtomicLongMap.create(); List<CacheServerNode> serverChanged = servers.subList(0, index); NodeLocator nodeLocator = new ConsistentHashNodeLocator(serverChanged, DefaultHashAlgorithm.KETAMA_HASH); for (String key : keys) { CacheServerNode node = nodeLocator.getPrimary(key); atomicLongMap.getAndIncrement(node); } System.out.println(String.format("服务器数目:%d, 标准差:%s", serverChanged.size(), StatisticsUtil.standardDeviation(atomicLongMap.asMap().values().toArray(new Long[]{})))); } } }
实验结果:
服务器数目:10, 标准差:20281.492706406007 服务器数目:11, 标准差:18614.617684421835 服务器数目:12, 标准差:17859.70210806689 服务器数目:13, 标准差:16765.78199539645 服务器数目:14, 标准差:14417.187791532944 服务器数目:15, 标准差:14167.77614008478 服务器数目:16, 标准差:14855.251743911982 服务器数目:17, 标准差:13433.657892278708 服务器数目:18, 标准差:13288.260571823965 服务器数目:19, 标准差:14300.142293424507 服务器数目:20, 标准差:14261.868727484487 服务器数目:21, 标准差:14661.00541110326 服务器数目:22, 标准差:14967.524383044274 服务器数目:23, 标准差:13407.09552638774 服务器数目:24, 标准差:11908.900724761384 服务器数目:25, 标准差:11754.436530944391 服务器数目:26, 标准差:11354.83654060706 服务器数目:27, 标准差:11155.564944047006 服务器数目:28, 标准差:11088.63582636728 服务器数目:29, 标准差:10772.03044176388 服务器数目:30, 标准差:10567.32368619836 服务器数目:31, 标准差:9979.726791578381 服务器数目:32, 标准差:9159.913959748748 服务器数目:33, 标准差:9299.08377853936 服务器数目:34, 标准差:7982.205795167233 服务器数目:35, 标准差:7840.779142354648 服务器数目:36, 标准差:7079.957525418383 服务器数目:37, 标准差:7079.6900431712165 服务器数目:38, 标准差:6900.269774123231 服务器数目:39, 标准差:6467.352466227541 服务器数目:40, 标准差:6516.695938126928 服务器数目:41, 标准差:6407.049942438552 服务器数目:42, 标准差:6037.666114201412 服务器数目:43, 标准差:6315.702547069574 服务器数目:44, 标准差:6390.455142289932 服务器数目:45, 标准差:6372.179916162962 服务器数目:46, 标准差:6270.7003097174975 服务器数目:47, 标准差:6003.942253324767 服务器数目:48, 标准差:6208.585649235169 服务器数目:49, 标准差:6185.0661103718985 服务器数目:50, 标准差:5659.331229041113
可见,在一定范围内,增加缓存服务器的数量有利于改善负载分布的均衡性(标准差减小),但缓存服务器的数量到达一定程度之后,标准差的大小并没有随之线性下降。需要注意的是,服务器越多,每台服务器平均分到的 KV 数量也越少,因此标准差绝对值的下降并不完全等同于均衡性的提升。所以在我个人看来,在实际业务场景中,按业务情况选用恰当数目的缓存服务器,才是明智之选。