Hadoop权威指南[三](Tom White)

Comments: 4 Comments
Published on: 2012 年 03 月 19 日

3.21ps:发这篇博客的时候同学通知我昨天晚上有事发生,我搜了搜,被吓到了。。。真的吓的不轻。。。帝~&都让我害怕。。。赶紧听听梁静茹《为你我受冷风吹》冷静冷静。。

第五章:MapReduce应用开发

第二章有介绍过一点,用MapReduce来编写程序,有一个特定的流程。首先写map函数和reduce函数,最好使用单元测试来确保函数的运行符合预期。然后写一个驱动程序来运行作业,要看这个驱动程序是否可以运行,可以从本地IDE用一个小的数据集来运行它。如果运行不正确,就用本地IDE调试器来找出问题根源。通过这些调试,改进mapper或reducer。

一旦程序如期通过小的数据集测试,就可以准备运行在集群上。这时候可能暴漏更多问题,也可以像之前一样调试修复。在集群上调试程序很具有挑战性,但Hadoop提供了一些辅助工具,例如IsolationRunner,该工具允许在失败的相同输入数据上(必要时用附带的调试器)来运行任务。

程序正确运行后,可能需要一些优化调整,首先执行一些标准检查,借此加快MapReduce程序的运行,然后做一些任务剖析(task profiling)。分布式程序的分析并不简单,Hadoop提供了钩子(hook)来辅助这个分析过程。

在真正开始开发前,先学习如何配置Hadoop。

Hadoop中,组件的配置是通过Hadoop提供的API来进行的。一个Configuration类的实例(可以在org.apache.hadoop.conf包中找到)代表配置属性及其取值的一个集合。每个属性由一个String来命名,而直类型可以是多种类型之一,包括Java基本类型和其他有用的类型(如String、Class、java.io.File和String集合)。Configuration从XML文件中读取其属性值。样例程序:

1
2
3
4
5
6
7
8
9
10
import org.apache.hadoop.conf.*;
public class configself{
    public static void main(String[] args){
        System.out.println("hello world!");
        Configuration conf = new Configuration();
        conf.addResource("/usr/hadoop-0.20.2/conf/configuration.xml");
        System.out.println(conf.get("color"));
        System.out.println(conf.getInt("size",0));
    }   
}

XML里没有的属性会被认为是默认值。这里算是我的第一个hadoop程序,所以输出了helloworld,如果这里你使用命令行或者别的什么进行编译,遇到任何问题的话请看之前的一篇hadoop配置文章的最下面,有一些错误解释。

这里遇到多个configuration.xml的问题,如果添加多个源xml,hadoop不会进行覆盖,而是所有的属性值都有,但是如果遇到相同的属性值,后载入的会覆盖之前载入的。很多程序貌似都有这样的规则。另外配置属性可以用其他属性或系统属性进行定义。并且系统属性的优先级高于源文件中定义的属性。

事实上在开发Hadoop应用时,经常需要在本地运行和集群运行之间切换,有时候还包括伪分布式上。应对这些变化的一种方法就是使用Hadoop配置文件包含每个集群的链接设置,并且指定运行Hadoop应用或工具时使用哪一个链接设置。最好的做法是把这些文件放在Hadoop安装目录之外,以便轻松地在Hadoop不同版本之间进行切换,从而避免重复或丢失配置信息。

说明一下conf目录里的3个配置文件:
hadoop-local.xml文件包含Hadoop默认文件系统和jabtracker的默认配置信息:

<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>file:///</value>
</property>

<property>
<name>mapred.job.tracker</name>
<value>local</value>
</property>

</configuration>

hadoop-loalhost.xml文件里设置指向本地主机上运行的namenode和jobtracker:

<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost/</value>
</property>

<property>
<name>mapred.job.tracker</name>
<value>localhost:8021</value>
</property>

</configuration>

hadoop-cluster.xml文件包含集群内namenode和jobtracker的详细信息,事实上我们会以集群的名称来命名这个文件名,而不是用cluster泛指:

<?xml version="1.0"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://namenode/</value>
</property>

<property>
<name>mapred.job.tracker</name>
<value>jobtracker:8021</value>
</property>

</configuration>

还可以根据需要为这些文件添加其他配置信息,有了这些设置,便可以轻松使用-conf命令行开关来使用各种配置:
$ hadoop fs -conf conf/hadoop-localhost.xml -ls ./
如果省略掉-conf选项,可以从conf子目录下的$HADOOP_INSTALL中找到Hadoop的配置信息。至于独立模式还是伪分布或集群模式,取决于具体的设置。Hadoop自带的工具支持-conf选项,也可以直接使用程序通过使用Tool接口来支持-conf选项。

