《Mahout算法解析与案例实战》一一3.2 K-Means算法

本节书摘来自华章计算机《Mahout算法解析与案例实战》一书中的第3章,第3.2节,作者:樊 哲,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.2 K-Means算法

3.2.1 K-Means算法简介
1967年,James MacQueen提出“K-Means”(K均值),这个被应用得最广泛的基于划分的聚类算法,其实是一种硬聚类算法,属于典型的局域原型的目标函数聚类的代表。算法首先随机选择k个对象,每个对象初始地代表一个簇的平均值或者中心。对于剩余的每个对象,根据其到各个簇中心的距离,把它们分给距离最小的簇中心,然后重新计算每个簇的平均值。重复这个过程,直到聚类准则函数收敛。准则函数一般采用两种方式:其一,全局误差函数,对应式(3-1);其二,前后两次中心误差变化,对应式(3-2)。
《Mahout算法解析与案例实战》一一3.2 K-Means算法

其中E代表误差,k代表k个聚类中心,Si代表k个群组中的一个,ui是群组Si的重心,xj是Si中的任意元素。
《Mahout算法解析与案例实战》一一3.2 K-Means算法

其中E代表误差,k代表k个聚类中心,i代表第i个聚类,uib是群组i的前一次重心,uia是群组i的后一次重心。
K均值算法的工作原理如下:针对包含n个对象的数据集合D以及初始化的聚类数目k,使用下面的算法。
1)从数据集合D中随机选择k个对象作为初始簇中心。
2)根据簇的中心值,把数据集合中的n个对象全部分给最“相似”的簇(这里的“相似”可以根据距离长短来判断)。
3)更新簇的中心值,即重新计算每个簇的中心值。
4)计算准则函数。
5)若准则函数满足阈值则退出,否则返回第2步。
该算法的流程图如图3?10所示。《Mahout算法解析与案例实战》一一3.2 K-Means算法
K-Means聚类算法具有以下优点:①该算法简单、快速,原理利于理解;②对处理大数据集,该算法相对是可伸缩和高效率的,其算法复杂度大约是O (nkt),其中n是所有对象的数目,k是簇的数目,t是迭代的次数;③当数据集满足球状密集性或者团状密集性时,其聚类效果很好。但是这个算法也有一些待改进的地方,比如,事先要确定k值,即要求用户事先知道数据的一些特点,而且该算法经常以局部最优结束,有时很难达到全局最优;该算法对初始聚类的中心比较敏感,对于不同的初始值,其聚类结果也可能有很大的差异;而且该算法只能发现球状簇,其他形状的簇比较难发现;另外,噪声数据对聚类的结果影响也比较大。针对上面种种不足和缺陷,可对该算法提出一些改进,这里就不一一列举了。
3.2.2 Mahout中K-Means算法实现原理
在Mahout中,K-Means算法由两大部分组成:其一,外部的循环,即算法的准则函数不满足时要继续的循环;其二,循环的主体部分,即算法的主要计算过程。Mahout中实现的K-Means算法和上面对应,分别使用KmeansDriver来设置循环,使用KmeansMapper、KmeansReducer(KmeansCombiner设置后算法运行速度会提高)作为算法的主体部分。该算法的输入主要包含两个路径(或者说文件),其中一个是数据的路径,还有一个是初始聚类中心向量的路径,即包含k个聚类中心的文件。这里要求数据都是序列化的文件,同时要求输入数据的key设置为Text(这个应该是没有做硬性要求的),value设置为VectorWritable(这个是硬性要求的,和Canopy Clustering一样)。其实在该算法中可以通过设置参数来自动提取原始数据中的k个值作为初始中心点的路径,当然,如果读者要自己提供初始中心点的文件,也可以通过Canopy算法来得到聚类的中心点作为K-Means算法的初始中心点文件。
该算法在KmeansDriver中通过不断循环使用输入数据和输入中心点来计算输出(这里的输出都定义在一个clusters-N的路径中, N是可变的)。输出同样是序列文件,key是Text类型,value是Cluster类型。该算法的原理图如图3?11所示。

