|
18 | 18 | from __future__ import print_function |
19 | 19 |
|
20 | 20 | import sys |
| 21 | +import json |
21 | 22 |
|
22 | 23 | from pyspark import SparkContext |
23 | 24 |
|
|
27 | 28 | hbase(main):016:0> create 'test', 'f1' |
28 | 29 | 0 row(s) in 1.0430 seconds |
29 | 30 |
|
30 | | -hbase(main):017:0> put 'test', 'row1', 'f1', 'value1' |
| 31 | +hbase(main):017:0> put 'test', 'row1', 'f1:a', 'value1' |
31 | 32 | 0 row(s) in 0.0130 seconds |
32 | 33 |
|
33 | | -hbase(main):018:0> put 'test', 'row2', 'f1', 'value2' |
| 34 | +hbase(main):018:0> put 'test', 'row1', 'f1:b', 'value2' |
34 | 35 | 0 row(s) in 0.0030 seconds |
35 | 36 |
|
36 | | -hbase(main):019:0> put 'test', 'row3', 'f1', 'value3' |
| 37 | +hbase(main):019:0> put 'test', 'row2', 'f1', 'value3' |
37 | 38 | 0 row(s) in 0.0050 seconds |
38 | 39 |
|
39 | | -hbase(main):020:0> put 'test', 'row4', 'f1', 'value4' |
| 40 | +hbase(main):020:0> put 'test', 'row3', 'f1', 'value4' |
40 | 41 | 0 row(s) in 0.0110 seconds |
41 | 42 |
|
42 | 43 | hbase(main):021:0> scan 'test' |
43 | 44 | ROW COLUMN+CELL |
44 | | - row1 column=f1:, timestamp=1401883411986, value=value1 |
45 | | - row2 column=f1:, timestamp=1401883415212, value=value2 |
46 | | - row3 column=f1:, timestamp=1401883417858, value=value3 |
47 | | - row4 column=f1:, timestamp=1401883420805, value=value4 |
| 45 | + row1 column=f1:a, timestamp=1401883411986, value=value1 |
| 46 | + row1 column=f1:b, timestamp=1401883415212, value=value2 |
| 47 | + row2 column=f1:, timestamp=1401883417858, value=value3 |
| 48 | + row3 column=f1:, timestamp=1401883420805, value=value4 |
48 | 49 | 4 row(s) in 0.0240 seconds |
49 | 50 | """ |
50 | 51 | if __name__ == "__main__": |
|
64 | 65 | table = sys.argv[2] |
65 | 66 | sc = SparkContext(appName="HBaseInputFormat") |
66 | 67 |
|
| 68 | + # Other options for configuring scan behavior are available. More information available at |
| 69 | + # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java |
67 | 70 | conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} |
68 | 71 | if len(sys.argv) > 3: |
69 | 72 | conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3], |
|
78 | 81 | keyConverter=keyConv, |
79 | 82 | valueConverter=valueConv, |
80 | 83 | conf=conf) |
| 84 | + hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads) |
| 85 | + |
81 | 86 | output = hbase_rdd.collect() |
82 | 87 | for (k, v) in output: |
83 | 88 | print((k, v)) |
|
0 commit comments