- Authors
- Name
Background
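Suppose the row keys have the form salt:timestamp:data, as in the table below. Here "timestamp" is the middle part of the row key, not the HBase cell timestamp. Say you want to extract only the rows whose timestamp part falls in a given range and build a new table from them. You have to scan the whole table, because the part you want to search on sits in the middle of the row key. The matching rows are therefore not contiguous and cannot be selected with a start/stop row. A full scan from a single client takes too long, so instead you can write a MapReduce job that scans the table in parallel (one map task per region) and writes the matching row keys into a new table.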
hbase:004:0> scan 'testtable'
ROW COLUMN+CELL
15g:112:a column=cf:, timestamp=2023-06-11T01:01:03.539, value=value1
45g:111:a column=cf:, timestamp=2023-06-11T01:00:56.308, value=value1
55g:114:a column=cf:, timestamp=2023-06-11T01:01:36.731, value=value1
8xg:123:a column=cf:, timestamp=2023-06-11T01:00:04.247, value=value1
95g:113:a column=cf:, timestamp=2023-06-11T01:01:27.374, value=value1
a5g:124:a column=cf:, timestamp=2023-06-11T01:00:17.300, value=value1
g5g:126:a column=cf:, timestamp=2023-06-11T01:00:34.410, value=value1
k5g:127:a column=cf:, timestamp=2023-06-11T01:00:44.973, value=value1
z5g:125:a column=cf:, timestamp=2023-06-11T01:00:24.036, value=value1
9 row(s)
Took 0.0372 seconds
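The following plain-Java sketch shows the selection rule on the nine sample keys above. It uses no HBase classes, and the class and method names are hypothetical. The keys are kept in lexicographic order, the same order HBase stores them in. The matches for the range 123 to 125 end up scattered between non-matching rows, which is why every row has to be visited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class SaltedKeyLayout {
    // Returns the keys (in sorted, HBase-like order) whose middle "timestamp"
    // part lies in [start, end], compared as strings. Every key is visited,
    // which is the plain-Java equivalent of a full scan.
    public static List<String> matching(TreeSet<String> sortedKeys, String start, String end) {
        List<String> out = new ArrayList<>();
        for (String key : sortedKeys) {
            String ts = key.split(":")[1];
            if (start.compareTo(ts) <= 0 && ts.compareTo(end) <= 0) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The nine row keys from 'testtable' above
        TreeSet<String> keys = new TreeSet<>(List.of(
            "15g:112:a", "45g:111:a", "55g:114:a", "8xg:123:a", "95g:113:a",
            "a5g:124:a", "g5g:126:a", "k5g:127:a", "z5g:125:a"));
        System.out.println(matching(keys, "123", "125"));
    }
}
```

Running `main` prints `[8xg:123:a, a5g:124:a, z5g:125:a]`. These are the 4th, 6th and 9th keys in sorted order, so no single start/stop row range covers only them.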
MapReduce code
Driver
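import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class RowFilteringJob {
    private static final String ZOOKEEPER_QUORUM = "latte01,latte02,latte03";
    private static final String ZOOKEEPER_PORT = "2181";

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Register the HBase connection settings
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        config.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);

        // Create the job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(config, "RowFilteringJob");
        job.setJarByClass(RowFilteringJob.class);

        // For a full-table MapReduce scan, fetch many rows per RPC and
        // keep the one-off reads out of the region servers' block cache
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Register the table mapper: it receives every row of "testtable"
        TableMapReduceUtil.initTableMapperJob(
            "testtable",              // input table name
            scan,
            RowFilteringMapper.class, // mapper class
            Text.class,               // mapper output key class
            IntWritable.class,        // mapper output value class
            job
        );
        // Register the table reducer: it writes Puts into "output_testtable",
        // which must already exist with column family "cf"
        TableMapReduceUtil.initTableReducerJob(
            "output_testtable",       // output table name
            RowFilteringReducer.class,
            job
        );
        job.setNumReduceTasks(1);

        // Submit the job and wait for it to finish
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IOException("error with job");
        }
    }
}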
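The scan settings in the driver affect how the full scan runs. `setCaching(500)` lets each scanner RPC return up to 500 rows. `setCacheBlocks(false)` keeps this one-off scan from filling the region servers' block cache.

The sketch below gives a rough idea of why caching matters. It rests on the simplifying assumption that reading N rows takes ceil(N / caching) round trips. It ignores region boundaries and the per-RPC size cap that newer clients also apply through `hbase.client.scanner.max.result.size`. The table size in `main` is hypothetical.

```java
public class ScanCachingEstimate {
    // Rough number of scanner round trips needed to read `rows` rows when each
    // RPC returns at most `caching` rows (ceiling division). Estimate only.
    public static long roundTrips(long rows, int caching) {
        if (rows < 0 || caching <= 0) {
            throw new IllegalArgumentException("rows must be >= 0 and caching > 0");
        }
        return (rows + caching - 1) / caching;
    }

    public static void main(String[] args) {
        long rows = 10_000_000L; // hypothetical table size
        System.out.println("caching=1:   " + roundTrips(rows, 1));   // 10000000
        System.out.println("caching=500: " + roundTrips(rows, 500)); // 20000
    }
}
```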
Mapper
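import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text textKey = new Text();
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Decode only the valid slice of the key buffer (offset and length)
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
        // Parse the row key: salt:timestamp:data
        String[] rowKeyParts = rowKey.split(":");
        if (rowKeyParts.length < 2) {
            return; // skip keys that do not follow the expected format
        }
        // Keep timestamps in the inclusive range [123, 125] (string comparison)
        if ("123".compareTo(rowKeyParts[1]) <= 0 && rowKeyParts[1].compareTo("125") <= 0) {
            // Emit (row key, 1) for each matching row
            textKey.set(rowKey);
            context.write(textKey, ONE);
        }
    }
}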
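The mapper compares the timestamp part as a `String`, so the range check is lexicographic. That agrees with numeric order only while every timestamp part has the same number of digits, as in the sample, where all of them have three. The standalone comparison below shows the difference. The numeric variant is a hypothetical alternative that assumes the timestamp part is a base-10 integer.

```java
public class TimestampRangeCheck {
    // Same inclusive check as RowFilteringMapper: String.compareTo, i.e.
    // character-by-character (lexicographic) ordering.
    public static boolean inRangeLexicographic(String ts, String start, String end) {
        return start.compareTo(ts) <= 0 && ts.compareTo(end) <= 0;
    }

    // Hypothetical numeric variant: parses the timestamp part as a long.
    public static boolean inRangeNumeric(String ts, long start, long end) {
        long value = Long.parseLong(ts);
        return start <= value && value <= end;
    }

    public static void main(String[] args) {
        // Equal-width values: both checks agree
        System.out.println(inRangeLexicographic("124", "123", "125"));  // true
        System.out.println(inRangeNumeric("124", 123, 125));            // true
        // Mixed widths: "1240" sorts between "123" and "125" as a string
        System.out.println(inRangeLexicographic("1240", "123", "125")); // true
        System.out.println(inRangeNumeric("1240", 123, 125));           // false
    }
}
```

If the width of the timestamp part can vary, there are two ways to keep the filter consistent. One is to zero-pad the part to a fixed width when the key is written. The other is to parse it as a number in the mapper.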
Reducer
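import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Sum the counts for this row key (always 1 here, since row keys are unique)
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Build a Put for the same row key; Bytes.toBytes(int) stores the sum as 4 big-endian bytes
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("result"), Bytes.toBytes(sum));
        // TableOutputFormat ignores the output key, so null is passed
        context.write(null, put);
    }
}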
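Between the mapper and the reducer, the framework groups the emitted (row key, 1) pairs by key in sorted order, and the reducer then sums each group. The toy in-memory model below imitates that step. It is not Hadoop code, and the class name is hypothetical. It shows why every sum in this job is 1: each input row emits at most once, and row keys are unique.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleAndSumSketch {
    // Toy model of shuffle + reduce: group the emitted keys (sorted, like the
    // shuffle) and add up the value 1 that each emit carries.
    public static Map<String, Integer> shuffleAndSum(List<String> emittedKeys) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String key : emittedKeys) {
            sums.merge(key, 1, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Keys emitted for the sample table; arrival order from mappers may vary
        List<String> emitted = List.of("z5g:125:a", "8xg:123:a", "a5g:124:a");
        System.out.println(shuffleAndSum(emitted)); // {8xg:123:a=1, a5g:124:a=1, z5g:125:a=1}
    }
}
```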
Result
As shown below, the job produces a table that contains only the rows whose timestamp part is 123, 124, or 125.
hbase:005:0> scan 'output_testtable'
ROW COLUMN+CELL
8xg:123:a column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
a5g:124:a column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
z5g:125:a
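The value `\x00\x00\x00\x01` is the sum 1 as written by `Bytes.toBytes(int)`, which produces 4 big-endian bytes. The standalone sketch below reproduces that encoding with `java.nio.ByteBuffer`, which is big-endian by default, and decodes it again. The `render` helper is a simplified approximation of how the HBase shell escapes non-printable bytes, not the shell's own code.

```java
import java.nio.ByteBuffer;

public class IntCellEncoding {
    // Encodes an int as 4 big-endian bytes, the layout Bytes.toBytes(int) uses.
    public static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decodes 4 big-endian bytes back into an int.
    public static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Simplified shell-style rendering: printable ASCII as-is, other bytes as \xHH.
    public static String render(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            int v = b & 0xFF;
            if (v >= 0x20 && v < 0x7F && v != '\\') {
                sb.append((char) v);
            } else {
                sb.append(String.format("\\x%02X", v));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] cell = encode(1);
        System.out.println(render(cell)); // \x00\x00\x00\x01
        System.out.println(decode(cell)); // 1
    }
}
```

In HBase client code, `Bytes.toInt(byte[])` performs the same decoding when reading the `cf:result` value back.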
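Note that writing to the output table is not idempotent. The job only adds Puts and never clears the table. Each rerun therefore writes new cell versions with new timestamps, and rows left by earlier runs (for example, runs with a different range) remain.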
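The toy model below shows what this means in practice. It is plain Java, not HBase, and the class and methods are hypothetical. `output_testtable` is treated as a sorted map that only ever receives puts. A second run with a different range leaves the rows from the first run in place. Rerunning the same range only refreshes the write time of those rows.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

public class RerunSketch {
    // Toy stand-in for output_testtable: row key -> logical write time of the
    // latest cell. Models only "Puts are added, nothing is deleted".
    private final TreeMap<String, Long> table = new TreeMap<>();
    private long clock = 0;

    // One job run: put every key whose timestamp part is in [start, end].
    public void runJob(List<String> sourceKeys, String start, String end) {
        clock++;
        for (String key : sourceKeys) {
            String ts = key.split(":")[1];
            if (start.compareTo(ts) <= 0 && ts.compareTo(end) <= 0) {
                table.put(key, clock);
            }
        }
    }

    public Set<String> rows() { return table.keySet(); }

    public long writeTime(String key) { return table.get(key); }

    public static void main(String[] args) {
        List<String> source = List.of("15g:112:a", "45g:111:a", "8xg:123:a", "a5g:124:a", "z5g:125:a");
        RerunSketch out = new RerunSketch();
        out.runJob(source, "123", "125");
        out.runJob(source, "111", "112"); // second run with another range
        System.out.println(out.rows());
    }
}
```

After the two runs, `rows()` prints `[15g:112:a, 45g:111:a, 8xg:123:a, a5g:124:a, z5g:125:a]`. One way to get a clean result is to empty the output table before each run, for example with `truncate 'output_testtable'` in the HBase shell.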