MapReduce於Hadoop definitive guide edtion 2 中於要進入尾聲了,我會盡量把前面的情況整理一遍,或許會帶有一點情境的感覺,前幾天老闆買了MapReduce Design Pattern,害我看了好想買,但是我又想買Hadoop definitive guide edtion 3和HBase中文版,先看狀況好了,因為第二版還有HBase和Zookeeper沒有跟大家做個介紹,那麼第8章結束後還有第9章與第10章的介紹,9、10章大致上是要說明Hadoop的管理。
那麼第8章主要是要闡述一些MapReduce更進階的特性,包含計數器(在5.5.5也有說過)、資料集的排序和連結。
- 8.1 計數器(Counter),也是因為MapReduce是分散式的運作,這在debug上會有個問題就是你印出的log不知道會出現在哪一個節點上,所以就可以利用計數的方式來得知你的程式有沒有進行一些你想做的事情,相對於印出log來看,使用counter是比較容易的方法。
- 8.1.1 內建計數器,執行完MapReduce可以從50030或是從CLI的Shell上都可以看到的資訊,主要是回報關於作業的各種指標,由於版本不同輸出的內容也不同,請自行參閱^^~。
- 8.1.2 自訂計數器,這個部分可以搭配5.5.5一起閱讀,5.5.5已經有貼過code了,值得注意的是擷取計數器的值得方式。
- c. 擷取方式大致上有三種,這裡比較著重在Java API的部分,可以參考一下code的部分。
- c.1 50030(web介面)
- c.2 CLI(command line)
- c.3 Java API
Counters counters = job.getCounters();
Counter c1 = counters.findCounter(MyMapper.InputCount.Count_input);
System.out.println(c1.getDisplayName()+":"+c1.getValue());
如此一來就可以藉由Java API的方式得到計數器的值。
- 8.2 排序(Sort),這個小節對於排序分成三個部分,分別為部分排序、全域排序以及二次排序。
- 8.2.1 準備好你的輸入資料,
file1.txt : file2.txt : file3.txt :
5 6 8
3 7 11
1 2 4
- 8.2.2 部分(partital)排序,目的是在於產生對應三個file的輸出檔,並有經過排序,則使用NLineInputFormat並將N設定為3,來使得每個map任務都只拿到一個file,並輸出有排序的檔案。
- 插曲1. 實驗後發現NLineInputFormat在conf上的設定是"mapreduce.input.lineinputformat.linespermap"而不是書上寫的喔請注意,故可在看到Launched map tasks=3,可是看到結果後發現竟然沒排序Orz...,寫到這邊不自覺得淚水在眼眶打轉...,難道我前面念的都是假的嗎????
- 插曲1的結果如下所示,這裡我不放code,需要的再跟我說^^~,想一起哭也可以~呵呵
part-m-00000, part-m-00001, part-m-00002
8 5 6
11 3 7
4 1 2
- 插曲2. 既然如此,我一定要搞清楚為什麼會這樣子, 由於之前都會搭配reduce做輸出,而且書上又說了排序是在map輸出前就做了(冏Orz....難道書騙我?),所以是我大意了?,順便一提,我這邊並沒有使用multiple output喔,我只是創造三個map任務分別輸出而已,所以誰先做完不一定,故part-m-nnnnn的結果不一定會對應輸入file1~file3的順序。
- 插曲3. 有道是眼淚擦乾後還是要繼續走,看來目前map-only的output結果是不會有sort的功效的,所以我很容易的就使用job.setNumReduceTasks(3)的方式來測,這不測不要緊,一測又再次傷了我的心Orz....,
- 插曲3的結果如下所示,
part-r-00000, part-r-00001, part-r-00002
3 1 2
6 4 5
7 8
11
- 插曲4. 以為這樣就可以打敗我了嗎?fine, 我跟你拼了,我加入job.setPartitionerClass(HashPartitioner.class)。
- 插曲4的結果如下所示,冏Orz...結果一樣,哈~因為其實default就是使用HashPartitioner,不知道有沒有被我唬到。
part-r-00000, part-r-00001, part-r-00002
3 1 2
6 4 5
7 8
11
- 結論. 好吧~大家看過笑笑就算了...,結論還是要下!!
- a. map-only到是可以做不少事情拉,像是filter拉、log的parse拉,anyway 記得沒有sort喔,如果客倌發現有的話請記得一定要糾正我且拜託你教我。
- b. 雖然有加入三個reduce的task,看到有排序了,但是位置好像怪怪的,我個人是認為這是因為HashPartition造成的(要看code才會知道是怎麼做的),但這也告訴我們不要亂設Reduce的task個數,要看狀況!!。
- c. 其實這樣的問題就是帶出一個想法,若是要解決這類的問題建議還是使用Mutiple output的方式來做。
- 8.2.3 全域(global)排序,書上有說到,全域排序比較好的作法還是使用更高階的語法像是Pig或是Hive,但還是可以用MapReduce來實現唷^^。
- 問題解析,像是上述8.2.2的問題,想把它做全域的排序,書上提供兩種方法。
- a. 只使用一個reduce來做,這是最簡單的做法,但請想想看,這樣不就浪費了平行運算的優勢了嗎。
- b. 再來就是使用InputSampler+TotalOrderPartitioner的方式來做,那麼怎麼做後面會說到,接下來就是先分析一下要怎麼實作全域排序。
- 首先,分析數據分佈的並做取樣。
- a. 先利用一個mapreduce做,但是這不是很好的方法,因為總共需要執行兩次mapreduce。
- b. 故捨去a的方法,改成使用InputSampler中的RandomSampler來做取樣的動作。
- 目的做出如下例的分佈(數據是使用8.2.2的三個input,可以參考一下書上第238頁)。
< 4 [4,7) >=7
- 再來,把對應的值丟到這切割中。
- 最後,排序這些切割中的內容。
那麼到這邊大家應該是想說,喔~大概的程序就是三個檔案做input,然後經過map輸出後再來用全域排序,你試著寫寫看這樣的code,恐怕會想去google一下吧^^",其實我們不需要這麼複雜,請記得我們希望是可以多個mapreduce而非超複雜的mapreduce函式。
實現方式如下所示:
- a. 先寫一個map-only的mapreduce程式,輸入為8.2.2的三個檔案,輸出為SequenceFileOutputFormat,要不要壓縮都可以(這邊是沒做壓縮,不過在寫第二個mapreduce的時候不需要去管"解壓縮"的事情,還記得嗎再第四章壓縮有說到mapreduce會透過CompressionCodecFactory來推測你所使用的CompressionCodecs可參考第80頁)。
- b. 在寫一個全域排序的mapreduce程式,輸入為SequenceFileInputFormat,輸出為SequenceFileOutputFormat即可。
- c. 查看的時候請使用 hadoop fs -text 來欣賞結果。
part a的code ================
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
private IntWritable outKey = new IntWritable();
private Text outValue = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer tokens = new StringTokenizer(value.toString());
int intOutKey = Integer.parseInt(tokens.nextToken().trim());
outKey.set(intOutKey);
outValue.set(value.toString());
context.write(outKey, outValue);
}
}
public class MyReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
public static enum RepeatLogDetection {
RepeatLogs
}
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
}
public class MapReduce_OutputSequenceFile {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "masteribm:9001");
System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker"));
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Usage : HDFSinput HDFSoutput ");
System.exit(1);
}
Job job = new Job(conf, "Output SequenceFile");
job.setJarByClass(MapReduce_OutputSequenceFile.class);
// ---- set Mapper ---- //
job.setMapperClass(MyMapper.class);
// job.setMapOutputKeyClass(IntWritable.class);
// job.setMapOutputValueClass(Text.class);
// ---- set Reducer ---- //
// job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// ---- set Partitioner ---- //
// ---- set Combiner ---- //
// ---- set I/O format ---- //
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class); // hadoop fs
// -text
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
part b的code ================
public class MapReduce_GlobalSort extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class); // <Key, Value> = <IntWritable,Text>
job.setOutputKeyClass(IntWritable.class); // default outuptKey LongWritable, so you have to set this class, or you will get error message
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// ---- Set Partitioner ---- //
job.setPartitionerClass(TotalOrderPartitioner.class);
// ---- Set file path ---- //
Path input = FileInputFormat.getInputPaths(job)[0];
// Path output = FileOutputFormat.getOutputPath(job);
// System.out.println("inputPath = " + input);
// System.out.println("outputPath = " + output);
input = input.makeQualified(input.getFileSystem(getConf()));
// System.out.println("makeQualified = " + input);
Path partitionFile = new Path(input, "_partitions");
// System.out.println("partitionFile = " + partitionFile);
// ---- Set PartitionFile ---- //
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
// ---- Set Sampler ---- //
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler);
// ---- Add to DistributedCache ---- //
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, getConf());
DistributedCache.createSymlink(getConf());
// System.out.println("GetCacheFile[0] : " + DistributedCache.getCacheFiles(getConf())[0]);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MapReduce_GlobalSort(), args);
System.exit(exitCode);
}
}
================================================
注意一下JobBuilder已經包了一些東西也請記得要一同複製如下所示
public class JobBuilder {
private final Class<?> driverClass;
private final Job job;
private final int extraArgCount;
private final String extrArgsUsage;
private String[] extraArgs;
public JobBuilder(Class<?> driverClass) throws IOException {
this(driverClass, 0, "");
}
public JobBuilder(Class<?> driverClass, int extraArgCount, String extrArgsUsage) throws IOException {
this.driverClass = driverClass;
this.extraArgCount = extraArgCount;
this.job = new Job();
this.job.setJarByClass(driverClass);
this.extrArgsUsage = extrArgsUsage;
}
// vv JobBuilder
public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException {
if (args.length != 2) {
printUsage(tool, "<input> <output>");
return null;
}
Job job = new Job(conf);
job.setJarByClass(tool.getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job;
}
public static void printUsage(Tool tool, String extraArgsUsage) {
System.err.printf("Usage: %s [genericOptions] %s\n\n", tool.getClass().getSimpleName(), extraArgsUsage);
GenericOptionsParser.printGenericCommandUsage(System.err);
}
// ^^ JobBuilder
public JobBuilder withCommandLineArgs(String... args) throws IOException {
Configuration conf = job.getConfiguration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length < 2 && otherArgs.length > 3 + extraArgCount) {
System.err
.printf("Usage: %s [genericOptions] [-overwrite] <input path> <output path> %s\n\n", driverClass.getSimpleName(), extrArgsUsage);
GenericOptionsParser.printGenericCommandUsage(System.err);
System.exit(-1);
}
int index = 0;
boolean overwrite = false;
if (otherArgs[index].equals("-overwrite")) {
overwrite = true;
index++;
}
Path input = new Path(otherArgs[index++]);
Path output = new Path(otherArgs[index++]);
if (index < otherArgs.length) {
extraArgs = new String[otherArgs.length - index];
System.arraycopy(otherArgs, index, extraArgs, 0, otherArgs.length - index);
}
if (overwrite) {
output.getFileSystem(conf).delete(output, true);
}
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
return this;
}
public Job build() {
return job;
}
public String[] getExtraArgs() {
return extraArgs;
}
}
===========================================
若是使用上述例子跑出來的最終結果如下所示 : (請記得要用hadoop fs -text的方式來看喔)
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
11 11
- 8.2.4 二次(secondary)排序,這是目前最後一個練習了,後面雖然還有使用mapreduce做合併(join),但我想等開始閱讀mapreduce design pattern時在把code呈現給大家,請多見諒^^"。那麼甚麼是二次排序呢?當你的key已經不在是只有一個值(1)而是兩個值(1, 2)時,當使用key在做排序時,若是第一個key值一樣,那麼就應該要比較第二個key值,那所謂的二次排序就是當第一個key值一樣的時候,將會排第二個key值。
- 準備工作 : 一個例子,一次滿足~cool!!
- a. 實做WritableComparable
public class IntPair implements WritableComparable<IntPair> {
IntWritable first;
IntWritable second;
// Default constructure
public IntPair() {
set(new IntWritable(), new IntWritable());
}
// constructure with Text
public IntPair(int first, int second) {
set(new IntWritable(first), new IntWritable(second));
}
public void set(int first, int second) {
this.first = new IntWritable(first);
this.second = new IntWritable(second);
}
public void set(IntWritable first, IntWritable second) {
this.first = first;
this.second = second;
}
public IntWritable getFirst() {
return this.first;
}
public IntWritable getSecond() {
return this.first;
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
first.readFields(in);
second.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
first.write(out);
second.write(out);
}
@Override
public int compareTo(IntPair intPair) {
// TODO Auto-generated method stub
int cmp = first.compareTo(intPair.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(intPair.second);
}
@Override
public int hashCode() {
return (first.hashCode() * 127 + second.hashCode());
}
}
- b. 實做Mapper
public class MyMapper extends Mapper<LongWritable, Text, IntPair, IntWritable> {
private IntPair outKey = new IntPair();
private IntWritable outValue = new IntWritable(1);
public static enum LogNumberDetection {
LogNumbers
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer tokens = new StringTokenizer(value.toString());
int intOutKey1 = Integer.parseInt(tokens.nextToken().trim());
int intOutKey2 = Integer.parseInt(tokens.nextToken().trim());
outKey.set(intOutKey1, intOutKey2);
// ---- set Counter ---- //
context.setStatus("LogNumbers");
context.getCounter(LogNumberDetection.LogNumbers).increment(1);
context.write(outKey, outValue);
}
}
- c. 實做Partitioner的RawComparator
public class MyKeyComparator extends WritableComparator {
protected MyKeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int firstL1 = Integer.SIZE/8;
int firstL2 = Integer.SIZE/8;
// Compare first pair
int cmp = WritableComparator.compareBytes(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
// If first pair is equal then Compare second
return WritableComparator.compareBytes(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
}
@Override
//Compare two WritableComparables.
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int left_first = ip1.first.get();
int right_first = ip2.first.get();
int left_second = ip1.second.get();
int right_second = ip2.second.get();
if (left_first != right_first) {
if (left_first > right_first) {
return 1;
} else {
return -1;
}
} else if (left_second != right_second){
if (left_second > right_second) {
return 1;
} else {
return -1;
}
} else {
return 0;
}
}
static { // register this comparator
WritableComparator.define(IntPair.class, new MyKeyComparator());
}
}
- d. 實做Grouper的RawComparator
public class MyGroupComparator extends WritableComparator {
protected MyGroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int firstL1 = Integer.SIZE/8; // very important
int firstL2 = Integer.SIZE/8; // very important
// Compare first element to group same first element
return WritableComparator.compareBytes(b1, s1, firstL1, b2, s2, firstL2);
}
@Override
//Compare two WritableComparables.
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int l = ip1.first.get();
int r = ip2.first.get();
return l == r ? 0 : (l < r ? -1 : 1);
}
static { // register this comparator
WritableComparator.define(IntPair.class, new MyGroupComparator());
}
}
- e. 實做Reducer
public class MyReducer extends Reducer<IntPair, IntWritable, Text, IntWritable> {
public static enum RepeatLogDetection {
RepeatLogs
}
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
private static final Text SEPARATOR = new Text("------------------------------------------------");
public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
context.write(SEPARATOR, null);
int sum = 0;
//從這邊可以看的出來grouper有無效果
for (IntWritable val : values) {
// sum += 1;
outKey.set( Integer.toString(key.first.get()) + " " + Integer.toString(key.second.get()) );
context.write(outKey, outValue);
}
}
}
- f. 實做main function
public class MapReduce_SecondarySort extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("mapreduce.input.lineinputformat.linespermap", "3");
Job job = JobBuilder.parseInputAndOutput(this, conf, args);
if (job == null) {
return -1;
}
// ---- Set Mapper ---- //
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
// ---- Set Partitioner ---- //
job.setPartitionerClass(MyPartitioner.class);
// ---- Set Combiner ---- //
// ---- Set SortComparator ---- //
job.setSortComparatorClass(MyKeyComparator.class);
// ---- Set GroupingComparator ---- //
job.setGroupingComparatorClass(MyGroupComparator.class);
// ---- Set Reducer ---- //
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(IntPair.class); // default outuptKey LongWritable, so you have to set this class, or you will get error message
job.setOutputValueClass(IntWritable.class);
// ---- Set I/O Format ---- //
job.setInputFormatClass(NLineInputFormat.class); // <Key, Value> = <IntWritable, Text>
// job.setOutputFormatClass(SequenceFileOutputFormat.class);
// FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MapReduce_SecondarySort(), args);
System.exit(exitCode);
}
}
- 輸入5個檔案,分別為:
1900.log, 1901.log, 1902.log, 1903.log, 1904.log
1900 35 1901 36 1902 33 1903 22 1904 21
1900 22 1901 33 1902 25 1904 25
1900 35 1904 21
1904 21
- a. job.setPartitionerClass(MyPartitioner.class);
- b. job.setSortComparatorClass(MyKeyComparator.class);
- c. job.setGroupingComparatorClass(MyGroupComparator.class);
- 只有a的情況與a+b的情況是一樣的,就算只有a它還是會做排序,因為IntPair是WritableComparable的實作,所以會呼叫compareTo方法()來做排序,但是b是RawComparator的實現可以使比較速度更快,將key做hash後丟入相應的partition,然後由reduce輸出。
------------------------------------------------
1900 22 1
------------------------------------------------
1900 35 1
1900 35 1
------------------------------------------------
1901 33 1
------------------------------------------------
1901 36 1
------------------------------------------------
1902 25 1
------------------------------------------------
1902 33 1
------------------------------------------------
1903 22 1
------------------------------------------------
1904 21 1
1904 21 1
1904 21 1
------------------------------------------------
1904 25 1
從上面的結果可以發現,reduce的輸出並沒有把19開頭的數字做merge的動作,只有完全一樣的組合才有被merge,故需要加入c。
- a + c = a + b + c的情況,先根據key丟入對應的partition,然後利用RawComparator做key pair做排序,最後利用Group將19開頭一樣的數字做merge,結果如下所示。
------------------------------------------------
1900 22 1
1900 35 1
1900 35 1
------------------------------------------------
1901 33 1
1901 36 1
------------------------------------------------
1902 25 1
1902 33 1
------------------------------------------------
1903 22 1
------------------------------------------------
1904 21 1
1904 21 1
1904 21 1
1904 25 1
從上述結果可發現,19開頭的值都被merge起來了。
- 8.4.2 分散式快取(Distributed cache),顧名思義就是要能更讓資料可以是分散的,好讓map或reduce可以更方便的取得資料。
- 使用方式,
- a. CLI下則是使用-flies的方式來做。
for ex. hadoop jar OOO.jar -files hdfs://IP:Port/user/hadoopLog/xxx.log -libjars hdfs://IP:port/user/hadoopJar/xxx.jar
- b. Java API (可參閱239頁的例子8-8,在說明全域排序時有用到),直接節錄上面的code,這裡需要注意的是,在使用之前請記得先把檔案放置HDFS上。
// ---- Set input path of HDFS---- //
Path input = FileInputFormat.getInputPaths(job)[0];
input = input.makeQualified(input.getFileSystem(getConf())); //
Path partitionFile = new Path(input, "_partitions");
// ---- Add to DistributedCache ---- //
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");//這是一個連結符號
DistributedCache.addCacheFile(partitionUri, getConf()); //透過addCacheFile告知DisTributedCache檔案再HDFS的位置
DistributedCache.createSymlink(getConf()); //做同步連結的意思