Hadoop Streaming程序基础

Comments: No Comments
Published on: 2012 年 05 月 14 日

可以当做是所有程序员入hadoop门的一个非常简单好用的hadoop的一个编程组件。因为可以用几乎任何程序来书写hadoop程序。可以是shell,可以是php、java、python、c++等等。超简单的一个使用shell统计单词个数的程序如下:

1
2
3
4
5
$ HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

为了方便你可以自由的发挥,书写自己想要的程序,你需要先了解一点Hadoop程序的MapReduce编程的一些基本原理。
mapreduce
MapReduce程序一般分两个过程,一个是map过程,一个是reduce过程。map过程一次读取一行数据,然后进行map逻辑过程,输出一个Key/Value键值对,默认以\t作为分隔符。然后map输出的数据会以Key值进行排序,然后进行洗牌操作,根据Key的不同进行散列,散列到相应的reduce过程。reduce接收键值对进行reduce逻辑处理,然后进行输出。这样就完成了一个MapReduce程序流程。详细看上图。

Hadoop安装程序包里提供了contrib/streaming程序包,这个程序包能使更多不同语言更简单方便的执行这样的一个MapReduce过程,只要你的程序map按行读取数据,按键值对输出数据,reduce程序再处理键值对数据完成输出即可。

直接运行这个streaming的jar程序包,可以查看各个参数选项,类似如下:

Parameter Optional/Required Description
-input directoryname or filename Required Input location for mapper
-output directoryname Required Output location for reducer
-mapper executable or JavaClassName Required Mapper executable
-reducer executable or JavaClassName Required Reducer executable
-file filename Optional Make the mapper, reducer, or combiner executable available locally on the compute nodes
-inputformat JavaClassName Optional Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default
-outputformat JavaClassName Optional Class you supply should take key/value pairs of Text class. If not specified, TextOutputformat is used as the default
-partitioner JavaClassName Optional Class that determines which reduce a key is sent to
-combiner streamingCommand or JavaClassName Optional Combiner executable for map output
-cmdenv name=value Optional Pass environment variable to streaming commands
-inputreader Optional For backwards-compatibility: specifies a record reader class (instead of an input format class)
-verbose Optional Verbose output
-lazyOutput Optional Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write)
-numReduceTasks Optional Specify the number of reducers
-mapdebug Optional Script to call when map task fails
-reducedebug Optional Script to call when reduce task fails

像最开始的那个程序,是最简单的,一般一个过程都需要有输入输出,map和reduce程序的指定即可。如果提交的非shell程序,比如php、python、c++等程序,需要使用-file选项,来让Hadoop框架分发你的程序给各个作业的机器。比如一个php程序的执行:

1
2
3
4
5
6
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper 'php myPythonScript.php' \
    -reducer /bin/wc \
    -file myPythonScript.php

这里有一点还是要提一下,stream目前还不支持管道命令,即类似-mapper 'cat * | sort | uniq ' 这样的一个管道过程。如果使用了,会出现"java.io.IOException: Broken pipe"错误。
同时提供的还有一些通用的选项,如下:

Parameter Optional/Required Description
-conf configuration_file Optional Specify an application configuration file
-D property=value Optional Use value for given property
-fs host:port or local Optional Specify a namenode
-jt host:port or local Optional Specify a job tracker
-files Optional Specify comma-separated files to be copied to the Map/Reduce cluster
-libjars Optional Specify comma-separated jar files to include in the classpath
-archives Optional Specify comma-separated archives to be unarchived on the compute machines

一般我们会常用的-D这个选项,它能够设置一些MapReduce程序在执行过程中的环境参数,比如有时候你不需要reduce过程,那么就设置一下不要reduce作业即可:

-D mapred.reduce.tasks=0

你还可能需要指定分隔符和第一个字段是key值:

-D stream.map.output.field.separator=.
-D stream.num.map.output.key.fields=4

类似的还有"stream.map.input.field.separator" 和 "stream.reduce.input.field.separator"。
还有可能你需要指定几个字段来作为key值:

-D mapred.text.key.partitioner.options=-k1,2

这个命令的意思就是让第一个和第二个字段作为key的值。还有一些属性:

stream.map.output.field.separator :设置map输出中key和value的分隔符
stream.num.map.output.key.fields : 设置map程序分隔符的位置,该位置之前的部分作为key,之后的部分作为value
map.output.key.field.separator : 设置map输出中key内部的分割符
num.key.fields.for.partition : 指定分桶时,key按照分隔符切割后,其中用于分桶key所占的列数(配合-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 使用)
stream.reduce.output.field.separator:设置reduce输出中key和value的分隔符
stream.num.reduce.output.key.fields:设置reduce程序分隔符的位置

你还可能需要指定多个大输入数据,可以使用通用选项里的-files来指定,用逗号分隔路径:

-files hdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt

详细的各种参数设置可以查看之前一篇《hadoop实战》里提到的mapred-default,里边介绍有默认设置,当然你这里就可以进行自己的设置了。

进行到这里,我们就可以写自己的MapReduce程序了,还拿Hadoop提供的wordcount样例程序来看看:
Php的单词统计程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//map程序:mapclass.php
 $value=1;
 while(($row=fgets(STDIN))){
        $arr=explode(" ",trim($row));
        foreach($arr as $key){
            echo $key."\t".$value."\n";
                }   
    } 
