Job, Stage, Task

本篇主要會記錄使用Spark的一些觀察日誌,希望可以了解以下問題:

  • Q: stage, task, job, partition 之間的關係?
  • Q: 何時會需要 Shuffle? Shuffle 是如何運作的?

簡單來說,一個 Spark Application 被提交之後,會根據 Action 的觸發產生 Job,每個 Job 根據 Shuffle 的分界點,又會被分成多個 Stage,而每個 Stage 預設會根據核心大小、資料大小,包含多個 Partition,也就是Task,以加快運算。大概是以下這種感覺:

Job

首先,Spark 中的數據都是由 RDD 組成的,而 RDD 是由 partition 組成的,每個 partition 代表一個數據塊。RDD 支援兩種操作分別是 Transformation 和 Action,Transformation 並不會讓程式馬上執行,而是會返回一個新的 RDD,而 Action 則會讓程式馬上執行,並返回一個結果

  • 常見的Transformation如:map、mapPartitions、flatMap、filter、union、groupByKey、repartition、cache等。
  • 常見的Action如:reduce、collect、show、count、foreach、saveAsTextFile等。

因此,一個 Job 需要由 Action 執行所觸發,而一個 Action 會觸發一個或多個 Transformation,也就代表 一個 Job 會有多個運算子操作(Tansformation)

1
2
3
4
5
# 舉例來說,當執行以下程式時,UI才會顯示出一個Job,因為他使用read這個action
ecommerce = spark.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv(root)

每個 Job 都是一系列的運算子操作,但是這些運算會有兩種情況:

  1. 窄依賴(Narrow Dependency):簡單來說就是下一個RDD所需要的數據來源是某個特定Partition,不需要跨越Partition
  2. 寬依賴(Wide Dependency):簡單來說就是下一個 RDD 的執行,需要來自多個不同 Partition 的資料

你會發現,因為Shuffle writing 變多了,這也會導致 gc 頻率提高更花時間,畢竟很多資料都要 Buffer,儘管沒有 memory 也會透過寫進磁碟的方式紀錄資料。(這也意味著gc頻率變多,所以時間也較長)。

需要注意的是如果使用cache,在下一個Action這個cache的內容被釋放掉的話,該數據並沒有被緩存,而是需要重新計算。

Task

Task是Spark最細的執行單元,Task的數量就是Stage的併行度,Task的數量由SparkContext的spark.default.parallelism設定,默認值是CPU核數。以Mac為例,透過輸入sysctl hw.physicalcpu可以得到CPU核數,而該CPU數量是一次Task可以執行的量。舉例來說,我的核心如果是10,因此從下圖可以看到一次執行10個Parallel Task

RDD 在計算的過程中,可能會分成多個 Partition,而每個 Partition 會被一個 Task 執行,一個 Task 只會執行一個 Partition。因此 RDD 的 Partition 數量決定了所有的 Task 數量總數

需要注意的是,如果使用 map 不會影響 Partition 數量,但是在 Reduce RDD的聚合階段(將數據減少為一個單一的結果),因為 Reduce 操作會在 Cluster 的 Parition 上執行,將各 Partition 的結果匯總起來,導致會觸發Shuffle操作,

聚合後的RDD其Partition數量跟操作有關,如果使用 repartition() 就會聚合成指定的 Partition 數量。但如果使用 coalesce 同樣可以改變 Partition 數量,但是他不會觸發 Shuffle,而是會將 Partition 數量減少到指定的數量,因此會導致資料不均勻,而且會導致資料移動,因此效率較差。

Stage

一個 Job 可能會包含多個 Stage,通常一個 Job 觸發多個 Stage ,而這些 Stage 的劃分是根據 Shuffle 所產生的,Shuffle依賴於兩個 Stage 的分界點

這句話什麼意思?
關於Shuffle的操作通常會導致一個Stage的結束,因為Shuffle涉及到數據的重新分發和組織,需要將數據從不同的Partition重新組合到不同的節點上。*這個重新分發的過程會引入Network和I/O的開銷(磁碟寫入和讀取),因此Spark通常會將Shuffle操作作為一個Stage的邊界。*也因此 Shuffle 也是任務中最消耗資源的部分,因為數據可能保存在不同的節點,所以下一個 Stage (也就是下一個 shuffle) 要執行前,需要先等待上一個 Stage 的 shuffle 完成,然後把上一個 Stage 的結果從不同節點上拉取過來,這會增加Network和I/O。

