何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
一、數(shù)據(jù)傾斜概述
1.1 什么是數(shù)據(jù)傾斜
對(duì)Hadoop、Spark、Flink這樣的大數(shù)據(jù)系統(tǒng)來(lái)講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
對(duì)于分布式系統(tǒng)而言,理想情況下,隨著系統(tǒng)規(guī)模(節(jié)點(diǎn)數(shù)量)的增加,應(yīng)用整體耗時(shí)線(xiàn)性下降。如果一臺(tái)機(jī)器處理一批大量數(shù)據(jù)需要120分鐘,當(dāng)機(jī)器數(shù)量增加到三時(shí),理想的耗時(shí)為120 / 3 = 40分鐘,如下圖所示
但是,上述情況只是理想情況,實(shí)際上將單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,會(huì)有overhead,使得總的任務(wù)量較之單機(jī)時(shí)有所增加,所以每臺(tái)機(jī)器的執(zhí)行時(shí)間加起來(lái)比單臺(tái)機(jī)器時(shí)更大。這里暫不考慮這些overhead,假設(shè)單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,總?cè)蝿?wù)量不變。
但即使如此,想做到分布式情況下每臺(tái)機(jī)器執(zhí)行時(shí)間是單機(jī)時(shí)的1 / N,就必須保證每臺(tái)機(jī)器的任務(wù)量相等。不幸的是,很多時(shí)候,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個(gè)別機(jī)器上,其它大部分機(jī)器所分配的任務(wù)量只占總得的小部分。比如一臺(tái)機(jī)器負(fù)責(zé)處理80%的任務(wù),另外兩臺(tái)機(jī)器各處理10%的任務(wù),如下圖所示
1.2 數(shù)據(jù)傾斜發(fā)生時(shí)的現(xiàn)象
• 絕大多數(shù) task 執(zhí)行得都非???,但個(gè)別 task 執(zhí)行極慢。比如,總共有 1000 個(gè) task,997 個(gè) task 都在 1 分鐘之內(nèi)執(zhí)行完了,但是剩余兩三個(gè) task 卻要一兩個(gè)小時(shí)。這種情況很常見(jiàn)。
• 原本能夠正常執(zhí)行的 Spark 作業(yè),某天突然報(bào)出 OOM(內(nèi)存溢出)異常,觀察異常棧,是我們寫(xiě) 的業(yè)務(wù)代碼造成的。這種情況比較少見(jiàn)。
• Task 類(lèi)似下圖所示
總結(jié):
1. 大部分任務(wù)都很快執(zhí)行完,用時(shí)也相差無(wú)幾,但個(gè)別Task執(zhí)行耗時(shí)很長(zhǎng),整個(gè)應(yīng)用程序一直處于99%左右 的狀態(tài)。
2. 一直運(yùn)行正常的Spark Application昨晚突然OOM了。
1.3 數(shù)據(jù)傾斜發(fā)生的原理
數(shù)據(jù)傾斜的原理很簡(jiǎn)單: 在進(jìn)行 shuffle 的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的 key 的數(shù)據(jù)拉取到某個(gè)節(jié)點(diǎn) 上的一個(gè) task 來(lái)進(jìn)行處理,比如按照 key 進(jìn)行聚合或 join 等操作。此時(shí)如果某個(gè) key 對(duì)應(yīng)的數(shù)據(jù)量特 別大的話(huà),就會(huì)發(fā)生數(shù)據(jù)傾斜。比如大部分 key 對(duì)應(yīng) 10 條數(shù)據(jù),但是個(gè)別 key 卻對(duì)應(yīng)了 100 萬(wàn)條數(shù) 據(jù),那么大部分 task 可能就只會(huì)分配到 10 條數(shù)據(jù),然后 1 秒鐘就運(yùn)行完了;但是個(gè)別 task 可能分配 到了 100 萬(wàn)數(shù)據(jù),要運(yùn)行一兩個(gè)小時(shí)。因此,整個(gè) Spark 作業(yè)的運(yùn)行進(jìn)度是由運(yùn)行時(shí)間最長(zhǎng)的那個(gè) task 決定的。
因此出現(xiàn)數(shù)據(jù)傾斜的時(shí)候,Spark 作業(yè)看起來(lái)會(huì)運(yùn)行得非常緩慢,甚至可能因?yàn)槟硞€(gè) task 處理的數(shù)據(jù) 量過(guò)大導(dǎo)致內(nèi)存溢出。
下圖就是一個(gè)很清晰的例子:hello 這個(gè) key,在三個(gè)節(jié)點(diǎn)上對(duì)應(yīng)了總共 7 條數(shù)據(jù),這些數(shù)據(jù)都會(huì)被拉 取到同一個(gè)task中進(jìn)行處理;而world 和 you 這兩個(gè) key 分別才對(duì)應(yīng) 1 條數(shù)據(jù),所以另外兩個(gè) task 只 要分別處理 1 條數(shù)據(jù)即可。此時(shí)第一個(gè) task 的運(yùn)行時(shí)間可能是另外兩個(gè) task 的 7 倍,而整個(gè) stage 的 運(yùn)行速度也由運(yùn)行最慢的那個(gè) task 所決定。
總結(jié):
* 數(shù)據(jù)傾斜發(fā)生的本質(zhì),就是在執(zhí)行多階段的計(jì)算的時(shí)候,中間的shuffle策略可能導(dǎo)致分發(fā)到下 游Task的數(shù)據(jù)量不均勻,進(jìn)而導(dǎo)致下游Task執(zhí)行時(shí)長(zhǎng)的不一致。不完全均勻是正常的,但是如果相差太大,那么就產(chǎn)生性能問(wèn)題了。
1.4 數(shù)據(jù)傾斜的危害
從上圖可見(jiàn),當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí),小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過(guò)大,未能充分發(fā) 揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢(shì)。另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時(shí),
少量部分任務(wù)處理的數(shù)據(jù)量過(guò)大,可能造成 內(nèi)存不足使得任務(wù)失敗,并進(jìn)而引進(jìn)整個(gè)應(yīng)用失敗。如果應(yīng)用并沒(méi)有因此失敗,但是大量正常任務(wù)都早 早完成處于等待狀態(tài),資源得不到充分利用。
總結(jié):
1. 整體耗時(shí)過(guò)大(整個(gè)任務(wù)的完成由執(zhí)行時(shí)間最長(zhǎng)的那個(gè)Task決定)
2. 應(yīng)用程序可能異常退出(某個(gè)Task執(zhí)行時(shí)處理的數(shù)據(jù)量遠(yuǎn)遠(yuǎn)大于正常節(jié)點(diǎn),則需要的資源容易出現(xiàn)瓶頸, 當(dāng)資源不足,則應(yīng)用程序退出)
3. 資源閑置(處理等待狀態(tài)的Task資源得不到及時(shí)的釋放,處于閑置浪費(fèi)狀態(tài))
1.5 數(shù)據(jù)傾斜是如何造成的
在 Spark 中,同一個(gè) Stage 的不同 Partition 可以并行處理,而具有依賴(lài)關(guān)系的不同 Stage 之間是串行 處理的。假設(shè)某個(gè) Spark Job 分為Stage0 和 Stage1 兩個(gè) Stage,且 Stage1 依賴(lài)于 Stage0,那 Stage0 完全處理結(jié)束之前不會(huì)處理 Stage1。而 Stage0 可能包含 N 個(gè)Task,這 N 個(gè) Task 可以并行進(jìn)行。如 果其中 N-1 個(gè) Task 都在 10 秒內(nèi)完成,而另外一個(gè) Task 卻耗時(shí) 1 分鐘,那該 Stage 的總時(shí)間至少為 1 分鐘。換句話(huà)說(shuō),一個(gè) Stage 所耗費(fèi)的時(shí)間,主要由最慢的那個(gè) Task 決定。由于同一個(gè) Stage 內(nèi)的所有 Task 執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同 Task 之間耗時(shí)的差異主要由該 Task 所處理的數(shù)據(jù)量決定。Stage 的數(shù)據(jù)來(lái)源主要分為如下兩類(lèi):
1. 數(shù)據(jù)源本身分布有問(wèn)題:從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka,有可能出現(xiàn),大概率不會(huì)
2. 自己指定的分區(qū)規(guī)則:讀取上一個(gè) Stage 的 Shuffle 數(shù)據(jù)
樸素的分布式計(jì)算的核心思想:
1. 大問(wèn)題拆分成小問(wèn)題:分而治之
2. 既然要分開(kāi)算,那最后就一定要把分開(kāi)計(jì)算的那么多的小 Task 的結(jié)果執(zhí)行匯總
3. 所以必然分布式計(jì)算引擎的設(shè)計(jì)中,應(yīng)用程序的執(zhí)行一定是分階段
4. 分布計(jì)算引擎的而核心:一個(gè)復(fù)雜的分布式計(jì)算應(yīng)用程序的執(zhí)行肯定要分成多個(gè)階段,每個(gè)階段分布式并 行運(yùn)行多個(gè)Task
5. DAG引擎:
Spark: stage1 ==> stage2 ===> stage3
mapreduce: 就只有兩個(gè)階段:mapper reducer
階段與階段之間需要進(jìn)行 shuffle,只要進(jìn)行了數(shù)據(jù)混洗,就存在著數(shù)據(jù)分發(fā)不均勻的情況。如果情況嚴(yán) 重,就是數(shù)據(jù)傾斜。
分布式計(jì)算引擎的設(shè)計(jì),免不了有shuffle,既然有shuffle操作,就一定有產(chǎn)生數(shù)據(jù)傾斜的可能。如果 你是做大數(shù)據(jù)處理的,就一定會(huì)遇到 數(shù)據(jù)傾斜!