前言

本篇文章主要的目的是在整理 Spark: The Definitive Guide 這本書的內容,並且加上自己的理解,讓自己更加熟悉 Spark 的基本概念。

Spark Application

取自:Spark: The Definitive Guide

Spark Application mainly consist of two processes:

  1. Driver process
    • executing main() function, sits on a node in the cluster
    • maintaining information about the Spark Application
    • responding to a user’s program or input
    • analyzing, distributing, and scheduling work across the executors
  2. Executor process
    • executing code assigned to it by the driver
    • reporting the state of the computation on that executor back to the driver node

Spark’s APIs

Spark’s language APIs 提供其他程式語言像是Python或是R等,執行Spark code的能力。有一個 SparkSession object 會被建立,SparkSession 你可以想像他是執行 Spark code 的入口,如果使用其他語言像是Python時,不需要撰寫JVM指令,可以透過 Spark’s Language API 將 Python code 轉換成可以在 JVMs 上面執行的程式

SparkSession

SparkSesion:透過 driver process 控制 Spark Application。

透過 PySpark (Spark API)來建立 SparkSession:

1
2
3
4
5
6
import findspark 
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.range(1000).toDF("number")

Structured API

Datasets

  • 差異
    • Datasets 是 Spark 中的類型安全 API,在 Java 和 Scala 中提供了靜態類型的支持。(也就是在編譯前就知道數據類型)
    • 它允許將 Java/Scala 類型分配給 DataFrame 中的記錄,並以類似 Java ArrayList 或 Scala Seq 的方式操作它們。
  • 使用時機
    • Datasets 適合於需要靜態類型支持的情況,特別是大型應用程序中,多個工程師通過明確定義的接口進行交互時。
    • 就像Java中的Class,你會定義好他的屬性、名稱、函式等。
  • 例子
    • 如果你需要對數據進行較為複雜的操作,而且需要保證類型安全,那麼使用 Dataset 是個不錯的選擇。
    • 例如,進行較複雜的業務邏輯,同時需要在數據處理中保持靜態類型檢查。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// in Scala
// 先定義好class 型別
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt)


// 讀取資料
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")

// 轉換資料對應至型別中
val flights = flightsDF.as[Flight]

// in Scala
flights
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(flight_row => flight_row)
.take(5)

flights
.take(5)
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))

DataFrames

  • 差異
    • DataFrames 是分佈式的 Row 對象集合,用於處理各種類型的表格數據。
    • 它提供了一個更接近 SQL 操作的界面(只是是透過函示呼叫),並且是 Python、R 和 Scala 中使用最廣泛的 API。
  • 使用時機
    • 當你需要以類似於 SQL 的方式操作和轉換數據時,DataFrames 是一個很好的選擇。它的彈性和簡潔性使得能夠快速進行數據操作和轉換。
  • 例子
    • 例如,進行數據過濾、聚合、連接和簡單的轉換等操作時,DataFrames 是一個非常方便的選擇。
1
2
3
4
5
6
7
8
# 先讀取資料
flight = spark.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv(os.path.join(root, "flight-data/csv/2015-summary.csv"))

# 接近SQL操作介面,但是使用起來更加方便
dataframeway = flight.groupBy("DEST_COUNTRY_NAME").count()

SQL

  • 差異
    • SQL 是一種結構化查詢語言,允許使用類 SQL 的語法進行數據操作。(可以直接寫SQL)
    • 在 Spark 中,你可以使用 SQL 像操作關聯式數據庫一樣操作 DataFrame 和 TempView。
  • 使用時機
    • 當你更擅長於 SQL 語法,或者希望使用標準 SQL 操作數據時,SQL 是一個很好的選擇。它也能提供可讀性強、易於維護的優勢。
  • 例子
    • 如果你有大量的 SQL 經驗,或者希望通過使用 SQL 來表達數據操作,那麼在 Spark 中使用 SQL 是很直觀的。
    • 舉例來說,對於熟悉 SQL 語法的用戶,將一些較複雜的數據操作轉化為 SQL 可能更為自然和高效。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 先讀取資料
flight = spark.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv(os.path.join(root, "flight-data/csv/2015-summary.csv"))

