分布式实时数据处理实战:从选型、应用到优化

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介:

本文根据DBAplus社群第94期线上分享整理而成。

 

讲师介绍
 
 

卢誉声

Autodesk资深系统研发工程师

 

  • 《分布式实时处理系统:原理、架构与实现》作者。

  • Hurricane实时处理系统主要贡献者。

  • 多部C++领域译作。

 

分享大纲:

 

1. 海量数据处理的挑战

2. 基础处理架构选型

3. 分布式系统结构设计

4. 性能调优和数据存储(MongoDB)

 

一、海量数据处理的挑战

 

 

随着互联网与计算机的普及,我们可以通过传统途径或互联网收集到大量的数据,而在日常工作中对这么大量的数据处理需求也与日俱增。日常遇到的数据种类非常多,从结构化的表格数据、到半结构化非结构化的文本图像,我们需要掌握更多的技能与工具来学会如何处理这些数据。尤其在机器学习越来越热的今天,更加有必要学会这个技术。

 

近两年最火的恐怕就是深度学习,而深度学习又非常依赖数据量,很多时候不管网络再怎么精心设计,再怎么使用技巧,也不如数据量来得实在。比如在我们这里,就经常需要为此处理大量的文本和图像数据。但在这个过程中,我们发现总是在做很多重复的工作。

 

总结一下,日常的工作模式抽象出来基本就是这么几件事:

 

  1. 将需要处理的数据输出到一个列表文件(或者存到数据库里),每一项就是一个任务。

  2. 处理程序中开启多个Worker线程,并为每个线程分配任务,线程执行自己的任务,并将结果输出出来。

  3. 处理程序还需要记录处理了哪些数据,哪些是成功的,哪些是异常的。

  4. 需要将这么多个处理程序连接在一起完成数据处理任务。

 

二、基础处理架构选型

 

 

日常工作模式:

 

  • 为需要处理的数据建立列表

  • 启动程序,开启多个Worker线程处理列表中的数据

  • 将处理完的项目输出到另一个列表中

  • 启动下一个程序,继续开启多个Worker线程处理列表中的数据

  • ……

 

可以发现,这个需求其实就是一个简单的生产者-消费者模式。我们其实是在建立一个任务队列,然后让Worker来取任务并执行任务。为了简化这项工作,我自己写了一个简单的消息队列以及生产者消费者的抽象,让程序专注于数据处理的逻辑。

 

用户只需要建立一个MessageQueue(消息队列),一个Feeder(消息源),一个Consumer(消息处理单元),并且实现Feeder和Consumer的具体逻辑(可以使用函数对象或者lambda表达式)。这样就可以简化日常的任务,但是经过长时间的工作后,发现这样还是远远不够,还需要经常处理以下问题:

 

  • 如何分配任务?

  • 任务失败了怎么办?

  • 如何保存任务状态?

  • 如何分布式计算?

 

我们来分别看一看:

 

1、如何分配任务?一开始我们采取的是按序号分配任务,每个任务执行连续一批任务。后来发现这样会遇到很多问题,不如使用生产者消费者模式让Worker自己领取任务。但由于缺乏统一的调度者,因此无法确保整体具有最高的计算效率。

 

2、如何处理任务失败?我们一开始的方法是将成功任务和失败任务分别放到两个独立列表里,每次一个任务结束后都要重新处理失败的任务,有非常多手动工作。

 

3、如何保存任务状态?程序常常会因为各种原因在一半中断(未完全测试的程序可能会内存泄漏、内存越界,即使程序没有问题,也可能发生进程误杀甚至是断电等狗血的事情),因此我们需要保存任务状态,下次启动程序的时候可以自动跳过已经成功处理过的任务。

 

4、如何分布式计算?当数据过多时,需要手动分割数据放在几个机器上执行,部署和手动管理成本很高。

 

后来我们发现Apache Storm的数据处理方式很适合解决这些问题。但是非常可惜,一方面出于性能考虑,另一方面为了更加容易地调用本地C++程序,这种基于Java的方式并不是那么方便,每次还需要编写JNI来接入我们的C++代码。

 

