close

趁著上班空檔,再分享一篇關於Avro於HBase的操作,其實說穿了無非就是如下所示:

Put : Data -> Avro(servialize) -> HBase

Get : HBase -> Avro(deserialize) -> Data

因為Avro的型態就是一個緊湊的binary格式,所以要放上HBase是非常容易的。

行前準備:

1. 於HBase上建立table,如果要直接引用下述的code且您懶的改code,請將table的family設定成f1。

2. schema的設計。

---- Schema ----

{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": ["string", "null"], "order": "ignore"},
        {"name": "favorite_number", "type": ["int", "null"], "order": "ascending"},
        {"name": "favorite_color", "type": ["string", "null"], "order": "ignore"}
    ]
}

---- Put data to HBase ----

public class AvroHBase_PutData {
 
    public static void main(String[] args) throws IOException {
        
        if (args.length < 3) {
            System.out.println("Usage : " + "hostname " + "tablename " + "avroFilePath ");
            System.exit(0);
        }
        
        String hostName = args[0];
        String tableName = args[1];
        String avroFilePath = args[2];
        
        // ---- table configuration ---- //
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", hostName);
        config.set("hbase.zookeeper.property.clientPort", "2181");
             
        HTable hTable = new HTable(config, Bytes.toBytes(tableName));
        
        // ---- Schema parser ---- //
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(new File("user.avsc"));
 
        // ---- Using Generic API to build a Avro record instance ---- //
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Tony");
        user1.put("favorite_number", 4);
        user1.put("favorite_color", "blue"); // or without this line
 
        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Emma");
        user2.put("favorite_number", 8);
        user2.put("favorite_color", "red"); // or without this line
        
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // ---- Serialize user1 and memory buffer ---- //
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); // out : output stream to write to
       
        datumWriter.write(user1, encoder);
        encoder.flush();
        datumWriter.write(user2, encoder);
        encoder.flush();
 
        byte[] data1 = out.toByteArray();
        
        Put put = new Put( Bytes.toBytes("r1") );
        
        put.add( Bytes.toBytes("f1"),  
                 Bytes.toBytes("c1"),
                 data1 );
        
        hTable.put(put);
        out.close();
        
    }  
}
 
@ 貼心提示 : 可以使用hbase shell的scan來檢查table內有無資料: 應該可以看到value如下所示
r1                                 column=f1:c1, timestamp=1362728757166, value=\x08Tony\x00\x08\x00\x08blue\x08Emma\x00\x10\x00\x06red
 
---- Get data from HBase ----
 
public class AvroHBase_GetData {
 
    public static void main(String[] args) throws IOException {
 
        if (args.length < 2) {
            System.out.println("Usage : " + "hostname " + "tablename ");
            System.exit(0);
        }
 
        String hostName = args[0];
        String tableName = args[1];
        String avroFilePath = args[2];
 
        // ---- table configuration ---- //
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", hostName);
        config.set("hbase.zookeeper.property.clientPort", "2181");
 
        HTable hTable = new HTable(config, Bytes.toBytes(tableName));
 
        // ---- Schema parser ---- //
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(new File("user.avsc"));
        
        
        Scan s = new Scan();
        s.setStartRow( Bytes.toBytes("r1") );
        
        ResultScanner scanner = hTable.getScanner(s);
        for (Result rowResult2 : scanner) {
            for (KeyValue kv : rowResult2.raw()) {
                
                byte[] buffers = kv.getValue();
                ByteArrayInputStream byteIn = new ByteArrayInputStream(buffers);
                
                // ---- Deserialize user1 from memory buffer ---- //
                DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(byteIn, null);
                GenericRecord result;
                
                while (true) {          
                    try {
                        result = datumReader.read(null, decoder);
                        System.out.println(result.get("name").toString());
                        System.out.println(result.get("favorite_number").toString());
                        System.out.println(result.get("favorite_color").toString());
                        System.out.println("============");
                    } catch ( EOFException e ) {
                        break;
                    }
                                      
                }
                
            }
        }
    }
}
 
其實在讀的時候比較值得注意的是可以用datumReader的read來做讀取,因為這邊是寫了user1與user2,故必須反覆的讀來達到retrieve完整資料的效果,執行完後可以看到如下結果:

Tony
4
blue
============
Emma
8
red
============

arrow
arrow
    全站熱搜
    創作者介紹
    創作者 TonyMoMo 的頭像
    TonyMoMo

    TonyMoMo的部落格

    TonyMoMo 發表在 痞客邦 留言(0) 人氣()