org.apache.spark.shuffle.FetchFailedException: Connection from n35-03.fn.ams.osa/172.17.30.15:44122 closed


1. Description

  Case 1: With the current configuration of 4 cores and 8 GB RAM per executor, and 10 executors launched, the Spark job fails with org.apache.spark.shuffle.FetchFailedException: Connection from n35-03.fn.ams.osa/172.17.30.15:44122 closed.
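
  For reference, a submission matching the configuration described above might look roughly like this (the class and jar names are placeholders, not taken from the original job):

# Placeholder class/jar names; resource settings match the description above.
spark-submit \
  --master yarn \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 8G \
  --class com.example.MyJob \
  my-job.jar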

  

  Case 2: From the logs we can see that the executor cannot reach n46-08.fn.ams.osa/172.17.64.57:43344:

sdev@n-adx-hadoop-client-3:~$ yarn logs -applicationId application_1582757194275_603748 | grep n46-08.fn.ams.osa 
INFO - 20/11/26 01:48:05 INFO TaskSetManager: Task 135.0 in stage 3.0 (TID 1085) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
INFO - 20/11/26 01:48:05 WARN TaskSetManager: Lost task 143.0 in stage 3.0 (TID 1093, n05-19.fn.ams.osa): FetchFailed(BlockManagerId(1, n46-08.fn.ams.osa, 43344), shuffleId=1, mapId=16, reduceId=143, message=
INFO - org.apache.spark.shuffle.FetchFailedException: Connection from n46-08.fn.ams.osa/172.17.64.57:43344 closed
INFO - at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
INFO - at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
INFO - at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
INFO - at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
INFO - at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
INFO - at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
INFO - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
INFO - at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
INFO - at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
INFO - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
INFO - at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
INFO - at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
INFO - at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
INFO - at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
INFO - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
INFO - at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
INFO - at org.apache.spark.scheduler.Task.run(Task.scala:86)
INFO - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
INFO - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
INFO - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
INFO - at java.lang.Thread.run(Thread.java:745)
INFO - Caused by: java.io.IOException: Connection from n46-08.fn.ams.osa/172.17.64.57:43344 closed
INFO - at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
INFO - at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
INFO - at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
INFO - at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
INFO - at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
INFO - at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
INFO - at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
INFO - at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
INFO - at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
INFO - at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
INFO - at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
INFO - at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
INFO - at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
INFO - at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
INFO - at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
INFO - at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828)
INFO - at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621)
INFO - at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
INFO - at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
INFO - at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
INFO - ... 1 more

2. Analysis

  Case 1: This problem typically occurs when there is a large amount of shuffle: tasks keep failing and being re-executed in a loop until the application finally fails.

  Case 2: 1. Running the container_address.sh script shows that n46-08.fn.ams.osa corresponds to container_e41_1582757194275_603748_01_000007:

sdev@n-adx-hadoop-client-3:~/liujichao$ ./container_address.sh application_1582757194275_603748
----current applicationId is application_1582757194275_603748----
......
container_e41_1582757194275_603748_01_000007 n46-08.fn.ams.osa
......
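
  container_address.sh is a local helper that is not shown in the post; a minimal sketch of what such a script might do, using only standard YARN CLI commands, could look like this:

#!/bin/bash
# Assumed implementation: map every container of the application to the node it ran on.
APP_ID=$1
echo "----current applicationId is ${APP_ID}----"
# List the application attempts, then the containers of each attempt;
# each container row includes the container id and its host.
for ATTEMPT in $(yarn applicationattempt -list "${APP_ID}" 2>/dev/null | grep -o 'appattempt_[0-9_]*'); do
  yarn container -list "${ATTEMPT}" 2>/dev/null | grep 'container_'
done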

  2. Check the container log:

sdev@n-adx-hadoop-client-3:~/liujichao$ yarn logs -containerId container_e41_1582757194275_603748_01_000007
................
20/11/26 01:26:02 INFO TaskMemoryManager: Memory used in task 933
20/11/26 01:26:02 INFO TaskMemoryManager: Acquired by org.apache.spark.unsafe.map.BytesToBytesMap@1d0a1b1b: 656.0 MB
20/11/26 01:26:02 INFO TaskMemoryManager: 0 bytes of memory were used by task 933 but are not associated with specific consumers
20/11/26 01:26:02 INFO TaskMemoryManager: 1526726656 bytes of memory are used for execution and 698476 bytes of memory are used for storage
20/11/26 01:26:02 INFO UnsafeExternalSorter: Thread 255 spilling sort data of 64.0 MB to disk (0  time so far)
20/11/26 01:26:10 INFO UnsafeExternalSorter: Thread 255 spilling sort data of 64.0 MB to disk (0  time so far)
20/11/26 01:26:10 INFO CodeGenerator: Code generated in 11.956018 ms
20/11/26 01:26:34 INFO UnsafeExternalSorter: Thread 253 spilling sort data of 128.0 MB to disk (0  time so far)
20/11/26 01:28:03 INFO Executor: Finished task 43.0 in stage 2.0 (TID 933). 3059 bytes result sent to driver
20/11/26 01:28:27 INFO Executor: Finished task 9.0 in stage 2.0 (TID 899). 3858 bytes result sent to driver
20/11/26 01:28:40 ERROR TransportChannelHandler: Connection to n44-12.fn.ams.osa/172.17.64.22:46633 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
20/11/26 01:28:40 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from n44-12.fn.ams.osa/172.17.64.22:46633 is closed
20/11/26 01:28:40 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
20/11/26 01:28:45 INFO TransportClientFactory: Found inactive connection to n44-12.fn.ams.osa/172.17.64.22:46633, creating a new one.
20/11/26 01:28:45 INFO TransportClientFactory: Successfully created connection to n44-12.fn.ams.osa/172.17.64.22:46633 after 1 ms (0 ms spent in bootstraps)
20/11/26 01:30:45 ERROR TransportChannelHandler: Connection to n44-12.fn.ams.osa/172.17.64.22:46633 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
20/11/26 01:30:45 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from n44-12.fn.ams.osa/172.17.64.22:46633 is closed
20/11/26 01:30:45 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from n44-12.fn.ams.osa/172.17.64.22:46633 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
20/11/26 01:30:45 INFO RetryingBlockFetcher: Retrying fetch (2/3) for 2 outstanding blocks after 5000 ms
20/11/26 01:30:50 INFO TransportClientFactory: Found inactive connection to n44-12.fn.ams.osa/172.17.64.22:46633, creating a new one.
20/11/26 01:30:50 INFO TransportClientFactory: Successfully created connection to n44-12.fn.ams.osa/172.17.64.22:46633 after 1 ms (0 ms spent in bootstraps)
20/11/26 01:31:35 INFO UnsafeExternalSorter: Thread 254 spilling sort data of 128.0 MB to disk (0  time so far)
20/11/26 01:33:32 INFO Executor: Finished task 26.0 in stage 2.0 (TID 916). 3059 bytes result sent to driver
20/11/26 01:36:49 INFO CoarseGrainedExecutorBackend: Got assigned task 955

  3. Following the hint in the log, we increase spark.executor.heartbeatInterval and spark.network.timeout, keeping spark.executor.heartbeatInterval no larger than spark.network.timeout (an example is given under Solutions below).

3. Solutions

  Case 1: This kind of problem can usually be fixed by giving the executors more memory; in our case we simply raised --executor-memory to 16G. More generally, the following measures help (see the submit-time sketch after this list):
  1. Reduce the amount of shuffled data
    Consider whether a map-side join or broadcast join could avoid the shuffle entirely. Also filter out unneeded data before the shuffle; for example, if the raw records have 20 fields but only a few are required, select just those fields to cut the shuffle volume.
  2. Spark SQL and DataFrame join, group by, and similar operations
    Use spark.sql.shuffle.partitions to control the number of partitions (default 200); raise it according to the shuffle volume and the complexity of the computation.
  3. RDD join, groupBy, reduceByKey, and similar operations
    Use spark.default.parallelism to control the number of partitions used for shuffle reads and reduce-side processing. It defaults to the total number of cores running tasks (8 in Mesos fine-grained mode, the number of local cores in local mode); the official recommendation is 2-3 times the total number of task cores.
  4. Increase executor memory
    Raise spark.executor.memory appropriately.
  5. Check for data skew
    Have null keys been filtered out? Can abnormal keys (a key carrying an unusually large amount of data) be handled separately? Consider changing the partitioning rule.
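
  A hedged sketch of how these knobs might be set at submit time; the values and the jar name below are illustrative assumptions, not the tuned values from this incident:

# Illustrative values only:
#   --executor-memory            raised from 8G to 16G (the one change we actually made)
#   spark.sql.shuffle.partitions raised above the default 200 for SQL/DataFrame shuffles
#   spark.default.parallelism    roughly 2-3x the total task cores (10 executors x 4 cores)
spark-submit \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 16G \
  --conf spark.sql.shuffle.partitions=400 \
  --conf spark.default.parallelism=120 \
  my-job.jar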

  Case 2: Increase spark.executor.heartbeatInterval and spark.network.timeout, keeping spark.executor.heartbeatInterval no larger than spark.network.timeout.
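
  For example, a hedged sketch with illustrative values (the only hard requirement from the analysis above is that the heartbeat interval stays below the network timeout; the jar name is a placeholder):

# Illustrative values: spark.executor.heartbeatInterval must stay smaller than spark.network.timeout.
spark-submit \
  --conf spark.network.timeout=300s \
  --conf spark.executor.heartbeatInterval=30s \
  my-job.jar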

4. References
  1. https://www.jianshu.com/p/72547c1fe28b
  2. https://blog.csdn.net/lsshlsw/article/details/49155087