KmeansDriver通过判断算法计算的误差是否达到阈值或者算法循环的次数是否达到给定的最大次数来控制循环。在循环过程中,新的聚类中心文件路径,一般命名为“clusters-N”且被重新计算得到,这个计算结果是根据前一次的中心点和输入数据计算得到的。最后一步,是通过一个KmeansMapper根据最后一次的中心点文件来对输入文件进行分类,计算得到的结果放入到文件名为“clusteredPoints”文件夹中,这次任务没有combiner和Reducer操作。
KmeansMapper在setup函数中读取输入数据,然后根据用户定义的距离计算方法把这些输入放入到最近的聚类中心簇中,输出的key是类的标签,输出的value是类的表示值;KmeansCombiner通过得到Mapper的输出,然后把这些输出进行整合,得到总的输出;KmeansReducer通过设定一个Reducer来进行计算,接收所有的combiner的输出,把相同的key的类的表示值进行整合并输出。这三个类输入/输出的key/value对见表3?1。
《Mahout算法解析与案例实战》一一3.2 K-Means算法
《Mahout算法解析与案例实战》一一3.2 K-Means算法

在通常情况下,combiner的输入和输出的key/value对是一样的。
3.2.3 Mahout的K-Means算法实战
1.输入数据
为了更加清楚地了解数据的逻辑流,这里采用下面的仿造输入数据。

(1)1. txt
8,8
7,7
6.1,6.1
9,9
2,2
1,1
0,0
2.9,2.9
(2)2. txt
8.1,8.1
7.1,7.1
6.2,6.2
7.1,7.1
2.1,2.1
1.1,1.1
0.1,0.1
3.0,3.0

首先需要把输入数据转换为序列文件,这里使用代码清单 3?1处理即可,处理完输入数据后,还需要对输入的中心点文件进行设置。这里有如下三种设置方式:
直接使用参数进行设置,程序自动产生中心文件。
编写Job任务,直接对中心文本文件进行转换。
调用Hadoop中的接口,对中心文本文件进行转换。
第一种方式是这次实战使用的方式,这种方式读者只需要进行相关参数的设置即可完成,方便快捷;第二种和第三种方式对读者的编程要求比较高,只关心如何调用此算法的读者可以直接忽略。
第二种方式是通过编写Job任务的方式把原始的输入数据转为key是Text(这个格式对K-Means算法的程序影响不大)、value是ClusterWritable的序列文件,这和代码清单 3?1有些类似,如代码清单 3?3所示。
代码清单3?3 Text转换为ClusterWritable程序代码

package mahout.fansy.utils.transform;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.canopy.Canopy;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
public class Text2Cluster extends AbstractJob {

      @Override
      public int run(String[] args) throws Exception {
           addInputOption();
           addOutputOption();
           if (parseArguments(args) == null) {
               ???return -1;
           }
           Path input=getInputPath();
           Path output=getOutputPath();
           Configuration conf=getConf();
           // set job information
           Job job=new Job(conf,"text2cluster with input:"+input.getName());
           job.setOutputFormatClass(SequenceFileOutputFormat.class);
           job.setMapperClass(Text2ClusterMapper.class);
           job.setMapOutputKeyClass(LongWritable.class);
           job.setMapOutputValueClass(ClusterWritable.class);
           job.setReducerClass(Text2ClusterReducer.class);
           job.setOutputKeyClass(LongWritable.class);
           job.setOutputValueClass(ClusterWritable.class);
           job.setJarByClass(Text2Cluster.class);     
           FileInputFormat.addInputPath(job, input);
           SequenceFileOutputFormat.setOutputPath(job, output);
           if (!job.waitForCompletion(true)) { // wait for the job is done
                 throw new InterruptedException("text2cluster Job failed processing " + input);
              }
           ???return 0;
    }
    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
         ToolRunner.run(new Configuration(), new Text2Cluster(),args);
    }
    /**
     * Mapper :main procedure 
     * @author fansy
     *
     */
    private static  int center=1;
    public static class Text2ClusterMapper extends Mapper<LongWritable,Text,
