lindorm多模间数据无缝流转

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
简介: 展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。

lindorm多模间数据无缝流转

1. 创建资源

开始实验之前,您需要先创建云服务器ECS和Lindorm集群资源。

  1. 在实验室页面,单击创建资源。
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如测试服务器IP地址、数据库及帐号密码信息等)。

说明:资源创建过程需要10~15分钟。


2. 实验背景说明

本步骤无需操作,可观看后进入下一步骤。

本实验用到的数据集包含有关2005年4月至2005年9月台湾地区信用卡客户的默认付款,人口统计因素,信用数据,付款历史和账单的信息。该数据可用于探讨拖欠还款的概率如何随不同人口统计学变量的类别而变化以及哪些变量是拖欠付款的最强预测因素。

本实验是介绍数据通过Kafka api写入流引擎,在宽表引擎和计算引擎中无缝流转,一站式完成流数据接受和处理、海量数据在线查询、数据转列存后在线分析的工作。


3. 初始化数据

本步骤将指导您登录Lindorm控制台,重置ldinsight密码,开通计算引擎并执行建表程序。

一、访问Lindorm控制台

  1. 远程桌面密码设置,账户名密码可均设置为root,在本实验中无退出远程桌面操作,此处账户名密码对实验无影响。设置完后可点击“切换到远程桌面”进行下一步实验。

  1. 双击 “Chromium” 浏览器,选择实验手册Tab栏的用户名,点击“下一步
  2. 复制用户名下的“子用户密码”,在密码输入框鼠标右键单击,选择“粘贴“(实验中在web页面的crtl+v快捷键会失效), 选择”登录“。

  1. 进入控制台,搜索框输入“lindorm”,会显示 “云原生多模数据库Lindorm” 的控制台,或者直接在实验浏览器输入以下地址就可以访问Lindorm上海地域的控制台。
https://lindorm.console.aliyun.com/cn-shanghai/cluster

  1. 选择对应的地域,找到对应的Lindorm 【华东2(上海)】实例,单击实例链接,进入”实例详情“页面(注意实例id 要和左侧的云产品资源 Tab 栏下的Lindorm 实例ID一致)。

  1. 进去以后需要用户确认”可运维时间段“,点击”确认“即可。

  1. 选择”宽表引擎“-”集群管理“,点击集权管理重置UI访问密码,密码设置为root并在密码确认中再次输入root;设置完成点击确定,为后续使用ldinsight做准备。

  1. 添加 IP 白名单
  1. 浏览器新开一个Tab,访问 `https://tool.lu/ip/` ,看到自己的外网ip 地址,例如 `140.205.11.13`

  1. 点击上方窗口切换回Lindorm控制台

  1. 选择 “访问控制”-“白名单”,点击“修改分组白名单“,输入上一步获取的外网ip,(如果白名单已经存在多个ip ,直接在尾部添加逗号和ip )

  1. 点击”确认“以后,大概10-30秒左右,集群状态会变为”运行中“,表示添加白名单成功。

  1. 修改密码
  1. 选择控制台左侧的”数据库连接“,将默认密码复制下来,作为后面修改密码的旧密码填写内容。

  1. 选择控制台左侧的”宽表引擎“,在页面中的”集群管理“。点击”ClusterManager公网“,在弹出的对话框当中点击“确认”(上一步已经添加了ip白名单)。

  1. 在弹出框中,输入用户名(admin)、密码(root),即可进入 Lindorm Insight首页。
  1. 注意:如果无法访问,请确保第2步的ip白名单添加成功。

  1. 进入首页后依次点击DataManager和User Manager,在用户列表中选择root用户进行密码修改,将之前在控制台上复制下来的旧密码输入,设置新密码为root。

  1. 切换回Lindorm控制台页面,单击左侧列表中的“计算引擎”,点击立即开通。

  1. 单击确定,开通计算引擎,为后续执行计算任务做准备。

二、访问WebTerminal

  1. 在上方新建一个窗口,在地址栏输入以下地址切换至ECS控制台。
https://ecs.console.aliyun.com/home

  1. 点击实例按钮,在上海的实例列表中找到在云产品资源中ECS实例id中显示的实例,单击远程连接按钮连接Web Terminal,登录到ECS。

  1. 点击立即登录。

  1. 复制云产品资源中的ECS登录密码到密码输入框,点击确定。

  1. 访问成功如图。
  2. 执行以下命令回车运行可以看到 /root 目录有一个初始化好的导入jar 包
