tft每日頭條

 > 生活

 > spark讀寫流程

spark讀寫流程

生活 更新时间:2024-07-30 18:25:19
修改源碼的背景

在大數據框架Spark的源碼中我們使用addFile方法将一些文件分發給各個節點,當我們要訪問Spark作業中的文件,将使用Sparkfiles.get(fileName)找到它的下載位置,但是Spark隻提供給我們addFile方法,卻沒有提供deleteFile。我們知道addFile是SparkContext類的方法,而SparkContext是Spark功能的主要入口。SparkContext代表了與Spark集群的連接,可用于在該集群上創建RDD、累積器和廣播變量。每個JVM隻能活動一個SparkContext。

在SparkContext添加deleteFile方法

下面是SparkContext中addFile的源碼,首先我們是不知道源碼中是怎麼操作這些文件的,我們将通過閱讀addFile的源碼來學習怎麼去添加deleteFile方法,我們隻有知道怎麼添加才知道怎麼去修改它,所謂觸類旁通。

spark讀寫流程(Spark源碼中添加deleteFile方法)1

def addFile(path: String, recursive: Boolean): Unit = { val uri = new Path(path).toUri val schemeCorrectedPath = uri.getScheme match { case null | "local" => new File(path).getCanonicalFile.toURI.toString case _ => path } val hadoopPath = new Path(schemeCorrectedPath) val scheme = new URI(schemeCorrectedPath).getScheme if (!Array("http", "https", "ftp").contains(scheme)) { val fs = hadoopPath.getFileSystem(hadoopConfiguration) val isDir = fs.getFileStatus(hadoopPath).isDirectory if (!isLocal && scheme == "file" && isDir) { throw new SparkException(s"addFile does not support local directories when not running " "local mode.") } if (!recursive && isDir) { throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " "turned on.") } } else { Utils.validateURL(uri) } val key = if (!isLocal && scheme == "file") { env.rpcEnv.fileServer.addFile(new File(uri.getPath)) } else { schemeCorrectedPath } val timestamp = System.currentTimeMillis if (addedFiles.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added file $path at $key with timestamp $timestamp") Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConfiguration, timestamp, useCache = false) postEnvironmentUpdate() } }

通過上面的源碼我們知道,是使用addedFiles 這個ConcurrentHashMap[用于存儲每個靜态文件/jar的URL以及文件的本地時間戳的

private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala

上面的學習我們已經知道了添加的方法,然後就是添加deleteFilele了,具體的實現如下:

spark讀寫流程(Spark源碼中添加deleteFile方法)2

NettyStreamManager中添加deleteFile方法實現

我們已經了解了在SparkContext添加deleteFile方法,我們了解下NettyStreamManager。NettyStreamManager是StreamManager實現,用于服務于NettyRpcEnv中的文件。在這個管理器中可以注冊三種資源,都是由實際文件支持的。

  • - "/files":一個扁平的文件列表;作為SparkContext.addFile的後端。
  • - "/jars":一個扁平的文件列表;作為SparkContext.addJar的後端。
  • - 任意目錄;該目錄下的所有文件通過管理器變得可用,尊重目錄的層次結構。隻支持流媒體(openStream)。
  1. 我們還是先看addFile的源碼,先看父類RpcEnvFileServer的接口,RpcEnv用來向應用程序所擁有的其他進程提供文件的服務器。該file Server可以返回由普通庫處理的URI(如 "http "或 "hdfs"),也可以返回由RpcEnv#fetchFile處理的 "spark "URI。

def addFile(file: File): String

2. NettyStreamManager中addFile方法實現

override def addFile(file: File): String = { val existingPath = files.putIfAbsent(file.getName, file) require(existingPath == null || existingPath == file, s"File ${file.getName} was already registered with a different path " s"(old path = $existingPath, new path = $file") s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}" }

3. 跟addFile一樣,先去父類RpcEnvFileServer中添加deleteFile方法

spark讀寫流程(Spark源碼中添加deleteFile方法)3

4. 接下來我們将在NettyStreamManager中實現deleteFile方法,如下

spark讀寫流程(Spark源碼中添加deleteFile方法)4

修改Antlr4的sqlBase.g4文件

我們知道Spark中的SQL解析是通過ANTLR4來解析成語法樹的,如果不清楚這個過程,可以閱讀我的這篇博客【Spark SQL解析過程以及Antlr4入門】來了解,所以我們如果要在Spark Sql也支持的話,那麼需要修改SqlBase.g4這個文件,添加DElETE

spark讀寫流程(Spark源碼中添加deleteFile方法)5

修改SparkSqlParser解析器
  • 找到org.apache.spark.sql.execution.SparkSqlParser類,添加對移除文件的支持。SparkSqlParser是Spark SQL語句的具體解析器。

override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) { val mayebePaths = remainder(ctx.identifier).trim ctx.op.getType match { case SqlBaseParser.ADD => ctx.identifier.getText.toLowerCase match { case "file" => AddFileCommand(mayebePaths) case "jar" => AddJarCommand(mayebePaths) case other => operationNotAllowed(s"ADD with resource type '$other'", ctx) } /* *TODO 添加支持移除文件 * */ case SqlBaseParser.DELETE => ctx.identifier.getText.toLowerCase(Locale.ROOT) match { case "file" => DeleteFileCommand(mayebePaths) case other => operationNotAllowed(s"DELETE with resource type '$other'", ctx) } case SqlBaseParser.LIST => ctx.identifier.getText.toLowerCase match { case "files" | "file" => if (mayebePaths.length > 0) { ListFilesCommand(mayebePaths.split("\\s ")) } else { ListFilesCommand() } case "jars" | "jar" => if (mayebePaths.length > 0) { ListJarsCommand(mayebePaths.split("\\s ")) } else { ListJarsCommand() } case other => operationNotAllowed(s"LIST with resource type '$other'", ctx) } case _ => operationNotAllowed(s"Other types of operation on resources", ctx) } }

  • 再找到resources.scala文件

spark讀寫流程(Spark源碼中添加deleteFile方法)6

修改SparkSQLCLIDriver

找到org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver 這個類,然後修改

spark讀寫流程(Spark源碼中添加deleteFile方法)7

源碼編譯

我們将源碼放到linux的服務器中去編譯,然後部署

./dev/make-distribution.sh --name 2.6.0-cdh5.14.2 --tgz -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.14.2

驗證是否生效

spark讀寫流程(Spark源碼中添加deleteFile方法)8

如果你對源碼感興趣,關注我獲取已經修改好的源碼

,

更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!

查看全部

相关生活资讯推荐

热门生活资讯推荐

网友关注

Copyright 2023-2024 - www.tftnews.com All Rights Reserved