LongWritable,ClusterWritable>{
    public void map(LongWritable key,Text value,Context context)throws IOException, InterruptedException{
        ?String[] str=value.toString().split(","); // split data use one or more blanker
         Vector vector=new RandomAccessSparseVector(str.length);
         for(int i=0;i<str.length;i++){
              vector.set(i, Double.parseDouble(str[i]));
         }
              ClusterWritable val=new ClusterWritable();
              Canopy ca=new Canopy(vector,center++,new EuclideanDistanceMeasure());
              val.setValue(ca);
              context.write(key, val);
         }
      ??}
    /**
     * Reducer: do nothing but output
     * @author fansy
     * 
     */
    public static class Text2ClusterReducer extends Reducer<LongWritable,ClusterWritable, LongWritable,ClusterWritable>{
    public void reduce(LongWritable key,Iterable<ClusterWritable> values,Context context)throws IOException,InterruptedException{
         for(ClusterWritable v:values){
              context.write(key, v);
         }
      ??}
    }
}
第三种方式是直接调用Hadoop中已有的接口来对HDFS上的文件进行操作,这种方式需要对Hadoop中已经实现的一些文件操作比较熟悉。具体的实现如代码清单 3?4所示。
代码清单3?4 Hadoop接口实现文件转换
/**
     * 把一个double数组写入sequence file,格式为 ClusterWritable
     * @param center double数组
     * @param output 输出路径
     * @throws IOException 
     */
    ??public static void writeClusterWritable(String output,double[][] center) throws IOException{
          Configuration conf =new Configuration();
          conf.set("mapred.job.tracker", "ubuntu:9001");
          FileSystem fs=FileSystem.get(URI.create(output),conf);
          Path path=new Path(output);
          Text key =new Text();
          ClusterWritable value=new ClusterWritable();
          SequenceFile.Writer writer = null;  
          try {  
               writer = SequenceFile.createWriter(fs, conf, path,  
                         key.getClass(), value.getClass(),SequenceFile.Compressi-onType.NONE); 
               for (int i = 0; i < center.length; i++) {  
                     key.set("C-"+String.valueOf(i));  

                     Vector v=new RandomAccessSparseVector(center[0].length);
                     for(int j=0;j<center[0].length;j++){
                          v.set(j, i*4+Math.random());
                     }
                     Canopy canopy=new Canopy(v,i,new ManhattanDistanceMeasure());
                     value.setValue(canopy);  
                     writer.append(key, value);  
               }
           } finally {  
               IOUtils.closeStream(writer);  
           }
    }

通过这三种方式转换了输入中心点文件后,可以不先运行K-Means算法,可以使用代码清单3?2进行验证,通过把序列文件可视化(即转换为文本)来确定是否确实把输入文件转换为了聚类中心的序列文件。当然也可以通过Hadoop的接口直接读取HDFS上的文件,主要的代码如代码清单3?5所示。
代码清单3?5 读取聚类中心文件

/**
     * 把序列文件读入到一个变量中
     * @param clusterPath 序列文件
     * @param conf  Configuration
     * @return  序列文件读取的变量
     */
    public static List<Cluster> readClusterWritable(Path clusterPath,Configuration conf){
         List<Cluster> clusters = new ArrayList<Cluster>();
        ???for (Writable value : new SequenceFileDirValueIterable<Writable>(clusterPath, PathType.LIST,
                PathFilters.partFilter(), conf)) {
              Class<? extends Writable> valueClass = value.getClass();
              if (valueClass.equals(ClusterWritable.class)) {
                  ClusterWritable clusterWritable = (ClusterWritable) value;
                  value = clusterWritable.getValue();
                  valueClass = value.getClass();
              }
              if (valueClass.equals(Kluster.class)) {
                 // get the cluster info
                 clusters.add((Kluster) value);
              } else if (valueClass.equals(Canopy.class)) {
                 // get the cluster info
                 Canopy canopy = (Canopy) value;
              ????clusters.add(new Kluster(canopy.getCenter(), canopy.getId(), canopy.getMeasure()));
        ?} else {
               throw new IllegalStateException("Bad value class: " + valueClass);
               }
             }
        return clusters;
    }

