Spark对接Lindorm Phoenix5.x轻客户端

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。

1. 背景

Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。

2. Spark对接Phoenix 5.x轻客户端

2.1 从Spark官网下载Spark安装包

Spark官网下载Spark安装包,版本自行选择,本文以Spark-2.4.3版本为例。下载后解压,假定目录为spark-2.4.3

2.2 从阿里云仓库下载Phoenix5.x轻客户端

阿里云仓库下载Phoenix5.x轻客户端ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar, 放置于spark-2.4.3下的jars目录下。

2.3 生成log4j.properties

cd spark-2.4.3/conf
cp log4j.properties.template log4j.properties

2.3 启动spark-shell

./bin/spark-shell 

2.4 粘贴运行代码

2.4.1 Phoenix Statement方式访问

  1. 在spark-shell上输入:paste可以输入多行文本
:paste
  1. 修改下面代码中的url, user, password为自己的实例集群信息,然后全部粘贴于spark-shell中
import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") //表示用户名是root
    info.put("password", "xxxx") //表示密码是hadoop
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
      stmt.execute("drop table if exists test")
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }
  1. 输入Ctrl+D 结束文本输入,即可看到运行结果, 会显示类似如下信息:
// Exiting paste mode, now interpreting.

1 | 1
2 | 2

2.4.2 DataFrame方式访问

DataFrame方式只能进行读写,建表操作和删表操作需要使用Phoenix Statement方式。

2.4.2.1 DataFrame方式读

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()

2.4.2.1 DataFrame方式写

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
//创建schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
//创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "xxxx") //表示用户名是root
prop.put("password", "xxxx") //表示密码是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver") 
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)

3. Maven工程示例

下面以一个maven工程为例介绍spark对接phoenix轻客户端的一些基本操作

3.1 建立maven工程

建立名叫demo的maven工程,pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.3</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-queryserver-client</artifactId>
            <version>5.2.1-HBase-2.x</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3.2 scala文件示例

PhoenixTest1.scala, 记得修改url,user,password信息。

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext, SparkSession}

import java.sql.{DriverManager, SQLException}
import java.util.Properties

object PhoenixTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("spark on phoenix")
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = sparkSession.sparkContext

    print("======= start ==========")
    val driver = "org.apache.phoenix.queryserver.client.Driver"
    val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
    val info = new Properties()
    info.put("user", "xxxx") 
    info.put("password", "xxxx") 
    try {
      Class.forName(driver)
    } catch {
      case e: ClassNotFoundException => e.printStackTrace
    }

   //statement操作方式, 可以做所有phoenix ddl和dml操作
    val conn = DriverManager.getConnection(url, info)
    val stmt = conn.createStatement
    try {
      stmt.execute("drop table if exists test")
      stmt.execute("create table test(c1 integer primary key, c2 integer)")
      stmt.execute("upsert into test(c1,c2) values(1,1)")
      stmt.execute("upsert into test(c1,c2) values(2,2)")
      val rs = stmt.executeQuery("select * from test limit 1000")
      while (rs.next()) {
        println(rs.getString(1) + " | " +
          rs.getString(2) )
      }
    } catch {
      case e: SQLException => e.printStackTrace()
    } finally {
      if (null != stmt) {
        stmt.close()
      }
      if (null != conn) {
        conn.close()
      }
    }

    //DataFrame写入
    //生成记录
    val sqlContext = new SQLContext(sc)
    val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
    val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
    val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
    val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    testDataFrame.show()
    
    // 写入记录
    testDataFrame
      .write
      .mode("append")
      .jdbc("jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", info)

    //DataFrame读取, option的两种写法
    val df1 = sqlContext
      .read.
      format("jdbc")
      .options(Map(
        "url" -> url,
        "driver" -> driver,
        "dbtable" -> "TEST",
        "fetchsize" -> "10000",
        "user" -> "root",
        "password" -> "root"))
      .load()
    df1.show()

    val jdbcDF2 = sqlContext.read.format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", "test")
      .option("fetchsize", "10000")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .load()
    jdbcDF2.show()

    // 将SQL表写入parquet文件
    df1.select("*").write.format("parquet").save("file:///Volumes/wukong/data/work/spark/data/test.parquet")

    // 从parquet文件中加载SQL表
    val df2 = sqlContext.read.load("file:///Volumes/wukong/data/work/spark/data/test.parquet")
    df2.show()
  }
}

3.3 java文件示例

StatementTest.java, 记得修改url,user,password信息。

import org.apache.phoenix.queryserver.client.Driver;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class StatementTest {
  public static void main(String[] args) {
    Connection pconn = null;
    Statement stmt = null;
    try {
      Class.forName(Driver.class.getName());
      String url = "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF";
      Properties props = new Properties();
      props.put("user", "xxxx");
      props.put("password", "xxxx");
      pconn = DriverManager.getConnection(url, props);
      pconn.setAutoCommit(true);
      stmt = pconn.createStatement();
      stmt.execute("drop table if exists test");
      stmt.execute("create table test(c1 integer primary key, c2 integer)");
      stmt.execute("upsert into test(c1,c2) values(1,1)");
      stmt.execute("upsert into test(c1,c2) values(2,2)");
      ResultSet rs = stmt.executeQuery("select * from test limit 1000");
      while (rs.next()) {
        System.out.println(rs.getString(1) + " | " +
            rs.getString(2));
      }
      stmt.execute("drop table if exists test");
    } catch (Throwable e) {
      e.printStackTrace();
    } finally {
      try {
        if (pconn != null) {
          pconn.close();
        }
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }
}

3.4 打包

mvn clean package -DskipTests

target下生成demo-1.0-SNAPSHOT.jar

3.5 提交到本地运行

./bin/spark-submit  --master local --verbose --class PhoenixTest1 /Volumes/wukong/data/work/spark/demo/target/demo-1.0-SNAPSHOT.jar 
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
6月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
存储 SQL 分布式计算
DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例
DataWorks_数据开发_EMR Spark节点 1)计算Pi; 2)对接MaxCompute。
606 0
DataWorks_数据开发_EMR Spark节点_计算Pi和对接MaxCompute案例
|
缓存
spark2.1.0之源码分析——RPC客户端TransportClient详解
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/82143001 提示:阅读本文前最好先阅读: 《Spark2.
1679 0
|
缓存 分布式计算 前端开发
spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80981101 提示:阅读本文前最好先阅读《Spark2.1.0之内置RPC框架》和《spark2.1.0之源码分析——RPC配置TransportConf》。
1546 0
|
27天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
81 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
63 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
93 0
|
29天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
51 6