本章則是要說明Hadoop的I/O,從這個章節開始我會把code貼上,我發現有些例子照著書上做根本做不出來,所以我會針對這些例子把它做完整的呈現,有些例子實在是太簡短了就懶得放了,如果有需要的話請跟我說一下,我會實作一下並再與您討論,請大家一起做練習吧~^.^~ 

- 4.1 HDFS的資料完整性,說明當你從叢集取資料時,hadoop能夠確保你拿到的資料的沒有缺失的。

    - 4.1.1 HDFS為了確保資料的完整性,首先當datanode在儲存資料前,會對資料建立檢查碼(checksum CRC-32),而這個checksum會用於

        - a Client端在寫入資料時,會將資料送到datanode上的一個管線(pipeline)做副本的建立,在pipeline中的最後一個datanode會核對檢查碼。

        - b Client端從datanode讀取資料時也會做檢查碼的檢驗。

        - c 每一個datanode的背景也會執行DataBlockScanner,用來定期檢查datanode上所有區塊。

    那麼當資料有錯時,當然client端是不知道囉,Namenode會把改區塊標記成"壞掉",且會把其他datanode上的副本複製過來用以達到預期數字(在hdfs-site.xml中設定的replication個數)。

- 4.2 資料的壓縮帶來兩個好處,第一可以減少硬碟空間,第二可以加速資料在網路或硬碟的傳送,並且介紹目前hadoop中有哪些壓縮工具,也衍生出適合MapReduce的壓縮工具。

    - 4.2.1 Codes是一個壓縮與解壓縮演算法的實作,在hadoop中codecs是用一個compressionCodec介面實作來表示,在這節中有說明如何壓縮與解壓縮資料,在這節中有說明當你在做解壓縮時可以使用CompressionCodecFactory來推測CompressionCodecs然後就可以得知是使用哪種codec( Gzip, bzip2, LZO, SNAPPY, etc... ),這裡我個人是認為在做mapreduce時如果是使用壓縮後的資料做為input時,我們不需要去管它是哪種codec應該也是由CompressionCodecFactory來推測的^_^~。

你可以從HADOOP_HOME/lib/native下找到hadoop原生函式庫,如果需要的話請參考"挑選compression tool for Hbase及如何安裝SNAPPY",裡面有說明如何安裝SNAPPY,BTW若是想裝LZO也是差不多同SNAPPY的安裝方式。

    - 4.2.2 壓縮和mapreduce中map的輸入分割是有密切的關係,想像一下,當有1GB的檔案要放入mapreduce去執行,在輸入分割(定義64MB為一個區塊)時會被拆成16個區塊,這也表示你可以獨立的使用16個map來執行,但若是你選用的壓縮工具在分割後所建立的區塊是無法獨立使用的話,那麼MapReduce會選擇把這16個壓縮的且分割的區塊丟到同一個map下面去執行,這樣子的話就會大大的降低執行的效率了,這麼一來也失去的"在地運算"的美妙了,那麼要如何挑選適合的分割工具也在於使用的環境囉^^~。

    - 4.2.3 在MapReduce中使用壓縮,這裡有兩個部份可以討論。

        - a 壓縮map的輸出,這麼作有一個好處是因為map的輸出會寫入硬碟並且透過網路傳送至reduce節點,使用方式請查閱書籍。

        - b 壓縮reduce的輸出。