为了简化命令行方式运行作业,Hadoop自带了一些辅助类。GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser,更方便的方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser:

1
2
3
public interface Tool extends Configurable {
  int run(String [] args) throws Exception;
}

下边给出非常简单的Tool的实现,用来打印Tool的Configuration对象所有属性的键值对:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConfigurationPrinter extends Configured implements Tool {
 
  static {
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
  }
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    for (Entry entry: conf) {
      System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
    }
    return 0;
  }
 
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
    System.exit(exitCode);
  }
}

我们把ConfigurationPrinter作为Configured的一个子类,Configured是Configurable接口的一个实现。Tool的所有实现都需要实现Configurable(因为Tool继承于Configurable),Configured子类通常是一种最简单的实现方式。run()方法通过Configurable的getconf()方法获取Configuration,然后重复执行,输出属性。静态代码部分用来获取HDFS和MapReduce配置和核心配置。ConfigurationPrinter的main()方法没有直接调用自身的run()方法,而是调用ToolRunner的静态run()方法,该方法负责在调用run()方法之前,为Tool建立一个Configuration对象。ToolRunner还是用了GenericOptionsParser来获取在命令行方式中指定的任何标准选项,然后,在Configuration实例上进行设置,运行如下代码可看到conf/hadoop-localhost.xml中设置的属性:

$ hadoop ConfigurationPrinter -conf conf/hadoop-localhost.xml | grep mapred.job.tracker=mapred.job.tracker=localhost:8021

在环境中设置什么属性,一个有用的工具便是ConfigurationPrinter。也可以在Hadoop安装路径的docs目录中,查看所有公共属性的默认设置,相关文件包括:coredefault.html, hdfs-default.html和 mapred-default.html这几个,每个属性都有用来解释属性作用和取值范围的描述。需要注意的是在客户端配置某些属性将不会产生影响。

GenericOptionsParser也允许设置个别属性。例如:

$ hadoop ConfigurationPrinter -D color=yellow | grep color

-D选项用于将键color的配置属性值设置为yellow。-D优先级高于配置文件。下边给出GenericOptionsParser选项和ToolRunner选项的列表:

选项名称 描述
-D property=value 将指定值赋值给确定的Hadoop配置属性。覆盖配置文件里的默认属性或站点属性,或通过-conf选项设置的任何属性
-conf filename ... 将指定文件添加到配置的资源列表中。这是设置站点属性或同时设置一组属性的简便方法
-fs uri 用指定的URI设置默认文件系统。这是-D fs.default.name=uri的快捷方式
-jt host:port 用指定主机和端口设置jobtracker。这是-D mapred.job.tracker= host:port的快捷方式
-filesfile1,file2,... 从本地文件系统(或任何指定模式的文件系统)中复制指定文件到jobtracker所用的共享文件系统(通常是HDFS),确保在任务工作目录的MapReduce程序可以访问这些文件(要想进一步了解如何复制文件到tasktracker机器的分布式缓存机制)
-archivesarchive1,archive2,.. 从本地文件系统(或任何指定模式的文件系统)复制指定存档到jobtracker所用的共享文件系统(通常是HDFS),打开存档文件,确保任务工作目录的MapReduce程序可以访问这些存档
-libjars jar1,jar2,… 从本地文件系统(或任何指定模式的文件系统)复制指定JAR文件到被jobtracker 使用的共享文件系统
(通常是HDFS),把它们加入MapReduce任务的类路径中。这个选项适用于传输作业需要的JAR文件

编写单元测试
在MapReduce中,map和reduce函数的独立测试非常方便,这是由函数风格决定的。针对已知输入,得到已知输出。然后由于输出写到一个OutputCollector,而不是通过简单的方法调用进行返回,所以OutputCollector需要用一个mock进行替换,以便验证输出的正确性。有几个Java mock对象框架可以用来建立模拟。书中主要讲的是Mockito。这里先给一个mapper样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import static org.mockito.Mockito.*;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.OutputCollector;
import org.junit.*;
public class MaxTemperatureMapperTest {
  @Test
  public void processesValidRecord() throws IOException {
    MaxTemperatureMapper mapper = new MaxTemperatureMapper();
 
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                  // Year ^^^^
        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                              // Temperature ^^^^^
    OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);
    mapper.map(null, value, output, null);
    verify(output).collect(new Text("1950"), new IntWritable(-11));
  }
}

