Facebook 經常使用分析來進行數據驅動的決策。在過去的幾年裡,用戶和産品都得到了增長,使得我們分析引擎中單個查詢的數據量達到了數十TB。我們的一些批處理分析都是基于 Hive 平台(Apache Hive 是 Facebook 在2009年貢獻給社區的)和 Corona( Facebook 内部的 MapReduce 實現)進行的。Facebook 還針對包括 Hive 在内的多個内部數據存儲,繼續增加了其 Presto 的 ANSI-SQL 查詢的覆蓋範圍。Facebook 内部還支持其他類型的分析,如圖計算、機器學習(Apache Giraph)和流處理(如 Puma、Swift 和 Stylus)。
盡管 Facebook 提供的服務涵蓋了分析領域的廣泛領域,但我們仍在不斷地與開源社區互動,以分享我們的經驗,并向他人學習。Apache Spark 于2009年由加州大學伯克利分校(UC-Berkeley)的 Matei Zaharia 創辦,并于2013年貢獻給 Apache。它是目前增長最快的數據處理平台之一,因為它能夠支持流處理、批處理、命令式(RDD)、聲明式(SQL)、圖計算和機器學習用例,所有這些都在相同的 API 和底層計算引擎中。Spark 可以有效地利用大量内存,跨整個管道(pipelines)優化代碼,并跨任務(tasks)重用 jvm 以獲得更好的性能。Facebook 認為 Spark 已經成熟到可以在許多批處理用例中與 Hive 進行比較的地步。在本文的後面部分,将介紹 Facebook 使用 Spark 替代 Hive 的經驗和教訓。
用例:為實體排序(entity ranking)做特性準備實時實體排名在 Facebook 有着多種使用場景。對于一些在線服務平台,原始的特性值是使用 Hive 離線生成的,并将生成的數據加載到這些實時關聯查詢系統中。這些 Hive 作業是數年前開發的,占用了大量的計算資源,并且難以維護,因為這些作業被拆分成數百個 Hive 小作業。為了使得業務能夠使用到新的特征數據,并且讓系統變得可維護,我們開始着手将這些作業遷移到 Spark 中。
以前的 Hive 作業實現
基于 Hive 的作業由三個邏輯階段組成,每個階段對應數百個由 entity_id 分割的較小 Hive 作業,因為為每個階段運行較大的 Hive 作業不太可靠,并且受到每個作業的最大任務數限制。具體如下:
以上三個邏輯階段可以概括如下:
基于 Hive 構建索引的作業大約需要運行三天。管理起來也很有挑戰性,因為這條管道包含數百個分片作業,因此很難進行監控。沒有簡單的方法來衡量作業的整體進度或計算 ETA。考慮到現有 Hive 作業的上述局限性,我們決定嘗試使用 Spark 來構建一個更快、更易于管理的作業。
Spark 實現如果使用 Spark 全部替換上面的作業可能會很慢,并且很有挑戰性,需要大量的資源。所以我們首先将焦點投入在 Hive 作業中資源最密集的部分:第二階段。我們從50GB的壓縮輸入樣本開始,然後逐步擴展到 300 GB、1 TB 和20 TB。在每次增加大小時,我們都解決了性能和穩定性問題,但是嘗試 20 TB 時我們發現了最大改進的地方。
在運行 20 TB 的輸入時,我們發現由于任務太多,生成了太多的輸出文件(每個文件的大小大約為100 MB)。在作業運行的10個小時中,有3個小時用于将文件從 staging 目錄移動到 HDFS 中的最終目錄。最初,我們考慮了兩個方案:要麼改進 HDFS 中的批量重命名以支持我們的用例;要麼配置 Spark 以生成更少的輸出文件(這一階段有大量的任務——70,000個)。經過認真思考,我們得到了第三種方案。由于我們在作業的第二步中生成的 tmp_table2 表是臨時的,并且隻用于存儲作業的中間輸出。最後,我們把上面 Hive 實現的三個階段的作業用一個 Spark 作業表示,該作業讀取 60 TB 的壓縮數據并執行 90 TB的 shuffle 和排序,最後的 Spark job 如下:
當然,在如此大的數據量上運行單個 Spark 作業在第一次嘗試甚至第十次嘗試時都不會起作用。據我們所知,這是生産環境中 shuffle 數據量最大的 Spark 作業(Databricks 的 PB 級排序是在合成數據上進行的)。我們對 Spark 内核和應用程序進行了大量的改進和優化,才使這項工作得以運行。這項工作的好處在于,其中許多改進都适用于 Spark 的其他大型工作負載,并且我們能夠将所有工作重新貢獻給開源 Apache Spark 項目 - 有關更多詳細信息,請參見下面相關的 JIRA。下面我們将重點介紹将一個實體排名作業部署到生産環境的主要改進。
可靠性修複(Reliability fixes)為了可靠地執行長時間運行的作業,我們希望系統能夠容錯并從故障中恢複(主要是由于正常維護或軟件錯誤導緻的機器重新啟動)。雖然 Spark 最初的設計可以容忍機器重動,但我們還是發現了各種各樣的 bug/問題,我們需要在系統正式投入生産之前解決這些問題。
在實現了上述可靠性改進之後,我們能夠可靠地運行 Spark 作業。此時,我們将工作重心轉移到與性能相關的問題上,以最大限度地利用 Spark。我們使用Spark 的指标和 profilers 來發現一些性能瓶頸。
在所有這些可靠性和性能改進之後,我們的實體排名系統變成了一個更快、更易于管理的管道,并且我們提供了在 Spark 中運行其他類似作業的能力。
使用 Spark 和 Hive 運行上面實體排名程序性能比較我們使用以下性能指标來比較 Spark 和 Hive 運行性能。
CPU time:這是從操作系統的角度來看 CPU 使用情況。例如,如果您的作業在32核機器上僅運行一個進程,使用所有 CPU 的50%持續10秒,那麼您的 CPU 時間将是 32 0.5 10 = 160 CPU 秒。
CPU reservation time:從資源管理框架的角度來看,這是 CPU 預留(CPU reservation)。例如,如果我們将32核機器預留10秒來運行這個作業,那麼 CPU 預留時間是 32 * 10 = 320 CPU秒。CPU 時間與 CPU 預留時間的比率反映了我們集群預留 CPU 資源的情況。準确地說,當運行相同的工作負載時,與 CPU 時間相比,預留時間可以更好地比較執行引擎。例如,如果一個進程需要1個 CPU 秒來運行,但是必須保留100個 CPU 秒,那麼根據這個指标,它的效率低于需要10個 CPU 秒但隻預留10個 CPU 秒來做相同數量的工作的進程。我們還計算了内存預留時間,但這裡沒有列出來,因為這些數字與 CPU 預留時間類似,而且使用 Spark 和 Hive 運行這個程序時都沒有在内存中緩存數據。Spark 有能力在内存中緩存數據,但由于集群内存的限制,我們并沒有使用這個功能。
Latency:作業從開始到結束運行時間。
Facebook 使用高性能和可擴展的分析引擎來幫助産品開發。Apache Spark 提供了将各種分析用例統一到單個 API ,并且提供了高效的計算引擎。我們将分解成數百個 Hive 作業管道替換為一個 Spark 作業,通過一系列的性能和可靠性改進,我們能夠使用 Spark 來處理生産中的實體數據排序的用例。在這個特殊的用例中,我們展示了 Spark 可以可靠地 shuffle 并排序 90 TB 以上的中間數據,并在一個作業中運行 250,000個 tasks。與舊的基于 Hive 計算引擎管道相比,基于 Spark 的管道産生了顯著的性能改進(4.5-6倍 CPU性能提升、節省了 3-4 倍資源的使用,并降低了大約5倍的延遲),并且已經在生産環境中運行了幾個月。
作者:過往記憶大數據
本文為阿裡雲原創内容,未經允許不得轉載。
,更多精彩资讯请关注tft每日頭條,我们将持续为您更新最新资讯!