用IDEA运行spark,读取120M的TXT文件,做了解码跟解压缩的操作,然后进行foreach(print)就报错了 下面是代码
object Test4 {
def decompress(data: Array[Byte]): Array[Byte] = {
var output: Array[Byte] = null
val decompresser = new Inflater
decompresser.reset()
decompresser.setInput(data)
val o = new ByteArrayOutputStream(data.length)
try {
val buf = new Array[Byte](1024)
while ( {
!decompresser.finished
}) {
val i: Int = decompresser.inflate(buf)
if (i == 0) throw new Exception("数据有问题")
o.write(buf, 0, i)
}
output = o.toByteArray
} finally if (o != null) o.close()
decompresser.end()
output
}
def decodeAndDecompress(text: String): String = {
try {
val decode = Base64.getDecoder.decode(text.replace(" ", ""))
val decoBytes = decompress(decode)
new String(decoBytes, Charset.forName("UTF-8"))
} catch {
case ex: Throwable => {
// ex.printStackTrace()
null
}
}
}
def main(args: Array[String]): Unit = {
val url1="/Users/qingdianpan/Documents/file/test/data/00122a92460e4b33b0adf45332f8650cp2.txt"
val url2="/Users/qingdianpan/Documents/file/test/data/test"
val spark = SparkSession
.builder()
.appName("Demo1")
.master("local")
.getOrCreate()
val lines = spark.sparkContext.textFile(url1,500)
import spark.implicits._
val data=lines.map(decodeAndDecompress)
data.foreach(print)
spark.stop()
}
}
报错信息
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:570)
at java.nio.CharBuffer.toString(CharBuffer.java:1241)
at org.apache.hadoop.io.Text.decode(Text.java:412)
at org.apache.hadoop.io.Text.decode(Text.java:389)
at org.apache.hadoop.io.Text.toString(Text.java:280)
at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$11.apply(SparkContext.scala:831)
at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$11.apply(SparkContext.scala:831)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/08/05 11:37:58 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:570)
at java.nio.CharBuffer.toString(CharBuffer.java:1241)
at org.apache.hadoop.io.Text.decode(Text.java:412)
at org.apache.hadoop.io.Text.decode(Text.java:389)
at org.apache.hadoop.io.Text.toString(Text.java:280)
at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$11.apply(SparkContext.scala:831)
at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$11.apply(SparkContext.scala:831)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/08/05 11:37:58 INFO SparkContext: Invoking stop() from shutdown hook
21/08/05 11:37:58 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7947 bytes)
21/08/05 11:37:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:570)
at java.nio.CharBuffer.toString(CharBuffer.java:1241)
at org.apache.hadoop.io.Text.decode(Text.java:412)
at org.apache.hadoop.io.Text.decode(Text.java:389)
at org.apache.hadoop.io.Text.toString(Text.java:280)
at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$11.apply(SparkContext.scala:831)
at org.apache.spark.SparkContext$$anonfun$textFile$1$$anonfun$apply$11.apply(SparkContext.scala:831)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/08/05 11:37:58 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/08/05 11:37:58 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
21/08/05 11:37:58 INFO SparkUI: Stopped Spark web UI at http://172.16.11.94:4040
21/08/05 11:37:58 INFO DAGScheduler: Job 0 failed: foreach at Test4.scala:67, took 1.876303 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:933)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:931)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:931)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2130)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2043)
at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:575)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
at Test4$.main(Test4.scala:67)
at Test4.main(Test4.scala)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。