Redisson—分布式集合

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redisson—分布式集合

7.1. 映射(Map)

基于Redis的Redisson的分布式映射结构的RMap Java对象实现了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

除了同步接口外,还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。如果你想用Redis Map来保存你的POJO的话,可以考虑使用分布式实时对象(Live Object)服务。

在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有本地缓存功能的映射。

RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
map.fastPut("321", new SomeObject());
map.fastRemove("321");
RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

映射的字段锁的用法:

RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {
   MyValue v = map.get(k);
   // 其他业务逻辑
} finally {
   keyLock.unlock();
}
RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {
   MyValue v = map.get(k);
   // 其他业务逻辑
} finally {
   keyLock.readLock().unlock();
}

7.1.1. 映射(Map)的元素淘汰(Eviction),本地缓存(LocalCache)和数据分片(Sharding)

Redisson提供了一系列的映射类型的数据结构,这些结构按特性主要分为三大类:

  • 元素淘汰(Eviction) 类 -- 带有元素淘汰(Eviction)机制的映射类允许针对一个映射中每个元素单独设定 有效时间最长闲置时间
  • 本地缓存(LocalCache) 类 -- 本地缓存(Local Cache)也叫就近缓存(Near Cache)。这类映射的使用主要用于在特定的场景下,映射缓存(MapCache)上的高度频繁的读取操作,使网络通信都被视为瓶颈的情况。Redisson与Redis通信的同时,还将部分数据保存在本地内存里。这样的设计的好处是它能将读取速度提高最多 45倍 。 所有同名的本地缓存共用一个订阅发布话题,所有更新和过期消息都将通过该话题共享。
  • 数据分片(Sharding) 类 -- 数据分片(Sharding)类仅适用于Redis集群环境下,因此带有数据分片(Sharding)功能的映射也叫集群分布式映射。它利用分库的原理,将单一一个映射结构切分为若干个小的映射,并均匀的分布在集群中的各个槽里。这样的设计能使一个单一映射结构突破Redis自身的容量限制,让其容量随集群的扩大而增长。在扩容的同时,还能够使读写性能和元素淘汰处理能力随之成线性增长。

以下列表是Redisson提供的所有映射的名称及其特性:

接口名称
中文名称

RedissonClient
对应的构造方法

本地缓存功能
Local Cache

数据分片功能
Sharding

元素淘汰功能
Eviction

RMap
映射

getMap()

No

No

No

RMapCache
映射缓存

getMapCache()

No

No

Yes

RLocalCachedMap
本地缓存映射

getLocalCachedMap()

Yes

No

No

RLocalCachedMap
Cache
本地缓存映射缓存
仅限于Redisson PRO版本

getLocalCachedMapCache()

Yes

No

Yes

RClusteredMap
集群分布式映射存
仅限于Redisson PRO版本

getClusteredMap()

No

Yes

No

RClusteredMapCache
集群分布式映射缓存存
仅限于Redisson PRO版本

getClusteredMapCache()

No

Yes

Yes

RClusteredLocal
CachedMap
集群分布式本地缓存映射存
仅限于Redisson PRO版本

getClusteredLocal
CachedMap()

Yes

Yes

No

RClusteredLocal
CachedMapCache
集群分布式本地缓存映射缓存存
仅限于Redisson PRO版本

getClusteredLocal
CachedMapCache()

Yes

Yes

Yes

除此以外,Redisson还提供了Spring Cache和JCache的实现。

元素淘汰功能(Eviction)

Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。

目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// 有效时间 ttl = 10分钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// 有效时间 = 3 秒钟
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);

本地缓存功能(Local Cache)