Shuffle

Working Process

Q: 何時會需要 Shuffle? Shuffle 是如何運作的?
Shuffle 是 Spark 中最耗資源的操作,這個過程會涉及到磁盤的讀寫,以及網路的傳輸(Network bound),因此會消耗大量的資源。以 reduceByKey() 為例,他會將上一個RDD 中的每個 Key 對應的所有 Value 聚合成一個 Value,然後生成一個新的 RDD,類似 <key, value>的形式。但是問題來了,當我們準備把上一個 RDD 中的 Key 其 Value 進行聚合的時候,這些 Key 不見得都在同一個 Partition 中(可能在別的 Node),這時候就需要進行 Shuffle

Q: 如何聚合?(這裡的聚合就是Shuffle)
聚合的過程中會牽涉到兩種操作:

  • Shuffle Write: 把上一個 RDD 執行玩的內容(但是不同Partition)其 Key 寫到 Parition File,這個 File 會被存在硬碟中(暫時的之後會gc處理掉),這麼做的目的是為了下一個 RDD 執行工作時,可以根據 Partition File 去不同的 Partition 獲取自己所需的 Value。
  • Shuffle Read:這個步驟就是下*一個 RDD 去讀取上一個 RDD 的 Partition File,*然後根據自己的 Key 去獲取 Value。

Q: 具體是怎麼執行 Shuffle Write 跟 Shuffle Read 的?
因為 Spark 有兩種不同的 Shuffle 類型,分別是 HashShuffle 和 SortShuffle,而這兩種 Shuffle 的差別在於 Shuffle Write 的時候,HashShuffle 會把 Key 進行 Hash 後,根據 Hash 後的值去決定寫到哪個 Partition File,而 SortShuffle 則是根據 Key 的值去決定寫到哪個 Partition File。而 Shuffle Read 的時候,HashShuffle 會根據 Key 的 Hash 值去決定從哪個 Partition File 讀取,而 SortShuffle 則是根據 Key 的值去決定從哪個 Partition File 讀取

而一般進行HashShuffle的機制是,Map Task 有獨立自己的 Buffer,如下:

  1. 每一個 map task 執行完後將 value 寫到 buffer 中,下一個 Stage 有多少個 Task 就會有多少個 Butter (=多少個 block file),buffer 的大小可以通過 spark.shuffle.file.buffer 設定,默認為32KB。
  2. 每個 buffer 會對應到一個磁碟小文件(也就是shuffle write的過程)。
  3. 最後,在執行 reduce task 時,會根據 map task 的數量,來決定 reduce task 要拉取多少個 map task 的輸出文件(也就是shuffle read的過程)。

產生的Block File(磁碟檔案)數量是:M×RM \times R
where R is the number of reduce tasks and M is the number of map task

Example Cause Shuffle

讓我們來舉例,Shuffle 發生的情況:

前提概要:比較一下兩段不一樣的程式,我們讀取了一份GB級別的csv檔案,因為檔案很大,所以會需要進行Partition,總共一個 Stage 執行 68 個 Task。

1
2
3
4
ecommerce = spark.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv(root)

以下這段程式,ecommerce是GB級別的資料,但是因為我們只顯示 5 筆,加上 filter 是 Narrow Dependency 所以不會觸發 Shuffle,因此執行速度非常快,甚至一個 Task 就可以搞定,我們也可以在 UI 中看到只有一個 Task。scaling 的方式會根據 action 執行的內容決定,舉例來說 show(5) 因為只需要 5 筆資料,所以當執行的queries滿足request時就會停止。

1
2
ecommerce = ecommerce.filter(col("event_type") == "purchase")
ecommerce.show(5) # job id 3, this is no shuffle due to the RDD is narrow dependency (filter)

但是當我們使用Wide Dependency時,就會觸發 Shuffle,因此會產生很多個 Task,而且執行速度也會變慢,因為需要等待 Shuffle Write 跟 Shuffle Read 的過程。

1
2
3
4
5
6
7
result = ecommerce.groupBy('user_session').agg(
spark_max('event_time').alias('Date_order'), # alias is rename function
spark_max('user_id').alias('user_id'),
count('user_session').alias('order_count'),
spark_sum('price').alias('price')
)
result.show(5) # job id 4, this accure shuffle due to the RDD is wide dependency (groupBy)

