一、數據傾斜的概念
數據傾斜是指在某些任務執行過程中,數據被分配到不同的處理節點上,但是某些節點上的數據過多,造成這些節點的負載壓力過大,其他節點的負載並不高,造成了任務執行效率低下的情況。
我們常見的大數據處理技術,如Hive、Spark等,當數據傾斜發生時,任務的執行時間將會大大增加,因為數據傾斜產生的節點將會成為任務瓶頸,導致任務難以完成。
在Hive運行時發生數據傾斜的主要原因有:數據分布不均、Join操作中分桶列不相同、分桶列數據分布不均等。因此,我們需要針對這些原因進行改進和優化,才能有效避免數據傾斜帶來的問題。
二、數據傾斜的解決方案
1. 數據分布不均
數據傾斜最常見的情況就是數據分布不均,一個或幾個分區的數據量過大導致負載嚴重不平衡。我們可以採取以下措施:
(1)把存儲和計算分離,先對數據進行採樣,然後把採樣的結果放到一個中間表中,再進行計算操作。
示例代碼:
create table sample_table as select * from original_table tablesample (10 percent); create temporary table temp_table as select /*+ mapjoin(a) */ a.*, b.xxx from sample_table a left join big_table b on a.id=b.id; insert into result_table select /*+ mapjoin(a) */ a.*, b.xxx from original_table a left join temp_table b on a.id=b.id;
(2)通過調整分區和使用動態分區將數據均勻分散到各個節點上。
示例代碼:
insert overwrite table target_table partition(date) select * where date='2021-01-01';
2. Join操作中分桶列不相同
當兩個表通過Join操作進行連接時,若連接的列不是分桶列,會導致數據傾斜。若連接的列是分桶列但分桶列不相同,同樣會導致數據傾斜。我們可以通過以下措施解決:
(1)讓連接的列也成為相同的分桶列。
示例代碼:
set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; create table raw_table(...) clustered by (id) into 3 buckets; create table result_table(...) clustered by (id) into 3 buckets; insert overwrite table result_table select /*+ mapjoin(a) */ a.*, b.xxx from raw_table a join raw_table b on a.id=b.id and a.dt=b.dt;
(2)使用Map Join或者Sort Merge Join。
示例代碼:
set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; create temporary table temp_table as select /*+ mapjoin(a) */ a.*, b.xxx from table_a a join table_b b on a.id=b.id; insert overwrite table result_table select /*+ mapjoin(a) */ a.*, b.xxx from table_a a left join temp_table b on a.id=b.id;
3. 分桶列數據分布不均
如果分桶列的數據分布不均,同樣會導致數據傾斜。我們可以使用以下方法解決:
(1)增加分桶數。
示例代碼:
set hive.enforce.bucketing=true; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.optimize.bucketmapjoin=true; create table test_a(...) clustered by (id) into 10 buckets; create table result_table(...) clustered by (id) into 10 buckets; insert overwrite table result_table select /*+ mapjoin(a) */ a.*, b.xxx from test_a a left join table_b b on a.id=b.id;
(2)對分桶列進行求模操作。
示例代碼:
create table tab1 (key int, value string) partitioned by (ds string, dt string) clustered by (key) into 10 buckets stored as orc; create table tab2 (key int, value string) partitioned by (ds string, dt string) clustered by (key) into 10 buckets stored as orc; insert into table tab1 partition (ds='2019-10-01', dt='2019-10-01') values(10,'a'); insert into table tab1 partition (ds='2019-10-01', dt='2019-10-01') values(11,'b'); insert into table tab1 partition (ds='2019-10-02', dt='2019-10-02') values(12,'c'); insert into table tab1 partition (ds='2019-10-02', dt='2019-10-02') values(13,'d'); insert into table tab1 partition (ds='2019-10-03', dt='2019-10-03') values(14,'e'); insert into table tab1 partition (ds='2019-10-03', dt='2019-10-03') values(15,'f'); insert into table tab2 partition (ds='2019-10-01', dt='2019-10-01') values(23,'x'); insert into table tab2 partition (ds='2019-10-02', dt='2019-10-02') values(25,'y'); insert into table tab2 partition (ds='2019-10-03', dt='2019-10-03') values(27,'z'); set mapred.reduce.tasks=10; select * from tab1 a join tab2 b on a.key%10=b.key%10 and a.ds=b.ds and a.dt=b.dt;
三、總結
數據傾斜問題一直是大數據處理領域的難點和痛點,解決數據傾斜問題關乎整個大數據技術的發展和應用。在Hive運行中,可以採取對數據分布、Join操作、分桶列等方面的優化來解決數據傾斜問題。我們需要對各種優化方法進行不斷的總結和實踐,以期達到更優秀的處理效果。
原創文章,作者:VWTPP,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/363817.html