输入数据调用Mockito的mock()方法来传递一个想要模拟的类。然后调用mapper的map()方法来执行测试代码。最后使用verify()来验证mock对象已调用了正确的方法和参数。

上例创建了一个能够通过测试的Mapper实现。由于本章要进行类的扩展,所以每个类被放在包含版本信息的不同包中,例如v1.MaxTemperatureMapper是MaxTemperatureMapper的第一个版本,当然不重新打包实际上也可以对类进行扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MaxTemperatureMapper extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {
 
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
 
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature = Integer.parseInt(line.substring(87, 92));
    output.collect(new Text(year), new IntWritable(airTemperature));
  }
}

这是一个非常简单的实现,从行中抽出年份和气温,在OutputCollector中输出。现在增加一个缺失值的测试,该值在原始数据中表示气温+9999:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void ignoresMissingTemperatureRecord() throws IOException {
  MaxTemperatureMapper mapper = new MaxTemperatureMapper();
 
  Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                                // Year ^^^^
      "99999V0203201N00261220001CN9999999N9+99991+99999999999");
                            // Temperature ^^^^^
  OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);
  mapper.map(null, value, output, null);
 
  verify(output, never()).collect(any(Text.class), any(IntWritable.class));
}

由于确实的气温已经被过滤掉,所以在这个测试中,Mockito用来验证OutputCollector的collect()方法没有任何Text键或IntWritable值调用。由于parseInt()不能解析带加号的整数,所以测试最后抛出NumberFormatException异常,修改实现版本2来处理缺失值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {
 
  String line = value.toString();
  String year = line.substring(15, 19);
  String temp = line.substring(87, 92);
  if (!missing(temp)) {
      int airTemperature = Integer.parseInt(temp);
      output.collect(new Text(year), new IntWritable(airTemperature));
  }
}
 
private boolean missing(String temp) {
  return temp.equals("+9999");
}

这个测试通过后,我们接下来写reducer的测试。reducer必须找出指定键的最大值,先写一个测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void returnsMaximumIntegerInValues() throws IOException {
  MaxTemperatureReducer reducer = new MaxTemperatureReducer();
 
  Text key = new Text("1950");
  Iterator<IntWritable> values = Arrays.asList(
      new IntWritable(10), new IntWritable(5)).iterator();
  OutputCollector<Text, IntWritable> output = mock(OutputCollector.class);
 
  reducer.reduce(key, values, output, null);
 
  verify(output).collect(key, new IntWritable(10));
}

我们对一些IntWritable值构建一个迭代器来验证MaxTemperatureReducer能找到最大值:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MaxTemperatureReducer extends MapReduceBase
  implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
 
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}

现在mapper和reducer已经能够在可控的输入上进行工作了,下一步是写一个作业驱动程序,然后在开发机器上使用测试数据运行它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MaxTemperatureDriver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
 
    JobConf conf = new JobConf(getConf(), getClass());
    conf.setJobName("Max temperature");
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
 
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setCombinerClass(MaxTemperatureReducer.class);
    conf.setReducerClass(MaxTemperatureReducer.class);
    JobClient.runJob(conf);
    return 0;
  }
 
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}

MaxTemperatureDriver实现了Tool接口,所以,我们能够设置GenericOptionsParser支持的选项。在开始JobConf所描述的作业前,run()方法创建和配置一个Jobconf对象。在所有可能的作业配置参数中,可以设置输入和输出文件路径,mapper、reducer和combiner,以及输出类型(输入类型由输入格式决定,默认为TextInputFormat,包括Lang Writable键和Text值)。为作业设置一个名称也是很好的做法,这样可以在执行过程中或作业完成后方便地从作业列表中查找作业。默认情况下,作业名称是JAR文件,通常情况下没有特殊的描述。

现在我们可以在一些本地文件上运行这个应用。Hadoop有一个本地作业运行器(job runner),它是在MapReduce执行引擎运行单个JVM上的MapReduce作业的简化版本。它是为测试而设计的,在IDE中使用起来非常方便,因为我们可以在调试器中单步运行mapper和reducer代码。job runner只能运行在本地,并且不支持DistributedCache特性。它通过一个配置设置来激活,正常情况下,mapred.job.tracker是一个主机:端口(host:port),用来设置jobtracker的地址,但它是一个特殊的local值时,作业就在不访问外部jobtracker的情况下运行。
执行以下这个驱动程序:

$ hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml input/ncdc/micro max-temp

类似地,可以使用GenericOptionsParser提供的-fs和-jt选项:

$ hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro max-temp

这条指令使用本地input/ncdc/micro目录的输入来执行MaxTemperatureDriver,产生的输出存放在本地max-temp目录中。虽然我们设置了-fs,可以使用本地文件系统(file:///),但本地作业运行器实际上可以在包括HDFS在内的任何文件系统上正常工作。

这个程序会有以下异常:

java.lang.NumberFormatException: For input string: "+0000"

这个异常表明map方法仍然不能解析带正号的气温。我们可以给出一个解析器来封装解析逻辑,来解析NCDC格式的气温记录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NcdcRecordParser {
 
  private static final int MISSING_TEMPERATURE = 9999;
 
  private String year;
  private int airTemperature;
  private String quality;
 
  public void parse(String record) {
    year = record.substring(15, 19);
    String airTemperatureString;
    // Remove leading plus sign as parseInt doesn't like them
    if (record.charAt(87) == '+') { 
      airTemperatureString = record.substring(88, 92);
    } else {
      airTemperatureString = record.substring(87, 92);
    }
    airTemperature = Integer.parseInt(airTemperatureString);
    quality = record.substring(92, 93);
  }
 
  public void parse(Text record) {
    parse(record.toString());
  }
  public boolean isValidTemperature() {
    return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
  }
 
  public String getYear() {
    return year;
  }
  public int getAirTemperature() {
    return airTemperature;
  }
}

最终的mapper相当简单,只调用解析类的parser()方法,它用isValidTemperature()方法检查是否合法气温,如果是就用解析类的getter方法获取年份和气温数据。也可以在这个方法中过滤非法或错误数据。创建解析类的另外一个好处是,相似作业的mapper不需要重新写代码。也提供了一个机会直接针对解析类编写测试单元,用于更多目标测试。下边给出mapper使用utility类来解析的样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MaxTemperatureMapper extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {
 
private NcdcRecordParser parser = new NcdcRecordParser();
 
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
 
parser.parse(value);
    if (parser.isValidTemperature()) {
      output.collect(new Text(parser.getYear()),
          new IntWritable(parser.getAirTemperature()));
    }
  }
}

现在大功告成,算是完成了这个应用。以上程序的组织情况大致给你捋一下:MaxTemperatureDriver类中run方法配置各种信息,包括自己实现的map和reduce两个类,这里类名是MaxTemperatureMapper和MaxTemperatureReducer。其中NcdcRecordParser是MaxTemperatureMapper用来过滤合法数据的,MaxTemperatureMapper实现map方法来完成数据的读取,总体就是这样了,你可以再回头看一遍,思路很清晰的话就不用了。

我们现在来测试驱动程序。灵活的配置选项可以使用应用程序实现Tool,还可以插入任意的Configuration来增加可测试性。可以利用这点来编写测试程序,他将利用本地作业运行器在已经输入数据上运行作业,借此来检查输出是否满足预期。实现这个目标有两个方法:第一种方法是使用本地作业运行器,在本地文件系统的测试文件上运行作业,这种思路的样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  @Test
  public void test() throws Exception {
    JobConf conf = new JobConf();
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");
 
    Path input = new Path("input/ncdc/micro");
    Path output = new Path("output");
 
    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(output, true); // delete old output
 
    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);
 
    int exitCode = driver.run(new String[] {
        input.toString(), output.toString() });
    assertThat(exitCode, is(0));
 
    checkOutput(conf, output);
  }

测试代码明确设置fs.default.name和mapred.job.tracker,所以使用的是本地文件系统和本地作业运行器。随后通过Tool接口在少数已知数据上运行MaxTemperatureDriver。最后checkOutput()方法被调用以逐行对比实际输出与预期输出。

测试驱动程序第二种方法就是使用一个mini集群来运行它。Hadoop有一对测试类,名为MiniDFSCluster和MiniMRCluster,它以程序方式创建正在运行的集群。不用于本地作业运行器,他们不允许在整个HDFS和MapReduce机器上运行测试。mini集群上的tasktracker启动不同的JVM来运行任何,会增大调试难度。

至此程序测试完成,可以在Hadoop集群上运行了,这里先用伪分布集群。

首先是先把程序打包成jar文件发给集群。使用Ant可以简化这个过程,使用如下的任务:

