开发者社区 > 大数据与机器学习 > 大数据计算 MaxCompute > 正文

开源spark3.1.3结构化流写maxcompute报错

当我使用https://github.com/aliyun/aliyun-maxcompute-data-collectors/tree/master/spark-datasource-v3.1 中开源的spark连接器往maxcompute写数据时会在固定时间段报错,白天可以正常写入数据,但是到凌晨有一定概率会报错,提示报错如下:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 130) (10.233.122.167 executor 1): java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(Unknown Source)
at java.base/sun.net.www.http.HttpClient.parseHTTP(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at com.aliyun.odps.commons.transport.DefaultConnection.getResponse(DefaultConnection.java:132)
at com.aliyun.odps.tunnel.io.TunnelRecordWriter.write(TunnelRecordWriter.java:75)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.write(TunnelWriter.java:62)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.write(TunnelWriter.java:19)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.write(DynamicPartitionWriter.scala:47)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.write(DynamicPartitionWriter.scala:30)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.io.IOException: Stream is closed
at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.deflate(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.write(Unknown Source)
at org.apache.commons.io.output.ProxyOutputStream.write(ProxyOutputStream.java:90)
at com.google.protobuf.CodedOutputStream.refreshBuffer(CodedOutputStream.java:833)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:892)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:900)
at com.google.protobuf.CodedOutputStream.writeRawVarint32(CodedOutputStream.java:1012)
at com.google.protobuf.CodedOutputStream.writeTag(CodedOutputStream.java:994)
at com.google.protobuf.CodedOutputStream.writeSInt64(CodedOutputStream.java:273)
at com.aliyun.odps.commons.proto.ProtobufRecordStreamWriter.close(ProtobufRecordStreamWriter.java:371)
at com.aliyun.odps.tunnel.io.TunnelRecordWriter.close(TunnelRecordWriter.java:85)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.close(TunnelWriter.java:71)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.abort(DynamicPartitionWriter.scala:62)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:448)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1484)
... 10 more
Suppressed: java.lang.NullPointerException: Deflater has been closed
at java.base/java.util.zip.Deflater.ensureOpen(Unknown Source)
at java.base/java.util.zip.Deflater.deflate(Unknown Source)
at java.base/java.util.zip.Deflater.deflate(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.deflate(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.write(Unknown Source)
at org.apache.commons.io.output.ProxyOutputStream.write(ProxyOutputStream.java:90)
at com.google.protobuf.CodedOutputStream.refreshBuffer(CodedOutputStream.java:833)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:892)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:900)
at com.google.protobuf.CodedOutputStream.writeRawVarint32(CodedOutputStream.java:1012)
at com.google.protobuf.CodedOutputStream.writeTag(CodedOutputStream.java:994)
at com.google.protobuf.CodedOutputStream.writeSInt64(CodedOutputStream.java:273)
at com.aliyun.odps.commons.proto.ProtobufRecordStreamWriter.close(ProtobufRecordStreamWriter.java:371)
at com.aliyun.odps.tunnel.io.TunnelRecordWriter.close(TunnelRecordWriter.java:85)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.close(TunnelWriter.java:71)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.close(DynamicPartitionWriter.scala:68)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$9(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
... 10 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
... 49 more
Caused by: java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(Unknown Source)
at java.base/sun.net.www.http.HttpClient.parseHTTP(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at com.aliyun.odps.commons.transport.DefaultConnection.getResponse(DefaultConnection.java:132)
at com.aliyun.odps.tunnel.io.TunnelRecordWriter.write(TunnelRecordWriter.java:75)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.write(TunnelWriter.java:62)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.write(TunnelWriter.java:19)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.write(DynamicPartitionWriter.scala:47)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.write(DynamicPartitionWriter.scala:30)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:416)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.io.IOException: Stream is closed
at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.deflate(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.write(Unknown Source)
at org.apache.commons.io.output.ProxyOutputStream.write(ProxyOutputStream.java:90)
at com.google.protobuf.CodedOutputStream.refreshBuffer(CodedOutputStream.java:833)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:892)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:900)
at com.google.protobuf.CodedOutputStream.writeRawVarint32(CodedOutputStream.java:1012)
at com.google.protobuf.CodedOutputStream.writeTag(CodedOutputStream.java:994)
at com.google.protobuf.CodedOutputStream.writeSInt64(CodedOutputStream.java:273)
at com.aliyun.odps.commons.proto.ProtobufRecordStreamWriter.close(ProtobufRecordStreamWriter.java:371)
at com.aliyun.odps.tunnel.io.TunnelRecordWriter.close(TunnelRecordWriter.java:85)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.close(TunnelWriter.java:71)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.abort(DynamicPartitionWriter.scala:62)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:448)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1484)
... 10 more
Suppressed: java.lang.NullPointerException: Deflater has been closed
at java.base/java.util.zip.Deflater.ensureOpen(Unknown Source)
at java.base/java.util.zip.Deflater.deflate(Unknown Source)
at java.base/java.util.zip.Deflater.deflate(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.deflate(Unknown Source)
at java.base/java.util.zip.DeflaterOutputStream.write(Unknown Source)
at org.apache.commons.io.output.ProxyOutputStream.write(ProxyOutputStream.java:90)
at com.google.protobuf.CodedOutputStream.refreshBuffer(CodedOutputStream.java:833)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:892)
at com.google.protobuf.CodedOutputStream.writeRawByte(CodedOutputStream.java:900)
at com.google.protobuf.CodedOutputStream.writeRawVarint32(CodedOutputStream.java:1012)
at com.google.protobuf.CodedOutputStream.writeTag(CodedOutputStream.java:994)
at com.google.protobuf.CodedOutputStream.writeSInt64(CodedOutputStream.java:273)
at com.aliyun.odps.commons.proto.ProtobufRecordStreamWriter.close(ProtobufRecordStreamWriter.java:371)
at com.aliyun.odps.tunnel.io.TunnelRecordWriter.close(TunnelRecordWriter.java:85)
at com.aliyun.odps.cupid.table.v1.tunnel.impl.TunnelWriter.close(TunnelWriter.java:71)
at org.apache.spark.sql.odps.writer.DynamicPartitionWriter.close(DynamicPartitionWriter.scala:68)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$9(WriteToDataSourceV2Exec.scala:452)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1495)
... 10 more
23/11/06 05:49:05 INFO ShutdownHookManager: Shutdown hook called
23/11/06 05:49:05 INFO ShutdownHookManager: Deleting directory /var/data/spark-d92fd15e-9117-485c-a426-29bb36269af6/spark-b2b68550-ac67-4daa-9ace-1796efe27dc2
23/11/06 05:49:05 INFO ShutdownHookManager: Deleting directory /tmp/spark-16859bbb-6a2a-43c1-aa11-32ca5ee840a2

目前怀疑是在凌晨的时候dataworks的定时调度较多,导致网络阻塞引起的,请问有人了解该问题吗?是否有什么解决办法?

展开
收起
灵魂丶不语 2023-12-01 10:43:45 68 0
1 条回答
写回答
取消 提交回答
  • 这个问题可能是由于网络不稳定或者服务器端的问题导致的。你可以尝试以下方法来解决这个问题:

    1. 增加重试次数:在写入数据时,可以设置一个较大的重试次数,当遇到错误时,会自动重试,直到达到最大重试次数。

    2. 增加超时时间:在连接服务器时,可以设置一个较长的超时时间,以便在网络不稳定的情况下,有足够的时间来完成连接。

    3. 检查服务器状态:确保服务器正常运行,没有出现故障或维护。

    4. 使用其他数据源:如果问题仍然存在,可以考虑使用其他数据源,如MaxCompute的ODPS Connector,看看是否能正常工作。

    2023-12-02 16:16:06
    赞同 展开评论 打赏

MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 相关电子书

    更多
    Hybrid Cloud and Apache Spark 立即下载
    Scalable Deep Learning on Spark 立即下载
    Comparison of Spark SQL with Hive 立即下载