RDS&POLARDB归档到X-Pack Spark计算最佳实践

简介: 业务背景 对于RDS&POLARDB FOR MYSQL 有些用户场景会遇到,当一张的数据达到几千万时,你查询一次所花的时间会变多。这时候会采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。

业务背景

对于RDS&POLARDB FOR MYSQL 有些用户场景会遇到,当一张的数据达到几千万时,你查询一次所花的时间会变多。
这时候会采取水平分表的策略,水平拆分是将同一个表的数据进行分块保存到不同的数据库中,这些数据库中的表结构完全相同。
本文主要介绍如何把这些水平分表的表归档到X-Pack Spark数仓,做统一的大数据计算。X-Pack Spark服务通过外部计算资源的方式,为Redis、Cassandra、MongoDB、HBase、RDS存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。具体产品见
1571127702954_116c2051_c208_4068_b464_8b2549b8d1c9

RDS& POLARDB分表归档到X-Pack Spark步骤

一键关联POLARDB到Spark集群

一键关联主要是做好spark访问RDS& POLARDB的准备工作。
1571125315921_1718773a_0a16_419d_ae58_5389ba8cf04f

POLARDB表存储

在database ‘test1’中每5分钟生成一张表,这里假设为表 'test1'、'test2'、'test2'、... 。
1571124191915_23bd7c1f_c650_4628_9668_b373d7ce4017