在特定的场景下,映射(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,使用Redisson提供的带有本地缓存功能的分布式本地缓存映射RLocalCachedMapJava对象会是一个很好的选择。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。本地缓存功能充分的利用了JVM的自身内存空间,对部分常用的元素实行就地缓存,这样的设计让读取操作的性能较分布式映射相比提高最多 45倍 。以下配置参数可以用来创建这个实例:

LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
      // 用于淘汰清除本地缓存内的元素
      // 共有以下几种选择:
      // LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。
      // LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。
      // SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。
      // WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。
      // NONE - 永不淘汰清除缓存元素。
     .evictionPolicy(EvictionPolicy.NONE)
     // 如果缓存容量值为0表示不限制本地缓存容量大小
     .cacheSize(1000)
      // 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。
      // 断线重连的策略有以下几种:
      // CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存
      // LOAD - 在服务端保存一份10分钟的作废日志
      //        如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素
      //        如果断线时间超过了这个时间,则将清空本地缓存中所有的内容
      // NONE - 默认值。断线重连时不做处理。
     .reconnectionStrategy(ReconnectionStrategy.NONE)
      // 以下选项适用于不同本地缓存之间相互保持同步的情况
      // 缓存同步策略有以下几种:
      // INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素
      // UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素
      // NONE - 不做任何同步处理
     .syncStrategy(SyncStrategy.INVALIDATE)
      // 每个Map本地缓存里元素的有效时间,默认毫秒为单位
     .timeToLive(10000)
      // 或者
     .timeToLive(10, TimeUnit.SECONDS)
      // 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位
     .maxIdle(10000)
      // 或者
     .maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);
String prevObject = map.put("123", 1);
String currentObject = map.putIfAbsent("323", 2);
String obj = map.remove("123");
// 在不需要旧值的情况下可以使用fast为前缀的类似方法
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");
RFuture<String> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。

RLocalCachedMap<String, Integer> map = ...
map.destroy();

如何通过加载数据的方式来降低过期淘汰事件发布信息对网络的影响

代码范例:

public void loadData(String cacheName, Map<String, String> data) {
    RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName, 
            LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE));
    RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName, 
            LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE));
    loadMap.putAll(data);
    clearMap.clearLocalCache();
}

数据分片功能(Sharding)

Map数据分片是Redis集群模式下的一个功能。Redisson提供的分布式集群映射RClusteredMap Java对象也是基于RMap实现的。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。在这里可以获取更多的内部信息。

RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
map.fastPut("321", new SomeObject());
map.fastRemove("321");

7.1.2. 映射持久化方式(缓存策略)

Redisson供了将映射中的数据持久化到外部储存服务的功能。主要场景有一下几种:

  1. 将Redisson的分布式映射类型作为业务和外部储存媒介之间的缓存。
  2. 或是用来增加Redisson映射类型中数据的持久性,或是用来增加已被驱逐的数据的寿命。
  3. 或是用来缓存数据库,Web服务或其他数据源的数据。

Read-through策略

通俗的讲,如果一个被请求的数据不存在于Redisson的映射中的时候,Redisson将通过预先配置好的MapLoader对象加载数据。

Write-through(数据同步写入)策略

在遇到映射中某条数据被更改时,Redisson会首先通过预先配置好的MapWriter对象写入到外部储存系统,然后再更新Redis内的数据。

Write-behind(数据异步写入)策略

对映射的数据的更改会首先写入到Redis,然后再使用异步的方式,通过MapWriter对象写入到外部储存系统。在并发环境下可以通过writeBehindThreads参数来控制写入线程的数量,已达到对外部储存系统写入并发量的控制。

以上策略适用于所有实现了RMap、RMapCache、RLocalCachedMap和RLocalCachedMapCache接口的对象。

配置范例:

MapOptions<K, V> options = MapOptions.<K, V>defaults()
                              .writer(myWriter)
                              .loader(myLoader);
RMap<K, V> map = redisson.getMap("test", options);
// 或
RMapCache<K, V> map = redisson.getMapCache("test", options);
// 或
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// 或
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);

7.1.3. 映射监听器(Map Listener)

Redisson为所有实现了RMapCache或RLocalCachedMapCache接口的对象提供了监听以下事件的监听器:

事件 | 监听器 元素 添加 事件 | org.redisson.api.map.event.EntryCreatedListener
元素
过期 事件 | org.redisson.api.map.event.EntryExpiredListener
元素
删除 事件 | org.redisson.api.map.event.EntryRemovedListener
元素
更新 事件 | org.redisson.api.map.event.EntryUpdatedListener

使用范例:

RMapCache<String, Integer> map = redisson.getMapCache("myMap");
// 或
RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);
int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {
     @Override
     public void onUpdated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 新值
          event.getOldValue() // 旧值
          // ...
     }
});
int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {
     @Override
     public void onCreated(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});
int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {
     @Override
     public void onExpired(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});
int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {
     @Override
     public void onRemoved(EntryEvent<Integer, Integer> event) {
          event.getKey(); // 字段名
          event.getValue() // 值
          // ...
     }
});
map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);

7.1.4. LRU有界映射

Redisson提供了基于Redis的以LRU为驱逐策略的分布式LRU有界映射对象。顾名思义,分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。

