局部加权线性回归(1) - Python实现

  • 算法特征:
    回归曲线上的每一点均对应一个独立的线性方程, 该线性方程由一组经过加权后的残差决定. 残差来源于待拟合数据点与拟合超平面在相空间的距离, 权重依赖于待拟合数据点与拟合数据点在参数空间的距离.
  • 算法推导:
    待拟合方程:
    \begin{equation}\label{eq_1}
    h_{\theta}(x) = x^T\theta
    \end{equation}
    最小二乘法:
    \begin{equation}\label{eq_2}
    min\ \frac{1}{2}(X^T\theta-\bar{Y})^TW(X^T\theta-\bar{Y})
    \end{equation}
    其中, $X=[x^1, x^2, ..., x^n]$是由待拟合数据之输入值所组成的矩阵, 每根分量均为一个列矢量, 将该列矢量的最后一个元素置1以满足线性拟合之常数项需求. $\bar{Y}$由待拟合数据之标准输出值组成, 是一个列矢量. $W$由待拟合数据之权重组成, 是一个对角矩阵, 此处取第$i$个对角元为$exp(-(x_i - x)^2 / (2\tau^2))$. $\theta$为待求解的优化变量, 是一个列矢量.

    上诉优化问题(\ref{eq_2})为无约束凸优化问题, 存在如下近似解析解:
    \begin{equation}\label{eq_3}
    \theta = (XWX^T + \varepsilon I)^{-1}XW\bar{Y}
    \end{equation}
  • 代码实现:
    局部加权线性回归(1) - Python实现
      1 # 局部加权线性回归
      2 # 交叉验证计算泛化误差最小点
      3 
      4 
      5 import numpy
      6 from matplotlib import pyplot as plt
      7 
      8 
      9 # 待拟合不含噪声之目标函数
     10 def oriFunc(x):
     11     y = numpy.exp(-x) * numpy.sin(10*x)
     12     return y
     13 # 待拟合包含噪声之目标函数
     14 def traFunc(x, sigma=0.03):
     15     y = oriFunc(x) + numpy.random.normal(0, sigma, numpy.array(x).size)
     16     return y
     17 
     18     
     19 # 局部加权线性回归之实现
     20 class LW(object):
     21     
     22     def __init__(self, xBound=(0, 3), number=500, tauBound=(0.001, 100), epsilon=1.e-3):
     23         self.__xBound = xBound               # 采样边界
     24         self.__number = number               # 采样数目
     25         self.__tauBound = tauBound           # tau之搜索边界
     26         self.__epsilon = epsilon             # tau之搜索精度
     27         
     28         
     29     def get_data(self):
     30         '''
     31         根据目标函数生成待拟合数据
     32         '''
     33         X = numpy.linspace(*self.__xBound, self.__number)
     34         oriY_ = oriFunc(X)                   # 不含误差之响应
     35         traY_ = traFunc(X)                   # 包含误差之响应
     36         
     37         self.X = numpy.vstack((X.reshape((1, -1)), numpy.ones((1, X.shape[0]))))
     38         self.oriY_ = oriY_.reshape((-1, 1))
     39         self.traY_ = traY_.reshape((-1, 1))
     40         
     41         return self.X, self.oriY_, self.traY_
     42         
     43         
     44     def lw_fitting(self, tau=None):
     45         if not hasattr(self, "X"):
     46             self.get_data()
     47         if tau is None:
     48             if hasattr(self, "bestTau"):
     49                 tau = self.bestTau
     50             else:
     51                 tau = self.get_tau()
     52         
     53         xList, yList = list(), list()
     54         for val in numpy.linspace(*self.__xBound, self.__number * 5):
     55             x = numpy.array([[val], [1]])
     56             theta = self.__fitting(x, self.X, self.traY_, tau)
     57             y = numpy.matmul(theta.T, x)
     58             xList.append(x[0, 0])
     59             yList.append(y[0, 0])
     60             
     61         resiList = list()                                              # 残差计算
     62         for idx in range(self.__number):
     63             x = self.X[:, idx:idx+1]
     64             theta = self.__fitting(x, self.X, self.traY_, tau)
     65             y = numpy.matmul(theta.T, x)
     66             resiList.append(self.traY_[idx, 0] - y[0, 0])
     67             
     68         return xList, yList, self.X[0, :].tolist(), resiList
     69         
     70         
     71     def show(self):
     72         '''
     73         绘图展示整体拟合情况
     74         '''
     75         xList, yList, sampleXList, sampleResiList = self.lw_fitting()
     76         y2List = oriFunc(numpy.array(xList))
     77         fig = plt.figure(figsize=(8, 14))
     78         ax1 = plt.subplot(2, 1, 1)
     79         ax2 = plt.subplot(2, 1, 2)
     80         
     81         ax1.scatter(self.X[0, :], self.traY_[:, 0], c="green", alpha=0.7, label="samples with noise")
     82         ax1.plot(xList, y2List, c="red", lw=4, alpha=0.7, label="standard curve")
     83         ax1.plot(xList, yList, c="black", lw=2, linestyle="--", label="fitting curve")
     84         ax1.set(xlabel="$x$", ylabel="$y$")
     85         ax1.legend()
     86         
     87         ax2.scatter(sampleXList, sampleResiList, c="blue", s=10)
     88         ax2.set(xlabel="$x$", ylabel="$\epsilon$", title="residual distribution")
     89         
     90         plt.show()
     91         plt.close()
     92         fig.tight_layout()
     93         fig.savefig("lw.png", dpi=300)
     94         
     95         
     96     def __fitting(self, x, X, Y_, tau, epsilon=1.e-9):
     97         tmpX = X[0:1, :]
     98         tmpW = (-(tmpX - x[0, 0]) ** 2 / tau ** 2 / 2).reshape(-1)
     99         W = numpy.diag(numpy.exp(tmpW))
    100         
    101         item1 = numpy.matmul(numpy.matmul(X, W), X.T)
    102         item2 = numpy.linalg.inv(item1 + epsilon * numpy.identity(item1.shape[0]))
    103         item3 = numpy.matmul(numpy.matmul(X, W), Y_)
    104         
    105         theta = numpy.matmul(item2, item3)
    106         
    107         return theta
    108 
    109         
    110     def get_tau(self):
    111         '''
    112         交叉验证返回最优tau
    113         采用黄金分割法计算最优tau
    114         '''
    115         if not hasattr(self, "X"):
    116             self.get_data()
    117         
    118         lowerBound, upperBound = self.__tauBound
    119         lowerTau = self.__calc_lowerTau(lowerBound, upperBound)
    120         upperTau = self.__calc_upperTau(lowerBound, upperBound)
    121         lowerErr = self.__calc_generalErr(self.X, self.traY_, lowerTau)
    122         upperErr = self.__calc_generalErr(self.X, self.traY_, upperTau)
    123         
    124         while (upperTau - lowerTau) > self.__epsilon:
    125             if lowerErr > upperErr:
    126                 lowerBound = lowerTau
    127                 lowerTau = upperTau
    128                 lowerErr = upperErr
    129                 upperTau = self.__calc_upperTau(lowerBound, upperBound)
    130                 upperErr = self.__calc_generalErr(self.X, self.traY_, upperTau)
    131             else:
    132                 upperBound = upperTau
    133                 upperTau = lowerTau
    134                 upperErr = lowerErr
    135                 lowerTau = self.__calc_lowerTau(lowerBound, upperBound)
    136                 lowerErr = self.__calc_generalErr(self.X, self.traY_, lowerTau)
    137                 
    138         self.bestTau = (upperTau + lowerTau) / 2
    139         return self.bestTau
    140         
    141         
    142     def __calc_generalErr(self, X, Y_, tau):
    143         generalErr = 0
    144         
    145         for idx in range(X.shape[1]):
    146             tmpx = X[:, idx:idx+1]
    147             tmpy_ = Y_[idx:idx+1, :]
    148             tmpX = numpy.hstack((X[:, 0:idx], X[:, idx+1:]))
    149             tmpY_ = numpy.vstack((Y_[0:idx, :], Y_[idx+1:, :]))
    150             
    151             theta = self.__fitting(tmpx, tmpX, tmpY_, tau)
    152             tmpy = numpy.matmul(theta.T, tmpx)
    153             generalErr += (tmpy_[0, 0] - tmpy[0, 0]) ** 2
    154 
    155         return generalErr
    156         
    157         
    158     def __calc_lowerTau(self, lowerBound, upperBound):
    159         delta = upperBound - lowerBound
    160         lowerTau = upperBound - delta * 0.618
    161         return lowerTau
    162         
    163         
    164     def __calc_upperTau(self, lowerBound, upperBound):
    165         delta = upperBound - lowerBound
    166         upperTau = lowerBound + delta * 0.618
    167         return upperTau
    168         
    169         
    170         
    171         
    172 if __name__ == '__main__':
    173     obj = LW()
    174     obj.show()
    View Code

    笔者所用示例函数为:
    \begin{equation}\label{eq_4}
    f(x) = e^{-x}sin(10x)
    \end{equation}

  • 结果展示:
    局部加权线性回归(1) - Python实现
  • 使用建议:
    1. 局部加权线性回归主要作为一种光滑化的手段存在, 其对非样本覆盖区间的预测能力较低.
    2. 在不明确具体拟合公式的前提下, 可采用局部加权线性回归获取一条可以接受的拟合曲线.
    3. 在选定合适的加权方式后, 可以计算指定点处相对可靠的0阶及1阶函数信息.
上一篇:MySQL5.7 - 基于GTID复制模式搭建主从复制


下一篇:Python math.tau 常量