Hadoop权威指南[一](Tom White)

Comments: 1 Comment
Published on: 2012 年 03 月 15 日

又开始啃书之旅了。。。看偶读书笔记前客官先给大爷笑一个呗?不笑?那我给你讲个笑话你再给大爷笑吧:一天小新的妈妈看到小新在哭,就问他你为什么哭啊,小新说:“因为刚才看到爸爸摔了一跤”,小新妈妈更奇怪了,问:“那你为什么哭呢?”,小新说:“因为当时我笑了”。。。

第一章:初始hadoop

这章总的来说介绍了现在数据增长的非常快,计算速度和IO瓶颈十分明显,而有不能快速的发明实用的超级计算机,所以分布式计算应运而生。hadoop就是这样一个并行计算的框架,提供了可靠的共享存储和分析系统。HDFS实现存储,MapReduce实现分析处理,这两个是hadoop的核心废话

然后介绍了MapReduce大概原理,几种并行计算的架构和hadoop的由来。

第二章:关于MapReduce

MapReduce任务过程分两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为输入和输出,并由程序员选择他们的类型。程序员还需要具体定义两个函数:map函数和reduce函数。

map阶段的输入是原始数据。比如一份年份和气温的数据,map函数只需要从原始数据里获取目标列,得到目标数据,然后reduce函数能继续处理这份数据。这个过程中会把数据按一定量分成一份份的小数据,然后处理各个小数据,最后合并出各个子结果得到的最终结果,这个流程也挺简单。主要就是数据的分发,处理,合并,最后存储。

这里要简单介绍一下新旧MapReduce的API编写的区别:
(1)新的API倾向于使用抽象类,而不是接口。新的API中Mapper和Reducer是抽象类。
(2)新的API在org.apache.hadoop.mapreduce包和子包中,旧版的API放在org.apache.hadoop.mapred中。在编程中一定要注意两个包不要混用或者用错,程序中要正确统一的的import进新包或者旧包。我在刚开始写代码的时候由于没有注意这一点,程序出现过错误,尤其是在刚建map或reduce类以及job的配置时。
(3)新的API中广泛使用context object,例如MapContext基本上充当这JobConf的OutputCollector和Reporter的角色。
(4)新的API同时支持“推”和“拉”式的迭代。
(5)新的API同一了配置。旧API使用JobConf对象进行作业配置,新API中作业配置通过Configuration来完成。
(6)新API中作业控制执行有Job类来负责,旧版使用JobClient。这也是写代码时要注意的地方。
了解这部分细节非常重要,因为作为一个新学习hadoop的程序猿来说,如果被两套不同的MapReduce搞混淆很久,会很影响编码心情和效率的,网上也有很多教程、笔记之类的东西,很多代码并不提示是什么版本的代码,这里就简单的分为两类,进版本的和旧版本的,不要被网上的很多代码搞混乱就好了。请详细记住上边六点。

第三章:hadoop分布式文件系统

全称是Hadoop Distributed File System,简称HDFS。它以流式数据访问模式来存储超大文件,运行于硬件集群上。HDFS的构思是一次写入、多次读取是最高效的访问模式。数据集通常由数据源生成或从数据源复制而来,接着长时间在此数据集上进行各类分析。每次分析都将涉及该数据集的大部分数据甚至全部,因此读取整个数据集的时间延迟比读取第一条记录的时间延迟更重要。因为hadoop并不需要运行在高可靠性的硬件上,所以设计时需要考虑故障硬件的处理,遇到故障时被设计成能够继续运行且不让用户察觉到明显的中断。

hadoop不适合要求低时间延迟数据访问的应用,它是为高数据吞吐量设计的,以高时间延迟为代价的。对于低延迟的访问需求,HBase是更好的选择。

由于namenode将文件系统的元数据组的信息存储在内存中,因此该文件系统所能存储的文件总数受限制与namenode的内存容量。根据经验,每个文件、目录和数据块的存储信息大约占150字节。因此,如果有一百万个文件,且每个文件占一个数据块,那至少需要300MB的内存。

HDFS中的文件可能只有一个writer,而且写操作总是将数据添加在文件的末尾。它不支持具有多个写入者的操作,也不支持在文件的任意位置进行修改。

