Pyspark 的基本概念
前言
本篇文章主要的目的是在整理 Spark: The Definitive Guide 這本書的內容,並且加上自己的理解,讓自己更加熟悉 Spark 的基本概念。
Spark Application
取自:Spark: The Definitive Guide
![]()
Spark Application mainly consist of two processes:
- Driver process:
- executing main() function, sits on a node in the cluster
- maintaining information about the Spark Application
- responding to a user’s program or input
- analyzing, distributing, and scheduling work across the executors
- Executor process:
- executing code assigned to it by the driver
- reporting the state of the computation on that executor back to the driver node
Spark’s APIs
![]()
Spark’s language APIs 提供其他程式語言像是Python或是R等,執行Spark code的能力。有一個 SparkSession object 會被建立,SparkSession 你可以想像他是執行 Spark code 的入口,如果使用其他語言像是Python時,不需要撰寫JVM指令,可以透過 Spark’s Language API 將 Python code 轉換成可以在 JVMs 上面執行的程式。
SparkSession
SparkSesion:透過 driver process 控制 Spark Application。
![]()
透過 PySpark (Spark API)來建立 SparkSession:
1 | import findspark |
Structured API
![]()
Datasets
- 差異:
- Datasets 是 Spark 中的類型安全 API,在 Java 和 Scala 中提供了靜態類型的支持。(也就是在編譯前就知道數據類型)
- 它允許將 Java/Scala 類型分配給 DataFrame 中的記錄,並以類似 Java ArrayList 或 Scala Seq 的方式操作它們。
- 使用時機:
- Datasets 適合於需要靜態類型支持的情況,特別是大型應用程序中,多個工程師通過明確定義的接口進行交互時。
- 就像Java中的Class,你會定義好他的屬性、名稱、函式等。
- 例子:
- 如果你需要對數據進行較為複雜的操作,而且
需要保證類型安全,那麼使用 Dataset 是個不錯的選擇。 - 例如,進行較複雜的業務邏輯,同時需要在數據處理中保持靜態類型檢查。
- 如果你需要對數據進行較為複雜的操作,而且
1 | // in Scala |
DataFrames
- 差異:
- DataFrames 是分佈式的 Row 對象集合,用於處理各種類型的表格數據。
- 它提供了一個
更接近 SQL 操作的界面(只是是透過函示呼叫),並且是 Python、R 和 Scala 中使用最廣泛的 API。
- 使用時機:
- 當你需要以類似於 SQL 的方式操作和轉換數據時,DataFrames 是一個很好的選擇。它的彈性和簡潔性使得能夠快速進行數據操作和轉換。
- 例子:
- 例如,進行數據過濾、聚合、連接和簡單的轉換等操作時,DataFrames 是一個非常方便的選擇。
1 | # 先讀取資料 |
SQL
- 差異:
- SQL 是一種結構化查詢語言,允許使用
類 SQL 的語法進行數據操作。(可以直接寫SQL) - 在 Spark 中,你可以使用 SQL 像操作關聯式數據庫一樣操作 DataFrame 和 TempView。
- SQL 是一種結構化查詢語言,允許使用
- 使用時機:
- 當你更擅長於 SQL 語法,或者希望使用標準 SQL 操作數據時,SQL 是一個很好的選擇。它也能提供可讀性強、易於維護的優勢。
- 例子:
- 如果你有大量的 SQL 經驗,或者希望通過使用 SQL 來表達數據操作,那麼在 Spark 中使用 SQL 是很直觀的。
- 舉例來說,對於熟悉 SQL 語法的用戶,將一些較複雜的數據操作轉化為 SQL 可能更為自然和高效。
1 | # 先讀取資料 |
三者比較
- 效率:DataFrames 和 SQL 提供了簡潔、易讀且高效的數據操作方式,適用於快速開發和簡單轉換。
- 類型安全:Datasets 提供了靜態類型支持,適用於對類型安全性要求較高的情況。
Structured Streaming
StreamingDataFrame 跟 staticDataFrame 有什麼差異呢?
差異
- StreamingDataFrame:
- 用於處理流式數據,是基於連續到達的數據流進行操作。
- 使用 readStream 方法讀取流式數據。
- 可以通過在流式數據上應用操作和轉換來進行實時處理。
- staticDataFrame:
- 用於處理靜態、固定的數據集,是一次性加載整個數據集進行操作。
- 使用 read 方法從靜態數據源(如文件、資料庫等)讀取數據。
- 適用於批處理作業,對於靜態且不會變動的數據進行操作和分析。
流式操作
- 它的目的是將數據寫入某個地方,而不僅僅是對數據進行計算或統計(例如 count 操作在流式上是沒有意義的)。
- 流式操作會將結果輸出到內存表,並在每次觸發(trigger)後更新該表。
- 在啟動了流式處理之後,可以使用透過儲存的內存 Table 使用 SQL 查詢來檢視內存表中的結果。
- 好處:這樣的操作讓你可以在將數據寫入最終目的地之前,先對計算結果進行預覽和檢查,以確保數據處理和計算的準確性和一致性。
1 | # in Python |
他就會不斷跳出結果,只要讀取了新的檔案就會觸發一次,然後你就可以查看檔案的變化,跳出的結果如下
![]()
使用時機
- StreamingDataFrame:
- 適用於處理連續到達的、持續變化的數據流,例如實時日誌、感測器數據等。
- 可以實時進行處理、分析和存儲。
- 優點是能夠處理動態數據,但也需要考慮流式處理的性能和延遲。
- staticDataFrame:
- 適用於一次性、靜態的數據集,例如批量文件、靜態數據庫內容等。
- 適用於批處理作業,例如數據清理、分析報告等。
- 優點是能夠
對整個數據集進行全局操作和分析,但無法處理持續更新的數據。
Machine Learning and Advanced Analytics
Spark 有一個內建的機器學習算法庫(MLlib),這使其能夠執行大規模的機器學習任務。MLlib 包括了預處理、數據整理、模型訓練和在大規模數據上進行預測的功能。甚至可以在 Structured Streaming 中使用在 MLlib 中訓練過的模型進行預測。Spark 提供了一個複雜的機器學習 API,可用於執行各種機器學習任務,從分類到回歸,從聚類到深度學習等。
資料清理
MLlib 中的機器學習演算法要求資料以數值表示。我們目前的資料由多種不同的類型表示,包括時間戳記、整數和字串。因此,我們需要將這些數據轉換為某種數字表示形式。那我們就來看看該怎麼做吧!
在本例中,我們將使用多個 DataFrame 轉換來操作日期資料:
- 先把nan的資料填補成0
- 將日期轉換成星期幾
coalesce來減少分區數量,預設是200的分區,但是我們希望 staticDataFrame 只要分成 5 區即可。
coalesce 跟spark.conf.set("spark.sql.shuffle.partitions", "5")有類似的效果,但是這個是針對所有的DataFrame,而coalesce是針對特定的DataFrame。
1 | from pyspark.sql.functions import date_format, col |
資料切割trainning and testing
接著我們可以把資廖根據時間分成 trainning 跟 testing 的資料集,並且將資料轉換成機器學習演算法所需的格式。
1 | # in Python |
資料轉換成 vector
切割好資料後,我們建立一個 Spark MLlib 中的一個轉換器(transformer):StringIndexer。
StringIndexer是用於**將「字串類別」型特徵轉換為「數字類別」**型特徵的轉換器。- 它將一列中的字串值按照它們在該列中出現的頻率或者字母表順序,映射為數字類別值。
在這個例子中,這段程式碼創建了一個 StringIndexer 物件並進行了相關設定:
.setInputCol("day_of_week"):設置了輸入列(input column),即要轉換的列的名稱,在這裡是 “day_of_week”。.setOutputCol("day_of_week_index"):設置了輸出列(output column),指定轉換後數字類別值的存儲位置,這裡是 “day_of_week_index”。
1 | # indexer |
準備好 indexer, encoder, vectorAssembler 之後,我們就可以建立一個 pipeline 來執行這些轉換。
1 | # pipeline |
接著我們就可以透過 pipeline 來轉換資料了。我們必須先透過 fit 來建立一個 pipeline model,因為當使用 StringIndexer 對類別型特徵進行轉換時,它需要將每個不同的類別值映射為一個唯一的數字。在這個過程中,需要確定要被索引的類別型特徵列中有多少個唯一的類別值。
例如,如果你有一列包含星期幾(Monday、Tuesday、Wednesday 等),StringIndexer 會將每個不同的星期幾映射為一個唯一的數字標識,例如 Monday: 0, Tuesday: 1, 等等。在訓練過程中,StringIndexer 需要看到整個訓練數據集中的所有不同的類別值,這樣它才能確保給每個類別值分配一個唯一的數字標識。
fittedPipeline,這是在訓練數據集上進行了訓練的 Pipeline。
1 | fittedPipeline = transformationPipeline.fit(trainDataFrame) |
接著我們透過 fittedPipeline 的 transform 方法將這個訓練完的 Pipeline 應用於訓練數據集 trainDataFrame,對數據進行相同的轉換。
1 | transformedTraining = fittedPipeline.transform(trainDataFrame) |
準備模型
在 MLlib 的 DataFrame API 中,每個算法都有兩種類型。
- 未訓練的版本按照 Algorithm 命名模式,而訓練過的版本則是 AlgorithmModel。
- 在這個例子中,例如 KMeans 表示未訓練的版本,而 KMeansModel 則是經過訓練的版本。
1 | from pyspark.ml.clustering import KMeans |
計算 Loss
因為 3.0.0 的版本後,就把 computeCost 移除了,所以要使用 ClusteringEvaluator 來計算 Loss。這邊我們要計算的是 Silhouette score,他與 Loss 是用於不同類型的機器學習問題評估的指標。
- Silhouette score:
- 主要用於評估聚類(Clustering)的質量。
- 分數範圍在 -1 到 1 之間,分數越接近 1 表示聚類結果越好,表示樣本與自己的聚類比與其他聚類更相似,而分數越接近 -1 則表示聚類結果較差,樣本更可能被分配到錯誤的聚類中。
- Loss:
- 損失是在監督式學習(Supervised Learning)中使用的評估指標,用於衡量模型預測與實際目標值之間的差距。
- 損失的計算方式因問題而異,例如在回歸問題中可以使用均方誤差(Mean Squared Error),在分類問題中可以使用交叉熵損失(Cross-Entropy Loss)。損失值越小表示模型預測與實際值之間的差距越小,模型的性能越好。
1 | from pyspark.ml.evaluation import ClusteringEvaluator |
Low-Level APIs
![]()
RDD 是 Spark 中的一個基本抽象,它是一個具有容錯性和可並行處理的、分佈式的元素集合。RDD 提供了一種能夠在大規模集群上進行並行操作的抽象方式,它是 Spark 運行的核心基礎。事實上,幾乎所有 Spark 的操作都是建立在 RDD 之上的。
然而,DataFrame 是建立在 RDD 之上的更高級抽象,它提供了一種更方便、更高效的方式來進行分佈式數據操作。DataFrame 基於 RDD,但它提供了更高層次的抽象,隱藏了底層 RDD 的物理特性(例如分區等),使得用戶可以更方便地進行數據操作,並且能夠獲得更好的性能。
在 Spark 中,大部分情況下,建議使用 DataFrame 或 Structured APIs 進行數據操作,因為它們提供了更高層次的抽象和更好的性能。但是在某些情況下,仍然可能會使用到 RDD,例如在讀取或操作原始數據時。
下面的程式碼示例展示了如何使用 parallelize 方法將一組數據轉換為 RDD,然後將其轉換為 DataFrame,這是一種使用 RDD 創建 DataFrame 的方式。
1 | # in Python |
RDD(Resilient Distributed Dataset)
RDD 是 Spark 中的基本抽象,它代表一個分佈式的、不可變的集合,能夠容錯和並行處理。RDD 提供了對數據的精細控制,允許使用者管理數據分區、內存使用以及計算過程。
適合的使用時機:
- 當需要更精細的控制和低層次的操作時,例如需要手動控制數據分區、優化內存使用或者需要使用一些特定於 RDD 的操作時。
- 在一些需要自定義的計算邏輯或者在遷移舊有 Spark 代碼到新版本時,可以使用 RDD 以保持兼容性。
優缺點:
- 優點:
- 提供更多細節級別的控制,可以手動調整分區、緩存策略等。
- 在特定場景下可以更靈活,並提供一些 DataFrame 不具備的操作。
- 缺點:
- 較為低階,需要更多的手動管理和優化。
- 較 DataFrame 更複雜,需要更多的代碼量。
與 DataFrame 的差異
與 DataFrame 的差異
- DataFrame 是在 RDD 基礎上的高層次抽象,提供了更簡潔的 API 和更高效的優化。
DataFrame 提供了結構化數據處理的抽象,允許使用者進行類似 SQL 的操作。
相對於 RDD,DataFrame 適合的使用時機:
- 當進行結構化數據處理時,例如進行篩選、聚合、排序等操作。
- 在需要高效並行處理和內建優化的情況下,DataFrame 更為適合。
相對於RDD的優缺點:
- 優點:
- 提供了更高級的抽象和更簡潔的 API,易於使用和理解。
- 具有內置的優化功能,能夠自動優化和提高效率。
- 缺點:
- 對於某些特定場景下的低層次操作,DataFrame 可能無法提供足夠的靈活性。
Stage, Executor, Driver
![]()
- spark context:
- cluster manager: 資源管理器收到請求,會在滿足條件的 worker node 上建立 executor
- executor: 他就是一個獨立的JVM process,單個節點的執行進程,裡面會有多個Task線程
- Driver: 他就是我們寫的spark應用程式,創建sparkcontext或sparksession,driver會和cluster manager通信,分配task道executor上。
- 會根據sparkcontext中的資源需求,向resource manager申請資源,包括executor數量與內存。
Fault Tolerance
If we want our system to be fault tolerant, it should be redundant because we ==require a redundant component to obtain the lost data==. The faulty data recovers by redundant data.
Cluster 容錯
Worker 異常
當 Worker 異常,Master 會透過心跳機制發現,並且移除異常的 Worker,然後重新啟動 Executor。
- Spark Worker會保持和Master的心跳,當Worker出現逾時時,Master呼叫timeOutDeadWorkers()方法進行處理,移除逾時的Worker。
- 移除時會通知Driver Executor已經移除(Executor異常處理詳見下文),然後設定運行在目前Worker上的Driver重啟或直接刪除:
Executor 異常
當 Executor 異常,會會重試三次,若仍是無法,則嘗試取得可用的 Worker 節點並重新啟動 Executor。
- Executor發生異常時,外部的包裝類別ExecutorRunner會把異常訊息傳送給Worker
- 然後Worker會傳訊息給Master。
- Master 接收 Executor 狀態變更訊息後,如果發現 Executor 出現異常退出,則呼叫 Master.schedule 方法,嘗試取得可用的 Worker 節點並重新啟動 Executor。
簡單來說,如果因為Worker異常而導致Stage失敗,就會觸發RDD容錯機制,會對運行失敗的Stage進行重試,預設三次。
Job Task 容錯
RDD Lineage:
- 基於RDD的各項transformation構成compute chain,這樣在計算結果丟失時可以根據lineage重新計算。
- 在窄依賴的情況下,因為每個父RDD Partition依賴於特定的子RDD Partition,重新計算時可以直接使用這個子RDD Partition的數據,沒有Redundant Computation(冗餘計算)。
- 在寬依賴的情況下,當丟失整一個子RDD Partition時,因為多個父RDD Partition可能會依賴於這個子RDD Partition,因此Spark必須將與該子RDD相關的所有父RDD都重新計算。
- 因此如果compute chain很長的寬依賴情況下,建議做一次Checkpoint或是cache來先做緩存,減少執行開銷。
Q: 什麼是Redundant Computation(冗餘計算)?
主要常發生在寬依賴的情況下,因為每個父RDD Partition依賴多個子RDD Partition,重新計算時需要重新計算所有子RDD Partition,這樣就會造成Redundant Computation(冗餘計算)。
補充:Spark’s Interactive Consoles
如果以 interactive mode 啟動 Spark,就是間接的建立管理 Spark Application 的SparkSession。
1 | ./bin/pyspark # Python console: starts an interactive Spark application |