Spark 访问示例
以下示例介绍如何使用 Spark 程序统计数据表 pet 的行数。
private static RangeRowQueryCriteria fetchCriteria() {
RangeRowQueryCriteria res = new RangeRowQueryCriteria("YourTableName");
res.setMaxVersions(1);
List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
lower.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MIN));
upper.add(new PrimaryKeyColumn("YourPkeyName", PrimaryKeyValue.INF_MAX));
res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
return res;
}
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration hadoopConf = new Configuration();
TableStoreInputFormat.setCredential(
hadoopConf,
new Credential("YourAccessKeyId", "YourAccessKeySecret"));
TableStoreInputFormat.setEndpoint(
hadoopConf,
new Endpoint("http://YourInstance.Region.ots.aliyuncs.com/"));
TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
try {
JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
hadoopConf,
TableStoreInputFormat.class,
PrimaryKeyWritable.class,
RowWritable.class);
System.out.println(
new Formatter().format("TOTAL: %d", rdd.count()).toString());
} finally {
sc.close();
}
}
[backcolor=transparent]注意:如果使用 scala,只需把 JavaSparkContext 换成 SparkContext,JavaPairRDD 换成 PairRDD 即可。或者更简单,交给编译器自行做类型推断。
$ bin/spark-submit --master local --jars hadoop-connector.jar row-counter.jar
TOTAL: 9
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
这个示例展示了如何使用 Apache Spark 来统计阿里云表格存储(Table Store,原 OTS)中数据表的行数。下面是对代码段的关键部分进行解释:
初始化Spark配置与上下文:
SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
这里设置了Spark应用的名字为"RowCounter"并创建了JavaSparkContext
,它是Spark程序的主要入口点。
配置Hadoop环境以接入表格存储:
Configuration hadoopConf = new Configuration();
TableStoreInputFormat.setCredential(hadoopConf,
new Credential("YourAccessKeyId", "YourAccessKeySecret"));
TableStoreInputFormat.setEndpoint(hadoopConf,
new Endpoint("http://YourInstance.Region.ots.aliyuncs.com/"));
配置Hadoop环境时,需要设置访问表格存储所需的凭证(AccessKeyId和AccessKeySecret)以及OTS实例的Endpoint地址。
定义查询范围:
private static RangeRowQueryCriteria fetchCriteria() { ... }
通过fetchCriteria
方法定义了一个查询条件,用于从指定的表格("YourTableName")中获取所有记录。这里使用了最小值和最大值作为主键范围,意味着要检索整个表的数据。
读取数据并计数:
JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd =
sc.newAPIHadoopRDD(hadoopConf,
TableStoreInputFormat.class,
PrimaryKeyWritable.class,
RowWritable.class);
System.out.println(new Formatter().format("TOTAL: %d", rdd.count()).toString());
使用newAPIHadoopRDD
方法读取表格存储中的数据,并将其转换为Spark的RDD(弹性分布式数据集)。然后调用count
方法计算数据集中元素的数量,即表中的行数。
运行程序: 在命令行中,使用spark-submit
命令提交作业,指定了本地模式运行、依赖的hadoop-connector.jar以及主类或jar包路径。输出结果显示表中有9行数据。
注意:确保在运行此示例之前,已经正确安装了Spark环境,并且将阿里云表格存储的Hadoop Connector jar包(hadoop-connector.jar)添加到了Spark的类路径中。此外,替换示例中的"YourTableName"、"YourPkeyName"、"YourAccessKeyId"、"YourAccessKeySecret"以及Endpoint URL为实际的值。