类似磁盘的数据块,HDFS也有块的概念,但是比磁盘数据块大的多,默认是64MB。与单一磁盘上的文件系统相似,HDFS上的文件也被划分多块大小的多个分块,作为独立的存储单元。但与其他文件系统不同的是,HDFS中小于一个块大小的文件不会占据整个块空间。HDFS数据块这么大的原因是为了最小化寻址开销,比如寻址时间为10ms,传输速率100MB/s,为了使寻址时间占传输时间的1%,我们需要设置块大小为100M,默认是64MB,一般会设置成128MB。但是这个块也不能设置太大,因为MapReduce中的map任务通常一次处理一个块中的数据,如果任务太少(少于集群节点数量),作业的运行速度就会比较慢。

HDFS最明显的好处是可以存储大于任何一个节点磁盘容量的文件,因为它把文件块化存储在所有节点上。而且这样会大大提高IO效率。比如读取1TB的数据从一个磁盘上,读取效率为100M/s的话,大概需要2个半小时以上,如果有100块磁盘存储这1TB的数据,那么读取这些数据需要的时间不超过2分钟。不仅如此,这种储存也很适合数据备份,将每个块复制到少数几个独立机器上(默认3个),可确保发生意外数据的完整性。一个节点出现问题,集群也可以从这些独立机器上调取相应数据,使工作能够正常完成。

HDFS集群有两类节点,并以管理者-工作者模式运行,即一个namenode(管理者)和多个datanode(工作者)。namenode管理文件系统的命名空间。它维护着稳健系统树及整棵树内所有的文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。namenode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信息,因为这些信息会在系统启动时由数据节点重建。

客户端(client)代表用户通过与namenode和datanode交互来访问整个文件系统。datanode是文件系统的工作节点。他们根据需要存储并检索数据块(受客户端或namenode调度),并且定期向namenode发送他们所存储的块的列表。没有namenode文件系统将无法使用,事实上如果运行namenode服务的机器毁坏,文件系统上所有的文件将会丢失,因此namenode实现容错非常重要,Hadoop提供两种机制:第一种机制是备份那些组成文件系统元数据持久状态的文件。hadoop可以通过配置使namenode在多个文件系统上保存元数据的持久状态。这些写操作室实时同步的,是原子操作。一般的配置是,将持久状态写入本地磁盘的同时,写入一个远程挂载的网络文件系统(NFS)。另一种可行的方法是运行一个辅助namenode,但他不能被用作naemnode。这个辅助namenode的重要作用是定期通过编辑日志合并命名空间镜像,以防止编辑日志过大。这个辅助namenode一般在另一台单独的计算机上,因为它需要占用大量CPU时间和namenode相同容量的内存。它会保存合并后的命名空间镜像的副本,在namenode发生故障时启用。但是辅助namenode保存的状态时滞后主节点的,所以主节点全部失效时,难免会丢失部分数据。一般把存储在NFS上的namenode元数据复制到辅助namenode并作为新的主namenode运行。

下边利用伪分布模式的命令来认识HDFS。先来看伪分布模式下的core-site.xml,hdfs-site.xml,mapred-site.xml三个文件的配置。之前的一篇文章里有讲到伪分布模式这三个文件的配置,这里来说明一下:fs.default.name设置为hdfs://localhost/,用于设置Hadoop的默认文件系统。文件系统是由URI(Uniform Resource Identifier)指定的,这里使用hdfs URI来配置HDFS为默认Hadoop的文件系统。HDFS的守护程序将通过该属性来确定namenode的主机和端口。dfs.replication属性设置为1,这样HDFS就不会按默认将文件系统块副本设置为3。在单独一个namenode上运行时,HDFS就不会警告无法复制足够副本的警告了。

我们可以使用hadoop fs -help 来获取所有文件系统命令的帮助,例如创建目录,移动文件,删除数据,列出目录等等。先试一下:讲本地文件系统的一个文件复制到HDFS:
# hadoop fs -copyFromLocal test.txt hdfs://localhost/user/tom/test.txt
查看文件目录就是hadoop fs -ls path 了,非常类似shell命令,直接hadoop fs 可以查看到fs下的所有命令。
都输入这么长的命令比较麻烦,建议使用alias hls='hadoop fs -ls' 类似这样的把常用的命令都起个别名,能省去很多输入麻烦,如果长久的使用,建议这些alias添加到/etc/profile里边。然后再source /etc/profile一下。分享一下我的hadoop fs的所有别名:

View Code SHELL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
alias hls='hadoop fs -ls'
alias hlsr='hadoop fs -lsr'
alias hcp='hadoop fs -cp '
alias hmv='hadoop fs -mv'
alias hget='hadoop fs -get'
alias hput='hadoop fs -put'
alias hrm='hadoop fs -rm'
alias hmkdir='hadoop fs -mkdir'
alias hcat='hadoop fs -cat'
alias hrmr='hadoop fs -rmr'
alias hstat='hadoop fs -stat'
alias htest='hadoop fs -test'
alias htext='hadoop fs -text'
alias htouchz='hadoop fs -touchz'
alias hdu='hadoop fs -du'
alias hdus='hadoop fs -dus'
alias hchmod='hadoop fs -chmod'
alias hchgrp='hadoop fs -chgrp'
alias hchown='hadoop fs -chown'
alias htail='hadoop fs -tail'
alias hcount='hadoop fs -count'
alias hcpfl='hadoop fs -copyFromLocal'
alias hmvfl='hadoop fs -moveFromLocal'
alias hcptl='hadoop fs -copyToLocal'
alias hmvtl='hadoop fs -moveToLocal'
alias hgetm='hadoop fs -getmerge'
alias hsetrep='hadoop fs -setrep'
alias hhelp='hadoop fs -help'

我们以后的命令尽量都用我们的别名了,不再写那么长的命令了。

现在我们就能把文件从hdfs里边下下来,看看是否文件完整:

# hcptl hdfs://localhost/user/tom/test.txt test.copy.txt
# md5 test.txt test.copy.txt

如果两个md5值一样,说明两个文件是一直的,虽然有极小概率出错。不了解md5的可以看这篇文章

我们来看一眼hdfs上边的所有路径:

View Code SHELL
1
2
3
4
5
#hlsr /
-rw-r--r--   1 hz_tony supergroup         40 2012-03-14 18:26 /test.txt
drwxr-xr-x   - hz_tony supergroup          0 2012-03-14 13:41 /tmp
drwxr-xr-x   - hz_tony supergroup          0 2012-03-14 13:41 /tmp/hadoop-hz_tony
...

结果很类似ls -l ,有些许差别,但都应该可以看懂的。需要说明的是第2列是文件爱你的备份数,因为我们设置的是1,所以这里的tset.txt备份数是1。另外这里没有使用副本的概念,目录作为元数据保存在namenode中,而非datanode中。这里大家看到有文件所有者和文件所在群组的属性,所以也能猜出hdfs中文件也是有访问权限问题的。类似linux,也提供三类权限模式:r、w、x。

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。Hadoop对文件系统提供了许多接口,它一般使用URI方案来选取合适的文件系统实例进行交互。比如之前的单机模式时,使用的是本地文件系统,同样可以使用Hadoop提供的命令来查看:

# hlsr file:///

这样就会显示出所有本地磁盘的内容。尽管运行的MapReduce程序可以访问任何文件系统,但在处理大数据集时,你仍然需要选择一个具体数据本地优化的分布式文件系统,如HDFS或KFS。

文件系统| URI 方案|  Java实现的包(all under org.apache.hadoop)
Local | file | fs.LocalFileSystem
HDFS | hdfs | hdfs.DistributedFileSystem
HFTP | hftp | hdfs.HftpFileSystem
HSFTP | hsftp | hdfs.HsftpFileSystem
HAR | har | fs.HarFileSystem
KFS (Cloud-Store) | kfs | fs.kfs.KosmosFileSystem
FTP |  ftp | fs.ftp.FTPFileSystem
S3 (native)  | s3n | fs.s3native.NativeS3FileSystem
S3 (block-based) | s3 | fs.s3.S3FileSystem

Hadoop是用java写的,通过Java API可以调用所有Hadoop文件系统的交互操作。但是其他非Java应用程序访问Hadoop文件系统会比较麻烦。thriftfs定制功能模块中的Thrift API通过把Hadoop文件系统包装一个Apache Thrift服务来弥补这个不足,从而使任何具有Thrift绑定的语言都能轻松的与Hadoop文件系统进行交互。为了使用Thrift API,需要运行提供Thrift服务的Java服务器,并以代理的方式访问Hadoop文件系统。应用程序访问Thrift服务时,实际上两者是运行在同一台机器上的。