ls

  1. 执行以安装 JDK,显示Complete代表安装成功(如下图所示)
yum install -y java

  1. 安装结束以后,执行 以下命令来确认 JDK1.8 安装成功,安转成功后显示如下结果
java -version

  1. 安装结束以后,用Lindorm实例id 替换下面url中的实例id,将替换后的命令复制在命令行行回车运行。(实例为云产品资源中lindorm的实例ID)

cd ~
java  -Xms1G -Xmx1G -Durl="jdbc:lindorm:table:url=http://实例id-proxy-lindorm.lindorm.rds.aliyuncs.com:30060" -Dlindorm_user="root" -Dlindorm_password="root" -jar init_demo.jar

  1. 显示如下截图,没有报错,即表明建表成功。


4. 查询宽表引擎数据

本步骤将指导您通过ldinsight查询宽表引擎。

一、访问Lindorm Insight

  1. 切换回到Lindorm控制台,选择”宽表引擎“-”集群管理“。点击”ClusterManager公网“,在弹出的对话框当中点击“确认”(上一步已经添加了ip白名单)。

  1. 在弹出框中,输入用户名(admin)、密码(root),即可进入 Lindorm Insight 首页。
  2. 访问”数据查询“ - ”SQL 执行器“,namespace 选择”default“,就可以看到右边显示已经存在的 loan_record 表名。

  1. 在sql窗口中执行以下命令,可以看到当前表无数据。
SELECT * FROM  loan_record limit 3;


5. 连接流引擎

本步骤将指导您连接流引擎。

一、连接流引擎

  1. 点击右上角窗口切换回ECS窗口。

  1. 流引擎lindorm-cli安装包已预先安装在/root 目录下,将左侧云产品资源中获取的实例id替换到以下命令中,复制以下命令在命令窗口内,回车运行以连接Lindorm流引擎cli。
cd /root
tar -zxf lindorm-sqlline-0.1.5.tar.gz
/root/lindorm-sqlline-0.1.5/bin/lindorm-sqlline -url jdbc:streamsql:url=http://实例id-proxy-stream.lindorm.rds.aliyuncs.com:30060 -u root -p root
  1. 连接成功提示
  • 出现如下提示为连接成功。


6. 创建数据处理任务

本步骤将指导您通过创建数据处理任务来将流引擎内的数据关联到宽表引擎,Lindorm流引擎具备ETL能力,若有数据聚合清洗的需求在这步配置。本实验无数据清洗过滤需求,只需要将数据转移到宽表内,可直接复制以下任务代码,回车运行。

一、创建数据处理任务

  1. 在上一步连接完之后的命令行内复制以下代码在命令行,将左侧云产品资源中获取的Lindorm实例id替换代码中的实例id(有两处需要替换),回车运行以创建数据处理任务,将流数据导入到Lindorm宽表中。
