万物皆可排序的队列
我们很容易想到,既然可以按照时间排序,那么按照别的数据类型排序也是可以的。这个数据结构可以应用的场景很多,比如按照权重排序的队列、按照优先级排序的队列、按照年龄排序的队列、按照银行存款排序的队列,等等。这就是一个万物皆可排序的队列。
我这里把主要代码贴出来(完整代码和示例请看文末):
public class SortedQueue<TSortKey, TKey, TValue> where TSortKey : notnull, IComparable where TKey : notnull where TValue : notnull { Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>(); SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>(); readonly object _lock = new object(); /// <summary> /// Create a new instance of SortedQueue /// </summary> public SortedQueue(int maxNumberOfMessageConsumedOnce) { } /// <summary> /// Publish a message to queue /// </summary> /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param> /// <param name="key">The message key.</param> /// <param name="value">The message value.</param> public void Publish(TSortKey sortKey, TKey key, TValue value) { lock (_lock) { if (_queue.TryGetValue(sortKey, out List<TKey>? keys)) { keys.Add(key); } else { _queue.Add(sortKey, new List<TKey> { key }); } if (_data.TryGetValue(key, out List<TValue>? values)) { values.Add(value); } else { _data.Add(key, new List<TValue> { value }); } } } /// <summary> /// Pull a batch of messages. /// </summary> /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param> /// <returns></returns> public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages) { List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>(); lock (_lock) { int messageCount = 0; while (true) { if (!_queue.Any()) { break; } var keys = _queue.First().Value; foreach (var key in keys) { if (_data.TryGetValue(key, out List<TValue>? keyValues)) { result.Add((key, keyValues)); _data.Remove(key); messageCount += keyValues!.Count; } } _queue.RemoveAt(0); if (messageCount >= maxNumberOfMessages) { break; } } } return result; } }
代码逻辑还是比较简单的,就不罗嗦了,如有问题欢迎留言交流。
再说数据安全
因为在这个实现中所有待处理的数据都在内存中,丢失数据会带来一定的风险,因为我这个程序前边还有一个队列,即使程序崩溃了,也只损失没处理的一小部分数据,业务上可以接受,所以这样做没有问题。如果你对这个程序感兴趣,需要慎重考虑你的应用场景。
来看看数据丢失可能发生的两种情况:
一是数据还在队列中时程序重启了:对于这种情况,前文提到将数据同步到其它地方,比如写入Redis、写入数据库、写入磁盘等等。不过因为网络IO、磁盘IO较慢,这往往会带来吞吐量的大幅下降,想要保证一定的吞吐量,还得引入一些分片机制,又因为分布式的不可靠,可能还得增加一些容错容灾机制,比较复杂,可以参考Kafka。
二是数据处理的时候失败了:对于这种情况,可以让程序重试;但是如果异常导致程序崩溃了,数据已经从内存或者其它存储中移除了,数据还是会发生丢失。这时候可以采用一个ACK机制,处理成功后向队列发送一个ACK,携带已经处理的数据标识,队列根据标识删除数据。否则消费者还能消费到这些数据。
这些问题并不一定要完全解决,还是得看业务场景,有可能你把数据持久化到Redis就够了,或者你也不用引入ACK机制,记录下处理到哪一条了就行了。
以上就是本文的主要内容了,完整代码和示例请访问Github:github.com/bosima/dotn…