2014-11-17
For the Hadoop 2.4 setup, refer to the earlier post "hadoop2.4.1单机伪分布式安装和配置" (single-node pseudo-distributed installation and configuration of Hadoop 2.4.1).
Suppose there are two text files, t1.txt and t2.txt, with the following contents:
t1.txt:
Sign up for GitHub. By clicking "Sign up for GitHub", you agree to our terms of service and privacy policy. We will send you account related emails occasionally
t2.txt:
and and you
Question 1: how do we count the words in t1.txt?
Question 2: how do we count the words in both t1.txt and t2.txt?
Create a directory and put the text files into HDFS
zsh >> $HADOOP_PREFIX/bin/hadoop fs -mkdir /input/
zsh >> $HADOOP_PREFIX/bin/hadoop fs -put t1.txt /input
zsh >> $HADOOP_PREFIX/bin/hadoop fs -put t2.txt /input
Counting the words in t1.txt
Open Eclipse and create a project named WordCount. Add hadoop-2.4.1/share/hadoop/common/hadoop-common-2.4.1.jar and hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar to the build path, create WordCount.java, paste in the wordcount source that ships with Hadoop, and make a few small changes:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: emits (word, 1) for every whitespace-separated token in a line.
    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sums the counts for each word.
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("start...");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input is fixed to /input/t1.txt; results go to /output.
        FileInputFormat.addInputPath(job, new Path("/input/t1.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Let's walk through this code. The TokenizerMapper class extends Mapper<Object, Text, Text, IntWritable>, whose declaration is:
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends Object
KEYIN is the type of the keys in the map input, VALUEIN is the type of the values in the map input, KEYOUT is the type of the keys that map emits, and VALUEOUT is the type of the values that map emits.
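Concretely, for the TokenizerMapper above the four type parameters work out as follows (the remark that the key is in practice a LongWritable byte offset is my own addition, based on Hadoop's default TextInputFormat):
// For TokenizerMapper in this job:
// KEYIN    = Object       (position of the line in the file; with the default
//                          TextInputFormat this is a LongWritable byte offset,
//                          so Object is a safe supertype)
// VALUEIN  = Text         (one line of the input file)
// KEYOUT   = Text         (a single word)
// VALUEOUT = IntWritable  (always 1 here; summed later by the reducer)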
The map function of TokenizerMapper uses the StringTokenizer class, which splits a piece of text on spaces, newlines, and other whitespace. It is simple (and not really robust enough for real use); here is an example:
import java.util.StringTokenizer;

public class Test {
    public static void main(String[] args) {
        StringTokenizer itr = new StringTokenizer("hello world! hi\nhadoop");
        while (itr.hasMoreTokens()) {
            System.out.println(itr.nextToken());
        }
    }
}
The output is:
hello
world!
hi
hadoop
The IntSumReducer class and the main() function won't be covered in detail. (IntSumReducer also doubles as the combiner, which is safe here because summing counts is associative and commutative.) One thing to note: main() sets the input file to /input/t1.txt and writes the MapReduce result to the /output directory.
Export the project as WordCount.jar and run:
$HADOOP_PREFIX/bin/hadoop jar WordCount.jar WordCount
When the job finishes, inspect the reduce output:
zsh >> $HADOOP_PREFIX/bin/hadoop fs -cat /output/part-r-00000
"Sign 1
By 1
GitHub", 1
GitHub. 1
Sign 1
We 1
account 1
agree 1
and 1
clicking 1
emails 1
for 2
occasionally 1
of 1
our 1
policy. 1
privacy 1
related 1
send 1
service 1
terms 1
to 1
up 2
will 1
you 2
The result is correct. The punctuation mixed into the tokens and the handling of upper/lower case can be cleaned up as needed.
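As one possible cleanup (a sketch of my own, not part of the original post), map() could lowercase each line and split on runs of non-letter characters instead of using StringTokenizer:
// A more forgiving map(): lowercases the line and treats any run of
// non-letters as a separator, so "GitHub," and "github" count as one word.
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    for (String token : value.toString().toLowerCase().split("[^a-zA-Z]+")) {
        if (!token.isEmpty()) { // split can yield a leading empty string
            word.set(token);
            context.write(word, one);
        }
    }
}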
Counting the words in both t1.txt and t2.txt
First delete the /output directory (the job will fail if the output path already exists; note that -rmr is deprecated in Hadoop 2.x in favor of -rm -r):
$HADOOP_PREFIX/bin/hadoop fs -rm -r /output
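Alternatively, the job itself can clear the output directory before submitting. This is a minimal sketch of my own (not in the original code), using the org.apache.hadoop.fs.FileSystem API inside main():
// Delete /output if it exists, so reruns don't fail with
// "Output directory ... already exists".
Path out = new Path("/output");
FileSystem fs = FileSystem.get(conf); // requires: import org.apache.hadoop.fs.FileSystem;
if (fs.exists(out)) {
    fs.delete(out, true); // true = recursive delete
}
FileOutputFormat.setOutputPath(job, out);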
Then, in the main() function of the WordCount.java above, replace
FileInputFormat.addInputPath(job, new Path("/input/t1.txt"));
with:
FileInputFormat.addInputPath(job, new Path("/input/t1.txt"));
FileInputFormat.addInputPath(job, new Path("/input/t2.txt"));
or with:
FileInputFormat.setInputPaths(job, "/input");
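FileInputFormat.setInputPaths is overloaded: besides a comma-separated string of paths, it also accepts a varargs list of Path objects, so the following is equivalent for these two files (an illustrative variant of my own, not from the original post):
// Equivalent: pass each input file as an explicit Path.
FileInputFormat.setInputPaths(job,
        new Path("/input/t1.txt"),
        new Path("/input/t2.txt"));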
Rebuild and rerun the job; the MapReduce result is now:
zsh >> $HADOOP_PREFIX/bin/hadoop fs -cat /output/part-r-00000
"Sign 1
By 1
GitHub", 1
GitHub. 1
Sign 1
We 1
account 1
agree 1
and 3
clicking 1
emails 1
for 2
occasionally 1
of 1
our 1
policy. 1
privacy 1
related 1
send 1
service 1
terms 1
to 1
up 2
will 1
you 3
The counts for and and you have gone up to 3, so the result is correct.
For more on FileInputFormat, see the official documentation: Class FileInputFormat<K,V>.