相信大家有寫過,利用繼承WritableComparator的方式實做自己的比較器,大家會發現其實寫起來好像不是那麼的方便,由於Avro在執行資料操作也是"不"需要反序列化後再做處理,且使用Avro的好處則是,

1. 若是要改變排序順序只需調整schema即可。

2. 不需要自己在開發繼承WritableComparator後再開發所需之程式碼,更也不需要花心力去maintain code。

問題 : 如何使用下述行前準備中的Input data,利用"score"降冪排序,以及使用"favoriteNumber"做升冪排序。

上述問題可以用,

1. 參考書籍上如何用WritableComparator的方式實做自己的比較器,或是參考後輩http://tonymomo.pixnet.net/blog/post/75211308這篇中的二次排序有帶出如何用WritableComparator的方式實做自己的比較器。

2. 使用Avro以及下述schema。

{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string", "order": "ignore"},
        {"name": "score", "type": "int", "order": "ascending"},
        {"name": "favoriteNumber", "type" : "int", "order": "descending"}
    ]
}

流程: HDFS -> MapReduce -> HDFS

行前資料:

1. Input data(不要也把name score favoriteNumber一起放到檔案裡面,這裡只是要做個對應而已)。

name score favoriteNumber
Tony 100 4
Tony 100 5
Tony 100 1
Emma 90 8
Emma 90 9
Emma 90 10
Anne 60 3
Anne 60 5
Alen 80 2
James 70 1

2. Avro schema。

直接使用上述schema。

3. MapReduce  

---- Mapper ----

public class MyMapper extends AvroMapper<Utf8, Pair<GenericRecord, String>> {
    
    private static final Schema schema = new Schema.Parser().parse(
            "{" + 
            "    \"type\": \"record\"," + 
            "    \"name\": \"User\"," + 
            "    \"fields\": [" + 
            "        {\"name\": \"name\", \"type\": \"string\", \"order\": \"ignore\"}," +
            "        {\"name\": \"score\", \"type\": \"int\", \"order\": \"ascending\"}," +
            "        {\"name\": \"favoriteNumber\", \"type\" : \"int\", \"order\": \"descending\"}" +
                "]" + 
            "}"
    );
    
    private String sName;
    private int iScore;
    private int iFavoriteNumber;
    private GenericRecord genericRecord = new GenericData.Record(schema);
    
    public void map(Utf8 line, AvroCollector<Pair<GenericRecord, String>> collector, Reporter reporter) throws IOException {
 
        StringTokenizer sToken = new StringTokenizer(line.toString());
        
        sName = sToken.nextToken();
        iScore = Integer.parseInt(sToken.nextToken());
        iFavoriteNumber = Integer.parseInt(sToken.nextToken());
        
        genericRecord.put("name", sName);
        genericRecord.put("score", iScore);
        genericRecord.put("favoriteNumber", iFavoriteNumber);
        
        collector.collect(new Pair<GenericRecord, String>(genericRecord, sName));
        
    }
}

---- Reducer ----

public class MyReducer extends AvroReducer<String, GenericRecord, Pair<GenericRecord, String>> {
    
    private static final Schema schema = new Schema.Parser().parse(
            "{" + 
            "    \"type\": \"record\"," + 
            "    \"name\": \"User\"," + 
            "    \"fields\": [" + 
            "        {\"name\": \"name\", \"type\": \"string\", \"order\": \"ignore\"}," +
            "        {\"name\": \"score\", \"type\": \"int\", \"order\": \"ascending\"}," +
            "        {\"name\": \"favoriteNumber\", \"type\" : \"int\", \"order\": \"descending\"}" +
                "]" + 
            "}"
    );
 
    public void reduce(Utf8 key, Iterable<GenericRecord> values, AvroCollector<Pair<GenericRecord, String>> collector, Reporter reporter) 
            throws IOException {
        
        GenericRecord genericRecord = new GenericData.Record(schema);
        for (GenericRecord value: values ) {          
            genericRecord.put("name", value.get("name"));
            genericRecord.put("score", value.get("score"));
            genericRecord.put("favoriteNumber", value.get("favoriteNumber"));   
        }
        
        collector.collect(new Pair<GenericRecord, String>(genericRecord, key.toString()));
    }
}