//reduce程序:reducer.php
   $sum=0;
   $first=0;
   while(($row=fgets(STDIN))){
    $arr=explode("\t",trim($row));
    $key=$arr[0];
    if($first==0){
        $first=1;
        $lastkey=$key;
    }   
    if($key!=$lastkey){
        echo $lastkey."\t".$sum."\n";
        $sum=1;
        $lastkey=$key;
        }   
    else{
        $sum +=intval($arr[1]);
        }   
    }   
    echo $key."\t".$sum."\n";

Shell的单词统计程序:

1
2
3
4
5
$ HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc

C++的单词统计程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//mapper
#include <stdio.h>
#include <string>
#include <iostream>
using namespace std;
 
int main(){
        string key;
        string value = "1";
        while(cin>>key){
                cout<<key<<"\t"<<value<<endl;
        }
        return 0;
}
//------------------------------------------------------------------------------------------------------------
//reducer
#include <string>
#include <map>
#include <iostream>
#include <iterator>
using namespace std;
int main(){
        string key;
        string value;
        map<string, int> word2count;
        map<string, int>::iterator it;
        while(cin>>key){
                cin>>value;
                it = word2count.find(key);
                if(it != word2count.end()){
                        (it->second)++;
                }
                else{
                        word2count.insert(make_pair(key, 1));
                }
        }
 
        for(it = word2count.begin(); it != word2count.end(); ++it){
                cout<<it->first<<"\t"<<it->second<<endl;
        }
        return 0;
}

Python的单词统计程序:

View Code PYTHON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
 
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
#---------------------------------------------------------------------------------------------------------
#!/usr/bin/env python
 
from operator import itemgetter
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass
 
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
 
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s'% (word, count)

下边举例使用方法,就以php程序的执行为例:

1
2
3
4
5
6
$ HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper 'php mapclass.php' \
    -reducer 'php reduce.php' \
    -file 'mapclass.php'

一般需要注意的是,执行的其他脚本程序一定要加上解释器的名字,默认的是执行shell脚本。另外包保证php、python等程序的正确性,一些错误的总结这篇里有一些说明。

另外还有一些已经实现的好用的类来让我们使用。
如果想对一些数据进行排序,可以类似这样:

1
2
3
4
5
6
7
8
9
10
11
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 \
    -D map.output.key.field.separator=. \
    -D mapred.text.key.comparator.options=-k2,2nr \
    -D mapred.reduce.tasks=12 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer

这里使用的就是提供的KeyFieldBasedComparator的比较器。它提供一个Unix/GNU的排序。这里的排序是对2,2这个字段进行数字(n)的逆序(r)排序输出类似如下:

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

Hadoop还提供了Aggregate的程序包。使用Aggregate包非常简单,使用的时候只需要把-reducer参数设置为 aggregate即可。但是map程序的输出格式有要求,格式如下:

function:key\tvalue

即Key和Value值之间以\t分隔。function是需要执行的行为,有如下行为:

DoubleValueSum 一个double值序列的求和
LongValueMax 求一个long值序列的最大值
LongValueMin 求一个long值序列的最小值
LongValueSum 求一个long值序列的和
StringValueMax 求一个string值序列的字母序最大值
StringValueMin 求一个string值序列的字母序最小值
UniqValueCount 为每个键求单一值的个数
ValueHistogram 求每个值的个数、最小值、中值、最大值、平均值和标准方差

这里提一下,如果function是ValueHistogram的话,输出格式必须是:

ValueHistogram:key\tvalue\tcount

这里给出样例,也是hadoop实战里给的样例:

View Code PYTHON
1
2
3
4
5
6
7
8
#!/usr/bin/env python
# Name: AttributeCount.py
import sys
 
index=int(sys.argv[1])
for line in sys.stdin:
    fields=line.split(",")
    print "LongValueSum:"+fields[index]+"\t"+"1"

使用命令:

$ hadoop jar contrib/streaming/hadoop-*-streaming.jar -input apat63_99.txt -output output
-file AttributeCount.py -mapper 'AttributeCount.py 1' -reducer aggregate

样例数据apat63_99.txt如下:

3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
3858241,3634889

Hadoop还提供了org.apache.hadoop.mapred.lib.FieldSelectionMapReduce包,能够完成类似Unix命令cut的功能,使用如下,不再详解。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D map.output.key.field.separa=. \
-D mapred.text.key.partitioner.options=-k1,2 \
-D mapred.data.field.separator=. \
-D map.output.key.value.fields.spec=6,5,1-3:0- \
-D reduce.output.key.value.fields.spec=0-2:5- \
-D mapred.reduce.tasks=12 \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
-reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

"-D map.output.key.value.fields.spec=6,5,1-3:0-"选项指定了map输出的key/value值,key包含6,5,1,2,3,冒号分隔,value包含了0和之后所有字段。
"-D reduce.output.key.value.fields.spec=0-2:5-"选项指定了reduce的输出,key值包括0,1,2(这里相当于map的6,5,1),value值包含5和之后的所有内容。

开始提到stream的参数-cmdenv选项,这个选项可以以环境变量的形式传递参数给程序,类似这样:

-cmdenv test=1

程序里从main函数里的第三个参数来获取参入的环境变量:

int main(int argc, char *argv[], char *env[]){
int i, test;
for (i = 0; env[i] != NULL; i++)
if(strncmp(env[i], "test=", 6) == 0)
test=atoi(env[i]+6);
……

个人原创,转载请注明:三江小渡

我猜你可能也喜欢:

No Comments - Leave a comment

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期六, 2017 年 10 月 21 日