如果每个JAR文件都有一个作业,可以指定manifest中执行要运行的main类。如果main类不在manifest中,则必须在命令行指定。任何非独立的JAR文件应该打包到JAR文件的lib子目录中。这与Java Web application archive或WAR文件类似,只不过JAR文件是放在WEB-INF/lib子目录下WAR文件中的。

然后我们需要运行驱动程序来启动作业,使用-conf选项来指定想要运行的作业集群(同样可以使用-fs和-jt):

$ hadoop jar job.jar v3.MaxTemperatureDriver -conf conf/hadoop-cluster.xml input/ncdc/all max-temp

JobClient的runJob()方法启动作业并检查进程,有任何变化就输出一行map和reduce进度结果。输出信息很多,在作业之前打印作业ID,作业完成后统计信息被打印,有利于确认作业是否完成。

Hadoop的Web界面用来浏览作业信息,对于跟踪作业运行进度、查找作业完成后的统计信息和日志非常有用可以在http://jobtracker-host:***/找到用户界面信息。里边很多信息,详细情况请另外找文档或者看书。这里只简单说一下hadoop的web页面地址:

  • http://localhost:50030 可以查看 JobTracker 的运行状态
  • http://localhost:50060 可以查看 TaskTracker 的运行状态
  • http://localhost:50070 可以查看 NameNode 以及整个分布式文件系统的状态,浏览分布式文件系统中的文件以及 log 等。

一旦作业完成,每个reduce产生一个输出文件,因此,max-temp目录中会有很多个命名part-****的文件,文件比较小的话很容易使用hadoop fs命令中 -getmerge选项合并成一个单独文件放到本地文件系统中。

作业调试。最经典方法是打印输出的方式,在hadoop同样适用,然而需要考虑复杂的情况:程序运行在很多节点的集群上。为了处理这种情况,我们要查找一个特殊情况,我们用一个调试语句记录到一个标准错误中,它将发送一个信息来更新任务的状态信息以提示我们查看错误日志,Web UI简化了这个操作。

我们还要创建一个自定义的计数器来统计不合理记录总数,记录这些信息而不是简单丢掉,写入日志,这往往对我们是有帮助的。如果调试期间产生的日志数据规模比较大,可以将这些信息写到map的输出流供reduce分析和汇总,这种方法通常必须改变程序结构;另外一种是编写一个程序(也是MapReduce程序)来分析作业产生的日志。我们把调试加入mapper(版本4),相对于reducer,因为我们希望找到导致这些异常输出的数据源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MaxTemperatureMapper extends MapReduceBase
  implements Mapper<LongWritable, Text, Text, IntWritable> {
  enum Temperature {
    OVER_100
  }
 
  private NcdcRecordParser parser = new NcdcRecordParser();
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
 
    parser.parse(value);
    if (parser.isValidTemperature()) {
      int airTemperature = parser.getAirTemperature();
      if (airTemperature > 1000) {
        System.err.println("Temperature over 100 degrees for input: " + value);
        reporter.setStatus("Detected possibly corrupt record: see logs.");
        reporter.incrCounter(Temperature.OVER_100, 1);
      }
      output.collect(new Text(parser.getYear()), new IntWritable(airTemperature));
    }
  }
}

如果数据有问题,就输出到标准错误流,同时使用Reporter的setStatus()方法来更新map中的状态信息,引导我们查看日志。另外还增加了一个计数器OVER_100,来统计这些记录数。

然后可以在任务详细信息页面web上查看,如task attempt的运行节点和指向任务日志文件、计数器的链接。Actions列包括终止task attempt的链接。默认情况下这个功能是禁止的,Web用户界面是只读接口。将webinterface.private.actions设置成true即可启动该链接。跟踪日志可发现我们的错误输出信息。作业完成后我们也可以查看我们定义的计数器的值:

% hadoop job -counter job_200904110811_0003 'v4.MaxTemperatureMapper$Temperature' OVER_100

-counter选项的输入参数包括作业ID,计数器的组名(一般是类名)和计数器名称(enum名)。

