接下來是第六章了,這個章節主要是說明,MapReduce的運作過程,主要是可以了解一下從作業的提交到開始執行作業中的發生過程。
- 6.1 作業的剖析,這裡主要分成四塊
a. 客戶端 : 提交作業。
b. JobTracker : 用來協調作業的運行。
c. TaskTracker : 用來執行JobTracker所分配的任務。
d. HDFS : 用來分享檔案以及作業,Ex: MapReduce的jar檔、分割的檔案、...等等。
可以用一張圖來解釋整個執行的過程:(這張表在第二章也有出現唷^.^)
- 步驟1. 執行作業: 新版的已經不用runJob()方法了,改用job.waitForCompletion()方法了,作業送出後一樣可以看到執行進度(由50030可以看到)。
- 步驟2. 詢問jobtracker取得一個新的作業ID。
- a. 檢察作業指定的"輸出目錄"。
- b. 計算作業的輸入"分割數量"。
- 步驟3. 將所需的資源,包含JAR檔、組態檔、輸入分割,複製到以步驟2所取得的作業ID的目錄上,JAR檔會複製很多份(由mapred.submit.replication控制,default為10)。
- 步驟4. 通知JobTracker基本的設定已經ready,可以準備開始處理作業。
- 步驟5. 把作業放到內部佇列(queue),作業排程器(default FIFO排程)會從queue中取出作業並初始化它,初始化會建立一個"物件"用來表示執行的作業,它封裝"任務and紀錄執行資訊",用來追蹤任務狀態與訊息。
- 步驟6. 初始化map任務數與reduce任務數。
- a. 從HDFS上擷取輸入分割數,用來決定要分配幾個map任務。
- b. reduce任務數則可以由setNumReduceTasks()來決定。
- 步驟7. TaskTracker會定期使用Heartbeat回報狀態給JobTracker來告知本身的狀態,由heartbeat來告訴JobTracker我可以執行作業!,但是在指派任務給TaskTracker前JobTracker必須先指定所維護的排程器(如上所式default為FIFO)。
- 步驟8. 現在TaskTracker被指派任務了,接下來就是要執行任務了,
- a. 首先從HDFS上抓到"JAR檔"與"應用系統的分散式快取上複製任何所需要的檔案(這個部分可參考253頁的分散式快取)"至TaskTracker的檔案系統上(一般來說應該是在地的檔案,這是因為JAR檔已經被copy很多份了,但也有可能不夠)。
- b. 再來對任務建立一個本地工作"目錄",並將JAR檔的內容解壓縮到這個目錄上。
- c. 最後會建立一個TaskRunner的實例來執行任務。
- 步驟9. TaskRunner會啟動一個新的Java virtual machine(JVM)。
- 步驟10. 執行任務。
這裡有一個重點,就是MapReduce它會回報進度,是藉由計數器(counter)的方式來達成,在上一章有介紹過如何對counter來操作。
- 6.2 當然執行MapReduce也會有失敗的狀況發生,如程式沒寫好有bug。
- 6.2.1
任務失敗(Failed)的情況:
- a. 當map or reduce失敗時,在離開子JVM前會先將錯誤回報至上一層的TaskTracker,而TaskTracker會標記任務為失敗,並釋放任務插槽(slot)來執行其它任務。
- b. 當TaskTracker注意到有一段時間沒有收到更新進度,就會標記該任務為失敗(可利用mapred.task.timeout調長timeout的時間),子JVM在執行一段時間後會被自動刪除。
JobTracker藉由TaskTracker的heartbeat來得到任務失敗時,會重新安排的任務並且會避免安排任務到失敗的TaskTracker上,通常任務重新安排預設值是4,可藉由mapred.map.max.attempts與mapred.reduce.max.attempts來控制最大重新安排次數,但是如果超過最大重新安排次數就代表任務失敗了且不會再重試。
對某些程式而言,因為少數幾次失敗導致任務失敗而放棄整個作業是挺麻煩的,故可以利用設定最大百分比的方式來解決,分別為mapred.max.map.failures.percent與mapred.max.reduce.failures.percent。
任務刪除(killed)的情況:
- a. 任務的刪除與失敗不同,任務刪除可能是因為它是一個推測式執行的複本,被刪除的任務不會做計數器的累計。
- 6.2.2 TaskTracker 失敗,這樣的情況有可能是TaskTracker當機(停止送heartbeat給JobTracker)或是執行過於緩慢(送heartbeat給JobTracker的區間過長),JobTracker會安排這些未完成的作業至別台已經執行成功的TaskTracker的map任務上,這也是因為Reduce任務無法存取這些未完成的任務。
- 6.2.3 JobTracker 失敗,目前書上是無解,但是有說明或許可以使用Zookeeper做為協調器來決定下一個JobTracker。
- 6.3 在Hadoop的default作業排程中是FIFO(first in first out),這樣的方式也代表著先執行的作業擁有所有的資源,等作業執行完畢後才會釋放資源,但是這也會造成小量作業的困擾,故Hadoop中目前有兩種可選的作業排程器,分別是公平排程器與容量排程器。
- 6.3.1 公平排程器(Fair Scheduler),顧名思義就是大家都可以公平的共享資源,更詳細可以參考http://dongxicheng.org/mapreduce/hadoop-fair-scheduler/
- a. 作業池 : 作業會被放在作業池中,預設上每一個使用者都有自己的作業池,Hadoop也可以就map和reduce的數量來客制化最小的作業池容量,並設定每一個作業池的權重。
- b. 插隊(preemption)機制 : 這個排程器有插隊(preemption)的功能當作業太久沒有公平的使用資源的話,那麼排程器就會刪除作業池中得到過多資源的任務,讓插槽空出來給資源少的作業池使用。
- c. 使用方式 : 在新版的Hadoop已經把Fair Scheduler包在一起了,故只需設定即可,。
- 6.3.2 容量排程器(Capacity Scheduler),沒用過且書上寫的我有看沒懂Orz...,等我有空的時候再研究這個排程器,到時再與大家分享,或是有使用過的同學可以跟我說一下,感激不盡~。
- 6.4 洗牌(shuffle)和排序(sort),shuffle是一個從map輸出至reduce輸入的一個過程。
- 6.4.1 map端 : map的輸出不會直接到硬碟,而會先到記憶體(每一個map任務有一個循環記憶體,default大小由io.sort.mb=100MB),然後超過一定閥值(threshold,預設值為io.sort.spill.percent設定,default=0.8*io.sort.mb)之後會溢(spill)到硬碟,溢出的部分會輪循寫入mapred.local.dir所指定的目錄。
- a. 分區(partition),在寫入硬碟之前,執行緒會先把資料放到與reducer相對應的分區,這是最後它們被輸出的地方。
- b. 合併(merge on disk),在map任務結束前,溢出的檔案會被合併到一個已分區且排序的輸出檔案,可用io.sort.factor來控制一次要合併的檔案個數,default為10
- c. 對combiner而言,如果有設定combiner且溢出次數至少為3(可由min.num.spills.for.combine設定),combiner會在寫入輸出檔案之前執行,重新呼叫combiner對資料再次執行,可降低copy到reduce的傳輸量。
- d. 對compression而言,當map要寫入硬碟之前,先做壓縮是不錯的做法,可省空間、減少傳輸量。
- 6.4.2 reduce端 : Reducer會透過HTTP取得可用的partition中的檔案。
- a. 複製(copy),map任務不一定會同時完成,但reduce任務會開始複製這些map的輸出,直到複製完成,reduce可以平行複製,而執行緒的個數上是透過mapred.reduce.parallel.copies來設定(default為5)。
- b. 合併(merge)與排序(sort),這是複製完成後所進入的階段,這個階段會合併map的輸出且維護排序的順序,舉例來說有50個map輸出,會根據一個合併係數假設為10(可由io.sort.factory設定),則最後會產生5個中間檔案。
- c. reduce,上述5個中間檔案會進入reduce的function,這個階段的輸出會寫入到HDFS上,一般而言執行這個reduce的TaskTracker會與DataNode在一起,故reduce所輸出的第一個複本也會在本機上。
- d. 若是map的輸出資料夠小,則它會直接複製到執行reduce的TaskTracker的記憶體上(buffer大小由mapred.job.shuffle.input.buffer.percetn控制)。
- 6.4.3 組態調教(tune),我認為調教的部分是by case的,故這裡只寫通則。
- a. 在map的部分應避免多次溢出到硬碟,可以達到最佳效能,1是最佳次數(可參考計數器中的溢出紀錄筆數)。
- 可透過增加 io.sort.mb 來避免。
- b. 在reduce的部分應考慮過程資料能全都在記憶體內,但在實際上是較難實現,除非資料很小,但是資料夠小又使用MapReduce的架構,就目前來看的CP值是不高的。
- 可透過增加 io.file.buffer.size來測試。
- 6.5 這裡說明在Task的執行上可以得到的幾個控制。
- 6.5.1 推測式(Speculative Execution)任務,MapReduce是把作業切分成多個任務,並且透過平行執行來讓執行時間小於依序執行的時間。
- a. 何謂推測式執行,當一個任務比預期慢,則會啟動另一個相同的任務做為備援。
- b. 任務慢的原因 : 可能是因為硬體等級低、或是不適當的軟體配置。
- c. 執行過程 : 首先等全部的任務都啟動,並且針對已經"執行一段時間"的任務或是"失敗次數過多"的任務,此時才會啟動推測式任務,若是推測式任務先完成,則刪除原有的任務,反之則刪除推測式任務。
- d. 執行目的 : 目的是為了要減少作業執行時間,但這個功能不是用來始作業執行更可靠。
- 6.5.2 JVM的重用,可應用於比較耗費CPU資源的任務,原因是啟動新的JVM大約需花費一秒,這對於短時間的任務比較不明顯,但對於長時間的任務若是使用HotSpot JVM會有較顯著的效能提升,另一個共用JVM的好處就是作業共享,任務可以很快的快取分享資料。
- 6.5.3 忽略損壞紀錄,這裡可在搭配174頁的「任務失敗」在閱讀一次,若是因為任務失敗過多次而導致整體結果fail掉,那可真是可惜,常常可能是因為log在parse時,遺漏某些欄位就會引起失敗。
- Debug方法 :
- 1. 啟用counter(參閱5.5.5)。
- 2. 啟用IsolationRunner(參閱5.5.6)。
- 3. 啟用skipping mode,值得注意的是忽略模式只有在任務失敗兩次後才會啟動且這個機制只能適用於偵測個別的損壞紀錄,你可以增加任務重試的最大次數,讓忽略模式有足夠的嘗試次數(mapred.map.max.attempts與mapred.reduce.max.attempts),偵測到損壞的紀錄會儲存成循序檔放在作業的輸出目錄下的_logs/skip子目錄中,可以使用hadoop fs -text來查看。