Thrift API包含有许多为其他语言实现远程过程调用的接口,如C++、Perl,PHP,Python及Ruby等。Thrift支持不同版本的Hadoop,因此我们可以通过同一个客户端代码访问不同版本的Hadoop文件系统。关于安装盒使用Thrift,参阅Hadoop发行版本下的src/contrib/thriftfs目录的文档。另外Hadoop提供了一个名为libhdfs的C语言库,该语言库是Java FileSystem接口类的一个镜像,它可以使用Java原生接口调用Java文件系统客户端。此外还有:用户空间文件系统,允许把按照用户空间实现的文件系统整合成一个Unix文件系统;WebDAV,它扩展了HTTP,并支持文件编辑和文件更新;还有HTTP和FTP接口,这些都能同HDFS进行交互。

一些java程序调用Hadoop的实例:

1、从Hadoop中读取数据的样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class URLCat {
  static {
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
  }
    public static void main(String[] args) throws Exception {
    InputStream in = null;
    try {
      in = new URL(args[0]).openStream();
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}

从Hadoop文件系统中读取文件,最简单的方法就是使用java.net.URL对象打开数据流,进而从中读取数据。这里是通过FsUrlStreamHandlerFactory实例调用URL中的setURLStreamHandlerFactory方法。由于Java虚拟机只能调用一次上述方法,因此通常在静态方法中。这个限制意味着如果程序的其他组件已经声明了一个URLStreamHandlerFactory实例,你将无法再使用这个方法从Hadoop中读取数据。然后调用Hadoop中间接的IOUtils类,并在finally子句中关闭数据流。copyBytes方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用于设置复制结束后是否关闭数据流。

1
2
3
4
5
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

上边说道,通过Hadoop URL获取数据会有问题,不能多次调用URLStreamHandlerFactory类实例,这种情况下使用FileSystem API来打开一个文件的输入流。Hadoop文件系统中通过Hadoop Path对象来代表文件(而非java.io.File对象,因为它的语义与本地文件系统联系太紧密)。你可以将一条路径视为Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt 。
FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS,获取FileSystem实例有两种静态方法:

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException

Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现,如conf/core-site.xml。第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,没指定则默认的)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。有了FileSystem实例后,调用open()函数来获取文件的输入流:

public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

第一个方法使用默认的缓冲区大小4KB。通过上述方法可以实现读取文件系统内的文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FileSystemCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}

The program runs as follows:

1
2
3
4
5
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

这里可以使用java.io.InputStream中的skip()和seek()方法来对文件流的位置进行定位,in.seek()可以移动到文件中任意的一个绝对位置,in.skip()则只能相对于当前位置定位到一个新位置。这样可以通过重新定位对输出进行控制。open()返回的是FSDataInputStream对象,这个类继承了java.io.DataInputStream接口的一个特殊类,他也实现了PositionedReadable接口,从一个指定偏移量读取文件的一部分:

1
2
3
4
5
public interface PositionedReadable {
  public int read(long position, byte[] buffer, int offset, int length) throws IOException;
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
  public void readFully(long position, byte[] buffer) throws IOException;
}

read()方法从文件的制定position处读取至多length字节的数据并存入buffer指定的偏移量offset处。返回值是实际获取字节数。readFully()方法指定length长度的字节数据读取到buffer中,如果没读取够则抛出EOFException异常。另外需要记住的是seek()方法是一个相对开销高的操作,慎用。

2、向Hadoop中写入数据的样例:
同样的可以使用FileSystem类中的方法,最简单的是给准备创建的文件指定一个Path对象,然后返回一个用于写入数据的输出流:

public FSDataOutputStream create(Path f) throws IOException

上述方法有多个重载版本,允许我们制定是否需要强制覆盖已有文件、文件备份数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。create()方法能够为需要写入且当前不存在的文件创建父目录。如果你不希望有这样的事情发生,则应该先调用exists()方法检查一下。
还有一个重载方法Progressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到你的应用:

1
2
3
4
package org.apache.hadoop.util;
public interface Progressable {
  public void progress();
}

另一种新建文件的方法是使用append()方法,在一个已有文件末尾追加数据(还有其他重载版本):

public FSDataOutputStream append(Path f) throws IOException

该追加操作允许一个writer打开文件后边访问该文件的最后偏移量处追加数据。该追加操作室可选的,并非所有Hadoop文件系统都支持,例如HDFS支持,但S3文件系统就不支持。

下边给出一个将本地文件复制到Hadoop文件系统的样例。每次Hadoop调用progress()方法时,也就是每次将64KB数据包写入datanode管线后,打印一个时间点来显示整个运行过程,这个操作并非通过API实现,因此后续版本是否支持尚未可知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FileCopyWithProgress {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];
 
    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
 
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst), new Progressable() {
      public void progress() {
        System.out.print(".");
      }
    });
 
    IOUtils.copyBytes(in, out, 4096, true);
  }
}

使用该程序:

% hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt
...............

FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似,也有一个查询文件当前位置的方法:getPos()。但不同的是FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个以打开的文件顺序写入,不允许定位插入。

3、Hadoop中创建目录和查询目录状态的样例:
FileSystem提供了创建目录的方法:
public boolean mkdirs(Path f) throws IOException
这个方法可以一次性建立路径上所有必要的目录,通常不需要用,因为调用cream()方法写入文件时会自动创建路径。

文件系统的目录结构浏览和相关文件或目录的信息查询,需要用到FileStatus类,封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ShowFileStatusTest {
 
  private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
  private FileSystem fs;
  @Before
  public void setUp() throws IOException {
    Configuration conf = new Configuration();
    if (System.getProperty("test.build.data") == null) {
      System.setProperty("test.build.data", "/tmp");
    }
    cluster = new MiniDFSCluster(conf, 1, true, null);
    fs = cluster.getFileSystem();
    OutputStream out = fs.create(new Path("/dir/file"));
    out.write("content".getBytes("UTF-8"));
    out.close();
  }
 
  @After
  public void tearDown() throws IOException {
    if (fs != null) { fs.close(); }
    if (cluster != null) { cluster.shutdown(); }
  }
 
  @Test(expected = FileNotFoundException.class)
  public void throwsFileNotFoundForNonExistentFile() throws IOException {
    fs.getFileStatus(new Path("no-such-file"));
  }
 
  @Test
  public void fileStatusForFile() throws IOException {
    Path file = new Path("/dir/file");
    FileStatus stat = fs.getFileStatus(file);
    assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
    assertThat(stat.isDir(), is(false));
    assertThat(stat.getLen(), is(7L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 1));
    assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
    assertThat(stat.getOwner(), is("tom"));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rw-r--r--"));
  }
 
  @Test
  public void fileStatusForDirectory() throws IOException {
    Path dir = new Path("/dir");
    FileStatus stat = fs.getFileStatus(dir);
    assertThat(stat.getPath().toUri().getPath(), is("/dir"));
    assertThat(stat.isDir(), is(true));
    assertThat(stat.getLen(), is(0L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 0));
    assertThat(stat.getBlockSize(), is(0L));
    assertThat(stat.getOwner(), is("tom"));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
  }
 
}

如果不是获取文件或目录信息,只是检查是否存在,还是使用exists()更方便。要列出目录的内容,就是用FileSystem的listStatus()方法:

1
2
3
4
public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException

下边给出一个现实Hadoop文件系统中一组路径的信息的样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ListStatus {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
 
    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) {
      paths[i] = new Path(args[i]);
    }
        FileStatus[] status = fs.listStatus(paths);
    Path[] listedPaths = FileUtil.stat2Paths(status);
    for (Path p : listedPaths) {
      System.out.println(p);
    }
  }
}

执行该程序如下:

1
2
3
4
5
% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt
......

有的时候我们可能处理一批匹配某种规则的文件,Hadoop提供了两个FileSystem方法:

1
2
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException

globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序。PathFilter命令作为可选项可进一步对匹配进行限制。匹配的通配符有\c 、* 、? 、 [^0-9a-z] 、 {a,b}这些,类似正则,不多解释了。要说一下{a,b},它的含义跟正则不一样,表示匹配a或者b。

