Spark streaming write to IBM Cloud Object Storage fails with "Access KEY is empty. Please provide valid access key"

2018-12-19 17:01:20

I am currently using Apache Spark 2.3.2 and building a pipeline that reads streaming CSV files from the file system and writes them to IBM Cloud Object Storage.

I am using the Stocator connector. With the configuration below, regular (non-streaming) reads and writes to IBM COS work fine. Streaming reads and writes, however, throw this error:

com.ibm.stocator.fs.common.exception.ConfigurationParseException: Configuration parse exception: Access KEY is empty. Please provide valid access key

Stocator configuration:

sc.hadoopConfiguration.set("fs.cos.impl","com.ibm.stocator.fs.ObjectStoreFileSystem")
sc.hadoopConfiguration.set("fs.stocator.scheme.list","cos")
sc.hadoopConfiguration.set("fs.stocator.cos.impl","com.ibm.stocator.fs.cos.COSAPIClient")
sc.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
sc.hadoopConfiguration.set("fs.cos.service_name.endpoint", "{url}")
sc.hadoopConfiguration.set("fs.cos.service_name.access.key", "{access_key}")
sc.hadoopConfiguration.set("fs.cos.service_name.secret.key", {secret_key})
Read stream:

val csvDF = sqlContext
.readStream
.option("sep", ",")
.schema(fschema)
.csv("{path}")
Write stream:

val query = csvDF
.writeStream
.outputMode(OutputMode.Append())
.format("parquet")
.option("checkpointLocation", "cos://bucket.service_name/")
.option("path", "cos://bucket.service_name/")
.start()
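In a standalone application the StreamingQuery handle returned by start() would normally also be blocked on so the stream keeps running (in the shell this is not needed); a minimal sketch using the query value above:

query.awaitTermination()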
Error log:

"2018-12-17 16:51:14 WARN FileStreamSinkLog:66 - Could not use FileContext API for managing metadata log files at path cos://bucket.service_name/_spark_metadata. Using FileSystem API instead for managing log files. The log may be inconsistent under failures.
2018-12-17 16:51:14 INFO ObjectStoreVisitor:110 - Stocator registered as cos for cos://bucket.service_name/_spark_metadata
2018-12-17 16:51:14 INFO COSAPIClient:251 - Init : cos://bucket.service_name/_spark_metadata
Exception in thread "main" com.ibm.stocator.fs.common.exception.ConfigurationParseException: Configuration parse exception: Access KEY is empty. Please provide valid access key"

Is there any way to fix this error, or an alternative approach that would achieve the same goal?

Update with more logs:

scala> val csvDF = spark.readStream.option("sep", ",").schema(fschema).csv("C:\Users\abc\Desktop\stream")
csvDF: org.apache.spark.sql.DataFrame = [EMP_NO: string, EMP_SALARY: string ... 2 more fields]

scala> val query = csvDF.writeStream.outputMode(OutputMode.Append()).format("csv").option("checkpointLocation", "cos://stream-csv.Cloud Object Storage-POCDL/").option("path", "cos://stream-csv.Cloud Object Storage-POCDL/").start()
18/12/18 10:47:40 WARN FileStreamSinkLog: Could not use FileContext API for managing metadata log files at path cos://stream-csv.Cloud%20Object%20Storage-POCDL/_spark_metadata. Using FileSystem API instead for managing log files. The log may be inconsistent under failures.
18/12/18 10:47:40 DEBUG ObjectStoreVisitor: Stocator schema space : cos, provided cos. Implementation com.ibm.stocator.fs.cos.COSAPIClient
18/12/18 10:47:40 INFO ObjectStoreVisitor: Stocator registered as cos for cos://stream-csv.Cloud%2520Object%2520Storage-POCDL/_spark_metadata
18/12/18 10:47:40 DEBUG ObjectStoreVisitor: Load implementation class com.ibm.stocator.fs.cos.COSAPIClient
18/12/18 10:47:40 DEBUG ObjectStoreVisitor: Load direct init for COSAPIClient. Overwrite com.ibm.stocator.fs.cos.COSAPIClient
18/12/18 10:47:40 INFO COSAPIClient: Init : cos://stream-csv.Cloud%2520Object%2520Storage-POCDL/_spark_metadata
18/12/18 10:47:40 DEBUG ConfigurationHandler: COS driver: initialize start for cos://stream-csv.Cloud%2520Object%2520Storage-POCDL/_spark_metadata
18/12/18 10:47:40 DEBUG ConfigurationHandler: extracted host name from cos://stream-csv.Cloud%2520Object%2520Storage-POCDL/_spark_metadata is stream-csv.Cloud%20Object%20Storage-POCDL
18/12/18 10:47:40 DEBUG ConfigurationHandler: Initiaize for bucket: stream-csv, service: Cloud%20Object%20Storage-POCDL
18/12/18 10:47:40 DEBUG ConfigurationHandler: Filesystem cos://stream-csv.Cloud%2520Object%2520Storage-POCDL/_spark_metadata, using conf keys for fs.cos.Cloud%20Object%20Storage-POCDL. Alternative list [fs.s3a.Cloud%20Object%20Storage-POCDL, fs.s3d.Cloud%20Object%20Storage-POCDL]
18/12/18 10:47:40 DEBUG ConfigurationHandler: Initialize completed successfully for bucket stream-csv service Cloud%20Object%20Storage-POCDL
18/12/18 10:47:40 DEBUG MemoryCache: Guava initiated with size 2000 expiration 30 seconds
18/12/18 10:47:40 ERROR ObjectStoreVisitor: Configuration parse exception: Access KEY is empty. Please provide valid access key
com.ibm.stocator.fs.common.exception.ConfigurationParseException: Configuration parse exception: Access KEY is empty. Please provide valid access key
at com.ibm.stocator.fs.cos.COSAPIClient.initiate(COSAPIClient.java:276)
at com.ibm.stocator.fs.ObjectStoreVisitor.getStoreClient(ObjectStoreVisitor.java:130)
at com.ibm.stocator.fs.ObjectStoreFileSystem.initialize(ObjectStoreFileSystem.java:105)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileSystemManager.&lt;init&gt;(HDFSMetadataLog.scala:409)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.createFileManager(HDFSMetadataLog.scala:292)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.&lt;init&gt;(HDFSMetadataLog.scala:63)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.&lt;init&gt;(CompactibleFileStreamLog.scala:46)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.&lt;init&gt;(FileStreamSinkLog.scala:85)
at org.apache.spark.sql.execution.streaming.FileStreamSink.&lt;init&gt;(FileStreamSink.scala:98)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
... 49 elided
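A detail that stands out in the DEBUG output above: the service token Stocator parses from the URI is Cloud%20Object%20Storage-POCDL (the space-containing service name, percent-encoded; some lines even show it double-encoded as %2520). It therefore searches for credentials under fs.cos.Cloud%20Object%20Storage-POCDL.* and the listed alternatives, which cannot match keys registered under any other token. As in the sketch after the configuration block above, a space-free service token (for example a hypothetical cos://stream-csv.pocdl/ with keys under fs.cos.pocdl.*) may be worth trying.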

All answers (1)
  • 社区小助手
    2019-07-17 23:23:03

    I just tested streaming and it seems to work for me; I tried some similar code:

    val userSchema = spark.read.parquet("/mydata/test.parquet").schema
    val streamDf = spark.readStream.schema(userSchema).parquet("/mydata/")

    streamDf.writeStream
      .format("parquet")
      .option("checkpointLocation", "cos://bucket.my_service/")
      .option("path", "cos://bucket.my_service")
      .start()

    What Stocator version are you using? You can see this in the USER AGENT header in the logs.
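    If the version is not obvious from the output, one way to surface it is to raise the Stocator logger to DEBUG from the shell so the HTTP headers appear in the driver log. A minimal sketch, assuming the log4j 1.x backend bundled with Spark 2.3:

    import org.apache.log4j.{Level, Logger}
    Logger.getLogger("com.ibm.stocator").setLevel(Level.DEBUG)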