RMapCache<String, String> map = redisson.getMapCache("map");
// 尝试将该映射的最大容量限制设定为10
map.trySetMaxSize(10);
// 将该映射的最大容量限制设定或更改为10
map.setMaxSize(10);
map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);

7.2. 多值映射(Multimap)

基于Redis的Redisson的分布式RMultimap Java对象允许Map中的一个字段值包含多个元素。 字段总数受Redis限制,每个Multimap最多允许有4 294 967 295个不同字段。

7.2.1. 基于集(Set)的多值映射(Multimap)

基于Set的Multimap不允许一个字段值包含有重复的元素。

RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
Set<SimpleValue> allValues = map.get(new SimpleKey("0"));
List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.2. 基于列表(List)的多值映射(Multimap)

基于List的Multimap在保持插入顺序的同时允许一个字段下包含重复的元素。

RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> allValues = map.get(new SimpleKey("0"));
Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);
List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.3. 多值映射(Multimap)淘汰机制(Eviction)

Multimap对象的淘汰机制是通过不同的接口来实现的。它们是RSetMultimapCache接口和RListMultimapCache接口,分别对应的是Set和List的Multimaps。

所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

RSetMultimapCache的使用范例:

RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");
multimap.put("2", "e");
multimap.put("2", "f");
multimap.expireKey("2", 10, TimeUnit.MINUTES);

7.3. 集(Set)

基于Redis的Redisson的分布式Set结构的RSet Java对象实现了java.util.Set接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

Redisson PRO版本中的Set对象还可以在Redis集群环境下支持单集合数据分片。

7.3.1. 集(Set)淘汰机制(Eviction)

基于Redis的Redisson的分布式RSetCache Java对象在基于RSet的前提下实现了针对单个元素的淘汰机制。由于RSetCache是基于RSet实现的,使它还集成了java.util.Set接口。

目前的Redis自身并不支持Set当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

RSetCache<SomeObject> set = redisson.getSetCache("anySet");
// ttl = 10 seconds
set.add(new SomeObject(), 10, TimeUnit.SECONDS);

7.3.2. 集(Set)数据分片(Sharding)

Set数据分片是Redis集群模式下的一个功能。Redisson提供的分布式RClusteredSet Java对象也是基于RSet实现的。在这里可以获取更多的信息。

RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

除了RClusteredSet以外,Redisson还提供了另一种集群模式下的分布式集(Set),它不仅提供了透明的数据分片功能,还为每个元素提供了淘汰机制。RClusteredSetCache 类分别同时提供了RClusteredSet 和RSetCache 这两个接口的实现。当然这些都是基于java.util.Set的接口实现上的。

该功能仅限于Redisson PRO版本。

7.4. 有序集(SortedSet)

基于Redis的Redisson的分布式RSortedSet Java对象实现了java.util.SortedSet接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。

RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // 配置元素比较器
set.add(3);
set.add(1);
set.add(2);
set.removeAsync(0);
set.addAsync(5);

7.5. 计分排序集(ScoredSortedSet)

基于Redis的Redisson的分布式RScoredSortedSet Java对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。

RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");
set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));
set.pollFirst();
set.pollLast();
int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置
Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分

7.6. 字典排序集(LexSortedSet)

基于Redis的Redisson的分布式RLexSortedSet Java对象在实现了java.util.Set<String>接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。

RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");
set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);

7.7. 列表(List)

基于Redis的Redisson分布式列表(List)结构的RList Java对象在实现了java.util.List接口的同时,确保了元素插入时的顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());

7.8. 队列(Queue)

基于Redis的Redisson分布式无界队列(Queue)结构的RQueue Java对象实现了java.util.Queue接口。尽管RQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

7.9. 双端队列(Deque)

基于Redis的Redisson分布式无界双端队列(Deque)结构的RDeque Java对象实现了java.util.Deque接口。尽管RDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

7.10. 阻塞队列(Blocking Queue)

基于Redis的Redisson分布式无界阻塞队列(Blocking Queue)结构的RBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。尽管RBlockingQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.11. 有界阻塞队列(Bounded Blocking Queue)

基于Redis的Redisson分布式有界阻塞队列(Bounded Blocking Queue)结构的RBoundedBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。队列的初始容量(边界)必须在使用前设定好。

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// 如果初始容量(边界)设定成功则返回`真(true)`,
// 如果初始容量(边界)已近存在则返回`假(false)`。
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// 此时容量已满,下面代码将会被阻塞,直到有空闲为止。
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.12. 阻塞双端队列(Blocking Deque)

