一、Spark框架介紹
Spark是一種快速、通用、可擴展的數據處理引擎,可以輕鬆地處理大型數據集。Spark最初由加州大學伯克利分校的AMPLab開發,目的是為了解決Hadoop在處理迭代式算法時性能問題的限制。Spark的基本概念和功能與Hadoop類似,但是比Hadoop更快、更通用、更容易使用。Spark包含了Spark SQL、Spark Streaming、MLlib和GraphX等模塊。
二、Spark運行環境
Spark可以在本地機器或分布式集群上運行。在分布式集群上運行時,Spark需要一個主節點和多個工作節點。Spark通過集群管理器來分配任務和資源。Spark支持多種集群管理器,包括Standalone、Hadoop YARN和Apache Mesos。
三、Spark運行流程
1. Spark作業提交
Spark作業首先由應用程序啟動後,在驅動程序中創建。驅動程序是Spark應用的主要進程,負責協調集群上的所有任務。驅動程序通過Spark上下文對象(SparkContext)與集群進行通信,Spark上下文對象封裝了與集群管理器的通信過程和對應用程序可見的數據、計算資源。因此,一個Spark應用程序只有一個SparkContext。
Spark作業提交有兩種方式:直接使用spark-submit命令或使用程序內部的SparkConf對象提交。SparkConf對象用於配置Spark應用程序的運行環境。可以將SparkConf對象傳遞給SparkSession創建方法來創建SparkSession對象,獲得SparkContext。
2. DAG創建
Spark應用程序的執行過程是通過有向無環圖(DAG)來描述的。Spark中的每個操作都可以看作是RDD(彈性分布式數據集)之間的轉換,每個RDD有一定數量的分區,每個分區存儲一個數據塊。
Spark應用程序將創建一個DAG來表示RDD之間的依賴關係。DAG中的每個節點表示一個分區,每個節點都有一個父節點,表示RDD之間的依賴關係。DAG中的一個葉子節點表示一個輸入源或數據集,根節點表示一個輸出操作。例如,一個Map、Reduce或Join操作。
3. Stage劃分
DAG會被劃分成不同的Stage。一個Stage包含一組可以一起計算的任務。所有任務都依賴於相同的RDD分區,可以並行計算。Spark會將DAG中的RDD分區按照依賴關係劃分成多個Stage。
4. Task劃分
Spark將每個Stage劃分成多個Task,用於並行計算Stage中的RDD分區,每個Task在單個分區上執行,並返回一個或多個結果以便與其他Task合併。
Task我們可以理解為一個計算單元,一個Task對應着多個相同計算邏輯的Work。Work是Spark中真正執行Task的計算單元,每個Task可能需要被計算多次,每次被計算的Work數量就是Task的分片數(就是Spark中的Task concurrency)。Task被劃分的數量決定了公平性和並行度之間的平衡。
5. 任務執行
Spark為每個Task分配CPU時間和內存,並將Task發送到工作節點上的執行器(Executor)進行計算。Spark應用程序驅動程序將Task發送給集群管理器,集群管理器將Task發送給執行器,在執行器上執行Task,執行結果將被返回給集群管理器,最後將結果返回給驅動程序。
在執行期間,執行器通過網絡從驅動程序中獲取Task、數據和依賴項,並將計算結果發送回驅動程序的Spark上下文中。
6. 持久化(緩存)
由於RDD是高度可重用的數據結構,因此Spark允許將RDD保留在內存中以加速處理。可以使用cache()或persist()將重複使用的RDD緩存到內存中。Spark還支持將RDD存儲在磁盤或其他外部存儲器中,以便持久化目的。
四、Spark代碼示例
1. SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("example-app") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
2. RDD創建
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
3. RDD轉換
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData2 = distData.filter(lambda x: x % 2 == 0)
4. RDD操作
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData2 = distData.filter(lambda x: x % 2 == 0)
sum = distData2.reduce(lambda x, y: x + y)
5. 緩存
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.cache()
五、總結
本文主要介紹了Spark的基本概念、運行環境和運行流程,包括Spark作業提交、DAG創建、Stage劃分、Task劃分、任務執行和持久化等內容。同時,本文提供了Spark代碼示例以便讀者更好地理解Spark的運行流程和使用方法。
原創文章,作者:MYTCL,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/333946.html