于是,我们需要自己建立一套系统来解决这个问题。这套系统中包含这些东西:

  1. 使用NodeJS编写的网络爬虫,因为NodeJS单线程异步非阻塞,简化了高性能爬虫的编写工作。

  2. 使用MongoDB存储数据,因为MongoDB是文档型数据库,而且可以无模式,处理图像和网页数据的时候非常方便。

  3. 使用Caffe来进行训练和数据处理,由于我们的机器并不是特别多,这种情况下Caffe可以提供比Tensorflow更好的性能。

  4. Hurricane实时处理系统( http://github.com/samblg/hurricanehttp://hurricane-project.net),是Storm的计算模型在C++11中的实现,不过做了部分简化和调整,以适应我们自己的工作。

 

三、分布式系统结构设计

 

 

这里面的关键就是Hurricane这个系统:

 

 

这张图就是Hurricane的计算模型,Hurricane实时处理系统是一个基于流的分布式实时处理平台,其计算模型是Topology。每个Topology都是一个网络,该网络由计算任务和任务之间的数据流组成。

 

该模型中Spout负责产生新的元组,Bolt负责处理前一级任务传递的元组,并将处理过的元组发送给下一级。Spout是元组的生成器,而Bolt则是元组的处理单元。每个任务都会将数据封装为元组传递给其他的任务。

 

在系统中任务被定义为Task。Task是对计算任务的统一抽象,规定了计算任务的统一接口。Spout和Bolt都是Task的特殊实现。为了处理这种分布式的计算模型,我们设计了自己的分布式系统架构,如下图所示:

 

 

最上方的是President,这是整个集群的管理者,负责存储集群的所有元数据,所有Manager都需要与之通信并受其控制。下方的是多个Manager,每个Manager中会包含多个Executor,每个Executor会执行一个任务,可能为Spout和Bolt。

 

从任务的抽象角度来讲,每个Executor之间都会相互传递数据,只不过都需要通过Manager完成数据的传递,Manager会帮助Executor将数据以元组的形式传递给其他的Executor。

 

Manager之间可以自己传递数据(如果分组策略是确定的),有些情况下还需要通过President来得知自己应该将数据发送到哪个节点中。

 

在这个基础架构与计算模型之上,我们还设计了一套上层接口Squared:

 

 

左侧是Hurricane基本的计算模型,在该计算模型中,系统是一个计算任务组成的网络。我们需要考虑每个节点的琐屑实现。但如果在日常任务中,使用这种模型相对来说会显得比较复杂,尤其当网络非常复杂的时候。

 

为了解决这个问题,看一下右边这个计算模型,这是对我们完成计算任务的再次抽象。

 

第一步是产生语句的数据源。然后每条语句需要使用名为SplitSentence的函数处理将句子划分为单词。接下来根据单词分组,使用CountWord这个统计操作完成单词的计数。

 

所以这个接口的本质是将网络映射成了简单的数据操作流程。解决问题和讨论问题都会变得更为简单直观,现在我们来看看Hurricane的实际应用。

 

四、性能调优和数据存储

 

 

 

这是一个数据的预处理任务,我们需要从网络上搜索一堆图片,然后对图片做初步处理(部分检测任务),处理完成后将数据保存在数据库中,作为日后的训练数据使用。

 

使用Hurricane后这一切都变得非常简单。我们使用一个Spout读取数据库中的任务,每一个任务是一个词条,第一任务需要使用搜索引擎检索这些词条对应的图像URL。

 

这个爬取工作会通过简单的消息队列传给NodeJS,由NodeJS爬取并解析完网页,抽取URL将结果返回给Spout。然后将图像URL保存到数据库中,并传递给下一个任务。

 

下一个任务会调度NodeJS将一批图像都爬取并保存下来,这里大家也可以自己使用C++编写获取数据与解析数据的程序,只不过使用JS爬取数据和解析网页比较方便,因此我们把这个任务交给JS完成了。

 

完成任务后将图像数据传递给检测器A,检测器A完成检测后将结果和图像送给检测器B,检测器B完成最后检测任务并将数据保存在数据中。最后处理完成的数据和图像经过人工整理后将会作为日后训练数据和测试数据的来源。

 

最后就是系统的优化问题了。

 

这里很多是实际工程问题,比如在存储大量数据时,由于MongoDB自身支持分布式存储,所以处理起来非常方便。我们只需要设定副本集,然后指定分片的字段就可以建立一个分布式集群,这里比较讲究的就是要根据实际情况选择分片字段。

 

和传统开源的MySQL方案相比还是比较简单的,唯一不足就是MongoDB出现过宕机无法恢复的情况,所以日常额外的数据备份工作一定要进行。MongoDB不但自身支持分布式(副本和自动分片),而且还是本人使用过的检索功能最强大的NoSQL数据库之一,日常的许多业务任务都可以使用MongoDB处理。

 

日常使用NodeJS配合MongoDB可以快速构建足够健壮的脚本与小型服务,MongoDB也支持对单个文档的原子查找更新,合理设计后可以解决很多问题。

 

比如充当简单的任务队列,同时MongoDB中也可以建立全文索引,虽然没有ElasticSearch那么强大,但是已经可以满足简单的需求。最大的优点体现在处理半结构化数据、或者数据模型不确定的时候,比起需要反复修改表结构的关系型数据库来说,MongoDB实在是方便。

 

当然MongoDB也存在很多问题:(抛砖引玉,个人感受,如有不当,望大家指正)

 

  1. 统计功能不够强大,虽然有aggregate等功能,但比起关系型数据库来说确实羸弱。

  2. 无法实现连表查询,所以在设计数据模型时会和关系型数据库方式不同,也无法完全替代关系型数据库。

  3. 不支持事务,虽然MongoDB支持单文档的原子操作,但是无法支持包含多个操作的事务,必须要自己处理这些问题,因此很多有事务要求的系统来说不一定适用。

 

当然这些只是我在日常处理管理数据中的感受,也恰恰可以适应我们的工作。因为现在数据形式多种多样,需求也多种多样。只不过在我们日常的数据处理过程中,Hurricane配合MongoDB等工具可以更好地流式处理半结构化与非结构化数据。

 

最后,一些其他特性:

 

  • 保序

    1)根据顺序处理数据

    2)使用Orderld和队列实现保序

  • 多语言支持

    1)C

    2)Java

    3)Python

    4)JavaScript

     