# 註冊為一個名為 "flight_data_2015" 的暫時性視圖。
# 這意味著你可以使用 SQL 語法或 Spark 的 DataFrame API 在程式中直接查詢這個名為 "flight_data_2015" 的視圖。
flight.createOrReplaceTempView("flight_data_2015")

# 這樣就可以透過 SQL 語法進行查詢
sqlway = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

三者比較

  • 效率:DataFrames 和 SQL 提供了簡潔、易讀且高效的數據操作方式,適用於快速開發和簡單轉換。
  • 類型安全:Datasets 提供了靜態類型支持,適用於對類型安全性要求較高的情況。

Structured Streaming

StreamingDataFrame 跟 staticDataFrame 有什麼差異呢?

差異

  • StreamingDataFrame:
    • 用於處理流式數據,是基於連續到達的數據流進行操作。
    • 使用 readStream 方法讀取流式數據。
    • 可以通過在流式數據上應用操作和轉換來進行實時處理。
  • staticDataFrame:
    • 用於處理靜態、固定的數據集,是一次性加載整個數據集進行操作。
    • 使用 read 方法從靜態數據源(如文件、資料庫等)讀取數據。
    • 適用於批處理作業,對於靜態且不會變動的數據進行操作和分析。

流式操作

  • 它的目的是將數據寫入某個地方,而不僅僅是對數據進行計算或統計(例如 count 操作在流式上是沒有意義的)。
  • 流式操作會將結果輸出到內存表,並在每次觸發(trigger)後更新該表。
  • 在啟動了流式處理之後,可以使用透過儲存的內存 Table 使用 SQL 查詢來檢視內存表中的結果。
  • 好處:這樣的操作讓你可以在將數據寫入最終目的地之前,先對計算結果進行預覽和檢查,以確保數據處理和計算的準確性和一致性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# in Python

# 1. 讀取資料適用 load 而不是 read
# 2. maxFilesPerTrigger 代表每次觸發的檔案數量 也就是新的檔案進來時,會觸發一次
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("/data/retail-data/by-day/*.csv")

# 當trigger發生時,會將結果輸出到內存表中,但是以下是lazy evaluation,所以不會真的執行
# 要透過 writeStream 來啟動
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

# writeStream 當每次 trigger 觸發的時候,會將結果輸出到內存表中
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

# 透過儲存的內存 Table 使用 SQL 查詢來檢視內存表中的結果
# 可以查看目前處理的狀況
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)

# 通過使用 writeStream 方法,將結果輸出到控制台,這樣你可以在控制台上看到實時的結果變化。
purchaseByCustomerPerHour.writeStream
.format("console")
.queryName("customer_purchases_2")
.outputMode("complete")
.start()

他就會不斷跳出結果,只要讀取了新的檔案就會觸發一次,然後你就可以查看檔案的變化,跳出的結果如下

使用時機

  • StreamingDataFrame:
    • 適用於處理連續到達的、持續變化的數據流,例如實時日誌、感測器數據等。
    • 可以實時進行處理、分析和存儲。
    • 優點是能夠處理動態數據,但也需要考慮流式處理的性能和延遲。
  • staticDataFrame:
    • 適用於一次性、靜態的數據集,例如批量文件、靜態數據庫內容等。
    • 適用於批處理作業,例如數據清理、分析報告等。
    • 優點是能夠對整個數據集進行全局操作和分析,但無法處理持續更新的數據。

Machine Learning and Advanced Analytics

Spark 有一個內建的機器學習算法庫(MLlib),這使其能夠執行大規模的機器學習任務。MLlib 包括了預處理、數據整理、模型訓練和在大規模數據上進行預測的功能。甚至可以在 Structured Streaming 中使用在 MLlib 中訓練過的模型進行預測。Spark 提供了一個複雜的機器學習 API,可用於執行各種機器學習任務,從分類到回歸,從聚類到深度學習等。

資料清理

MLlib 中的機器學習演算法要求資料以數值表示。我們目前的資料由多種不同的類型表示,包括時間戳記、整數和字串。因此,我們需要將這些數據轉換為某種數字表示形式。那我們就來看看該怎麼做吧!

