剩下兩個章節就要結束MapReduce的練習囉^0^~,但這也只是MapReduce V1的版本,往後還會在與大家分享MapReduce V2(YARN)的內容,在這個章節要說明的就是MapReduce的類型(type)與格式(format)。

- 7.1 MapReduce的類型,這個小節主要說明map任務與reduce任務的輸入與輸出型別,大致上詳細的流程可以為:

Input Formt -> mapper -> partitioner -> Grouper -> combiner -> reducer -> Output Format

只考慮紅色的部分:

Input Format : (K1, V1)

mapper : (K1, V1) -> list(K2, V2)

combiner : list(K2, list(V2)) -> (K2, V2)

reducer : (K2, list(V2)) -> (K3, V3)

其它的角色 :

Partitioner : 使用 K2做partition

Grouper : 使用 K2做group

上述介紹都是基本的概念,那麼要怎麼用就直接利用code來做介紹,下述的code不介紹Partition與Grouper,因為在第8章會說明,底下的code切勿直接複製使用(因為code不完整,只是講解用)。

public class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    ....

    context.write(IntWritable, IntWritable);

}

public class MyCombiner extends Reducer<IntWritable, IntWritable, IntWritableText> {

    ....

    context.write(IntWritable, Text);

}

public class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {  

    ....

    context.write(IntWritableText);

}

public static void main(String[] args) {

    Configuration conf = new Configuration();

    Job job = new Job(conf, "Output SequenceFile");

    job.setJarByClass(MapReduce_OutputSequenceFile.class);

    // ---- Set InputFormat ---- //

    job.setInputFormatClass(TextInputFormat.class); // 設定Input Format,預設為TextInputFormat,故K1型別:LongWritable,V1型別:Text

    // ---- set Mapper ---- //

    job.setMapperClass(MyMapper.class);

    job.setMapOutputKeyClass(IntWritable.class);

    job.setMapOutputValueClass(IntWritable.class);

    // ---- set Combiner ---- //

    job.setCombinerClass(MyCombiner.class);

    // ----- Set Reducer ---- //

    job.setReducerClass(MyReducer.class);

    // job.setNumReduceTasks(0);

    job.setOutputKeyClass(IntWritable.class);

    job.setOutputValueClass(Text.class);

    // ---- Set OutputFormat ---- //

    job.setOutputFormatClass(SequenceFileOutputFormat.class); // 設定Output Format,可設定為SequenceFile,型別則根據OutputKey與OutputValue。

}

參考上述即可知道從InputFormat至OutputFomat的類型應該要如何做對應,於接下來的小節會介紹輸入與輸出的格式。

- 7.2 Hadoop可以處理很多不同類型的資料格式,從純文字到資料庫。    

    - 7.2.1 輸入格式,FileInputFormat提供兩件事,

    - a. 作業路徑中包含哪些檔案,路徑可以是一個檔案,一個目錄或是使用模塊(glob)來表示一個檔案和目錄的集合,且FileInputFormat會使用預設的過濾器排除隱藏的檔案,這些檔案名稱前面都會有一個「. 」或是「_ 」的符號,故過濾器的設定可參考書上的說明(201頁)。

    - b. 對輸入檔案做分割,對大檔案分割後可以算出要使用幾個map任務,分割的大小大致上都是讓 HDFS的區塊大小 = 分割大小

對Hadoop而言,少量的大檔案的處理效能會遠好於大量的小檔案,想像一下,將一個1GB的資料切成16的64MB的區塊,或者是10000個100KB的檔案,前者只需使用16個map任務,後者需10000個map任務,除非你有10000個map slot,不然後者執行作業所需的時間遠大於前者的時間,且大量小檔案會很占據namenode的記憶體。

故針對大量小檔案而言hadoop有幾種方式來解決:

    - a. CombineFileInputFormat,這個是將一些檔案打包到一個分割中,且區塊的配置也會根據節點和機架本地化的策略來決定,故可以看成是一般的MapReduce作業的輸入,但是目前尚未有明確的實做,故要自己來...很cool~但是有點麻煩。

    - b. 把小檔案都寫到SequenceFile中,因為SequenceFile支援分割。

