Many of you have probably implemented your own comparator by extending WritableComparator, and found that it is not particularly convenient to write. Avro, by contrast, can compare records without deserializing them first, and it brings two further benefits:
1. To change the sort order, you only need to adjust the schema.
2. There is no need to write, and then maintain, your own WritableComparator subclass.
Problem: using the input data from the preparation section below, sort by "score" in ascending order and, within equal scores, sort by "favoriteNumber" in descending order.
This problem can be solved in either of two ways:
1. Implement your own comparator by extending WritableComparator, as shown in the usual references; the secondary sort walkthrough at http://tonymomo.pixnet.net/blog/post/75211308 also demonstrates this approach.
2. Use Avro together with the following schema.
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string", "order": "ignore"},
    {"name": "score", "type": "int", "order": "ascending"},
    {"name": "favoriteNumber", "type": "int", "order": "descending"}
  ]
}
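Before wiring this schema into MapReduce, the claim that Avro compares data without deserializing it can be checked in isolation. Below is a minimal, self-contained sketch (the class name and sample values are mine, not part of the original flow) that serializes two User records and compares the raw bytes with BinaryData.compare, which honors the order attributes in the schema:
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryData;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class OrderCheck {
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[" +
        "{\"name\":\"name\",\"type\":\"string\",\"order\":\"ignore\"}," +
        "{\"name\":\"score\",\"type\":\"int\",\"order\":\"ascending\"}," +
        "{\"name\":\"favoriteNumber\",\"type\":\"int\",\"order\":\"descending\"}]}");

    public static void main(String[] args) throws IOException {
        GenericRecord a = record("Tony", 100, 4);
        GenericRecord b = record("Tony", 100, 5);
        // Compare the serialized bytes directly; no deserialization happens here.
        // Expected: a positive number, because favoriteNumber sorts descending,
        // so the record with favoriteNumber 4 orders after the one with 5.
        System.out.println(BinaryData.compare(toBytes(a), 0, toBytes(b), 0, SCHEMA));
    }

    private static GenericRecord record(String name, int score, int favoriteNumber) {
        GenericRecord r = new GenericData.Record(SCHEMA);
        r.put("name", name);
        r.put("score", score);
        r.put("favoriteNumber", favoriteNumber);
        return r;
    }

    private static byte[] toBytes(GenericRecord r) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(r, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}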
Flow: HDFS -> MapReduce -> HDFS
Preparation:
1. Input data (the header row name score favoriteNumber is shown only for reference; do not include it in the actual file). A sketch for uploading the file to HDFS follows the data.
name score favoriteNumber
Tony 100 4
Tony 100 5
Tony 100 1
Emma 90 8
Emma 90 9
Emma 90 10
Anne 60 3
Anne 60 5
Alen 80 2
James 70 1
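The job reads its input from HDFS, so this text file has to be uploaded first, either with hadoop fs -put or programmatically. Here is a minimal sketch using the standard Hadoop FileSystem API; both paths are placeholders:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Copy the local input file onto HDFS; adjust both paths to your setup.
        fs.copyFromLocalFile(new Path("input.txt"), new Path("/user/tony/input/input.txt"));
        fs.close();
    }
}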
2. Avro schema.
Use the schema above directly. Since Main below reads the schema from a file (the schemaFilePath argument), save it to a file such as user.avsc and pass that path when running the job.
3. MapReduce
---- Mapper ----
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

public class MyMapper extends AvroMapper<Utf8, Pair<GenericRecord, String>> {
    // Same schema as above; the record itself is the map-output key, so its
    // "order" attributes drive the shuffle-phase sort.
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{" +
        "  \"type\": \"record\"," +
        "  \"name\": \"User\"," +
        "  \"fields\": [" +
        "    {\"name\": \"name\", \"type\": \"string\", \"order\": \"ignore\"}," +
        "    {\"name\": \"score\", \"type\": \"int\", \"order\": \"ascending\"}," +
        "    {\"name\": \"favoriteNumber\", \"type\": \"int\", \"order\": \"descending\"}" +
        "  ]" +
        "}");

    // Reusing one record instance is safe: Hadoop serializes the emitted datum
    // inside collect(), so it is not held beyond that call.
    private final GenericRecord genericRecord = new GenericData.Record(SCHEMA);

    @Override
    public void map(Utf8 line, AvroCollector<Pair<GenericRecord, String>> collector, Reporter reporter)
            throws IOException {
        // Each input line looks like: Tony 100 4
        StringTokenizer tokens = new StringTokenizer(line.toString());
        String name = tokens.nextToken();
        int score = Integer.parseInt(tokens.nextToken());
        int favoriteNumber = Integer.parseInt(tokens.nextToken());
        genericRecord.put("name", name);
        genericRecord.put("score", score);
        genericRecord.put("favoriteNumber", favoriteNumber);
        // Key: the whole record (sorted per the schema); value: just the name.
        collector.collect(new Pair<GenericRecord, String>(genericRecord, name));
    }
}
---- Reducer ----
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

// The type parameters must mirror the map output: the key is the full User
// record and the value is the name. Avro hands string data back as Utf8, so
// CharSequence is the safe element type here. This reducer only re-emits each
// pair unchanged; Main below leaves it commented out, because the base
// AvroReducer already provides exactly this identity behavior.
public class MyReducer extends AvroReducer<GenericRecord, CharSequence, Pair<GenericRecord, CharSequence>> {
    @Override
    public void reduce(GenericRecord key, Iterable<CharSequence> values,
            AvroCollector<Pair<GenericRecord, CharSequence>> collector, Reporter reporter)
            throws IOException {
        // Sorting already happened on the key during the shuffle; pass through.
        for (CharSequence value : values) {
            collector.collect(new Pair<GenericRecord, CharSequence>(key, value));
        }
    }
}
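Both classes exchange Pair records whose key half is the User record. To see the exact intermediate schema, which Main below builds with Pair.getPairSchema, it can be printed standalone; user.avsc is an assumed local file containing the schema above:
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.Pair;

public class PrintPairSchema {
    public static void main(String[] args) throws IOException {
        // user.avsc is an assumed local copy of the User schema shown earlier.
        Schema user = new Schema.Parser().parse(new File("user.avsc"));
        Schema pairSchema = Pair.getPairSchema(user, Schema.create(Schema.Type.STRING));
        // Pretty-printed JSON; compare with the schema shown in the Results section.
        System.out.println(pairSchema.toString(true));
    }
}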
---- Main ----
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroUtf8InputFormat;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroHBase_MapReduce extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        String hdfsInput = args[0];
        String hdfsOutput = args[1];
        String schemaFile = args[2];
        JobConf conf = new JobConf(getConf(), getClass());
        conf.setJobName("Avro sort");
        // ---- Avro schema settings ---- //
        Schema schema = new Schema.Parser().parse(new File(schemaFile));
        AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));   /* input schema */
        Schema intermediateSchema = Pair.getPairSchema(schema, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputSchema(conf, intermediateSchema);              /* intermediate schema */
        Schema outputSchema = Pair.getPairSchema(schema, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputSchema(conf, outputSchema);                       /* output schema */
        // ---- set Mapper ---- //
        AvroJob.setMapperClass(conf, MyMapper.class);
        // ---- set Reducer ---- //
        // Deliberately left unset: the base AvroReducer's identity reduce is
        // enough, because the sort is done on the Pair key during the shuffle.
        // AvroJob.setReducerClass(conf, MyReducer.class);
        // ---- set HDFS I/O ---- //
        FileInputFormat.addInputPath(conf, new Path(hdfsInput));
        FileOutputFormat.setOutputPath(conf, new Path(hdfsOutput));
        // ---- input format ---- //
        conf.setInputFormat(AvroUtf8InputFormat.class);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("Usage: hdfsInput hdfsOutput schemaFilePath");
            System.exit(1);
        }
        System.exit(ToolRunner.run(new AvroHBase_MapReduce(), args));
    }
}
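To launch the job, package the three classes into a jar and submit it with the hadoop jar command, for example: hadoop jar avro-sort.jar AvroHBase_MapReduce /user/tony/input /user/tony/output user.avsc (the jar name and paths are illustrative). Note that schemaFilePath is opened with new File(...) on the client, so it must be a local path, while the input and output paths live on HDFS.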
4. Results
Fetch the part-00000.avro file from the output path you specified on HDFS and deserialize it to inspect the contents. Because the MapReduce output is stored as Pair records, reading it back differs slightly from plain record deserialization.
File file = new File("part-00000.avro"); // a local copy fetched from the HDFS output directory
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
FileReader<GenericRecord> dataFileReader = DataFileReader.openReader(file, datumReader);
System.out.println(dataFileReader.getSchema());
while (dataFileReader.hasNext()) {
    // Each top-level record is a Pair with "key" (the User record) and "value" (the name).
    GenericRecord pair = dataFileReader.next();
    AvroKeyValue<GenericRecord, String> avroKeyValue = new AvroKeyValue<GenericRecord, String>(pair);
    GenericRecord user = avroKeyValue.getKey();
    System.out.println(user.get("name").toString() + " " +
                       user.get("score").toString() + " " +
                       user.get("favoriteNumber").toString());
}
dataFileReader.close();
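Incidentally, since the Pair record is just an ordinary record with key and value fields, the AvroKeyValue wrapper is optional; the key half can be pulled out directly:
// Equivalent to avroKeyValue.getKey(): read the "key" field of the Pair record.
GenericRecord user = (GenericRecord) pair.get("key");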
The corresponding schema (note that score no longer shows an order attribute, because ascending is Avro's default and is omitted when the schema is printed):
{"type":"record","name":"Pair","namespace":"org.apache.avro.mapred","fields":[{"name":"key","type":{"type":"record","name":"User","fields":[{"name":"name","type":"string","order":"ignore"},{"name":"score","type":"int"},{"name":"favoriteNumber","type":"int","order":"descending"}]},"doc":""},{"name":"value","type":"string","doc":"","order":"ignore"}]}
Printed results:
Anne 60 5
Anne 60 3
James 70 1
Alen 80 2
Emma 90 10
Emma 90 9
Emma 90 8
Tony 100 5
Tony 100 4
Tony 100 1
As you can see, score is sorted in ascending order, and when scores tie, favoriteNumber is sorted in descending order, which completes the secondary sort; a tertiary sort, quaternary sort, and so on work the same way, by adding further fields with their own order attributes.
The next post, Avro - 綜合篇(5), will show how to integrate Avro with both MapReduce and HBase.