Q&A
 
 

Q1:Hurricane系统开源吗?

A1:hurricane real-time processing在Apache协议下开源,可以访问

http://github.com/samblg/hurricane欢迎想了解更多内容和感兴趣的同学参与进来。

 

Q2:刚刚大神提到的mongo统计功能的aggregate,我们目前就遇到这问题,数据量并不大,十万左右的数据吧,现在一个统计查询要一秒多这个时间挺吓人的,有没有优化的办法?

A2:aggregate并不是mongo的强项。在编写aggregate语句的时候有许多要注意的,比如对设计到的字段尽可能建立索引,$match或者$sort之类的操作尽量放在整个操作流水线的前面。提前用$match过滤数据,减少后面数据的计算量,排序操作尽量在使用索引的字段上进行等等,如果MongoDB本身优化问题无法解决,那就只能将计算压力放在应用服务器上。尽量少地将数据分片取出到不同的应用服务器上,通过Hurricane这种实时分布式处理系统来完成统计工作,就能很好的解决这类问题---> Hurricane实时处理系统完全开源,不依赖任何第三方库,易于维护和2次开发,相较其他系统,Hurricane 十分轻量级,可维护性高。

原文发布时间为:2017-03-07

本文来自云栖社区合作伙伴DBAplus

 

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。   相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
5天前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
122 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
5天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
150 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
12天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
44 10
|
22天前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
55 4
|
1月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
43 5
|
2月前
|
算法
基于粒子群算法的分布式电源配电网重构优化matlab仿真
本研究利用粒子群算法(PSO)优化分布式电源配电网重构,通过Matlab仿真验证优化效果,对比重构前后的节点电压、网损、负荷均衡度、电压偏离及线路传输功率,并记录开关状态变化。PSO算法通过迭代更新粒子位置寻找最优解,旨在最小化网络损耗并提升供电可靠性。仿真结果显示优化后各项指标均有显著改善。
|
2月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
2月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
3月前
|
存储 NoSQL Java
分布式session-SpringSession的应用
Spring Session 提供了一种创建和管理 Servlet HttpSession 的方案,默认使用外置 Redis 存储 Session 数据,解决了 Session 共享问题。其特性包括:API 及实现用于管理用户会话、以应用容器中性方式替换 HttpSession、简化集群会话支持、管理单个浏览器实例中的多个用户会话以及通过 headers 提供会话 ID 以使用 RESTful API。Spring Session 通过 SessionRepositoryFilter 实现,拦截请求并转换 request 和 response 对象,从而实现 Session 的创建与管理。
分布式session-SpringSession的应用
|
2月前
|
存储 缓存 数据处理
深度解析:Hologres分布式存储引擎设计原理及其优化策略
【10月更文挑战第9天】在大数据时代,数据的规模和复杂性不断增加,这对数据库系统提出了更高的要求。传统的单机数据库难以应对海量数据处理的需求,而分布式数据库通过水平扩展提供了更好的解决方案。阿里云推出的Hologres是一个实时交互式分析服务,它结合了OLAP(在线分析处理)与OLTP(在线事务处理)的优势,能够在大规模数据集上提供低延迟的数据查询能力。本文将深入探讨Hologres分布式存储引擎的设计原理,并介绍一些关键的优化策略。
149 0

热门文章

最新文章