在本例中,我們將使用多個 DataFrame 轉換來操作日期資料:

  • 先把nan的資料填補成0
  • 將日期轉換成星期幾
  • coalesce 來減少分區數量,預設是200的分區,但是我們希望 staticDataFrame 只要分成 5 區即可。

coalescespark.conf.set("spark.sql.shuffle.partitions", "5")有類似的效果,但是這個是針對所有的DataFrame,而coalesce是針對特定的DataFrame。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)

# output
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
| 580538| 23084| RABBIT NIGHT LIGHT| 48|2011-12-05 08:38:00| 1.79| 14075.0|United Kingdom| Monday|
| 580538| 23077| DOUGHNUT LIP GLOSS | 20|2011-12-05 08:38:00| 1.25| 14075.0|United Kingdom| Monday|
| 580538| 22906|12 MESSAGE CARDS ...| 24|2011-12-05 08:38:00| 1.65| 14075.0|United Kingdom| Monday|
| 580538| 21914|BLUE HARMONICA IN...| 24|2011-12-05 08:38:00| 1.25| 14075.0|United Kingdom| Monday|
| 580538| 22467| GUMBALL COAT RACK| 6|2011-12-05 08:38:00| 2.55| 14075.0|United Kingdom| Monday|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
only showing top 5 rows

資料切割trainning and testing

接著我們可以把資廖根據時間分成 trainning 跟 testing 的資料集,並且將資料轉換成機器學習演算法所需的格式。

1
2
3
4
5
6
7
8
# in Python
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")

trainDataFrame.count()
testDataFrame.count()

資料轉換成 vector

切割好資料後,我們建立一個 Spark MLlib 中的一個轉換器(transformer):StringIndexer

  • StringIndexer 是用於**將「字串類別」型特徵轉換為「數字類別」**型特徵的轉換器。
  • 它將一列中的字串值按照它們在該列中出現的頻率或者字母表順序,映射為數字類別值

在這個例子中,這段程式碼創建了一個 StringIndexer 物件並進行了相關設定:

  • .setInputCol("day_of_week"):設置了輸入列(input column),即要轉換的列的名稱,在這裡是 “day_of_week”。
  • .setOutputCol("day_of_week_index"):設置了輸出列(output column),指定轉換後數字類別值的存儲位置,這裡是 “day_of_week_index”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# indexer 
## setInputCol 針對 day_of_week 轉換成 day_of_week_index
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")

# encoder
## 把 index 轉換成 one-hot encoding
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")

# vector
## 把所有的特徵轉換成一個向量
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")

準備好 indexer, encoder, vectorAssembler 之後,我們就可以建立一個 pipeline 來執行這些轉換。

1
2
3
4
5
# pipeline
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])

接著我們就可以透過 pipeline 來轉換資料了。我們必須先透過 fit 來建立一個 pipeline model,因為當使用 StringIndexer 對類別型特徵進行轉換時,它需要將每個不同的類別值映射為一個唯一的數字。在這個過程中,需要確定要被索引的類別型特徵列中有多少個唯一的類別值

例如,如果你有一列包含星期幾(Monday、Tuesday、Wednesday 等),StringIndexer 會將每個不同的星期幾映射為一個唯一的數字標識,例如 Monday: 0, Tuesday: 1, 等等。在訓練過程中,StringIndexer 需要看到整個訓練數據集中的所有不同的類別值,這樣它才能確保給每個類別值分配一個唯一的數字標識

fittedPipeline,這是在訓練數據集上進行了訓練的 Pipeline。

1
fittedPipeline = transformationPipeline.fit(trainDataFrame)

接著我們透過 fittedPipeline 的 transform 方法將這個訓練完的 Pipeline 應用於訓練數據集 trainDataFrame,對數據進行相同的轉換

1
2
3
4
5
transformedTraining = fittedPipeline.transform(trainDataFrame)

# 使用快取這種優化技術,該技術會將中間轉換後的數據集存儲到內存中
# 這樣就可以以更低的成本重複訪問它。
transformedTraining.cache()

準備模型

