Spark MapOutputTracker

刘超 6天前 ⋅ 55 阅读   编辑

  基于spark2.0.2

一、概述

  Spark的shuffle过程分为writer和reader两块。 writer负责生成中间数据,reader负责整合中间数据。而中间数据的元信息,则由MapOutputTracker负责管理。 它负责writer和reader的沟通。shuffle writer会将中间数据保存到Block里面,然后将数据的位置发送给MapOutputTracker。shuffle reader通过向 MapOutputTracker获取中间数据的位置之后,才能读取到数据。以下图为例,

 

  rdd1 经过shuffle 生成 rdd2。rdd1有三个分区,对应着三个ShuffleMapTask。每一个ShuffleMapTask执行的结果,对应着一个MapStatus。以rdd1 的 parition 0 分区为例,可以看到它把该分区的数据,根据分区器分成三部分,对应着rdd2 的分区。

  shuffle reader 需要读取rdd1 shuffle的中间数据,才能生成 rdd2。 以rdd2的partition 0 分区为例,它需要 rdd1 计算出的数据,找到所有reduce0 的数据。这些数据的位置,需要从MapOutputTracker获取,需要提供 shuffleId, mapId, reduceId才能确定一个中间数据。shuffleId表示此次shuffle的唯一id。mapId表示map端 rdd的分区索引,表示由哪个父分区产生的数据。reduceId表示reduce端的分区索引,表示属于子分区的那部分数据

二、输出信息 MapStatus

  MapStatus类表示一个ShuffleMapTask执行返回的结果。MapStatus包含了这个ShuffleMapTask的数据输出信息。它有两个方法

  1、location , 返回数据存储所在 BlockManager 的 Id
  2、getSizeForBlock, 返回 shufle中指定 reduceId 的那部分数据的大小。注意这个值是有精度误差的
  以上图的rdd1 的 parition 0 分区为例,它对应着一个ShuffleMapTask。ShuffleMapTask会返回一个MapStatus,该MapStatus会包含了三块数据,分别是reduce0, reduce1, reduce2.

  因为一个MapStatus会包含多个reduce的数据信息,这样会占用太多的内存, 所以MapStatus会将结果进行压缩,主要是对长度的压缩。对于不同的reduce数量,对应着不同的子类,CompressedMapStatus 和 HighlyCompressedMapStatus。当reduceId超过了2000, 就使用HighlyCompressedMapStatus。否则使用CompressedMapStatus,如下

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }

  首先来看看MapStatus压缩数据长度的原理。 对于Long类型的长度,经过log数学运算, 转换为只占一个字节的Byte类型 。虽然结果压缩了,但是精确度却有一定的损失。算法如下,通过对数的方式压缩成整数类型,最大值为255。最大可以表示35GB的长度。

private[spark] object MapStatus {
    private[this] val LOG_BASE = 1.1
    
    def compressSize(size: Long): Byte = {
     if (size == 0) {
        0
     } else if (size <= 1L) {
        1
     } else {
        math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
     }
  }

三、CompressedMapStatus 压缩原理

  CompressedMapStatus的原理比较简单,只是将长度压缩保存。

 private[spark] class CompressedMapStatus(
    private[this] var loc: BlockManagerId,
    private[this] var compressedSizes: Array[Byte])
  extends MapStatus with Externalizable {

  protected def this() = this(null, null.asInstanceOf[Array[Byte]])  // For deserialization only
  // 接收Long类型的数组,数组的索引是reduceId,索引项是对应的长度
  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
    // 调用上面的compressSize方法, 将Long类型的长度,转换为Byte类型
    this(loc, uncompressedSizes.map(MapStatus.compressSize))
  }

  override def location: BlockManagerId = loc

  override def getSizeForBlock(reduceId: Int): Long = {
    // 获取压缩的数据长度,然后还原成Long类型
    MapStatus.decompressSize(compressedSizes(reduceId))
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    loc.writeExternal(out)
    out.writeInt(compressedSizes.length)
    out.write(compressedSizes)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    loc = BlockManagerId(in)
    val len = in.readInt()
    compressedSizes = new Array[Byte](len)
    in.readFully(compressedSizes)
  }

四、HighlyCompressedMapStatus 压缩原理

  HighlyCompressedMapStatus适用于reduce数量比较大的情况,直接返回平均值。

private[spark] class HighlyCompressedMapStatus private (
    private[this] var loc: BlockManagerId,
    // 哪些reduce数据不为空的数目
    private[this] var numNonEmptyBlocks: Int,
    // reduce数据为空的集合,这里使用了RoaringBitmap,也是为了节省内存// reduce数据为空的集合,这里使用了RoaringBitmap,也是为了节省内存
    private[this] var emptyBlocks: RoaringBitmap,
    // 平均长度
    private[this] var avgSize: Long)
  extends MapStatus with Externalizable {

  // loc could be null when the default constructor is called during deserialization
  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
    "Average size can only be zero for map stages that produced no output")

  protected def this() = this(null, -1, null, -1)  // For deserialization only

  override def location: BlockManagerId = loc

  override def getSizeForBlock(reduceId: Int): Long = {
    // 如果reduce对应的数据为空,则直接返回0
    if (emptyBlocks.contains(reduceId)) {
      0
    } else {
      // 否则,返回平局长度
      avgSize
    }
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    loc.writeExternal(out)
    emptyBlocks.writeExternal(out)
    out.writeLong(avgSize)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    loc = BlockManagerId(in)
    emptyBlocks = new RoaringBitmap()
    emptyBlocks.readExternal(in)
    avgSize = in.readLong()
  }
}

