When writing a Spark application you sometimes need extra jar packages or extra resource files, so every Executor needs its own copy of these files. How exactly do the files get copied to each Executor? This post analyzes the files transfer flow based on Spark 1.1.1.

1. Application launch and parameter passing

Leaving aside how the Driver itself is launched, we only follow how the files are passed along:

  1. When launching the application with spark-submit, the extra files are specified with the --files option, e.g. --files /path/to/file1,/path/to/file2 [1]

  2. org.apache.spark.deploy.SparkSubmit writes this option into the System Properties (multiple paths joined with ',')[2],[3]

  3. As the application starts in local or cluster mode, a SparkContext instance is created on the Driver. The SparkContext reads the files entry from SparkConf[4] and, depending on the URI scheme, processes and records each file URI[5] (a simplified sketch of this step follows the list):

    • null | file: the file is added to the HttpFileServer's[6] file directory, i.e. it is copied into a temporary directory on the Driver.
    • local: the application is running only on the Driver in local mode; the URI is simply rewritten as a file: path.
    • anything else (HTTP, FTP, HDFS, ...): the URI is recorded as-is.
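A minimal sketch of step 3, assuming the --files value ends up under the spark.files property (the real SparkContext does this splitting itself during construction; the code below is only meant to illustrate the flow):

import org.apache.spark.{SparkConf, SparkContext}

// Assumed: "spark.files" holds the comma-separated list set by spark-submit.
// Each path is handed to SparkContext.addFile (shown in the snippet below).
val conf = new SparkConf().setAppName("files-demo").setMaster("local[2]")
val sc = new SparkContext(conf)

conf.getOption("spark.files")                      // e.g. "/path/a.txt,/path/b.txt"
  .map(_.split(",").filter(_.nonEmpty).toSeq)
  .getOrElse(Seq.empty)
  .foreach(sc.addFile)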

The SparkContext.addFile code snippet:

/**
 * Add a file to be downloaded with this Spark job on every node.
 * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
 * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,
 * use `SparkFiles.get(path)` to find its download location.
 */
def addFile(path: String) {
  val uri = new URI(path)
  val key = uri.getScheme match {
    case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
    case "local"       => "file:" + uri.getPath
    case _             => path
  }
  addedFiles(key) = System.currentTimeMillis
}
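For completeness, a minimal usage sketch (the file path and name are hypothetical): the Driver registers the file with addFile, and each task resolves its local copy with SparkFiles.get:

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

// Hypothetical file /path/to/lookup.txt; the Driver registers it once,
// and every task can then read its locally downloaded copy.
val sc = new SparkContext(new SparkConf().setAppName("addFile-demo").setMaster("local[2]"))
sc.addFile("/path/to/lookup.txt")

sc.parallelize(1 to 4).map { i =>
  // On each Executor this resolves to the downloaded copy of lookup.txt
  val localPath = SparkFiles.get("lookup.txt")
  s"$i -> $localPath"
}.collect().foreach(println)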

2. Transferring files to the Executor

Consider the HttpFileServer path: the Driver runs a file server and waits for Executors to fetch the files from it. The file URLs, of course, still have to reach the Executors; they are shipped along with each serialized Task, as the code below shows.

Leaving aside how the Executor is started, org.apache.spark.executor.Executor has a launchTask method that runs a Task inside the Executor[7]:

def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

When the Task runs, it downloads its file dependencies[8]:

override def run() {
  // ... (excerpt)
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
  // ...
}

/**
 * Download any missing dependencies if we receive a new set of files and JARs from the
 * SparkContext. Also adds any new JARs we fetched to the class loader.
 */
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
  synchronized {
    // Fetch missing dependencies: only files whose timestamp is newer than the
    // locally recorded one are downloaded again
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
    }
    // ... (JAR handling elided)
  }
}
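For a file that was added to the HttpFileServer, the name handed to Utils.fetchFile is an http:// URL served by the Driver, so the fetch conceptually boils down to an ordinary HTTP download into a target directory. A generic sketch of that idea (illustrative only, not Spark's actual Utils.fetchFile; the host and port are made up):

import java.io.File
import java.net.URL
import java.nio.file.{Files, StandardCopyOption}

// Generic illustration, NOT Spark's Utils.fetchFile: download a file served
// over HTTP (e.g. by the Driver's HttpFileServer) into targetDir, keeping
// the original file name.
def httpFetch(fileUrl: String, targetDir: File): File = {
  val fileName = fileUrl.split("/").last
  val dest = new File(targetDir, fileName)
  val in = new URL(fileUrl).openStream()
  try {
    Files.copy(in, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
  } finally {
    in.close()
  }
  dest
}

// e.g. httpFetch("http://driver-host:33427/files/lookup.txt", new File("."))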

The implementation details of Utils.fetchFile do not matter here; it just downloads the file into the given target directory. The key question is which directory that is: where does SparkFiles.getRootDirectory point?[8]

3. The Executor's working directory

In SparkEnv:

// Set the sparkFiles directory, used when downloading dependencies.  In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
  Utils.createTempDir().getAbsolutePath
} else {
  "."
}
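Putting the pieces together: SparkFiles.get(name) resolves the file name against this root directory, so on the Driver in local mode it points into a temporary directory, while on an Executor it points into the current working directory. A simplified sketch of that resolution (not the actual SparkFiles source):

import java.io.File

// Simplified sketch: resolve a file name against sparkFilesDir
// (the real SparkFiles.get obtains the directory from SparkEnv).
def resolveSparkFile(fileName: String, sparkFilesDir: String): String =
  new File(sparkFilesDir, fileName).getAbsolutePath

resolveSparkFile("lookup.txt", "/tmp/spark-1234abcd")  // Driver, local mode: temp dir
resolveSparkFile("lookup.txt", ".")                    // Executor: inside its cwd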

So the files are downloaded into the Executor's working directory (its pwd at runtime). The Executor process itself is launched by ExecutorRunner[9]:

/**
 * Download and run the executor described in our ApplicationDescription
 */
def fetchAndRunExecutor() {
  try {
    // Create the executor's working directory
    val executorDir = new File(workDir, appId + "/" + execId)
    // ...

    // Launch the process
    val command = getCommandSeq
    val builder = new ProcessBuilder(command: _*).directory(executorDir)
    // ... (process start, monitoring and error handling elided)
  }
}

The Executor's working dir is therefore new File(workDir, appId + "/" + execId), where workDir refers to the ExecutorRunner's working dir.

The ExecutorRunner, together with the workDir it receives, is created in the Worker[10]:

// Worker.scala: the Worker's working directory defaults to $SPARK_HOME/work
def createWorkDir() {
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
}

// Worker.scala: the Worker passes its workDir on to each ExecutorRunner it launches
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => {
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
    self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  manager.start()
}
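The executor's working directory is thus derived from the Worker's workDir plus the application and executor IDs. A small worked example (all values hypothetical):

import java.io.File

// All values below are hypothetical, for illustration only.
val sparkHome = new File("/opt/spark")
val workDir   = new File(sparkHome, "work")   // default when no custom workDirPath is set
val appId     = "app-20141114120000-0001"
val execId    = 3

val executorDir = new File(workDir, appId + "/" + execId)
println(executorDir.getPath)  // /opt/spark/work/app-20141114120000-0001/3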

So, unless a custom workDirPath is configured, the files added via --files end up under $SPARK_HOME/work/<appId>/<execId> on each Worker.

(End)