Advanced HBase MapReduce Features

Background

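Suppose the row key has the form salt:timestamp:data, as in the table below, and you want to build a new table from only the rows whose timestamp falls in a given range. Because the field you need to match sits in the middle of the row key, it cannot be used as a scan start or stop row, so a full scan is unavoidable. A full scan from a single client takes too long, so you can instead write a MapReduce job, which scans the table's regions in parallel, to generate a new table from the row keys you want.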

hbase:004:0> scan 'testtable'
ROW                                                           COLUMN+CELL
 15g:112:a                                                    column=cf:, timestamp=2023-06-11T01:01:03.539, value=value1
 45g:111:a                                                    column=cf:, timestamp=2023-06-11T01:00:56.308, value=value1
 55g:114:a                                                    column=cf:, timestamp=2023-06-11T01:01:36.731, value=value1
 8xg:123:a                                                    column=cf:, timestamp=2023-06-11T01:00:04.247, value=value1
 95g:113:a                                                    column=cf:, timestamp=2023-06-11T01:01:27.374, value=value1
 a5g:124:a                                                    column=cf:, timestamp=2023-06-11T01:00:17.300, value=value1
 g5g:126:a                                                    column=cf:, timestamp=2023-06-11T01:00:34.410, value=value1
 k5g:127:a                                                    column=cf:, timestamp=2023-06-11T01:00:44.973, value=value1
 z5g:125:a                                                    column=cf:, timestamp=2023-06-11T01:00:24.036, value=value1
9 row(s)
Took 0.0372 seconds
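
To make the problem concrete, here is a minimal sketch in plain Java (no HBase dependency) that sorts the nine sample row keys the way HBase stores them and reports where the rows with timestamps 123 to 125 land. The class name `SaltedKeyOrder` and its method are hypothetical helpers written for this illustration, and `TreeSet` ordering is used as a stand-in for HBase's byte ordering, which should agree for ASCII keys like these.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of the job below.
final class SaltedKeyOrder {
    static final List<String> SAMPLE_KEYS = List.of(
            "15g:112:a", "45g:111:a", "55g:114:a", "8xg:123:a", "95g:113:a",
            "a5g:124:a", "g5g:126:a", "k5g:127:a", "z5g:125:a");

    // Returns the positions, in sorted row-key order, of the keys whose
    // middle (timestamp) field lies in [from, to].
    static List<Integer> matchingPositions(List<String> rowKeys, int from, int to) {
        List<Integer> positions = new ArrayList<>();
        int index = 0;
        // TreeSet iterates in lexicographic order (assumed to match HBase's
        // byte order for these ASCII keys).
        for (String rowKey : new TreeSet<>(rowKeys)) {
            int timestamp = Integer.parseInt(rowKey.split(":")[1]);
            if (timestamp >= from && timestamp <= to) {
                positions.add(index);
            }
            index++;
        }
        return positions;
    }

    public static void main(String[] args) {
        System.out.println(matchingPositions(SAMPLE_KEYS, 123, 125)); // prints [3, 5, 8]
    }
}
```

The matching rows sit at positions 3, 5, and 8, so no single start/stop row range covers them without also covering rows that must be discarded.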

MapReduce code

Driver

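import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class RowFilteringJob {
    private static final String ZOOKEEPER_QUORUM = "latte01,latte02,latte03";
    private static final String ZOOKEEPER_PORT = "2181";

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Register the HBase-related configuration
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        config.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);
        // Create the job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(config, "RowFilteringJob");
        job.setJarByClass(RowFilteringJob.class);
        Scan scan = new Scan();
        // For a full scan in a MapReduce job, configure the Scan as follows.
        scan.setCaching(500);       // fetch 500 rows per RPC
        scan.setCacheBlocks(false); // keep the full scan from filling the block cache
        // Register the table mapper
        TableMapReduceUtil.initTableMapperJob(
                "testtable",              // input table name
                scan,
                RowFilteringMapper.class, // mapper class
                Text.class,               // mapper output key type
                IntWritable.class,        // mapper output value type
                job
        );
        TableMapReduceUtil.initTableReducerJob(
                "output_testtable",       // output table name
                RowFilteringReducer.class,
                job
        );

        job.setNumReduceTasks(1);
        // Submit the job and wait for it to finish
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IOException("error with job");
        }
    }
}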

Mapper

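import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text textKey = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
        // Split the row key into its salt, timestamp, and data fields
        String[] rowKeyParts = rowKey.split(":");
        if (rowKeyParts.length < 2) {
            return; // skip row keys that do not follow salt:timestamp:data
        }
        String timestamp = rowKeyParts[1];
        // Lexicographic comparison: correct only while all timestamps have the same number of digits
        if ("123".compareTo(timestamp) <= 0 && timestamp.compareTo("125") <= 0) {
            // Emit the full row key with a count of 1
            textKey.set(rowKey);
            context.write(textKey, ONE);
        }
    }
}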
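
The mapper compares the timestamp field as a string, which works for the sample data because every timestamp has three digits. If the field can vary in width, a numeric comparison is safer. The following is a minimal sketch in plain Java; the class `TimestampRange` and its `inRange` method are hypothetical names introduced here, and treating malformed keys as non-matching is an assumption rather than something the original job specifies.

```java
// Hypothetical helper for illustration only; not part of the original job.
final class TimestampRange {
    // Returns true if rowKey has the form salt:timestamp:data and its
    // timestamp field, parsed as a number, lies in [from, to].
    static boolean inRange(String rowKey, long from, long to) {
        String[] parts = rowKey.split(":");
        if (parts.length < 3) {
            return false; // assumption: malformed keys are skipped
        }
        try {
            long timestamp = Long.parseLong(parts[1]);
            return from <= timestamp && timestamp <= to;
        } catch (NumberFormatException e) {
            return false; // assumption: non-numeric timestamps are skipped
        }
    }

    public static void main(String[] args) {
        System.out.println(inRange("8xg:123:a", 123, 125)); // prints true
        System.out.println(inRange("g5g:126:a", 123, 125)); // prints false
        // "99" sorts after "125" as a string, but 99 is in [90, 125] as a number.
        System.out.println(inRange("abc:99:a", 90, 125));   // prints true
    }
}
```

Inside the mapper, a check like this could replace the two `compareTo` calls.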

Reducer

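import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // Iterate through the values and calculate the sum
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Create a Put for the original row key, storing the sum as a 4-byte int in cf:result
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("result"), Bytes.toBytes(sum));

        // Write the Put to the output table; the output key is not used, so null is passed
        context.write(null, put);
    }
}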

Result

As shown below, the job produces a table that contains only the rows with timestamps 123, 124, and 125.

hbase:005:0> scan 'output_testtable'
ROW                                                           COLUMN+CELL
 8xg:123:a                                                    column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
 a5g:124:a                                                    column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
 z5g:125:a
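
The value appears as \x00\x00\x00\x01 because the reducer stores the sum with `Bytes.toBytes(int)`, which writes a 4-byte big-endian integer. The sketch below reproduces that encoding in plain Java so it runs without HBase on the classpath; `CounterBytes` is a hypothetical class name, and `ByteBuffer` is used here only as a stand-in. In an HBase client, `Bytes.toInt` would be the usual way to read the value back.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of the original job.
final class CounterBytes {
    // Encodes an int as 4 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decodes a 4-byte big-endian cell value back into an int.
    static int decode(byte[] cell) {
        return ByteBuffer.wrap(cell).getInt();
    }

    public static void main(String[] args) {
        byte[] cell = {0x00, 0x00, 0x00, 0x01}; // the value shown in the scan output
        System.out.println(decode(cell)); // prints 1
    }
}
```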

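Note that the output table is not idempotent: the job only issues Puts, so rows written by an earlier run remain in output_testtable unless the table is cleared first.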