【问题】spark运行python写的mapreduce任务,hadoop平台报错,java.net.ConnectException: 连接超时

简介:

问题:

用spark-submit以yarn-client方式提交任务,在集群的某些节点上的任务出现连接超时的错误,排查过各种情况后,确定在防火墙配置上出现问题。

原因:

我猜测是python程序启动后,作为Server,hadoop中资源调度是以java程序作为Client端访问,

Python启动的Server端需要接受localhost的client访问。

当你从一台linux主机向自身发送数据包时,实际上的数据包是通过虚拟的lo接口来发送接受的,而不会通过你的物理网卡 eth0/eth1....,此时防火墙就要允许来自本地lo接口的数据包,需要加入以下配置允许Python Server接受来自本地lo接口的数据包,然后解决该问题。

1
iptables -A INPUT -i lo -j ACCEPT 添加iptables规则,允许来自于lo接口的数据包


任务的部分报错日志

16/07/25 13:56:44 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev d62701d4d05dfa6115bbaf8d9dff002df142e62d]
16/07/25 13:56:44 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/07/25 13:56:44 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/07/25 13:56:44 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/07/25 13:56:44 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/07/25 13:56:44 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/07/25 13:57:47 WARN python.PythonWorkerFactory: Failed to open socket to Python daemon:
java.net.ConnectException: 连接超时
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at java.net.Socket.connect(Socket.java:528)
	at java.net.Socket.<init>(Socket.java:425)
	at java.net.Socket.<init>(Socket.java:241)
	at org.apache.spark.api.python.PythonWorkerFactory.createSocket$1(PythonWorkerFactory.scala:75)
	at org.apache.spark.api.python.PythonWorkerFactory.liftedTree1$1(PythonWorkerFactory.scala:90)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:135)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
16/07/25 13:57:47 WARN python.PythonWorkerFactory: Assuming that daemon unexpectedly quit, attempting to restart
16/07/25 13:58:51 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)


参考地址:


http://stackoverflow.com/questions/15659132/connection-refused-between-a-python-server-and-a-java-client

http://stackoverflow.com/questions/26297551/connecting-python-and-java-via-sockets/38605208#38605208

http://www.zybang.com/question/9ab66451988eb2768194817f25a0b7a9.html





     本文转自巧克力黒 51CTO博客,原文链接:http://blog.51cto.com/10120275/1830831,如需转载请自行联系原作者


相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
27 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
2月前
|
SQL 分布式计算 Java
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作
35 3
|
3月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
45 5
|
3月前
|
分布式计算 资源调度 Hadoop
在YARN集群上运行部署MapReduce分布式计算框架
主要介绍了如何在YARN集群上配置和运行MapReduce分布式计算框架,包括准备数据、运行MapReduce任务、查看任务日志,并启动HistoryServer服务以便于日志查看。
67 0
|
4月前
|
分布式计算 Hadoop Java
Hadoop_MapReduce中的WordCount运行详解
MapReduce的WordCount程序在分布式系统中计算大数据集中单词出现的频率时,提供了一个可以复用和可伸缩的解决方案。它体现了MapReduce编程模型的强大之处:简单、可靠且将任务自动分布到一个集群中去执行。它首先运行一系列的Map任务来处理原始数据,然后通过Shuffle和Sort机制来组织结果,最后通过运行Reduce任务来完成最终计算。因此,即便数据量非常大,通过该模型也可以高效地进行处理。
99 1
|
5月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 Java Hadoop
如何在Java中使用MapReduce
如何在Java中使用MapReduce
|
5月前
|
存储 分布式计算 Hadoop
阿里巴巴飞天大数据架构体系与Hadoop生态系统的深度融合:构建高效、可扩展的数据处理平台
技术持续创新:随着新技术的不断涌现和应用场景的复杂化,阿里巴巴将继续投入研发力量推动技术创新和升级换代。 生态系统更加完善:Hadoop生态系统将继续扩展和完善,为用户提供更多元化、更灵活的数据处理工具和服务。
|
6月前
|
分布式计算 资源调度 Hadoop
Java大数据处理:Spark与Hadoop整合
Java大数据处理:Spark与Hadoop整合