具体的建表语句如下:

    CREATE TABLE `test1` ( `a` int(11) NOT NULL,
                        `b` time DEFAULT NULL,          
               `c` double DEFAULT NULL,
                         PRIMARY KEY (`a`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

归档到Spark的调试

x-pack spark提供交互式查询模式支持直接在控制台提交sql、python脚本、scala code来调试,帮助文档
1、首先创建一个交互式查询的session,在其中添加mysql-connector的jar包,可参考

wget https://spark-home.oss-cn-shanghai.aliyuncs.com/spark_connectors/mysql-connector-java-5.1.34.jar

1571125626242_7564438f_29cd_47f5_aced_2536937354e4

2、创建交互式查询
以pyspark为例,下面是具体归档demo的代码:

spark.sql("drop table sparktest").show()
# 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
      "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

#本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
# CREATE TABLE `test1` (
#     `a` int(11) NOT NULL,
#                     `b` time DEFAULT NULL,
#                                      `c` double DEFAULT NULL,
#                                                         PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
    #构造polardb的表名
    dbtable = "test1." + "test" + str(num)
    #spark外表关联polardb对应的表
    externalPolarDBTableNow = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306") \
        .option("dbtable", dbtable) \
        .option("user", "name") \
        .option("password", "xxx*") \
        .load().registerTempTable("polardbTableTemp")
    #生成本次polardb表数据要写入的spark表的分区信息
    (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
    #执行导数据sql 
    spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
          "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
    #删除临时的spark映射polardb表的catalog
    spark.catalog.dropTempView("polardbTableTemp")
    #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
    spark.sql("show partitions sparktest").show(1000, False)
    spark.sql("select count(*) from sparktest").show()

1571124379955_e8869ced_593f_4964_aa87_739680ae1e13

归档作业上生产

交互式查询定位为临时查询及调试,生产的作业还是建议使用spark作业的方式运行,使用文档参考。这里以
pyspark作业为例:
1571125659012_f2645bbe_517d_4322_b149_e111649e2302
/polardb/polardbArchiving.py 内容如下:

# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("PolardbArchiving") \
        .enableHiveSupport() \
        .getOrCreate()

    spark.sql("drop table sparktest").show()
    # 创建一张spark表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的5分钟的一张polardb表达的数据。字段和polardb里面的类型一致
    spark.sql("CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string) "
          "USING parquet PARTITIONED BY (dt ,hh ,mm )").show()

    #本例子在polardb里面创建了databse test1,具有三张表test1 ,test2,test3,这里遍历这三张表,每个表存储spark的一个5min的分区
    # CREATE TABLE `test1` (
    #     `a` int(11) NOT NULL,
    #      `b` time DEFAULT NULL,
    #      `c` double DEFAULT NULL,
    #       PRIMARY KEY (`a`)
    # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    for num in range(1, 4):
        #构造polardb的表名
        dbtable = "test1." + "test" + str(num)
        #spark外表关联polardb对应的表
        externalPolarDBTableNow = spark.read \
            .format("jdbc") \
            .option("driver", "com.mysql.jdbc.Driver") \
            .option("url", "jdbc:mysql://pc-.mysql.polardb.rds.aliyuncs.com:3306") \
            .option("dbtable", dbtable) \
            .option("user", "ma,e") \
            .option("password", "xxx*") \
            .load().registerTempTable("polardbTableTemp")
        #生成本次polardb表数据要写入的spark表的分区信息
        (dtValue, hhValue, mmValue) = ("20191015", "13", str(05 * num))
        #执行导数据sql
        spark.sql("insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  "
              "select * from polardbTableTemp " % (dtValue, hhValue, mmValue)).show()
        #删除临时的spark映射polardb表的catalog
        spark.catalog.dropTempView("polardbTableTemp")
        #查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
        spark.sql("show partitions sparktest").show(1000, False)
        spark.sql("select count(*) from sparktest").show()
    spark.stop()

阿里云NoSQL数据库其他动态

阿里云Cassandra数据库正式公测,提供免费试用:https://www.aliyun.com/product/cds

xxx

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
4月前
|
SQL 人工智能 关系型数据库
AI Agent的未来之争:任务规划,该由人主导还是AI自主?——阿里云RDS AI助手的最佳实践
AI Agent的规划能力需权衡自主与人工。阿里云RDS AI助手实践表明:开放场景可由大模型自主规划,高频垂直场景则宜采用人工SOP驱动,结合案例库与混合架构,实现稳定、可解释的企业级应用,推动AI从“能聊”走向“能用”。
1055 39
AI Agent的未来之争:任务规划,该由人主导还是AI自主?——阿里云RDS AI助手的最佳实践
|
10月前
|
人工智能 运维 关系型数据库
云服务API与MCP深度集成,RDS MCP最佳实践
近日,阿里云数据库RDS发布开源RDS MCP Server,将复杂的技术操作转化为自然语言交互,实现"对话即运维"的流畅体验。通过将RDS OpenAPI能力封装为MCP协议工具,用户只需像聊天一样描述需求,即可完成数据库实例创建、性能调优、故障排查等专业操作。本文介绍了RDS MCP(Model Context Protocol)的最佳实践及其应用,0代码,两步即可轻松完成RDS实例选型与创建,快来体验!
云服务API与MCP深度集成,RDS MCP最佳实践
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
172 3
|
NoSQL 关系型数据库 MySQL
阿里云PolarDB游戏场景最佳实践
阿里云PolarDB游戏场景最佳实践涵盖了数据库体系演进、行业优化、Redis解决方案、性能优化、备份还原及全球部署等内容。PolarDB通过共享存储、物理复制等技术提升读扩展和大容量支持,针对游戏行业的高IO需求进行优化,提供秒级备份与快速恢复能力。同时,PolarDB for Redis实现了一写多读架构,支持百TB级别的高性能存储,具备成本优势。该方案已在米哈游等大型游戏中广泛应用,确保了高并发下的稳定性和数据一致性,满足游戏行业的特殊需求。
626 36
|
人工智能 关系型数据库 分布式数据库
PolarDB-PG AI最佳实践3 :PolarDB AI多模态相似性搜索最佳实践
本文介绍了如何利用PolarDB结合多模态大模型(如CLIP)实现数据库内的多模态数据分析和查询。通过POLAR_AI插件,可以直接在数据库中调用AI模型服务,无需移动数据或额外的工具,简化了多模态数据的处理流程。具体应用场景包括图像识别与分类、图像到文本检索和基于文本的图像检索。文章详细说明了技术实现、配置建议、实战步骤及多模态检索示例,展示了如何在PolarDB中创建模型、生成embedding并进行相似性检索
|
SQL 人工智能 关系型数据库
PolarDB-PG AI最佳实践 2 :PolarDB AI X EAS实现自定义库内模型推理最佳实践
PolarDB通过POLAR_AI插件支持使用SQL调用AI/ML模型,无需专业AI知识或额外部署环境。结合阿里云EAS在线模型服务,可轻松部署自定义模型,在SQL中实现如文本翻译等功能。
|
SQL 人工智能 自然语言处理
PolarDB-PG AI最佳实践 1:基础能力实践
Polar_AI 是 PolarDB 数据库的 AI 扩展,集成了先进的人工智能模型和算法,使数据库能够执行机器学习和自然语言处理任务。它支持 PostgreSQL 及 Oracle 兼容版本,通过标准 SQL 轻松调用 AI 模型,具备简单易用、灵活可定制、无缝数据融合、数据安全和高性能等优势。用户可以通过 SQL 快速实现文本转向量、情感分类等功能,并能自定义扩展 AI 模型。
|
存储 监控 关系型数据库
MySQL计算某条数据与上一条数据的生成时间差
MySQL计算某条数据与上一条数据的生成时间差
350 2

推荐镜像

更多