2014-11-19
First the basic idea, then the implementation.
How to implement K-means with MapReduce
The approach is based on the following paper:
Zhao W., Ma H., He Q. Parallel K-Means Clustering Based on MapReduce. In: Cloud Computing. Springer Berlin Heidelberg, 2009: 674-679.
For a dataset of n objects that is to be clustered into k classes, the basic idea of K-means is:
**1.** First, randomly select k of the n objects to serve as the initial centers of the k clusters (call each of these a "center");
**2.** Assign each remaining object to its nearest center, which yields k clusters;
**3.** For each cluster, compute a new center (the basic method is to average all the points belonging to that cluster);
**4.** Repeat steps 2 and 3 (one round of steps 2 and 3 can be called an iteration) until the centers essentially stop changing, or until a specified number of iterations has been reached;
**5.** Finally, assign every object to its nearest center, just as in step 2.
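To make steps 2 and 3 concrete, here is a minimal sequential sketch of one iteration in Java (the double[] sample representation, the squared Euclidean distance, and all names are illustrative assumptions of mine, not taken from the paper):

```java
// One K-means iteration over an in-memory dataset (steps 2 and 3).
static double[][] iterate(double[][] samples, double[][] centers) {
    int k = centers.length, dim = centers[0].length;
    double[][] sums = new double[k][dim];
    int[] counts = new int[k];
    for (double[] s : samples) {
        // Step 2: find the nearest center for this sample.
        int nearest = 0;
        double minDis = Double.MAX_VALUE;
        for (int i = 0; i < k; i++) {
            double dis = 0;
            for (int d = 0; d < dim; d++) {
                double diff = s[d] - centers[i][d];
                dis += diff * diff;  // squared Euclidean distance
            }
            if (dis < minDis) { minDis = dis; nearest = i; }
        }
        for (int d = 0; d < dim; d++) sums[nearest][d] += s[d];
        counts[nearest]++;
    }
    // Step 3: the new center of each cluster is the mean of its samples
    // (clusters that received no samples keep a zero vector here, for brevity).
    for (int i = 0; i < k; i++) {
        if (counts[i] == 0) continue;
        for (int d = 0; d < dim; d++) sums[i][d] /= counts[i];
    }
    return sums;
}
```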
From the steps above, each iteration performs the distance computation n*k times. It is easy to see that finding the nearest center for one object does not depend on finding the nearest center for any other object, so this step can run in parallel. However, since each iteration uses the centers produced by the previous iteration, the iterations themselves are serial.
Based on this analysis, we can now consider how to implement a MapReduce version of K-means. Below is a pseudocode sketch of each task in a single iteration.
Map
The input to the Map tasks is stored in HDFS as a SequenceFile, which stores every sample as a <key, value> pair: the key is the offset of the sample within the data file (this key is not actually used), and the value is a string holding the sample's content. SequenceFiles are splittable, which means the samples can be split up and fed to multiple Map tasks. Each Map task fetches the current centers and computes the nearest center for every sample. In a Map task's output, the value is a sample and the key is the index of that sample's nearest center.
**Input:** Global variable centers, the offset key, the sample value
**Output:** <key', value'> pair, where key' is the index of the closest center point and value' is a string comprised of the sample information
1. Construct the sample instance from value;
2. minDis = Double.MAX_VALUE;
3. index = -1;
4. For i = 0 to centers.length do
       dis = ComputeDist(instance, centers[i]);
       If dis < minDis {
           minDis = dis;
           index = i;
       }
5. End For
6. Take index as key';
7. Construct value' as a string comprised of the values of the different dimensions;
8. output <key', value'> pair;
9. End
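As a rough illustration, the pseudocode above might map onto Hadoop's new-style (org.apache.hadoop.mapreduce) API as follows. This is only a sketch under several assumptions of mine, not the paper's code: samples are comma-separated strings, the current centers arrive through the distributed cache as one "<index>\t<c1,...,cd>" line per center (the format the reducer sketch further below writes), and all class names are made up for illustration:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KMeansMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private double[][] centers;  // the "global variable centers" of the pseudocode

    @Override
    protected void setup(Context context) throws IOException {
        // Assumption: the driver shared the current centers file through the
        // distributed cache, one center per line as "<index>\t<c1,...,cd>".
        URI[] cached = context.getCacheFiles();
        Path path = new Path(cached[0]);
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        List<double[]> loaded = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path)))) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] cols = line.split("\t");
                String[] f = cols[cols.length - 1].split(",");
                double[] c = new double[f.length];
                for (int d = 0; d < f.length; d++) c[d] = Double.parseDouble(f[d]);
                loaded.add(c);
            }
        }
        centers = loaded.toArray(new double[0][]);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Construct the sample instance from value (comma-separated here).
        String[] fields = value.toString().split(",");
        double[] instance = new double[fields.length];
        for (int d = 0; d < fields.length; d++) {
            instance[d] = Double.parseDouble(fields[d]);
        }
        // 2.-5. Find the index of the closest center.
        double minDis = Double.MAX_VALUE;
        int index = -1;
        for (int i = 0; i < centers.length; i++) {
            double dis = computeDist(instance, centers[i]);
            if (dis < minDis) {
                minDis = dis;
                index = i;
            }
        }
        // 6.-8. Emit <nearest center index, sample string>.
        context.write(new IntWritable(index), value);
    }

    // Squared Euclidean distance; the nearest center is the same as with the
    // true Euclidean distance, so the square root is skipped.
    private static double computeDist(double[] a, double[] b) {
        double sum = 0;
        for (int d = 0; d < a.length; d++) {
            double diff = a[d] - b[d];
            sum += diff * diff;
        }
        return sum;
    }
}
```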
Combine
A Combine task is a mini Reduce task whose input is the output of the Map tasks on the same node. In a Combine task's output, the key is still the index of the center, and the value consists of two parts: the sum of all the samples belonging to that center, and the number of those samples.
**Input:** key is the index of the cluster, V is the list of the samples assigned to the same cluster
**Output:** <key', value'> pair, where key' is the index of the cluster and value' is a string comprised of the sum of the samples in the same cluster and the sample count
1. Initialize one array to record the sum of the values of each dimension of the samples contained in the same cluster, i.e. the samples in the list V;
2. Initialize a counter num as 0 to record the number of samples in the same cluster;
3. while (V.hasNext()) {
       Construct the sample instance from V.next();
       Add the values of the different dimensions of instance to the array;
       num++;
4. }
5. Take key as key';
6. Construct value' as a string comprised of the sum values of the different dimensions and num;
7. output <key', value'> pair;
8. End
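A sketch of the corresponding combiner, under the same assumptions: it emits the per-node partial sum and count for each cluster as the string "s1,...,sd,num". One caveat the paper does not address: Hadoop may run a combiner zero or more times, so a robust implementation would use the same record format for map and combine output; this sketch follows the paper's scheme and assumes exactly one combine pass:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KMeansCombiner extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double[] sum = null;  // per-dimension sums of this cluster's samples
        int num = 0;          // how many samples this node saw for the cluster
        for (Text v : values) {
            String[] fields = v.toString().split(",");
            if (sum == null) sum = new double[fields.length];
            for (int d = 0; d < fields.length; d++) {
                sum[d] += Double.parseDouble(fields[d]);
            }
            num++;
        }
        // Encode "s1,...,sd,num" as the partial result for this cluster.
        StringBuilder sb = new StringBuilder();
        for (double s : sum) sb.append(s).append(",");
        sb.append(num);
        context.write(key, new Text(sb.toString()));
    }
}
```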
Reduce
**Input:** key is the index of the cluster, V is the list of the partial sums from different hosts
**Output:** <key', value'> pair, where key' is the index of the cluster and value' is a string representing the new center
1. Initialize one array to record the sum of the values of each dimension of the samples contained in the same cluster, i.e. the samples in the list V;
2. Initialize a counter NUM as 0 to record the total number of samples in the same cluster;
3. while (V.hasNext()) {
       Construct the sample instance from V.next();
       Add the values of the different dimensions of instance to the array;
       NUM += num;
4. }
5. Divide the entries of the array by NUM to get the new center's coordinates;
6. Take key as key';
7. Construct value' as a string comprised of the center's coordinates;
8. output <key', value'> pair;
9. End
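And a matching reducer sketch: it accumulates the partial sums and counts produced by the combiners and divides by the total count to obtain the new center, again with illustrative names and the "s1,...,sd,num" encoding assumed above:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KMeansReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double[] sum = null;  // per-dimension sums over all partial results
        long total = 0;       // the NUM counter of the pseudocode
        for (Text v : values) {
            // Each value is "s1,...,sd,num" as produced by the combiner.
            String[] fields = v.toString().split(",");
            if (sum == null) sum = new double[fields.length - 1];
            for (int d = 0; d < sum.length; d++) {
                sum[d] += Double.parseDouble(fields[d]);
            }
            total += Long.parseLong(fields[fields.length - 1]);
        }
        // Divide by NUM to get the new center's coordinates.
        StringBuilder center = new StringBuilder();
        for (int d = 0; d < sum.length; d++) {
            if (d > 0) center.append(",");
            center.append(sum[d] / total);
        }
        context.write(key, new Text(center.toString()));
    }
}
```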
Implementing K-means in code
The post Hadoop k-means 算法实现 includes the source code of a K-means implementation that follows the same principles described above.
There are, however, a few points worth noting.
K-means consists of many MapReduce jobs executed one after another. Each job produces the new centers and saves them to HDFS, and these new centers are shared with the Map tasks of the next job through org.apache.hadoop.filecache.DistributedCache.
For background on org.apache.hadoop.filecache.DistributedCache, see 如何使用Hadoop的DistributedCache and Hadoop DistributedCache详解. Note, though, that this class has been deprecated since (at least) Hadoop 2.4; for the new API, see this Stack Overflow question: Hadoop DistributedCache is deprecated - what is the preferred API?
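To tie the pieces together, here is a sketch of what the iteration driver might look like with the new API: Job.addCacheFile() takes the place of DistributedCache.addCacheFile(), and the mapper sketch above reads the cached file back via context.getCacheFiles(). The paths, the fixed round count standing in for a real convergence test, and the single-reducer assumption are all placeholders of mine:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KMeansDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder paths: the samples stay fixed, the centers file changes
        // every round. centers-0 holds the k initial random centers in the
        // same "<index>\t<c1,...,cd>" format the reducer writes.
        String input = "/kmeans/samples";
        String centers = "/kmeans/centers-0";
        for (int round = 1; round <= 20; round++) {  // or stop once the centers converge
            Job job = Job.getInstance(conf, "kmeans-round-" + round);
            job.setJarByClass(KMeansDriver.class);
            job.setMapperClass(KMeansMapper.class);
            job.setCombinerClass(KMeansCombiner.class);
            job.setReducerClass(KMeansReducer.class);
            job.setNumReduceTasks(1);  // only k keys; one part file keeps the next round simple
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            // New API: Job.addCacheFile() replaces DistributedCache.addCacheFile();
            // the mapper lists the cached files with context.getCacheFiles().
            job.addCacheFile(new URI(centers));
            FileInputFormat.addInputPath(job, new Path(input));
            String output = "/kmeans/centers-dir-" + round;
            FileOutputFormat.setOutputPath(job, new Path(output));
            if (!job.waitForCompletion(true)) System.exit(1);
            centers = output + "/part-r-00000";  // next round reads this round's centers
        }
    }
}
```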