2014-03-22
作为一个优秀的编程模型,MapReduce在大数据处理中有很大的优势,而mongodb也支持这一编程模型,本文通过简单的单词计数示例论述在mongodb中如何使用MapReduce。
笔者使用的2.4.7版本。
单词计数示例:
插入用于单词计数的数据:
db.data.insert({sentence:'Consider the following map-reduce operations on a collection orders that contains documents of the following prototype'})
db.data.insert({sentence:'I get the following error when I follow the code found in this link'})
图个简洁,数据中没有包含标点符号。
在mongo shell写入以下内容:
var map = function() {
split_result = this.sentence.split(" ");
for (var i in split_result) {
var word = split_result[i].replace(/(^\s*)|(\s*$)/g,"").toLowerCase(); //去除了单词两边可能的空格,并将单词转换为小写
if (word.length != 0) {
emit(word, 1);
}
}
}
var reduce = function(key, values) {
return Array.sum(values);
}
db.data.mapReduce(
map,
reduce,
{out:{inline:1}}
)
db.data.mapReduce
的第一和第二个参数分别指定map
和reduce
,map
的输入是集合中的每个文档,通过emit()
生成键值对;而reduce
则处理键的多个值。
mapReduce
的第三个参数指明在内存中进行mapreduce并返回结果,运行结果如下:
{
"results" : [
{
"_id" : "a",
"value" : 1
},
{
"_id" : "code",
"value" : 1
},
{
"_id" : "collection",
"value" : 1
},
{
"_id" : "consider",
"value" : 1
},
{
"_id" : "contains",
"value" : 1
},
{
"_id" : "documents",
"value" : 1
},
{
"_id" : "error",
"value" : 1
},
{
"_id" : "follow",
"value" : 1
},
{
"_id" : "following",
"value" : 3
},
{
"_id" : "found",
"value" : 1
},
{
"_id" : "get",
"value" : 1
},
{
"_id" : "i",
"value" : 2
},
{
"_id" : "in",
"value" : 1
},
{
"_id" : "link",
"value" : 1
},
{
"_id" : "map-reduce",
"value" : 1
},
{
"_id" : "of",
"value" : 1
},
{
"_id" : "on",
"value" : 1
},
{
"_id" : "operations",
"value" : 1
},
{
"_id" : "orders",
"value" : 1
},
{
"_id" : "prototype",
"value" : 1
},
{
"_id" : "that",
"value" : 1
},
{
"_id" : "the",
"value" : 4
},
{
"_id" : "this",
"value" : 1
},
{
"_id" : "when",
"value" : 1
}
],
"timeMillis" : 1,
"counts" : {
"input" : 2,
"emit" : 30,
"reduce" : 3,
"output" : 24
},
"ok" : 1,
}
results
的值是MapReduce的处理结果,timeMillis
指明花费的时间;counts
中input
指明了输入的文档数,emit
指明了在map中调用emit
的次数,reduce
指明了reduce的次数(本例中如果单次次数为1则不需要reduce),output
指明了输出的文档数目。
可以看到,键_id
不再是自动生成,而是被reduce
中的key
取代。当然,也可以将结果输入到一个新的collection中,例如:
db.data.mapReduce( map, reduce, {out:"mr_result"} )
之后查看mr_result
集合中的内容即可:
db.mr_result.find()
也可以使用db.runCommand
执行mapreduce任务,这种方法为开发者提供了更多的选项,具体请见资料[1]。资料[2][3][4]提供了关于mapreduce更全面的内容。资料[5]给出了优化mapreduce任务的方法,资料[6]是资料[5]的一篇中文翻译。
应该注意的是,资料[5]中提到使用ScopedThread()
创建线程,笔者在GUI工具Robomongo的shell中运行 new ScopedThread()时候报错: ReferenceError: ScopedThread is not defined (shell):1
不过在mongo shell中可以正常运行:
> new ScopedThread()
Sat Mar 22 21:32:36.062 Error: need at least one argument at src/mongo/shell/utils.js:101
如果使用其他编程语言管理MongoDB,要用到线程时,应该使用该编程语言内置的线程。
关于mongodb实现的mapreduce,个人觉得如果支持多个MR任务平滑过渡就更好了。
资料
[1] mapReduce [2] Map-Reduce [3] Perform Incremental Map-Reduce [4] Map-Reduce Examples [5] How to speed up MongoDB Map Reduce by 20x [6] MongoDB MapReduce 性能提升20倍的优化宝典