因此你也可以透過 Job id 3, 4 發現這兩者執行的任務會產生不一樣的 Task 數量與處理時間。

下圖中,你會發現在資料量大的情況下,每個Partition所負責的資料很多,default是200個Partition,每個

你會發現,在不同size 資料讀取間,造成的明顯影響有以下:

  • GC Time: 因為把 Shuffle 時執行的 Buffer 進行 GC。
  • Getting Result Time
  • Peak Execution Meemory
  • Shuffle Write Time

1BM file

1000 data raw

Problem

Q: Shuffle 可能會導致的問題?
你可以從上面的問題發現,執行Shuffle Writing 會產生很多磁碟小文件(Block File)當磁碟小文件過多時可能會導致以下問題:

  1. Shuffle Read 的故城中會需要讀取很多 Block File (磁碟小文件)對象。因此在 JVM 堆內存中對象過多會造成頻繁的 gc,當gc因為內存不夠時,會導致 OOM (Out Of Memory)。
  2. 在數據傳輸過程中會需要平凡的 Network Bound,這可能會提高通信故障的可能性,一旦通信故障,就會導致 Task 失敗發生 Shuffle file cannot find這個錯誤,TaskScheduler 不負責重試,由DAGScheduler負責重試 Stage。

Partition

  • Ref: 關於Repartition() vs Coalesce()之間的差異?

  • 通常一個RDD的DD(Distributed dataset),是由數個不同的Partition所組成,而Partition會分散在cluster不同node上。

  • partition數量太小可能導致cluster使用率降低,或是單一partition太大無法放入單機的記憶體中

  • partition數量太多可能導致較多的跨節點溝通,造成太多無謂的網路IO

    • Spark官方與DataBricks都建議partition數量為叢集中可用核心數的3~4倍為一個參考值

To be continue…

Fault Tolerance

If we want our system to be fault tolerant, it should be redundant because we ==require a redundant component to obtain the lost data==. The faulty data recovers by redundant data.

Cluster 容錯

Worker 異常

當 Worker 異常,Master 會透過心跳機制發現,並且移除異常的 Worker,然後重新啟動 Executor。

  • Spark Worker會保持和Master的心跳,當Worker出現逾時時,Master呼叫timeOutDeadWorkers()方法進行處理,移除逾時的Worker
  • 移除時會通知Driver Executor已經移除(Executor異常處理詳見下文),然後設定運行在目前Worker上的Driver重啟或直接刪除:

Executor 異常

當 Executor 異常,會會重試三次,若仍是無法則嘗試取得可用的 Worker 節點並重新啟動 Executor。

  • Executor發生異常時,外部的包裝類別ExecutorRunner會把異常訊息傳送給Worker
  • 然後Worker會傳訊息給Master
  • Master 接收 Executor 狀態變更訊息後,如果發現 Executor 出現異常退出,則呼叫 Master.schedule 方法,嘗試取得可用的 Worker 節點並重新啟動 Executor。

簡單來說,如果因為Worker異常而導致Stage失敗,就會觸發RDD容錯機制,會對運行失敗的Stage進行重試,預設三次。

我們可以透過點擊Executors Tab來觀察,有關為應用程式創建的執行程序的摘要信息,包括內存和磁碟使用情況以及任務和shuffle信息。

Job Task 容錯

RDD Lineage:

  • 基於RDD的各項transformation構成compute chain,這樣在計算結果丟失時可以根據lineage重新計算。
  • 窄依賴的情況下,因為每個父RDD Partition依賴於特定的子RDD Partition,重新計算時可以直接使用這個子RDD Partition的數據,沒有Redundant Computation(冗餘計算)。
  • 寬依賴的情況下,當丟失整一個子RDD Partition時,因為多個父RDD Partition可能會依賴於這個子RDD Partition,因此Spark必須將與該子RDD相關的所有父RDD都重新計算
    • 因此如果compute chain很長的寬依賴情況下,建議做一次Checkpoint或是cache來先做緩存,減少執行開銷。

Q: 什麼是Redundant Computation(冗餘計算)?
主要常發生在寬依賴的情況下,因為每個父RDD Partition依賴多個子RDD Partition,重新計算時需要重新計算所有子RDD Partition,這樣就會造成Redundant Computation(冗餘計算)。