-
算法特征:
回归曲线上的每一点均对应一个独立的线性方程, 该线性方程由一组经过加权后的残差决定. 残差来源于待拟合数据点与拟合超平面在相空间的距离, 权重依赖于待拟合数据点与拟合数据点在参数空间的距离. -
算法推导:
待拟合方程:
\begin{equation}\label{eq_1}
h_{\theta}(x) = x^T\theta
\end{equation}
最小二乘法:
\begin{equation}\label{eq_2}
min\ \frac{1}{2}(X^T\theta-\bar{Y})^TW(X^T\theta-\bar{Y})
\end{equation}
其中, $X=[x^1, x^2, ..., x^n]$是由待拟合数据之输入值所组成的矩阵, 每根分量均为一个列矢量, 将该列矢量的最后一个元素置1以满足线性拟合之常数项需求. $\bar{Y}$由待拟合数据之标准输出值组成, 是一个列矢量. $W$由待拟合数据之权重组成, 是一个对角矩阵, 此处取第$i$个对角元为$exp(-(x_i - x)^2 / (2\tau^2))$. $\theta$为待求解的优化变量, 是一个列矢量.
上诉优化问题(\ref{eq_2})为无约束凸优化问题, 存在如下近似解析解:
\begin{equation}\label{eq_3}
\theta = (XWX^T + \varepsilon I)^{-1}XW\bar{Y}
\end{equation} -
代码实现:
1 # 局部加权线性回归 2 # 交叉验证计算泛化误差最小点 3 4 5 import numpy 6 from matplotlib import pyplot as plt 7 8 9 # 待拟合不含噪声之目标函数 10 def oriFunc(x): 11 y = numpy.exp(-x) * numpy.sin(10*x) 12 return y 13 # 待拟合包含噪声之目标函数 14 def traFunc(x, sigma=0.03): 15 y = oriFunc(x) + numpy.random.normal(0, sigma, numpy.array(x).size) 16 return y 17 18 19 # 局部加权线性回归之实现 20 class LW(object): 21 22 def __init__(self, xBound=(0, 3), number=500, tauBound=(0.001, 100), epsilon=1.e-3): 23 self.__xBound = xBound # 采样边界 24 self.__number = number # 采样数目 25 self.__tauBound = tauBound # tau之搜索边界 26 self.__epsilon = epsilon # tau之搜索精度 27 28 29 def get_data(self): 30 ''' 31 根据目标函数生成待拟合数据 32 ''' 33 X = numpy.linspace(*self.__xBound, self.__number) 34 oriY_ = oriFunc(X) # 不含误差之响应 35 traY_ = traFunc(X) # 包含误差之响应 36 37 self.X = numpy.vstack((X.reshape((1, -1)), numpy.ones((1, X.shape[0])))) 38 self.oriY_ = oriY_.reshape((-1, 1)) 39 self.traY_ = traY_.reshape((-1, 1)) 40 41 return self.X, self.oriY_, self.traY_ 42 43 44 def lw_fitting(self, tau=None): 45 if not hasattr(self, "X"): 46 self.get_data() 47 if tau is None: 48 if hasattr(self, "bestTau"): 49 tau = self.bestTau 50 else: 51 tau = self.get_tau() 52 53 xList, yList = list(), list() 54 for val in numpy.linspace(*self.__xBound, self.__number * 5): 55 x = numpy.array([[val], [1]]) 56 theta = self.__fitting(x, self.X, self.traY_, tau) 57 y = numpy.matmul(theta.T, x) 58 xList.append(x[0, 0]) 59 yList.append(y[0, 0]) 60 61 resiList = list() # 残差计算 62 for idx in range(self.__number): 63 x = self.X[:, idx:idx+1] 64 theta = self.__fitting(x, self.X, self.traY_, tau) 65 y = numpy.matmul(theta.T, x) 66 resiList.append(self.traY_[idx, 0] - y[0, 0]) 67 68 return xList, yList, self.X[0, :].tolist(), resiList 69 70 71 def show(self): 72 ''' 73 绘图展示整体拟合情况 74 ''' 75 xList, yList, sampleXList, sampleResiList = self.lw_fitting() 76 y2List = oriFunc(numpy.array(xList)) 77 fig = plt.figure(figsize=(8, 14)) 78 ax1 = plt.subplot(2, 1, 1) 79 ax2 = plt.subplot(2, 1, 2) 80 81 ax1.scatter(self.X[0, :], self.traY_[:, 0], c="green", alpha=0.7, label="samples with noise") 82 ax1.plot(xList, y2List, c="red", lw=4, alpha=0.7, label="standard curve") 83 ax1.plot(xList, yList, c="black", lw=2, linestyle="--", label="fitting curve") 84 ax1.set(xlabel="$x$", ylabel="$y$") 85 ax1.legend() 86 87 ax2.scatter(sampleXList, sampleResiList, c="blue", s=10) 88 ax2.set(xlabel="$x$", ylabel="$\epsilon$", title="residual distribution") 89 90 plt.show() 91 plt.close() 92 fig.tight_layout() 93 fig.savefig("lw.png", dpi=300) 94 95 96 def __fitting(self, x, X, Y_, tau, epsilon=1.e-9): 97 tmpX = X[0:1, :] 98 tmpW = (-(tmpX - x[0, 0]) ** 2 / tau ** 2 / 2).reshape(-1) 99 W = numpy.diag(numpy.exp(tmpW)) 100 101 item1 = numpy.matmul(numpy.matmul(X, W), X.T) 102 item2 = numpy.linalg.inv(item1 + epsilon * numpy.identity(item1.shape[0])) 103 item3 = numpy.matmul(numpy.matmul(X, W), Y_) 104 105 theta = numpy.matmul(item2, item3) 106 107 return theta 108 109 110 def get_tau(self): 111 ''' 112 交叉验证返回最优tau 113 采用黄金分割法计算最优tau 114 ''' 115 if not hasattr(self, "X"): 116 self.get_data() 117 118 lowerBound, upperBound = self.__tauBound 119 lowerTau = self.__calc_lowerTau(lowerBound, upperBound) 120 upperTau = self.__calc_upperTau(lowerBound, upperBound) 121 lowerErr = self.__calc_generalErr(self.X, self.traY_, lowerTau) 122 upperErr = self.__calc_generalErr(self.X, self.traY_, upperTau) 123 124 while (upperTau - lowerTau) > self.__epsilon: 125 if lowerErr > upperErr: 126 lowerBound = lowerTau 127 lowerTau = upperTau 128 lowerErr = upperErr 129 upperTau = self.__calc_upperTau(lowerBound, upperBound) 130 upperErr = self.__calc_generalErr(self.X, self.traY_, upperTau) 131 else: 132 upperBound = upperTau 133 upperTau = lowerTau 134 upperErr = lowerErr 135 lowerTau = self.__calc_lowerTau(lowerBound, upperBound) 136 lowerErr = self.__calc_generalErr(self.X, self.traY_, lowerTau) 137 138 self.bestTau = (upperTau + lowerTau) / 2 139 return self.bestTau 140 141 142 def __calc_generalErr(self, X, Y_, tau): 143 generalErr = 0 144 145 for idx in range(X.shape[1]): 146 tmpx = X[:, idx:idx+1] 147 tmpy_ = Y_[idx:idx+1, :] 148 tmpX = numpy.hstack((X[:, 0:idx], X[:, idx+1:])) 149 tmpY_ = numpy.vstack((Y_[0:idx, :], Y_[idx+1:, :])) 150 151 theta = self.__fitting(tmpx, tmpX, tmpY_, tau) 152 tmpy = numpy.matmul(theta.T, tmpx) 153 generalErr += (tmpy_[0, 0] - tmpy[0, 0]) ** 2 154 155 return generalErr 156 157 158 def __calc_lowerTau(self, lowerBound, upperBound): 159 delta = upperBound - lowerBound 160 lowerTau = upperBound - delta * 0.618 161 return lowerTau 162 163 164 def __calc_upperTau(self, lowerBound, upperBound): 165 delta = upperBound - lowerBound 166 upperTau = lowerBound + delta * 0.618 167 return upperTau 168 169 170 171 172 if __name__ == '__main__': 173 obj = LW() 174 obj.show()
View Code笔者所用示例函数为:
\begin{equation}\label{eq_4}
f(x) = e^{-x}sin(10x)
\end{equation} -
结果展示:
-
使用建议:
1. 局部加权线性回归主要作为一种光滑化的手段存在, 其对非样本覆盖区间的预测能力较低.
2. 在不明确具体拟合公式的前提下, 可采用局部加权线性回归获取一条可以接受的拟合曲线.
3. 在选定合适的加权方式后, 可以计算指定点处相对可靠的0阶及1阶函数信息.
相关文章
- 08-17斯坦福机器学习by吴恩达-线性回归 python实现(ex1更新,含3D画图)
- 08-17Python实现线性回归之 一元线性回归and平方损失函数
- 08-17机器学习3- 一元线性回归+Python实现
- 08-17局部加权线性回归(1) - Python实现
- 08-17局部加权之逻辑回归(1) - Python实现
- 08-17Python/spss-多元回归建模-共线性诊断1(推荐A)
- 08-17机器学习---用python实现最小二乘线性回归算法并用随机梯度下降法求解 (Machine Learning Least Squares Linear Regression Application SGD)
- 08-17python代码实现回归分析--线性回归
- 08-17线性回归算法 - python实现
- 08-17局部线性嵌入(LLE)算法的详细推导及Python实现