---- Main ----

 

public class AvroHBase_MapReduce extends Configured implements Tool  {
 
    public int run(String[] args) throws Exception {
 
        String hdfsInput = args[0];
        String hdfsOutput = args[1];
        String schemaFile = args[2];
        String cacheFilePath = args[3];
 
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Avro sort");
 
        // ---- Avro schema setting ---- //
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING)); /*set input schema*/
        
        Schema intermediateSchema = Pair.getPairSchema(schema, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputSchema( conf, intermediateSchema ); /*set intermediate Schema */
        
        Schema outputSchema = Pair.getPairSchema(schema, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputSchema( conf, outputSchema ); /*set output schema*/
        
        // ---- set Mapper ---- //
        AvroJob.setMapperClass(conf, MyMapper.class);
 
        // ---- set Reducer ---- //
        // AvroJob.setReducerClass(conf, MyReducer.class);
        
        // ---- set hdfs I/O ---- //
        FileInputFormat.addInputPath(conf, new Path(hdfsInput));
        FileOutputFormat.setOutputPath(conf, new Path(hdfsOutput));
       
        // ---- input format ---- //
        conf.setInputFormat(AvroUtf8InputFormat.class);
        
        JobClient.runJob(conf);
        
        return 1;
    }
 
    public static void main(String[] args) throws Exception {
 
        if (args.length < 3) {
            System.out.println("Usage : " + "hdfsInput " + "hdfsOutput " + "schemaFilePath " + "cacheFilePath ");
            System.exit(1);
        }
 
        System.out.println(args[0]);
        System.out.println(args[1]);
        System.out.println(args[2]);
        System.out.println("========");
        
        String[] runsArgs = new String[] { args[0], args[1], args[2] };
        int curFlag = ToolRunner.run(new AvroHBase_MapReduce(), runsArgs);
 
    }
}
 
4. 結果
可以至您指定的output HDFS上拿取part-00000.avro檔,並使用反序列化的方式解開來看,因為MR過後是以Pair的方式儲存,所以在反序列化讀資料時會與之前稍有不同。
 
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();          
FileReader<GenericRecord> dataFileReader = DataFileReader.openReader(file, datumReader); //file: the path of part-00000.avro
System.out.println(dataFileReader.getSchema());
while (dataFileReader.hasNext()) {          
    GenericRecord gr1 = dataFileReader.next();
    AvroKeyValue<GenericRecord, String> avroKeyValue = new AvroKeyValue<GenericRecord, String>(gr1);    
    GenericRecord gr2 = avroKeyValue.getKey();
    System.out.println(gr2.get("name").toString() + " " +
                                  gr2.get("score").toString() + " " +
                                  gr2.get("favoriteNumber").toString() );        
}
 
對應的schema:
{"type":"record","name":"Pair","namespace":"org.apache.avro.mapred","fields":[{"name":"key","type":{"type":"record","name":"User","fields":[{"name":"name","type":"string","order":"ignore"},{"name":"score","type":"int"},{"name":"favoriteNumber","type":"int","order":"descending"}]},"doc":""},{"name":"value","type":"string","doc":"","order":"ignore"}]}
 
print出的結果 : 
Anne 60 5
Anne 60 3
James 70 1
Alen 80 2
Emma 90 10
Emma 90 9
Emma 90 8
Tony 100 5
Tony 100 4
Tony 100 1
 
可以看的出來score是升冪排序,而當score一樣時favoriteNumber是降冪排序,如此一來就完成secondary sort囉,那麼third sort、fourth sort...就是依此類推拉。
於下一篇Avro - 綜合篇(5) 將會展示如何將 Avro與MapReduce及HBase整合再一起。  
 
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 TonyMoMo 的頭像
    TonyMoMo

    TonyMoMo的部落格

    TonyMoMo 發表在 痞客邦 留言(0) 人氣()