聚类概述以及在Spark中运行Kmeans聚类

最近在做model的前期工作,聚类, 因为聚类结果要用于下一步的推荐模型当中,所以学习了一下几种常见的聚类算法。这里只写下三种,一种是最简单的Kmeans聚类算法, 另一种是层次聚类算法, 最后一种则算是两种算法的结合,二分K均值聚类。

  1. Kmeans聚类
    machine learning的主要问题可以分为分类以及聚类问题。 分类是有监督的学习, 而聚类是没有监督的学习, 换而言之, 分类问题是训练一个有类标的训练集,进而对测试集进行预测, 而聚类问题没有类标,我们只需要知道如何计算相似度就可以了,所以通常情况下clustering是不需要使用训练数据进行学习的。组内的相似性越好, 组间差别越大, 那么聚类就越好。
    Kmeans是一个非常经典的简单算法,也算得上是最简单的一种。以下为一个简单的例子。来源于维基百科:K-means_clustering
    K initial means

    1. k initial “means” (in this case k=3) are randomly generated within the data domain (shown in color).
      k clusters created
    2. k clusters are created by associating every observation with the nearest mean. The partitions here represent the Voronoi diagram generated by the means.
      new mean
    3. The centroid of each of the k clusters becomes the new mean.
      clusters finished
    4. Steps 2 and 3 are repeated until convergence has been reached.

      算法

    5. 选择K个点作为初始质心
    6. repeat
    7. 将每个点指派到最近的质心
    8. 重新计算每个簇的质心
    9. until 质心不发生变化

      关于求簇中心的方法,常用的有欧拉距离和cosine:

    10. 欧拉距离
      $$d{ij}=\sqrt{\sum{k=1}^n(x{ik} - x{jk})^2}$$
    11. cosine(用于document中用于计算相似度)
      目标函数
      通常情况采用的都是SSE(summary of the Squared Error)作为度量聚类质量的目标函数
      也就是满足下面这个式子
      $$arg minS \sum{i=1}^k \sum_{x\in S_i}|| x- \it\mu_i||^2$$
      其中$\mu_i$是每个簇所有点的均值, 也就是质心。

      关于选定K个中心点的初值,通常是针对具体问题有一些启发式的选取方法,或者随机选取然后跑多次取最好的一个结果。

  2. run kmeans on spark
    spark社区越来越活跃, 最近也一直在学习spark, 感觉看别人写的example都非常一目了然, 而自己写的时候就是一头包。
    其实spark中给出了一些已有的机器学习算法的实现方式,简单的贴一个用spark跑Kmeans的实例代码.

    1. 引入SparkConf以及Kmeans方法

      1
      2
      3
      import org.apache.spark.mllib.clustering.KMeans
      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.{SparkConf, SparkContext}
    2. 设置具体参数以及解析数据并缓存(因为数据挺大的, 有170W用户,为了便于使用包括数据源以及聚类数目,迭代次数全部采用参数形式从命令行中读取)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      val sparkConf = new SparkConf().setAppName("KmeansAuthors").setMaster("local[4]")
      val sc = new SparkContext(sparkConf)
      // 从hdfs上读取数据
      // val data = sc.textFile("hdfs://localhost:9000/input/feature_1109_2.csv")
      val data = sc.textFile(args(0))
      val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()
      // Cluster the data into two classes using KMeans
      val numClusters = args(1).toInt
      val numIterations = args(2).toInt
      val clusters = KMeans.train(parsedData, numClusters, numIterations)
    3. 聚类并评估结果

      1
      2
      3
      4
      5
      val clusterCenters = clusters.clusterCenters
      val labels = clusters.predict(parsedData)
      labels.saveAsTextFile(args(3))
      val WSSSE = clusters.computeCost(parsedData)
      println("Within Set Sum of Squared Errors = " + WSSSE)
    4. 完整代码见我的github中:KmeansExample
      之后打包成jar包提交到spark集群即可。
      贴一下我的提交命令:

      1
      ./spark-submit --master spark://sparkmaster:7077 --class KmeansExample --driver-memory 4g --executor-memory 8g /home/open/KmeansExample/out/artifacts/KmeansExample_jar/KmeansExample.jar hdfs://localhost:9000/input/feature_1109_2.csv 10 2000 hdfs://localhost:9000/input/kmeansResult2
  3. 层次聚类
    与Kmeans不同,层次聚类得到的结果是一棵树, 可以在任意的地方切一刀进而得到想要的结果, 用在模型中也是因为不想和评审们解释太多。有两种产生层次聚类的基本方法:

    1. 自顶向下
      从包含所有点的某个簇开始, 每一步分裂一个簇, 直至剩下单点簇, 在这种情况下, 我们需要决定每一步分裂哪一个簇, 以及如何分裂。
    2. 自底向上,从点作为个体簇开始,每一步合并最近的簇。因此需要定义簇的邻近性概念。
      对于自底向上方法来说,那么其实最主要的是一个问题, 如何计算两个簇之间的距离?
      通常有三个做法:
    3. MAX

      不同簇最远的两个点的距离定义为这两个簇的距离

    4. MIN

      不同簇最近的两个点的距离定义为这两个簇的距离

    5. Average

      所有点对的平均距离

  4. 二分k均值(bisecting k-means)算法
    二分k均值(bisecting k-means)算法的主要思想是:首先将所有点作为一个簇,然后将该簇一分为二。之后选择能最大程度降低聚类代价函数(也就是误差平方和)的簇划分为两个簇。以此进行下去,直到簇的数目等于用户给定的数目k为止。
    原理很简单。这里也不再赘述了。考虑再三,准备把这个算法用到自己的model里来进行聚类。
    这里有一个spark库, bisecting Kmeans
    我forked到了本地准备进一步尝试, 如无意外,稍后会给出应用的方法。

  1. 参考文章:
    1. 漫谈 Clustering (1): k-means
    2. K-means聚类算法