在大數據框架Spark的源碼中我們使用addFile方法将一些文件分發給各個節點,當我們要訪問Spark作業中的文件,将使用Sparkfiles.get(fileName)找到它的下載位置,但是Spark隻提供給我們addFile方法,卻沒有提供deleteFile。我們知道addFile是SparkContext類的方法,而SparkContext是Spark功能的主要入口。SparkContext代表了與Spark集群的連接,可用于在該集群上創建RDD、累積器和廣播變量。每個JVM隻能活動一個SparkContext。
在SparkContext添加deleteFile方法下面是SparkContext中addFile的源碼,首先我們是不知道源碼中是怎麼操作這些文件的,我們将通過閱讀addFile的源碼來學習怎麼去添加deleteFile方法,我們隻有知道怎麼添加才知道怎麼去修改它,所謂觸類旁通。
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new Path(path).toUri
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}
val hadoopPath = new Path(schemeCorrectedPath)
val scheme = new URI(schemeCorrectedPath).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running "
"local mode.")
}
if (!recursive && isDir) {
throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not "
"turned on.")
}
} else {
Utils.validateURL(uri)
}
val key = if (!isLocal && scheme == "file") {
env.rpcEnv.fileServer.addFile(new File(uri.getPath))
} else {
schemeCorrectedPath
}
val timestamp = System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false)
postEnvironmentUpdate()
}
}
通過上面的源碼我們知道,是使用addedFiles 這個ConcurrentHashMap[用于存儲每個靜态文件/jar的URL以及文件的本地時間戳的
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
上面的學習我們已經知道了添加的方法,然後就是添加deleteFilele了,具體的實現如下:
我們已經了解了在SparkContext添加deleteFile方法,我們了解下NettyStreamManager。NettyStreamManager是StreamManager實現,用于服務于NettyRpcEnv中的文件。在這個管理器中可以注冊三種資源,都是由實際文件支持的。
def addFile(file: File): String
2. NettyStreamManager中addFile方法實現
override def addFile(file: File): String = {
val existingPath = files.putIfAbsent(file.getName, file)
require(existingPath == null || existingPath == file,
s"File ${file.getName} was already registered with a different path "
s"(old path = $existingPath, new path = $file")
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
3. 跟addFile一樣,先去父類RpcEnvFileServer中添加deleteFile方法
4. 接下來我們将在NettyStreamManager中實現deleteFile方法,如下
我們知道Spark中的SQL解析是通過ANTLR4來解析成語法樹的,如果不清楚這個過程,可以閱讀我的這篇博客【Spark SQL解析過程以及Antlr4入門】來了解,所以我們如果要在Spark Sql也支持的話,那麼需要修改SqlBase.g4這個文件,添加DElETE
override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) {
val mayebePaths = remainder(ctx.identifier).trim
ctx.op.getType match {
case SqlBaseParser.ADD =>
ctx.identifier.getText.toLowerCase match {
case "file" => AddFileCommand(mayebePaths)
case "jar" => AddJarCommand(mayebePaths)
case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
}
/*
*TODO 添加支持移除文件
* */
case SqlBaseParser.DELETE =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "file" => DeleteFileCommand(mayebePaths)
case other => operationNotAllowed(s"DELETE with resource type '$other'", ctx)
}
case SqlBaseParser.LIST =>
ctx.identifier.getText.toLowerCase match {
case "files" | "file" =>
if (mayebePaths.length > 0) {
ListFilesCommand(mayebePaths.split("\\s "))
} else {
ListFilesCommand()
}
case "jars" | "jar" =>
if (mayebePaths.length > 0) {
ListJarsCommand(mayebePaths.split("\\s "))
} else {
ListJarsCommand()
}
case other => operationNotAllowed(s"LIST with resource type '$other'", ctx)
}
case _ => operationNotAllowed(s"Other types of operation on resources", ctx)
}
}
找到org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver 這個類,然後修改
我們将源碼放到linux的服務器中去編譯,然後部署
./dev/make-distribution.sh --name 2.6.0-cdh5.14.2 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.14.2
如果你對源碼感興趣,關注我獲取已經修改好的源碼
,更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!