When writing a Spark application, you sometimes need extra jar packages and extra resource files, so every Executor needs its own copy of these files. How do the files get copied to each Executor? This post analyzes the file-distribution flow based on Spark 1.1.1.
1. Application startup and parameter passing
Setting aside how the Driver itself starts, and looking only at how files are passed along:
- When launching the application with spark-submit, the files are specified with the --files=/path/to/files option [1]
- org.apache.spark.deploy.SparkSubmit writes this option into the System Properties (multiple paths are separated by ',') [2],[3]; see the sketch after this list
- As the application starts in local or cluster mode, a SparkContext instance is created on the Driver side. SparkContext reads the files out of SparkConf [4] and, depending on the URI scheme, processes and records each file URI [5]:
  - If the scheme is null | file, the file is added to the file directory of the HttpFileServer [6]; at this point the file is copied into a temporary directory on the Driver.
  - If the scheme is local, the application is only running in local mode on the Driver.
  - Anything else, such as HTTP, FTP or HDFS, is simply recorded by its URI.
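To make the first two steps concrete, here is a minimal sketch of how the file list can be recovered on the Driver. It assumes that --files surfaces in the Driver-side SparkConf under the spark.files key and that SparkContext splits the value on ',' before calling addFile on each entry; the splitting code is paraphrased, not copied from the source.

import org.apache.spark.SparkConf

// Minimal sketch: how a --files list might be recovered from SparkConf.
// Assumption: spark-submit --files /path/a.txt,/path/b.txt ends up as the
// "spark.files" entry of the Driver-side SparkConf.
object SparkFilesConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("files-sketch")
      .set("spark.files", "/path/a.txt,/path/b.txt") // what spark-submit would have set

    // SparkContext does essentially this before calling addFile on every entry.
    val files: Seq[String] = conf.getOption("spark.files")
      .map(_.split(",").filter(_.nonEmpty).toSeq)
      .getOrElse(Seq.empty)

    files.foreach(println) // /path/a.txt  /path/b.txt
  }
}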
The code snippet in which SparkContext adds files:
/**
 * Add a file to be downloaded with this Spark job on every node.
 * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
 * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
 * use `SparkFiles.get(path)` to find its download location.
 */
def addFile(path: String) {
  val uri = new URI(path)
  val key = uri.getScheme match {
    case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
    case "local"       => "file:" + uri.getPath
    case _             => path
  }
  addedFiles(key) = System.currentTimeMillis
}
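A minimal usage sketch of the API described in that doc comment; the file name lookup.txt, the RDD contents and the app name are made up purely for illustration (the master URL is expected to come from spark-submit):

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import scala.io.Source

object AddFileUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("addFile-sketch"))

    // Register a local file; every Executor downloads its own copy before running tasks.
    sc.addFile("/path/to/lookup.txt")

    val result = sc.parallelize(Seq("a", "b", "c")).map { key =>
      // Inside a task, resolve the downloaded copy via SparkFiles.get.
      val localPath = SparkFiles.get("lookup.txt")
      val lineCount = Source.fromFile(localPath).getLines().size
      (key, lineCount)
    }.collect()

    result.foreach(println)
    sc.stop()
  }
}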
2. Transferring files to the Executor

Consider the HttpFileServer route for delivering files to an Executor: the Driver runs a file server and waits for Executors to come and fetch the files. The files' URLs, of course, still have to reach the Executor; as we will see below, they travel along with each serialized task.
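For the null | file case, env.httpFileServer.addFile copies the file into the served directory and returns the URL that becomes the file's key in addedFiles. The sketch below paraphrases that idea; the serverUri layout, the /files/ path segment and the use of Guava's Files are assumptions for illustration, not verbatim Spark code.

import java.io.File
import com.google.common.io.Files

// Hedged sketch of the HttpFileServer idea: copy the file into a Driver-side temp
// directory that is served over HTTP, and hand back its URL as the key.
// The URL layout below is an assumption, not copied from the Spark source.
class FileServerSketch(serverUri: String) {
  private val baseDir: File = Files.createTempDir()              // Driver-side temp directory
  private val fileDir: File = { val d = new File(baseDir, "files"); d.mkdir(); d }

  def addFile(file: File): String = {
    Files.copy(file, new File(fileDir, file.getName))            // copy into the served directory
    serverUri + "/files/" + file.getName                         // key later stored in addedFiles
  }
}

Whatever key ends up in addedFiles is exactly the name that Utils.fetchFile downloads on the Executor side.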
Setting aside how the Executor itself is started: org.apache.spark.executor.Executor has a launchTask method, which runs a Task inside the Executor. [7]
def launchTask(
    context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
When the Task runs, it first downloads its file dependencies: [8]
override def run() {
  // ... (abbreviated: task deserialization and execution around these lines)
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
  // ...
}

/**
 * Download any missing dependencies if we receive a new set of files and JARs from the
 * SparkContext. Also adds any new JARs we fetched to the class loader.
 */
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
  synchronized {
    // Fetch missing dependencies
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
      currentFiles(name) = timestamp
    }
    // ... (the same check-and-fetch is done for newJars, which are also added to the class loader)
  }
}
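To see what the timestamp guard buys us, here is a self-contained toy version of that loop; fetchFile below is a stand-in for Utils.fetchFile, and the file name and timestamps are invented:

import scala.collection.mutable.HashMap

// Toy illustration of the "only fetch newer files" rule used in updateDependencies:
// a file is (re)downloaded only when the timestamp shipped with the task is newer
// than what this Executor has already fetched.
object FetchIfNewerSketch {
  private val currentFiles = new HashMap[String, Long]            // what the Executor already has

  private def fetchFile(name: String): Unit = println(s"downloading $name")  // stand-in for Utils.fetchFile

  def updateFiles(newFiles: HashMap[String, Long]): Unit = synchronized {
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      fetchFile(name)
      currentFiles(name) = timestamp
    }
  }

  def main(args: Array[String]): Unit = {
    updateFiles(HashMap("lookup.txt" -> 100L))  // downloaded
    updateFiles(HashMap("lookup.txt" -> 100L))  // skipped: timestamp unchanged
    updateFiles(HashMap("lookup.txt" -> 200L))  // downloaded again: the file was re-added later
  }
}

This also explains why addFile stores System.currentTimeMillis in addedFiles: re-adding a file with a newer timestamp makes every Executor refresh its copy on the next task.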
The details of Utils.fetchFile don't matter here; it simply downloads a file into the given directory. The key question is which directory the file lands in: where is SparkFiles.getRootDirectory? [8]
3. The Executor's working directory

In SparkEnv:
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
  Utils.createTempDir().getAbsolutePath
} else {
  "."
}
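SparkFiles.getRootDirectory returns exactly this sparkFilesDir, and SparkFiles.get resolves a file name against it. Below is a paraphrased, self-contained sketch; the temp-dir path is an example value, not anything Spark guarantees:

import java.io.File

// Paraphrase of how SparkFiles resolves paths: get(filename) is simply
// rootDirectory + filename. On an Executor the root is ".", i.e. the process's
// current working directory; on the Driver it is a temp dir.
object SparkFilesSketch {
  def get(filename: String, rootDirectory: String): String =
    new File(rootDirectory, filename).getAbsolutePath

  def main(args: Array[String]): Unit = {
    println(get("lookup.txt", "."))                   // Executor side: under the pwd
    println(get("lookup.txt", "/tmp/spark-0a1b2c3d")) // Driver side: example temp dir path
  }
}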
The files are therefore downloaded into the Executor's working directory, i.e. its pwd at runtime. The Executor is launched by ExecutorRunner: [9]
/**
 * Download and run the executor described in our ApplicationDescription
 */
def fetchAndRunExecutor() {
  try {
    // Create the executor's working directory
    val executorDir = new File(workDir, appId + "/" + execId)
    // ...
    // Launch the process
    val command = getCommandSeq
    val builder = new ProcessBuilder(command: _*).directory(executorDir)
    // ... (process start and error handling omitted)
  }
}
The Executor's working dir is new File(workDir, appId + "/" + execId), where workDir is the work directory handed to ExecutorRunner. ExecutorRunner, in turn, is instantiated in the Worker, which also creates that work directory: [10]
def createWorkDir() {
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
}

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => {
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
    self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  manager.start()
}
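Putting the two snippets together, the directory an Executor runs in, and therefore the place where files are downloaded, composes as in the sketch below; the sparkHome path, appId and execId are made-up example values:

import java.io.File

// Composing the paths from Worker.createWorkDir and ExecutorRunner above.
// All concrete values here are made-up examples.
object ExecutorDirSketch {
  def main(args: Array[String]): Unit = {
    val sparkHome = new File("/opt/spark")                     // example SPARK_HOME
    val workDir   = new File(sparkHome, "work")                // Worker's default work dir
    val appId     = "app-20141215120000-0001"                  // example application id
    val execId    = 3                                          // example executor id
    val executorDir = new File(workDir, appId + "/" + execId)  // ExecutorRunner's working dir

    println(executorDir)  // /opt/spark/work/app-20141215120000-0001/3
  }
}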
So the files end up under $SPARK_HOME/work/<appId>/<execId> (or under the Worker's configured work directory, if one was set).

(End)