开发者社区 问答 正文

Spark/Spark SQL 访问表格存储教程(Spark 访问示例)



Spark 访问示例


以下示例介绍如何使用 Spark 程序统计数据表 pet 的行数。 private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = new Configuration();
    TableStoreInputFormat.setCredential(
        hadoopConf,
        new Credential("YourAccessKeyId", "YourAccessKeySecret"));
    TableStoreInputFormat.setEndpoint(
        hadoopConf,
        new Endpoint("http://YourInstance.Region.ots.aliyuncs.com/"));
    TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
    try {
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
            hadoopConf,
            TableStoreInputFormat.class,
            PrimaryKeyWritable.class,
            RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        sc.close();
    }
}


[backcolor=transparent]注意:如果使用 scala,只需把 JavaSparkContext 换成 SparkContext,JavaPairRDD 换成 PairRDD 即可。或者更简单,交给编译器自行做类型推断。


运行程序$ bin/spark-submit --master local --jars hadoop-connector.jar row-counter.jar
TOTAL: 9



展开
收起
云栖大讲堂 2017-10-25 10:41:05 2413 分享 版权
阿里云 AI 助理回答

这个示例展示了如何使用 Apache Spark 来统计阿里云表格存储(Table Store,原 OTS)中数据表的行数。下面是对代码段的关键部分进行解释:

  1. 初始化Spark配置与上下文:

    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    

    这里设置了Spark应用的名字为"RowCounter"并创建了JavaSparkContext,它是Spark程序的主要入口点。

  2. 配置Hadoop环境以接入表格存储:

    Configuration hadoopConf = new Configuration();
    TableStoreInputFormat.setCredential(hadoopConf, 
       new Credential("YourAccessKeyId", "YourAccessKeySecret"));
    TableStoreInputFormat.setEndpoint(hadoopConf, 
       new Endpoint("http://YourInstance.Region.ots.aliyuncs.com/"));
    

    配置Hadoop环境时,需要设置访问表格存储所需的凭证(AccessKeyId和AccessKeySecret)以及OTS实例的Endpoint地址。

  3. 定义查询范围:

    private static RangeRowQueryCriteria fetchCriteria() { ... }
    

    通过fetchCriteria方法定义了一个查询条件,用于从指定的表格("YourTableName")中获取所有记录。这里使用了最小值和最大值作为主键范围,意味着要检索整个表的数据。

  4. 读取数据并计数:

    JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = 
       sc.newAPIHadoopRDD(hadoopConf, 
           TableStoreInputFormat.class, 
           PrimaryKeyWritable.class, 
           RowWritable.class);
    System.out.println(new Formatter().format("TOTAL: %d", rdd.count()).toString());
    

    使用newAPIHadoopRDD方法读取表格存储中的数据,并将其转换为Spark的RDD(弹性分布式数据集)。然后调用count方法计算数据集中元素的数量,即表中的行数。

  5. 运行程序: 在命令行中,使用spark-submit命令提交作业,指定了本地模式运行、依赖的hadoop-connector.jar以及主类或jar包路径。输出结果显示表中有9行数据。

注意:确保在运行此示例之前,已经正确安装了Spark环境,并且将阿里云表格存储的Hadoop Connector jar包(hadoop-connector.jar)添加到了Spark的类路径中。此外,替换示例中的"YourTableName"、"YourPkeyName"、"YourAccessKeyId"、"YourAccessKeySecret"以及Endpoint URL为实际的值。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答