最佳实践 | RDS & POLARDB归档到X-Pack Spark计算

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 部分RDS和POLARDB For MySQL的用户曾遇到如下场景:当一张表的数据达到几千万时,你查询一次所花的时间会变多。这时候采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。本文将介绍如何把这些水平分表的表归档到X-Pack Spark数仓,做统一的大数据计算。

X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。

be0c5c87da12145325d0fbbf2770c011e63bbb7a.png

RDS & POLARDB分表归档到X-Pack Spark步骤

一键关联POLARDB到Spark集群

一键关联主要是做好spark访问RDS & POLARDB的准备工作。
3da40523db7ba8a13ad447479c143ae90e1f8f4d.png

POLARDB表存储

在database ‘test1’中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、...
f8b752d3dd8c53fb20c66e01c00fb201f6396c1a.png

具体的建表语句如下:

*请左右滑动阅览

 CREATE TABLE `test1` ( `a` int(11) NOT NULL,
                        `b` time DEFAULT NULL,          
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

归档到Spark的调试

x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试。

1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包。

*请左右滑动阅览

wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar

6ecbde3466820df6fbee0285e9d11d96cbd995fb.png

2、创建交互式查询

以pyspark为例,下面是具体归档demo的代码:

*请左右滑动阅览

spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
    #构造polardb的表名
    dbtable = "test1." + "test" + str(num)
    #spark外表关联polardb对应的表
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load().registerTempTable("polardbTableTemp")
    #生成本次polardb表数据要写入的spark表的分区信息
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
    #执行导数据sql 
    spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #删除临时的spark映射polardb表的catalog
    spark.catalog.dropTempView("polardbTableTemp")
    #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
    spark.sql("show partitions sparktest").show(1000, False)
    spark.sql("select count(*) from sparktest").show()

归档作业上生产

交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以pyspark作业为例:
9730abaad2803bfb6a3b7b080a26fe03b808e5ad.png

/polardb/polardbArchiving.py 内容如下:

*请左右滑动阅览

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        #构造polardb的表名
        dbtable = "test1." + "test" + str(num)
        #spark外表关联polardb对应的表
        externalPolarDBTableNow = spark.read \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
            .option("dbtable", dbtable) \
            .option("user", "ma,e") \
            .option("password", "xxx*") \
            .load().registerTempTable("polardbTableTemp")
        #生成本次polardb表数据要写入的spark表的分区信息
        (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
        #执行导数据sql
        spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
              "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
        #删除临时的spark映射polardb表的catalog
        spark.catalog.dropTempView("polardbTableTemp")
        #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
        spark.sql("show partitions sparktest").show(1000, False)
        spark.sql("select count(*) from sparktest").show()
    spark.stop()

扫描下方 ⬇️二维码

了解关于X-Pack Spark计算服务的更多信息

了解更多.jpeg

双十一还不知道买什么?

阿里云数据库双11爆款直降

这份购物清单 ⬇️给你拿去!

双十一活动长图.png

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
48 3
|
1月前
|
存储 监控 关系型数据库
MySQL计算某条数据与上一条数据的生成时间差
MySQL计算某条数据与上一条数据的生成时间差
41 2
|
2月前
|
关系型数据库 分布式数据库 PolarDB
PolarDB Ganos的实时时空计算
PolarDB是阿里云自主研发的云原生关系型数据库,提供极致弹性、高性能、海量存储及安全可靠的数据库服务。PolarDB PostgreSQL版100%兼容PostgreSQL和Oracle语法,集成Ganos——新一代云原生时空数据库引擎,具备几何、栅格、轨迹等十大核心引擎能力,支持物理世界时空多模数据的混合存储与分析。本文介绍的Ganos实时电子围栏计算基于PolarDB PostgreSQL版,适用于交通物流、禁飞区管理、营销等多种场景,通过Flink实时计算实现高效的空间数据处理。
31 1
|
2月前
|
关系型数据库 分布式数据库 PolarDB
基于PolarDB Ganos的实时时空计算:电子围栏篇
PolarDB是阿里云自主研发的云原生关系型数据库,提供极致弹性、高性能、海量存储及高安全性的数据库服务。PolarDB PostgreSQL版100%兼容PostgreSQL与Oracle语法,集成Ganos云原生时空数据库引擎,支持几何、栅格、轨迹等多种核心功能,实现物理世界时空数据的混合存储与分析。本文介绍的Ganos实时电子围栏计算依托PolarDB PostgreSQL版,展示了其在交通物流、安防、营销等多个领域的应用场景和技术实现细节,包括数据源配置、空间计算函数注册、电子围栏表生成及计算结果存储等步骤。通过Flink实时计算框架,Ganos实现了高效、实时的电子围栏运算
72 0
|
4月前
|
关系型数据库 分布式数据库 数据库
PolarDB产品使用问题之将RDS切换到PolarDB-X 2.0时,代码层的SQL该如何改动
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
4月前
|
关系型数据库 MySQL 数据库
MySQL 保姆级教程(八):创建计算字段
MySQL 保姆级教程(八):创建计算字段
|
4月前
|
存储 分布式计算 运维
EMR Serverless Spark服务最佳实践测评
EMR Serverless Spark服务最佳实践测评
134 2
|
4月前
|
关系型数据库 分布式数据库 数据库
基于PolarDB Ganos的实时时空计算:电子围栏篇
文章着重介绍了PolarDB Ganos如何应用于实现实时电子围栏计算。这是一种依赖于位置技术来创建虚拟地理边界的解决方案,广泛应用于交通安全、应急管理、营销推广等多个领域。通过与阿里云实时计算Flink版产品的集成,PolarDB Ganos能够高效地进行空间计算和数据分析,显著提高了地理围栏应用的实时性和准确性。文章还提供了使用Ganos进行电子围栏计算的实际步骤、性能测试及优化建议,并强调了PolarDB Ganos在提高数据处理效率和降低成本方面的优势。
|
4月前
|
存储 关系型数据库 分布式数据库
PolarDB,阿里云的云原生分布式数据库,以其存储计算分离架构为核心,解决传统数据库的扩展性问题
【7月更文挑战第3天】PolarDB,阿里云的云原生分布式数据库,以其存储计算分离架构为核心,解决传统数据库的扩展性问题。此架构让存储层专注数据可靠性,计算层专注处理SQL,提升性能并降低运维复杂度。通过RDMA加速通信,多副本确保高可用性。资源可独立扩展,便于成本控制。动态添加计算节点以应对流量高峰,展示了其灵活性。PolarDB的开源促进了数据库技术的持续创新和发展。
293 2
|
5月前
|
SQL 关系型数据库 数据管理
数据管理DMS产品使用合集之归档数据至其它MySQL数据库时,如何指定目的库
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
67 1

相关产品

  • 云数据库 RDS MySQL 版
  • 云原生数据库 PolarDB
  • 下一篇
    无影云桌面