Spark SQL DISTINCT: Analysis and Optimization Summary
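How Spark count distinct works
DISTINCT expands the data and skews it on both the shuffle side and the reduce side, which makes DISTINCT aggregations particularly slow.
How the data expansion arises, taking the following query as an example:
SELECT
    count(DISTINCT id),
    count(DISTINCT name)
FROM table_a
Main reasons DISTINCT is slow:
1. The DISTINCT operator passes the DISTINCT column together with the GROUP BY columns as the key to the reduce side. As a result, the map stage gets no effective pre-aggregation before the shuffle, the shuffle moves far more data over the network, and the reduce side carries a heavier load.
2. The DISTINCT operator expands the original data: with N DISTINCT keywords, the data is expanded N-fold on the map side, and the long-tail effect on shuffle and reduce (reason 1) is magnified N-fold as well.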
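The N-fold expansion in point 2 can be pictured with a small simulation. This is only a sketch of the idea, assuming the engine rewrites the query so that every input row is emitted once per DISTINCT column and tagged with a group id (`gid`). The function names `expand_for_distinct` and `count_distinct` and the sample rows are hypothetical, not Spark internals.

```python
from collections import defaultdict

def expand_for_distinct(rows, distinct_cols):
    """Emit one copy of each row per DISTINCT column, keeping only that column."""
    expanded = []
    for row in rows:
        for gid, col in enumerate(distinct_cols, start=1):
            # Each copy carries a single distinct value, tagged with its group id.
            expanded.append((gid, col, row[col]))
    return expanded

def count_distinct(expanded):
    """Deduplicate on (gid, column, value), then count surviving values per column."""
    seen = set(expanded)  # stands in for the shuffle keyed on (gid, column, value)
    counts = defaultdict(int)
    for _gid, col, value in seen:
        if value is not None:  # count(DISTINCT ...) ignores NULLs
            counts[col] += 1
    return dict(counts)

rows = [
    {"id": 1, "name": "a"},
    {"id": 1, "name": "b"},
    {"id": 2, "name": "a"},
    {"id": 3, "name": "a"},
]

expanded = expand_for_distinct(rows, ["id", "name"])
result = count_distinct(expanded)

print(len(rows), len(expanded))  # 4 input rows become 8 rows before the shuffle
print(result)
```

With two DISTINCT columns the row count doubles before any data is shuffled. Seven DISTINCT expressions would multiply it by seven under the same assumption.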
Job analysis: the job here contains 7 count distinct operations, which expands the data 7-fold (figure from the job: 37,758,580).
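Optimization approach:
When a UV calculation needs both count() and count(DISTINCT), the work can be split into two steps:
1. First aggregate once on the GROUP BY dimensions plus the DISTINCT column, which yields the pv of each cuid.
2. Then aggregate on the original dimensions only, using SUM(pv) and COUNT(cuid). This removes the DISTINCT keyword, and with it the data expansion and the DISTINCT shuffle.
Example:
Before optimization: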
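The two steps above can be checked on a toy dataset. The sketch below uses Python's built-in sqlite3 module purely as a stand-in SQL engine (an assumption made so the example is self-contained; it is not Spark), and the table `events` with columns `dim` and `cuid` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (dim TEXT, cuid TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("a", "u1"), ("a", "u1"), ("a", "u2"), ("b", "u1"), ("b", "u3"), ("b", "u3")],
)

# Direct form: count() and count(DISTINCT) in the same aggregation.
direct = conn.execute(
    """
    SELECT dim, count(1) AS pv, count(DISTINCT cuid) AS uv
    FROM events
    GROUP BY dim
    ORDER BY dim
    """
).fetchall()

# Two-step form: step 1 computes pv per (dim, cuid),
# step 2 sums pv and counts the remaining rows, one per cuid.
two_step = conn.execute(
    """
    SELECT dim, sum(pv) AS pv, count(cuid) AS uv
    FROM (
        SELECT dim, cuid, count(1) AS pv
        FROM events
        GROUP BY dim, cuid
    ) t
    GROUP BY dim
    ORDER BY dim
    """
).fetchall()

print(direct)
print(two_step)
```

Both queries return the same pv and uv per dimension. The second one never uses DISTINCT, because the inner GROUP BY has already reduced the data to one row per (dim, cuid).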
-- One aggregation carrying 7 count(distinct ...) expressions.
SELECT
    field1_all,
    field2_all,
    field3_all,
    field4_all,
    field5_all,
    count(1) AS pv,
    count(distinct uid) as uv,
    sum(if(field1 = '1', 1, 0)) as download_pv,
    sum(if(field1 = '2', 1, 0)) as install_pv,
    sum(if(field1 = '3', 1, 0)) as launch_pv,
    -- uid is the only user-id column exposed by the subquery below
    count(distinct case when field1 = '1' then uid else null end) as download_uv,
    count(distinct case when field1 = '2' then uid else null end) as install_uv,
    count(distinct case when field1 = '3' then uid else null end) as launch_uv,
    sum(if(field1 = '1' and field2 = '2', 1, 0)) as download_succ_pv,
    sum(if(field1 = '2' and field2 = '13', 1, 0)) as install_succ_pv,
    sum(if(field1 = '3' and field2 = '14', 1, 0)) as launch_succ_pv,
    count(distinct case when field1 = '1' and field2 = '2' then uid else null end) as download_succ_uv,
    count(distinct case when field1 = '2' and field2 = '13' then uid else null end) as install_succ_uv,
    count(distinct case when field1 = '3' and field2 = '14' then uid else null end) as launch_succ_uv
FROM
(
    SELECT
        field1,
        uid,
        field2,
        field3,
        field4,
        field5,
        field6
    -- `table` and `from` are quoted because they are SQL keywords
    FROM `table`
    WHERE day = '{DATE}'
    AND id = 'xxx'
    AND `from` = 'xxx'
) tbl_1
-- each explode adds an 'all' rollup value for its dimension
LATERAL VIEW explode(array(field1, 'all')) A AS field1_all
LATERAL VIEW explode(array(field2, 'all')) B AS field2_all
LATERAL VIEW explode(array(field3, 'all')) C AS field3_all
LATERAL VIEW explode(array(field4, 'all')) D AS field4_all
LATERAL VIEW explode(array(field5, 'all')) E AS field5_all
GROUP BY
    field1_all,
    field2_all,
    field3_all,
    field4_all,
    field5_all
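Independently of DISTINCT, the five LATERAL VIEW explode(array(fieldN, 'all')) clauses in the query above multiply rows themselves: each clause emits the original value and the literal 'all', so a single input row yields 2^5 = 32 dimension combinations. A minimal Python sketch of that fan-out follows. The helper `explode_with_all` and the sample row are made up for illustration.

```python
from itertools import product

def explode_with_all(row, dims):
    """Mimic chained LATERAL VIEW explode(array(dim, 'all')) for one input row."""
    # Each dimension contributes two values: its own value and the rollup marker 'all'.
    choices = [(row[d], "all") for d in dims]
    return list(product(*choices))

dims = ["field1", "field2", "field3", "field4", "field5"]
row = {"field1": "1", "field2": "2", "field3": "x", "field4": "y", "field5": "z"}

combos = explode_with_all(row, dims)

print(len(combos))  # 2 ** 5 combinations for a single input row
print(combos[0])    # the fully detailed combination
print(combos[-1])   # the grand-total combination, 'all' in every dimension
```

Every one of these combinations is then subject to the DISTINCT expansion described earlier, which is why reducing the row count before the explode pays off.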
After optimization:
Execution time dropped from 30 h to 5 h, and the shuffle-stage data volume fell by roughly 10x.
-- Level 3: aggregate per dimension combination.
-- Each uid contributes at most 1 to every UV, so no DISTINCT is needed.
SELECT
    field1_all,
    field2_all,
    field3_all,
    field4_all,
    field5_all,
    sum(cnt) AS pv,
    sum(case when uv > 0 then 1 else 0 end) as uv,
    sum(download_pv) as download_pv,
    sum(install_pv) as install_pv,
    sum(launch_pv) as launch_pv,
    sum(case when download_uv > 0 then 1 else 0 end) as download_uv,
    sum(case when install_uv > 0 then 1 else 0 end) as install_uv,
    sum(case when launch_uv > 0 then 1 else 0 end) as launch_uv,
    sum(download_succ_pv) as download_succ_pv,
    sum(install_succ_pv) as install_succ_pv,
    sum(launch_succ_pv) as launch_succ_pv,
    sum(case when download_succ_uv > 0 then 1 else 0 end) as download_succ_uv,
    sum(case when install_succ_uv > 0 then 1 else 0 end) as install_succ_uv,
    sum(case when launch_succ_uv > 0 then 1 else 0 end) as launch_succ_uv
FROM (
    -- Level 2: one row per (dimension combination, uid).
    SELECT
        field1_all,
        field2_all,
        field3_all,
        field4_all,
        field5_all,
        sum(cnt) AS cnt,
        count(uid) as uv,
        -- weight by cnt so pv still counts the raw rows merged in level 1
        sum(if(field1 = '1', cnt, 0)) as download_pv,
        sum(if(field1 = '2', cnt, 0)) as install_pv,
        sum(if(field1 = '3', cnt, 0)) as launch_pv,
        -- counting uid (not 1) ignores NULL uids, as count(distinct ...) does
        count(case when field1 = '1' then uid else null end) as download_uv,
        count(case when field1 = '2' then uid else null end) as install_uv,
        count(case when field1 = '3' then uid else null end) as launch_uv,
        sum(if(field1 = '1' and field2 = '2', cnt, 0)) as download_succ_pv,
        sum(if(field1 = '2' and field2 = '13', cnt, 0)) as install_succ_pv,
        sum(if(field1 = '3' and field2 = '14', cnt, 0)) as launch_succ_pv,
        count(case when field1 = '1' and field2 = '2' then uid else null end) as download_succ_uv,
        count(case when field1 = '2' and field2 = '13' then uid else null end) as install_succ_uv,
        count(case when field1 = '3' and field2 = '14' then uid else null end) as launch_succ_uv
    FROM
    (
        -- Level 1: collapse duplicates to one row per (uid, field1..field5)
        -- before the explode, keeping the raw row count in cnt.
        SELECT
            uid,
            field1,
            field2,
            field3,
            field4,
            field5,
            count(1) AS cnt
        -- `table` and `from` are quoted because they are SQL keywords
        FROM `table`
        WHERE day = '{DATE}'
        AND id = 'xxx'
        AND `from` = 'xxx'
        GROUP BY
            uid,
            field1,
            field2,
            field3,
            field4,
            field5
    ) t1
    LATERAL VIEW explode(array(field1, 'all')) A AS field1_all
    LATERAL VIEW explode(array(field2, 'all')) B AS field2_all
    LATERAL VIEW explode(array(field3, 'all')) C AS field3_all
    LATERAL VIEW explode(array(field4, 'all')) D AS field4_all
    LATERAL VIEW explode(array(field5, 'all')) E AS field5_all
    GROUP BY
        field1_all,
        field2_all,
        field3_all,
        field4_all,
        field5_all,
        uid
) t2
GROUP BY
    field1_all,
    field2_all,
    field3_all,
    field4_all,
    field5_all;
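The conditional UV rewrite (count the matching rows per uid first, then add 1 for every uid whose count is positive) can be sanity-checked on a toy table. The sketch below runs both forms through Python's built-in sqlite3 module, used here only as a stand-in engine and not as Spark. The table `events` and its single dimension `dim` are hypothetical, and the CASE returns uid so that rows with a NULL uid are ignored, as count(DISTINCT) would ignore them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (dim TEXT, uid TEXT, field1 TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("a", "u1", "1"), ("a", "u1", "1"), ("a", "u1", "2"),
        ("a", "u2", "2"), ("a", "u3", "1"), ("a", None, "1"),
        ("b", "u1", "3"), ("b", "u2", "1"),
    ],
)

# Reference result using count(DISTINCT ...).
with_distinct = conn.execute(
    """
    SELECT dim,
           count(DISTINCT uid) AS uv,
           count(DISTINCT CASE WHEN field1 = '1' THEN uid ELSE NULL END) AS download_uv
    FROM events
    GROUP BY dim
    ORDER BY dim
    """
).fetchall()

# Same metrics without DISTINCT: the inner query yields one row per (dim, uid),
# the outer query turns every positive count into a single unit.
without_distinct = conn.execute(
    """
    SELECT dim,
           sum(CASE WHEN uv > 0 THEN 1 ELSE 0 END) AS uv,
           sum(CASE WHEN download_uv > 0 THEN 1 ELSE 0 END) AS download_uv
    FROM (
        SELECT dim,
               count(uid) AS uv,
               count(CASE WHEN field1 = '1' THEN uid ELSE NULL END) AS download_uv
        FROM events
        GROUP BY dim, uid
    ) t
    GROUP BY dim
    ORDER BY dim
    """
).fetchall()

print(with_distinct)
print(without_distinct)
```

Both queries agree on this data, including the row whose uid is NULL, which neither form counts.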
Reference:
《阿里巴巴大数据之路》, p. 269