利用HDFS的JavaAPI编程[二]

Tags: , , ,
Comments: No Comments
Published on: 2012 年 05 月 23 日

这一篇主要介绍了一些java程序读取hdfs数据的方法。

一、先模仿hadoop fs -cat命令来写一个简单的读取HDFS中文件数据的小程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.io.InputStream;
import java.net.URL;
 
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
 
public class URLcat{
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
	}
 
	public static void main(String[] args) throws Exception {
		InputStream in = null;
		try {
			in = new URL(args[0]).openStream();
			IOUtils.copyBytes(in, System.out, 4086, false);
		} finally {
			IOUtils.closeStream(in);
		}
	}
}

使用的时候的命令是:

hadoop jar URLcat.jar URLcat hdfs://localhost:9000/user/hadoop/test.txt
hadoop jar URLcat.jar URLcat file:///usr/home/hadoop/test.txt

可以读取hdfs里的,当然也可以读取本地文件系统里的内容,不过不能指定读取多个文件,可以对程序进行循环读取的修改。还有一个弊端是Java虚拟机只能调用URL中的setURLStreamHandlerFactory方法一次,所以其他java程序不能再使用该方法读取数据了。改进程序看第二例。

二、改进hdfs数据读取程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.io.InputStream;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
 
public class HDFScat{
 
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		InputStream in = null;
		try {
			in = fs.open(new Path(args[0]));
			IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(in);
		}
 
	}
}

使用命令如下:

hadoop jar HDFScat.jar HDFScat test.txt

这里的命令不用再输入URI,是因为Configuration实例化的时候读取的配置信息,指定的配置的文件系统。所以这时候也不能再重新指定其他的文件系统,如本地的 file:/// 这样的参数。

三、实际上FileSystem对象的open方法返回的是FSDataInputStream对象,不是标准的java.io类,它继承了java.io.DataInputStream接口,并支持随机访问,因此可从数据流任意处读取数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
import org.apache.hadoop.io.IOUtils;
 
public class doubleCat{
 
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		FSDataInputStream in = null;
		try {
			in = fs.open(new Path(args[0]));
			IOUtils.copyBytes(in, System.out, 4096, false);
			in.seek(0);
			IOUtils.copyBytes(in, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(in);
		}
 
	}
}

使用命令同上例,使用FSDataInputStream对象的seek方法可以重新定位数据流的输出位置。这里FSDataInputStream也实现了PositionedReadable接口,实现了方法read(position, buffer, offset, length)等方法。这里的read方法是从position位置开始至多读取length个字符存入缓冲区buffer的指定偏移量offset处。这里需注意seek是高开销的方法。

我猜你可能也喜欢:

No Comments - Leave a comment

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期六, 2017 年 10 月 21 日