业务背景:
项目需要接入全省的GPS数据(数据量从一开始的1000w+ 到 现在的 4000w+),原始数据存于Datahub中。
GPS数据的使用场景:
1.地图撒点(最新GPS点位)
2.统计车辆当日车辆和在线数
3.历史轨迹(按时间查询点位列表)
4.实时跟踪(后端主动推送实时点位信息)
5.报废车辆报警 (根据报废车辆信息生成告警)
6.围栏管控(根据GPS信息与围栏规则进行计算)
…………
业务分析:
GPS撒点:只取最新的GPS数据,查询频率较高,条件也较为复杂,因此原始数据尽量要少,能够直接满足我们查询需求。 针对业务场景,新建一张GPS_CURRENT表,表中只存每辆车的最新GPS点位,根据车辆ID新增/更新,同时根据车辆ID关联出区域,类型等信息,以满足我们撒点的查询需求。
历史轨迹: 由于每日的数据量在千万级,使用传统MYSQL数据库,存在极大的存储和查询压力,因此首先要考虑的是,我们的存储极限在哪里,与产品协商后,仅保留7天数据,而7天的数据也有2亿+的数据量,单表还是比较吃力的,因此考虑到作分区表,较为理想的情况是每个分区数据在千万级,5G以内; 轨迹的查询条件比较清晰,根据车牌ID + 时间范围作为查询条件, 因此分区条件必然在这两个字段中产生,由于我们需要定时按时间清理分区,使用时间作为分区字段,我们可以很方便的使用drop分区的方式达到清理历史数据的效果,而车辆ID作为条件则不能,因此选用时间作为分区字段。
因此我们需要两张表:
gps_current: 根据车辆ID进行插入或更新,用于撒点,在线统计等
gps_log:以时间为分区字段,用于轨迹查询
方案:
version.1
// datahub 协同消费代码RecordEntryrecord=consumer.read(maxRetry);
datahub协同
消费
数据过滤
insertOnDupilicate
gps-current
insert
gps-log
线程池处理其他
GPS计算
该方案是需求最平铺直叙的表达,很快就遇到了问题 —— 消费速度无法跟上生产速度。
version.2
在version.1的基础上,增加队列机制,使gps_log插入变为batchInsert, gps_current的insertOnDuplicate 变为 batchInsertOnDuplicate。
datahub协同
消费
数据过滤
线程池处理其他
存放到Queue
GPS计算
线程池异步批量
获取数据
batchinsertOnDupilicate
batchlnsert
gps-current
gps-log
在某一次版本后, 我们修改过滤条件, 由原先的只处理危货车 且 速度不为0的数据 变更为 处理危货/包车/客车 数据,且去掉速度的过滤条件后, 终于,在某天这套方案的消费能力也开始捉襟见肘。
version.3
重新梳理业务后,我们把业务划分为两类: 需要较高的实时性的 和 能够接收一定延时的
高实时性 |
低实时性 |
GPS撒点 |
轨迹 |
在线数 |
围栏计算 |
实时跟踪 |
报废车辆告警 |
因此我们先将gps_current相关的消费服务独立消费并拆分为单独的服务部署,在拆分独立服务后,gps_current相关的消费能力于原先等到了数十倍的提升。
datahub协同
datahub协同
datahub协同
消费
消灵
消寒
数据过滤
数据过滤
数据过滤
钱程池socket推
按车牌IDhash到
线程池处理其他
按车牌IDhash到
GPS计算
送实时数据
不同Queue
不同Queue
线程池异步按根
线程池异步按根
据shardld批量获
据shardld批量获
取数据
取数据
batchlnsert
batchinsertOnDupilicate
gps-log
gps-current
单独服务
compute服务
原先我们gps消费是放在compute服务, 这个服务中有大量的datahub消费服务和计算服务,且存在一定线程池的滥用,通过arms监控可看到单个服务存在500+线程,大量线程处理等待状态,导致该服务内的线程效率较差,重构后解决了一部分问题。 因此最终我们拆分为3个订阅,由于sockeServer是集成在compute服务(原先是单独的服务,通过RPC调用,由于推送都是由compute服务发起的,且日均百万次调用,考虑IO成本因此将两服务合并)。
version.4
其实在version.3版本,服务的消费性能已经能满足我们在相当长的时间内的性能需求了,但是存在一个不得不解决的问题 -> GPS时间上是乱序的, 即存在同一辆车,时间更早的GPS点反而更晚写入datahub, 而我们是直接更新gps_curret表的,导致存在最新点位时间向下更新的情况,因此需要加时间的判断条件。
第一直觉上,通过mysql判断肯定是不可取的,车辆最新的点位信息扔到redis,通过redis判断就能满足要求; 而实际实施过程发现,直接使用redis是消费速度是无法满足要求的,即使使用pipeline作批处理,与上一版本也存在较大的性能差。最后的方案是使用java内部的Map来做判断,该方案与version.3消费性能略有下降,但仍是当前的生产速度5-8倍,能够满足性能上的要求。
总结:
1.使用批量处理
2.更快的IO速度 数据库 -> redis -> java内存
3.取舍和拆分
实际上我们对于gps的处理,是不止上面4种方案的演进的;还有一些细节问题,在这里简单介绍下:
1.多节点datahub协同消费,但实际只有一个节点处于忙碌状态,其他节点都处于空闲状态, 原因是datahub协同消费时,是根据shard进行负载均衡的,即同一shard,只能分配到一个consumer上;当服务中consumer(多线程模拟多consumer) 数 > shard数时,由于节点发布有先后顺序,后续启动的节点几乎是只能“干瞪眼”了。这里需要说明两点 (1)当consumer数 < shard数 时,一个consumer会被分配多个shard,所以为了保证多节点能够正确的协同消费,尽量使单服务的consumer数 < shard数(2)以上结论只基于当前datahub版本
2.在多线程batchInsertOnDuplicate时,发生了数据库死锁问题(具体原因就不分析了,篇幅较长,有兴趣的可以去找相关材料阅读), 我们的处理方式是,某一线程保证每个线程只拿到; 因此,对队列作了一层包装,内部有多个队列组成,根据车牌ID hash到不同队列, 消费队列时,要带上shard参数(即每个shard对应的是一个实际的队列),确保线程shard不会重复且与队列数量对应即可。
3.在version.4中,多线程下使用hashMap会存在线程安全问题,而currentHashMap性能上又低于hashMap, 我们采用的方案是针对每个线程使用一个HashMap (同时用多队列保证某一俩车只会被同一线程消费)