1. 背景
数据湖的兴起,给数据存储带来了一轮新的革命。越来越多的公司选择将存储切换到云上对象存储。因为云上对象存储往往意味着大容量、低成本、易扩容。
说到对象存储,必然涉及到 S3 协议,S3 协议已经事实上成为对象存储的通用协议。不过,市面上不少数据平台公司,也会选择基于 S3 协议又兼顾 Hadoop 使用习惯的 S3A Connector,比如 Databricks 在对象存储上提供的表数据结构 Delta Lake。
我们就以 Hadoop 社区中的 S3A Connector 的实现为切入,来分析一下数据湖写入路径的安全性。
2. Hadoop S3 的写入支持
因为 S3 协议本身不支持增量写入,因此 S3A 实现时默认的写入方式是先通过缓存到本地,最后在文件 close 后再上传到对象存储。但是,这种默认的方式并不一定高效,对大文件来说,在 close 被调用前,本地已经缓存大量的数据,这样会造成 close 操作非常耗时,文件写入整体看也不高效。
从 Hadoop 2.8.5 版本开始,可以通过设置 fs.s3a.fast.upload 为 true,打开快速上传来优化写入路径。打开后,可以边写本地缓存块,边将满足大小的块异步上传(默认 100M 一个块)。这样也满足了对象存储中分阶段上传接口的一些限制,比如单个块不能小于 5M,分块总数不能大于 10000。
通过阅读 Hadoop 2.8.5 相关源码,我们可以发现打开 fs.s3a.fast.upload 后,S3AFileSystem 在创建文件时会打开 S3ABlockOutputStream(Hadoop 3.x 也有类似的 S3AFastOutputStream)随后,S3ABlockOutputStream 在处理 write、flush 等操作时,则会调用一个抽象的 S3ADataBlock 来执行。
而 S3ADataBlock 则可由三种工厂方法来创建,分别创建基于堆内存的 ArrayBlock、基于磁盘的 DiskBlock,或者基于堆外内存的 ByteBufferBlock。选择哪种工厂,由 fs.s3a.fast.upload.buffer 这个配置项控制,默认为磁盘(disk)。其他两种可选配置为堆内存(array)和 堆外内存(bytebuffer)。
3. 磁盘的问题
通过了解 Hadoop 社区中 S3A 的实现,我们发现借助磁盘缓存数据是常见甚至默认的行为。因为这样可以减少内存占用,缓存更多的数据。但是,这样也带来了磁盘本身的阿喀琉斯之踵 -- 磁盘的稳定性问题。
在数据存储领域,磁盘的问题往往非常令人头疼。比如磁盘写满,磁盘坏道问题,还有偶现的磁盘数据比特反转导致的数据安全性问题。
哪怕单块磁盘的可靠性非常高,但由于磁盘出现问题的概率会随着磁盘数的提升而变大,这会使数据安全性蒙上一层阴影。
对于R个副本的情况,设磁盘的年故障率为P,磁盘数为N,则整个机群有C ( N, R ) = N! / ( R! * ( N- R )! ) 种 R 副本的组合方式。机群数据总量为 M,分片大小为 T,那么有 R 个磁盘同时损坏造成数据丢失的概率是:
* 引用于《磁盘故障与存储系统的年失效率估算》
因此,要保证写路径的数据安全型,我们不能完全依赖底层存储介质的保证。仍需要我们在数据写入时就做一些努力。我们先来做一些实验来看看 S3AFileSystem 在这些问题上的表现。
3.1 模拟磁盘 IO 问题
- 修改 core-sites.xml 中的 fs.s3a.buffer.dir 指向 /dev/vdc 所在的路径,比如我机器上的 /data2/
<property><name>fs.s3a.fast.upload</name><value>true</value></property><property><!-- 本地 buffer 缓存目录,不存在会创建 --><name>fs.s3a.buffer.dir</name><value>/data2/tmp/</value></property>
- 创建并运行 stap 脚本,对所有在 /dev/vdc 上写操作的返回 IO Error
#!/usr/bin/stapprobevfs.write.return { if (devname=="vdc") { $return=-5 } }
$ stap-g io_errno.stp
- 执行写入程序 demo,验证 stap 脚本有效
$ ddif=/dev/zero of=test-1G-stap bs=1G count=1$ hadoop fs -put test-1G s3a://<your-bucket>/
返回结果:
put: 输入/输出错误
可以发现相关操作能正确抛出 IO 错误。
3.2 模拟磁盘比特反转
- 魔改 libfuse passthrough 中的 write 方法,并将 /data2/ 通过 fuse 挂载到 /mnt/passthrough
$ mkdir-p /mnt/passthrough/ $ ./passthrough /mnt/passthrough/ -omodules=subdir -osubdir=/data2/ -oauto_unmount
- 修改 core-sites.xml 中的 hadoop.tmp.dir 指向 /mnt/passthrough
<property><name>fs.s3a.fast.upload</name><value>true</value></property><property><!-- 本地 buffer 缓存目录,不存在会创建 --><name>fs.s3a.buffer.dir</name><value>/mnt/passthrough/</value></property>
- 执行写入程序 demo,验证上传内容的正确性。
$ mkdir-p input output $ ddif=/dev/zero of=input/test-1G-fuse bs=1G count=1$ hadoop fs -put input/test-1G-fuse s3a://<your-bucket>/ $ hadoop fs -get s3a://<your-bucket>/test-1G-fuse output/ $ md5sum input/test-1G-fuse output/test-1G-fuse
返回结果:
cd573cfaace07e7949bc0c46028904ff input/test-1G-fuse 37eb6e664e706ea48281acbd4676569e output/test-1G-fuse
可以发现,输入和输出的数据并不一致。
综上,通过 Hadoop S3AFileSystem 写入可以发现磁盘 IO 问题并正确抛出异常,但无法发现磁盘比特反转问题。
4. 网络的问题
既然磁盘写入有问题,那我们使用内存写入是否就一定可以避免踩坑呢?答案是不能,还可能有网络问题。
Amazon S3 在 2008 年就曾因为网络问题导致的比特位反转引发过重大事故。后来,大家分析这种问题多发生于两端间隔多个路由器的情况,路由器可能因为硬件/内存故障导致单/多比特位反转或双字节交换,这种反转如果发生在 payload 区,则无法通过链路层、网络层、传输层的 checksum 检查出来。
因此 Amazon S3 在这次事故中吸取的教训是,要通过在应用层给所有东西都添加 checksum 来保证数据正确性。
让我们来做一个实验,来看看 S3 是怎么做到 Checksum all of the things的,又是否能防止网络比特反转或者网络丢包呢?
4.1 模拟网络比特反转
- 安装 mitmproxy
$ pip3 install mitmproxy $ mitmproxy--versionMitmproxy: 5.3.0 Python: 3.6.8 OpenSSL: OpenSSL 1.1.1h 22 Sep 2020Platform: Linux-3.10.0-1160.71.1.el7.x86_64-x86_64-with-centos-7.9.2009-Core
- 利用 mitmdump 反向代理 s3a endpoint,并篡改其中的写请求。
- 编写 addons.py
frommitmproxyimportctx, httpimportjsonimporttimeimportosclassHookOssRequest: defrequest(self, flow: http.HTTPFlow): print("") print("="*50) print("FOR: "+flow.request.url) print(flow.request.method+" "+flow.request.path+" "+flow.request.http_version) print("-"*50+"request headers:") fork, vinflow.request.headers.items(): print("%-20s: %s"% (k.upper(), v)) ifflow.request.host=="<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com"andflow.request.method=="PUT": clen=len(flow.request.content) rbit=ord('a') clist=list(flow.request.content) origin=clist[clen-1] clist[clen-1] =rbitupdated=clist[clen-1] flow.request.content=bytes(clist) ctx.log.info("updated requesting content pos("+str(clen-1) +") from "+str(chr(origin)) +" to "+str(chr(updated))) defresponse(self, flow: http.HTTPFlow): passaddons= [ HookOssRequest() ]
- 反向代理 http://.oss-cn-shanghai-internal.aliyuncs.com 到 http://localhost:8765
$ mitmdump-s addons.py -p8765--setblock_global=false--mode reverse:http://<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com
- 修改 core-sites.xml 中的 fs.s3a.endpoint 指向 localhost:8765,并关闭ssl。
<property><name>fs.s3a.connection.ssl.enabled</name><value>false</value></property><property><name>fs.s3a.fast.upload</name><value>true</value></property>
- 执行写入程序 demo,验证上传内容的正确性
返回结果:
xx/xx/xx xx:xx:xx WARN s3a.S3ABlockOutputStream: Transfer failure of block FileBlock{index=2, destFile=/data/hadoop/hadoop-2.8.5/tmp/s3a/s3ablock-0002-6832685202941984333.tmp, state=Upload, dataSize=1, limit=104857600} xx/xx/xx xx:xx:xx WARN s3a.S3ABlockOutputStream: Transfer failure of block FileBlock{index=1, destFile=/data/hadoop/hadoop-2.8.5/tmp/s3a/s3ablock-0001-635596269039598032.tmp, state=Closed, dataSize=104857600, limit=104857600} put: Multi-part upload with id '14ABE04E57114D0D9D8DBCFE4CB9366E' to test-100M-proxy._COPYING_ on test-100M-proxy._COPYING_: com.amazonaws.AmazonClientException: Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: 0CC175B9C0F1B6A831C399E269772661 in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: 14ABE04E57114D0D9D8DBCFE4CB9366E, partNumber: 2, partSize: 1): Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: 0CC175B9C0F1B6A831C399E269772661 in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: 14ABE04E57114D0D9D8DBCFE4CB9366E, partNumber: 2, partSize: 1)
可见,Amazon S3 在 header 签名中强制对每个 upload part 的 payload 做了 Content-MD5 的校验,能够有效检测出网络比特反转。
4.2 模拟网络丢包
之前的测试验证了, S3 使用 Content-MD5 的校验可以保证单个请求的正确性,但在写一些大文件,或是涉及 JobCommitter 的作业中,往往会使用 multipart upload 来进行并发上传。而网络丢包也是一种常见的问题。于是,接下来我们来验证下,如果上传过程中其中一个 part 丢失,是否会给上传结果造成影响。
- 同样使用 mitmproxy 来模拟丢包
- 利用 mitmdump 反向代理 s3a endpoint,并丢弃其中 part2 的请求。
- 编写 addons.py
from mitmproxy import ctx, http import json import time import os class HookOssRequest: def request(self, flow: http.HTTPFlow): print("") print("="*50) print("FOR: "+ flow.request.url) print(flow.request.method +" "+ flow.request.path +" "+ flow.request.http_version) print("-"*50 +"request headers:") for k, v in flow.request.headers.items(): print("%-20s: %s" % (k.upper(), v)) if flow.request.host =="<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com" and flow.request.method =="PUT": if"partNumber=2"in flow.request.path: flow.response = http.HTTPResponse.make( 200, # (optional) status code b"Hello World", # (optional) content {"Content-Type": "text/html"}, # (optional) headers ) ctx.log.info("drop part-2 request!") ctx.log.info("requesting length:"+ str(len(flow.request.content))) def response(self, flow: http.HTTPFlow): pass addons = [ HookOssRequest() ]
反向代理 http://.oss-cn-shanghai-internal.aliyuncs.com 到 http://localhost:8765
$ mitmdump-s addons.py -p8765--setblock_global=false--mode reverse:http://<your-bucket>.oss-cn-shanghai-internal.aliyuncs.com
- 同样修改 core-sites.xml 中的 fs.s3a.endpoint 指向 localhost:8765,并关闭ssl。
<property><name>fs.s3a.connection.ssl.enabled</name><value>false</value></property><property><name>fs.s3a.fast.upload</name><value>true</value></property>
- 执行写入程序 demo,验证上传内容的正确性
$ mkdir-p input output $ ddif=/dev/zero of=input/test-100M-proxy bs=$(( 100*1024*1024 + 1 ))count=1$ hadoop fs -put input/test-100M-proxy s3a://<your-bucket>/ xx/xx/xx xx:xx:x WARN s3a.S3ABlockOutputStream: Transfer failure of block FileBlock{index=2, destFile=/data/hadoop/hadoop-2.8.5/tmp/s3a/s3ablock-0002-2063629354855241099.tmp, state=Upload, dataSize=1, limit=104857600} put: Multi-part upload with id 'D58303E74A5F4E6D8A27DD112297D0BE' to test-100M-proxy._COPYING_ on test-100M-proxy._COPYING_: com.amazonaws.AmazonClientException: Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: null in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: D58303E74A5F4E6D8A27DD112297D0BE, partNumber: 2, partSize: 1): Unable to verify integrity of data upload. Client calculated content hash (contentMD5: 93B885ADFE0DA089CDF634904FD59F71 in hex) didn't match hash (etag: null in hex) calculated by Amazon S3. You may need to delete the data stored in Amazon S3. (bucketName: <your-bucket>, key: test-100M-proxy._COPYING_, uploadId: D58303E74A5F4E6D8A27DD112297D0BE, partNumber: 2, partSize: 1)
可见,Amazon S3 在 close 请求中通过 CompleteMultipartUpload 对每个上传的 Part 做了检查,能够发现丢失的请求。
5. 校验算法的选择
上文已经证明了校验码的不可或缺性,而且可以看到 Amazon S3 默认采用了 MD5 作为校验码。那就是最优的选择了吗?让我们来看看还有没有别的选择。
5.1 数据摘要算法
MD5、SHA-1、SHA-256、SHA-512都是数据摘要算法,均被广泛作为密码的散列函数。
但由于MD5、SHA-1已经被证明为不安全的算法,目前建议使用较新的SHA-256和SHA-512。
所有算法的输入均可以是不定长的数据。MD5输出是16字节(128位),SHA-1输出为20字节(160位),SHA-256为32字节(256位),SHA-512为64字节(512位)。
可以看到,SHA算法的输出长度更长,因此更难发生碰撞,数据也更为安全。但运算速度与MD5相比,也更慢。
5.2 循环冗余校验
循环冗余校验又称 CRC(Cyclic redundancy check),将待发送的比特串看做是系数为 0 或者 1 的多项式。
M = 1001010
M(x) = 1*x^6 + 0*x^5 + 0*x^4 + 1*x^3 + 0*x^2 + 1*x^1 + 0*x^0
M(x) = x^6 + x^3 + x
CRC 编码时,发送方和接收方必须预先商定一个生成多项式 G(x)。
发送方将比特串和生成多项式 G(x) 进行运算得到校验码,在比特串尾附加校验码,使得带校验码的比特串的多项式能被 G(x) 整除。接收方接收到后,除以 G(x),若有余数,则传输有错。
5.3 校验算法的开销
CRC算法的优点是算法实现相对简单、运算速度较快。而且错误检错能力很强,因此被广泛应用于通信数据校验。
我们做了一些简单的benchmark以供参考:
CRC32 > CRC64 > MD5 > SHA-1 > SHA-512 > SHA-256
校验算法 |
单次操作耗时 |
BenchmarkMD5_100MB-8 |
175423280 ns/op |
BenchmarkSHA1_100MB-8 |
176478051 ns/op |
BenchmarkSHA256_100MB-8 |
344191216 ns/op |
BenchmarkSHA512_100MB-8 |
226938072 ns/op |
BenchmarkCRC32IEEE_100MB-8 |
10500107 ns/op |
BenchmarkCRC32Castagnoli_100MB-8 |
12991050 ns/op |
BenchmarkCRC64_100MB-8 |
86377178 ns/op |
而 OSS 支持的校验算法有 MD5 和 CRC64,那么同样的场景下,我们会优先选择 CRC64 替代 MD5。
6. 阿里云EMR JindoSDK 的最佳实践
在总结了 S3AFileSystem 做法中的优缺点,并结合 OSS 自身提供的一些功能取长补短后,阿里云EMR JindoSDK 得出了自己的最佳实践。
JindoSDK 实现的 JindoOutputStream 支持了两种校验方式,一种是请求级别的校验,一种是文件块级别的校验。
请求级别的校验,默认关闭。需要打开时,配置 fs.oss.checksum.md5.enable 为 true 即可。配置好之后,客户端会在块级别的请求(PutObject/MultipartUpload)Header 中添加 Payload 的 Content-MD5。如果服务端计算 Payload 的 md5 与 客户端提供的不符,则客户端会重试。
文件块级别的校验,默认打开。需要关闭时,需要配置 fs.oss.checksum.crc64.enable 为 false。则是在写入流一开始就在内存中同步计算传入 Buffer 的 CRC64,并在文件块落盘时和服务端计算返回的 CRC64 进行比较。
使用最新的 jindosdk-4.6.2 版本与 S3AFileSystem 在数据湖写入路径上,综合对比的结果如下:
场景 |
S3AFileSystem |
JindoOssFileSystem |
磁盘 IO 问题 |
抛出异常 java.io.IOException |
抛出异常 java.io.IOException |
磁盘比特反转 |
未抛出异常 |
抛出异常 java.io.IOException |
网络比特反转 |
抛出异常 org.apache.hadoop.fs.s3a.AWSClientIOException |
抛出异常 java.io.IOException |
网络丢包 |
抛出异常 org.apache.hadoop.fs.s3a.AWSClientIOException |
抛出异常 java.io.IOException |
写一个 5G 文件的耗时 |
13.375s |
6.849s |
可以看到 EMR JindoSDK 在写 OSS 时,不仅有着相比 S3AFileSystem 更完善的错误检查,性能也更为优异。
7. 总结与展望
数据湖存储的安全写入,必须要能考虑到内存、磁盘、网络的不可靠性。同时,也要结合存储介质本身的特性,选择合适的校验算法。熟悉数据写入完整链路,全面地考虑各种可能遇到的问题,并提供完善的测试方案验证可行性,才算有始有终。
阿里云EMR JindoSDK 通过以上方式形成了自己的最佳实践,不仅保证了对象存储写入链路的安全性,同样也支持了EMR JindoFS服务(OSS-HDFS)的写入链路。虽然 OSS-HDFS 中的一个文件可以对应 OSS 上的多个对象,但是在写入 OSS 时,底层复用了同一套实现。因此,在使用时也不需要做额外的适配,完全可以共用相同的配置项。
未来我们还将结合 OSS-HDFS,提供在数据随机读场景的安全性校验,而这是对象存储本身目前无法做到的。
8. 附录一:测试 S3A 的配置方式
core-sites.xml
<property><name>fs.s3a.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property><property><name>fs.AbstractFileSystem.s3a.impl</name><value>org.apache.hadoop.fs.s3a.S3A</value></property><property><name>fs.s3a.access.key</name><value>xxx</value></property><property><name>fs.s3a.secret.key</name><value>xx</value></property><property><name>fs.s3a.endpoint</name><value>localhost:8765</value></property><property><name>fs.s3a.connection.ssl.enabled</name><value>false</value></property><property><name>fs.s3a.fast.upload</name><value>true</value></property><property><!-- 本地 buffer 缓存目录,不存在会创建 --><name>fs.s3a.buffer.dir</name><value>/mnt/passthrough/</value></property>
9. 附录二:测试EMR JindoSDK 的配置方式
core-sites.xml
<property><name>fs.AbstractFileSystem.oss.impl</name><value>com.aliyun.jindodata.oss.OSS</value></property><property><name>fs.oss.impl</name><value>com.aliyun.jindodata.oss.JindoOssFileSystem</value></property><property><name>fs.oss.accessKeyId</name><value>xxx</value></property><property><name>fs.oss.accessKeySecret</name><value>xxx</value></property><property><name>fs.oss.endpoint</name><!-- 阿里云 ECS 环境下推荐使用内网 OSS Endpoint,即 oss-cn-xxx-internal.aliyuncs.com --><value>oss-cn-xxx.aliyuncs.com</value></property><property><!-- 客户端写入时的临时文件目录,可配置多个(逗号隔开),会轮流写入,多用户环境需配置可读写权限 --><name>fs.oss.tmp.data.dirs</name><value>/data2/tmp/</value></property><property><!-- 是否使用二级域名写入打开后 <your-bucket>.oss-cn-xxx-internal.aliyuncs.com/<your-dir>会变为 oss-cn-xxx-internal.aliyuncs.com/<your-bucket>/<your-dir> --><name>fs.oss.second.level.domain.enable</name><value>true</value></property>
log4j.properties
log4j.logger.com.aliyun.jindodata=INFO log4j.logger.com.aliyun.jindodata.common.FsStats=INFO
mitmproxy
获取 endpoint ip
ping oss-cn-shanghai-internal.aliyuncs.com 64 bytes from xxx.xxx.xxx.xx (xxx.xxx.xxx.xx): icmp_seq=1 ttl=102 time=0.937 ms
将 addons.py 中使用 ip 代替 .oss-cn-shanghai-internal.aliyuncs.com
ifflow.request.host=="xxx.xxx.xxx.xx"andflow.request.method=="PUT":
反向代理时也使用 ip 代替 .oss-cn-shanghai-internal.aliyuncs.com
mitmdump -s addons.py -p8765--setblock_global=false--mode reverse:http://xxx.xxx.xxx.xx:80
欢迎感兴趣的朋友加入钉钉交流群(钉钉搜索群号33413498 或 钉钉扫描下方二维码)