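1. Background
Lindorm's Phoenix-compatible interface is the Phoenix 5.x thin client, whereas most of the Spark-to-Phoenix examples on the Spark website use the Phoenix 4.x thick client. This article therefore gives examples of connecting Spark to the Phoenix 5.x thin client for reference.
2. Connecting Spark to the Phoenix 5.x thin client
2.1 Download the Spark package from the Spark website
Download a Spark package from the Spark website. Choose whichever version you need; this article uses Spark 2.4.3 as an example. Extract the package after downloading; the extracted directory is assumed to be spark-2.4.3.
2.2 Download the Phoenix 5.x thin client from the Alibaba Cloud repository
Download the Phoenix 5.x thin client, ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar, from the Alibaba Cloud repository and place it in the jars directory under spark-2.4.3.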
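Before starting spark-shell, it can help to confirm that the thin-client driver class is actually visible on the classpath. The following is a minimal sketch, not part of the original setup: the class name `DriverCheck` and the helper `isOnClasspath` are hypothetical names chosen for illustration, and the driver class name is the one used by the examples in this article.

```java
public class DriverCheck {
    // Returns true if the named class can be loaded from the current classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Driver class name taken from the examples in this article.
        String driver = "org.apache.phoenix.queryserver.client.Driver";
        System.out.println(driver + " on classpath: " + isOnClasspath(driver));
    }
}
```

If this reports false when run with the jars directory on the classpath, the jar was probably placed in the wrong directory.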
2.3 Generate log4j.properties and start spark-shell
cd spark-2.4.3/conf
cp log4j.properties.template log4j.properties
cd ..
./bin/spark-shell
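2.4 Paste and run the code
2.4.1 Access through a Phoenix Statement
- In spark-shell, enter :paste to switch to multi-line input mode
:paste
- Replace url, user, and password in the code below with the values for your own instance, then paste all of it into spark-shell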
import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx") // replace xxxx with your username
info.put("password", "xxxx") // replace xxxx with your password
try {
Class.forName(driver)
} catch {
case e: ClassNotFoundException => e.printStackTrace
}
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
stmt.execute("drop table if exists test")
stmt.execute("create table test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
val rs = stmt.executeQuery("select * from test limit 1000")
while (rs.next()) {
println(rs.getString(1) + " | " +
rs.getString(2) )
}
stmt.execute("drop table if exists test")
} catch {
case e: SQLException => e.printStackTrace()
} finally {
if (null != stmt) {
stmt.close()
}
if (null != conn) {
conn.close()
}
}
- Press Ctrl+D to end input. The code then runs and prints output similar to the following:
// Exiting paste mode, now interpreting.
1 | 1
2 | 2
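The only parts of the connection URL that change between instances are the host and the port. As a small illustration, the URL can be assembled from those two values. This is a sketch under the assumption that the URL always has the shape shown in the example above; `ThinUrl` and `build` are hypothetical names, not part of the Phoenix client API.

```java
public class ThinUrl {
    // Builds a thin-client JDBC URL with the same shape as the one used in this article.
    static String build(String host, int port) {
        return "jdbc:phoenix:thin:url=http://" + host + ":" + port + ";serialization=PROTOBUF";
    }

    public static void main(String[] args) {
        // "your-instance-host" is a placeholder; pass the real host as the first argument.
        String host = args.length > 0 ? args[0] : "your-instance-host";
        System.out.println(build(host, 8765));
    }
}
```

Keeping the URL in one place avoids repeating the long literal in the Statement and DataFrame examples.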
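2.4.2 Access through a DataFrame
The DataFrame approach can only read and write data; creating and dropping tables must be done through a Phoenix Statement. The examples in this section read from and write to the test table, and the code in 2.4.1 drops that table as its last step, so recreate the table first (or skip the final drop) before running them.
2.4.2.1 Reading with a DataFrame
Enter :paste, paste the following text, then press Ctrl+D to run it. Remember to change the url, user, and password values.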
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
"url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF",
"driver" -> "org.apache.phoenix.queryserver.client.Driver",
"dbtable" -> "TEST",
"fetchsize" -> "10000",
"user" -> "xxxx",
"password" -> "xxxx")).load()
jdbcDF.show()
2.4.2.2 Writing with a DataFrame
Enter :paste, paste the following text, then press Ctrl+D to run it. Remember to change the url, user, and password values.
import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
// Create the schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
// Create Row objects; each Row is one row of rowRDD
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
// Bind the Row objects to the schema to build the DataFrame
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Create a prop variable that holds the JDBC connection parameters
val prop = new Properties()
prop.put("user", "xxxx") // replace xxxx with your username
prop.put("password", "xxxx") // replace xxxx with your password
prop.put("driver","org.apache.phoenix.queryserver.client.Driver")
// Connect to the database and write in append mode, which appends the records to the test table
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)
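3. Maven project example
The following uses a Maven project to show some basic operations for connecting Spark to the Phoenix thin client.
3.1 Create the Maven project
Create a Maven project named demo with the following pom file: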
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.4.3</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-queryserver-client</artifactId>
<version>5.2.1-HBase-2.x</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3.2 Scala file example
PhoenixTest1.scala. Remember to change the url, user, and password values.
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import java.sql.{DriverManager, SQLException}
import java.util.Properties
object PhoenixTest1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("spark on phoenix")
val sparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate()
val sc = sparkSession.sparkContext
print("======= start ==========")
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx")
info.put("password", "xxxx")
try {
Class.forName(driver)
} catch {
case e: ClassNotFoundException => e.printStackTrace
}
// Statement approach: supports all Phoenix DDL and DML operations
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
stmt.execute("drop table if exists test")
stmt.execute("create table test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
val rs = stmt.executeQuery("select * from test limit 1000")
while (rs.next()) {
println(rs.getString(1) + " | " +
rs.getString(2) )
}
} catch {
case e: SQLException => e.printStackTrace()
} finally {
if (null != stmt) {
stmt.close()
}
if (null != conn) {
conn.close()
}
}
// Write with a DataFrame
// Generate the records
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
testDataFrame.show()
// Write the records
testDataFrame
.write
.mode("append")
.jdbc("jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", info)
// Read with a DataFrame; two ways of passing options
val df1 = sqlContext
.read
.format("jdbc")
.options(Map(
"url" -> url,
"driver" -> driver,
"dbtable" -> "TEST",
"fetchsize" -> "10000",
"user" -> "xxxx",
"password" -> "xxxx"))
.load()
df1.show()
val jdbcDF2 = sqlContext.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("dbtable", "test")
.option("fetchsize", "10000")
.option("user", "xxxx")
.option("password", "xxxx")
.load()
jdbcDF2.show()
// Write the SQL table to a parquet file
df1.select("*").write.format("parquet").save("file:///Volumes/wukong/data/work/spark/data/test.parquet")
// Load the SQL table from the parquet file
val df2 = sqlContext.read.load("file:///Volumes/wukong/data/work/spark/data/test.parquet")
df2.show()
}
}
3.3 Java file example
StatementTest.java. Remember to change the url, user, and password values.
import org.apache.phoenix.queryserver.client.Driver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
public class StatementTest {
public static void main(String[] args) {
Connection pconn = null;
Statement stmt = null;
try {
Class.forName(Driver.class.getName());
String url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF";
Properties props = new Properties();
props.put("user", "xxxx");
props.put("password", "xxxx");
pconn = DriverManager.getConnection(url, props);
pconn.setAutoCommit(true);
stmt = pconn.createStatement();
stmt.execute("drop table if exists test");
stmt.execute("create table test(c1 integer primary key, c2 integer)");
stmt.execute("upsert into test(c1,c2) values(1,1)");
stmt.execute("upsert into test(c1,c2) values(2,2)");
ResultSet rs = stmt.executeQuery("select * from test limit 1000");
while (rs.next()) {
System.out.println(rs.getString(1) + " | " +
rs.getString(2));
}
stmt.execute("drop table if exists test");
} catch (Throwable e) {
e.printStackTrace();
} finally {
try {
// Close the statement before the connection that owns it
if (stmt != null) {
stmt.close();
}
if (pconn != null) {
pconn.close();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
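The Statement example above embeds literal values in the SQL text and closes resources by hand. A possible variant, sketched here under the assumption that the thin-client driver supports standard JDBC `PreparedStatement` parameters, uses try-with-resources and a parameterized upsert. `PreparedUpsertSketch`, `upsertSql`, and `upsertAll` are hypothetical names introduced for illustration; only `upsertSql` runs without a live instance.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class PreparedUpsertSketch {
    // Builds a parameterized statement such as "upsert into test(c1,c2) values(?,?)".
    static String upsertSql(String table, String... columns) {
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            marks.append(i == 0 ? "?" : ",?");
        }
        return "upsert into " + table + "(" + String.join(",", columns) + ") values(" + marks + ")";
    }

    // Writes each {c1, c2} pair through one reusable PreparedStatement.
    // try-with-resources closes the statement and then the connection automatically.
    static void upsertAll(String url, Properties props, int[][] rows) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, props);
             PreparedStatement ps = conn.prepareStatement(upsertSql("test", "c1", "c2"))) {
            conn.setAutoCommit(true);
            for (int[] row : rows) {
                ps.setInt(1, row[0]);
                ps.setInt(2, row[1]);
                ps.executeUpdate();
            }
        }
    }

    public static void main(String[] args) {
        // Only prints the generated SQL; call upsertAll with real url, user, and password to write data.
        System.out.println(upsertSql("test", "c1", "c2"));
    }
}
```

Calling `upsertAll(url, props, new int[][]{{1, 1}, {2, 2}})` would write the same two rows as the example above, provided the test table already exists.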
3.4 Package
mvn clean package -DskipTests
This generates demo-1.0-SNAPSHOT.jar under target.
3.5 Submit and run locally
./bin/spark-submit --master local --verbose --class PhoenixTest1 /Volumes/wukong/data/work/spark/demo/target/demo-1.0-SNAPSHOT.jar