数据采集
从本章开始,我们将逐一讨论实时流计算系统各方面的内容。
为了更加方便和清楚地阐述问题,本文将以互联网金融风控为场景,构建一个实时流计算风控系统。
虽然是以互联网金融风控为场景,但大多数情形下实时流计算系统在架构上是大同小异的,或者具有异曲同工之妙。
所以,本文在互联网金融风控场景下讨论的有关实时流计算系统的各种概念、问题和解决方法也能推广应用到其他使用场景。
常言道“巧妇难为无米之炊”,没有数据,我们就没有了讨论的基础。大多数情况下,数据采集是我们构建实时流计算系统的起点,所以本书将首先从数据采集讲起。事实上,我们不能小瞧数据采集的过程。数据采集通常涉及对外提供服务,涵盖许多I/O、网络、异步和并发的技术,在性能、可靠和安全等方面都不容大意。
本章将讨论实时流计算系统的数据采集部分,不过我们会将重心放在讲解有关BIO和NIO、同步和异步、异步和流之间的关联关系等内容。这些内容不仅有助于我们在实际生产中构建高性能数据采集服务器,而且有助于我们加深对异步和高并发编程的理解,并为后续章节对“流”的讨论和理解打下坚实基础。
2.1 设计数据采集的接口
在本文中,我们以互联网金融风控场景来展开对实时流计算系统的讨论。
在金融风控场景下,分析的风险总体上可以分成两类:一类是贷款对象信用风险,另一类是贷款对象欺诈风险。两类风险的风控因素和模型不同。
贷款对象信用风险关注的是贷款对象自身的信用状况、还款意愿和还款能力。信用风险评估常用的分析因素有四要素认证和征信报告,使用的风控模型主要是可解释性强的逻辑回归评分卡。
贷款对象欺诈风险关注的则是贷款对象是不是在骗贷。在欺诈情形下,贷款对象提供的所有征信信息可能都是正常的,但是这些信息是通过伪造或“黑产”渠道得来的,贷款对象以大量具有良好信用的不同身份获得贷款后,欺诈成功,卷款而逃。欺诈风险评估使用的分析因素多种多样。例如,网络因素(如IP是否集中),用户属性因素(如年龄和职业),用户行为因素(如是否在某个时间段集中贷款),社会信用因素(如社保缴纳情况),第三方征信(如芝麻信用得分),还有各种渠道而来的黑名单等。总体而言,欺诈风险评估使用的因素来源更多,使用的模型也更加多样,如决策树、聚类分析等。
互联网金融风控的一般流程如下所述。用户在手机或网页等客户端发出注册、贷款申请等事件时,客户端将用户属性、行为、生物识别、终端设备信息、网络状况、TCP/IP协议栈等信息发送到数据采集服务器;数据采集服务器收到数据后,进行字段提取和转化,发送给特征提取模块;特征提取模块按照预先设定的特征清单进行特征提取,然后以提取出来的特征清单作为模型或规则系统的输入;最终依据模型或规则系统的评估结果做出决策。
根据上面描述的业务流程,完整的互联网金融风控系统架构设计如图2-1所示。
图2-1 完整的互联网金融风控系统架构设计
从手机或网页等客户端,通过互联网发送事件到采集服务器,是金融风控场景下常用的数据采集方式之一。客户端发送的事件包含用户属性、行为、生物识别、终端设备信息、网络状况、TCP/IP协议栈等信息。HTTP/HTTPS协议筑造了整个互联网的基石,也是当前最主要的应用层通信协议。没有特别必要,我们采用HTTP/HTTPS协议来进行客户端和数据采集服务器之间的数据通信。
确定数据通信协议后,还需要制定事件上报API。以REST风格为代表的API设计方式提供了相对标准的API设计准则。依照REST风格,设计事件上报API如下。
POST event/
{
"user_id": "u200710918",
"client_timestamp": "1524646221000",
"event_type": "loan",
"amount": 1000,
"……": "……"
}
上面的RES.API表示向服务器上报一个事件,其中:用户账号“user_id”是“u200710918”,发送时间戳“client_timestamp”是“1524646221000”,事件类型“event_type”是“loan”,金额“amount”是“1000”,其他信息用“……”表示。
至此通信协议和API都确定了,接下来实现采集服务器。
使用Sprin Boot实现数据采集服务器
说到REST风格Web服务器开发,大部分Java编程开发者首先想到的是Spring系列中的Spring Boot。毫无疑问,Spring Boot使得用Java做Web服务开发的体验相比过去有了极大的提升。几乎在数分钟之内,一个可用的Web服务就可以开发完毕。所以,我们也用Spring Boot来实现数据采集服务器,具体实现如下。
@Controller
@EnableAutoConfiguration
public class SpringDataCollector {
private static final Logger logger = LoggerFactory.getLogger
(SpringDataCollector.class);
private JSONObject doExtractCleanTransform(JSONObject event) {
// TODO: 实现抽取、清洗、转化的具体逻辑
return event;
}
private final String kafkaBroker = "127.0.0.1:9092";
private final String topic = "collector_event";
private final KafkaSender kafkaSender = new KafkaSender(kafkaBroker);
@PostMapping(path = "/event", produces =
MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody()
public String uploadEvent(@RequestBody byte[] body) {
// step1: 对消息进行解码
JSONObject bodyJson = JSONObject.parseObject(new String(body,
Charsets.UTF_8));
// step2: 对消息进行抽取、清洗、转化
JSONObject normEvent = doExtractCleanTransform(bodyJson);
// step3: 将格式规整化的消息发到消息中间件Kafka
kafkaSender.send(topic,
normEvent.toJSONString().getBytes(Charsets.UTF_8));
// 通知客户端数据采集成功
return RestHelper.genResponse(200, "ok").toJSONString();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(SpringDataCollector.class, args);
}
}
注意:为了节省篇幅,本文中的样例代码均只保留了主要逻辑以阐述问题,大部分略去了异常处理和日志打印。如需将这些代码用于真实产品环境,则需要读者自行添加异常处理和日志打印相关内容。异常处理和日志打印是可靠软件的重要因素,在编程开发时务必重视这两点。
在上面的示例代码中,uploadEvent实现了事件上报接口。收到上报事件后,首先对数据进行解码,解码结果用FastJson中的通用JSON类JSONObject表示;然后在JSONObject对象基础上进行抽取、清洗和转化,规整为统一格式数据;最后将规整好的数据发往数据传输系统Kafka。这个程序在实现功能上并没有特别的地方,我们只是感觉到基于Spring Boot的服务开发体验是如此轻松、愉快。
BIO与NIO
我们使用Spring Boot非常迅速地开发好了数据采集服务器,之后的测试和上线工作也一帆风顺。客户开始接入流量,服务运转良好,似乎一切都预示着程序员的工作就是这样轻松、美好。但好景不长,随着业务流量的增加,晴朗天空不知不觉飘来两朵“乌云”。
·随着用户越来越多,采集服务器连接数逐渐增加,甚至在高峰时出现成千上万并发连接的情况。每个连接的服务质量急剧下降,不时返回408或503错误。
·监控显示,客户请求响应的时延非常大,进一步分析发现是doExtractCleanTransform函数比较耗时。这个函数耗时的原因可能是计算比较复杂,也可能是有较多的I/O操作,还可能是有较多的外部请求调用。数据采集服务器的性能表现很差,但是看系统监控又发现CPU和I/O的使用效率并不高,似乎它们都在“偷懒”不干活。
基本上,当我们初次开始认真关注程序的性能问题时,都会碰到上面的问题。根据笔者经验,如果此时能够深入地钻研下去,我们将从此掌握编写高性能程序的高级技能点,将在以后的程序开发过程中受益良多。
我们先看采集服务器的连接问题。当使用Spring Boot做Web服务开发时,默认情况下Spring Boot使用Tomcat容器。
早期版本的Tomcat默认使用BIO连接器。虽然现在的版本已经去掉BIO连接器,并默认采用NIO连接器,但是我们还是来比较下BIO和NIO连接器的区别,这样对理解BIO和NIO、同步和异步的原理,以及编写高性能程序都有很大的帮助。
2.3.1 BIO连接器
在Java中,最基础的I/O方式是BIO(Blocking I/O,阻塞式I/O)。BIO是一种同步并且阻塞式的I/O方式。图2-2描述了BIO连接器的工作原理,当接收器(acceptor)线程接收到新的请求连接套接字(socket)时,从工作线程栈(worker stack)中取出一个空闲的工作线程(worker),用于处理新接收的连接套接字。如果工作线程栈没有空闲工作线程,且创建的工作线程数量没有达到设置的上限值,则新建一个工作线程用于处理连接套接字。
而如果工作线程栈没有空闲工作线程,且创建的工作线程数量已达到设置上限值,则接收器被阻塞,它将暂停接收新的连接请求。只有当某个工作线程处理完其对应的请求后,它会被重新放入工作线程栈,成为空闲线程之后,接收器才能继续接收新的请求,并交由工作线程栈中的空闲工作线程处理。工作线程从连接套接字中读取请求数据并进行处理,处理完成后再将结果通过连接套接字发送回客户端。
图2-2 BIO连接器的工作原理
在请求连接数比较小、请求处理逻辑比较简单、工作线程请求处理时延很短的场景下,使用BIO连接器是很合适的。但很显然,在实际工作中的大多数场景下,这些前提条件都是可遇而不可求的。就如在互联网金融风控系统中,上报数据的客户端是分布在全世界各地的成千上万,甚至数十万、数百万的手机、平板和个人电脑,这些终端平均下来每秒发送到数据采集服务器的请求少则数千,多则上万。
再考虑工作线程处理较慢的情况,如计算逻辑较复杂或外部I/O较多。当所有工作线程都在工作时,可用工作线程耗尽,这时请求接收器将阻塞,等待工作线程可用,而不能接收新的请求套接字。当工作线程处理完请求后,由于没有立即可用的新请求需要处理,它必须等到请求接收器接收新的请求之后,才能继续工作。经过以上分析就会发现,这种处理方案的性能比较低下。一方面请求接收线程和工作线程都很忙碌,另一方面请求接收线程和工作线程却要时不时地相互等待,这就导致请求接收器和工作线程时不时处于空闲状态。进一步深入到操作系统层面,表现在CPU和网络I/O很多时候处于空闲状态。操作系统资源大量空闲,造成资源浪费,性能却还十分低下。很显然,这是我们不能接受的情况,必须对其做出改进和提升。
为了在使用BIO连接器时提高资源的使用效率,一种行之有效的方法是增加工作线程数量。理想情况下,如果有成千上万甚至上百万个工作线程来处理连接套接字,那么请求接收器不用担心工作线程不够用,因为任何时候总会有工作线程可用。这样,数据采集服务器的并发连接数也能够达到成千上万。当然,如果要支持百万并发连接,还需要专门配置一些操作系统参数,这里不做详细讨论,感兴趣的读者可以自行搜索相关资料。
当前大多数操作系统在处理上万个甚至只需几千个线程时,性能就会明显下降。这是因为,当需要调度的线程变得非常多后,操作系统在进行线程调度和上下文切换时,需要占用大量CPU时间,使得真正用于有效任务计算的CPU时间变少。以Linux操作系统为例,在现代处理器上一次线程上下文切换的典型时延为数微秒(microsecond)。如果以5微秒来计算,则全部1万个线程各做一次上下文切换就要占用50毫秒,这个时延已相当明显。除了线程切换的时间显著增加外,由于每个线程拥有自己独立的线程栈,过多的线程还会占用大量内存,这也是一个主要的资源消耗和性能损耗因素。虽然启用过多线程会对CPU资源和内存资源造成浪费,但是充足的线程还是有一定好处的,毕竟足够多的线程能够同时触发足够多的I/O任务,从而使I/O资源使用得更加充分。
Linux操作系统线程调度原理如图2-3所示。我们在开发多线程应用时常说的线程,在Linux操作系统中实际上被实现为轻量级进程。而每个轻量级进程以1︰1的关系对应一个内核线程。所有内核线程会根据其运行已消耗CPU时间、线程所处状态及线程优先级等多种因素被调度器不停轮流调度执行。通常而言,当有数千个线程时,调度器尚可以高效处理;但当有数十万、数百万线程时,调度器就会“累趴下”了。
图2-3 Linux操作系统线程调度原理
既然不能在一台机器上运行太多线程,我们很自然地想到可以用多台机器来分担计算任务。不错,这是一个很好的办法。在多个对等的服务节点之前,架设一个负载均衡器(如Nginx),可以有效地将请求分发到多台服务器,这既可以提高服务整体的吞吐能力,也能在一定程度上降低因为请求积压造成的服务响应时延。但除非是线上情况紧急,需要立刻提升服务处理能力以应对突发的流量高峰冲击,否则我们不应该立刻这样做!作为有极客精神的程序员,同时为了降低成本着想,在将一台机器的资源充分利用前,我们不能简单地寄希望于通过横向增加机器数量来提高服务的性能。
既不能运行太多线程,也不愿意水平扩展机器数量,那怎样才能提升程序的性能呢?我们不妨这样思考,接收器无阻塞地接收连接套接字,并将新接收的连接套接字暂存到一个缓冲区。当工作线程在处理完一个连接套接字后,从缓冲区取出暂存的连接套接字进行处理。如此一来,接收器可以不停地接收新的连接套接字,而工作线程的任务也被安排得满满当当。
因此,BIO连接器的本质缺陷是接收器和工作线程执行步调耦合太紧。如果将接收器和工作线程通过缓冲区隔离开来,让它们互不干扰地独立运行,那么接收器和工作线程的工作效率都会得到提高,进而提升程序性能。图2-4展示了改进BIO的方法,在接收器接收到新的连接套接字时,不再需要获取一个处于空闲状态的工作线程,而是只需将其放入连接套接字队列即可。而工作线程则完全不需要理会接收器在做什么,它只需要看队列有没有待处理的连接套接字即可:如果有,就将连接套接字取出来处理;如果没有,说明暂时没有请求,它可以休息一会儿了。接下来我们将看到,Tomcat的NIO连接器正是按照类似的思路做的。
图2-4 改进BIO的方法
2.3.2 NIO连接器
在编写本文时,最新版本的Tomcat已经将NIO作为默认连接器。
图2-5描述了NIO连接器的工作原理,当接收器接收新的连接套接字时,先将其依次封装成NioChannel对象和PollerEvent对象,再将PollerEvent对象放入PollerEvent队列。与此同时,轮询器不断从其PollerEvent队列中取出新的PollerEvent对象,获得代表连接套接字的NioChannel,再将其SocketChannel注册到选择器。选择器从注册在其上的SocketChannel中挑选出处于Rea.Ready状态的SocketChannel,再将其交到工作线程池的队列。工作线程池中的各个工作线程从队列中取出连接套接字,并读取请求数据进行处理,在处理完成时再将结果通过连接套接字发送回客户端。
图2-5 NIO连接器的工作原理
从NIO连接器的工作过程可以看出,Tomcat的NIO连接器相比BIO连接器,主要做出了两大改进。除了类似于我们在图2-4中提到的使用“队列”将接收器和工作线程隔离开的改进方法之外,Tomcat的NIO连接器还引入选择器(包含在轮询器中)来更加精细地管理连接套接字,也就是说,选择器只有在连接套接字中的数据处于可读(Read Ready)状态时,才将其交由工作线程来处理,避免了工作线程等待连接套接字准备数据的过程。NIO连接器的这两点改进带来了两种好处。
1)接收器和工作线程隔离开,让它们彼此之间不会因为对方阻塞而影响自己的连续运行。这样接收器和工作线程都能尽其所能地工作,从而更加充分地使用I/O和CPU资源。
2)因为有了队列缓存待处理的连接套接字,NIO连接器能够保持的并发连接数也就不再受限于工作线程数量,而只受限于系统设置的上限值(由LimitLatch指定)。这样,无须分配大量线程,数据采集服务器就能支持大量并发连接了。