我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题。
自定义Writable
Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化。为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型。不过,这些Hadoop内建的类型并不足以支持真实遇到的业务。此时,就需要自定义Writable类,使得它既能够作为Job的Key或者Value,又能体现业务逻辑。
假设我已经从豆瓣抓取了书籍的数据,包括书籍的Title以及读者定义的Tag,并以Json格式存储在文本文件中。现在我希望提取这些数据中我感兴趣的内容,例如指定书籍的Tag列表,包括Tag被标记的次数。这些数据可以作为向量,为后面的数据分析提供基础数据。对于Map,我希望读取Json文件,然后得到每本书的Title,以及对应的单个Tag信息。作为Map的输出,我希望是我自己定义的类型BookTag。它只包括Tag的名称和标记次数:
|
注意,在write()与readFields()方法中,对于String类型的处理完全不同于Int、Long等类型,它需要调用Text的相关静态方法。
针对每本书,Map出来的结果可能包含重复的BookTag信息(指Tag Name相同);而我需要得到每个Tag的标记总和,以作为数据分析的向量。因此,作为Reduce的输入,可以是<Text, Iterable
|
其实,针对这种嵌套了集合的自定义Writable类型,由于嵌套的类型同样实现了Writable接口,因而同样可以调用嵌套类型的write()与readFields()方法,唯一的区别是需要将集合的Size写入到DataOutput中,以便于在读取时可以遍历集合。这实际上是一种Composite模式。
Iterable的奇怪行为
我需要在reduce()方法中,遍历传入的Iterable
|
这里得到的一个经验是,在编写MapReduce程序时,通过调试可以帮助你快速地定位问题。调试时,可以在项目的根目录下建立input文件夹,将数据源文件放入到该文件夹中,然后在调试的参数中设置即可。
如何进行单元测试
我们同样可以给MapReduce Job编写单元测试。除了可以使用Mockito进行Mock之外,我认为MRUnit可以更好地完成对MapReduce任务的验证。MRUnit为Map与Reduce提供了对应的Driver,即MapDriver与ReduceDriver。在编写测试用例时,我们只需要为Driver指定Input与Output,然后执行Driver的runTest()方法,即可测试任务的执行是否符合预期。这种预期是针对output输出的结果而言。以WordCounter为例,编写的单元测试如下:
|
Chaining Job
通过利用Hadoop提供的ChainMapper与ChainReducer,可以较为容易地实现多个Map Job或Reduce Job的链接。例如,我们可以将WordCounter分解为Tokenizer与Upper Case两个Map任务,最后执行Reduce。遗憾的是,ChainMapper与ChainReducer似乎不支持新版本的API,它要链接的Map与Reduce必须派生自MapReduceBase,并实现对应的Mapper或Reducer接口(说明,下面的代码基本上来自于StackOverFlow的一个帖子)。
|
不知道什么时候这种机制能够很好地支持新版的API。