spark讀取hbase提速的方法:spark讀取hbase數據速度

簡介

POLARDB數據庫是阿里雲自研的下一代關係型雲數據庫,100%兼容MySQL,性能最高是MySQL的6倍,但是隨着數據量不斷增大,面臨著單條SQL無法分析出結果的現狀。X-Pack Spark為數據庫提供分析引擎,旨在打造數據庫閉環,藉助X-Pack Spark可以將POLARDB數據歸檔至列式存儲Parquet文件,一條SQL完成複雜數據分析,並將分析結果迴流到業務庫提供查詢。本文主要介紹如何使用X-Pack Spark數據工作台對POLARDB數據歸檔。

業務架構

業務需要對多張表出不同緯度,按天、按月的報表並對外提供查詢服務;最大表當前500G,數據量還在不斷的增加。嘗試過spark直接通過jdbc去分析POLARDB,一方面比較慢,另外一方面每次掃全量的POLARDB數據,對在線業務有影響。基於以下幾點考慮選擇POLARDB+Spark的架構:

  • 選擇POLARDB按天增量歸檔到spark列存,每天增量數據量比較少,選擇業務低峰期歸檔,對在線查詢無影響
  • 選擇Spark作為報表分析引擎,因為Spark很適合做ETL,且內置支持數據迴流到POLARDB、MongoDB等多種在線庫
  • 選擇Spark離線數倉作為數據的中轉站,對於分析的結果數據迴流到在線庫提供查詢,能夠一條Spark SQL完成分析,不需要按維度值拆分多條分析SQL
X-Pack Spark歸檔POLARDB數據做分析

前置條件

1. 設置Spark訪問POLARDB白名單

Spark集群和POLARDB需在同一個VPC下才能訪問,目前X-Pack Spark上還不支持一鍵關聯POLARDB數據庫,需要將Spark集群的IP加到POLARDB白名單中。後續將會開放一鍵關聯POLARDB的功能。

在「HBase控制台」->「集群列表」中找到分析Spark實例,在「數據庫連接」欄中找到「VSwitch ID」交換機ID,如下圖:

X-Pack Spark歸檔POLARDB數據做分析

然後在「專有網絡VPC控制台」->”交換機”搜索交換機實例ID,查詢到IPV4網段。

X-Pack Spark歸檔POLARDB數據做分析

將Spark集群網絡加入到POLARDB白名單,進入「控制台」->「集群列表」找到所要關聯的POLARDB實例,然後在「基本信息」->「訪問信息」->「白名單」加入Spark集群所屬網段。

X-Pack Spark歸檔POLARDB數據做分析

2. 創建測試表

POLARDB中已經存在測試表,如果沒有可登錄POLARDB數據庫創建測試表,下文也以該測試表為例。

CREATE TABLE IF NOT EXISTS test.us_population (
 state CHAR(2) NOT NULL PRIMARY KEY,
 city VARCHAR(10),
 population INTEGER, 
 dt TIMESTAMP );
INSERT INTO test.us_population VALUES('NY','New York',8143197, CURRENT_DATE );
INSERT INTO test.us_population VALUES('CA','Los Angeles',3844829, CURRENT_DATE);
INSERT INTO test.us_population VALUES('IL','Chicago',2842518, '2019-04-13');
INSERT INTO test.us_population VALUES('TX','Houston',2016582, '2019-04-14');
INSERT INTO test.us_population VALUES('PA','Philadelphia',1463281, '2019-04-13');
INSERT INTO test.us_population VALUES('AZ','Phoenix',1461575, '2019-04-15');
INSERT INTO test.us_population VALUES('SA','San Antonio',1256509, CURRENT_DATE);
INSERT INTO test.us_population VALUES('SD','San Diego',1255540, CURRENT_DATE);
INSERT INTO test.us_population VALUES('DL','Dallas',1213825, '2019-04-15');
INSERT INTO test.us_population VALUES('SJ','San Jose',912332,'2019-04-15');

一、使用交互式工作台歸檔數據(調試、測試)

創建Spark運行會話

在”HBase控制台”->”會話管理”創建會話,指定會話名稱和執行集群,如圖:

X-Pack Spark歸檔POLARDB數據做分析

在編輯器中輸入Spark啟動參數,並運行會話,以便在交互式查詢中使用。

--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--name spark_on_polardb
--jars /spark-demo/mysql-connector-java-5.1.34.jar

參數說明:

X-Pack Spark歸檔POLARDB數據做分析

註:上述參數在測試環境中給定偏小,大數據量時根據實際集群規格和數據量進行配置

會話運行成功後如下圖所示:

