RxCache
RxCache 是一款支持 Java 和 Android 的 Local Cache 。目前,支持堆内存、堆外内存(off-heap memory)、磁盘缓存。
github地址:https://github.com/fengzhizi715/RxCache
堆外内存(off-heap memory)
对象可以存储在 堆内存、堆外内存、磁盘缓存甚至是分布式缓存。
在 Java 中,与堆外内存相对的是堆内存。堆内存遵守 JVM 的内存管理机制,而堆外内存不受到此限制,它由操作系统进行管理。
JVM的内存管理以及堆外内存.jpg
堆外内存和堆内存有明显的区别,或者说有相反的应用场景。
堆外内存更适合:
- 存储生命周期长的对象
- 可以在进程间可以共享,减少 JVM 间的对象复制,使得 JVM 的分割部署更容易实现。
- 本地缓存,减少磁盘缓存或者分布式缓存的响应时间。
RxCache 中使用的堆外内存
首先,创建一个 DirectBufferConverter ,用于将对象和 ByteBuffer 相互转换,以及对象和byte数组相互转换。其中,ByteBuffer.allocteDirect(capability) 用于分配堆外内存。Cleaner 是自己定义的一个类,用于释放 DirectByteBuffer。具体代码可以查看:https://github.com/fengzhizi715/RxCache/blob/master/offheap/src/main/java/com/safframework/rxcache/offheap/converter/Cleaner.java
public abstract class DirectBufferConverter<V> { public void dispose(ByteBuffer direct) { Cleaner.clean(direct); } public ByteBuffer to(V from) { if(from == null) return null; byte[] bytes = toBytes(from); ByteBuffer.wrap(bytes); ByteBuffer bf = ByteBuffer.allocateDirect(bytes.length); bf.put(bytes); bf.flip(); return bf; } abstract public byte[] toBytes(V value); abstract public V toObject(byte[] value); public V from(ByteBuffer to) { if(to == null) return null; byte[] bs = new byte[to.capacity()]; to.get(bs); to.flip(); return toObject(bs); } }
接下来,定义一个 ConcurrentDirectHashMap<K, V> 实现Map接口。它是一个范性,支持将 V 转换成 ByteBuffer 类型,存储到 ConcurrentDirectHashMap 的 map 中。
public abstract class ConcurrentDirectHashMap<K, V> implements Map<K, V> { final private Map<K, ByteBuffer> map; private final DirectBufferConverter<V> converter = new DirectBufferConverter<V>() { @Override public byte[] toBytes(V value) { return convertObjectToBytes(value); } @Override public V toObject(byte[] value) { return convertBytesToObject(value); } }; ConcurrentDirectHashMap() { map = new ConcurrentHashMap<>(); } ConcurrentDirectHashMap(Map<K, V> m) { map = new ConcurrentHashMap<>(); for (Entry<K, V> entry : m.entrySet()) { K key = entry.getKey(); ByteBuffer val = converter.to(entry.getValue()); map.put(key, val); } } protected abstract byte[] convertObjectToBytes(V value); protected abstract V convertBytesToObject(byte[] value); @Override public int size() { return map.size(); } @Override public boolean isEmpty() { return map.isEmpty(); } @Override public boolean containsKey(Object key) { return map.containsKey(key); } @Override public V get(Object key) { final ByteBuffer byteBuffer = map.get(key); return converter.from(byteBuffer); } @Override public V put(K key, V value) { final ByteBuffer byteBuffer = map.put(key, converter.to(value)); converter.dispose(byteBuffer); return converter.from(byteBuffer); } @Override public V remove(Object key) { final ByteBuffer byteBuffer = map.remove(key); final V value = converter.from(byteBuffer); converter.dispose(byteBuffer); return value; } @Override public void putAll(Map<? extends K, ? extends V> m) { for (Entry<? extends K, ? extends V> entry : m.entrySet()) { ByteBuffer byteBuffer = converter.to(entry.getValue()); map.put(entry.getKey(), byteBuffer); } } @Override public void clear() { final Set<K> keys = map.keySet(); for (K key : keys) { map.remove(key); } } @Override public Set<K> keySet() { return map.keySet(); } @Override public Collection<V> values() { Collection<V> values = new ArrayList<>(); for (ByteBuffer byteBuffer : map.values()) { V value = converter.from(byteBuffer); values.add(value); } return values; } @Override public Set<Entry<K, V>> entrySet() { Set<Entry<K, V>> entries = new HashSet<>(); for (Entry<K, ByteBuffer> entry : map.entrySet()) { K key = entry.getKey(); V value = converter.from(entry.getValue()); entries.add(new Entry<K, V>() { @Override public K getKey() { return key; } @Override public V getValue() { return value; } @Override public V setValue(V v) { return null; } }); } return entries; } @Override public boolean containsValue(Object value) { for (ByteBuffer v : map.values()) { if (v.equals(value)) { return true; } } return false; } }
创建 ConcurrentStringObjectDirectHashMap,它的 K 是 String 类型,V 是任意的 Object 对象。其中,序列化和反序列化采用《Java 字节的常用封装》提到的 bytekit。
public class ConcurrentStringObjectDirectHashMap extends ConcurrentDirectHashMap<String,Object> { @Override protected byte[] convertObjectToBytes(Object value) { return Bytes.serialize(value); } @Override protected Object convertBytesToObject(byte[] value) { return Bytes.deserialize(value); } }
基于 FIFO 以及堆外内存来实现 Memory 级别的缓存。
public class DirectBufferMemoryImpl extends AbstractMemoryImpl { private ConcurrentStringObjectDirectHashMap cache; private List<String> keys; public DirectBufferMemoryImpl(long maxSize) { super(maxSize); cache = new ConcurrentStringObjectDirectHashMap(); this.keys = new LinkedList<>(); } @Override public <T> Record<T> getIfPresent(String key) { T result = null; if(expireTimeMap.get(key)!=null) { if (expireTimeMap.get(key)<0) { // 缓存的数据从不过期 result = (T) cache.get(key); } else { if (timestampMap.get(key) + expireTimeMap.get(key) > System.currentTimeMillis()) { // 缓存的数据还没有过期 result = (T) cache.get(key); } else { // 缓存的数据已经过期 evict(key); } } } return result != null ? new Record<>(Source.MEMORY,key, result, timestampMap.get(key),expireTimeMap.get(key)) : null; } @Override public <T> void put(String key, T value) { put(key,value, Constant.NEVER_EXPIRE); } @Override public <T> void put(String key, T value, long expireTime) { if (keySet().size()<maxSize) { // 缓存还有空间 saveValue(key,value,expireTime); } else { // 缓存空间不足,需要删除一个 if (containsKey(key)) { keys.remove(key); saveValue(key,value,expireTime); } else { String oldKey = keys.get(0); // 最早缓存的key evict(oldKey); // 删除最早缓存的数据 FIFO算法 saveValue(key,value,expireTime); } } } private <T> void saveValue(String key, T value, long expireTime) { cache.put(key,value); timestampMap.put(key,System.currentTimeMillis()); expireTimeMap.put(key,expireTime); keys.add(key); } @Override public Set<String> keySet() { return cache.keySet(); } @Override public boolean containsKey(String key) { return cache.containsKey(key); } @Override public void evict(String key) { cache.remove(key); timestampMap.remove(key); expireTimeMap.remove(key); keys.remove(key); } @Override public void evictAll() { cache.clear(); timestampMap.clear(); expireTimeMap.clear(); keys.clear(); } }
到了这里,已经完成了堆外内存在 RxCache 中的封装。其实,已经有很多缓存框架都支持堆外内存,例如 Ehcache、MapDB 等。RxCache 目前已经有了 MapDB 的模块。