mapred

1. Scheduler

2. hadoop-lzo

2.1. Installation and configuration

  • Install the liblzo2 library: sudo apt-get install liblzo2-2 (liblzo2-dev)
  • Install the lzop program: sudo apt-get install lzop
    • On macOS: brew install lzop
  • Download hadoop-lzo: git clone git://github.com/kevinweil/hadoop-lzo.git
  • Build it: ant compile-native tar
  • Copy the resulting build/hadoop-lzo-0.4.14/hadoop-lzo-0.4.14.jar to $HADOOP_HOME/lib, and copy the files under build/hadoop-lzo-0.4.14/lib/native to $HADOOP_HOME/lib/native (native/Linux-amd64-64)
    • If Hadoop is not installed locally and you only plan to compile locally and then submit jobs, you can instead set the following two environment variables
    • export HADOOP_CLASSPATH=/path/to/your/hadoop-lzo-lib.jar
    • export JAVA_LIBRARY_PATH=/path/to/hadoop-lzo-native-libs:/path/to/standard-hadoop-native-libs
  • Edit the core-site.xml configuration
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

2.2. Using lzo

For how to use lzo, see the sample code on github; a minimal driver sketch also follows the list below. A few points need explanation:

  • LzoTextInputFormat handles lzo-compressed text files.
    • hadoop-lzo itself does not ship an LzoTextOutputFormat, so you can only write with TextOutputFormat and then compress the result with lzop.
    • elephant-bird provides many additional InputFormats/OutputFormats.
  • If you run against a local test cluster, you have to supply the lzo encoder/decoder yourself:
    • Configuration conf = new Configuration(); // a local test cluster does not read core-site.xml, so the codec information is missing and must be supplied via the statements below
    • conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");
    • conf.set("io.compression.codec.lzo.class", "com.hadoop.compression.lzo.LzoCodec");
  • LzoIndexer.main(new String[]{kOutputFileName}); // to build the index from inside a program, call LzoIndexer directly
    • For distributed indexing, call DistributedLzoIndexer.main.
    • It can of course also be run from the command line, as shown a bit further below.
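
To tie these points together, here is a minimal driver sketch (my own illustration, not taken from the sample code above): it assumes the new-API Job, hadoop-lzo on the classpath, and identity map/reduce; args[0]/args[1] are input/output paths.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class LzoJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // a local test cluster does not read core-site.xml, so register the codecs here
        conf.set("io.compression.codecs",
                 "org.apache.hadoop.io.compress.DefaultCodec,"
                 + "org.apache.hadoop.io.compress.GzipCodec,"
                 + "com.hadoop.compression.lzo.LzopCodec");
        conf.set("io.compression.codec.lzo.class", "com.hadoop.compression.lzo.LzoCodec");

        Job job = new Job(conf, "lzo-example");
        job.setJarByClass(LzoJobDriver.class);
        job.setInputFormatClass(LzoTextInputFormat.class);  // reads .lzo text, split via .index files
        job.setOutputFormatClass(TextOutputFormat.class);   // no LzoTextOutputFormat: write text, lzop it later
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // identity mapper/reducer by default; plug your own classes in here

        // to index lzop-compressed output afterwards: LzoIndexer.main(new String[]{args[1]});
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}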

Afterwards the output directory contains many lzo files, but they cannot be used directly as input (Hadoop does not know how to split them); they have to be indexed first. Use the following command to build the indexes:

  • hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.14.jar com.hadoop.compression.lzo.DistributedLzoIndexer <output-dir>

When it finishes, each lzo file has a corresponding .index file. If the command is run again, it checks whether the .index file already exists and skips re-indexing files that have one.

If you need to use lzo for compression/decompression on its own, outside of MapReduce, see the sample code on github.

2.3. Working with protobuf

elephant-bird implements the protobuf+lzo combination.

First create a proto file, e.g. message.proto:

package com.dirlt.java.mr.proto;

// FATAL: This name works as a version number.
// Increase this number every time you make an incompatible modification!!
// The block storage writer is responsible for writing the version number.
option java_outer_classname = "MessageProtos1";

message Message {
  required string text = 1;
}
  • package: the namespace
  • java_outer_classname: the name of the generated outer class

Running protoc --java_out=<dir> message.proto generates MessageProtos1.java under <dir>.

We also need to write a few helper classes for this type; fortunately they are not complicated:

public class MessageLzoProtobufInputFormat extends LzoProtobufBlockInputFormat<MessageProtos1.Message> {
    public MessageLzoProtobufInputFormat() {
        super(new TypeRef<MessageProtos1.Message>() {
        });
    }
}
public class MessageLzoProtobufOutputFormat extends LzoProtobufBlockOutputFormat<MessageProtos1.Message> {
    public MessageLzoProtobufOutputFormat() {
        super(new TypeRef<MessageProtos1.Message>() {
        });
    }
}
public class MessageLzoProtobufWritable extends ProtobufWritable<MessageProtos1.Message> {
    public MessageLzoProtobufWritable() {
        super(new TypeRef<MessageProtos1.Message>() {
        });
    }

    public MessageLzoProtobufWritable(MessageProtos1.Message message) {
        super(message, new TypeRef<MessageProtos1.Message>() {
        });
    }
}

For how to use lzo+protobuf, see the sample code on github. Note that when the input is an lzo file, the value type is the generic ProtobufWritable<M>, so to read the value you must first provide the concrete class via setConverter, as in the sketch below.
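
As an illustration, a hedged mapper sketch for the classes above (the elephant-bird package names assume a kevinweil-era layout; adjust to your version):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.dirlt.java.mr.proto.MessageProtos1;

public class MessageMapper
        extends Mapper<LongWritable, ProtobufWritable<MessageProtos1.Message>, Text, NullWritable> {
    @Override
    protected void map(LongWritable key,
                       ProtobufWritable<MessageProtos1.Message> value,
                       Context context) throws IOException, InterruptedException {
        // the value arrives as the generic ProtobufWritable<M>; without setConverter, get() cannot deserialize it
        value.setConverter(MessageProtos1.Message.class);
        MessageProtos1.Message message = value.get();
        context.write(new Text(message.getText()), NullWritable.get());
    }
}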

3. Multiple inputs

3.1. MultipleInputs

  • Sample code: code on github
  • Supports one HTable plus multiple files (but the HTable input cannot have a scan range set).
  • The code roughly proceeds as follows (a driver sketch follows this list):
    • Call TableMapReduceUtil.initTableMapperJob to initialize the HTable input (its only purpose here is to set the HTable input table name).
    • Then call MultipleInputs.addInputPath(job, new Path(kInTableName1), TableInputFormat.class, TMapper.class) once more; kInTableName1 can be chosen freely as long as it does not collide with the HDFS paths added next. (Its purpose is to associate the right input format.)
    • Finally add the HDFS inputs: MultipleInputs.addInputPath(job, new Path(kInFileName1), TextInputFormat.class, FMapper.class); call this multiple times to add several HDFS sources.
  • How it works:
    • With MultipleInputs, Hadoop records the inputs in the job configuration as a string of the form inputPath=className, inputPath=className.
    • Under the hood MultipleInputs swaps the job's InputFormat for its own DelegatingInputFormat.
    • DelegatingInputFormat instantiates each className and hands it its inputPath; this works well for FileInputFormats.
    • TableInputFormat, however, does not use the inputPath at all; it just reads the table name set in the configuration (TableInputFormat.INPUT_TABLE).
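
A job-setup sketch following these steps (my own illustration: TMapper/FMapper and the table/path names are placeholders, and HBase 0.94-era APIs are assumed):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MixedInputDriver {
    // mapper for rows coming from the HBase table
    public static class TMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(Bytes.toString(row.get())), new Text("from-hbase"));
        }
    }
    // mapper for lines coming from HDFS files
    public static class FMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            context.write(line, new Text("from-hdfs"));
        }
    }

    public static void main(String[] args) throws Exception {
        String kInTableName1 = "my_table";      // placeholder table name
        String kInFileName1 = "/input/file1";   // placeholder HDFS path

        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "multiple-inputs");
        job.setJarByClass(MixedInputDriver.class);

        // 1. only needed to put the input table name into the configuration
        TableMapReduceUtil.initTableMapperJob(kInTableName1, new Scan(), TMapper.class,
                Text.class, Text.class, job);
        // 2. register the table "path" so the delegating input format routes it to TableInputFormat
        MultipleInputs.addInputPath(job, new Path(kInTableName1), TableInputFormat.class, TMapper.class);
        // 3. add one or more HDFS inputs
        MultipleInputs.addInputPath(job, new Path(kInFileName1), TextInputFormat.class, FMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}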

3.2. MultipleTableInputFormat

  • Sample code: code on github
  • A slightly modified version that runs on cdh4.3.0 is kept at code on github.
    • It must be used together with MultipleInputs.
    • With this InputFormat you can mix multiple file inputs and multiple table inputs in one job.
    • Multiple-table input even supports several scans on the same table.
  • The rough idea (a small path round-trip sketch follows this list):
    • MultipleInputs uses a delegate pattern under the hood: it associates an InputFormat and a Mapper with each Path, then instantiates the InputFormat to split the path into InputSplits and build RecordReaders.
    • To stay compatible with MultipleInputs, the implementation serializes the TableInput (a table name plus one or more Scan objects) into a String and represents it as a Path.
      • The string format is <tableName> ! hexString(scan) ! hexString(scan)
      • The code that peels the TableInput string back out of the Path is path.toString().substring(path.getParent().toString().length() + 1);
    • When MultipleTableInputFormat computes splits, it takes the path content, parses out the TableInput, and then delegates to TableInputFormatBase's split strategy.
      • setConf is left as an empty implementation because ReflectionUtils.newInstance calls it when creating the instance, while MultipleTableInputFormat itself does not need it.
    • MultipleTableSplit was introduced mainly because TableSplit does not carry the Scan object, which the TableRecordReader needs.
      • An InputSplit has to implement the serialization interface, because the split information is generated and saved to HDFS by the JobTracker and then read back by the TaskTracker.
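
A small sketch of the Path round trip described above (my own illustration; the hex blobs are placeholders, and turning them back into Scan objects is omitted):

import org.apache.hadoop.fs.Path;

public class TableInputPathCodec {
    // pack a pre-built "<tableName>!<hexScan>!..." string into a Path under some job input dir
    public static Path encode(Path inputDir, String tableInput) {
        return new Path(inputDir, tableInput);
    }

    // the extraction expression quoted above: strip the parent directory prefix
    public static String decode(Path path) {
        return path.toString().substring(path.getParent().toString().length() + 1);
    }

    public static void main(String[] args) {
        Path p = encode(new Path("/user/hadoop/job-input"), "user_table!0a1b2c!3d4e5f");
        String tableInput = decode(p);
        String[] parts = tableInput.split("!");
        // prints: table=user_table scans=2
        System.out.println("table=" + parts[0] + " scans=" + (parts.length - 1));
    }
}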

4. Multiple outputs

4.1. MultipleOutputs

  • Sample code: code on github
  • Supports multiple HTables and multiple files (a reducer sketch follows this list).
  • write(String namedOutput, K key, V value) writes into the directory configured on FileOutputFormat, with files prefixed namedOutput-.
  • With write(String namedOutput, K key, V value, String baseOutputPath):
    • if baseOutputPath starts with /, e.g. /a/b/c, the output file is /a/b/c-m-00000;
    • if baseOutputPath ends with /, e.g. /a/b/c/, the output file is /a/b/c/-m-00000;
    • if baseOutputPath does not start with /, the output goes into the directory configured on FileOutputFormat, with files prefixed baseOutputPath-.
  • Because the final output goes through MultipleOutputs.write rather than Context.write, it does not play well with MRUnit.
    • A MockMultipleOutputs can be used for testing; see com.dirlt.java.mr.MockMultipleOutput.
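
A reducer sketch exercising the write() variants above (my own illustration; it assumes the driver registered the named output via MultipleOutputs.addNamedOutput(job, "stats", TextOutputFormat.class, Text.class, LongWritable.class), and /data/report/daily is a placeholder path):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class FanOutReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private MultipleOutputs<Text, LongWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, LongWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) sum += v.get();
        // lands under the job output dir as stats-r-00000
        mos.write("stats", key, new LongWritable(sum));
        // absolute baseOutputPath: lands as /data/report/daily-r-00000
        mos.write("stats", key, new LongWritable(sum), "/data/report/daily");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();   // forgetting this loses the named outputs
    }
}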

4.2. MultipleTableOutputFormat

  • Sample code: code on github
  • Supports multiple HTables (a reducer sketch follows this list).
  • The key passed to context.write must name the output table.
    • This also means that for single-table output the key can simply be null,
    • provided conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) has been called first.
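
A reducer sketch for MultipleTableOutputFormat (my own illustration; table and column names are placeholders, an HBase 0.94-era Put API is assumed, and the driver must set job.setOutputFormatClass(MultipleTableOutputFormat.class)):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultiTableReducer
        extends Reducer<Text, LongWritable, ImmutableBytesWritable, Put> {
    private static final byte[] TABLE_A = Bytes.toBytes("table_a");
    private static final byte[] TABLE_B = Bytes.toBytes("table_b");

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) sum += v.get();
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
        // the key names the destination table; route each record by some rule of your own
        byte[] table = (sum % 2 == 0) ? TABLE_A : TABLE_B;
        context.write(new ImmutableBytesWritable(table), put);
    }
}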

5. Getting cluster status

  • Sample code: code on github
  • For more information, read the JobClient API (a small sketch follows).
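
A minimal sketch of what such code can look like with the old-API JobClient (assuming the JobTracker address is available from the mapred configuration on the classpath):

import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;

public class ClusterReport {
    public static void main(String[] args) throws Exception {
        JobClient client = new JobClient(new JobConf());
        ClusterStatus status = client.getClusterStatus();
        System.out.println("task trackers : " + status.getTaskTrackers());
        System.out.println("blacklisted   : " + status.getBlacklistedTrackers());
        System.out.println("map slots     : " + status.getMapTasks() + "/" + status.getMaxMapTasks());
        System.out.println("reduce slots  : " + status.getReduceTasks() + "/" + status.getMaxReduceTasks());
        // every job the JobTracker still remembers
        for (JobStatus js : client.getAllJobs()) {
            System.out.println(js.getJobID() + " state=" + js.getRunState());
        }
    }
}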

6. OutOfMemoryError

In summary, it roughly comes down to one of the following causes:

  • Increase the heap size for the TaskTracker. I did this by changing HADOOP_HEAPSIZE to 4096 in /etc/hadoop/conf/hadoop-env.sh to test. This did not solve it.
  • Increase the heap size for the spawned child. Add -Xmx1024m in mapred-site.xml for mapred.map.child.java.opts. This did not solve it.
  • Make sure that the limit of open files is not reached. I had already done this by adding "mapred - nofile 65536" in /etc/security/limits.conf. This did not solve it.
  • Adding the following to /etc/security/limits.conf and restarting the TaskTracker solved it: "mapred - nproc 8192" (i.e. raise the limit on spawned child processes).

7. topology rack awareness

There are two ways to implement this; both come down to mapping a DNS name/IP to a network path, which is a string of the form

  • /switch/rack

The first is to set topology.node.switch.mapping.impl to a DNSToSwitchMapping class:

public interface DNSToSwitchMapping {
  public List<String> resolve(List<String> names);
}

Implement this interface to map DNS names/IP names to network paths, for example as in the sketch below.
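
A static mapping sketch (my own illustration against the Hadoop 1.x-era interface; the host-to-rack table is a hard-coded placeholder, a real implementation would read it from a file or CMDB):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.net.DNSToSwitchMapping;

public class StaticRackMapping implements DNSToSwitchMapping {
    private static final Map<String, String> RACKS = new HashMap<String, String>();
    static {
        RACKS.put("dp31.umeng.com", "/switch1/rack1");   // placeholder entries
        RACKS.put("dp34.umeng.com", "/switch1/rack2");
    }

    @Override
    public List<String> resolve(List<String> names) {
        List<String> paths = new ArrayList<String>(names.size());
        for (String name : names) {
            String rack = RACKS.get(name);
            paths.add(rack != null ? rack : "/default-switch/default-rack");
        }
        return paths;
    }
}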

A better option, though, is ScriptBasedMapping, an implementation of DNSToSwitchMapping that delegates the mapping to a configurable script. Set the property topology.script.file.name to the script; the script takes the names as input and must return a list separated by spaces or newlines. #note: internally a StringTokenizer is used to split the result

8. streaming

Streaming lets users write the mapper/reducer as scripts, using stdin/stdout as the communication interface. The TaskTracker spawns a special task that pipes the mapper/reducer data to the script. #note: hadoop pipes instead uses a unix socket to talk to a C++ program, but the basic idea is the same

Invocation: hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar <OPTIONS>, where OPTIONS include:

  • -input
  • -output
  • -mapper # a command such as cat or grep, or a script (which must be chmod +x)
  • -reducer
  • -libjars
  • -file # files needed by the task; if you run a script, the script must also be listed here so that it is shipped to every machine
  • -partitioner # must be a Java class
  • -combiner # must be a Java class
  • -D # job properties, etc.
  • -numReduceTasks # number of reduces
  • -inputformat
  • -outputformat
  • -verbose # verbose output
  • -cmdenv # environment variables
  • -mapdebug # Script to call when map task fails
  • -reducedebug # Script to call when reduce task fails

Besides the properties that also apply to Java MapReduce jobs, some streaming-related properties are:

  • stream.map.input.field.separator / stream.map.output.field.separator # key/value separator for map input/output, default \t
  • stream.num.map.output.key.fields # how many fields of a map output record make up the key
  • stream.reduce.input.field.separator / stream.reduce.output.field.separator
  • stream.num.reduce.output.key.fields
  • stream.non.zero.exit.is.failure = false # by default a non-zero exit code fails the task; set this to ignore it

9. Too many fetch-failures

A few tasks of a job on the cluster got stuck in the reduce copy phase, and all of them were running on the same machine. The TaskTracker log looks like the following; note that there are two tasks, 49 and 50:

2013-04-26 13:57:01,144 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_24403_r_000050_1 0.1976251% reduce > copy (466 of 786 at 24.13 MB/s) >
2013-04-26 13:57:03,595 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_24403_r_000049_1 0.1976251% reduce > copy (466 of 786 at 24.40 MB/s) >
2013-04-26 13:57:04,179 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_24403_r_000050_1 0.1976251% reduce > copy (466 of 786 at 24.13 MB/s) >
2013-04-26 13:57:09,620 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_24403_r_000049_1 0.1976251% reduce > copy (466 of 786 at 24.40 MB/s) >

The NIC, CPU, IO and memory all looked perfectly normal. Using kill -QUIT <pid> and then looking at the stack trace in the userlogs stdout gave the following:

2013-04-26 14:08:07
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.12-b01 mixed mode):

"Thread for polling Map Completion Events" daemon prio=10 tid=0x00007fd5447d5000 nid=0x33e7e waiting on condition [0x00007fd51d66c000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$GetMapEventsThread.run(ReduceTask.java:2769)

"Thread for merging in memory files" daemon prio=10 tid=0x00007fd5447d3000 nid=0x33e7d in Object.wait() [0x00007fd51d76d000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
- waiting on <0x0000000716b55598> (a java.lang.Object)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$ShuffleRamManager.waitForDataToMerge(ReduceTask.java:1117)
- locked <0x0000000716b55598> (a java.lang.Object)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2650)

"Thread for merging on-disk files" daemon prio=10 tid=0x00007fd5447cf000 nid=0x33e7c in Object.wait() [0x00007fd51d86e000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
- waiting on <0x0000000716b55668> (a java.util.TreeSet)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$LocalFSMerger.run(ReduceTask.java:2549)
- locked <0x0000000716b55668> (a java.util.TreeSet)

"MapOutputCopier attempt_201301231102_24403_r_000049_1.4" prio=10 tid=0x00007fd5447cd000 nid=0x33e7b in Object.wait() [0x00007fd51d96f000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1244)
- locked <0x0000000716b3bf00> (a java.util.ArrayList)

"MapOutputCopier attempt_201301231102_24403_r_000049_1.3" prio=10 tid=0x00007fd5447cb000 nid=0x33e7a in Object.wait() [0x00007fd51da70000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1244)
- locked <0x0000000716b3bf00> (a java.util.ArrayList)

"MapOutputCopier attempt_201301231102_24403_r_000049_1.2" prio=10 tid=0x00007fd54460c000 nid=0x33e79 in Object.wait() [0x00007fd51db71000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1244)
- locked <0x0000000716b3bf00> (a java.util.ArrayList)

"MapOutputCopier attempt_201301231102_24403_r_000049_1.1" prio=10 tid=0x00007fd54460a000 nid=0x33e78 in Object.wait() [0x00007fd51dc72000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1244)
- locked <0x0000000716b3bf00> (a java.util.ArrayList)

"MapOutputCopier attempt_201301231102_24403_r_000049_1.0" prio=10 tid=0x00007fd544609800 nid=0x33e77 in Object.wait() [0x00007fd51dd73000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1244)
- locked <0x0000000716b3bf00> (a java.util.ArrayList)

"Timer thread for monitoring mapred" daemon prio=10 tid=0x00007fd54464f000 nid=0x33e76 in Object.wait() [0x00007fd51de74000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0x0000000716b74b90> (a java.util.TaskQueue)
	at java.util.TimerThread.run(Timer.java:462)

"communication thread" daemon prio=10 tid=0x00007fd544586800 nid=0x33e6c waiting on condition [0x00007fd51df75000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:645)
	at java.lang.Thread.run(Thread.java:662)

"Timer thread for monitoring jvm" daemon prio=10 tid=0x00007fd544525800 nid=0x33e6a in Object.wait() [0x00007fd51e177000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0x0000000716b74cf0> (a java.util.TaskQueue)
	at java.util.TimerThread.run(Timer.java:462)

"Thread for syncLogs" daemon prio=10 tid=0x00007fd544497800 nid=0x33e51 waiting on condition [0x00007fd51e481000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hadoop.mapred.Child$3.run(Child.java:155)

"sendParams-0" daemon prio=10 tid=0x00007fd544464800 nid=0x33e50 waiting on condition [0x00007fd51e582000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000716b3bcf8> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
	at java.lang.Thread.run(Thread.java:662)

"IPC Client (47) connection to /127.0.0.1:12540 from job_201301231102_24403" daemon prio=10 tid=0x00007fd544430000 nid=0x33e4f in Object.wait() [0x00007fd51e683000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:680)
- locked <0x0000000716a64810> (a org.apache.hadoop.ipc.Client$Connection)
	at org.apache.hadoop.ipc.Client$Connection.run(Client.java:723)

"Low Memory Detector" daemon prio=10 tid=0x00007fd5440ae800 nid=0x33e38 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" daemon prio=10 tid=0x00007fd5440ac000 nid=0x33e37 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" daemon prio=10 tid=0x00007fd5440a9000 nid=0x33e36 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007fd5440a7000 nid=0x33e35 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007fd54408b000 nid=0x33e34 in Object.wait() [0x00007fd53cf41000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
- locked <0x0000000716b74f78> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)

"Reference Handler" daemon prio=10 tid=0x00007fd544089000 nid=0x33e32 in Object.wait() [0x00007fd53d042000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
- locked <0x0000000716b3ba60> (a java.lang.ref.Reference$Lock)

"main" prio=10 tid=0x00007fd544009800 nid=0x33e0a waiting on condition [0x00007fd5484ee000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.fetchOutputs(ReduceTask.java:2099)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:382)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
	at org.apache.hadoop.mapred.Child.main(Child.java:264)

"VM Thread" prio=10 tid=0x00007fd544082800 nid=0x33e30 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fd54401c800 nid=0x33e11 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fd54401e800 nid=0x33e13 runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fd544020800 nid=0x33e15 runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fd544022000 nid=0x33e17 runnable

"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007fd544024000 nid=0x33e19 runnable

"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007fd544026000 nid=0x33e1b runnable

"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007fd544027800 nid=0x33e1d runnable

"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007fd544029800 nid=0x33e1f runnable

"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007fd54402b000 nid=0x33e21 runnable

"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007fd54402d000 nid=0x33e23 runnable

"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007fd54402f000 nid=0x33e25 runnable

"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007fd544030800 nid=0x33e27 runnable

"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007fd544032800 nid=0x33e29 runnable

"GC task thread#13 (ParallelGC)" prio=10 tid=0x00007fd544034800 nid=0x33e2a runnable

"GC task thread#14 (ParallelGC)" prio=10 tid=0x00007fd544036000 nid=0x33e2b runnable

"GC task thread#15 (ParallelGC)" prio=10 tid=0x00007fd544038000 nid=0x33e2c runnable

"GC task thread#16 (ParallelGC)" prio=10 tid=0x00007fd54403a000 nid=0x33e2d runnable

"GC task thread#17 (ParallelGC)" prio=10 tid=0x00007fd54403b800 nid=0x33e2e runnable

"VM Periodic Task Thread" prio=10 tid=0x00007fd5440c1000 nid=0x33e39 waiting on condition

JNI global references: 1033

Heap
 PSYoungGen      total 93120K, used 32400K [0x00000007aaab0000, 0x00000007b2ec0000, 0x0000000800000000)
  eden space 92352K, 34% used [0x00000007aaab0000,0x00000007ac9940b0,0x00000007b04e0000)
  from space 768K, 100% used [0x00000007b0600000,0x00000007b06c0000,0x00000007b06c0000)
  to   space 21440K, 0% used [0x00000007b19d0000,0x00000007b19d0000,0x00000007b2ec0000)
 PSOldGen        total 898368K, used 604768K [0x0000000700000000, 0x0000000736d50000, 0x00000007aaab0000)
  object space 898368K, 67% used [0x0000000700000000,0x0000000724e980d8,0x0000000736d50000)
 PSPermGen       total 29120K, used 14619K [0x00000006fae00000, 0x00000006fca70000, 0x0000000700000000)
  object space 29120K, 50% used [0x00000006fae00000,0x00000006fbc46da8,0x00000006fca70000)

From the logs everything looks normal, but the failed maps show a very large number of Too many fetch-failures, and the affected machines are all from a recently added batch. The Too many fetch-failures error usually indicates that network connectivity is not smooth; after checking, it turned out that the newly added machines were not present in this machine's hosts file.

10. blacklist

  • http://www.mapr.com/doc/display/MapR/TaskTracker+Blacklisting
  • Per-Job Blacklisting
    • The configuration value mapred.max.tracker.failures in mapred-site.xml specifies a number of task failures in a specific job after which the TaskTracker is blacklisted for that job. The TaskTracker can still accept tasks from other jobs, as long as it is not blacklisted cluster-wide (see below).
    • A job can only blacklist up to 25% of TaskTrackers in the cluster.
  • Cluster-Wide Blacklisting
    • A TaskTracker can be blacklisted cluster-wide for any of the following reasons:
      • The number of blacklists from successful jobs (the fault count) exceeds mapred.max.tracker.blacklists
      • The TaskTracker has been manually blacklisted using hadoop job -blacklist-tracker <host>
      • The status of the TaskTracker (as reported by a user-provided health-check script) is not healthy
    • If a TaskTracker is blacklisted, any currently running tasks are allowed to finish, but no further tasks are scheduled. If a TaskTracker has been blacklisted due to mapred.max.tracker.blacklists or using the hadoop job -blacklist-tracker <host> command, un-blacklisting requires a TaskTracker restart.
    • Only 50% of the TaskTrackers in a cluster can be blacklisted at any one time.
    • After 24 hours, the TaskTracker is automatically removed from the blacklist and can accept jobs again.

Once a TaskTracker is blacklisted, logs like the following appear; semantically, all jobs running on that TaskTracker are purged (in purgeJob):

2013-05-02 11:55:18,170 INFO org.apache.hadoop.mapred.TaskTracker: Received 'KillJobAction' for job: job_201301231102_25218
2013-05-02 11:55:18,170 WARN org.apache.hadoop.mapred.TaskTracker: Unknown job job_201301231102_25218 being deleted.

The JobTracker meanwhile logs:

2013-05-14 03:19:37,493 INFO org.apache.hadoop.mapred.JobTracker: Adding dp31.umeng.com to the blacklist across all jobs
2013-05-14 03:19:37,493 INFO org.apache.hadoop.mapred.JobTracker: Blacklisting tracker : dp31.umeng.com Reason for blacklisting is : EXCEEDING_FAILURES

11. Disk space

With our data volume growing at 5 TB/day, the disks on many machines are close to full (sometimes just one data directory is full), and this has started to affect the TaskTrackers. I pulled all of today's dead TaskTrackers for a post-mortem; the results are below, and a few of them had already been blacklisted in the preceding days. The machines are dp24, dp31, dp34, dp36, dp41. When we plan disk layout going forward we must reserve some disk space for the TaskTracker (I am not sure the current reservation is enough), and we should also investigate whether the TaskTracker can be forced to clean up and free some space by itself.

My feeling is that the TaskTracker is not as smart about disk space as the DataNode, for an obvious reason: the DataNode is storage-oriented and plans each disk carefully, whereas the TaskTracker is compute-oriented and probably just picks a disk round-robin as its working directory, without checking whether there is enough free space on it.

The machines short on disk space all produced very strange logs, e.g. the job directory could not be found, or reading map output failed.


dp24 OOM

dp31

dp@dp31:~$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda6       1.9T   34G  1.7T   2% /
udev             24G  4.0K   24G   1% /dev
tmpfs           9.5G  352K  9.5G   1% /run
none            5.0M     0  5.0M   0% /run/lock
none             24G     0   24G   0% /run/shm
/dev/sda1       140M   55M   79M  42% /boot
/dev/sdf1       1.8T  1.7T  781M 100% /data/data5
/dev/sdb1       1.8T  1.7T  632M 100% /data/data1
/dev/sdc1       1.8T  1.7T  1.0G 100% /data/data2
/dev/sdd1       1.8T  1.7T  1.7G 100% /data/data3
/dev/sdg1       1.8T  1.7T  973M 100% /data/data6
/dev/sdh1       1.8T  1.7T  917M 100% /data/data7
/dev/sde1       1.8T  346G  1.4T  20% /data/data4

dp34

dp@dp34:~$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda6       1.9T   33G  1.7T   2% /
udev             24G  4.0K   24G   1% /dev
tmpfs           9.5G  332K  9.5G   1% /run
none            5.0M     0  5.0M   0% /run/lock
none             24G     0   24G   0% /run/shm
/dev/sda1       140M   32M  102M  24% /boot
/dev/sdb1       1.9T  1.8T  1.1G 100% /data/data1
/dev/sdc1       1.9T  1.8T  968M 100% /data/data2
/dev/sdd1       1.9T  1.8T  1.9G 100% /data/data3
/dev/sde1       1.9T  1.8T  1.1G 100% /data/data4
/dev/sdf1       1.9T  1.8T  1.2G 100% /data/data5
/dev/sdh1       1.9T  1.8T  236M 100% /data/data7
/dev/sdg1       1.8T  691G  1.1T  40% /data/data6

dp36 died on 2013-05-07 and never came back, but the FATAL logs from that time already showed it had run out of disk space.

dp@dp36:~$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda6       1.9T   32G  1.7T   2% /
udev             24G  4.0K   24G   1% /dev
tmpfs           9.5G  332K  9.5G   1% /run
none            5.0M     0  5.0M   0% /run/lock
none             24G     0   24G   0% /run/shm
/dev/sda1       140M   32M  102M  24% /boot
/dev/sde1       1.9T  1.8T   18G 100% /data/data4
/dev/sdb1       1.8T  1.7T   22G  99% /data/data1
/dev/sdc1       1.8T  1.7T   61G  97% /data/data2
/dev/sdd1       1.8T  932G  810G  54% /data/data3
/dev/sdf1       1.9T  1.8T   21G  99% /data/data5
/dev/sdg1       1.9T  1.8T   20G  99% /data/data6
/dev/sdh1       1.9T  1.8T   18G  99% /data/data7

dp41

dp@dp41:~$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda6       1.9T   34G  1.7T   2% /
udev             24G  4.0K   24G   1% /dev
tmpfs           9.5G  344K  9.5G   1% /run
none            5.0M     0  5.0M   0% /run/lock
none             24G     0   24G   0% /run/shm
/dev/sda1       140M   32M  102M  24% /boot
/dev/sdb1       1.9T  1.8T   17G 100% /data/data1
/dev/sdc1       1.9T  1.8T   15G 100% /data/data2
/dev/sdg1       1.9T  1.8T   13G 100% /data/data6
/dev/sdd1       1.9T  1.8T   15G 100% /data/data3
/dev/sdf1       1.9T  1.8T   14G 100% /data/data5
/dev/sdh1       1.9T  1.8T   13G 100% /data/data7
/dev/sde1       1.9T  535G  1.3T  31% /data/data4

12. getMapOutput failed

12.1. normal case

dp41 shows the following error log: while dp41 is acting as the map output server, dp16 fetches data from it and fails. From the log we can identify:

  • map task attempt_201301231102_33841_m_000331_0
  • reduce task id = 15
2013-05-13 12:23:06,708 WARN org.apache.hadoop.mapred.TaskTracker: getMapOutput(attempt_201301231102_33841_m_000331_0,15) failed :
org.mortbay.jetty.EofException
        at org.mortbay.jetty.HttpGenerator.flush(HttpGenerator.java:791)
        at org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator.java:551)
        at org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:572)
        at org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012)
        at org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:651)
        at org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:580)
        at org.apache.hadoop.mapred.TaskTracker$MapOutputServlet.doGet(TaskTracker.java:4061)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
        at org.apache.hadoop.http.HttpServer$QuotingInputFilter.doFilter(HttpServer.java:829)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
        at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
        at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
        at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
Caused by: java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcher.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)
        at sun.nio.ch.IOUtil.write(IOUtil.java:40)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:336)
        at org.mortbay.io.nio.ChannelEndPoint.flush(ChannelEndPoint.java:171)
        at org.mortbay.io.nio.SelectChannelEndPoint.flush(SelectChannelEndPoint.java:221)
        at org.mortbay.jetty.HttpGenerator.flush(HttpGenerator.java:725)
        ... 27 more

2013-05-13 12:23:06,708 WARN org.mortbay.log: Committed before 410 getMapOutput(attempt_201301231102_33841_m_000331_0,15) failed :
org.mortbay.jetty.EofException
        at org.mortbay.jetty.HttpGenerator.flush(HttpGenerator.java:791)
        at org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator.java:551)
        at org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:572)
        at org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012)
        at org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:651)
        at org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:580)
        at org.apache.hadoop.mapred.TaskTracker$MapOutputServlet.doGet(TaskTracker.java:4061)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)

        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
        at org.apache.hadoop.http.HttpServer$QuotingInputFilter.doFilter(HttpServer.java:829)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
        at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
        at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
        at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
Caused by: java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcher.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)
        at sun.nio.ch.IOUtil.write(IOUtil.java:40)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:336)
        at org.mortbay.io.nio.ChannelEndPoint.flush(ChannelEndPoint.java:171)
        at org.mortbay.io.nio.SelectChannelEndPoint.flush(SelectChannelEndPoint.java:221)
        at org.mortbay.jetty.HttpGenerator.flush(HttpGenerator.java:725)
        ... 27 more

2013-05-13 12:23:06,708 INFO org.apache.hadoop.mapred.TaskTracker.clienttrace: src: 10.11.0.41:50060, dest: 10.11.0.16:3294, bytes: 0, op: MAPRED_SHUFFLE, cliID: attempt_201301231102_33841_m_000331_0, duration: 19292379
2013-05-13 12:23:06,709 ERROR org.mortbay.log: /mapOutput
java.lang.IllegalStateException: Committed
        at org.mortbay.jetty.Response.resetBuffer(Response.java:1023)
        at org.mortbay.jetty.Response.sendError(Response.java:240)
        at org.apache.hadoop.mapred.TaskTracker$MapOutputServlet.doGet(TaskTracker.java:4094)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
        at org.apache.hadoop.http.HttpServer$QuotingInputFilter.doFilter(HttpServer.java:829)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
        at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
        at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
        at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

Then, looking at the two tasks of this job, the counters are as follows:

=mapper in dp41=

monitor
by pass counter	2,557,788

FileSystemCounters
FILE_BYTES_READ	223,919,205
HDFS_BYTES_READ	1,229,397,574
FILE_BYTES_WRITTEN	445,249,847

Map-Reduce Framework
Combine output records	0
Map input records	3,155,527
Physical memory (bytes) snapshot	1,076,948,992
Spilled Records	4,781,912
Map output bytes	523,441,900
Total committed heap usage (bytes)	1,214,513,152
CPU time spent (ms)	104,400
Virtual memory (bytes) snapshot	3,522,392,064
SPLIT_RAW_BYTES	321
Map output records	2,390,956
Combine input records	0

LzoBlocks of com.umeng.analytics.proto.DailyLaunchProtos4$DailyLaunchInfo
Errors	0
Records Read	3,155,527

=reducer in dp16=

FileSystemCounters
FILE_BYTES_READ	28,468,696,830
FILE_BYTES_WRITTEN	28,468,769,783

Map-Reduce Framework
Reduce input groups	7
Combine output records	0
Reduce shuffle bytes	28,332,344,050
Physical memory (bytes) snapshot	2,283,491,328
Reduce output records	11
Spilled Records	339,962,329
Total committed heap usage (bytes)	1,863,778,304
CPU time spent (ms)	3,031,800
Virtual memory (bytes) snapshot	3,255,595,008
Combine input records	0
Reduce input records	321,140,243

The counters are perfectly normal. Looking at dp16's log for the corresponding time, this getMapOutput failure turns out to be just an occasional one: judging from dp16's log, only part of the map output fetch failed and the later attempts succeeded. Note how the copy percentage and block count of attempt_201301231102_33841_r_000015_0 keep advancing:

2013-05-13 12:23:04,575 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_r_000015_0 0.01798942% reduce > copy (102 of 1890 at 19.63 MB/s) >
2013-05-13 12:23:04,620 INFO org.apache.hadoop.mapred.TaskTracker.clienttrace: src: 10.11.0.16:50060, dest: 10.11.0.50:29582, bytes: 28814, op: MAPRED_SHUFFLE, cliID: attempt_201301231102_33841_m_000480_0, duration: 3011431376
2013-05-13 12:23:05,438 INFO org.apache.hadoop.mapred.TaskTracker.clienttrace: src: 10.11.0.16:50060, dest: 10.11.0.54:38101, bytes: 13541361, op: MAPRED_SHUFFLE, cliID: attempt_201301231102_33841_m_000480_0, duration: 28773030913
2013-05-13 12:23:05,843 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_m_000520_0 0.26028645%
2013-05-13 12:23:05,924 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_m_000241_0 0.7132669%
2013-05-13 12:23:05,947 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_m_000441_0 0.70975286%
2013-05-13 12:23:06,114 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33840_m_000000_0 0.47209314%
2013-05-13 12:23:06,579 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_m_000094_0 0.8537282%
2013-05-13 12:23:06,755 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_m_000342_0 0.54360586%
2013-05-13 12:23:08,298 INFO org.apache.hadoop.mapred.TaskTracker: attempt_201301231102_33841_r_000015_0 0.019400354% reduce > copy (110 of 1890 at 20.45 MB/s) >

12.2. bad case

13. Error initializing

13.1. No such file or directory

dp31 shows the following logs, and the sequence is very strange: from the stack trace, the job directory was being created and its permissions were about to be changed, but the file could no longer be found. dp31's df -h shows the remaining disk space:

/dev/sdf1       1.8T  1.7T  904M 100% /data/data5
/dev/sdb1       1.8T  1.7T  681M 100% /data/data1
/dev/sdc1       1.8T  1.7T  902M 100% /data/data2
/dev/sdd1       1.8T  1.7T  1.7G 100% /data/data3
/dev/sdg1       1.8T  1.7T 1004M 100% /data/data6
/dev/sdh1       1.8T  1.7T  1.1G 100% /data/data7
/dev/sde1       1.8T  346G  1.4T  20% /data/data4

So I suspect these odd problems are caused by the disks being nearly full. #note: this easily gets the node blacklisted.

2013-05-14 00:20:56,266 INFO org.apache.hadoop.mapred.TaskTracker: LaunchTaskAction (registerTask): attempt_201301231102_34166_m_000000_0 task's state:UNASSIGNED
2013-05-14 00:20:56,266 INFO org.apache.hadoop.mapred.TaskTracker: Trying to launch : attempt_201301231102_34166_m_000000_0 which needs 1 slots
2013-05-14 00:20:56,266 INFO org.apache.hadoop.mapred.TaskTracker: In TaskLauncher, current free slots : 12 and trying to launch attempt_201301231102_34166_m_000000_0 which needs 1 slots
2013-05-14 00:20:56,312 WARN org.apache.hadoop.conf.Configuration: /data/data5/mapred/local/ttprivate/taskTracker/xiarong/jobcache/job_201301231102_34166/job.xml:a attempt to override final parameter: mapred.submit.replication;  Ignoring.
2013-05-14 00:20:56,327 INFO org.apache.hadoop.mapred.JobLocalizer: Initializing user xiarong on this TT.
2013-05-14 00:20:56,334 WARN org.apache.hadoop.mapred.TaskTracker: Exception while localization ENOENT: No such file or directory
        at org.apache.hadoop.io.nativeio.NativeIO.chmod(Native Method)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:521)
        at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
        at org.apache.hadoop.mapred.JobLocalizer.createJobDirs(JobLocalizer.java:222)
        at org.apache.hadoop.mapred.DefaultTaskController.initializeJob(DefaultTaskController.java:204)
        at org.apache.hadoop.mapred.TaskTracker$4.run(TaskTracker.java:1352)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
        at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1327)
        at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1242)
        at org.apache.hadoop.mapred.TaskTracker.startNewTask(TaskTracker.java:2563)
        at org.apache.hadoop.mapred.TaskTracker$TaskLauncher.run(TaskTracker.java:2527)

2013-05-14 00:20:56,334 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:xiarong (auth:SIMPLE) cause:ENOENT: No such file or directory
2013-05-14 00:20:56,334 WARN org.apache.hadoop.mapred.TaskTracker: Error initializing attempt_201301231102_34166_m_000000_0:
ENOENT: No such file or directory
        at org.apache.hadoop.io.nativeio.NativeIO.chmod(Native Method)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:521)
        at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
        at org.apache.hadoop.mapred.JobLocalizer.createJobDirs(JobLocalizer.java:222)
        at org.apache.hadoop.mapred.DefaultTaskController.initializeJob(DefaultTaskController.java:204)
        at org.apache.hadoop.mapred.TaskTracker$4.run(TaskTracker.java:1352)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
        at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1327)
        at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1242)
        at org.apache.hadoop.mapred.TaskTracker.startNewTask(TaskTracker.java:2563)
        at org.apache.hadoop.mapred.TaskTracker$TaskLauncher.run(TaskTracker.java:2527)

2013-05-14 00:20:56,334 ERROR org.apache.hadoop.mapred.TaskStatus: Trying to set finish time for task attempt_201301231102_34166_m_000000_0 when no start time is set, stackTrace is : java.lang.Exception
        at org.apache.hadoop.mapred.TaskStatus.setFinishTime(TaskStatus.java:185)
        at org.apache.hadoop.mapred.TaskTracker$TaskInProgress.kill(TaskTracker.java:3280)
        at org.apache.hadoop.mapred.TaskTracker.startNewTask(TaskTracker.java:2573)
        at org.apache.hadoop.mapred.TaskTracker$TaskLauncher.run(TaskTracker.java:2527)

14. shuffleInMemory OutOfMemoryError

2013-06-19 07:16:17,180 FATAL org.apache.hadoop.mapred.Task: attempt_201306141608_1574_r_009039_0 : Map output copy failure : java.lang.OutOfMemoryError: Java heap space
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1612)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1472)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1321)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1253)

Set the parameter mapred.job.shuffle.input.buffer.percent to 0.2 or even smaller, e.g. as in the sketch below. This is really a framework bug: the framework ought to be able to decide intelligently whether to shuffle in memory or on disk. See the shuffle-in-memory part of the code-analysis notes.
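
A minimal sketch of the workaround (the 0.2 value mirrors the suggestion above; the job name is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleOomWorkaround {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // keep only 20% of the reduce task heap for in-memory map output
        conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.2f);
        // ...set input/output formats, mapper/reducer on the Job, then submit as usual
        return new Job(conf, "shuffle-oom-workaround");
    }
}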

15. HowManyMapsAndReduces

http://wiki.apache.org/hadoop/HowManyMapsAndReduces

Number of Maps:

  • The number of maps is usually driven by the number of DFS blocks in the input files. Although that causes people to adjust their DFS block size to adjust the number of maps.
  • The right level of parallelism for maps seems to be around 10-100 maps/node, although we have taken it up to 300 or so for very cpu-light map tasks. Task setup takes awhile, so it is best if the maps take at least a minute to execute.
  • The number of map tasks can also be increased manually using the JobConf's conf.setNumMapTasks(int num). This can be used to increase the number of map tasks, but will not set the number below that which Hadoop determines via splitting the input data.

Number of Reduces:

  • The right number of reduces seems to be 0.95 or 1.75 * (nodes * mapred.tasktracker.tasks.maximum). At 0.95 all of the reduces can launch immediately and start transferring map outputs as the maps finish. At 1.75 the faster nodes will finish their first round of reduces and launch a second round of reduces doing a much better job of load balancing. (A sketch of the 0.95 rule follows this list.)
  • The number of reduces also controls the number of output files in the output directory, but usually that is not important because the next map/reduce step will split them into even smaller splits for the maps.
  • The number of reduce tasks can also be increased in the same way as the map tasks, via JobConf's conf.setNumReduceTasks(int num).
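
A sketch of the 0.95 rule of thumb, sizing the reduce count from the cluster's reduce-slot capacity (my own illustration using the old-API JobClient/ClusterStatus):

import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ReduceCountHelper {
    public static void setReduces(JobConf conf) throws Exception {
        ClusterStatus cluster = new JobClient(conf).getClusterStatus();
        // roughly nodes * mapred.tasktracker.reduce.tasks.maximum
        int maxReduceSlots = cluster.getMaxReduceTasks();
        int reduces = (int) (maxReduceSlots * 0.95);
        conf.setNumReduceTasks(Math.max(1, reduces));
        // note: conf.setNumMapTasks(...) can only raise the map count above the split-driven value
    }
}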