close

繼上一篇實做的code generation,發現run完整個procedure已經要吐之後,接下來就是要介紹一下如何在不使用code generation的情況下來玩Avro。

PS. 後輩是使用Hadoop 1.0.3的版本,執行的時候出現了paranamer的error,所以有請google大神幫後輩抓了一下paranamer的jar包,請前輩們自行抓取囉^_^~

那麼接下來就是要使用MapReduce搭配Avro來做well-known"排序"的工作。

行前準備:

1. 設計相應的avro schema。

2. 序列化(serialize)資料後,並放在hdfs上做為input。

3. 設計MapReduce的code並執行。

4. 反序列化後顯示於CLI上。

=================================

1. 設計相應的avro schema。

{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "left", "type": "string", "order": "ignore"},
        {"name": "right", "type": "int", "order": "descending"}
    ]
}

值得注意的是"order"的部分,ignore表示忽略排序,descending表示依照降冪做排序,ascending表示依照升冪做排序。這樣的schema的意義表示我只對"right"的部分做降冪排序。

2. 序列化資料後,並上在hdfs上做為input。

File file = new File(avroFilePath);

// ---- Avro schema parser ---- //
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("SortedStringPair.avsc")); //上述的avro schema 

// ---- Using Generic API to build a Avro record instance ---- //
GenericRecord datum1 = new GenericData.Record(schema);
datum1.put("left", "A");
datum1.put("right", 1);

GenericRecord datum2 = new GenericData.Record(schema);
datum2.put("left", "C");
datum2.put("right", 2);

GenericRecord datum3 = new GenericData.Record(schema);
datum3.put("left", "B");
datum3.put("right", 3);

GenericRecord datum4 = new GenericData.Record(schema);
datum4.put("left", "B");
datum4.put("right", 2);

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(datum1);
dataFileWriter.append(datum2);
dataFileWriter.append(datum3);
dataFileWriter.append(datum4);
dataFileWriter.close();

OK~上述2.做完後會產生一個檔案在local端,而這個檔案就是序列化後的檔案,可以先Deserialize看一下是不是與上述code中的順序一致。

{"left": "A", "right": 1}
{"left": "C", "right": 2}
{"left": "B", "right": 3}
{"left": "B", "right": 2}

請在把這個檔案上傳至HDFS上,但也是可以把輸出檔案直接導到HDFS上(有需要code請在發信或留言^_^~感謝)。

3. MapReduce的code。

---- mapper ----

public class MyMapper<K> extends AvroMapper<K, Pair<K, K>> {
    public void map(K datum, AvroCollector<Pair<K, K>> collector, Reporter reporter) throws IOException {
        collector.collect(new Pair<K, K>(datum, null, datum, null));
    }
}
 
---- reducer ----
public class MyReducer<K> extends AvroReducer<K, K, K> {
    public void reduce(K key, Iterable<K> values, AvroCollector<K> collector, Reporter reporter) throws IOException {
        for ( K value: values ) {
            collector.collect(value);
        }
    }
}
 
---- main ----
public class MapReduce_Avro_Sort extends Configured implements Tool {
 
    public static final int successFlag = 1;
 
    public int run(String[] args) throws Exception {
 
        String hdfsInput = args[0];
        String hdfsOutput = args[1];
        String schemaFile = args[2];
 
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Avro sort");
 
        // ---- Avro schema setting ---- //
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob.setInputSchema(conf, schema); /*set input schema*/
        
        Schema intermediateSchema = Pair.getPairSchema(schema, schema);
        AvroJob.setMapOutputSchema(conf, intermediateSchema); /*set intermediate Schema */
        
        AvroJob.setOutputSchema(conf, schema); /*set output schema*/
        
        // ---- set Mapper ---- //
        AvroJob.setMapperClass(conf, MyMapper.class);
 
        // ---- set Reducer ---- //
        AvroJob.setReducerClass(conf, MyReducer.class);
        
        // ---- set hdfs I/O ---- //
        FileInputFormat.addInputPath(conf, new Path(hdfsInput));
        FileOutputFormat.setOutputPath(conf, new Path(hdfsOutput));
       
        JobClient.runJob(conf);
        
        return 1;
    }
 
    public static void main(String[] args) throws Exception {
 
        if (args.length < 3) {
            System.out.println("Usage : " + "hdfsInput " + "hdfsOutput " + "schemaFile");
            System.exit(1);
        }
 
        String[] runsArgs = new String[] { args[0], args[1], args[2] };
        int curFlag = ToolRunner.run(new MapReduce_Avro_Sort(), runsArgs);
        if (curFlag == successFlag) {
            System.exit(curFlag);
        }
    }
 
}
 
4. 反序列化資料並顯示於CLI上。
 
Alright,接下來就是去HDFS的output找一下檔案囉,沒錯相信您一定可以找到part-00000.avro的檔案,這個就是排序後的序列化檔案,您可以把這個檔案先get到local端,然後再使用上一篇所說的Deserialize的方式解開,就可以看到如下的結果。

{"left": "B", "right": 3}
{"left": "C", "right": 2}
{"left": "B", "right": 2}
{"left": "A", "right": 1}

看的出來是有成功的做到降冪的排序。

接下來後輩會將HBase與Avro與MapReduce做個結合,若是能成功的結合,那麼就會有第三篇或以上的產生^.^,最近公司有夠忙,後輩實在是抽不出時間來寫blog...。

 

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 TonyMoMo 的頭像
    TonyMoMo

    TonyMoMo的部落格

    TonyMoMo 發表在 痞客邦 留言(0) 人氣()