需求
产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高。希望能够提高网络的利用率,并降低系统的负载。
分析
一个很自然的想法就是将多条数据一起发送,这里有几个关键点:
1、多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送。如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需要的数据条数可能比较困难,这时候还得需要一个过期时间,因为客户可能接受不了太多的延迟。既然不管怎样都需要使用时间进行控制,我这里索性就选择按照时间周期发送了。思路是:自上次发送时间起,经过了某个时长之后,就发送客户在这段时间内产生的所有数据。
2、数据到期判断方法: 既然选择了按照时间周期发送,那么就必须有办法判断是否到了发送时间。一个很简单的想法就是轮询,把所有客户轮询一遍,看看谁的数据到期了,就发送谁的。这个算法的时间复杂度是O(N),如果客户比较多,就会消耗过多的时间在这上边。还有一个办法:如果客户按照时间排序好了,那么只需要取时间最早的客户的数据时间判断就好了,满足就发送,一直向后找,直到获取的客户数据时间不符合条件,则退出处理,然后等一会再进行判断处理。这就需要有一个支持排序的数据结构,写入数据时自动排序,这种数据结构的时间复杂度一般可以做到O(log(n))。对于这个数据结构的读写操作原理上就是队列的操作方式,只不过是个可排序的队列。
3、区分客户: 不同客户的数据接收地址不同,向具体某个客户发送数据时,应该能比较方便的聚合他的数据,最好是直接就能拿到需要发送的数据。可以使用字典数据结构来满足这个需求,取某个客户数据的时间复杂度可以降低到O(1)。
4、数据的安全性问题: 如果程序在数据发送成功之前退出了,未发送的数据怎么办?是还能继续发送,还是就丢掉不管了。如果要在程序重启后恢复未发送成功的数据,则必须将数据同步到别的地方,比如持久化到磁盘。因为我这里的数据安全性要求不高,丢失一些数据也是允许的,所以要发送的数据收到之后放到内存就行了。
实现
上文提到可排序的数据结构,可以使用SortedList<TKey,TValue>,键是时间,值是这个时间产生了数据的客户标识列表。不过它的读写操作不是线程安全的,需要自己做同步,这里简单点就使用lock了。
对于不同客户的数据,为了方便获取,使用Dictionary<TKey,TValue>来满足,键是客户的标识,值是累积的未发送客户数据。这个数据读写也不是线程安全的,可以和SortedList的读写放到同一个lock中。
下边是它们的定义:
SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>(); Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>(); readonly object _lock = new object();
插入数据的时候,需要先写入SortedList,然后再写入Dictionary。代码逻辑比较简单,请看:
public void Publish(TKey key, TValue value) { DateTime now = DateTime.Now; lock (_lock) { if (_queue.TryGetValue(now, out List<TKey>? keys)) { if (!keys!.Contains(key)) { keys.Add(key); } } else { _queue.Add(now, new List<TKey> { key }); } if (_data.TryGetValue(key, out List<TValue>? values)) { values.Add(value); } else { _data.Add(key, new List<TValue> { value }); } } }
对于消费数据,这里采用拉数据的模式。最开始写的方法逻辑是:读取一条数据,处理它,然后从队列中删除。但是这个逻辑需要对队列进行读写,所以必须加锁。一般处理数据比较耗时,比如这里要通过HTTP发送数据,加锁的话就可能导致写数据到队列时阻塞的时间比较长。所以这里实现的是把可以发送的数据全部提取出来,然后就释放锁,数据的处理放到锁的外部实现,这样队列的读写性能就比较好了。
public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages) { List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>(); DateTime now = DateTime.Now; lock (_lock) { int messageCount = 0; while (true) { if (!_queue.Any()) { break; } var first = _queue.First(); var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds; if (diffMillseconds < _valueDequeueMillseconds) { break; } var keys = first.Value; foreach (var key in keys) { if (_data.TryGetValue(key, out List<TValue>? keyValues)) { result.Add((key, keyValues)); _data.Remove(key); messageCount += keyValues!.Count; } } _queue.RemoveAt(0); if (messageCount >= maxNumberOfMessages) { break; } } } return result; }
这段代码比较长一些,我梳理下逻辑:取队列的第一条数据,判断时间是否达到发送周期,未达到则直接退出,方法返回空列表。如果达到发送周期,则取出第一条数据中存储的客户标识,然后根据这些标识获取对应的客户未发送数据,将这些数据按照客户维度添加到返回列表中,将这些客户及其数据从队列中移除,返回有数据的列表。这里还增加了一个拉取数据的条数限制,方便根据业务实际情况进行控制。
再来看一下怎么使用这个队列,这里模拟多个生产者加一个消费者,其实可以任意多个生产者和消费者:
TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000); List<Task> publishTasks = new List<Task>(); for (int i = 0; i < 4; i++) { var j = i; publishTasks.Add(Task.Factory.StartNew(() => { int k = 0; while (true) { queue.Publish($"key_{k}", $"value_{j}_{k}"); Thread.Sleep(15); k++; } }, TaskCreationOptions.LongRunning)); } Task.Factory.StartNew(() => { while (true) { var list = queue.Pull(100); if (list.Count <= 0) { Thread.Sleep(100); continue; } foreach (var item in list) { Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}"); } } }, TaskCreationOptions.LongRunning); Task.WaitAll(publishTasks.ToArray());
以上就是针对这个特定需求实现的一个按照时间进行排序的队列。