有时候通配符匹配不能满足我们的选择要求,FileSystem中的listStatus()和globalStatus()方法提供了可选的PathFilter对象,类似C++STL中的比较器的函数对象,写一个函数返回个bool值来确定是否是我们需要的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//package org.apache.hadoop.fs;
//public interface PathFilter {
//  boolean accept(Path path);
//}
public class RegexExcludePathFilter implements PathFilter {
 
  private final String regex;
  public RegexExcludePathFilter(String regex) {
    this.regex = regex;
  }
  public boolean accept(Path path) {
    return !path.toString().matches(regex);
  }
}

4、Hadoop中删除操作的样例:
FileSystem的delete()方法可以永久性删除文件或目录:

public boolean delete(Path f, boolean recursive) throws IOException

如果f是一个文件或空目录,那么recursive的值就会被忽略,只有recrusive值为true时,才递归删除。

5、HDFS集群之间的数据传输:

# hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar

即把第一个集群/foo目录里所有内容复制到第二个集群的/bar。如果/bar不存在,则创建路径。也可以指定多个源路径,并把所有的路径都复制到目标路径下,注意源路径必须是绝对路径。默认情况下,distcp会跳过目标路径下已经存在的文件,但可以通过-overwrite选项覆盖现有文件,也可以通过-update选项来选择仅更新修改过的文件。另外如果两个hdfs系统版本不同,会导致作业失败,因为两个版本的RPC系统是不兼容的。想要弥补这种情况,可以使用基于只读HTTP协议的HFTP文件系统并从源文件系统中读取数据。这个作业必须运行在目标集群上:

# hadoop distcp hftp://namenode1:9000/foo hdfs://namenode2/bar

注意URI源中的端口号,这是由dfs.http.address属性决定的。

6、HDFS文件系统内的文档存档:

# hadoop archive -archiveName files.har /my/files /my

类似压缩命令,即把/my/files下的文件打包,存储在/my文件夹下,命名为files.har,这个存档文件以har后缀命名是必须的。可以使用hadoop fs -ls 命令来查看har的打包文件。你会发现har打包文件里会存储两个索引文件以及部分文件的集合。这些部分文件中包含已经链接在一起的大量原始文件的内容,并且我们通过索引可以找到包含在文档文件中的部分文件,它的起始点和长度。所有这些细节对于使用har URI方案与HAR文件交互的应用都是隐式的,并且HAR文件系统式建立在基础文件系统上的,这里是HDFS。下边命令以递归方式列出存档文件中的部分文件:

# hadoop fs -lsr har:///my/files.har

如果想在其他文件系统中引用HAR文件,需要使用特别的URI路径格式:

# hadoop fs -lsr har:///my/files.har/my/files/dir
# hadoop fs -lsr har://hdfs-localhost:9000/my/files.har/my/files/dir

第二个格式,仍以har方案表示一个HAR文件系统,但是由hdfs指定基础文件系统方案的权限,后边加上一个横杠和主机名端口号。

想要删除存档文件需要递归格式进行删除,因为对于基础文件系统来说,HAR文件是一个目录:

# hadoop fs -rmr /my/files.har

另外需要知道的是创建了存档文件(同原始文件内容,源文件可删除),是不能修改的,若想从中增加或删除文件,必须重新创建文档。HAR文件也可以作为MapReduce的输入,InputFormat类是不知道文档已经存档的,尽管该类可以将多个文件打包成一个MapReduce分片,所以即使在HAR文件中处理许多小文件,也仍然是低效的。

我猜你可能也喜欢:

1 Comment - Leave a comment
  1. tracymkgld说道:

    你好:你在“从Hadoop中读取数据的样例”中提到“open()返回的是FSDataInputStream对象”,其中具体的操作是根据读取文件的偏移量及长度,由namenode返回对应的几个块,及这几个块没有损坏的副本所在的datanode的列表,即一个LocatedBlocks信息。那我想问的是看起来程序直接从这个流里调用IOUtils.copyBytes方法就读到了数据,那从各个datanode读取数据的过程是哪里实现的?PositionedReadable接口的read方法好像实现了这个功能,但是这个read是哪里调用的?能帮我贴出代码吗?最近看的有点懵。谢谢!

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期日, 2017 年 11 月 19 日