如何在spark-jdbc应用程序中提供表名来读取RDBMS数据库中的数据?-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

如何在spark-jdbc应用程序中提供表名来读取RDBMS数据库中的数据?

2018-12-19 16:06:51 2793 1

我正在尝试使用spark读取greenplum数据库中的表格,如下所示:

val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.table where period_year=2017 and period_num=12"
val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016")

                            .option("user", devUserName)
                            .option("password", devPassword)
                            .option("partitionColumn","header_id")
                            .option("lowerBound", 16550)
                            .option("upperBound", 1152921481695656862L)
                            .option("numPartitions",450).load()

当我使用spark-submit运行代码时,我得到一个例外:

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15

at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2310)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2023)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:217)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.YearPartition$.prepareFinalDF$1(YearPartition.scala:141)
at com.partition.source.YearPartition$.main(YearPartition.scala:164)
at com.partition.source.YearPartition.main(YearPartition.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

在execQuery我可以看到模式名称和表名称正确形成。当我提交代码时,它说public.(select je_header_id,source_system_name,) relation not found。我不明白为什么将public模式名称和查询(select je_header_id,source_system_name,je_line_num,last_update" 作为表名称。以下是我使用的spark-submit:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.YearPartition --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/devuser/jars/greenplum-spark_2.11-1.3.0.jar --conf spark.jars=/home/devuser/jars/greenplum-spark_2.11-1.3.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/devuser/devuser.keytab --principal devuser@DEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/devuser/jars/greenplum-spark_2.11-1.3.0.jar splinter_2.11-0.1.jar

取消 提交回答
全部回答(1)
  • 社区小助手
    2019-07-17 23:23:00

    如果您使用spark jdbc,则可以包装查询并将其传递给dbtable参数。如果关键只是像任何jdbc一样工作,这应该工作。

    val query = """
    (select a.id,b,id,a.name from a left outer join b on a.id=b.id

    limit 100) foo

    """

    val df = sqlContext.format("jdbc").
    option("url", "jdbc:mysql://localhost:3306/local_content").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", "root").
    option("password", "").
    option("dbtable",query).
    load()

    0 0
相关问答

5

回答

Spark 【问答合集】

社区小助手 2019-05-29 14:13:40 129558浏览量 回答数 5

20

回答

【大咖问答】对话PostgreSQL 中国社区发起人之一,阿里云数据库高级专家 德哥

阿里ACE 彭飞 2019-07-10 09:36:10 1225860浏览量 回答数 20

145

回答

【新手入门】云服务器linux使用手册

fanyue88888 2012-11-26 17:14:18 159460浏览量 回答数 145

8

回答

OceanBase 使用动画(持续更新)

mq4096 2019-02-20 17:16:36 340540浏览量 回答数 8

22

回答

爬虫数据管理【问答合集】

我是管理员 2018-08-10 16:37:41 148557浏览量 回答数 22

2

回答

mySQL数据库报错You have an error in your SQL syntax

落地花开啦 2016-02-14 16:09:24 133185浏览量 回答数 2

10

回答

[@墨玖tao][¥20]为什么流式处理框架都是 java 写成的,JVM 是不是在流和批存在着特殊优势。还有分布式资源调度,感觉Mesos 的成长速度跟不上 Yarn。这是为什么?

管理贝贝 2018-10-23 13:18:03 137442浏览量 回答数 10

39

回答

安全组详解,新手必看教程

我的中国 2017-11-30 15:23:46 262940浏览量 回答数 39

21

回答

请教一下数据量有100万条左右要什么配置?

易网网络 2013-03-27 15:18:02 193545浏览量 回答数 21

251

回答

阿里云LNAMP(Linux + Nginx + Apache + MySQL + PHP)环境一键安装脚本

云代维 2014-02-14 15:26:06 309416浏览量 回答数 251
+关注
社区小助手
社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。
12
文章
824
问答
问答排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载