CREATE FJOB write_to_lindorm(
    CREATE TABLE orders( 
        id BIGINT,
        loanAmnt DOUBLE ,
        term INT,
        interestRate DOUBLE,
        installment DOUBLE,
        grade STRING,
        subGrade STRING,
        employmentTitle INT,
        employmentLength STRING,
        homeOwnership INT,
        annualIncome DOUBLE,
        verificationStatus INT,
        issueDate STRING,
        purpose INT,
        postCode INT,
        regionCode INT,
        dti DOUBLE,
        delinquency_2years INT,
        ficoRangeLow INT,
        ficoRangeHigh INT,
        openAcc INT,
        pubRec INT,
        pubRecBankruptcies INT,
        revolBal INT,
        revolUtil DOUBLE,
        totalAcc INT,
        initialListStatus INT,
        applicationType INT,
        earliesCreditLine STRING,
        title INT,
        policyCode INT,
        n0 INT,
        n1 INT,
        n2 INT,
        n3 INT,
        n4 INT,
        n5 INT,
        n6 INT,
        n7 INT,
        n8 INT,
        n9 INT,
        n10 INT,
        n11 INT,
        n12 INT,
        n13 INT,
        n14 INT
    ) WITH (  
        'connector'='kafka',
        'topic'='order_topic',
        'properties.group.id' = 'order_group',
        'scan.startup.mode' = 'earliest-offset',
        'properties.bootstrap.servers'='{实例id}-proxy-stream.lindorm.rds.aliyuncs.com:30080',
        'format'='json');    
    CREATE TABLE loan_record(
        id BIGINT,
        loanAmnt DOUBLE ,
        term INT,
        interestRate DOUBLE,
        installment DOUBLE,
        grade STRING,
        subGrade STRING,
        employmentTitle INT,
        employmentLength STRING,
        homeOwnership INT,
        annualIncome DOUBLE,
        verificationStatus INT,
        issueDate STRING,
        purpose INT,
        postCode INT,
        regionCode INT,
        dti DOUBLE,
        delinquency_2years INT,
        ficoRangeLow INT,
        ficoRangeHigh INT,
        openAcc INT,
        pubRec INT,
        pubRecBankruptcies INT,
        revolBal INT,
        revolUtil DOUBLE,
        totalAcc INT,
        initialListStatus INT,
        applicationType INT,
        earliesCreditLine STRING,
        title INT,
        policyCode INT,
        n0 INT,
        n1 INT,
        n2 INT,
        n3 INT,
        n4 INT,
        n5 INT,
        n6 INT,
        n7 INT,
        n8 INT,
        n9 INT,
        n10 INT,
        n11 INT,
        n12 INT,
        n13 INT,
        n14 INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (  
        'connector'='lindorm',
        'tableName'='loan_record',
        'seedServer'='{实例id}-proxy-lindorm.lindorm.rds.aliyuncs.com:30020',
        'namespace'='default',
        'userName' = 'root',
        'password' = 'root');
    INSERT INTO loan_record
    SELECT * FROM orders
);
  1. 建立成功会显示以下返回结果。


7. 通过Kafka API写入流数据

本步骤将指导您通过Kafka API写入流数据到Lindorm内,。

一、通过Kafka API写入流数据

说明:

流引擎写入的数据格式支持JSON、Avro和CSV,本实验用的是csv,实验已在根目录 /root 下预置csv2lindorm.jar文件,该jar包用处为使用kafka API写入csv数据到流引擎内。

流引擎命令格式如下:java -jar csv2lindorm.jar 【数据文件目录】 【流表Topic名】 【流表kafka连接地址】

操作:

  1. 执行Ctrl+Z 退出sqlline ,进入到 shell 终端。
  2. 替换实例id后复制以下命令到命令行回车执行即可。
java -jar csv2lindorm.jar /root/train.csv order_topic {实例id}-stream-001.lindorm.rds.aliyuncs.com:30080
  1. 运行成功会显示写入的进度(需要导入80w条模拟数据,基于当前的资源规格,大概需要8min左右,请您耐心等待)。

  1. 写入成功显示


8. 再次查询宽表内数据

本步骤将指导您再次查询宽表引擎,确认已有数据写入。

一、再次查询宽表内数据

  1. 切换回Lindorm控制台。

  1. 进入ldinsight

  1. 通过ldinsight查询宽表内数据,显示有通过流处理任务处理完的数据。
SELECT * FROM  loan_record;


9. 从宽表中读取数据建立列存表

本步骤将指导您通过计算引擎从宽表中读取数据来建立列存表。

一、行列转存

  1. 点击左上窗口返回Lindorm控制台。

  1. 如下图所示点击【数据库连接】后点击计算引擎,复制JDBC地址

  1. 切换回ECS的窗口。

  1. 替换从上一步中复制到的地址后,复制以下命令并回车运行来连接 spark。
cd /root
tar -zxf ldspark-3.2.1-current.tgz
mv 3.2.1-ldspark-1.0* 3.2.1-ldspark-1.0
export SPARK_HOME=/root/3.2.1-ldspark-1.0; 
$SPARK_HOME/bin/beeline -u "此处替换为JDBC地址" -n root -p root
  1. 连接成功显示如下

  1. 复制以下命令回车运行来建立 spark表 :spark_catalog.default.loan_record_col
create table spark_catalog.default.loan_record_col(id LONG,
loanAmnt DOUBLE ,
term INT,
interestRate DOUBLE,
installment DOUBLE,
grade STRING,
subGrade STRING,
employmentTitle INT,
employmentLength STRING,
homeOwnership INT,
annualIncome DOUBLE,
verificationStatus INT,
issueDate STRING,
purpose INT,
postCode INT,
regionCode INT,
dti DOUBLE,
delinquency_2years INT,
ficoRangeLow INT,
ficoRangeHigh INT,
openAcc INT,
pubRec INT,
pubRecBankruptcies INT,
revolBal INT,
revolUtil DOUBLE,
totalAcc INT,
initialListStatus INT,
applicationType INT,
earliesCreditLine STRING,
title INT,
policyCode INT,
n0 INT,
n1 INT,
n2 INT,
n3 INT,
n4 INT,
n5 INT,
n6 INT,
n7 INT,
n8 INT,
n9 INT,
n10 INT,
n11 INT,
n12 INT,
n13 INT,
n14 INT) using parquet;
  1. 导入宽表内的行存数据,导入后即为列存数据,导入过程中完成了行列转存。(导入时间约需三分钟)
insert into spark_catalog.default.loan_record_col select * from lindorm_table.default.loan_record;
  1. 导入成功后显示如下


10. 进行计算任务

本步骤将指导您查询复杂计算语句。

一、进行计算任务

  1. 此时数据是以列存方式存储,可以灵活的对任意列进行查询、统计,不再需要额外建立索引

可在命令行写入sql执行计算任务,以下为计算语句的示例。

  1. 查询贷款等级
select distinct(grade) from loan_record_col;

  1. 每个贷款等级下客户的年收入
select sum(annualIncome) annualIncomeTotal, grade from loan_record_col group by grade order by annualIncomeTotal desc;
  1. 查找年收入 TOP10 的贷款用户
select * from loan_record_col where annualIncome in (select annualIncome from loan_record_col order by annualIncome desc limit 10);


11. 实验总结

一、实验总结

本实验展现了数据在Lindorm各引擎间的流转和处理,Lindorm是个云原生多模数据库,优势有以下的几点:

  1. 多模超融合:支持宽表、时序、对象、文本、队列、空间等多种数据模型,模型之间数据互融互通,具备数据接入、存储、检索、计算、分析等一体化融合处理与服务的能力,帮助应用开发更加敏捷、灵活、高效。
  2. 高性价比:支持千万级高并发吞吐、毫秒级访问延迟,并通过多级存储介质、智能冷热分离、自适应特征压缩,大幅减少存储成本。
  3. 云原生弹性:支持计算资源、存储资源独立弹性伸缩,并提供按需即时弹性、按使用量付费的Serverless服务。
  4. 开放兼容:兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多种标准接口,支持与Hadoop、Spark、Flink、Kafka等系统无缝打通,并提供简单易用的数据交换、处理、订阅等能力。

实验链接:https://developer.aliyun.com/adc/scenario/5a54b129b3994e118347c9e078530006

相关文章
|
24天前
|
SQL 存储 运维
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
40 1
|
7月前
|
存储 SQL 多模数据库
多模数据库Lindorm再升级:对接Dataphin,打通数据治理“最后一公里”
Lindorm通过与Dataphin的深度整合,进一步解决了数据集成和数据治理的问题,为企业提供更加高效和更具性价比的方案。
多模数据库Lindorm再升级:对接Dataphin,打通数据治理“最后一公里”
|
6月前
|
安全 数据管理
DataphinV4.1大升级:支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
DataphinV4.1大升级:支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
|
7月前
|
数据采集 安全 API
DataphinV4.1大升级: 支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
Dataphin 是阿里巴巴旗下的一个智能数据建设与治理平台,旨在帮助企业构建高效、可靠、安全的数据资产。在V4.1版本升级中,Dataphin 引入了Lindorm等多项新功能,并开启公共云半托管模式,优化代码搜索,为用户提供更加高效、灵活、安全的数据管理和运营环境,提升用户体验,促进企业数据资产的建设和价值挖掘。
1623 3
DataphinV4.1大升级: 支持Lindorm开启高性价比数据治理,迎来“公共云半托管”云上自助新模式
|
6月前
|
SQL 分布式计算 BI
实时计算 Flink版产品使用问题之基于宽表数据展示实时报表,该如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 DataWorks 安全
DataWorks产品使用合集之没有使用独享资源组,如何将Lindorm中的数据导出或迁移到其他数据存储服务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
58 0
|
7月前
|
时序数据库
时序数据库工具grafana里的$timeFilter查询1个小时内的数据如何写查询条件
【6月更文挑战第24天】时序数据库工具grafana里的$timeFilter查询1个小时内的数据如何写查询条件
887 0
|
存储 NoSQL Oracle
「时序数据库」使用cassandra进行时间序列数据扫描
「时序数据库」使用cassandra进行时间序列数据扫描
|
SQL 存储 分布式计算
【时序数据库】时间序列数据和MongoDB第三部分-查询、分析和呈现时间序列数据
【时序数据库】时间序列数据和MongoDB第三部分-查询、分析和呈现时间序列数据
|
存储 分布式计算 NoSQL
「时序数据库」时间序列数据与MongoDB:第一部分-简介
「时序数据库」时间序列数据与MongoDB:第一部分-简介