在 MLlib 的 DataFrame API 中,每個算法都有兩種類型。

  • 未訓練的版本按照 Algorithm 命名模式,而訓練過的版本則是 AlgorithmModel。
  • 在這個例子中,例如 KMeans 表示未訓練的版本,而 KMeansModel 則是經過訓練的版本。
1
2
3
4
5
6
7
8
9
from pyspark.ml.clustering import KMeans

# 還未訓練的版本
kmeans = KMeans()\
.setK(20)\
.setSeed(1)

# 訓練好的 model
kmModel = kmeans.fit(transformedTraining)

計算 Loss

因為 3.0.0 的版本後,就把 computeCost 移除了,所以要使用 ClusteringEvaluator 來計算 Loss。這邊我們要計算的是 Silhouette score,他與 Loss 是用於不同類型的機器學習問題評估的指標。

  • Silhouette score
    • 主要用於評估聚類(Clustering)的質量。
    • 分數範圍在 -1 到 1 之間,分數越接近 1 表示聚類結果越好,表示樣本與自己的聚類比與其他聚類更相似,而分數越接近 -1 則表示聚類結果較差,樣本更可能被分配到錯誤的聚類中。
  • Loss
    • 損失是在監督式學習(Supervised Learning)中使用的評估指標,用於衡量模型預測與實際目標值之間的差距。
    • 損失的計算方式因問題而異,例如在回歸問題中可以使用均方誤差(Mean Squared Error),在分類問題中可以使用交叉熵損失(Cross-Entropy Loss)。損失值越小表示模型預測與實際值之間的差距越小,模型的性能越好。
1
2
3
4
5
6
7
8
9
10
from pyspark.ml.evaluation import ClusteringEvaluator

# 在訓練後使用訓練好的模型對測試數據進行預測
predictions = kmModel.transform(transformedTest)

# 評估聚類模型的 Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)

print("Silhouette with squared euclidean distance = " + str(silhouette))

Low-Level APIs

RDD 是 Spark 中的一個基本抽象,它是一個具有容錯性和可並行處理的、分佈式的元素集合。RDD 提供了一種能夠在大規模集群上進行並行操作的抽象方式,它是 Spark 運行的核心基礎。事實上,幾乎所有 Spark 的操作都是建立在 RDD 之上的。

然而,DataFrame 是建立在 RDD 之上的更高級抽象,它提供了一種更方便、更高效的方式來進行分佈式數據操作。DataFrame 基於 RDD,但它提供了更高層次的抽象,隱藏了底層 RDD 的物理特性(例如分區等),使得用戶可以更方便地進行數據操作,並且能夠獲得更好的性能。

在 Spark 中,大部分情況下,建議使用 DataFrame 或 Structured APIs 進行數據操作,因為它們提供了更高層次的抽象和更好的性能。但是在某些情況下,仍然可能會使用到 RDD,例如在讀取或操作原始數據時。

下面的程式碼示例展示了如何使用 parallelize 方法將一組數據轉換為 RDD,然後將其轉換為 DataFrame,這是一種使用 RDD 創建 DataFrame 的方式。

1
2
3
4
5
6
7
# in Python
from pyspark.sql import Row
spark.sparkContext.parallelize([
Row(1),
Row(2),
Row(3)
]).toDF()

RDD(Resilient Distributed Dataset)

RDD 是 Spark 中的基本抽象,它代表一個分佈式的、不可變的集合,能夠容錯和並行處理。RDD 提供了對數據的精細控制,允許使用者管理數據分區、內存使用以及計算過程。

適合的使用時機

  • 當需要更精細的控制和低層次的操作時,例如需要手動控制數據分區、優化內存使用或者需要使用一些特定於 RDD 的操作時。
  • 在一些需要自定義的計算邏輯或者在遷移舊有 Spark 代碼到新版本時,可以使用 RDD 以保持兼容性。

優缺點

  • 優點:
    • 提供更多細節級別的控制,可以手動調整分區、緩存策略等。
    • 在特定場景下可以更靈活,並提供一些 DataFrame 不具備的操作。
  • 缺點:
    • 較為低階,需要更多的手動管理和優化。
    • 較 DataFrame 更複雜,需要更多的代碼量。