基于Redis的Redisson分布式无界阻塞双端队列(Blocking Deque)结构的RBlockingDeque Java对象实现了java.util.concurrent.BlockingDeque接口。尽管RBlockingDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);

poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.13. 阻塞公平队列(Blocking Fair Queue)

基于Redis的Redisson分布式无界阻塞公平队列(Blocking Fair Queue)结构的RBlockingFairQueue Java对象在实现Redisson分布式无界阻塞队列(Blocking Queue)结构RBlockingQueue接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。

以分布式无界阻塞队列为基础,采用公平获取消息的机制,不仅保证了poll、pollFromAny、pollLastAndOfferFirstTo和take方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。

RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

该功能仅限于Redisson PRO版本。

7.14. 阻塞公平双端队列(Blocking Fair Deque)

基于Redis的Redisson分布式无界阻塞公平双端队列(Blocking Fair Deque)结构的RBlockingFairDeque Java对象在实现Redisson分布式无界阻塞双端队列(Blocking Deque)结构RBlockingDeque接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。

以分布式无界阻塞双端队列为基础,采用公平获取消息的机制,不仅保证了poll、take、pollFirst、takeFirst、pollLast和takeLast方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。

RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());
SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();
SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();

该功能仅限于Redisson PRO版本。

7.15. 延迟队列(Delayed Queue)

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();

7.16. 优先队列(Priority Queue)

基于Redis的Redisson分布式优先队列(Priority Queue)Java对象实现了java.util.Queue的接口。可以通过比较器(Comparator)接口来对元素排序。

RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();

7.17. 优先双端队列(Priority Deque)

基于Redis的Redisson分布式优先双端队列(Priority Deque)Java对象实现了java.util.Deque的接口。可以通过比较器(Comparator)接口来对元素排序。

RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.addLast(3);
queue.addFirst(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.pollFirst();
queue.pollLast();

7.18. 优先阻塞队列(Priority Blocking Queue)

基于Redis的分布式无界优先阻塞队列(Priority Blocking Queue)Java对象的结构与java.util.concurrent.PriorityBlockingQueue类似。可以通过比较器(Comparator)接口来对元素排序。PriorityBlockingQueue的最大容量是4 294 967 295个元素。

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();

当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。

7.19. 优先阻塞双端队列(Priority Blocking Deque)

基于Redis的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java对象实现了java.util.Deque的接口。addLast、 addFirst、push方法不能再这个对里使用。PriorityBlockingDeque的最大容量是4 294 967 295个元素。

RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.pollFirst();
queue.pollLast();
queue.takeFirst();
queue.takeLast();

当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。



官网:Doker 多克; 官方旗舰店:首页-Doker 多克 多克创新科技企业店-淘宝网 技术人的数码品牌!!!全品优惠,期待您的支持!!!

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6月前
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
16天前
|
供应链 NoSQL Java
关于Redisson分布式锁的用法
Redisson分布式锁是实现分布式系统中资源同步的有效工具。通过合理配置和使用Redisson的各种锁机制,可以确保系统的高可用性和数据一致性。本文详细介绍了Redisson分布式锁的配置、基本用法和高级用法,并提供了实际应用示例,希望对您在实际项目中使用Redisson分布式锁有所帮助。c
61 10
|
3月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
76 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
7月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
256 0
|
3月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
4月前
|
缓存 NoSQL Java
谷粒商城笔记+踩坑(12)——缓存与分布式锁,Redisson+缓存数据一致性
缓存与分布式锁、Redisson分布式锁、缓存数据一致性【必须满足最终一致性】
174 14
谷粒商城笔记+踩坑(12)——缓存与分布式锁,Redisson+缓存数据一致性
|
6月前
|
负载均衡 NoSQL Java
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
30天前
|
存储 NoSQL Java
使用lock4j-redis-template-spring-boot-starter实现redis分布式锁
通过使用 `lock4j-redis-template-spring-boot-starter`,我们可以轻松实现 Redis 分布式锁,从而解决分布式系统中多个实例并发访问共享资源的问题。合理配置和使用分布式锁,可以有效提高系统的稳定性和数据的一致性。希望本文对你在实际项目中使用 Redis 分布式锁有所帮助。
98 5
|
2月前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
70 8