Spark on Phoenix 4.x Connector:如何在Spark侧设置Phoenix参数

简介: 前言 X-Pack Spark可以使用Spark on Phoenix 4.x Connector直接对接Phoenix数据库,读取Phoenix数据表数据。有时在读取Phoenix时需要设置Phoenix的一些参数,例如Phoenix为了保障数据库的稳定性,默认开了索引包含,即查询Phoebe表必须要带上索引或者主键字段作为过滤条件。

前言

X-Pack Spark可以使用Spark on Phoenix 4.x Connector直接对接Phoenix数据库,读取Phoenix数据表数据。有时在读取Phoenix时需要设置Phoenix的一些参数,例如Phoenix为了保障数据库的稳定性,默认开了索引包含,即查询Phoebe表必须要带上索引或者主键字段作为过滤条件。此时Spark作为查询Phoenix数据库的客户端需要有传递参数的能力。本文就列举了Spark侧传递Phoenix参数的方法。
注意:本文的案例以X-Pack Spark和HBase SQL(Phoenix) 4.x作为背景。

案例描述

在Spark侧设置Phoenix的参数常见的有如下:

  1. phoenix.force.index,查询Phoenix的SQL语句中的过滤字段是否必须创建索引。本文以这个参数为例,具体使用方法见详细步骤。
  2. phoenix.no.index,是否不走索引。默认值为false,即查询语句会扫描索引表,如果过滤字段在索引表中;如果设置为true,则查询语句不会扫描索引表,即使过滤字段在索引表中。

详细步骤

提前在Phoenix中创建一张表,表的创建命令如下:

#创建语句
CREATE TABLE IF NOT EXISTS us_population (
   state CHAR(2) NOT NULL,
   city VARCHAR NOT NULL,
   population BIGINT
   CONSTRAINT my_pk PRIMARY KEY (state, city));
#插入数据语句  
UPSERT INTO us_population VALUES('NY','New York',8143197);
UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
UPSERT INTO us_population VALUES('IL','Chicago',2842518);
UPSERT INTO us_population VALUES('TX','Houston',2016582);
UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
UPSERT INTO us_population VALUES('CA','San Diego',1255540);
UPSERT INTO us_population VALUES('TX','Dallas',1213825);
UPSERT INTO us_population VALUES('CA','San Jose',912332);

Phoenix表创建完毕后在Phoenix客户端运行如下查询SQL

select * from us_population where population = 912332

运行上面的SQL会报错,大致内容如下:

org.apache.phoenix.optimize.ForceIndexException: Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least.

意思是过滤条件必须包含至少一个索引字段,可以通过设置phoenix.force.index=false来关闭这个限制。
在Spark侧通过Spark on Phoenix 4.x Connectors读取Phoenix数据表也会有这个限制。下面介绍下在Spark侧如何设置Phoenix的参数。
1、通过Spark ThriftServer 执行SQL语句。
通过SQL语句的方式首先要在Spark侧创建一个Phoenix表的映射。建表语句如下:

CREATE TABLE spark_on_phoenix01 USING org.apache.phoenix.spark
OPTIONS (
'zkUrl' '${ZK链接地址}',
'table' 'us_population'
);

创建完毕后直接在Spark运行查询语句“select * from spark_on_phoenix01 where population = 912332”也会报相同的错误,此时可以通过set phoenix.force.index=false方法设置,如下:

select * from spark_on_phoenix01 where population=912332;
Error: java.lang.RuntimeException: org.apache.phoenix.optimize.ForceIndexException: ERROR 599 (42913): Default enable force index, please set phoenix.force.index=false to disable. The filters must be contains one index column at least. tableName=US_POPULATION (state=,code=0)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> set phoenix.force.index=false;
+----------------------+--------+--+
|         key          | value  |
+----------------------+--------+--+
| phoenix.force.index  | false  |
+----------------------+--------+--+
1 row selected (0.021 seconds)
0: jdbc:hive2://ap-wz92zrkxow69379w8-master2-> select * from spark_on_phoenix01 where population=912332;
+--------+-----------+-------------+--+
| STATE  |   CITY    | POPULATION  |
+--------+-----------+-------------+--+
| CA     | San Jose  | 912332      |
+--------+-----------+-------------+--+

注意:上述设置方法只在当前session有效。

2、通过写代码SparkSession调用SQL,代码如下:

//创建SparkSession
val sparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .appName("spark on phoenix4x")
      .getOrCreate()
//方法1:通过在sql中执行set phoenix.force.index=false 设置Phoenix参数
sparkSession.sql("set phoenix.force.index=false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()

//方法2:通过在sparkSession的配置中设置 phoenix.force.index=false 设置Phoenix参数
sparkSession.sqlContext.setConf("phoenix.force.index", "false")
querySql = "select * from " + sparkTableName + " where population = 912332"
var result = sparkSession.sql(querySql)
result.show()

3、通过SparkSession.read获取Phoenix表数据,设置参数方法如下:

//初始化sparkSession时加入设置
val sparkSession = SparkSession
    .builder()
    .config("phoenix.force.index", false)
    .enableHiveSupport()
    .appName("spark on phoenix4x")
    .getOrCreate()

sparkSession
    .read
    .format("org.apache.phoenix.spark")
    .option("table", phoenixTableName)
    .option("zkUrl", zkAddress)
    .load()
    .show()

4、通过X-Pack Spark 控制台--conf传递参数。
通过控制传递参数的方法如下:

--conf spark.hadoop.phoenix.force.index=false

如下图:
image
注意:因为Spark获取--conf传递参数的机制会过滤“spark.hadoop”开头的参数,所以通过--conf设置的参数需要在参数前面加上“spark.hadoop.”,即完成的参数为spark.hadoop.phoenix.force.index=false。

小结

Spark 对接Phoenix 4.x 方法可以参考:Spark对接Phoenix4.x快速入门
X-Pack Spark详细介绍可参考:Spark 基本介绍
Phoenix 介绍可参考:HBase SQL(Phoenix) 4.x 使用说明

相关文章
|
分布式计算 Hadoop Spark
【Spark】【设置】关闭INFO提示
【Spark】【设置】关闭INFO提示
334 0
【Spark】【设置】关闭INFO提示
|
3月前
|
分布式计算 DataWorks Java
DataWorks产品使用合集之如何引用在spark jar中引用密文的空间参数
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
分布式计算 Scala Spark
Spark参数解析之MasterArguments
Spark参数解析之MasterArguments
40 0
|
分布式计算 Spark
Spark重要参数详解
Spark重要参数详解
97 0
|
6月前
|
分布式计算 Kubernetes Java
spark on k8s native
spark on k8s native
|
分布式计算 Kubernetes Serverless
Hago 的 Spark on ACK 实践
Hago 的 Spark on ACK 实践
|
资源调度 分布式计算 Hadoop
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
|
分布式计算 资源调度 Hadoop
Spark on Yarn集群模式搭建及测试
Spark on Yarn集群模式搭建及测试
328 0
|
资源调度 分布式计算 大数据
大数据Spark on YARN
大数据Spark on YARN
129 0
|
存储 分布式计算 Cloud Native
[实战系列]SelectDB Cloud Spark Connector 最佳实践
Spark SelectDB Connector 以 Spark 这个大数据计算的优秀组件作为核心,实现了利用 Spark 将外部数据源的大数据量同步到 SelectDB Cloud,便于我们实现大批量数据的快速同步,继而利用 SelectDB Cloud 为基石构建新一代的云原生数据仓库,结合 SelectDB Cloud 强大的分析计算性能,能够为企业带来业务便捷性以及增效将本的目标。
152 0