开发者社区> 沐远> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

RDS&POLARDB归档到X-Pack Spark计算最佳实践

简介: 业务背景 对于RDS&POLARDB FOR MYSQL 有些用户场景会遇到,当一张的数据达到几千万时,你查询一次所花的时间会变多。这时候会采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。
+关注继续查看

业务背景

对于RDS&POLARDB FOR MYSQL 有些用户场景会遇到,当一张的数据达到几千万时,你查询一次所花的时间会变多。
这时候会采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。
本文主要介绍如何把这些水平分表的表归档到X-Pack Spark数仓,做统一的大数据计算。X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。具体产品见
1571127702954_116c2051_c208_4068_b464_8b2549b8d1c9

RDS& POLARDB分表归档到X-Pack Spark步骤

一键关联POLARDB到Spark集群

一键关联主要是做好spark访问RDS& POLARDB的准备工作。
1571125315921_1718773a_0a16_419d_ae58_5389ba8cf04f

POLARDB表存储

在database ‘test1’中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、... 。
1571124191915_23bd7c1f_c650_4628_9668_b373d7ce4017

具体的建表语句如下:

    CREATE TABLE `test1` ( `a` int(11) NOT NULL,
                        `b` time DEFAULT NULL,          
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

归档到Spark的调试

x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试,帮助文档
1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包,可参考

wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar

1571125626242_7564438f_29cd_47f5_aced_2536937354e4

2、创建交互式查询
以pyspark为例,下面是具体归档demo的代码:

spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
    #构造polardb的表名
    dbtable = "test1." + "test" + str(num)
    #spark外表关联polardb对应的表
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load().registerTempTable("polardbTableTemp")
    #生成本次polardb表数据要写入的spark表的分区信息
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
    #执行导数据sql 
    spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #删除临时的spark映射polardb表的catalog
    spark.catalog.dropTempView("polardbTableTemp")
    #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
    spark.sql("show partitions sparktest").show(1000, False)
    spark.sql("select count(*) from sparktest").show()

1571124379955_e8869ced_593f_4964_aa87_739680ae1e13

归档作业上生产

交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以
pyspark作业为例:
1571125659012_f2645bbe_517d_4322_b149_e111649e2302
/polardb/polardbArchiving.py 内容如下:

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        #构造polardb的表名
        dbtable = "test1." + "test" + str(num)
        #spark外表关联polardb对应的表
        externalPolarDBTableNow = spark.read \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
            .option("dbtable", dbtable) \
            .option("user", "ma,e") \
            .option("password", "xxx*") \
            .load().registerTempTable("polardbTableTemp")
        #生成本次polardb表数据要写入的spark表的分区信息
        (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
        #执行导数据sql
        spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
              "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
        #删除临时的spark映射polardb表的catalog
        spark.catalog.dropTempView("polardbTableTemp")
        #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
        spark.sql("show partitions sparktest").show(1000, False)
        spark.sql("select count(*) from sparktest").show()
    spark.stop()

阿里云NoSQL数据库其他动态

阿里云Cassandra数据库正式公测,提供免费试用:https://www.aliyun.com/product/cds

xxx

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
RDS数据库与自建库的gtid主从同步
一、在centos7上部署MySQL数据库1、从MYSQL官网下载安装文件wget http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.
1805 0
RDS MySQL 5.7三节点企业版重磅发布 企业级业务云上数据库首选
随着云计算技术的逐渐普及,使用云服务的客户行业、场景的边界也在不断地被拓宽,不断提出新的需求。
0 0
你知道数据库RDS手动续费的方法有几种么?
包年包月实例有到期时间,如果到期未续费,会导致业务中断甚至数据丢失,建议您及时手动续费。
265 0
【云栖号案例 | 物联网&人工智能】RDS为慧联无限数据库运维减负
业务持续增长没有专业运维人员,导致 MySQL 不堪重负。上云后RDS数据库实现了纵向弹性扩缩容,提供自助服务能力,短期内不需要专业运维人员,控制了成本。
1484 0
【云栖号案例 | 新零售】数据库RDS为跨境电商管理平台支撑亿级流水
智赢科技每天面对用户修改价格库存对更新即时性要求高,索引和表结构变更不易。RDS支持数组和分区,降低中间表的数量又可以自动分区,加快开发速度。
1637 0
ecs与数据库(rds,redis,mongodb,memcached)连通性判断流程图
由于文档https://yq.aliyun.com/articles/164796内容较多,不方便查看,可以参考流程图来初步判断连通性是否正常
0 0
【阿里云新品发布·周刊】第11期:云数据库 MySQL 8.0 重磅发布,更适合企业使用场景的RDS数据库
云数据库MySQL 8.0 升级发布会2019年5月29日15时,阿里云云数据库 MySQL 8.0 重磅发布,2倍以上性能提升,SQL窗口函数、JSON扩展语法等企业级新功能震撼上市!主要从技术层面介绍MySQL 8.0的优势和与过去版本对比。
432 0
小微企业阿里云最佳实践系列(一):ECS 服务器与 RDS 数据库
本博文主要写给创业团队、技术团队人数 < 5 人、没有专业运维等小微企业作为参考,需要掌握基础的服务器管理、软件开发等经验。 博文主要内容 本博文主要使用传统服务器架构与云服务架构进行横向对比,解决企业在搭建软件系统中所遇到等问题和痛点,以及为小微企业降低成本的同时尽可能提高软件系统的高可通、低延迟、高规范、低人力投入。
1119 0
+关注
文章
问答
来源圈子
更多
相关文档: 云数据库Cassandra
文章排行榜
最热
最新
相关电子书
更多
POLARDB云数据库分布式存储引擎揭秘
立即下载
阿里云分析引擎Spark On 多数据源介绍
立即下载
PolarDB for PostgreSQL三节点功能介绍
立即下载