RDD轉換為DataFrame全方位解析

一、RDD轉換成DataFrame

RDD(Resilient Distributed Datasets)和DataFrame都是Spark中最常見的數據結構。在處理大數據時,RDD可以存儲分散式數據,並提供了許多操作函數來加速大規模數據分析。而DataFrame則是由SparkSQL引入的,它是一種基於RDD的抽象概念,提供了更多高層次數據處理的能力。Spark 2.0後,官方推薦使用DF和DS,而不是RDD。

RDD轉換成DataFrame的目的是將數據從一個不可變的RDD轉換為一個可變的Dataframe。這樣做有很多優勢,如使用SparkSQL更好地進行數據處理、可以使用標準的SQL語句、可以使用DataFrame API,這些API提供了更高級別的操作,使代碼更加簡潔易懂。

二、RDD轉換為DataFrame的重要性

RDD轉換成DataFrame是一種優化方法。這是因為SparkSQL充分使用了RDD的特點,如分散式計算、數據隨機訪問等。在SparkSQL中,DataFrame使用更加便捷,可以通過表名稱、列和行進行數據訪問。SparkSQL允許多種類型的數據源,如JSON、CSV、Hive表等,這使得開發人員可以無縫地使用不同的數據源進行數據分析。

此外,通過 RDD 轉換成 DataFrame,可以更加靈活地操作數據。RDD僅僅提供了map、reduce以及其他基本的操作函數,而DataFrame 不僅提供了基礎操作函數,還有類似於 SQL 中的 SELECT、JOIN 等操作函數。此外,DataFrame 操作函數可以直接將數據緩存到內存中,這些高級操作大大簡化了編程的工作。

三、RDD轉換為DataFrame的錯誤

當將 RDD 轉換為 DataFrame 時,經常會出現以下錯誤:

1. 大小寫錯誤

val df = RDD.toDataFrame

上面這行代碼編譯不會報錯,但是運行時會拋出「cannot resolve 『toDataFrame』」 的錯誤,這是因為API名稱應為toDF,而不是toDataFrame。

2. 數據類型錯誤

case class Person(id: String, age: Int, name: String, salary: String)
val peopleRDD = sc.textFile("person.txt").map(_.split(",")).map(p => Person(p(0),p(1),p(2),p(3)))

這段代碼試圖將一個字元串類型轉換為整數類型。嘗試執行DataFrame的df.show()時,會提示一個 IllegalArgumentException 異常。

3. 列名不匹配

case class Person(id: String, age: Int, name: String)
val peopleRDD = sc.textFile("person.txt").map(_.split(",")).map(p => Person(p(0),p(1),p(2)))
val peopleDF = peopleRDD.toDF("ID","Age")

這段代碼的第三行會出現IllegalArgumentException 異常。因為列名要與 Person 類定義的屬性名字一致。

四、RDD轉換為DataFrame的兩種方法

1. 使用SparkSession創建

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("RDDToDataFrame").master("local").getOrCreate()

val rdd = sc.parallelize(Seq((1,"A"),(2,"B"),(3,"C")))

val dfFromRDD = rdd.toDF("Id", "Name")

dfFromRDD.show()

上面的代碼是通過 SparkSession 創建 Dataframe,其中 appName("RDDToDataFrame") 中設置了名稱,「RDDToDataFrame」對應一個 Job(作業),而 master("local") 在本地模式下運行。

2. 使用SparkContext創建

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)

val rdd = sc.parallelize(Seq((1,"A"),(2,"B"),(3,"C")))

val schema = StructType(Seq(StructField("Id", IntegerType, true),StructField("Name", StringType, true)))

val dfFromRDD = sqlContext.createDataFrame(rdd.map(l => Row.fromTuple(l)), schema)

dfFromRDD.show()

這裡的代碼是使用SparkContext 創建 Dataframe 的,其中的用到的類是 SQLContext。這裡我們還設置了表的結構,即第一個 column 的名稱為 “Id”,數據類型為 Int 類型。第二個 column 的名稱為 “Name”,數據類型為 String 類型。

五、RDD和DataFrame的區別

