Apache Spark是一個為大數據處理而設計的分散式計算系統,它可以運行在Hadoop集群之上,也可以獨立部署。而PySpark是Spark的Python API,提供了易用性和靈活性,是進行數據處理和分析的優秀選擇。
一、環境搭建
在開始學習PySpark之前,需要安裝Python以及Spark,這裡提供兩種安裝方式。第一種是使用Anaconda,它是一個開源的Python發行版,可以安裝Python以及眾多常用庫。第二種是手動安裝Python和Spark。這裡需要注意,PySpark需要跟Spark版本匹配。
下面是Anaconda環境下安裝PySpark指南:
conda create -n pyspark python=3.7
conda activate pyspark
conda install pyspark
手動安裝Python和Spark,需要先下載好對應版本的Python和Spark。然後按照以下步驟執行:
# 安裝Python
tar -zxvf Python-3.7.10.tgz
cd Python-3.7.10
./configure --prefix=/usr/local/python3.7
make && make install
# 安裝Spark
tar -zxvf spark-3.1.2-bin-hadoop3.2.tgz
mv spark-3.1.2-bin-hadoop3.2 /usr/local/spark
注意在安裝完Spark之後,還需要配置環境變數:
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
二、RDD基礎
Resilient Distributed Dataset(RDD)是Spark的核心數據結構。它具有高度的容錯性和可靠性,並且可以在內存中緩存數據,提高處理效率。
在PySpark中創建一個RDD,需要指定一個集合或者文件作為數據源。例如,下面的代碼將一個文本文件中的行讀入並創建一個RDD:
from pyspark import SparkContext
sc = SparkContext("local", "WordCount")
lines = sc.textFile("file:///usr/local/spark/README.md")
上面代碼指定了一個本地模式的SparkContext,並將文件/usr/local/spark/README.md
中的行讀入為RDD。在創建RDD後,可以使用如下的操作來操作它:
- map: 對RDD中每個元素執行一個函數,返回一個新的RDD。例如:
words = lines.flatMap(lambda line: line.split(" "))
- filter: 對RDD中每個元素執行一個函數,返回該函數返回值為True的元素組成的新RDD。例如:
filteredWords = words.filter(lambda word: len(word) > 5)
- reduceByKey: 根據RDD中的Key對Value執行聚合操作。例如:
wordCount = filteredWords.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
- collect: 將RDD中的所有元素都收集到Driver端並返回一個列表。例如:
result = wordCount.collect()
三、DataFrame和SQL
除了RDD之外,在Spark中還有一種強大的數據結構:DataFrame。它是一種以列為基本操作對象的數據結構,提供了一系列的列轉換和過濾操作。同時,Spark還提供了類SQL查詢的API,可以通過SparkSession使用。
下面兩段代碼分別是如何創建和使用DataFrame和Spark SQL進行查詢:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# 從CSV文件讀入DataFrame
df = spark.read.csv("file:///usr/local/spark/examples/src/main/resources/people.csv", header=True, inferSchema=True)
# 顯示前20行
df.show(20)
# 計算平均年齡
avgAge = df.select("age").groupBy().mean().collect()[0][0]
print("Average Age: {:.2f}".format(avgAge))
# 註冊為臨時表
df.createOrReplaceTempView("people")
# SQL查詢
result = spark.sql("SELECT name, age FROM people WHERE age > 30")
# 顯示結果
result.show()
四、機器學習
最後,我們來介紹一下PySpark中的機器學習庫,它提供了多種常見的機器學習演算法,包括分類、回歸、聚類等。
下面是一個利用PySpark進行邏輯回歸分類的例子:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
# 讀入數據集
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
data = spark.read.format("csv").option("header", True).option("inferSchema", True).load("file:///usr/local/spark/examples/src/main/resources/student_scores.csv")
# 將特徵轉換為向量
assembler = VectorAssembler(inputCols=["math", "physics"], outputCol="features")
data = assembler.transform(data)
# 劃分訓練集和測試集
train, test = data.randomSplit([0.7, 0.3])
# 建立邏輯回歸模型
lr = LogisticRegression()
# 設置調參參數
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
# 交叉驗證
crossval = CrossValidator(estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=5)
model = crossval.fit(train)
# 預測
result = model.transform(test)
# 評估
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("AUC: {:.2f}".format(evaluator.evaluate(result)))
五、總結
本篇文章主要介紹了PySpark的基本知識和常用操作。首先,我們介紹了環境搭建的兩種方法,並給出了相關的代碼。接著,我們講述了RDD這一核心數據結構的相關操作,並給出了相應的代碼。然後,我們介紹了DataFrame和Spark SQL的使用方法,並提供了相應的代碼。最後,我們介紹了機器學習部分,包括建立模型、調參和評估等,並給出了相應的代碼。掌握了這些知識後,讀者可以利用PySpark進行大規模數據處理和機器學習。
原創文章,作者:YXDZL,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/335127.html