hbase表操作优化

Comments: No Comments
Published on: 2012 年 07 月 24 日

Hbase表写入

1、使用批量加载工具,具体看链接: Section 9.8, “Bulk Loading”

2、巨量数据插入空表或小表的时候,需要注意hbase的分表原理。只有当一个region的数据打过阈值的时候才会进行分表操作。这意味着你巨量数据将会同时写入同一个region直到表大到需要分割。比较好的做法是创建表的时候预先创建多个空的region。但是不要太多,太多同样会降低程序效率,有需要的可以做下实验预留多少个空表合适。官网给出了一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
throws IOException {
  try {
    admin.createTable( table, splits );
    return true;
  } catch (TableExistsException e) {
    logger.info("table " + table.getNameAsString() + " already exists");
    // the table already exists...
    return false;  
  }
}
 
public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
  byte[][] splits = new byte[numRegions-1][];
  BigInteger lowestKey = new BigInteger(startKey, 16);
  BigInteger highestKey = new BigInteger(endKey, 16);
  BigInteger range = highestKey.subtract(lowestKey);
  BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
  lowestKey = lowestKey.add(regionIncrement);
  for(int i=0; i < numRegions-1;i++) {
    BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
    byte[] b = String.format("%016x", key).getBytes();
    splits[i] = b;
  }
  return splits;
}

调用createTable进行表创建,getHexSplits的三个参数是起始键、结束键和分表的个数。上述程序key是十六进制,根据需要自行修改。

3、延迟日志刷新
hbase的机制是put一写入就刷新日志,如果使用延迟机制,则日志存入内存直到日志刷新周期。这样的好处不用再说,但是坏处是region如果宕机,这会造成存入内存的日志信息丢失不能进行数据回滚等行为。

该项配置可以在HTableDescriptor中设定,也可以配置参数hbase.regionserver.optionallogflushinterval,默认是 1000ms.

4、自动刷写
当你进行大量的Put的时候,要确认你的HTable的setAutoFlush是关闭着的。否则的话,每执行一个Put就要想RegionServer发一个请求。通过 htable.add(Put) 和 htable.add( Put)来将Put添加到写缓冲中。如果 autoFlush = false,要等到写缓冲都填满的时候才会发起请求。要想显式的发起请求,可以调用flushCommits。在HTable实例上进行的close操作也会发起flushCommits

5、在Puts上关闭WAL
加大puts插表的吞吐量和效率需要设定put.setWriteToWAL为false。设定为false意味着put入表前不会写日志,仅存入内存。带来的后果就是如果region宕机,数据就会丢失。你可能在实际应用中发现其他效率提升没那么明显,如果数据负载在分布式集群中。

一般都会不使用该选项,而是用bulk loading技术替代。

6、Group Puts by RegionServer
除了使用writeBuffer,puts通过regionServer分组可以减少客户端RPC每秒调用writeBuffer flush数量。有个实用的HTableUtil 能做这些。如果版本在0.90x或更早的可以拷贝使用。

7、跳过reducer步骤
如果大量数据入一个表时,如果mapper能做完,就不要使用reducer过程。当reducer过程被使用,所有的mapper输出将入磁盘,并会触发sorted和shuffled过程。但是你也有必须使用reducer的时候。

8、 Anti-Pattern: One Hot Region

If all your data is being written to one region at a time, then re-read the section on processing timeseries data.

Also, if you are pre-splitting regions and all your data isstill winding up in a single region even though your keys aren't monotonically increasing, confirm that your keyspace actually works with the split strategy. There are a variety of reasons that regions may appear "well split" but won't work with your data. As the HBase client communicates directly with the RegionServers, this can be obtained viaHTable.getRegionLocation.

See Section 11.7.2, “ Table Creation: Pre-Creating Regions ”, as well as Section 11.4, “HBase Configurations”

