从Hadoop streaming到mrjob


#乱炖


2014-04-11

Hadoop允许使用非Java编程语言实现mapreduce,这得益于其streaming机制,其实就是使用了Unix标准流(关于标准流的使用,请参考资料[1])。本文先论述如何基于流自制一个简单的单词计数的MapReducer,然后讨论如何使用Hadoop的streaming,最后介绍一个基于Python的Hadoop工具mrjob。

基于Linux管道的MapReducer

管道的作用是将一个命令的输出导入到另一个命令的输入,例如我们要查看进程nginx的信息,如下:

zsh >> ps -ef | grep 'nginx'
root      1243     1  0 08:44 ?        00:00:00 nginx: master process /usr/sbin/nginx
www-data  1244  1243  0 08:44 ?        00:00:00 nginx: worker process
www-data  1245  1243  0 08:44 ?        00:00:00 nginx: worker process
www-data  1247  1243  0 08:44 ?        00:00:00 nginx: worker process
www-data  1248  1243  0 08:44 ?        00:00:00 nginx: worker process
sunlt     4638  4385  0 09:38 pts/3    00:00:00 grep nginx

ps -ef 通过标准输出在终端里显示所有进程的信息,一行代表一个进程;grep 'nginx'命令用来寻找含有nginx的行并输出,也就是说该命令会过滤掉不含nginx的行。管道符号 | 用于将ps -ef的输出转换为grep 'nginx'的输入,于是得到了进程nginx的信息。

另外,上面命令运行结果的最后一行是grep 'nginx' 这个进程的信息,如果要过滤掉,可以:

zsh >> ps -ef | grep 'nginx' | grep -v 'grep'
root      1243     1  0 08:44 ?        00:00:00 nginx: master process /usr/sbin/nginx
www-data  1244  1243  0 08:44 ?        00:00:00 nginx: worker process
www-data  1245  1243  0 08:44 ?        00:00:00 nginx: worker process
www-data  1247  1243  0 08:44 ?        00:00:00 nginx: worker process
www-data  1248  1243  0 08:44 ?        00:00:00 nginx: worker process

为了通过管道来实现MapReduce方式单词计数,先建立文件mapper.py执行map工作:

import sys
for line in sys.stdin:
    ls = line.split()
    for word in ls:
        if len(word.strip()) != 0:
            print word + ',' + str(1)

建立文件reducer.py执行reduce工作:

import sys
word_dict = {}
for line in sys.stdin:
    ls = line.split(',')
    word_dict.setdefault(ls[0], 0)
    word_dict[ls[0]] += int(ls[1])

for word in word_dict:
    print word, word_dict[word]

现有文件wordcount.input,内容如下:

aaa aa asd
asd
dsa asd

执行下面命令:

zsh >> cat wordcount.input | python mapper.py | python reducer.py

运行结果如下:

aa 1
dsa 1
aaa 1
asd 3

使用Hadoop Streaming

首先参考 Hadoop1.2配置伪分布式 使用Hadoop1.2 配置一个伪分布式集群。 启动集群:start-all.sh 将文件wordcount.input拷贝到hdfs的/下:

$ hadoop dfs -copyFromLocal wordcount.input /

执行下面的命令:

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /wordcount.input \
-output /output \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file mapper.py \
-file reducer.py

查看结果:

$ hadoop dfs -cat /output/part-00000
Warning: $HADOOP_HOME is deprecated.
 
aa 1	
dsa 1	
aaa 1	
asd 3	

如果是多个输入文件,可以将这些文件放在同一个目录下,例如将wordcount.input和wc.input放在/input目录下,然后执行下面的命令:

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /input \
-output /output \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file mapper.py \
-file reducer.py

查看结果:

zsh >> hadoop dfs -cat /output/part-00000                                      
Warning: $HADOOP_HOME is deprecated.
 
aa 2	
dsa 2	
aaa 2	
asd 6	

使用mrjob

mrjob使用python编写,在资料[2]中如下介绍mrjob:

mrjob is a framework that assists you in submitting your job to the Hadoop job tracker and in running each individual step under Hadoop Streaming.

安装

应该先配置好环境变量$HADOOP_HOME,然后执行:

sudo pip install mrjob

当前版本是v0.4.2。

编写代码

https://pythonhosted.org/mrjob/guides/quickstart.html下复制以下代码:

from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):
 
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1
 
    def reducer(self, key, values):
        yield key, sum(values)
 
if __name__ == '__main__':
    MRWordFrequencyCount.run()

命名为word_count.py。 mrjob有一调试模式,并不真正使用hadoop,而只是模拟:

zsh >> python word_count.py  wordcount.input 

结果如下:

"chars"	20
"lines"	3
"words"	6

而要使用hadoop的话,执行:

python word_count.py  -r hadoop hdfs://localhost:9000/input

犯了一个狗血的错误

按照0.4.2的文档,我复制了一下单词计数的代码,保存为mrjob.py,如下运行之:

python mrjob.py wordcount.input

提示找不到mrjob模块下的job。

代码第一行是:from mrjob.job import MRJob

难为了好大一会,终于找到原因,单词计数程序名和mrjob重名。于是改为wc.py,并删除mrjob.pyc。

python wc.py wordcount.input  

成功运行。

资料

[1] 学习 Linux,101: 流、管道和重定向 http://www.ibm.com/developerworks/cn/linux/l-lpic1-v3-103-4/
[2] https://pythonhosted.org/mrjob/guides/concepts.html



( 本文完 )