private[spark] object HighlyCompressedMapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
    // We must keep track of which blocks are empty so that we don't report a zero-sized
    // block as being non-empty (or vice-versa) when using the average block size.
    var i = 0
    var numNonEmptyBlocks: Int = 0
    var totalSize: Long = 0
    // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
    // blocks. From a performance standpoint, we benefit from tracking empty blocks because
    // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
    val emptyBlocks = new RoaringBitmap()
    val totalNumBlocks = uncompressedSizes.length
    while (i < totalNumBlocks) {
      var size = uncompressedSizes(i)
      if (size > 0) {
        numNonEmptyBlocks += 1
        totalSize += size
      } else {
        emptyBlocks.add(i)
      }
      i += 1
    }
    val avgSize = if (numNonEmptyBlocks > 0) {
      totalSize / numNonEmptyBlocks
    } else {
      0
    }
    emptyBlocks.trim()
    emptyBlocks.runOptimize()
    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
  }
}

五、MapOutputTracker 相关类

 

  MapOutputTrackerMaster是运行在driver节点上的,它管理着shuffle的中间数据信息。MapOutputTrackerWorker是运行在executor节点上的,它会向driver请求shuffle中间数据的信息。MapOutputTrackerMasterEndpoint是运行在driver节点上的Rpc服务。

  1、Driver节点的MapOutputTracker

  MapOutputTrackerMaster继承MapStatus, 运行在driver节点上。管理所有shuffle的数据信息。所有的shuffle中间数据,都必须在MapOutputTrackerMaster登记。它有两个主要的属性
  a、mapStatuses, 类型 ConcurrentHashMap[Int, Array[MapStatus]], Key为shuffleId, Value为该shuffle的MapStatus列表
  b、shuffleIdLocks, 类型为ConcurrentHashMap[Int, AnyRef], Key为shuffleId, Value为普通的Object实例,仅仅作为锁存在。

  新增shuffle接口

def registerShuffle(shuffleId: Int, numMaps: Int) {
  if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
    throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
  }
  shuffleIdLocks.putIfAbsent(shuffleId, new Object())
}

  新增MapStatus接口

def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
  mapStatuses.put(shuffleId, statuses.clone())
  if (changeEpoch) {
    incrementEpoch()
  }
}

  MapOutputTrackerMaster有一个队列,存储着请求MapStatus的消息。MapOutputTrackerMasterEndpoint在收到请求后,会将请求添加到这个队列里。MapOutputTrackerMaster还有着一个线程池,来处理队列的消息。

class MapOutputTrackerMaster {

  // 请求队列
  private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]

  // 处理请求的线程池
  private val threadpool: ThreadPoolExecutor = {
    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }
  
  // MessageLoop线程,处理请求
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            // 取出请求
            val data = mapOutputRequests.take()
             if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              mapOutputRequests.offer(PoisonPill)
              return
            }
            val context = data.context
            val shuffleId = data.shuffleId

            // 获取该shuffle对应的MapStatus
            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
            // 返回结果给rpc客户端
            context.reply(mapOutputStatuses)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }
}

  上面调用了getSerializedMapOutputStatuses函数,获取请求结果。它涉及到了缓存和序列化。这里缓存了每次的请求结果。对于缓存,必然涉及到缓存失效的问题。MapOutputTrackerMaster使用了epoch代表着数据的版本,cacheEpoch代表着缓存的版本。如果epoch 大于 cacheEpoch, 则表示缓存失效,需要重新获取。