X-Pack Spark歸檔POLARDB數據做分析

交互式查詢歸檔數據

創建Spark映射POLARDB表

進入”HBase控制台”->”交互式查詢”,在會話列表中選擇上一步創建會話「spark_on_polardb」,然後新建查詢,指定查詢名稱,選擇查詢類型為「SQL」類型,如圖:

X-Pack Spark歸檔POLARDB數據做分析

在查詢輸入框中輸入Spark建表語句,與POLARDB表進行關聯,建表語句為:

create table spark_polordb
using org.apache.spark.sql.jdbc
options (
 driver "com.mysql.jdbc.Driver",
 url "jdbc:mysql://pc-xxx.rwlb.rds.aliyuncs.com:3306",
 dbtable "test.us_population",
 user 'xxx',
 password 'xxxxxx'
)

參數說明:

X-Pack Spark歸檔POLARDB數據做分析

點擊運行,查詢狀態為「success」時表明創建成功。

X-Pack Spark歸檔POLARDB數據做分析

查詢測試

在上步創建查詢編輯器中輸入查詢語句,然後運行:

SELECT * FROM spark_polordb

查詢成功後返回結果如圖:

X-Pack Spark歸檔POLARDB數據做分析

創建歸檔表

X-Pack Spark將POLARDB數據歸檔至Parquet列式存儲格式中,一方面能夠獲取更優的壓縮空間,另一方面後續分析任務中具有更高的效率。

Spark創建parquet分區表語句如下,同樣在第一步中交互式查詢編輯中輸入:

CREATE table parquetTable(state CHAR(2), city VARCHAR(10), population int)
USING parquet
PARTITIONED BY(dt timestamp)

參數說明:

X-Pack Spark歸檔POLARDB數據做分析

建表成功後,可以將POLARDB數據寫入至Parquet表。

X-Pack Spark歸檔POLARDB數據做分析

歸檔數據

將POLARDB數據查詢出寫入parquet表即可完成數據歸檔,操作語句為:

INSERT INTO parquetTable partition(dt) SELECT state, city, population, dt FROM spark_polordb

運行成功後數據歸檔完成。查詢parquet表數據:

X-Pack Spark歸檔POLARDB數據做分析

二、工作流調度周期歸檔(生產T+1歸檔)

交互式查詢主要用來測試調試,歸檔一般需要做t+1的操作,每天定期把當前的數據做歸檔,這就需要使用工作流的周期調度,下面具體介紹如何使用工作流的周期調度實現t+1的歸檔。

歸檔代碼編寫

使用工作流之前需要創建對應的Spark作業,Spark歸檔POLARDB可以實現一個完整作業,包括以下流程:

  1. 在Spark中創建POLARDB表映射表(前提POLARDB中表已經存在)
  2. 創建Spark分區歸檔表
  3. 將數據寫入歸檔表

雲Spark提供了Spark歸檔POLARDB的代碼DEMO,請參考github:SparkArchivePolarDB

具體歸檔代碼需結合實際場景,歸檔不同表,設置特定分區和歸檔條件等。

上傳Spark歸檔作業資源

將打成jar包的spark歸檔demo代碼通過資源管理上傳至資源列表,jar包下載地址:Spark歸檔工具DEMO下載

自己編寫的Spark作業同樣需要打成jar包後上傳至資源列表,後面作業需要運行jar包中歸檔作業。

創建Spark作業

進入「HBase控制台」->”數據工作台”->「作業管理」->「創建作業」, 如圖

X-Pack Spark歸檔POLARDB數據做分析

編輯作業內容

作業內容中主要指定了Spark作業運行參數,以及具體的歸檔作業編碼類和傳入參數等,以SparkArchivePolarDB demo為例:

--class com.aliyun.spark.polardb.SparkOnPolarDBArchive
--driver-memory 1G 
--driver-cores 1
--executor-cores 1
--executor-memory 2G
--num-executors 1
--jars /spark-demo/mysql-connector-java-5.1.34.jar
/spark-demo/spark-examples-0.0.1-SNAPSHOT.jar
pc-xxx.rwlb.rds.aliyuncs.com:3306 test.us_population username passwd sparkTestPolarDB

參數說明:

X-Pack Spark歸檔POLARDB數據做分析

其餘參數可參見上述章節介紹

作業配置如圖:

X-Pack Spark歸檔POLARDB數據做分析

運行作業並查看結果

作業運行後一段時間可以查看到運行狀態,成功後可在交互式查詢中查看歸檔表數據。

X-Pack Spark歸檔POLARDB數據做分析

進入交互式工作台,使用可參考上述介紹,查看歸檔表數據:

