At work we have different environments (Development, Preproduction, and Production), and each environment has certain tables in its Hive Metastore. My user has permission to access and query all of these metastores directly, but I would like to access them from a spark-shell session through the sqlContext (or a HiveContext).
For example, when I ssh into the Preproduction environment and start a spark-shell session, it automatically creates a sqlContext variable that I can use to run queries against the Preproduction metastore.
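For example, a trivial check of this (the SHOW DATABASES call is just an illustration of mine, not from my real session):

// sqlContext is created automatically by spark-shell, so this lists the databases
// of whichever metastore the session is wired to (Preproduction in this case)
sqlContext.sql("SHOW DATABASES").show()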
From the Preproduction environment I can also use beeline to run queries against the Production metastore, so I tried changing some Hive settings (following "How to connect to Hive metastore programmatically in SparkSQL?"). I changed the following properties:
hive.metastore.uris and hive.server2.authentication.kerberos.principal, setting them to their counterparts in the Production environment.
The code in spark-shell:
System.setProperty("hive.server2.authentication.kerberos.principal","hive/URL@URL2")
System.setProperty("hive.metastore.uris","thrift://URLTOPRODUCTION:9083")
import org.apache.spark.sql.hive.HiveContext
val hive=new HiveContext(sc)
val df=hive.sql("SELECT * FROM DB.Table limit 10")
But when I executed the last statement of the previous block, I got the following error:
java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:406)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:762)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:693)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:449)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:447)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:504)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:503)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToParquetRelation(HiveMetastoreCatalog.scala:503)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:565)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:545)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:545)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:539)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:829)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC.<init>(<console>:45)
at $iwC.<init>(<console>:47)
at <init>(<console>:49)
at .<init>(<console>:53)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
at org.apache.spark.repl.Main$.main(Main.scala:35)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: nameservice1
... 141 more
I am using the Cloudera distribution of Spark 1.6.0 with Scala 2.10.5.
Finally, after inspecting the configuration of the sqlContext variable that spark-shell creates automatically on the server, I saw that it contains lots of URLs and configuration variables referencing HDFS and other services where I have no permissions, all of which would be needed to query the PROD metastore.
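In particular, the UnknownHostException for nameservice1 suggests the session is missing the client-side HDFS high-availability definitions of the Production cluster. Had I been allowed to supply them, the standard HA keys would look roughly like this (a sketch only; the NameNode hostnames are placeholders, and in my case I lacked the permissions to make this route work anyway):

// Standard client-side HDFS HA settings for a logical nameservice; hostnames are hypothetical
sc.hadoopConfiguration.set("dfs.nameservices", "nameservice1")
sc.hadoopConfiguration.set("dfs.ha.namenodes.nameservice1", "nn1,nn2")
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.nameservice1.nn1", "PROD_NAMENODE1:8020")
sc.hadoopConfiguration.set("dfs.namenode.rpc-address.nameservice1.nn2", "PROD_NAMENODE2:8020")
sc.hadoopConfiguration.set("dfs.client.failover.proxy.provider.nameservice1",
  "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")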
I knew that querying the PROD metastore from beeline worked, and that I could query this metastore via JDBC, so I took the JDBC URL that beeline uses for that connection.
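For reference, a Kerberized HiveServer2 JDBC URL usually has this shape (the host, port, database, and realm below are placeholders, not my real values):

// Typical shape of a Kerberized HiveServer2 JDBC URL; the principal clause carries the Kerberos identity
val exampleUrl = "jdbc:hive2://hiveserver2-host:10000/default;principal=hive/_HOST@YOUR_REALM"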
I then used this JDBC URL with plain Java (called from Scala) methods and utilities to connect to the database via JDBC:
/* We will need hive-jdbc-0.14.0.jar to connect to a Hive metastore via JDBC */
import java.sql.ResultSetMetaData
import java.sql.{DriverManager, Connection, Statement, ResultSet}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
/* In the following lines I connect to the PROD metastore via JDBC and execute the query as if I were connecting to a simple DB. Notice that, using this method, you are not using distributed computing */
val url="jdbc:hive2://URL_TO_PROD_METASTORE/BD;CREDENTIALS OR URL TO KERBEROS"
val query="SELECT * FROM BD.TABLE LIMIT 100"
val driver="org.apache.hive.jdbc.HiveDriver"
Class.forName(driver).newInstance
val conn: Connection = DriverManager.getConnection(url)
val r: ResultSet = conn.createStatement.executeQuery(query)
val list = scala.collection.mutable.MutableList[Row]()
/* Now we want to get all the values from all the columns. Notice that I create a Row object for each row of the result set, and then append each Row to a MutableList */
while (r.next()) {
  val value: Array[String] = new Array[String](r.getMetaData.getColumnCount)
  for (i <- 1 to r.getMetaData.getColumnCount) {
    value(i - 1) = r.getString(i)
  }
  list += Row.fromSeq(value)
}
/* Now we have the results of the query to the PROD metastore and we want to transform this data into a DataFrame, so we create a StructType with the names of the columns (all typed as strings here) to pair with the list of Rows built above */
val array: Array[StructField] = new Array[StructField](r.getMetaData.getColumnCount)
for (i <- 1 to r.getMetaData.getColumnCount) {
  array(i - 1) = StructField(r.getMetaData.getColumnName(i), StringType)
}
val struct = StructType(array)
val rdd = sc.parallelize(list)
val df = sqlContext.createDataFrame(rdd, struct)
r.close()
conn.close()
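Once df is built it behaves like any other DataFrame, for example:

// Quick sanity check of the reconstructed schema and a few rows (standard DataFrame API calls)
df.printSchema()
df.show(10)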
Note that this question is related to another answer of mine, because the best way to export the results of a Hive query to a CSV file is to use Spark (see "How to export a Hive table into a CSV file?"), and I wanted to query the PROD metastore from a Spark session on the PRE server.