Generating HFiles with MapReduce for Bulk Loading into HBase, with Source Code Analysis

Published on: July 27, 2012

Loading a huge volume of data into HBase in one shot through the normal write path is not only slow, it also ties up a lot of region resources. A much more efficient and convenient approach is "Bulk Loading", based on the HFileOutputFormat class that HBase provides.

It exploits the fact that HBase stores its data on HDFS in a specific file format: we generate files directly in that on-HDFS storage format and then move them to the right location, and the huge data set is loaded almost instantly. Driven by MapReduce, this is fast and convenient, and it neither occupies region resources nor adds load to the region servers.

Below is a sample MapReduce program. The data source is an HBase table and the output path of the generated files is up to you (the table names, column family names, qualifier and output path in the code are placeholders; replace them with your own):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HFileOutput {
    // placeholders: replace with your own table names, families, qualifier and output path
    private static final String inputTable = "inputTable";
    private static final String outputTable = "outputTable";
    private static final byte[] INPUT_FAMILY = Bytes.toBytes("cf");
    private static final byte[] OUTPUT_FAMILY = Bytes.toBytes("cf");
    private static final byte[] QUALIFIER = Bytes.toBytes("count");
    private static final String outputPath = "/tmp/hfile_output";

    // job configuration
    public static Job configureJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "countUnite1");
        job.setJarByClass(HFileOutput.class);
        //job.setNumReduceTasks(2);
        //job.setOutputKeyClass(ImmutableBytesWritable.class);
        //job.setOutputValueClass(KeyValue.class);
        //job.setOutputFormatClass(HFileOutputFormat.class);

        Scan scan = new Scan();
        scan.setCaching(10);
        scan.addFamily(INPUT_FAMILY);
        TableMapReduceUtil.initTableMapperJob(inputTable, scan,
                HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
        // If no reducer is set here, configureIncrementalLoad picks KeyValueSortReducer
        // or PutSortReducer automatically, based on the map output value class
        job.setReducerClass(HFileOutputReducer.class);
        //job.setOutputFormatClass(HFileOutputFormat.class);
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
        HFileOutputFormat.setOutputPath(job, new Path(outputPath));
        //FileOutputFormat.setOutputPath(job, new Path(outputPath)); // equivalent to the line above
        return job;
    }

    public static class HFileOutputMapper extends
            TableMapper<ImmutableBytesWritable, LongWritable> {
        public void map(ImmutableBytesWritable key, Result values,
                Context context) throws IOException, InterruptedException {
            // mapper logic goes here; as a placeholder, emit the row key with a count of 1
            context.write(key, new LongWritable(1));
        }
    }

    public static class HFileOutputReducer extends
            Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
        public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // reducer logic goes here; as a placeholder, sum the counts for the row key
            long count = 0;
            for (LongWritable v : values) {
                count += v.get();
            }
            KeyValue kv = new KeyValue(key.get(), OUTPUT_FAMILY, QUALIFIER,
                    Bytes.toBytes(count));
            context.write(key, kv);
        }
    }
}

Note that whichever stage produces the final output, map or reduce, the output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>. Otherwise you get an error like this:


java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
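
For example, in a map-only variant of the job the mapper can emit <ImmutableBytesWritable, Put> directly; as the source at the end of this post shows, configureIncrementalLoad then wires in PutSortReducer for you, so you do not set a reducer yourself. A minimal sketch, reusing the placeholder names and imports of the sample above plus org.apache.hadoop.hbase.client.Put (pass Put.class as the map output value class to initTableMapperJob):

// Hypothetical mapper that emits Put instead of KeyValue; with this map output
// value class, configureIncrementalLoad selects PutSortReducer automatically.
public static class PutOutputMapper extends
        TableMapper<ImmutableBytesWritable, Put> {
    public void map(ImmutableBytesWritable key, Result values,
            Context context) throws IOException, InterruptedException {
        Put put = new Put(key.get());
        // placeholder cell; write whatever qualifiers and values your job actually produces
        put.add(OUTPUT_FAMILY, QUALIFIER, Bytes.toBytes(1L));
        context.write(key, put);
    }
}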

In the job configuration above, the commented-out lines make no difference either way: as the source code (pasted at the end of this post) shows, the configureIncrementalLoad method already takes care of all the fixed settings, and only the job-specific parts need to be configured by hand. In particular, the number of reduce tasks is set automatically to match the number of regions in the target table.
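
Concretely, judging from the source pasted at the end, configureIncrementalLoad(job, table) performs roughly the following on the Job, which is why those calls are redundant in your own driver:

// Rough summary of what configureIncrementalLoad(job, table) does (see the source below):
job.setPartitionerClass(TotalOrderPartitioner.class);   // partitions rows by region start keys
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat.class);
job.setReducerClass(KeyValueSortReducer.class);         // or PutSortReducer, chosen from the map output value class
job.setNumReduceTasks(startKeys.size());                // one reducer per region of the target table
// plus: writes the partitions file and adds it to the DistributedCache,
// serializes the per-family compression settings into the configuration,
// and calls TableMapReduceUtil.addDependencyJars(job)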

The code that loads the generated files into HBase is:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class TestLoadIncrementalHFileToHBase {
    public static void main(String[] args) throws Exception {
        // args[0] = target table name, args[1] = directory containing the generated HFiles
        Configuration conf = HBaseConfiguration.create();
        byte[] TABLE = Bytes.toBytes(args[0]);
        HTable table = new HTable(conf, TABLE);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(args[1]), table);
    }
}

Alternatively, HBase ships a ready-made bulk-load tool inside its jar, used like this:

hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable

Finally, when you run your program you may hit this error:

FAILED Error: java.lang.ClassNotFoundException: com.google.common.util.concurrent.ThreadFactoryBuilder

It means you need to add the Guava jar bundled with HBase (in my setup it was at HBASE_HOME/bin/guava-r09.jar) to the classpath; once it is added, everything works.
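
Another option is to ship the jar containing the missing class together with the job from the driver code. This is only a sketch and assumes your HBase version provides the addDependencyJars(Configuration, Class...) overload of TableMapReduceUtil:

// Sketch: ship the jar that contains ThreadFactoryBuilder (Guava) to the tasks,
// assuming the addDependencyJars(Configuration, Class...) overload exists in your HBase version.
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        com.google.common.util.concurrent.ThreadFactoryBuilder.class);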

To wrap up, here is the source of the HFileOutputFormat class for a closer look:

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time.  Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles. Calling write(null,null) will forceably roll
 * all HFiles being written.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
  TimeRangeTracker trt = new TimeRangeTracker();
 
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize",
        HConstants.DEFAULT_MAX_FILE_SIZE);
    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
        HFile.DEFAULT_BLOCKSIZE);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String defaultCompression = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
 
    // create a map from column family to the compression algorithm
    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
 
    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;
 
      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }
 
        byte [] rowKey = kv.getRow();
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
 
        // If this is a new column family, verify that the directory exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }
 
        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }
 
        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }
 
        // create a new HLog writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family, conf);
        }
 
        // we now have the proper HLog writer. full steam ahead
        kv.updateLatestStamp(this.now);
        trt.includeTimestamp(kv);
        wl.writer.append(kv);
        wl.written += length;
 
        // Copy the row so we know when a row transition.
        this.previousRow = rowKey;
      }
 
      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0)? "": ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }
 
      /* Create a new HFile.Writer.
       * @param family
       * @return A WriterLength, containing a new HFile.Writer.
       * @throws IOException
       */
      private WriterLength getNewWriter(byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        String compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        wl.writer =
          HFile.getWriterFactory(conf).createWriter(fs,
          StoreFile.getUniqueFile(fs, familydir), blocksize,
          compression, KeyValue.KEY_COMPARATOR);
        this.writers.put(family, wl);
        return wl;
      }
 
      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
              WritableUtils.toByteArray(trt));
          w.close();
        }
      }
 
      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (WriterLength wl: this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }
 
  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }
 
  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
  throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }
 
  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner
   * that contains the split points in startKeys.
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }
 
    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted =
      new TreeSet<ImmutableBytesWritable>(startKeys);
 
    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);
 
    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
 
    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
 
  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul> 
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function.
   */
  public static void configureIncrementalLoad(Job job, HTable table)
  throws IOException {
    Configuration conf = job.getConfiguration();
    Class<? extends Partitioner> topClass;
    try {
      topClass = getTotalOrderPartitionerClass();
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed getting TotalOrderPartitioner", e);
    }
    job.setPartitionerClass(topClass);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
 
    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }
 
    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());
 
    Path partitionsPath = new Path(job.getWorkingDirectory(),
        "partitions_" + System.currentTimeMillis());
    LOG.info("Writing partition information to " + partitionsPath);
 
    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    partitionsPath.makeQualified(fs);
 
    URI cacheUri;
    try {
      // Below we make explicit reference to the bundled TOP.  Its cheating.
      // We are assume the define in the hbase bundled TOP is as it is in
      // hadoop (whether 0.20 or 0.22, etc.)
      cacheUri = new URI(partitionsPath.toString() + "#" +
        org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);
 
    // Set compression algorithms based on column families
    configureCompression(table, conf);
 
    TableMapReduceUtil.addDependencyJars(job);
    LOG.info("Incremental table output configured.");
  }
 
  /**
   * If > hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner.
   * If 0.20, then we want to use the TOP that we have under hadoopbackport.
   * This method is about hbase being able to run on different versions of
   * hadoop.  In 0.20.x hadoops, we have to use the TOP that is bundled with
   * hbase.  Otherwise, we use the one in Hadoop.
   * @return Instance of the TotalOrderPartitioner class
   * @throws ClassNotFoundException If can't find a TotalOrderPartitioner.
   */
  private static Class<? extends Partitioner> getTotalOrderPartitionerClass()
  throws ClassNotFoundException {
    Class<? extends Partitioner> clazz = null;
    try {
      clazz = (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner");
    } catch (ClassNotFoundException e) {
      clazz =
        (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner");
    }
    return clazz;
  }
 
  /**
   * Run inside the task to deserialize column family to compression algorithm
   * map from the
   * configuration.
   * 
   * Package-private for unit tests only.
   * 
   * @return a map from column family to the name of the configured compression
   *         algorithm
   */
  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
    for (String familyConf : compressionConf.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
 
      try {
        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
            URLDecoder.decode(familySplit[1], "UTF-8"));
      } catch (UnsupportedEncodingException e) {
        // will not happen with UTF-8 encoding
        throw new AssertionError(e);
      }
    }
    return compressionMap;
  }
 
  /**
   * Serialize column family to compression algorithm map to configuration.
   * Invoked while configuring the MR job for incremental load.
   * 
   * Package-private for unit tests only.
   * 
   * @throws IOException
   *           on failure to read column family descriptors
   */
  static void configureCompression(HTable table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if(tableDescriptor == null){
      // could happen with mock table instance
      return;
    }
    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
    int i = 0;
    for (HColumnDescriptor familyDescriptor : families) {
      if (i++ > 0) {
        compressionConfigValue.append('&');
      }
      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
      compressionConfigValue.append('=');
      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
    }
    // Get rid of the last ampersand
    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
  }
}
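
A small note on the last two methods above: configureCompression serializes the per-family compression choices into a single URL-encoded string under the key hbase.hfileoutputformat.families.compression, and createFamilyCompressionMap parses it back inside each task so that getNewWriter can pick the right algorithm per family. A hypothetical example of the round trip, for a table with families cf1 (gzip) and cf2 (no compression):

// Hypothetical serialized form written by configureCompression for two families:
//   hbase.hfileoutputformat.families.compression = cf1=gz&cf2=none
// createFamilyCompressionMap then rebuilds {cf1 -> "gz", cf2 -> "none"} in the task,
// and getNewWriter falls back to "hfile.compression" for any family not in the map.
Configuration conf = new Configuration();
conf.set("hbase.hfileoutputformat.families.compression", "cf1=gz&cf2=none");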