對MapReduce作業而言,不是每次執行都想要做檔案的分割,有很多方法可以用來確保檔案不會被分割,有一種很快的方式就是將最小分割大小提高到比最大的檔案都還要大即可,另一種方式就是繼承FileInputFormat類別,實做自己的子類別,並覆寫isSplitable()方法,將回傳值改成false。

    - 7.2.2 文字(Text)輸入,Hadoop擅長處理非結構化文字,底下會有幾種處理文字時的幾種不同的InputFormats。

        - a. TextInputFormat,預設的InputFormat,每一行輸入就是一筆記錄。上面的code中有介紹過(K,V)的型別了,但是K表示並非行號而是每一行資料的偏移量(offset),而V就是整行資料。

        - b. KeyValueTextInputFormat,TextInputFormat的Key就是每一行的偏移量,但是不怎麼常用,通常檔案中的每一行都會變成<Key,Value>資料,中間利用一個分隔符號,通常是tab,通常處理TextOutputFormat的輸出結果,正確的方式應該是使用KeyValueTextInputFormat來處理

        - c. NLineInputFormat,通常TextInputFormat與KeyValueInputFormat所收到的行數並不固定(與分割的大小和資料行的長度有關係),如果mapper想收到固定的行數,可以考慮使用這個InputFormat,而<Key,Value>的型別與TextInputFormat一樣。

    - 7.2.3 二進位(Binary)輸入,

        - a. SequenceFileInputFormat,Hadoop的序列化檔案格式儲存二進位key-value資料序列,這裡要注意的是SequenceFileInputFormat的key及value型別須先知道,舉例來說SequenceFileInputFormat的key為IntWritable,value為Text,則就要宣告Mapper<IntWritable, Text, K, V>,這裡的K,V為map任務的output型別。

        - b. SequenceFileAsTextInputFormat,為SequenceFileInputFormat的變形,會將序列化檔案中的key及value轉換為Text物件,轉換時會呼叫toString()對key和value轉換。

        - c. SequenceFileAsBinaryInputFormat,為SequenceFileInputFormat的變形,會將key及value轉換為Binary物件,它們會被封裝成BytesWritable物件,值得注意的是使用SequenceFile.Reader的appendRaw()方法,能使MapReducce可以在任何的二進位資料類型上使用。

    - 7.2.4 多樣化(Multiple)輸入,當你有多種資料格式要同時做MapReduce時,就可以考慮這種方法,舉個例子,當你同時有純文字檔與二進位檔想做MapReduce,值得注意的是map的輸出型別是相同的,因為reducer看到的是聚合後的map輸出。

    - 7.2.5 資料庫(Data base)輸入,目前本人有在用的只有HBase可使用TableIputFormat來達到讀取HBase的方法。,但是Hadoop ecosystem中還有一個Sqoop,可以用來做為HDFS與關聯式資料庫的溝通。

- 7.3 輸出格式

    - 7.3.1 文字(Text)輸出,

        - TextOutputFormat,為預設輸出格式,預設每一行的資料以key-value呈現且以tab做分隔,適合用KeyValueTextInputFormat來讀取。

        - NullOutputformat,如果將輸出格式設定成這種就不會送出任何資訊,且也可以用NullWritable隱藏key或value,適合用TextInputFormat來讀取。

    - 7.3.2 二進位(Binary)輸出,

        - SequenceFileOutputFormat,這種輸出格式的好處就是可分割、很緊密、可壓縮。

        - SequenceFileAsBinaryOutputFormat,它會將key和value以Binary的方式寫到一個SequenceFile的容器上。

    - 7.3.3 多樣化(Multiple)輸出,在書上是說明若是想把每一個站的輸出成一個獨立的檔案,那麼每個檔案就得包含該站的全部資料,那麼要怎麼做則用code來說明。

        - a. 首先先準備輸入資料 : 就隨便寫個資料如下所示:

            p1 I

            p2 Love

            p3 Hadoop

            p4 Very

            P5 Much

        - b. 程式部分 :

public class MultipleOutputMapper extends Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        StringTokenizer token = new StringTokenizer( value.toString() );

        String stationID = token.nextToken();
        String content = token.nextToken();

        context.write(new Text(stationID), new Text(content));
    }
}

public class MultipleOutputReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws InterruptedException, IOException {

        String basePath;
        for (Text value : values) {
        // ---- add path ---- //
        basePath = key.toString(); //做為輸出名稱
        // basePath = String.format("%s/part",key.toString());
        multipleOutputs.write( NullWritable.get(), value, basePath ); // KeyOut, ValueOut, baseOutputPath
    }
}

    @Override
    public void cleanup(Context context)
            throws InterruptedException, IOException {
        multipleOutputs.close();
    }
}

public class MapReduce_PartitionByManualUsingMultipleOutputs {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] gopArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (gopArgs.length < 2) {
            System.out.println(" Usage : HDFSinput HDFSoutput ");
            System.exit(1);
        }

        Job job = new Job(conf, "Customized Writable test");
        job.setJarByClass(MapReduce_PartitionByManualUsingMultipleOutputs.class);

        // ---- set Mapper ---- //
        job.setMapperClass(MultipleOutputMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // ---- set Reducer ---- //
        job.setReducerClass(MultipleOutputReducer.class);
        // job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class); 

        // ---- set I/O Format ---- //
        FileInputFormat.addInputPath( job, new Path(gopArgs[0]) );
        FileOutputFormat.setOutputPath( job, new Path(gopArgs[1]) );

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

        - c. 結果部分:

            可於HDFS的output(gopArgs[1])路徑找到p1-r-00000~p5-r-00000內容分別為I, Love, Hadoop, Very, Much

    - 7.3.4 延遲(Lazy)輸出,這裡是確定當partition有紀錄時才會建立part-m-nnnnn/part-r-nnnnn的輸出。

    - 7.3.5 資料庫(Data base)輸出,目前本人只有使用HBase,可使用TableOutputFormat來達到寫入HBase的方法。

    

 

 

 

 

 

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 TonyMoMo 的頭像
    TonyMoMo

    TonyMoMo的部落格

    TonyMoMo 發表在 痞客邦 留言(0) 人氣()