本文简单介绍DBSCAN算法的原理及实现。
DBSCAN算法原理
基本概念
DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的空间聚类算法。该算法将具有足够密度的区域划分为簇,并在具有噪声的空间数据库中发现任意形状的簇,它将簇定义为密度相连的点的最大集合。
假设样本集为 D = ( x 1 , x 2 , . . . , x m ) D=(x_1, x_2, ..., x_m) D=(x1,x2,...,xm),DBSCAN算法有如下相关定义:
- ϵ \epsilon ϵ-邻域:对于 x j ∈ D x_j∈D xj∈D,其 ϵ \epsilon ϵ-邻域包含样本集 D D D中与 x j x_j xj的距离不大于 ϵ \epsilon ϵ的子样本集,即 N ϵ ( x j ) = { x i ∈ D ∣ d i s t a n c e ( x i , x j ) ⩽ ϵ } , N_{\epsilon}(x_j)= \{x_i∈D|distance(x_i,x_j) \leqslant \epsilon \}, Nϵ(xj)={xi∈D∣distance(xi,xj)⩽ϵ}, 这个子样本集的个数记为 ∣ N ϵ ( x j ) ∣ |N_{\epsilon}(x_j)| ∣Nϵ(xj)∣
- 核心点:对于任一样本 x j ∈ D x_j∈D xj∈D,如果其ϵϵ-邻域对应的 N ϵ ( x j ) N_{\epsilon}(x_j) Nϵ(xj)至少包含 M i n P t s MinPts MinPts个样本,即如果 ∣ N ϵ ( x j ) ∣ ≥ M i n P t s |N_{\epsilon}(x_j)|≥MinPts ∣Nϵ(xj)∣≥MinPts,则 x j x_j xj是核心对象。
- 密度直达:如果 x i x_i xi位于 x j x_j xj的 ϵ \epsilon ϵ-邻域中,且 x j x_j xj是核心对象,则称 x i x_i xi由 x j x_j xj密度直达。
- 密度可达:对于 x i x_i xi和 x j x_j xj,如果存在样本样本序列 p 1 , p 2 , . . . , p T p_1,p_2,...,p_T p1,p2,...,pT,满足 p 1 = x i , p T = x j p_1=x_i,pT=x_j p1=xi,pT=xj, 且 p t + 1 p_{t+1} pt+1由 p t p_t pt密度直达,则称 x j x_j xj由 x i x_i xi密度可达。
- 密度相连:对于 x i x_i xi和 x j x_j xj,如果存在核心对象样本 x k x_k xk,使 x i x_i xi和 x j x_j xj均由 x k x_k xk密度可达,则称 x i x_i xi和 x j x_j xj密度相连。
注意:
- 密度直达是非对称的,若 x i x_i xi由 x j x_j xj密度直达,只有当 x i x_i xi也为核心对象时,才有 x j x_j xj由 x i x_i xi密度直达
- 密度可达是密度直达的传递闭包,是非对称的
- 密度相连是对称的
算法原理
总体思路
- 遍历每一个未处理的点
- 如果为核心点,找出所有从该点密度可达的点,形成一个簇
- 否则,标记后遍历下一个点
详细算法
算法1
输入:样本集 D = ( x 1 , x 2 , . . . , x m ) D=(x_1, x_2, ..., x_m) D=(x1,x2,...,xm),参数 ( ϵ , M i n P t s ) (\epsilon, MinPts) (ϵ,MinPts)
输出:核心点集合 Ω \Omega Ω
- 初始化核心对象集合 Ω = ∅ \Omega = \emptyset Ω=∅
- 对于所有点
x
j
x_j
xj:
- 找到样本 x j x_j xj的 ϵ \epsilon ϵ-邻域子样本集 N ϵ ( x j ) N_{\epsilon}(x_j) Nϵ(xj)
- 如果子样本集合个数满足 ∣ N ϵ ( x j ) ∣ ≥ M i n P t s |N_{\epsilon}(x_j)|≥MinPts ∣Nϵ(xj)∣≥MinPts,将样本 x j x_j xj加入核心对象样本集合 Ω = Ω ∪ { x j } \Omega = \Omega \cup \{ x_j \} Ω=Ω∪{xj}
算法2
输入:样本集 D = ( x 1 , x 2 , . . . , x m ) D=(x_1, x_2, ..., x_m) D=(x1,x2,...,xm),参数 ( ϵ , M i n P t s ) (\epsilon, MinPts) (ϵ,MinPts)
输出:簇划分结果 C = { C 1 , C 2 , . . . , C k } C=\{ C_1, C_2, ..., C_k \} C={C1,C2,...,Ck}
- 执行算法1,找到所有核心点 Ω \Omega Ω
- 初始化簇类别 k = 0 k=0 k=0, 未访问样本集合 Γ = D \Gamma = D Γ=D, 簇划分结果 C = ∅ C=\emptyset C=∅
- 如果核心点集合 Ω \Omega Ω不为空,则继续执行,否则算法结束
- 在核心对象集合 Ω \Omega Ω中,随机选择一个核心对象 o o o,初始化当前簇核心对象队列 Ω c u r = { o } Ω_{cur}=\{o\} Ωcur={o}, 初始化类别序号 k = k + 1 k=k+1 k=k+1,初始化当前簇样本集合 C k = { o } C_k=\{o\} Ck={o}, 更新未访问样本集合 Γ = Γ − o Γ=Γ−{o} Γ=Γ−o
- Repeat
- 如果当前簇核心对象集合 Ω c u r ≠ ∅ Ω_{cur}\ne∅ Ωcur=∅,在 Ω c u r Ω_{cur} Ωcur中取出一个核心对象 o ′ o′ o′,通过邻域距离阈值 ϵ ϵ ϵ找出所有的 ϵ ϵ ϵ-邻域子样本集 N ϵ ( o ′ ) N_ϵ(o′) Nϵ(o′),令 Δ = N ϵ ( o ′ ) ∩ Γ Δ=N_ϵ(o′)∩Γ Δ=Nϵ(o′)∩Γ, 更新当前簇样本集合 C k = C k ∪ Δ C_k=C_k∪Δ Ck=Ck∪Δ, 更新未访问样本集合 Γ = Γ − Δ Γ=Γ−Δ Γ=Γ−Δ, 更新 Ω c u r = Ω c u r ∪ ( Δ ∩ Ω ) − o ′ Ω_{cur}=Ω_{cur}∪(Δ∩Ω)−o′ Ωcur=Ωcur∪(Δ∩Ω)−o′,更新核心对象集合 Ω = Ω − C k Ω=Ω−C_k Ω=Ω−Ck
- 如果当前簇核心对象队列 Ω c u r = ∅ Ω_{cur}=∅ Ωcur=∅,则当前聚类簇 C k C_k Ck生成完毕, 更新簇划分 C = C ∪ C k C=C \cup C_k C=C∪Ck ,转入步骤3。
DBSCAN算法实现
Sklearn DBSCAN
尝试使用sklearn中的DBSCAN算法。
# all import
from sklearn import datasets
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
import numpy as np
import time
# generate data
n_samples = 1500
noisy_circles = datasets.make_circles(n_samples=n_samples, factor=.5,noise=.05)
X, y = noisy_circles
# plot
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
# plt.figure(1)
plt.scatter(X[:, 0], X[:, 1])
plt.subplot(1, 2, 2)
# plt.figure(2)
plt.scatter(X[:, 0], X[:, 1], c=y)
上图是sklearn生成的数据,右图是根据label直接绘制的结果。
下面使用sklearn中的DBSCAN类对数据X进行聚类。
# run sklearn DBSCAN to predict
y_pred = DBSCAN(eps=0.1).fit_predict(X)
# plot sklearn DBSCAN result
plt.figure(figsize=(5, 5))
plt.scatter(X[:, 0], X[:, 1], c=y_pred)
My DBSCAN
此部分代码为笔者依据上述伪代码实现,未经过优化
# my DBSCAN algorithm
from sklearn.metrics.pairwise import pairwise_distances
class MyDBSCAN:
def __init__(self, X, eps=0.5, minPts=5):
"""
@param:
X: input dataset, (n, m)
eps:
minPts:
"""
self.X = X
self.n = len(X)
self.eps = eps
self.minPts = minPts
def get_core_points(self):
"""
@return:
core_points: the index of core points
"""
core_points = []
for i in range(self.n):
neighbors = self.get_neighbors(i)
if len(neighbors) >= self.minPts:
core_points.append(i)
return core_points
def get_neighbors(self, point_idx):
"""
@param:
point_idx: index of point
@return:
neighbors: neighbors of point
"""
neighbors = []
for i in range(self.n):
if self.dis_mat[point_idx][i] < self.eps:
neighbors.append(i)
return neighbors
def compute_dis_mat(self):
"""
@param:
X: input dataset, (n, m)
@return:
self.dis_mat: (n, n)
"""
self.dis_mat = pairwise_distances(X)
return
def random_choose(self, points):
"""
random choose a point
"""
# return points[0]
return points[np.random.randint(0, len(points))]
def fit_predict(self):
"""
@param:
X: input dataset, (n, m)
@return:
y_pred: predicted result, (n,)
"""
# computer distance matrix
self.compute_dis_mat()
core_points = set(self.get_core_points())
waiting_points = set([i for i in range(len(self.X))])
cluster = -1 * np.ones(self.n, dtype=np.int32)
k = 0
while len(core_points) > 0:
o = self.random_choose(list(core_points))
cur_core_points = set()
cur_core_points.add(o)
cur_cluster = set()
cur_cluster.add(o)
waiting_points.remove(o)
while True:
if len(cur_core_points) > 0:
point = list(cur_core_points)[0]
neighbors = set(self.get_neighbors(point))
delta = neighbors & waiting_points
cur_cluster = cur_cluster | delta
waiting_points = waiting_points - delta
cur_core_points = cur_core_points | (delta & core_points)
cur_core_points.remove(point)
core_points = core_points - cur_cluster
else:
cluster[list(cur_cluster)] = k
k += 1
break
return cluster
将其用到sklearn生成的数据集上测试效果:
n_samples = 500
noisy_circles = datasets.make_circles(n_samples=n_samples, factor=.5,
noise=.05)
X, y = noisy_circles
start = time.time()
y_my_pred = MyDBSCAN(X=X, eps=0.15).fit_predict()
end1 = time.time()
y_pred = DBSCAN(eps=0.15).fit_predict(X)
end2 = time.time()
print('{:15} {:5} {:3}'.format('Algorithm', 'Time', 'NMI'))
print('{:15} {:5.2f} {:3}'.format('My DBSCAN ', end1 - start, metrics.normalized_mutual_info_score(y, y_my_pred)))
print('{:15} {:5.2f} {:3}'.format('Sklearn DBSCAN', end2 - end1, metrics.normalized_mutual_info_score(y, y_pred)))
Algorithm Time NMI
My DBSCAN 0.11 1.0
Sklearn DBSCAN 0.00 1.0
# plot sklearn DBSCAN result
plt.figure(figsize=(15, 5))
plt.subplot(1, 3, 1)
plt.scatter(X[:, 0], X[:, 1], c=y)
plt.title('origin data')
plt.subplot(1, 3, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred)
plt.title('sklearn DBSCAN')
plt.subplot(1, 3, 3)
plt.scatter(X[:, 0], X[:, 1], c=y_my_pred)
plt.title('My DBSCAN')
修改eps后的测试结果(minPts=5):
eps | result |
---|---|
0.1 | |
0.12 | |
0.15 | |
可以观察到在sklearn生成的该类数据集上My DBSCAN与sklearn DBSCAN的聚类结果非常接近,但My DBSCAN的运行时间较长。
Reference
A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise (aaai.org)
DBSCAN密度聚类算法 - 刘建平Pinard - 博客园
用scikit-learn学习DBSCAN聚类 - 刘建平Pinard - 博客园