周六晚上下了一场雪,周日去爬了莽山,只能说这场雪对这次游玩非常非常加分,非常漂亮的北国风光千里冰封万里雪飘的群山景色。非常漂亮,玩的也很开心~~
第四章:Hadoop I/O
HDFS会对写入的所有数据计算校验和,以保证在I/O过程中数据的完整性,不仅如此,为了保证数据的完好,每个datanode也会在一个后台线程中运行一个DataBlockScanner,来定期验证存储在这个datanode上的所有数据块。 Hadoop提供文件压缩功能,以优化文件存储所需的磁盘空间和网络磁盘传输的消耗。
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否包含多个文件 | 是否可切分 |
DEFLATE | N/A | DEFLATE | .deflate | No | No |
gzip | gzip | DEFLATE | .gz | No | No |
bzip2 | bzip2 | bzip2 | .bz2 | No | Yes |
LZO | lzop | LZO | .lzo | No | No |
DEFLATE是一个标准压缩算法,该算法的标准实现是zlib。没有可用于生成DEFLATE文件的常用命令行工具,因为通常都是用gzip格式。gzip文件格式只是在DEFLATE格式上增加了文件头和一个文件尾。.deflate文件扩展名是Hadoop约定的。 所有压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间,上边列出的压缩工具都提供9个不同的选项来控制压缩时必须考虑的权衡:选项-1为优化压缩速度,-9为优化压缩空间,如gzip -1 file 会通过最快的压缩方法创建一个名为file.gz的压缩文件。gzip在权衡中是居中的一个:bzip2比gzip高效,但压缩速度慢,LZO优化压缩速度,比gzip更快,但压缩效率差一些。表中说的“是否可切分”这一列,表示该压缩算法是否支持切分,也就是说是否可以搜索数据流的任意位置并进一步往下读取数据。可切分压缩格式尤其适合MapReduce。 codec实现了一种压缩-解压缩算法,在Hadoop中,一个对CompressionCodec接口的实现代表一个codec。
压缩格式 | Hadoop CompressionCodec DEFLATE | org.apache.hadoop.io.compress.DefaultCodec gzip | org.apache.hadoop.io.compress.GzipCodec bzip2 |org.apache.hadoop.io.compress.BZip2Codec LZO | com.hadoop.compression.lzo.LzopCodec
CompressionCodec包含两个函数,如果要对数据压缩,可用createOutputStream(OutputStream out)方法在底层的数据流中对需要压缩格式写入在此之前尚未压缩的数据新建一个CompressionOutputStream对象。相反,对输入数据流中读取的数据进行解压缩的时候,则调用createInputStream(InputStream in)获取CompressionInputStream,通过该方法从底层数据流读取解压缩后的数据。这两类似于java.util.zip.DeflaterOutputStream和java.util.zip.DeflaterInputStream,只不过前两者能够重置底层的压缩解压缩方法,对于某些讲部分数据流压缩为数据块的应用,如SequenceFile,这个能力非常重要。 下边程序希望将CompressionCodec实现的完全合格名称作为第一个命令行参数。使用ReflectionUtils来构建一个新的codec实例,然后在System.out上包裹一个压缩方法。由此我们可以对IOUtils对象调用copyBytes()方法,从而将输入的数据复制到输出,输出由CompressionOutputStream对象压缩,最后调用finish()方法,要求压缩方法完成到压缩数据流的写操作,但不关闭这个数据流。
1 2 3 4 5 6 7 8 9 10 11 12 13 | public class StreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); CompressionOutputStream out = codec.createOutputStream(System.out); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } } |
可以进行这个程序的测试:
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -Text
在读取一个压缩文件时,通常可以通过文件后缀推断需要使用哪个codec,通过使用getCodec()方法,CompressionCodecFactory提供了一个方法将文件扩展名映射到CompressionCodec,该方法取文件的Path对象作为参数,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public class FileDecompressor { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if (codec == null) { System.err.println("No codec found for " + uri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } } } |
一旦找到对应的codec,便取出文件后缀形成输出文件名,这是通过CompressionCodecFactory对象的静态方法removeSuffix()实现的。可以验证这个程序:
% hadoop FileDecompressor file.gz
另外,为了性能,使用原生类库来实现压缩和解压缩比高效很多。压缩代码库的实现如下:
压缩格式|Java实现|原生实现
DEFLATE | Yes | Yes
gzip | Yes | Yes
bzip2 | Yes | No
LZO | No | Yes
Hadoop本身包含为32位和64位linux构建的压缩代码库,位于lib/native目录。可以通过Java系统的java.library.path属性指定原生代码库。bin文件夹中的hadoop脚本可以帮助设置该属性,否则需要在程序中手动设置该属性。默认情况下Hadoop会根据自身平台搜索原生代码库,如果找到则自动加载。如果需要禁用原生代码库,讲属性hadoop.native.lib的值设置成false即可,这样可确保使用内置的java代码库。如果使用原生代码库可以考虑CodePool,它允许你反复使用压缩和解压缩,以分摊创建这些对象所涉及的开销。样例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public class PooledStreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); Compressor compressor = null; try { compressor = CodecPool.getCompressor(codec); CompressionOutputStream out = codec.createOutputStream(System.out, compressor); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } finally { CodecPool.returnCompressor(compressor); } } } |
如果不压缩存储1GB的文件到HDFS,默认数据块大小为64MB的话,那么会被存储在16个块中。如果是1GB的压缩文件,这样就行不通了,MapReduce会做正确的事情,就分割这个压缩文件只使用一个map来存储到一个datanode,但是如果这个压缩文件是可以分割的,就是有16个map(如果有这么多的话)来执行命令存储到多个datanode里。bzip2文件提供不同数据块之间的同步标识,因为支持切分。所以对于大文件来说不应该使用不支持分割的压缩格式,这样就失去了MapReduce的优势。
ps:书中这部分内容有些老,已经删除更改如下:
在MapReduce程序框架下的运用只需要进行简单的配置即可,配置命令如下:
1 2 3 4 5 | ... Configuration conf=new Configuration(); conf.setBoolean("mapreduce.map.output.compress", true); conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); ... |
老版本的MR程序配置压缩在是JobConf里配置,上边是新版本的配置方法。BZip2Codec.class 是指定的压缩方法,CompressionCodec.class是接口类。压缩方法也可以指定为GZipCodec.class。也可以使用LZO,但是LZO代码库拥有GPL许可,没有在apache的hadoop发行版本里,需要单独从网上下载。
序列化(serialization)是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储。反序列化是指将字节流转回结构化对象的逆过程。序列化在分布式数据处理的两大领域经常出现:进程间通信和永久存储。 在Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用”(remote procedure call ,RPC)实现的。RPC协议将消息序列化成二进制流发送到远程节点,远程节点接着将二进制流反序列化为原始消息。Hadoop使用自己的序列化格式Writable,Writable接口定义了两个方法,一个将其状态写到DataOutput二进制流,另一个从DataInput二进制流读取其状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | package org.apache.hadoop.io; import java.io.DataOutput; import java.io.DataInput; import java.io.IOException; public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; } //我们可以尝试使用: IntWritable writable = new IntWritable(); writable.set(163); /*这样就可以了,我们也可以使用构造函数创建:IntWritable writable = new IntWritable(163); *下边尝试一下逆序列化 */ public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException { ByteArrayInputStream in = new ByteArrayInputStream(bytes); DataInputStream dataIn = new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); return bytes; } //我们可以验证其结果: IntWritable newWritable = new IntWritable(); deserialize(newWritable, bytes); assertThat(newWritable.get(), is(163)); |
IntWritable实现了WritableComparable接口,该接口继承自Writable和java.lang.Comparable接口:
1 2 3 | package org.apache.hadoop.io; public interface WritableComparable extends Writable, Comparable { } |
对MapReduce来说,类型的比较是非常重要的,因为中间有个基于键的排序阶段。Hadoop提供的一个优化接口是继承自Java Comparator的RawComparator接口:
1 2 3 4 5 6 7 8 | package org.apache.hadoop.io; import java.util.Comparator; public interface RawComparator extends Comparator { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); } |
该接口允许其实现直接比较数据流中的记录,无需先把数据流反序列化为对象,这样便避免了新建对象的额外开销。 WritableComparator是对继承自WritableComparable类的RawComparator类的一个通用实现。它提供两个主要功能:第一,它提供了对原始compare()方法的一个默认实现,该方法能够反序列化将在流中进行比较的对象,并调用对象的compare()方法。第二,它充当的是RawComparator实力的工厂,例如:
1 2 3 4 5 6 7 8 9 | RawComparator comparator = WritableComparator.get(IntWritable.class); //这个comparator可以用于比较两个IntWritable对象: IntWritable w1 = new IntWritable(163); IntWritable w2 = new IntWritable(67); assertThat(comparator.compare(w1, w2), greaterThan(0)); //或其序列化表示: byte[] b1 = serialize(w1); byte[] b2 = serialize(w2); assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),greaterThan(0)); |
Hadoop自带的org.apache.hadoop.io包中有广泛的Writable类可供选择,如图: 对这个图简单说明一下:
Java基本类型 | Writable实现 | 序列化字节数 |
boolean | BooleanWritable | 1 |
byte | ByteWritable | 1 |
int | IntWritable | 4 |
VIntWritable | 1–5 | |
float | FloatWritable | 4 |
long | LongWritable | 8 |
VLongWritable | 1–9 |
VIntWritable和VLongWritable是变长格式。定长格式非常适合对整个数值域空间中分布非常均匀的数值进行编码,如精心设计的哈希函数。变长格式对大多数分布不均匀的更节省空间。 Text类型是针对UTF-8序列的Writable类,一般可以认为它等价于java.lang.String的Writable。索引,由于编码问题,Text类和Java String类之间存在一定差别。对Text类的索引是根据编码后字节序列中的位置实现的,并非字符串中的Unicode字符,也不是Java Char的编码单元(如String)。对于ASCII字符串,这三个索引位置概念一直。索引的方法是charAt()。需要注意的是这个方法返回的是一个表示Unicode编码位置的int类型值,与String不同返回的是char类型值。Text还有一个find()方法,类似于String的indexOf()方法。一旦使用需要多个字节来编码的字符时,Text和String之间的区别就十分明显了。Text类Unicode字符的迭代比较复杂,因为不能简单的通过增加位置的索引值来实现,可以将Text对象转换为java.nio.ByteBuffer对象,然后利用缓冲区对Text对象反复调用bytesToCodePoint()静态方法,该方法能够获取下一代码的位置,并返回相应的int值,最后更新缓冲区中的位置。通过bytesToCodePoint()方法可以检测出字符串的末尾,并返回-1。Text是可变的,通过调用set()方法可以重置Text实例。重置的情况可能使getBytes()和getLength()方法得到的值不同,由此可知字节数组中多少字符有效。 BytesWritable是对二进制数据数组的封装,它的序列化格式为一个用于指定后面数据字节数的整数域(4字节),后跟字节本身。BytesWritable是可变的,同样通过set()方法。但也同样可能使getBytes()和getLength()方法得到的值不同。 NullWritable是Writable的一个特殊类型,它的序列化长度为0.它并不从数据流中读取数据,也不写入数据。它充当占位符。如在MapReduce中,如果不需要使用键或值,就可以使用它。它不可变。 ObjectWritable 是对Java基本类型(String,enum,Writable,null或这些类型组吃呢个的数组)的一个通用封装,它在Hadoop RPC中用于对方法的参数和返回类型进行封装和解封装。当一个字段中包含多个类型时,ObjectWritable是非常有用的,例如:如果SequenceFile中的值包含多个类型,就可以将值类型声明为ObjectWritable,并将每个类型封装在一个ObjectWritable中。作为一个通用机制,每次序列化都封装类型的名称,这非常浪费空间,如果封装的类型数量比较少并且能够提前知道,那么可以通过使用静态类型的数组,并使用对序列化后的类型的引用加入位置索引提高性能。这是 GenericWritable 类型采取的方法,并且你可以在继承子类中指定需要支持的类型。 在org.apache.hadoop.io包中,有4个Writable集合类:ArrayWritable,TwoDArrayWritable,MapWritable和SortedMapWritable。前两个是对Writable的数组和两维数组的实现,它们中所有元素必须是同一类的实例。后两个分别实现了java.util.Map和java.util/OsrtedMap。每个键/值字段使用的类型是相应字段序列化形成的一部分。类型存储为单个字节(充当类型数组的索引)。在org.apache.hadoop.io包中,数组经常与标准类型结合使用,而定制的Writable类型也通常结合使用,但对于非标准类型,则需要在包头中指明所使用的数组类型。根据实现,MapWritable类和SortedMapWritable类通过正byte值来指示定制的类型,所以在MapWritable和SortedMapWritable实例中最多可以使用127个不同的非标准Wirtable类。下边给出使用不同键值类型的MapWritable实例:
1 2 3 4 5 6 7 8 9 | MapWritable src = new MapWritable(); src.put(new IntWritable(1), new Text("cat")); src.put(new VIntWritable(2), new LongWritable(163)); MapWritable dest = new MapWritable(); WritableUtils.cloneInto(dest, src); assertThat((Text) dest.get(new IntWritable(1)), is(new Text("cat"))); assertThat((LongWritable) dest.get(new VIntWritable(2)), is(new LongWritable(163))); |
Hadoop有一套非常有用的Writable实现可以满足大部分需求,但在有些情况下,我们需要根据自己的需求构造一个新的实现。有了定制的Writable类型,就可以完全控制二进制表示和排序顺序。由于Writable是MapReduce数据路径的核心,所以调整二进制表示能对性能产生显著效果。Hadoop自带的Writable实现已经很好,但如果希望将结构调整得更好,往往需要自己建一个Writable类型,下边演示如果新建一个定制的Writable类型,我们需要写一个表示一对字符串的实现,名为TextPair:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | import org.apache.hadoop.io.*; public class TextPair implements WritableComparable { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } } |
这段代码我们还能优化, 我们可以参考之前WritableComparable和Comparator那部分说的,我们可以在其序列化表示就比较两个TextPair对象。事实上TextPair是两个Text对象连接而成,而Text对象的二进制表示是一个长度可变的整数,包含字符串之UTF-8表示的字节数和UTF-8字节本身。由此得知第一个Text对象的字节表示有多长,然后长度传给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) { return cmp; } return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } } static { WritableComparator.define(TextPair.class, new Comparator()); } |
这里继承的WritableComparable类而非实现RawComparator接口,因为它提供了一些好用的方法和默认实现。也可以定制comparator,有必要参考org.apache.hadoop.io包中对Writable接口的实现。同时定制的comparator也应该继承自RawComparator。下边给出定制的RawComparator用于比较TextPair对象字节表示的第一个字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) { return cmp; } return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } } static { WritableComparator.define(TextPair.class, new Comparator()); } |
尽管大多数MapReduce程序使用的都是Writable类型的键和值,但这并不是MapReduce API强制使用的。事实上可以使用任何类型,只要能有一种机制对每个类型进行类型与二进制表示的来回转换。为了支持这一机制,Hadoop有一个可以替换序列化框架的API。一个序列化框架用一个Serialization实现(org.apache.hadoop.io.serializer包)来表示。 另外还有序列化“接口定义语言”(Interface Description Language,IDL)。书中这里详细讲述了Avro,这里不再多数,需要了解的请看书。 基于文件的数据结构 对于某些应用而言,需要特殊的数据结构来存储自己的数据。对于基于MapReduce的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,所以Hadoop开发了一组更高层次的容器。 考虑日志文件,其中每一条日志记录是一行文本。如果想记录二进制类型,纯文本是不合适的,这种情况下Hadoop的SequenceFile类非常合适,因为它提供了二进制键/值对的永久存储的数据结构。当作为日志文件的存储格式时,你可以自己选择键,比如由LongWritable类型表示时间戳,以及值可以是Writable类型,用于表示日志记录的数量。SequenceFiles同样也可以作为小文件的容器。而HDFS和MapReduce是针对大文件进行优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效的存储和处理。 通过creatWriter()静态方法可以创建SequenceFile对象,并返回SequenceFile.Writer实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStream或FileSystem或Path),Configuration对象,以及键和值的类型。另外可选参数包括压缩类型以及相应的codec,Progressable回调函数用于通知写入的进度,以及在SequenceFile头文件中存储的Metadata实例。存储在SequenceFile中的键和值并不一定需要Writable类型,任何可以通过Serialization类实现序列化和反序列化的类型均可以。一旦拥有SequenceFile.Writer实例,就可以通过append()方法在文件末尾附加键/值。写完后调用close():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public class SequenceFileWriteDemo { private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" }; public static void main(String[] args) throws IOException { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); IntWritable key = new IntWritable(); Text value = new Text(); SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); for (int i = 0; i < 100; i++) { key.set(100 - i); value.set(DATA[i % DATA.length]); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); writer.append(key, value); } } finally { IOUtils.closeStream(writer); } } } |
顺序文件中存储的键是从100到1降序排列的整数,表示为IntWritable对象,值为Text对象。将每条记录追加到SequenceFile.Writer实例末尾之前,我们需要使用getLength()方法来获取文件访问的当前位置。
从头到尾读取顺序文件的过程是创建SequenceFile.Reader实例后反复调用next()方法迭代读取记录的过程。读取的是哪条记录与你使用的序列化框架相关。如果你使用的是Writable类型,那么通过键和值作为参数的next()方法可以将数据流中的下一条键值对读入变量中:
public boolean next(Writable key, Writable val)
如果读取成功则返回true,如果以读到文件尾则返回false。如果读取非Writable类型的序列化框架,则需要使用:
public Object next( Object key ) throws IOException
public Object getCurrentValue(Object val) throws IOException
这种情况下请确保在io.serializations属性已经设置了你想使用的序列化框架。如果next()方法返回非空对象,则可以从数据流中读取键值对,并且可以通过getCurrentValue()方法读取该值。否则返回null表示到文件尾:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public class SequenceFileReadDemo { public static void main(String[] args) throws IOException { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, path, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); long position = reader.getPosition(); while (reader.next(key, value)) { String syncSeen = reader.syncSeen() ? "*" : ""; System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value); position = reader.getPosition(); // beginning of next record } } finally { IOUtils.closeStream(reader); } } } |
在顺序文件中搜索给定位置有两种方法:seek()和同步点找到记录便捷。
通过命令行接口显示SequenceFile对象:hadoop fs -text ,可以以文本形式显示顺序文件的内容。该选项可以识别gzip压缩的文件和顺序文件,否则假设输入为纯文本文件。
MapReduce是对多个顺序文件进行排序(合并)最有效的方法。MapReduce本身具有并行执行能力,并且可由你指定reduce的数量。
顺序文件的格式是由文件头和随后的一条或多条记录组成。顺序文件的前三个字节为SEQ(顺序文件代码),紧随其后的是一个字节表示顺序文件的版本号。文件头还包括其他一些字段,包括键和值相应类的名称,数据压缩细节,用户定义的元数据,以及同步标识。同步标识主要用于读取文件的时候能够从任意位置开始识别记录便捷。每个文件有随机生成的同步标识,该标识内容存储在文件头中,同步标识位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要再每条记录末尾添加该标识。
记录的内部结构与是否启用压缩有关。如果启用,则与是记录压缩还是数据块压缩有关。如果没有启用压缩(默认情况),那么每条记录有记录长度(字节数),键长度,键和值组成。长度字段为4字节长的整数,并且需要遵循java.io.DataOutput类中writeInt()方法的协定。通过为数据写入顺序文件而定义的Serialization类,可以实现对键和值的序列化。
记录压缩的格式与无压缩情况相同,只不过值需要通过文件头中定义的压缩codec进行压缩。注意,键是不会压缩的。
MapFile是已经排序的SequenceFile,它已加入用于搜索键的索引。可以将MapFile视为java.util.Map的持久化形式。
MapFile写入类似于SequenceFile的写入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public class MapFileWriteDemo { private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" }; public static void main(String[] args) throws IOException { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); IntWritable key = new IntWritable(); Text value = new Text(); MapFile.Writer writer = null; try { writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass()); for (int i = 0; i < 1024; i++) { key.set(i + 1); value.set(DATA[i % DATA.length]); writer.append(key, value); } } finally { IOUtils.closeStream(writer); } } } |
执行以下这个程序:
% hadoop MapFileWriteDemo numbers.map
如果我们观察MapFile,我们会发现它实际上是一个其中包含data和index两个文件的文件夹:
% ls -l numbers.map
total 104
-rw-r--r-- 1 tom tom 47898 Jul 29 22:06 data
-rw-r--r-- 1 tom tom 251 Jul 29 22:06 index
两个文件都是SequenceFile,记录如下:
% hadoop fs -text numbers.map/data | head
1 One, two, buckle my shoe
2 Three, four, shut the door
3 Five, six, pick up sticks
4 Seven, eight, lay them straight
5 Nine, ten, a big fat hen
6 One, two, buckle my shoe
7 Three, four, shut the door
8 Five, six, pick up sticks
9 Seven, eight, lay them straight
10 Nine, ten, a big fat hen
#The index file contains a fraction of the keys, and contains a mapping from the key to
#that key’s offset in the data file:
% hadoop fs -text numbers.map/index
1 128
129 6079
257 12054
385 18030
513 24002
641 29976
769 35947
897 41922
遍历MapFile文件所有条目的过程类似SequenceFile中的过程:先建立MapFile.Reader实例,然后调用next()方法,知道返回值为false。通过调用get()方法可以随机访问文件中的数据。
在MapFile中搜索就相当于在索引和排过序的SequenceFile中搜索,所以自然想到将SequenceFile转换为MapFile。之前有讲过如何对SequenceFile排序,这里说一下对SequenceFile建索引:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | public class MapFileFixer { public static void main(String[] args) throws Exception { String mapUri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(mapUri), conf); Path map = new Path(mapUri); Path mapData = new Path(map, MapFile.DATA_FILE_NAME); // Get key and value types from data sequence file SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf); Class keyClass = reader.getKeyClass(); Class valueClass = reader.getValueClass(); reader.close(); // Create the map file index file long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf); System.out.printf("Created MapFile %s with %d entries\n", map, entries); } } |
fix()方法通常用于重建已损坏的索引。
你的文章很不错啊,我正在学习hadoop,很少很经典的博客。admire!!!加油!!!
谢谢~一起共勉呀