- 4.3 在Hadoop序列化的討論中有四個部分很重要,一為"資料壓縮"、二為"快速傳送"、三為"可擴充性"、四為"交互操作性",在Hadoop叢集的資料傳輸乃是使用RPC(Remote Protocal Control),PRC協定中使用序列化將資料轉成2進位資料流,送到別的節點後,再將此二進位資料流經過反序列話成原本的資料再進行操作!!,在Hadoop中是使用Writable做為序列化的格式,但是Writable符合"一"和"二",但是不易擴增且不易讓Java以外的程式語言使用,這裡衍生出Avro的使用(本人尚未對Avro做更深入的研究QQ",有興趣的同學可以去看看^.^)。

        - 4.3.1、4.3.2、4.3.3 合併討論,這裡主要是要闡明如何實做一個"客制化(customized)的Writable的介面",以及如何使用RawComparator來加速mapreduce時sort的速度,底下會說明如何實現上述中"客制化的Writable的介面"以及RawComparator的使用,本人是直接貼上codes所以排版的部分請使用自己愛好的IDE(編譯環境)來做^_^~

客制化Writable介面 :   

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {

Text first;
Text second;

// Default constructure
public TextPair() {
set(new Text(), new Text());
}

// constructure with String
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}

// constructure with Text
public TextPair(Text first, Text second) {
set(first, second);
}

public void set(Text first, Text second) {
this.first = first;
this.second = second;
}

public Text getFirst() {
return this.first;
}

public Text getSecond() {
return this.first;
}

@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
first.readFields(in);
second.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
first.write(out);
second.write(out);
}

@Override
public int compareTo(TextPair tp) {
// TODO Auto-generated method stub
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}

@Override
public int hashCode() {
return ( first.hashCode() * 163 + second.hashCode() );
}
}

    RawComparator的實做 : 

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;


public class CustomizedWritable extends WritableComparator {

private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

public CustomizedWritable() {
super(TextPair.class);
}

@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
try{
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if (cmp != 0) {
return cmp;
}
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
b2, s2 + firstL2, l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

static {
WritableComparator.define(TextPair.class, new CustomizedWritable());
}
}

    MapReduce實做 : 此實做可使用一組text pair做為map的output key,以及使用上述RawComparator做sort及shuffle後送入reduce端做處理。

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CustomizedMR {

public static class MyMap extends Mapper<Text, Text, TextPair, IntWritable> {

// To declare method
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {

StringTokenizer tokens = new StringTokenizer(value.toString() , " ,");

String i = tokens.nextToken();
String j = tokens.nextToken();

TextPair textPair = new TextPair(i, j);
context.write( textPair, new IntWritable(1) );
}
}

public static class MyReduce extends Reducer<TextPair, IntWritable, Text, IntWritable> {

public void reduce(TextPair key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {

int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}

String s1 = key.first.toString();
String s2 = key.second.toString();
String st = "[" + s1 + ", " + s2 + "]";

context.write( new Text(st), new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {

if (args.length < 2) {
System.out.println("Usage : absolute-input-path absolute-output-path ");
System.exit(1);
}

Configuration conf = new Configuration();
// conf.set("mapred.job.tracker", "masteribm:9001");
Job job = new Job(conf, "Customized Writable test");

job.setJarByClass(CustomizedMR.class);
job.setSortComparatorClass(CustomizedWritable.class); // To use "RawComparator" to sort key in map procedure!!

job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
// job.setNumReduceTasks(0);

job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// job.setInputFormatClass(SequenceFileInputFormat.class);

FileInputFormat.addInputPath( job, new Path(args[0]) );
FileOutputFormat.setOutputPath( job, new Path(args[1]) );

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

上述的codes可以搭配hadoop definitive guide 2th來做閱讀以增加熟悉度,值得注意的是客制化的comparator應該要寫成RawComparator。

    - 4.3.4、4.3.5則是介紹Avro的使用(本人尚未深入研究QQ")

- 4.4 有說明mapreduce中是無法直接處理2進位檔的,故hadoop開發了一系列的高階容器來處理這些狀況。

    - 4.4.1 如何寫讀/SequenceFile(書上已有例子)。

    - 4.4.2 如何寫/讀MapFile(書上已有例子),比較值得注意的是MapFile是經過"排序(sort)"及"索引(index)"的SequenceFile,以及如何將sequence file轉為map file(書上已有例子)。

不過目前我尚未使用SequenceFile or MapFile在任何實際的應用上Orz...(因為我都是使用HBase來解決binary的問題),不過SequenceFile可以做為MapReduce的input(SequenceFileInputFormat),這樣的例子是當你輸入為binary時,你可以先把您的binary檔做成SequenceFile然後再進行mapreduce。

    

 

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 TonyMoMo 的頭像
    TonyMoMo

    TonyMoMo的部落格

    TonyMoMo 發表在 痞客邦 留言(0) 人氣()