X-Pack Spark歸檔POLARDB數據做分析

配置工作流

進入「HBase控制台」->「數據工作台」->「工作流」,選擇新建工作流,指定工作流名稱、描述和執行集群,

X-Pack Spark歸檔POLARDB數據做分析

然後進入工作流設計工作台,拖動Spark作業並進行配置,選擇上一步配置作業並連線:

X-Pack Spark歸檔POLARDB數據做分析

選擇”工作流配置”->”調度屬性”,開啟調度狀態並設置其實時間和調度周期,工作流即將進行周期性調度,如圖:

X-Pack Spark歸檔POLARDB數據做分析

三、歸檔方式(產出表的形式)

全量歸檔

全量歸檔方式主要用來對原庫中歷史數據進行歸檔或者針對數據量比較小的表,歸檔步驟如下:

  1. 使用Spark的jdbc datasource創建POLARDB的映射表;
  2. 在Spark中創建相同表結構的歸檔表,歸檔表使用Parquet列式存儲,能夠最大化節約存儲空間,並加速分析性能;
  3. 通過映射表讀取POLARDB數據並寫入Spark歸檔表,注意寫入時保證字段順序一致。

創建歸檔表時如果表數據量較大,可以創建分區表。分區策略一般分為時間分區和業務分區:

  • 時間分區易於使用,即將相同時間的數據歸檔到同一個目錄,比如選擇按年或者按天進行時間分區,在分析時限定數據分區即可過濾掉與分析任務無關的數據。
  • 業務分區字段需要具有有限的類別,比如性別、年齡、部門等。業務分區需要結合具體業務進行考慮,分區個數不宜過多,spark默認最大分區數為1000。
  • 分區方式可以選擇靜態分區和動態分區,默認使用靜態分區,即寫入數據時必須指定寫入哪個分區,動態分區需要將hive.exec.dynamic.partition.mode設置為nonstrict,寫入時根據具體分區字段值動態創建分區,相同partition key值寫入同一個分區。

使用示例可參考:SparkOnPolarDBArchivedemo

增量歸檔

業務數據僅增量

在業務表中數據不存在更新和刪除的操作,僅僅是向數據表中增量寫入,這種情況下只需要在數據表中記錄數據入庫時間或者其他標記記錄新增數據,在Spark中使用工作流周期調度,傳入增量數據條件,定期將新增數據歸檔只Spark中即可。

業務數據更新

針對業務數據存在更新的數據,如果原表中無法辨別更新的數據,目前只能通過全量歸檔的方式每次對全量數據進行一次歸檔,將原歸檔表數據進行overwrite;如果存在更新數據標記,如update_time字段,由於Spark目前不支持ACID,無法使用merge..into功能直接更新已有數據,增量更新歸檔步驟如下:

  1. 設置更新增量數據選擇條件(歸檔表全量歸檔時已創建),如update_time大於某個日期;
  2. 抽取增量更新的數據寫入spark臨時表;
  3. 將歷史數據歸檔表與增量更新數據表進行left out join並過濾出增量表字段為空的數據,表示歷史數據中未參與增量更新的數據,然後與增量更新的數據進行union合併,寫入Spark臨時表;
  4. 將臨時表數據覆蓋寫入到歸檔表中作為新的歸檔數據參與後續業務分析。

Spark更新增量歸檔目前只能使用join關聯方式遍歷所有數據完成數據更新,但好處是盡量避免影響在線庫POLARDB的數據訪問,每次只讀取更新和增量的部分數據,將計算工作放在廉價的Spark集群中。

使用示例可參考:SparkOnPolarDBIncrement

另一種方式:如果在業務側需要保留多個版本更新的數據,可以直接將更新和增量的數據追加到歸檔表中,然後在業務側通過最新時間判斷出有效的數據,可以避免每次更新時複雜計算過程。

業務數據更新刪除

業務表中如果存在delete,目前Spark沒有較好的辦法進行支持,需要在業務庫記錄刪除的關鍵字段信息,與歸檔表進行join,過濾掉join到的數據然後覆寫到歸檔表中,達到delete的效果。

總結

在進行實際數據開發中,往往需要多個Spark作業配合完成數據歸檔以及分析工作,單個工作流中支持配置多個作業並按序執行,同時配合交互式工作台進行數據驗證,減少很多開發中不便。目前工作台仍在不斷優化中,在使用中遇到不便之處可隨時提出建議,便於簡化您的數據開發工作。

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/216476.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
投稿專員的頭像投稿專員
上一篇 2024-12-08 22:18
下一篇 2024-12-08 22:18

相關推薦

發表回復

登錄後才能評論