private[spark] class MapOutputTrackerMaste {
  // 缓存版本
  private var cacheEpoch = epoch
  
  // 缓存请求结果,注意到结果是Byte类型,是序列化之后的数据
  private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala
   
  def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
    // 该shuffleId 对应的 MapStatus列表
    var statuses: Array[MapStatus] = null
    // 序列化的结果
    var retBytes: Array[Byte] = null
    var epochGotten: Long = -1

    // 检查是否缓存有效,如果有效则返回true,并且设置retBytes的值
    // 如果失效,则返回false
    def checkCachedStatuses(): Boolean = {
      epochLock.synchronized {
        // 如果缓存失效,则清除缓存,并且保持cacheEpoch与epoch一致
        if (epoch > cacheEpoch) {
          cachedSerializedStatuses.clear()
          clearCachedBroadcast()
          cacheEpoch = epoch
        }
        
        cachedSerializedStatuses.get(shuffleId) match {
          // 如果有对应的shuffle缓存,则返回true,设置retBytes的值
          case Some(bytes) =>
            retBytes = bytes
            true
            
          // 如果没有对应的shuffle缓存,则从mapStatuses取出MapStatues
          case None =>
            logDebug("cached status not found for : " + shuffleId)
            statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus])
            epochGotten = epoch
            false
        }
      }
    }
      
    // 调用checkCachedStatuses, 查看是否有缓存
    if (checkCachedStatuses()) return retBytes  
    
    // 序列化MapStatues列表
    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
      shuffleIdLock.synchronized {
         if (checkCachedStatuses()) return retBytes
         // 序列化
         val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
             isLocal, minSizeForBroadcast)
         // 更新缓存
         epochLock.synchronized {
           if (epoch == epochGotten) {
              cachedSerializedStatuses(shuffleId) = bytes
              if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
           } else {
              logInfo("Epoch changed, not caching!")
              removeBroadcast(bcast)
          }
      }
     // 返回序列化的结果
     bytes
   }
}

  2、Executor节点的MapOutputTracker

MapOutputTrackerWorker继承MapOutputTracker,运行在Executor节点。它同样有mapStatuses属性,但这里是表示executor节点的缓存。它提供了getStatuses方法,提供给Executor节点,获取指定shuffle的MapStatus。getStatuses方法,会优先从本地缓存mapStatuses获取,如果没有,则发送GetMapOutputStatuses请求给driver。

// 正在请求的shuffleId集合
private val fetching = new HashSet[Int]

private def getStatuses(shuffleId: Int): Array[MapStatus] = {
  // 试图从缓存mapStatuses获取结果
  val statuses = mapStatuses.get(shuffleId).orNull
  // 如果mapStatuses没有shuffleId的数据,则会向dirver请求
  if (statuses == null) {
    var fetchedStatuses: Array[MapStatus] = null
    fetching.synchronized {
      // 如果已经在请求该shuffle的MapStatus
      while (fetching.contains(shuffleId)) {
        try {
          // 等待通知
          fetching.wait()
        } catch {
          case e: InterruptedException =>
        }
      }

      // 判断shuffleId的结果是否已经获取到了
      fetchedStatuses = mapStatuses.get(shuffleId).orNull
      if (fetchedStatuses == null) {
        // 没有获取到,则添加到fetching集合
        fetching += shuffleId
      }
    }

    if (fetchedStatuses == null) {
     
      try {
        // askTracker方法里,会去调用Rpc的客户端,发送GetMapOutputStatuses请求
        val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
        // 将字节反序列化,得到MapStatus列表
        fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
        // 将结果添加到mapStatuses
        mapStatuses.put(shuffleId, fetchedStatuses)
      } finally {
        fetching.synchronized {
          // 通知此shuffleId请求完成
          fetching -= shuffleId
          fetching.notifyAll()
        }
      }
    }

    if (fetchedStatuses != null) {
      return fetchedStatuses
    } else {
      // 如果请求结果,没有该shuffle的输出数据,则抛出异常
      logError("Missing all output locations for shuffle " + shuffleId)
      throw new MetadataFetchFailedException(
        shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
    }
  } else {
    return statuses
  }
}

六、参考文章

  1、https://github.com/apache/spark/blob/v2.0.2/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
  2、https://zhmin.github.io/2019/01/18/spark-map-output-tracker/


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: