基于Openresty+CEPH实现海量数据管理系统

本文涉及的产品
数据管理 DMS,安全协同 3个实例 3个月
推荐场景:
学生管理系统数据库
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 作为一家专注于三维高精度地图服务的公司,有海量(PB级)的原始数据、中间数据、成功数据,需要存储、管理、并定期归档。 1. 按项目管理数据,数据分类航飞数据、控制点数据、中间数据、成果数据、其他数据。数据来源包括无人机数据、载荷数据、地面站数据、人工打点数据等。不同渠道汇集而来的数据。

「持续更新中,欢迎关注...」

1. 需求:

作为一家专注于三维高精度地图服务的公司,有海量(PB级)的原始数据、中间数据、成功数据,需要存储、管理、并定期归档。

  1. 按项目管理数据,数据分类航飞数据、控制点数据、中间数据、成果数据、其他数据。数据来源包括无人机数据、载荷数据、地面站数据、人工打点数据等。不同渠道汇集而来的数据。
  2. 采用类似百度网盘的形式,上传、下载,支持断点续传、进度跟踪。
  3. 支持细化到文件级别的权限控制,以及更多的文件(夹)属性。

2. 分析:

  1. 系统重点在于数据存储的选型,支持海量数据的存储,能够支持在复杂网络下的数据上传。选用CEPH作为数据存储,RGW对象存储,S3协议上传下载,完美支持分片和断点续传。
  2. 系统难点在于文件级别的业务权限控制,以及文件(夹)更多的属性支持。CEPH RGW本身支持权限控制,但是无法和业务权限做对接。对象存储本身没有文件夹的概念,无法对文件夹做分类、数量展示、大小展示。所以实现自定义索引服务,CEPH主要负责存储,自定义索引服务实现展示与查询。

3. 实现

系统重点在于海量数据上传的可靠性与海量数据索引的管理。

3.1 架构

空间数据系统架构图.png

  1. 上传助手就是类百度网盘的桌面端软件,采用Electron JS
    )实现。主要实现功能:项目展示、上传、下载。
  2. 业务层包括网关服务、账号服务、项目服务、文件索引服务等。采用Java + Spring Boot + Spring Cloud技术栈。其中重点服务是文件索引服务Index Server,负责海量文件的索引维护和查询。
  3. 业务数据MySQL集群+Redis集群,海量文件存储使用CEPH对象存储,支持S3 API。

3.3 关键流程图

上传流程.png

  1. 上传助手使用普通的Put Object请求上传文件,加上自定义的metadata字段(项目ID、用户ID等)即可完成数据的提交。
  2. Openresty使用proxy模式将文件请求转发到 CEPH RGW,由RGW完成后台数据存储处理。
  3. Openresty在RGW完成数据存储以后,调用log_by_lua_file将对应请求的用户自定义metadata和文件属性转发到后台Kafka。
  4. 文件索引服务(Index Server)从Kafka中消费任务,拿到每个文件的信息。
  5. 文件索引服务(Index Server)对文件数据按业务要求进行处理后,存入MySQL数据库。

3.4 示例代码

log_by_lua_file.lua:从Openresty获取文件信息,并发往Kafka

local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local broker_list = {
    { host = "172.16.0.20", port = 9092 },
}
function send_job_to_kafka()
    local log_json = {}
    local req_headers_ = ngx.req.get_headers()

    for k, v in pairs(req_headers_) do
        if k == "content-length" then
            log_json["contentLength"] = tostring(v)
        end
        if k == "u-id" then
            log_json["uId"] = tostring(v)
        end
        if k == "p-id" then
            log_json["pId"] = tostring(v)
        end
    end

    local resp_headers_ = ngx.resp.get_headers()
    for k, v in pairs(resp_headers_) do
        if k == "etag" then
            log_json["etag"] = string.gsub(v, "\"", "")
            break
        end
    end

    log_json["uri"] = ngx.var.uri
    log_json["host"] = ngx.var.host
    log_json["remoteAddr"] = ngx.var.remote_addr
    log_json["status"] = ngx.var.status
    local message = cjson.encode(log_json);
    ngx.log(ngx.ERR, "message is[", message, "]")
    return message
end

--local is_args = ngx.var.is_args
local request_method = ngx.var.request_method
local status_code = ngx.var.status

-- 过滤Put Object成功的请求,记录相应的metadata及请求ID,并转发到kafka
if request_method == "PUT" and status_code == "200" then
    local bp = producer:new(broker_list, { producer_type = "async" })
    local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka())
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
    ngx.log(ngx.ERR, "kafka send success:", ok)
end

4. 总结

  1. 通过此架构方案,在海量文件归档过程中,将文件基本信息异步导入到业务数据库中,便于业务应用开发。
  2. 此架构一般也应用对象存储的多媒体文件处理,比如图片处理、视频处理、加水印、鉴黄、事件通知等。
相关实践学习
MySQL基础-学生管理系统数据库设计
本场景介绍如何使用DMS工具连接RDS,并使用DMS图形化工具创建数据库表。
相关文章
|
负载均衡 应用服务中间件 nginx
62分布式电商项目 -nginx介绍
62分布式电商项目 -nginx介绍
48 0
|
存储 Kubernetes 关系型数据库
K8s部署分布式存储Ceph系统搭建与实战
K8s部署分布式存储Ceph系统搭建与实战
1397 0
|
存储 关系型数据库 网络安全
分布式存储ceph入门介绍
ceph是当前最热门的分布式存储系统之一,是软件定义存储(SDS,SoftwareDefinedStorage)解决方案中的典范,本文对ceph的基础情况进行介绍。
864 0
分布式存储ceph入门介绍
|
存储 块存储 缓存
Ceph分布式存储性能调优(六)(上)
Ceph分布式存储性能调优(六)(上)
419 0
Ceph分布式存储性能调优(六)(上)
|
存储 缓存 固态存储
Ceph分布式存储性能调优(六)(下)
Ceph分布式存储性能调优(六)(下)
508 0
|
存储 Web App开发 应用服务中间件
|
存储 Linux 块存储
理解Ceph:一个开源的分布式存储平台
本文讲的是理解Ceph:一个开源的分布式存储平台,【编者的话】Ceph是一个符合POSIX、开源的分布式存储系统。最初由Sage Weill于2007年开发,Ceph的主要目标是设计成基于POSIX的没有单点故障的分布式文件系统,使数据能容错和无缝的复制。本文详细介绍了Ceph的历史和架构。
3087 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等