When writing a Spark application, you sometimes depend on extra jar packages or resource files, so every Executor needs its own copy of these files. How do the files get copied to each Executor? This article analyzes Spark's file transfer flow, based on Spark 1.1.1.
1. Application Launch and Parameter Passing
Leaving aside how the Driver itself starts, we only follow how the files are passed along:
- When launching an application with spark-submit, the --files=/path/to/files option can be given multiple times [1].
- org.apache.spark.deploy.SparkSubmit writes this option into the System Properties (multiple paths are joined with ',') [2], [3].
- As the application starts in local or cluster mode, a SparkContext instance is created on the Driver side. SparkContext reads the files back out of SparkConf [4] and, based on the URI scheme, processes and records each file URI [5] (see the sketch after this list):
  - null | file: the file is added to the file directory of the HttpFileServer [6]; at this point it is copied into a temporary directory on the Driver.
  - local: this indicates the application only runs in local mode on the Driver side.
  - anything else (HTTP, FTP, HDFS, ...): the URI is simply recorded as-is.
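A minimal sketch of step [4] above (paraphrased rather than the verbatim 1.1.1 source; it assumes the property key spark.files and the ',' separator mentioned above, and the helper names are hypothetical):

import org.apache.spark.SparkConf

object FilesFromConfSketch {
  // Paraphrased sketch: recover the list of paths that SparkSubmit packed into the
  // "spark.files" property; SparkContext then hands each one to addFile.
  def filesFromConf(conf: SparkConf): Seq[String] =
    conf.getOption("spark.files").toSeq.flatMap(_.split(",")).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.files", "/tmp/a.txt,/tmp/b.txt")  // hypothetical paths
    println(filesFromConf(conf))   // List(/tmp/a.txt, /tmp/b.txt)
  }
}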
The snippet where SparkContext adds a file:
/**
* Add a file to be downloaded with this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(path)` to find its download location.
*/
def addFile(path: String) {
  val uri = new URI(path)
  val key = uri.getScheme match {
    case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
    case "local" => "file:" + uri.getPath
    case _ => path
  }
  addedFiles(key) = System.currentTimeMillis
}
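For reference, a small usage sketch (the application name, file name and path are hypothetical): a file distributed via addFile is read back on the executors through SparkFiles.get, which, as we will see below, resolves against the download directory.

import java.io.File
import scala.io.Source
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("add-file-demo"))
    sc.addFile("/path/to/lookup.txt")                  // hypothetical local file
    val lineCounts = sc.parallelize(1 to 4).map { _ =>
      // On an executor this resolves inside the executor's working directory
      val localPath = SparkFiles.get("lookup.txt")
      Source.fromFile(new File(localPath)).getLines().size
    }.collect()
    println(lineCounts.mkString(","))
    sc.stop()
  }
}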
2. Transferring Files to the Executor
Consider the HttpFileServer route: the Driver starts a file server and waits for the Executors to fetch the files from it. The files' URLs are passed to the Executor as part of its launch parameters.
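Roughly, adding a file to this server comes down to copying it into a served directory and returning an HTTP URL (a paraphrased sketch, not the exact 1.1.1 source; FileServerSketch, serverUri and fileDir stand in for the server's base URI and its "files" directory):

import java.io.File
import com.google.common.io.Files

class FileServerSketch(serverUri: String, fileDir: File) {
  // Copy the file into the directory served over HTTP and return the URL that is
  // stored as the key in addedFiles above and later fetched by the executors.
  def addFile(file: File): String = {
    Files.copy(file, new File(fileDir, file.getName))
    serverUri + "/files/" + file.getName
  }
}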
Without going into how the Executor itself is started: org.apache.spark.executor.Executor has a launchTask method, which runs a Task inside the Executor. [7]
def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
When the Task runs, it first downloads its file dependencies [8]:
override def run() {
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
}
/**
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
  synchronized {
    // Fetch missing dependencies
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
    }
  }
}
We do not need to look at the concrete implementation of Utils.fetchFile; it simply downloads a file into the specified directory. What matters is which directory the file is downloaded to: where does SparkFiles.getRootDirectory point? [8]
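Purely as an illustration of what "download the file into the specified directory" means (this is not Spark's Utils.fetchFile, just a minimal standalone sketch for an HTTP/FTP URL; DownloadSketch is a hypothetical helper):

import java.io.File
import java.net.URL
import java.nio.file.{Files, StandardCopyOption}

object DownloadSketch {
  // Download the file behind a URL into targetDir, keeping its original file name.
  def download(url: String, targetDir: File): File = {
    val dest = new File(targetDir, url.split("/").last)
    val in = new URL(url).openStream()
    try {
      Files.copy(in, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
    } finally {
      in.close()
    }
    dest
  }
}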
3. The Executor's Working Directory
Inside SparkEnv:
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
  Utils.createTempDir().getAbsolutePath
} else {
  "."
}
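SparkFiles simply exposes this value (paraphrased below, not the verbatim source): getRootDirectory returns SparkEnv's sparkFilesDir, and get resolves a file name against it, so on an executor it points into the process's current working directory.

import java.io.File
import org.apache.spark.SparkEnv

object SparkFilesSketch {
  // Paraphrase of SparkFiles: the root directory is just sparkFilesDir from above.
  def getRootDirectory(): String = SparkEnv.get.sparkFilesDir
  def get(filename: String): String =
    new File(getRootDirectory(), filename).getAbsolutePath
}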
So the files are downloaded into the Executor process's working directory (pwd) at run time. The Executor itself is launched by ExecutorRunner [9]:
/**
* Download and run the executor described in our ApplicationDescription
*/
def fetchAndRunExecutor() {
  try {
    // Create the executor's working directory
    val executorDir = new File(workDir, appId + "/" + execId)
    // Launch the process
    val command = getCommandSeq
    val builder = new ProcessBuilder(command: _*).directory(executorDir)
  }
}
So the Executor's working directory is new File(workDir, appId + "/" + execId), where workDir is the ExecutorRunner's working directory.
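A tiny illustration of why "." above resolves there (the paths and names are hypothetical): ProcessBuilder.directory sets the working directory of the child JVM, so every relative path the Executor resolves is relative to executorDir.

import java.io.File

object WorkingDirDemo {
  def main(args: Array[String]): Unit = {
    val executorDir = new File("/opt/spark/work", "app-xyz/0")   // hypothetical workDir/appId/execId
    // The launched process sees executorDir as its pwd, so SparkFiles.getRootDirectory (".")
    // points into this directory.
    val builder = new ProcessBuilder("java", "-version").directory(executorDir)
    println(builder.directory())
  }
}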
And the ExecutorRunner itself is instantiated in the Worker [10]:
def createWorkDir() {
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
}

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => {
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
    self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  manager.start()
}
So the files end up under $SPARK_HOME/work/appId/execId (by default, i.e. when the worker's work directory has not been overridden).
(End)