一个非常棒的样例MR,其中setup会在mapper执行前调用一次,cleanup会在最后执行完毕调用一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class test extends Configured implements Tool {
	static final Log LOG = LogFactory.getLog(test.class);
	public static final String JOBNAME = "MRImport ";
 
	public static class Map extends
			Mapper<LongWritable, Text, NullWritable, NullWritable> {
		Configuration configuration = null;
		HTable xTable = null;
		private boolean wal = true;
		private Put put = null;
		static long count = 0;
 
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			super.setup(context);
			configuration = context.getConfiguration();
			xTable = new HTable(configuration, "testKang");
			xTable.setAutoFlush(false);
			xTable.setWriteBufferSize(12 * 1024 * 1024);
			wal = true;
		}
 
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String all[] = value.toString().split("/t");
			if (all.length == 2) {
				put = new Put(Bytes.toBytes(all[0]));
				put.add(Bytes.toBytes("xxx"), Bytes.toBytes("20110313"),
						Bytes.toBytes(all[1]));
			}
 
			if (!wal) {
				put.setWriteToWAL(false);
			}
 
			xTable.put(put);
			if ((++count % 100) == 0) {
				context.setStatus(count + " DOCUMENTS done!");
				context.progress();
				System.out.println(count + " DOCUMENTS done!");
			}
		}
 
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			super.cleanup(context);
			xTable.flushCommits();
			xTable.close();
		}
 
	}
 
	public int run(String[] args) throws Exception {
		String input = args[0];
		Configuration conf = HBaseConfiguration.create(getConf());
		conf.set("hbase.master", "m0:60000");
		Job job = new Job(conf, JOBNAME);
		job.setJarByClass(test.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.setInputPaths(job, input);
		job.setOutputFormatClass(NullOutputFormat.class);
		return job.waitForCompletion(true) ? 0 : 1;
	}
 
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		int res = 1;
		try {
			res = ToolRunner.run(conf, new test(), otherArgs);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.exit(res);
 
	}
 
}

Hbase表读取

1、设定scan缓存数量
默认的是scan一次读取一行数据,请求一次regionServer,设定scan.setCaching(100)适合的值非常重要,也不能过大,因为会消耗内存。

2、scan caching在mapreduce任务里要特别注意
如果caching值设置过大会造成超时异常(e.g. UnknownScannerException 、 socket timelimit之类的,我就被这点坑惨了,也没找到相关解决办法,这里看到了~T-T),即不能在设定的阈值时间范围内获取到下一个数据集。因为一般的一个map处理一行数据,但是获取的多了可能使map处理多行数据时间过久(可能map逻辑复杂、时间复杂度高),都会造成超时问题。一般这时候就设置低一些。

同样这个超时问题也可能出现在非mapreduce任务中,这种情况多出现于同时跑了mapreduce作业。

3、scan属性选择
用scan处理大量行时,注意 scan.addFamily和 scan.addColumn的选择,一般尽量选择的精确,否则会造成资源较大的浪费。

4、MapReduce - Input Splits
For MapReduce jobs that use HBase tables as a source, if there a pattern where the "slow" map tasks seem to have the same Input Split (i.e., the RegionServer serving the data), see the Troubleshooting Case Study in Section 13.3.1, “Case Study #1 (Performance Issue On A Single Node)”.

5、记得关闭 ResultScanners
这与其说是提高性能,倒不如说是避免发生性能问题。如果你忘记了关闭ResultScanners,会导致RegionServer出现问题。所以一定要把ResultScanner包含在try/catch 块中...

1
2
3
4
5
6
7
8
9
10
Scan scan = new Scan();
// set attrs...
ResultScanner rs = htable.getScanner(scan);
try {
  for (Result r = rs.next(); r != null; r = rs.next()) {
  // process result...
} finally {
  rs.close();  // always close the ResultScanner!
}
htable.close();

6、块缓存
Scan实例可以在RegionServer中使用块缓存,可以由setCacheBlocks方法控制。如果Scan是MapReduce的输入源,要将这个值设置为 false。对于经常读到的行,就建议使用块缓冲。

7、Row Keys 的负载优化
scan一个表的时候, 如果仅仅需要row key(不需要no families, qualifiers, values 和 timestamps),在加入FilterList的时候,要使用Scanner的setFilter方法的时候,要填上MUST_PASS_ALL操作参数(译者注:相当于And操作符)。一个FilterList要包含一个FirstKeyOnlyFilter 和一个 KeyOnlyFilter.通过这样的filter组合,就算在最坏的情况下,RegionServer只会从磁盘读一个值,同时最小化客户端的网络带宽占用。

8、监视数据传输
如果执行一个高并发的读取操作,监视数据的传输目标表,如果目标表局限在几个少量的region上,那么可能能进行服务的节点就会很少。
See Section 11.7.2, “ Table Creation: Pre-Creating Regions ”, as well as Section 11.4, “HBase Configurations”

Hbase表删除

1、将hbase表当队列
hbase表有时用起来像queue,这种情况下必须很小心,要经常执行主要压缩程序使表紧凑。在删除很多行后建立storeFiles当需要读取的时候。删除数据只在精简程序执行的时候被清除(Tombstones only get cleaned up with major compactions.)。
See also Section 9.7.5.5, “Compaction” and HBaseAdmin.majorCompact.

2、删除的RPC行为
必须注意到htable.delete(Delete)不是使用的writeBuffer。它将执行RegionServer RPC在每次调用。如果删除大量数据考虑使用table.delete(List)。
See http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/HTable.html#delete%28org.apache.hadoop.hbase.client.Delete%29

我猜你可能也喜欢:

No Comments - Leave a comment

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期五, 2017 年 12 月 15 日