用于大数据的并查集(基于HBase)的java类

Comments: No Comments
Published on: 2012 年 09 月 10 日

在做推荐系统的时候想查看原始数据集中自然存在的类别有多少种,即找到一些子集,这些子集属于原始数据集,子集之间没有任何关联,而子集内部所有数据都有直接或间接的关联。

首先考虑的是由于数据规模,读入内存是不可能的,所以要借助硬盘(虽然很不情愿)。既然是借助硬盘,那就要文件存取。而又由于在处理过程中需要快速的查找数据是否存在于某个集合内和将数据集合关联等操作,选择使用并查集。

这样选择之后算是有一个解决方案了,但是还需要最后一个关键的部分,就是需要建立文件索引和缓存机制以便快速进行合并和查询过程。这里选择使用的工具还是最趁手的hbase,很好的解决这两个问题。

这个类主要解决的问题就是原始数据的聚类,有关联的聚在一起。核心的两个方法是:

public byte[] findSet(byte[] pos);
public void union(byte[] pos1, byte[] pos2);

其中还有一个

public byte[] findSet(byte[] pos)

是递归实现。两个方法都使用了路径压缩进行优化。union()方法的两个参数有顺序要求,其作用是后者集合连接到前者集合的根节点。

最后,计算的并行是使用MapReduce计算框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package recommendsystem;
 
import java.io.IOException;
import java.lang.reflect.Array;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
 
public class UnionFindSet {
	private Configuration _conf;
	private HBaseAdmin _hbAdmin;
	private HTable _unionTable;
 
	public static void main(String[] args) throws IOException {
		UnionFindSet ufs = new UnionFindSet("test");
		ufs.union(Bytes.toBytes("7"), Bytes.toBytes("8"));
		ufs.union(Bytes.toBytes("5"), Bytes.toBytes("9"));
		ufs.union(Bytes.toBytes("3"), Bytes.toBytes("7"));
		ufs.union(Bytes.toBytes("4"), Bytes.toBytes("6"));
		ufs.union(Bytes.toBytes("1"), Bytes.toBytes("7"));
		for (int i = 1; i < 10; i++) {
			System.out.println(Bytes.toString(ufs.findSet(Bytes.toBytes(String
					.valueOf(i)))));
		}
 
	}
 
	public UnionFindSet(String tableName) throws IOException {
		_conf = HBaseConfiguration.create();
		init(tableName);
	}
 
	public UnionFindSet(Configuration conf, String tableName)
			throws IOException {
		// _conf=new Configuration(conf);
		_conf = conf;
		init(tableName);
	}
 
	public UnionFindSet(HTable htable) {
		_unionTable = htable;
	}
 
	public void clear() throws IOException {
		_hbAdmin.close();
		_unionTable.close();
	}
 
	public int printSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		ResultScanner rs = _unionTable.getScanner(scan);
		int count = 0;
		for (Result r : rs) {
			if (arrayCompare(r.getRow(), r.value()) == 0) {
				System.out.println(Bytes.toString(r.getRow()));
				count++;
			}
		}
		return count;
	}
 
	public ResultScanner getSets() throws IOException {
		Scan scan = new Scan();
		scan.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		scan.setCaching(1000);
		return _unionTable.getScanner(scan);
	}
 
	public byte[] RecursionFindSet(byte[] pos) throws IOException {
		byte[] tmpRes, tmpPre = pos;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpRes = r.value();
		if (arrayCompare(r.value(), pos) != 0) {
			tmpPre = findSet(tmpRes);
			insertUnionHBase(pos, tmpPre);
		}
		return tmpPre;
	}
 
	public byte[] findSet(byte[] pos) throws IOException {
		byte[] tmpRes = pos, tmpPre;
		Result r = queryUnionHBase(pos);
		if (r.isEmpty()) {
			insertUnionHBase(pos, pos);
			return pos;
		}
		tmpPre = r.value();
		while (arrayCompare(tmpRes, tmpPre) != 0) {
			r = queryUnionHBase(tmpPre);
			tmpRes = tmpPre;
			tmpPre = r.value();
		}
		tmpRes = pos;
		while (arrayCompare(tmpPre, tmpRes) != 0) {
			insertUnionHBase(tmpPre, tmpRes);
			r = queryUnionHBase(tmpRes);
			tmpRes = r.value();
		}
		return tmpPre;
	}
 
	public void union(byte[] pos1, byte[] pos2) throws IOException {
		byte[] t1 = findSet(pos1);
		byte[] t2 = findSet(pos2);
		if (arrayCompare(t1, t2) == 0)
			return;
		insertUnionHBase(t2, t1);
	}
 
	private void init(String tableName) throws IOException {
		_hbAdmin = new HBaseAdmin(_conf);
		if (_hbAdmin.tableExists(tableName)) {
			_hbAdmin.disableTable(tableName);
			_hbAdmin.deleteTable(tableName);
		}
		createDB(tableName);
		_unionTable = new HTable(_conf, tableName);
	}
 
	private void createDB(String tableName) throws IOException {
		HTableDescriptor hd = new HTableDescriptor(tableName);
		hd.addFamily(new HColumnDescriptor(GlobalName.CLASSIFICATION_FAMILY));
		CreateDB.createTable(_hbAdmin, hd,
				CreateDB.getHexSplits("0", "110000000", 10));
	}
 
	private void insertUnionHBase(byte[] row, byte[] value) throws IOException {
		Put put = new Put(value);
		put.add(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL, row);
		_unionTable.put(put);
	}
 
	private Result queryUnionHBase(byte[] row) throws IOException {
		Get get = new Get(row);
		get.addColumn(GlobalName.CLASSIFICATION_FAMILY, GlobalName.NULL);
		return _unionTable.get(get);
	}
 
	private int arrayCompare(byte[] o1, byte[] o2) {
		int len = Array.getLength(o1);
		if (len != Array.getLength(o2)) {
			return -1;
		}
		for (int i = 0; i < len; i++) {
			if (o1[i] != o2[i])
				return 1;
		}
		return 0;
	}
}

我猜你可能也喜欢:

No Comments - Leave a comment

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期一, 2017 年 09 月 25 日