利用HDFS的JavaAPI编程[一]

Tags: , , ,
Comments: 4 Comments
Published on: 2012 年 04 月 13 日

这篇主要介绍利用hdfs接口,使用java编程向hdfs写入数据。

一、模仿hadoop fs -put 和 -copyFromLoca命令,实现本地复制文件到hdfs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
 
public class test {
 
	public static void main(String[] args) throws Exception {
		String local = args[0];
		String hdfs = args[1];
		InputStream in = new BufferedInputStream(new FileInputStream(local));
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		OutputStream out = fs.create(new Path(hdfs), new Progressable() {
			public void progress() {
				System.out.print(".");
			}
		});
 
		IOUtils.copyBytes(in, out, 4096, true);
	}
}

FileSystem类有一系列创建文件的方法,详细看各个重载版本的create(),如允许我们制定是否需要强制覆盖、文件备份数量、写入文件所用缓冲区大小、文件块大小及文件权限等。该方法会自动创建不存在的父级目录。FileSystem对象中还有追加的方法FSDataOutputStream append(),需要配置hdfs的dfs.append.support属性值为ture,否则会提示:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException: java.io.IOException: Append to hdfs not supported. Please refer to dfs.support.append configuration parameter.

另外apache官网的邮件列表文档里有一段话,说建议不要使用append,如下:

In short, appends in HDFS are extremely experimental and dangerous. Most would advise you to leave this disabled. Your best option for "append" like behavior is to rewrite the file with new content being added at the end. Append support was briefly introduced and then removed as a number of issues came up.

详尽的原因有一个append设计文档,可以去看看。

二、写一个与hadoop fs -getmerge相对应的一个简单程序: putmerge 。
我们知道,getmerge命令是从hdfs上获取大量文件组合成一个文件放到本地文件系统中的命令。但是hadoop没有提供与这一过程相逆的命令。不幸的是我们会在处理apache日志过程中常用到这样的一个命令,比如有很多按日期分的apache日志。

我们想传到hdfs中使用MepReduce来处理的话,我们只能用笨办法先本地合成大文件,然后上传这个大文件到hdfs,这种方法很低效。我们接下来给出一个程序,利用hdfs提供的JavaAPI来编写一个上传多个文件的过程中合成一个大文件的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
public class putMerge {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FileSystem local = FileSystem.getLocal(conf);
 
		Path inputDir = new Path(args[0]);
		Path hdfsFile = new Path(args[1]);
 
		try {
			FileStatus[] inputFiles = local.listStatus(inputDir);
			FSDataOutputStream out = hdfs.create(hdfsFile);
 
			for (int i = 0; i < inputFiles.length; i++) {
				System.out.println(inputFiles[i].getPath().getName());
				FSDataInputStream in = local.open(inputFiles[i].getPath());
				byte buffer[] = new byte[256];
				int bytesRead = 0;
				while ((bytesRead = in.read(buffer)) > 0) {
					out.write(buffer, 0, bytesRead);
				}
				in.close();
			}
			out.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
 
	}
}

一般为了方便,我们可以类似的给这个程序起个别名,为了连贯性,我这里把该程序打包放在$HADOOP_HOME/bin/putMerge.jar这个路径下,然后仿照这篇里边说的起别名规则,在/etc/profile里边添加一条新的别名:

alias hputm='hadoop jar $HADOOP_HOME/bin/putMerge.jar putMerge'

然后这个命令的使用方法就是:

hputm input(本地目录名) hdfsoutputfilename

三、有时候我们想合并hdfs中的文件,并存在hdfs里,又不想经过下载到local文件系统里这一过程,我们可以书写这样的程序,并且实现递归合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
public class filesmerge {
	public static boolean isRecur = false;
 
	public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs,FSDataOutputStream out) {
		try {
			FileStatus[] inputFiles = hdfs.listStatus(inputDir);
			for (int i = 0; i < inputFiles.length; i++) {
				if (!hdfs.isFile(inputFiles[i].getPath())) {
					if (isRecur){
						merge(inputFiles[i].getPath(), hdfsFile, hdfs,out);
						return ;
					}
					else {
						System.out.println(inputFiles[i].getPath().getName()
								+ "is not file and not allow recursion, skip!");
						continue;
					}
				}
				System.out.println(inputFiles[i].getPath().getName());
				FSDataInputStream in = hdfs.open(inputFiles[i].getPath());
				byte buffer[] = new byte[256];
				int bytesRead = 0;
				while ((bytesRead = in.read(buffer)) > 0) {
					out.write(buffer, 0, bytesRead);
				}
				in.close();
			}
			out.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
 
	public static void errorMessage(String str) {
		System.out.println("Error Message: " + str);
		System.exit(1);
	}
 
	public static void main(String[] args) throws IOException {
		if (args.length == 0)
			errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
		if (args[0].matches("^-[rR]$")) {
			isRecur = true;
		}
		if ((isRecur && args.length != 3) || ( !isRecur && args.length != 2)) {
			errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
		}
 
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
 
		Path inputDir;
		Path hdfsFile;
		if(isRecur){
			inputDir = new Path(args[1]);
			hdfsFile = new Path(args[2]);
		}
		else{
			inputDir = new Path(args[0]);
			hdfsFile = new Path(args[1]);
		}
 
		if (!hdfs.exists(inputDir)) {
			errorMessage("hdfsTargetDir not exist!");
		}
		if (hdfs.exists(hdfsFile)) {
			errorMessage("hdfsFileName exist!");
		}
 
		FSDataOutputStream out = hdfs.create(hdfsFile);
		merge(inputDir, hdfsFile, hdfs,out);
		System.exit(0);
	}
}

2012.4.13 ps:第四部分待续。。。
四、更不幸的是我们经常遇到的并非正常的文本文件,因为直接存储文本文件比较浪费空间,所以大部分服务器运维人员针对该类日志文件都是进行压缩打包存放的,所以我们有时候,或者说更多情况下需要的是对大量压缩包进行解压缩合并上传到hdfs的命令,为了方便我们同样只能自己搞生产了:

我猜你可能也喜欢:

4 Comments - Leave a comment
  1. 朱红之泪说道:

    貌似有个错误,模仿hadoop fs -put 和 -copyFromLoca命令,实现本地复制文件到hdfs里面的
    FileSystem fs = FileSystem.get(conf)好像错了应该改成FileSystem fs = FileSystem.get(URI.create(hdfs),conf),不然的话运行的时候在传入参数后读取hdfs文件系统时会出错,求解,我是新手

  2. JueFan_C说道:

    博主好~!
    我想请问一下,在第三部分中,你的代码实现了hdfs上数据移动的功能,虽然跳过了数据加载到本地的过程,但是在实际执行中,系统内部是否还是会占用带宽的?是不是在执行的时候还要会把hdfs上的数据放进本地的内存中进行运算?

    • 三江小渡说道:

      对的,只是节省掉整体下载再上传这样的过程,还是需要依靠本地内存进行处理的。
      当时学习的时候没考虑那么仔细,其实有JAVA的API可以直接rename的,这样就节省了带宽和本地的计算资源~

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期二, 2017 年 10 月 24 日