HBase MapReduce advanced feature


Background

As shown below, suppose the row keys have the format `salt:timestamp:data` and you want to copy only the rows whose `timestamp` field falls within a specific range into a new table. Because that field sits in the middle of the row key, a start/stop-row range scan cannot select it, so a full table scan is unavoidable. Since a full scan takes too long on a large table, you can write a MapReduce job that scans the table in parallel and creates a new table containing only the desired row keys.

hbase:004:0> scan 'testtable'
ROW            COLUMN+CELL
 15g:112:a     column=cf:, timestamp=2023-06-11T01:01:03.539, value=value1
 45g:111:a     column=cf:, timestamp=2023-06-11T01:00:56.308, value=value1
 55g:114:a     column=cf:, timestamp=2023-06-11T01:01:36.731, value=value1
 8xg:123:a     column=cf:, timestamp=2023-06-11T01:00:04.247, value=value1
 95g:113:a     column=cf:, timestamp=2023-06-11T01:01:27.374, value=value1
 a5g:124:a     column=cf:, timestamp=2023-06-11T01:00:17.300, value=value1
 g5g:126:a     column=cf:, timestamp=2023-06-11T01:00:34.410, value=value1
 k5g:127:a     column=cf:, timestamp=2023-06-11T01:00:44.973, value=value1
 z5g:125:a     column=cf:, timestamp=2023-06-11T01:00:24.036, value=value1
9 row(s)
Took 0.0372 seconds
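To make the problem concrete, the sketch below sorts a few of the sample row keys the way HBase orders them (byte-wise, which for these ASCII keys matches `String` ordering) and prints the middle field of each. The class `RowKeyOrderDemo` and its methods are hypothetical names introduced only for illustration, and the code is plain Java with no HBase dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of the job in this post.
class RowKeyOrderDemo {

    // Returns the middle field of a "salt:timestamp:data" row key.
    static String timestampOf(String rowKey) {
        String[] parts = rowKey.split(":");
        if (parts.length < 3) {
            throw new IllegalArgumentException("unexpected row key: " + rowKey);
        }
        return parts[1];
    }

    // Sorts the keys lexicographically and returns their timestamp fields in that order.
    static List<String> timestampsInKeyOrder(List<String> rowKeys) {
        List<String> sorted = new ArrayList<>(rowKeys);
        Collections.sort(sorted);
        List<String> timestamps = new ArrayList<>();
        for (String key : sorted) {
            timestamps.add(timestampOf(key));
        }
        return timestamps;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "15g:112:a", "45g:111:a", "55g:114:a", "8xg:123:a", "95g:113:a");
        // Prints [112, 111, 114, 123, 113]: the salt dominates the sort order.
        System.out.println(timestampsInKeyOrder(keys));
    }
}
```

Because the salt prefix decides the order, rows with neighbouring timestamps are scattered across the key space, which is why no single start/stop row pair can cover a timestamp range.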

MapReduce code

Driver

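import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class RowFilteringJob {

    private static final String ZOOKEEPER_QUORUM = "latte01,latte02,latte03";
    private static final String ZOOKEEPER_PORT = "2181";

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Register HBase-related config
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        config.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);

        // Create the job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(config, "RowFilteringJob");
        job.setJarByClass(RowFilteringJob.class);

        // For a full scan from MapReduce, configure the Scan as below.
        Scan scan = new Scan();
        scan.setCaching(500);        // fetch 500 rows per RPC instead of the default
        scan.setCacheBlocks(false);  // do not fill the region server block cache

        // Register the table mapper
        TableMapReduceUtil.initTableMapperJob(
                "testtable",               // input table name
                scan,
                RowFilteringMapper.class,  // mapper class
                Text.class,                // mapper output key
                IntWritable.class,         // mapper output value
                job);

        // Register the table reducer
        TableMapReduceUtil.initTableReducerJob(
                "output_testtable",        // output table name
                RowFilteringReducer.class,
                job);
        job.setNumReduceTasks(1);

        // Submit the job and wait for it to finish
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IOException("error with job");
        }
    }
}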

Mapper

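import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringMapper extends TableMapper<Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text textKey = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        String rowKey = Bytes.toString(key.get());

        // Parse the row key: salt:timestamp:data
        String[] rowKeyParts = rowKey.split(":");
        if (rowKeyParts.length < 3) {
            return; // skip rows that do not follow the expected format
        }
        String timestamp = rowKeyParts[1];

        // String comparison is lexicographic; it matches numeric order here
        // only because every timestamp has the same number of digits.
        if ("123".compareTo(timestamp) <= 0 && timestamp.compareTo("125") <= 0) {
            // Emit the matching row key with a count of one
            textKey.set(rowKey);
            context.write(textKey, ONE);
        }
    }
}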
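The range check in the mapper compares the timestamp field as a string, so it agrees with numeric order only when every timestamp has the same number of digits. A minimal sketch of a numeric alternative follows. `TimestampRange`, `inRange`, and the row key `abc:1240:a` are hypothetical names and data chosen for illustration, and the sketch assumes the field is a base-10 integer that fits in a `long`.

```java
// Hypothetical helper for illustration; not part of the job in this post.
class TimestampRange {

    // True when the middle field of "salt:timestamp:data" lies in [min, max], compared numerically.
    static boolean inRange(String rowKey, long min, long max) {
        String[] parts = rowKey.split(":");
        if (parts.length < 3) {
            return false;
        }
        try {
            long timestamp = Long.parseLong(parts[1]);
            return min <= timestamp && timestamp <= max;
        } catch (NumberFormatException e) {
            return false; // the field is not a number, so treat the row as not matching
        }
    }

    // The string-based check used in the mapper, kept here for comparison.
    static boolean inRangeLexicographic(String rowKey, String min, String max) {
        String[] parts = rowKey.split(":");
        if (parts.length < 3) {
            return false;
        }
        return min.compareTo(parts[1]) <= 0 && parts[1].compareTo(max) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(inRange("8xg:123:a", 123, 125));                       // true
        System.out.println(inRangeLexicographic("abc:1240:a", "123", "125"));     // true, "1240" sorts between "123" and "125"
        System.out.println(inRange("abc:1240:a", 123, 125));                      // false
    }
}
```

With the fixed-width sample data in this post both checks give the same result; the numeric version only matters once timestamps of different lengths appear.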

Reducer

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted for this row key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // Create a Put for the row key, storing the sum as a 4-byte int in cf:result
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("result"), Bytes.toBytes(sum));

        // Write the Put; the output key is ignored when writing to the table
        context.write(null, put);
    }
}

Result

As shown below, the job creates a table that contains only the rows whose timestamp field is 123, 124, or 125.

hbase:005:0> scan 'output_testtable'
ROW            COLUMN+CELL
 8xg:123:a     column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
 a5g:124:a     column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
 z5g:125:a

The output table is not idempotent.
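The `value=\x00\x00\x00\x01` cells in the scan output are the 4-byte big-endian encoding of the `int` sum written by the reducer. The sketch below reproduces that encoding with `java.nio.ByteBuffer` only. The class name `IntCellCodec` is hypothetical; in an HBase client, `Bytes.toBytes(int)` and `Bytes.toInt(byte[])` play the same role.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of the job in this post.
class IntCellCodec {

    // Encodes an int as 4 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decodes a 4-byte big-endian cell value back into an int.
    static int decode(byte[] cell) {
        if (cell.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + cell.length);
        }
        return ByteBuffer.wrap(cell).getInt();
    }

    public static void main(String[] args) {
        byte[] cell = {0x00, 0x00, 0x00, 0x01};
        System.out.println(decode(cell));                  // 1
        System.out.println(Arrays.toString(encode(1)));    // [0, 0, 0, 1]
    }
}
```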