后面两种方式这里只作为扩展,有兴趣的读者可以进行相关内容的实战。本书使用的是第一种方式,即直接设置参数的方式来使算法自动生成聚类中心文件,具体方式参考下文。
2.参数意义
K-Means算法在Mahout中的使用方式如下:
usage: kmeans [Generic Options] [Job-Specific Options]
其中[]包含的两个参数是可选的。
Generic Options参数选项和Canopy算法一样,这里不再介绍,可直接参考Canopy算法的相应章节即可。
Job-Specific Options参数选项包含:
--input (-i ) input:任务的输入文件选项,必选。
--output (-o) output:任务的输出文件的选项,必选。
--distanceMeasure (-dm) distanceMeasure:距离计算的类名称,默认为Square-Euclidean,即欧氏距离平方,可选。
--clusters (-c) clusters:初始聚类中心点文件路径,其包含的必须是序列文件,如果k参数被设置,则此路径上面的数据将被重写,必选。
--numClusters (-k) k:聚类中心的数目,如果选择,则聚类中心路径被重写,可选。
--convergenceDelta (-cd) convergenceDelta:判断退出循环的阈值,默认是0.5,可选。
--maxIter (-x) maxIter:最大的循环次数,必选。
--overwrite (-ow):如果出现,则对输出路径进行重写,可选。
--clustering (-cl):如果出现,则对数据进行分类,可选。
--method (-xm) method:选择使用的计算方式,单机或者集群,默认为集群,可选。
--outlierThreshold (-outlierThreshold) outlierThreshold:异常值阈值,可选。
--help (-h):打印此参数帮助信息,可选。
--tempDir tempDir:临时文件所存放的地方,可选。
--startPhase startPhase:开始要运行算法的阶段,可选。
--endPhase endPhase:最后要运行算法的阶段,可选。
其中k的值可选可不选,这里的实战设置了此参数。设置此参数后可以不用再进行聚类中心的配置。最大的聚类次数是一定要选择的,不然当聚类循环的阈值没有达到的时候就会一直进行循环。
3.运行
首先把输入文件上传到HDFS文件系统上,使用下面的命令:

$HADOOP_HOME/bin/hadoop fs –copyFromLocal ../../mahout_data/k1.txt ../../mahout_data/k2.txt input/

这里上传的数据是文本格式的,要把它们转换为序列文件,参考代码清单3-1,运行下面的命令,进行文件格式转换:

$HADOOP_HOME/bin/hadoop jar ../../mahout_jar/CluserUtils.jar mahout.fansy.utils.transform.Text2VectorWritable –i input –o input/transform

使用下面的命令运行K-Means算法:

$MAHOUT_HOME/bin /mahout kmeans -i input/transform/part-r-00000 -o output -c input/center -k 2 -x 5 –cl

上面各个参数的意义表示为:-i设置输入数据路径,-o为输出路径,-c为初始输入聚类中心点,-k表示聚类的数目,-x表示最大的循环次数,-cl表示算法完成后进行原始数据的分类。
4.结果分析
在运行完上面的命令后,可以在终端查看到全部的信息,主要信息显示如图3?12所示。
《Mahout算法解析与案例实战》一一3.2 K-Means算法

图3?12 K-Means算法运行结果信息
从图3?12可以看出,循环主体进行了两次,输出路径分别是output/clusters-0、output/clusters-1、output/c?lusters-2,(这里主体循环只运行了3次,而最大的循环次数设置为5,此处算法达到了准则函数的阈值了,直接退出循环)最后一个任务是进行分类数据,所有的任务一共运行了168645 ms。在文件监控系统可以看到输出的文件如图3?13所示。
《Mahout算法解析与案例实战》一一3.2 K-Means算法

图3?13 K-Means算法输出HDFS文件的监控界面
因为设置的k值为2,所以在clusters-0中有两个聚类中心文件,从代码清单 3?5中可以看到:这里初始化的聚类中心分别是[2.0,2.0]、[2.1,2.1],这样的初始聚类中心不是很好; clusters-1和clusters-2的输出分别为[1.033,1.033]、[5.725,5.725],[1.525,1.525]、[7.325,7.325];最后的聚类中心即是[1.525,1.525]、[7.325,7.325],然后使用此聚类中心对原始数据进行分类,得到clusteredPoints文件夹下面的文件。
3.2.4 K-Means算法小结
本节首先对K-Means算法进行了一个简单的介绍,并详细分析了该算法在Mahout中的实现原理,方便读者理解该算法的“云化”方法。最后,通过一个实际的例子进行了实战,详细演示了如何调用该算法:包括前期的数据准备、算法调用命令、算法计算结果的分析等。并且为了使读者对该算法的数据逻辑流有更深刻的认识,特意仿制了原始数据,相信通过这样的介绍,读者可以对该算法如何被调用有一个清晰的认识。本节的代码清单 3?3、代码清单3?5都是把计算结果的聚类中心可视化的代码,具有一定的通用性,读者可以详细分析以提高自己对Hadoop云平台的文件操作,即HDFS操作的熟练性。

上一篇:【Movidius发布最新一代VPU】集成DNN加速器,每秒超过1万亿次运算


下一篇:python 开发框架