與 DataFrame 的差異

與 DataFrame 的差異

  • DataFrame 是在 RDD 基礎上的高層次抽象,提供了更簡潔的 API 和更高效的優化。
    DataFrame 提供了結構化數據處理的抽象,允許使用者進行類似 SQL 的操作。

相對於 RDD,DataFrame 適合的使用時機

  • 當進行結構化數據處理時,例如進行篩選、聚合、排序等操作。
  • 在需要高效並行處理和內建優化的情況下,DataFrame 更為適合。

相對於RDD的優缺點

  • 優點:
    • 提供了更高級的抽象和更簡潔的 API,易於使用和理解。
    • 具有內置的優化功能,能夠自動優化和提高效率。
  • 缺點:
    • 對於某些特定場景下的低層次操作,DataFrame 可能無法提供足夠的靈活性。

Stage, Executor, Driver

  • spark context:
  • cluster manager: 資源管理器收到請求,會在滿足條件的 worker node 上建立 executor
  • executor: 他就是一個獨立的JVM process,單個節點的執行進程,裡面會有多個Task線程
  • Driver: 他就是我們寫的spark應用程式,創建sparkcontext或sparksession,driver會和cluster manager通信,分配task道executor上。
    • 會根據sparkcontext中的資源需求,向resource manager申請資源,包括executor數量與內存。

Fault Tolerance

If we want our system to be fault tolerant, it should be redundant because we ==require a redundant component to obtain the lost data==. The faulty data recovers by redundant data.

Cluster 容錯

Worker 異常

當 Worker 異常,Master 會透過心跳機制發現,並且移除異常的 Worker,然後重新啟動 Executor。

  • Spark Worker會保持和Master的心跳,當Worker出現逾時時,Master呼叫timeOutDeadWorkers()方法進行處理,移除逾時的Worker
  • 移除時會通知Driver Executor已經移除(Executor異常處理詳見下文),然後設定運行在目前Worker上的Driver重啟或直接刪除:

Executor 異常

當 Executor 異常,會會重試三次,若仍是無法,則嘗試取得可用的 Worker 節點並重新啟動 Executor。

  • Executor發生異常時,外部的包裝類別ExecutorRunner會把異常訊息傳送給Worker
  • 然後Worker會傳訊息給Master
  • Master 接收 Executor 狀態變更訊息後,如果發現 Executor 出現異常退出,則呼叫 Master.schedule 方法,嘗試取得可用的 Worker 節點並重新啟動 Executor。

簡單來說,如果因為Worker異常而導致Stage失敗,就會觸發RDD容錯機制,會對運行失敗的Stage進行重試,預設三次。

Job Task 容錯

RDD Lineage:

  • 基於RDD的各項transformation構成compute chain,這樣在計算結果丟失時可以根據lineage重新計算。
  • 窄依賴的情況下,因為每個父RDD Partition依賴於特定的子RDD Partition,重新計算時可以直接使用這個子RDD Partition的數據,沒有Redundant Computation(冗餘計算)。
  • 寬依賴的情況下,當丟失整一個子RDD Partition時,因為多個父RDD Partition可能會依賴於這個子RDD Partition,因此Spark必須將與該子RDD相關的所有父RDD都重新計算
    • 因此如果compute chain很長的寬依賴情況下,建議做一次Checkpoint或是cache來先做緩存,減少執行開銷。

Q: 什麼是Redundant Computation(冗餘計算)?
主要常發生在寬依賴的情況下,因為每個父RDD Partition依賴多個子RDD Partition,重新計算時需要重新計算所有子RDD Partition,這樣就會造成Redundant Computation(冗餘計算)。

補充:Spark’s Interactive Consoles

如果以 interactive mode 啟動 Spark,就是間接的建立管理 Spark Application 的SparkSession。

1
2
3
4
./bin/pyspark # Python console: starts an interactive Spark application 
./bin/spark-shell # Scala Console: access the Scala console to start an interactive sesion
./bin/spark-sql # SQL console
./bin/spark-submit # submit an application to a cluster 他可以幫你把程式打包