1. 數據存儲方式不同

RDD 是一種彈性分散式數據集,它將數據分布在集群的每個節點上。DataFrame 是一種分散式數據表格,它以列作為基本單元,並將數據存儲在列式存儲引擎中,它是基於 SparkSQL 的優化設計。

2. 內存佔用不同

RDD 存儲的是一些分散的、冗餘的數據塊,每次讀取數據時都需要重新讀取整個塊。而 DataFrame 對數據進行了高效的編碼和壓縮,每個不同數據類型的列都存儲在不同的內存區塊中,因此 DataFrame 的內存佔用量要比 RDD 低。

3. 訪問方式不同

RDD 的訪問方式是基於分區的,即分區內的數據只能在分區內進行計算,不能跨分區進行計算。而 DataFrame 的訪問方式是基於列的,可以對列進行操作,也可以對多列進行操作,更加靈活便利。

六、RDD轉換DataSet

DataSet 是 Spark 1.6 引入的抽象概念,與 RDD、DataFrame 並列,但 DataSet 內部數據類型可以是任何類型,不像 DataFrame 內部數據類型只能是 Row 類型或者是基本數據類型。

在 Spark 2.0 後, 更新了 DataSet API,包含了 DataFrame API 所有的操作,同時還支持 RDD 中的操作。同時還提供了類型安全的操作支持,因此可以使用強類型Scala和Java 集合來進行 DataSet 操作。

import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.SparkContext

val sc = new SparkContext("local[*]", "RDDToDataFrame")

case class Person(Name: String, Age: Int)

val data = Seq(
  Person("Alice", 25),
  Person("Bob", 30),
  Person("Charlie", 35)
)

val rdd = sc.parallelize(data)

implicit val personEncoder: Encoder[Person] = Encoders.product[Person]

val ds = rdd.toDS()

ds.show()

在這裡,我們創建了一個 Person 類。然後,通過將 RDD 轉換為 DataSet,我們可以使用更多 SparkSQL 的操作方法。

七、RDD的轉換和操作方法是什麼?

RDD 提供了很多基本的轉換函數以及操作方法,如map、filter、flatMap、reduceByKey、join等等。這些操作一般都是對 RDD 進行操作,返回一個新的 RDD,這樣可以形成一個轉換鏈。

1. map

map函數用於對 RDD 中的每個元素應用輸入函數。此函數必須為一個函數,接受一個參數,並且返回一個結果。

val x = sc.parallelize(List(1,2,3))
val y = x.map(num => num*num)
y.collect()
批量處理數據類型的轉換,如Int->String。

2. filter

filter函數用於從 RDD 中過濾掉給定函數返回false的元素。

val x = sc.parallelize(List("spark", "hadoop", "spark2"))
val y = x.filter(w => w.contains("spark"))
y.collect()

3. flatMap

flatMap函數類似於map函數,但是flatMap返回的是一組值。

val x = sc.parallelize(List("Hello World", "Another World"))
val y = x.flatMap(line => line.split(" "))
y.collect()

4. reduceByKey

reduceByKey函數用於對相同的RDD Key值進行reduce操作。

val x =  sc.parallelize(Array(("key", 1), ("key", 2), ("key", 3)))
val y = x.reduceByKey(_ + _)
y.collect()

5. join

join函數用於對兩個RDD進行內聯操作。兩個 RDD 中都存在的元素會被連接起來,返回一個 (sourceRDD.Key, (sourceRDD.value, targetRDD.value)) 的結構。

val x = sc.parallelize(Array(("key1", "value1"),("key2", "value2")))
val y = sc.parallelize(Array(("key1", "value4"),("key2", "value5")))
val z = x.join(y)
z.collect()

在RDD轉換為DF的過程中,還有許多API可以使用。其重要性不言而喻,給出的例子只是 SparkSQL 提供的基礎操作。SparkSQL中提供了一組豐富的API讓您可以更加自由地處理數據,您可以進一步學習SparkSQL的官方文檔。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/244221.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-12 13:01
下一篇 2024-12-12 13:01

相關推薦

發表回復

登錄後才能評論