实现淘宝母婴订单实时查询和实时大屏
1. 实验资源方式简介及开始实验
云起实验室实验资源方式介绍
云起实验室支持实验资源体验、领取免费试用额度、个人账户资源三种实验资源方式。
- 实验资源体验
- 资源归属于客户,仅供本次实验使用
- 实验结束后,实验资源及实验记录将被释放。
- 资源创建过程需要3~5分钟(视资源不同开通时间有所差异,ACK等资源开通时间较长)。完成实验资源的创建后,在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如子用户名称、子用户密码、AK ID、AK Secret、资源中的项目名称等)。
- 说明:实验环境一旦开始创建则进入计时阶段,建议学员先基本了解实验具体的步骤、目的,真正开始做实验时再进行创建。
- 领取免费试用额度
- 使用个人账号开通试用,平台仅提供手册参考。
- 所有实验操作将保留至您的账号,请谨慎操作。
- 在实验页面下方卡片会展示本实验支持的试用规格,可以选择你要试用的云产品资源进行开通。您在实验过程中,可以随时用右下角icon唤起试用卡片。阿里云支持试用的产品列表、权益及具体规则说明请参考开发者试用中心。
- 说明:试用云产品开通在您的个人账号下,并占用您的试用权益。如试用超出免费试用额度,可能会产生一定费用。
- 个人账户资源
- 使用您个人的云资源进行操作,资源归属于个人。
- 所有实验操作将保留至您的账号,请谨慎操作。
- 平台仅提供手册参考,不会对资源做任何操作。
- 说明:使用个人账户资源,在创建资源时,可能会产生一定的费用,请您及时关注相关云产品资源的计费概述。
准备开始实验
在实验开始前,请您选择其中一种实验资源,单击确认开启实验。
说明:每个实验所支持的实验资源方式都不相同,实验不一定能满足有三种实验资源方式,请根据实验的实际情况,进行选择。
2. 领取免费试用资源
实验前必看!
- 在创建实时计算Flink版时,需要使用到对象存储OSS和负载均衡SLB,您需要领取负载均衡SLB和对象存储OSS资源抵扣包进行抵扣(单击对应产品卡片上的立即试用,在确认并了解相关信息后,根据页面提示申请试用),否则将会产生额外费用。如果您的对象存储OSS或SLB资源抵扣包已使用完毕或无领取资格,开通Flink试用后,将正常收取费用,计费详情请参见SLB按量付费和OSS按量付费。
- 如果您的阿里云主账号符合开通免费试用实时计算Flink版的资格,建议您开通免费试用实时计算Flink版,实时计算Flink版提供的按量付费(3个月5000CUH计算资源)试用,具有时长和容量限制,使用完请及时清理相关资源。实时计算Flink版如果未及时清理资源,相关Flink工作空间将按照按量付费模式正常进行计费,涉及的其他产品服务也会正常收取费用。具体计费详情,请参见按量付费。
- 如果您的阿里云主账号符合开通免费试用云数据库RDS MySQL Serverless的资格,建议您开通免费试用云数据库RDS MySQL Serverless,云数据库RDS MySQL Serverless只能免费试用3个月。实例到期后,您可以手动释放。具体操作,请参见释放实例。如果一直未释放该实例,实例将在试用时间结束后,按照正常价格计费,计费标准请参见Serverless费用。
- 如果您的阿里云主账号符合开通免费试用的资格,建议您开通免费试用检索分析服务Elasticsearch版,检索分析服务Elasticsearch版免费试用时长为1个月,超出免费试用时长继续使用Elasticsearch实例将会计费,具体计费详情,请参见计费概述。
- 实验所开通的云产品因数据连通性要求,需使用同一Region可用区,建议都选取杭州Region的同一可用区。涉及的云产品包括阿里云实时计算Flink版、阿里云数据库 RDS和阿里云检索分析服务Elasticsearch版。
- 如果您的阿里云账号只能领取部分免费试用产品,请您领取符合免费试用资格的产品,不满足免费试用资格的产品将会使用个人账户资源进行创建,并会产生一定的费用,请您及时关注账户扣费。
- 为了避免资源浪费并造成账号扣费的情况,请严格按照本文提供的参数进行配置。在实验完成之后,请您及时删除或禁用压测任务。
- 在实验开始前,请您选择领取免费试用额度。
- 开通对象存储OSS免费试用。
2.1 在实验室页面下方,选择对象存储OSS,单击立即试用。
说明:如果您的对象存储OSS资源抵扣包已使用完毕或无领取资格,请您跳过2.1-2.2步骤,请根据2.3-2.4步骤创建对象存储OSS,将正常收取费用,计费详情请参见OSS按量付费。
2.2 在对象存储OSS试用开通页面,在确认并了解相关信息后,根据页面提示申请试用。
2.3 前往对象存储OSS控制台。如果您没有开通过OSS服务,系统会提示您开通OSS服务,请按照页面提示开通OSS服务。
2.4 在左侧导航栏中,单击Bucket列表。
2.5 在Bucket列表页面,单击创建Bucket。
2.6 在创建Bucket页面,根据页面引导进行创建Bucket,其中地域选择华东1(杭州),其他配置保持默认即可,单击确定。
- 开通负载均衡SLB免费试用。
3.1 在实验室页面下方,选择传统型负载均衡CLB,单击立即试用。
说明:如果您的负载均衡SLB资源抵扣包已使用完毕或无领取资格,请您跳过此步骤,开通Flink试用后,将正常收取费用,计费详情请参见SLB按量付费。
3.2 在传统型负载均衡CLB开通页面,地域选择华东1(杭州),可用区选择华东1可用区J,选中服务协议,单击立即试用。
- 开通实时计算Flink版免费试用。
4.1 在实验室页面下方,选择实时计算Flink版,单击立即试用。
4.2 如果您第一次访问该服务,可能需要进行角色授权。请在弹出的授权请求页面,单击前往RAM进行授权后,单击同意授权,完成自动化角色授权。授权成功后,需返回实验室页面。
4.3 Flink开通页面,单击上方提示框中的领取免费的资源抵扣包,领取Flink资源抵扣包。
4.4 在Flink资源抵扣包页面,确认相关信息后,单击确认订单,根据页面提示完成领取。
4.5 领取完毕后,在Flink开通页面,刷新此页面后完成参数配置。
参数 |
说明 |
教程配置 |
付费模式 |
阿里云实时计算Flink版免费试用仅支持按量付费。 |
按量付费 |
地域 |
本教程不涉及上下游存储,所以可不与上下游存储保持一致。 |
华东1(杭州) |
可用区 |
本教程不涉及上下游存储,所以可不与上下游存储保持一致。 |
可用区J |
资源抵扣包 |
可用于抵扣中国内地公有云地域(目前仅支持上海,杭州,北京,深圳,张家口区域)按量付费模式用量,自购买资源包日起有效期三个月。领取资源包后,单击刷新按钮,即可更新此参数。 其中,本教程示例不适用于张家口地域。 |
5000CUH |
您需要领取负载均衡SLB和对象存储OSS资源抵扣包进行抵扣(即单击所给链接,单击对应产品卡片上的立即试用,在确认并了解相关信息后,根据页面提示申请试用),否则将会产生额外费用。 如果您的对象存储OSS或SLB资源抵扣包已使用完毕或无领取资格,开通Flink试用后,将正常收取费用,计费详情请参见SLB按量付费和OSS按量付费。 |
确认领取后,选中 |
|
SLB服务 |
SLB是通过浏览器访问Flink产品控制台的必要网络连接组件。在您开通阿里云实时计算Flink版服务后,会自动开通SLB服务,该SLB专用于阿里云实时计算Flink版服务。 |
选中 |
专有网络 |
选择您已创建的专有网络。如果您没有创建,请参见创建和管理专有网络进行创建。 |
选中您需要使用的VPC名称。 |
虚拟交换机 |
每个Flink作业的Task Manager和Job Manager实例都会占用一个IP,您需要至少选择1个虚拟交换机。 如果当前无可用虚拟交换机,请在当前地域当前可用区下进行创建,详情请参见创建和管理专有网络。 |
选中您需要使用的交换机名称。 |
工作空间名称 |
每个工作空间的计算资源隔离,开发控制台相互独立。 创建成功后不可修改。 |
flink-test |
OSS存储 |
OSS用于存储作业系统检查点、作业快照、日志和JAR包等信息。新建OSS存储的具体操作,请参见开通OSS服务。 不推荐对选择的Bucket开启非默认策略,例如多版本、合规保留等,可能会导致Flink作业异常。 |
选中您需要使用的OSS Bucket名称。 |
监控服务 |
如果您选中了监控服务(为您提供商业版Prometheus监控能力),则会正常进行收费,计费详情请参见ARMS按量计费。 |
不选中 |
4.6 请再次确认已了解并领取实时计算Flink版资源抵扣包。
实时计算Flink版资源抵扣包领取已在上述步骤2.3-2.4中完成。若未领取请查看对应步骤完成领取。
如果您的实时计算Flink版资源抵扣包已使用完毕或无领取资格,开通Flink试用后,将正常收取费用。
4.7 单击立即试用,并根据页面提示,完成阿里云实时计算Flink版工作空间创建。
说明:工作空间创建需要约5~10分钟,请耐心等待。5~10分钟后,请在实时计算控制台Flink全托管页签,刷新页面查看工作空间状态,当工作空间状态为运行中时,即可进入下一步。
- 开通云数据库RDS MySQL Serverless免费试用。
5.1 在实验室页面下方,选择云数据库RDS MySQL Serverless,单击立即试用。
5.2 在云数据库RDS MySQL Serverless开通页面,根据页面引导进行开通云数据库RDS MySQL Serverless,其中地域选择华东1(杭州),可用区选择杭州 可用区J,数据库类型选择MySQL,数据库版本号选择8.0,其他配置保持默认即可,选中服务协议,单击立即试用。
说明:如果您的云数据库RDS MySQL Serverless资源抵扣包已使用完毕或无领取资格,请您跳过5.1-5.2步骤,使用个人账户创建云数据库RDS MySQL Serverless,将正常收取费用,计费详情请参见Serverless费用。前往云数据库RDS控制台,在实例列表页面,单击创建实例。在购买页面,根据页面引导进行开通云数据库RDS MySQL Serverless,计费方式选择为Serverless,地域选择华东1(杭州),数据库类型选择MySQL,数据库版本号选择8.0,其他配置保持默认即可,单击立即购买。
- 开通检索分析服务Elasticsearch版免费试用。
6.1 在实验室页面下方,选择检索分析服务Elasticsearch版,单击立即试用。
6.2 在检索分析服务Elasticsearch版面板,根据如下说明进行参数配置,未提及的参数保持默认即可,选中服务协议,然后单击立即试用。
说明:使用前需注意,检索分析服务Elasticsearch版免费试用时长为1个月。
配置项 |
教程配置 |
实例类型 |
通用商业版 |
Elasticsearch版本 |
7.16 本实验的操作和示例均以Elasticsearch 7.16版本为例,其他版本操作和示例可能略有差别,详细信息请参见Elasticsearch快速入门。 |
场景初始化配置 |
通用场景 |
地域和可用区 |
地域:华东1(杭州) 可用区:杭州可用区J |
数据节点规格 |
2核4 GB |
数据节点存储类型 |
高效云盘 |
数据单节点存储空间 |
20 GB |
数据节点数量 |
3 |
Kibana规格 |
2核4 GB |
专有网络 |
选择您已创建的专有网络。如果没有创建,请参见创建和管理专有网络创建。 |
虚拟交换机 |
选择您已创建的虚拟交换机。如果没有创建,请参见创建和管理专有网络创建。 |
资源组 |
选择您已创建的资源组,可以选择默认资源组。如果没有创建,请参见创建资源组创建。 |
实例名称 |
test-es |
登录名 |
elastic |
登录密码 |
自定义密码 |
到期自动续费 |
根据您的实际需求进行选择。 如果选中到期自动续费,到期后会自动续费并产生一定的费用。 如果未选中到期自动续费,到期后实例会自动释放。 检索分析服务Elasticsearch版免费试用时长为1个月。 |
6.3 按照页面提示进入控制台或直接单击阿里云Elasticsearch控制台。
6.4 在阿里云Elasticsearch控制台的左侧导航栏,单击Elasticsearch实例。
6.5 在Elasticsearch实例页面,查看创建成功的实例及其状态。
说明:实例创建后,大约需要5分钟生效。等待实例状态变为正常,才可继续执行后续步骤。
- 领取完免费试用后,返回资源领取界面,单击我已开通,进入实验。
说明:如果您的阿里云账号只能领取部分免费试用产品,请您领取符合免费试用资格的产品,然后进入实验,不满足免费试用资格的产品将会使用个人账户资源进行创建,并会产生一定的费用,请您及时关注账户扣费。
3. 创建资源
实验所开通的云产品因数据连通性要求,需使用同一Region可用区,建议都选取杭州Region的同一可用区。涉及的云产品包括阿里云实时计算Flink版、阿里云数据库 RDS和阿里云检索分析服务Elasticsearch版。
- 创建对象存储OSS。
说明:
- 如果您已有对象存储OSS资源,您可跳过此步骤。
- 请您根据如下操作,开通对象存储OSS,会产生一定的费用,具体计费详情,请参见OSS按量付费。
1.1 前往对象存储OSS控制台。如果您没有开通过OSS服务,系统会提示您开通OSS服务,请按照页面提示开通OSS服务。
1.2 在左侧导航栏中,单击Bucket列表。
2.5 在Bucket列表页面,单击创建Bucket。
2.6 在创建Bucket页面,根据页面引导进行创建Bucket,其中地域选择华东1(杭州),其他配置保持默认即可,单击确定。
- 创建实时计算Flink版。
说明:
- 如果您已开通实时计算Flink,请您跳过本步骤。
- 请您根据如下操作,开通实时计算Flink,并会产生一定的费用,具体计费详情,请参见计费概述。在开通实时计算Flink时,需要使用到对存储OSS和负载均衡SLB,计费详情请参见SLB按量付费和OSS按量付费。
2.1 前往实时计算Flink版控制台。
2.2 在实时计算控制台页面,单击立即购买。
2.3 在Flink开通页面,按照如下提示完成参数配置。计费详情请参见实时计算 Flink版按量计费。
说明:如果您第一次访问该服务,可能需要进行角色授权。请在弹出的授权请求页面,单击前往RAM进行授权后,单击同意授权,完成自动化角色授权。
参数 |
说明 |
教程配置 |
付费模式 |
阿里云实时计算Flink版免费试用仅支持按量付费。 |
按量付费 |
地域 |
本教程不涉及上下游存储,所以可不与上下游存储保持一致。 |
华东1(杭州) |
可用区 |
本教程不涉及上下游存储,所以可不与上下游存储保持一致。 |
可用区J |
SLB服务 |
SLB是通过浏览器访问Flink产品控制台的必要网络连接组件。在您开通阿里云实时计算Flink版服务后,会自动开通SLB服务,该SLB专用于阿里云实时计算Flink版服务。SLB会产生额外付费,单个收费约3元/天,实际收费详情请参见SLB按量付费。 |
选中 |
专有网络 |
选择您已创建的专有网络。如果您没有创建,请参见创建和管理专有网络进行创建。 |
选中您需要使用的VPC名称。 |
虚拟交换机 |
每个Flink作业的Task Manager和Job Manager实例都会占用一个IP,您需要至少选择1个虚拟交换机。 如果当前无可用虚拟交换机,请在当前地域当前可用区下进行创建,详情请参见创建和管理专有网络。 |
选中您需要使用的交换机名称。 |
工作空间名称 |
每个工作空间的计算资源隔离,开发控制台相互独立。 创建成功后不可修改。 |
例如flink-test |
OSS存储 |
OSS用于存储作业系统检查点、作业快照、日志和JAR包等信息。新建OSS存储的具体操作,请参见开通OSS服务。 不推荐对选择的Bucket开启非默认策略,例如多版本、合规保留等,可能会导致Flink作业异常。 |
选中您需要使用的OSS Bucket名称。 |
监控服务 |
如果您选中了监控服务(为您提供商业版Prometheus监控能力),则会正常进行收费,计费详情请参见ARMS按量计费。 |
不选中 |
2.4 单击确认订单,并根据页面提示,完成阿里云实时计算Flink版工作空间创建。
说明:工作空间创建需要约5~10分钟,请耐心等待。5~10分钟后,请在实时计算控制台Flink全托管页签,刷新页面查看工作空间状态,当工作空间状态为运行中时,即可进入下一步。
- 创建云数据库RDS MySQL Serverless。
说明:
- 如果您已开通云数据库RDS MySQL Serverless,请您跳过本步骤。
- 请您根据如下操作,开通云数据库RDS MySQL Serverless,并会产生一定的费用,具体计费详情,请参见Serverless费用。
3.1 前往云数据库RDS控制台。
3.2 在实例列表页面,单击创建实例。
3.3 在购买页面,根据页面引导进行开通云数据库RDS MySQL Serverless,计费方式选择为Serverless,地域选择华东1(杭州),可用区选择杭州可用区J,数据库类型选择MySQL,数据库版本号选择8.0,其他配置保持默认即可,单击立即购买。
3.4 在实例列表页面,请耐心等待大约5分钟,当工作空间状态为运行中时,即可进入下一步。
- 创建Elasticsearch实例。
4.1 前往阿里云Elasticsearch控制台。
4.2 在左侧导航栏中,单击Elasticsearch实例。
4.3 在Elasticsearch实例页面,单击创建。
4.4 在购买页面的前三个配置页面,完成参数配置,参考如下说明配置参数,未提及的参数保持默认即可,单击下一步:确认订单。
说明:使用前需注意,检索分析服务Elasticsearch版免费试用时长为1个月。
配置项 |
教程配置 |
实例类型 |
通用商业版 |
Elasticsearch版本 |
7.16 本实验的操作和示例均以Elasticsearch 7.16版本为例,其他版本操作和示例可能略有差别,详细信息请参见Elasticsearch快速入门。 |
场景初始化配置 |
通用场景 |
地域和可用区 |
地域:华东1(杭州) 可用区:杭州可用区J |
数据节点规格 |
2核4 GB |
数据节点存储类型 |
高效云盘 |
数据单节点存储空间 |
20 GB |
数据节点数量 |
3 |
Kibana规格 |
2核4 GB |
专有网络 |
选择您已创建的专有网络。如果没有创建,请参见创建和管理专有网络创建。 |
虚拟交换机 |
选择您已创建的虚拟交换机。如果没有创建,请参见创建和管理专有网络创建。 |
资源组 |
选择您已创建的资源组,可以选择默认资源组。如果没有创建,请参见创建资源组创建。 |
实例名称 |
test-es |
登录名 |
elastic |
登录密码 |
自定义密码 |
到期自动续费 |
根据您的实际需求进行选择。 如果选中到期自动续费,到期后会自动续费并产生一定的费用。 如果未选中到期自动续费,到期后实例会自动释放。 检索分析服务Elasticsearch版免费试用时长为1个月。 |
4.5 在确认订单页面,预览实例配置,选中服务协议,单击立即购买。
4.6 提示开通成功后,单击管理控制台。
4.7 在Elasticsearch实例页面中查看创建成功的实例及其状态。
说明:实例创建后,大约需要5分钟生效。等待实例生效且状态变为正常,才可继续执行后续步骤。
4. 获取Flink实例的虚拟交换机的网段
- 前往实时计算控制台,找到您创建的Flink,选择右侧操作列下的更多>工作空间详情。
- 在工作空间详情面板中,您可查看到虚拟交换机的网段。
说明:在下一小节中,你需要将虚拟交换机的网段添加到RDS的安全组中,实现内网访问。
5. 配置RDS Serverless版实例
步骤将指导您在RDS MySQL Serverless版实例上创建数据库、数据库账号、获取数据库访问地址及设置白名单。
- 前往云数据库RDS控制台。
- 在左侧导航栏中,单击实例列表。
返回如下页面,您可看到刚刚创建的RDS MySQL Serverless版实例资源。
- 创建数据库。
3.1 在实例列表页面,单击实例ID,进入实例基本信息页面,在左侧导航栏单击数据库管理,可以创建数据库。
3.2 在创建数据库对话框中,根据如下说明配置数据库,单击创建。
参数说明:
- 数据库(DB)名称:输入数据库名称,例如serverless。
- 支持字符集:默认设为utf8。
- 备注说明:非必填。用于备注该数据库的相关信息,便于后续数据库管理,最多支持256个字符。
- 创建账号及授权。
4.1 在左侧导航栏单击账号管理,可以创建数据库帐号,并完成授权。
4.2 在创建账号对话框中,根据如下说明配置账号,单击确定。
- 数据库账号:输入数据库账号名称,例如test_user。
- 账号类型:选择普通账号。
- 授权数据库:将上一步中创建的数据库添加至已授权数据库列表中,并设置权限为读写(DDL+DML)。
- 密码:输入账号密码,例如Password123。
- 确认密码:再次输入账号密码。
- 白名单授权。
为确保数据安全,云数据库RDS默认采用白名单策略,仅支持白名单内的客户端访问。
您需要将Flink实例的虚拟交换机的网段添加至RDS白名单分组内,手动操作如下截图。
Flink实例的虚拟交换机的网段您可在Flink控制台>您的Flink实例>更多>工作空间详情>虚拟交换机的网段Flink实例的虚拟交换机的网段
- 获取数据库访问地址。
后续步骤需要使用数据库的内网地址。
6. 创建数据库表并导入数据
在这个例子中,我们将创建三张数据表,其中一张orders_dataset_tmp是导入数据的临时表,其他两张作为源表,体验淘宝母婴订单实时查询。
- 在数据库连接页面,单击登录数据库。
- 在登录实例对话框中,输入数据库账号和数据库密码,单击测试连接,测试连接通过后,单击登录。
- 在首页左侧,单击数据库实例,在已登录实例中找到您创建的数据库,并双击数据库。
- 在SQLConsole页签中,输入如下SQL建表语句,然后单击执行。
create table orders_dataset_tmp( user_id bigint comment '用户身份信息', auction_id bigint comment '购买行为编号', cat_id bigint comment '商品种类序列号', cat1 bigint comment '商品序列号(根类别)', property TEXT comment '商品属性', buy_mount int comment '购买数量', day TEXT comment '购买时间' ); create table orders_dataset( order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单id', user_id bigint comment '用户身份信息', auction_id bigint comment '购买行为编号', cat_id bigint comment '商品种类序列号', cat1 bigint comment '商品序列号(根类别)', property TEXT comment '商品属性', buy_mount int comment '购买数量', day TEXT comment '购买时间' ); -- create table baby_dataset( user_id bigint NOT NULL PRIMARY KEY, birthday text comment '婴儿生日', gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown' );
- 在DMS数据管理平台,选择左侧的常用功能>数据导入。
- 配置如下信息后单击提交申请,将 (sample)sam_tianchi_mum_baby_trade_history.csv 导入 orders_dataset_tmp 表。
说明:请您单击此链接下载(sample)sam_tianchi_mum_baby_trade_history.csv。
配置项 |
说明 |
数据库 |
选择您的数据库实例 |
文件编码 |
自动识别 |
导入模式 |
极速模式 |
文件类型 |
CSV 格式 |
目标表 |
选择orders_dataset_tmp表 |
数据位置 |
选择第1行为属性 |
写入方式 |
INSERT |
附件 |
单击上传文件,选择(sample)sam_tianchi_mum_baby_trade_history.csv文件。 |
- 审批完成后,单击执行变更。
返回如下结果,表示数据导入完成。
- 返回数据导入页面,单击右上角的批量数据导入。
- 根据5-7步骤操作,配置如下信息后单击提交申请,将(sample)sam_tianchi_mum_baby.csv 导入 baby_dataset 表。
说明:请您单击此链接下载(sample)sam_tianchi_mum_baby.csv。
配置项 |
说明 |
数据库 |
选择您的数据库实例 |
文件编码 |
自动识别 |
导入模式 |
极速模式 |
文件类型 |
CSV 格式 |
目标表 |
选择baby_dataset表 |
数据位置 |
选择第1行为属性 |
写入方式 |
INSERT |
附件 |
单击上传文件,选择(sample)sam_tianchi_mum_baby.csv文件。 |
- 导入完成之后,在SQLConsole页签中,输入如下SQL,然后单击执行,将订单数据导入到订单源表orders_dataset 中。
insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day) select * from orders_dataset_tmp;
7. 配置Elasticsearch自动创建索引
- 前往阿里云Elasticsearch控制台。
- 在左侧导航栏中,单击Elasticsearch实例。
- 在Elasticsearch实例页面,单击目标实例ID,进入实例管理页面。
- 在左侧导航栏,选择配置与管理>安全配置。
- 在集群网络设置页面,单击VPC私网访问白名单右侧的修改。
- 在修改VPC私网访问白名单面板,单击default右侧的配置。
- 在新增IP白名单分组对话框中,您需要将Flink实例的虚拟交换机的网段添加至Elasticsearch实例的VPC私网访问白名单内,手动操作如下截图。
Flink实例的虚拟交换机的网段您可在Flink控制台>您的Flink实例>更多>工作空间详情>虚拟交换机的网段Flink实例的虚拟交换机的网段
- 在左侧导航栏,选择配置与管理>ES集群配置。
- 在YML文件配置区域,单击右侧的修改配置。
- 在YML文件配置面板,自动创建索引选择允许自动创建索引,选中下方的重启提示,单击确定。
- 在Elasticsearch实例页面,请您耐心等待5-10分钟左右,状态从生效中变为运行中,表示配置变更完成,才可继续使用Elasticsearch。
8. 创建实时查询SQL作业
- 前往实时计算控制台,找到您创建的Flink,单击右侧操作列下的控制台。
- 在左侧导航栏,单击Session集群。
- 在Session集群页面,单击创建Session集群。
- 在创建Session集群页面,根据如下说明配置参数,未提及的参数保持默认值即可,然后单击创建Session集群。
参数说明:
配置项 |
说明 |
教程配置 |
名称 |
Session集群名称。 |
test |
状态 |
设置当前集群的期望运行状态: STOPPED:当集群配置完成后保持停止状态,同样会停止所有在运行中的作业。 RUNNING:当集群配置完成后保持运行状态。 |
RUNNING |
设置为SQL Previews集群 |
将此Session集群设置为SQL Preview查询的资源集群。 |
开启 |
- 在Session集群页面,等待您创建的Session集群的状态从启动中变为运行中后,您可以进入后续步骤。
- 在左侧导航栏,单击作业开发。
- 在作业开发页签,单击新建。
- 在新建文件对话框,自定义文件名称,例如输入为test,文件类型选择流作业/SQL,其他保持默认即可,单击确认。
- 接下来,我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 MySQL CDC 连接器实时捕获 orders_dataset和 baby_dataset的变化。
说明:您需要将语句中hostname参数替换为RDS MySQL Serverless数据库的内网地址、将username和 password参数替换为您创建的数据库账号及密码、将 database-name参数替换为您创建的数据库名称。其中,'connector' = 'mysql'指定了使用MySQL CDC连接器来捕获变化数据。
任何时候您都可以单击顶部工具栏中的验证,来确认作业Flink SQL语句中是否存在语法错误。
CREATE TEMPORARY TABLE orders_dataset ( order_id BIGINT, `user_id` bigint, auction_id bigint, cat_id bigint, cat1 bigint, property varchar, buy_mount int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'orders_dataset' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, birthday varchar, gender int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'baby_dataset' );
- 为了测试是否成功地捕获了源表数据,紧接着在下面写一行 SELECT * FROM orders_dataset;语句,选中临时表和select语句,并单击工具栏中的执行。如果控制台中打印了相应的数据行,则说明捕获成功,如下图所示:
- 我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 Elasticsearch 连接器连接到 Elasticsearch 实例。
说明:您需要将语句中hosts参数替换为Elasticsearch实例的内网地址、将password参数替换为您创建Elasticsearch实例时的输入的登录Kibana密码。其中,'connector' = 'elasticsearch-7'指定了使用 Elasticsearch 连接器来连接 Elasticsearch 实例写入数据。。
任何时候您都可以单击顶部工具栏中的验证,来确认作业Flink SQL语句中是否存在语法错误。
编辑窗格中需要去掉 SELECT * FROM orders_dataset;语句。
CREATE TEMPORARY TABLE es_sink( order_id BIGINT, `user_id` bigint, auction_id bigint, cat_id bigint, cat1 bigint, property varchar, buy_mount int, `day` varchar , birthday varchar, gender int, PRIMARY KEY(order_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://**********:9200', 'index' = 'enriched_orders', 'username' ='elastic', 'password' ='*******'--创建ES实例时自定义的密码 );
- 接下来,我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。并把宽表数据写入到 Elasticsearch 的 enriched_orders 索引中。我们在 Flink 作业编辑窗格中输入如下代码:
INSERT INTO es_sink SELECT o.*, b.birthday, b.gender FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b ON o.user_id = b.user_id;
在保证源表中有数据的情况下,再次执行 Flink 作业,观察控制台的输出结果:
- 在作业开发页面,单击上线后,再单击运维。将我们编写的Flink SQL作业部署上线执行。
- 在运维页面,单击启动。您也可以在Flink UI控制台中观察流数据处理图。
说明:阿里云实时计算控制台在使用执行功能调试时,不会写入任何数据到下游中。因此为了测试使用 SQL Connector 写入汇表,您必须使用上线功能。
- 返回Elasticsearch控制台,在左侧导航中,选中配置与管理>可视化控制。
- 在Kibana区域,单击修改配置,配置Kibana的公网访问白名单。
15.1 在访问配置区域,单击Kibana公网访问白名单右侧的修改。
15.2 在修改白名单面板,单击default分组右侧的配置。
15.3 在新增IP白名单分组的对话框中,去掉默认的IP地址后(默认禁止所有IP地址访问),将待访问设备的公网IP地址添加至白名单中。
说明:获取本地设备公网IP地址的方式可能因你所处的网络环境或操作不同而不同。以下是不同系统通过命令方式获取本地设备公网IP地址的参考方法:
Linux操作系统:打开终端,输入curl ifconfig.me命令后回车。
Windows操作系统:打开命令提示符,输入curl ip.me命令后回车。
macOS操作系统:打开终端,输入curl ifconfig.me命令后回车。
15.4 单击确认。
确认后,如果对应白名单中出现您添加的IP地址,说明配置成功。
- 返回实例管理页面,在Kibana区域,单击公网入口。
- 在登录页面,输入用户名和密码,单击登录。
说明:用户名为elastic,密码为您创建实例时设置的密码。
- 在欢迎使用Elastic页面,单击自己浏览。
- 在左侧导航栏中,单击Stack Management。
- 在左侧管理区域,单击Index Management。
- 在索引管理页面,搜索enriched_orders,单击enriched_orders查看enriched_orders索引是否成功创建。
- Elasticsearch 的enriched_orders索引创建成功后,单击页面左上角的图标,选择Analytics>Discover>创建索引模式,输入enriched_orders,单击创建索引模式。创建完成后就可以在重新进入Kibana>Discover看到写入的数据了。
- 接下来,我们通过对 MySQL 中源表的数据进行增改删操作,每执行一步就刷新一下Kibana>Discover界面,观察数据的变化。
23.1 在RDS数据库中,执行如下命令,order_dataset 表添加一条数据。然后在Discover页面中输入user_id:2222222,并更新页面查看结果。
insert into orders_dataset values ( DEFAULT ,2222222,2222222,50018855,38,'21458:33304;6933666:4421827;21475:137319;12121566:3861755',1,'20130915');
23.2 baby_dataset 表中添加一条数据。
insert into baby_dataset values(144335047,'20150523',1);
写入前
写入后
23.3 order_dataset表更新一条数据。
select order_id from orders_dataset where user_id = 2757; --根据查到的order_id更新数据 UPDATE orders_dataset SET auction_id = 2222223 WHERE order_id = ;
更新前
更新后
23.4 order_dataset 表中删除一条数据。
select order_id from orders_dataset where user_id = 2222222; DELETE FROM orders_dataset WHERE order_id = ;
删除前
删除后
9. 创建实时大屏SQL作业
前面三步和上一小节相同,区别在于后面步骤作业的处理逻辑 SQL 不同,要统计的指标不同,所以 Elasticsearch 的 Schema 与之前不同。
- 在左侧导航栏,单击作业开发。
- 在作业开发页签,单击新建。
- 在新建文件对话框,自定义文件名称,例如输入为test1,文件类型选择流作业/SQL,其他保持默认即可,单击确认。
- 接下来,我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 MySQL CDC 连接器实时捕获 orders_dataset和 baby_dataset的变化。
说明:您需要将语句中hostname参数替换为RDS MySQL Serverless数据库的内网地址、将username和 password参数替换为您创建的数据库账号及密码、将 database-name参数替换为您创建的数据库名称。其中,'connector' = 'mysql'指定了使用MySQL CDC连接器来捕获变化数据。
任何时候您都可以单击顶部工具栏中的验证,来确认作业Flink SQL语句中是否存在语法错误。
CREATE TEMPORARY TABLE orders_dataset ( order_id BIGINT, `user_id` bigint, auction_id bigint, cat_id bigint, cat1 bigint, property varchar, buy_mount int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'orders_dataset' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, birthday varchar, gender int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'baby_dataset' );
- 为了测试是否成功地捕获了源表数据,紧接着在下面写一行 SELECT * FROM orders_dataset;语句,选中临时表和select语句,并单击工具栏中的执行。如果控制台中打印了相应的数据行,则说明捕获成功,如下图所示:
- 我们在右侧编辑窗格中输入以下语句来创建一张临时表,并使用 Elasticsearch 连接器连接到 Elasticsearch 实例。
说明:您需要将语句中hosts参数替换为Elasticsearch实例的内网地址、将password参数替换为您创建Elasticsearch实例时的输入的登录Kibana密码。其中,'connector' = 'elasticsearch-7'指定了使用 Elasticsearch 连接器来连接 Elasticsearch 实例写入数据。。
任何时候您都可以单击顶部工具栏中的验证,来确认作业Flink SQL语句中是否存在语法错误。
编辑窗格中需要去掉 SELECT * FROM orders_dataset;语句。
CREATE TEMPORARY TABLE es_sink( day_year varchar, `buy_num` bigint, baby_num bigint, PRIMARY KEY(day_year) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://**********:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='*******'--创建ES实例时自定义的密码 );
- 接下来,我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。然后对宽表数据的订单时间取到月份进行分组 GROUP BY,并统计每个分组中订单的购买数量SUM和出生婴儿的数量COUNT,并将结果数据写入到 Elasticsearch 的 enriched_orders_view索引中。我们在 Flink 作业编辑窗格中输入如下代码:
INSERT INTO es_sink SELECT SUBSTRING(tmp1.`day` FROM 1 FOR 6) as day_year, SUM(tmp1.buy_mount) as buy_num, COUNT(birthday) as baby_num FROM( SELECT o.*, b.birthday, b.gender FROM orders_dataset /*+ OPTIONS('server-id'='123456-123457') */ o LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123458-123459') */ as b ON o.user_id = b.user_id ) tmp1 GROUP BY SUBSTRING(tmp1.`day` FROM 1 FOR 6)
在保证源表中有数据的情况下,再次执行 Flink 作业,观察控制台的输出结果:
- 在作业开发页面,单击上线后,再单击运维。将我们编写的Flink SQL作业部署上线执行。
- 在运维页面,单击启动。您也可以在Flink UI控制台中观察流数据处理图。
说明:阿里云实时计算控制台在使用执行功能调试时,不会写入任何数据到下游中。因此为了测试使用 SQL Connector 写入汇表,您必须使用上线功能。
- 返回至Kibana页面,左侧导航栏中,单击Stack Management。
- 在左侧管理区域,单击Index Management。
- 在索引管理页面,搜索enriched_orders_view,单击enriched_orders_view查看enriched_orders_view索引是否成功创建。
- Elasticsearch的enriched_orders_view索引创建成功后,在左侧导航栏,单击索引模式。
- 在索引模式页面,单击创建索引模式。
- 在创建索引模式页面,输入enriched_orders_view,单击创建索引模式。
- 单击页面左上角的图标,选择Analytics>Discover,在页面左侧选择enriched_orders_view,您就可以看到写入的数据了。
- 在Discover页面,单击左下角Available fields>baby_num,单击后会展示TOP 5 VALUES小窗口,单击窗口下方的Visualize,即可跳转到可视化图表界面。
跳转界面后切换图形格式为柱状图Bar。
配置右侧X-axis、Y-axis。
X-axis配置Select a field为day_year.keyword,Number of values选择到最大100,order by选择 alphabetical ,order direction 选择 ascending,Display name自定义横轴名称,此处定义为 day_year_month ,然后单击Close。
Y-axis配置Select a field为buy_num,Display name自定义纵轴名称,此处定义为 buy_num ,Axis side 选择 Left ,然后点击Close。界面中间即生成了对应的折线图。
- 单击右下角的+图标,新建一个layer,切换新建的layer的图格式为折线图Line
配置右侧X-axis、Y-axis
X-axis配置Select a field为day_year.keyword,Number of values选择到最大100,order by选择 alphabetical ,order direction 选择 ascending,Display name自定义横轴名称,此处定义为 day_year_month ,然后点击Close。与上一个X-axis配置完全相同。
Y-axis配置Select a field为baby_num,Display name自定义纵轴名称,此处定义为 baby_num ,Value format选择Pecent,Axis side 选择 Right ,然后点击Close。界面中间即生成了对应的折线图与柱状图的复合图。
- 最后单击右上角的Save,定义此图表的名称即可保存。
10. 实验附件
以上就是本实验的全部步骤。
完整的 Flink SQL 语句如下:
- 实时查询SQL作业
CREATE TEMPORARY TABLE orders_dataset ( order_id BIGINT, `user_id` bigint, auction_id bigint, cat_id bigint, cat1 bigint, property varchar, buy_mount int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'source_table' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, birthday varchar, gender int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'source_table' ); CREATE TEMPORARY TABLE es_sink( order_id BIGINT, `user_id` bigint, auction_id bigint, cat_id bigint, cat1 bigint, property varchar, buy_mount int, `day` varchar , birthday varchar, gender int, PRIMARY KEY(order_id) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://**********:9200', 'index' = 'enriched_orders', 'username' ='elastic', 'password' ='*******'--创建ES实例时自定义的密码 ); INSERT INTO es_sink SELECT o.*, b.birthday, b.gender FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b ON o.user_id = b.user_id;
- 实时大屏SQL作业
CREATE TEMPORARY TABLE orders_dataset ( order_id BIGINT, `user_id` bigint, auction_id bigint, cat_id bigint, cat1 bigint, property varchar, buy_mount int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'orders_dataset' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, birthday varchar, gender int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '******************.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '***********', 'password' = '***********', 'database-name' = '***********', 'table-name' = 'baby_dataset' ); CREATE TEMPORARY TABLE es_sink( day_year varchar, `buy_num` bigint, baby_num bigint, PRIMARY KEY(day_year) NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。 ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://**********:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='*******'--创建ES实例时自定义的密码 ); INSERT INTO es_sink SELECT SUBSTRING(tmp1.`day` FROM 1 FOR 6) as day_year, SUM(tmp1.buy_mount) as buy_num, COUNT(birthday) as baby_num FROM( SELECT o.*, b.birthday, b.gender FROM orders_dataset /*+ OPTIONS('server-id'='123456-123457') */ o LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123458-123459') */ as b ON o.user_id = b.user_id ) tmp1 GROUP BY SUBSTRING(tmp1.`day` FROM 1 FOR 6)
向云数据库RDS中导入数据,您可以在数据库后台导入下面的CSV文件:
云数据库RDS调试SQL语句如下:
MySQL调试DML语句
--order_dataset表添加一条数据 insert into orders_dataset values ( DEFAULT ,2222222,2222222,50018855,38,'21458:33304;6933666:4421827;21475:137319;12121566:3861755',1,'20130915'); -- baby_dataset 表中添加一条数据 insert into baby_dataset values(144335047,'20150523',1); --order_dataset表更新一条数据 select order_id from orders_dataset where user_id = 2757; UPDATE orders_dataset SET auction_id = 2222223 WHERE order_id = ; -- order_dataset 表中删除一条数据 select order_id from orders_dataset where user_id = 2222222; DELETE FROM orders_dataset WHERE order_id = ;
11. 清理及后续
通过简单的数据实时分析示例,您已完成了阿里云实时计算Flink版的开通、SQL查询语句的执行,以及可视化数据分析,快速体验了实时计算Flink版产品。
说明:Flink产品试用到期时若没有继续付费需求,请按时释放实例,以免产生额外付费。
清理
实时计算Flink版提供的按量付费(3个月5000CUH计算资源)试用,具有时长和容量限制,使用完请及时清理相关资源。如果未及时清理资源,相关Flink工作空间将按照按量付费模式正常进行计费,涉及的其他产品服务也会正常收取费用。完成教程后,您可以按照如下场景进行处理:
- 如果您需要继续使用,此时实时计算Flink版的5000CUH资源已使用完或者已经超过3个月,请随时查看欠费金额,并在规定时间内充值结清欠费账单,否则无法继续正常使用工作空间。计费详情请参见按量付费和欠费说明。
- 如果您不需要继续使用,请及时清理测试数据和试用资源:
- 实时计算Flink版资源清理
登录实时计算控制台,单击目标工作空间操作列下的更多>释放资源,单击确定。应用实时监控服务ARMS(如果您开通时选中)和负载均衡SLB会随实时计算Flink版一起被释放。
- 对象存储OSS资源清理
删除对象存储空间,详情请参见删除存储空间。
- 负载均衡SLB资源清理
删除传统型负载均衡CLB,详情请参见释放负载均衡实例。
- 专有网络VPC资源清理
删除专有网络,详情请参见删除专有网络。
- RDS MySQL Serverless资源清理
本教程使用的RDS MySQL Serverless实例只能免费试用3个月。实例到期后,您可以手动释放。具体操作,请参见释放实例。如果一直未释放该实例,实例将在试用时间结束后,按照正常价格计费,计费标准请参见Serverless费用。
- 阿里云Elasticsearch实例资源清理
如果您不需要继续使用,请及时清理检索分析服务Elasticsearch版资源。前往阿里云Elasticsearch控制台,在Elasticsearch实例页面,找到您创建的Elasticsearch实例,选择右侧操作列下的>释放实例,根据页面提示释放Elasticsearch实例资源。
后续
如果您需要将某个作业提交至生产环境运行(请勿将Session集群用于正式生产环境),需要在作业开发页面,单击上线,在弹出的确认对话框中,单击确定,然后在作业运维页面,单击启动,具体操作请参见作业开发和作业启动。
12. 清理及后续
通过简单的数据实时分析示例,您已完成了阿里云实时计算Flink版的开通、SQL查询语句的执行,以及可视化数据分析,快速体验了实时计算Flink版产品
清理
- 实时计算Flink版资源清理
登录实时计算控制台,单击目标工作空间操作列下的更多>释放资源,单击确定。应用实时监控服务ARMS(如果您开通时选中)和负载均衡SLB会随实时计算Flink版一起被释放。
- 对象存储OSS资源清理
删除对象存储空间,详情请参见删除存储空间。
- 专有网络VPC资源清理
删除专有网络,详情请参见删除专有网络。
- RDS MySQL Serverless资源清理
删除RDS MySQL Serverless,详情请参见释放实例。如果一直未释放该实例,实例将在试用时间结束后,按照正常价格计费,计费标准请参见Serverless费用。
- 阿里云Elasticsearch实例资源清理
如果您不需要继续使用,请及时清理检索分析服务Elasticsearch版资源。前往阿里云Elasticsearch控制台,在Elasticsearch实例页面,找到您创建的Elasticsearch实例,选择右侧操作列下的>释放实例,根据页面提示释放Elasticsearch实例资源。
后续
如果您需要将某个作业提交至生产环境运行(请勿将Session集群用于正式生产环境),需要在作业开发页面,单击上线,在弹出的确认对话框中,单击确定,然后在作业运维页面,单击启动,具体操作请参见作业开发和作业启动。
实验链接:https://developer.aliyun.com/adc/scenario/14502c36ab164ccd98f80b24cfecf76c