这里提一下,针对不同用户Hadoop在不同地方生成日志,一会列个表总结。MapReduce任务日志可以从Web界面访问,非常便捷。也可以从正在进行task attempt,每个日志文件累加成为整个JVM运行日志,所以多个task attempt存放在一个日志中。这些日志文件的写操作很直接。任何到标准输出或标准错误流的写操作都直接写到相关日志文件。这排除Streaming方式下,标准输出被用于map或reduce的输出,不会出现在日志文件。在Java中,如果用Apache Commons Logging API,就会写入任务的系统日志文件中(syslog file)。这里实际的日志记录由log4j来做:相关的log4j附加文件称为TLA(Task Log Appender),在Hadoop配置目录下的log4j.properties文件中。有一些控制用于管理任务的日志的大小和记录保留时间。通过mapred.userlog.retain.hours属性、mapred.userlog.limit.kb属性设置。

Hadoop日志:

  • 系统守护日志:每个Hadoop守护进程产生一个日志文件(使用log4j)和另一个文件(合并错误输出和错误)。这些文件分别写入HADOOP_LOG_DIR环境变量目录里。受众管理员。
  • HDFS审计日志:记录所有HDFS请求,默认关闭状态,可以进行配置,一般写入namenode的日志。受众管理员。
  • MapReduce作业历史日志:记录运行期间发生的事件,集中保存在jobtracker上的_logs/history子目录中的作业输出目录中。受众用户
  • MapReduce任务日志:每个tasktracker子进程都用log4j产生一个日志文件,一个保存发到标准输出数据的文件,一个保存标准错误的文件。这些文件写入HADOOP_LOG_DIR环境变量定义的目录的userlogs子目录里受众用户。

当一个任务失败并且没有足够多的记录信息来诊断错误时,可以选择用调试器运行该任务。在集群上运行任务时,很难使用调试器,因为不知道哪个节点处理哪部份输入,所以不能在错误发生之前安装调试器。然而可以设置运行作业的属性来让Hadoop保留作业在运行期间产生的所有中间值。这些数据可以用来独立地在调试器上重新运行那些出错的任务。

实现这一调试方法,先将配置属性keep.failed.task.files的值设置为true,以便在任务失败时,tasktracker能保留足够多的信息让任务在相同的输入数据上重新运行。然后再次运行作业,并使用Web UI查看故障节点和task attempt ID。接着需要用前边保留的文件作为输入,运行一个特殊的作业运行器,即IsolationRunner。登录到故障节点,找到那个task attempt的目录。它可能是在本地MapReduce目录下的某一个目录,由mapred.local.dir属性设置。类似如下:

mapred.local.dir/taskTracker/jobcache/job-ID/task-attempt-ID

里边有多个文件和子目录,job.xml文件包含task attempt期间生效的所有作业的配置属性,IsolationRunner用它来创建一个JobConf实例。对于map任务,这个目录还包含一个含有输入划分序列化表示的文件,所以map任务可以取得相同的输入数据。对于reduce任务,则有一个map输出备份,它作为reducer的输入,存放在output目录中。还有一个work目录,它是task attempt的工作目录。我们改变到这个目录以运行IsolationRunner。需要设置一些选项来链接到远程调试器:

$ export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y, address=8000"

suspend=y 表示JVM在运行代码钱先等待调试器链接。用以下命令启动IsolationRunner:

$ hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

下一步设置断点,链接远程调试器,然后任务在你控制下运行。

Hadoop有很多分析工具,用于对作业的调优。请另行了解。

至此MapReduce应用开发机制已大概说清了。建议MapReduce作业处理简单的问题,如果处理复杂的问题,不应该写更复杂的map和reduce函数,通常是增加更多得作业,而不是增加作业的复杂度。对于更复杂的问题,可考虑使用比MapReduce更高级的语言,如Pig、hive或Cascading。一个直接好处是:有了它之后,就用不着处理到MapReduce作业的转换,而是集中精力分析正在执行的任务。

我猜你可能也喜欢:

4 Comments - Leave a comment
  1. jared说道:

    你好,我有个问题请教,如果我想给每个task制定个性化的配置属性,比如,io.sort.mb, io.sort.factor,如何实现最好? 因为一般的情况是每个job都一次性在配置文件里设置好,然后就固定了。当每个task生成时都去读取这个统一的配置文件,我想实现让每个task都读取不一样的配置。如果有任何建议都欢迎指教,谢谢!我的邮箱:jaredcdz@hotmail.com。:-)

  2. Leon说道:

    你好,我在Linux下运行Hadoop,没有使用IDE,现在有个问题,怎么用命令行来运行单元测试,期待您的回复!(请给我发Email,谢谢)

Leave a comment

电子邮件地址不会被公开。 必填项已用*标注

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Welcome , today is 星期日, 2017 年 11 月 19 日