# Author:Richard
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(0)  # fix the seed so every run draws the same random numbers
X_train_path = r"G:\课程学习\机器学习\Mr_Li_ML\HomeWorks\数据\hw2\data\X_train"
Y_train_path = r"G:\课程学习\机器学习\Mr_Li_ML\HomeWorks\数据\hw2\data\Y_train"
X_test_path = r"G:\课程学习\机器学习\Mr_Li_ML\HomeWorks\数据\hw2\data\X_test"


def _load_csv_columns(path, columns):
    """Read a comma-separated file with one header line into a float array.

    Arguments:
        path: location of the file to read.
        columns: index or slice applied to the fields of every data row;
            an int selects one field per row (1-D result), a slice keeps
            several fields per row (2-D result).
    Output:
        numpy array of dtype float holding the selected fields of every
        row after the header.
    """
    # NOTE(review): the file is opened with the platform default encoding,
    # exactly as the original inline loaders did.
    with open(path) as csv_file:
        next(csv_file)  # skip the header line; its content is not used
        rows = [line.strip('\n').split(',')[columns] for line in csv_file]
    return np.array(rows, dtype=float)


# Convert the raw files to numpy arrays. The first field of every row is
# skipped: features are fields 1.., the label is field 1.
# Shapes noted below are the ones recorded by the original author.
X_train = _load_csv_columns(X_train_path, slice(1, None))  # (54256, 510)
Y_train = _load_csv_columns(Y_train_path, 1)  # (54256,)
X_test = _load_csv_columns(X_test_path, slice(1, None))  # (27622, 510)
# Feature normalization helper
def _normalize(X, train=True, specified_column=None, X_mean=None, X_std=None):
# This function normalizes specific columns of X.
# The mean and standard variance of training data will be reused when processing testing data.
#
# Arguments:
# X: data to be processed
# train: 'True' when processing training data, 'False' for testing data
# specific_column: indexes of the columns that will be normalized. If 'None', all columns
# will be normalized.
# X_mean: mean value of training data, used when train = 'True'
# X_std: standard deviation of training data, used when train = 'True'
# Outputs:
# X: normalized data
# X_mean: computed mean value of training data
# X_std: computed standard deviation of training data
if specified_column == None:
specified_column = np.arange(X.shape[1])
if train:
X_mean = np.mean(X[:, specified_column], axis=0)
X_std = np.std(X[:, specified_column], axis=0)
for i in range(X.shape[0]):
for j in range(X.shape[1]):
if X_std[j] != 0:
X[i, j] = (X[i, j] - X_mean[j]) / X_std[j]
return X, X_mean, X_std
# Standardize the training data, then apply the training statistics to the
# testing data. "_" receives return values that are not needed.
X_train, X_mean, X_std = _normalize(X_train, train=True)
X_test, _, _ = _normalize(X_test, train=False, X_mean=X_mean, X_std=X_std)

# Split the data into a training set and a development set, 9:1.
ratio = 0.9
train_len = int(len(X_train) * ratio)

# Keep references to the full arrays first: slicing X_train in place and
# then slicing it again for the dev set would take the dev rows from the
# already-truncated array.
X_train0, Y_train0 = X_train, Y_train
X_train, X_dev = X_train0[:train_len], X_train0[train_len:]
Y_train, Y_dev = Y_train0[:train_len], Y_train0[train_len:]

train_size, dev_size, test_size = len(X_train), len(X_dev), len(X_test)
data_dim = X_train.shape[1]

# Sizes recorded by the original author for this dataset:
#   Size of training set: 48830
#   Size of development set: 5426
#   Size of testing set: 27622
#   Dimension of data: 510
def _shuffle(X, Y):
# This function shuffles two equal-length list/array, X and Y, together.
randomize = np.arange(len(X))
np.random.shuffle(randomize)
return (X[randomize], Y[randomize])
def _sigmoid(z):
# sigmoid function can be used to calculate probability
# to avoid overflow, min/max value is set
return np.clip(1.0 / (1.0 + np.exp(-z)), 1e-8, 1 - 1e-8)
def _f(X, w, b):
    """Logistic regression function, parameterized by w and b.

    Arguments:
        X: input data, shape = [batch_size, data_dimension]
        w: weight vector, shape = [data_dimension, ]
        b: bias, scalar
    Output:
        predicted probability of each row of X being positively labeled,
        shape = [batch_size, ]
    """
    # numpy.matmul returns the matrix product of its two arguments.
    scores = np.matmul(X, w) + b
    return _sigmoid(scores)
def _predict(X, w, b):
    """Return a 0/1 prediction for each row of X.

    The probability from the logistic regression function is thresholded
    at 0.5: values >= 0.5 become 1, values < 0.5 become 0. An explicit
    threshold is used instead of np.round, which rounds exact halves to
    the nearest even value.
    """
    probs = _f(X, w, b)
    # Both masks are taken before writing; entries set to 1 can never fall
    # into the "< 0.5" mask, so the result matches sequential assignment.
    is_positive = probs >= 0.5
    is_negative = probs < 0.5
    probs[is_positive] = 1
    probs[is_negative] = 0
    return probs
def _accuracy(Y_pred, Y_label):
# this function calculate presiction accuracy
acc = 1 - np.mean(np.abs(Y_pred - Y_label))
# acc = 1 - np.abs(Y_pred - Y_label)
return acc
def _cross_entropy_loss(Y_pred, Y_label):
# This function computes the cross entropy.
#
# Arguements:
# y_pred: probabilistic predictions, float vector
# Y_label: ground truth labels, bool vector
# Output:
# cross entropy, scalar
cross_entropy = -np.dot(Y_label, np.log(Y_pred)) - np.dot((1 - Y_label), np.log(1 - Y_pred))
return cross_entropy
def _gradient(X, Y_label, w, b):
    """Compute the gradient of the cross entropy loss w.r.t. w and b.

    Arguments:
        X: input data, one sample per row
        Y_label: ground truth labels for the rows of X
        w: weight vector
        b: bias
    Outputs:
        w_grad: gradient with respect to the weight vector
        b_grad: gradient with respect to the bias
    """
    residual = Y_label - _f(X, w, b)
    # Scale every sample (a column of X.T) by its residual, then sum over
    # the samples to get one value per feature.
    weighted = residual * X.T
    w_grad = -weighted.sum(axis=1)
    b_grad = -residual.sum()
    return w_grad, b_grad
# Initialize the weights w and the bias b to zero.
w = np.zeros(data_dim)
b = np.zeros(1)

# Training hyperparameters.
max_iter, batch_size, learning_rate = 20, 8, 0.05

# Loss and accuracy recorded after every epoch, kept for plotting.
train_loss, dev_loss = [], []
train_acc, dev_acc = [], []

# Running count of parameter updates (starts at 1).
step = 1
# Iterative training with mini-batch gradient descent.
batches_per_epoch = train_size // batch_size  # a trailing partial batch is skipped
for epoch in range(max_iter):
    # Randomly shuffle the training data at the start of every epoch.
    X_train, Y_train = _shuffle(X_train, Y_train)

    # Mini-batch updates.
    for idx in range(batches_per_epoch):
        rows = slice(idx * batch_size, (idx + 1) * batch_size)
        X_batch, Y_batch = X_train[rows], Y_train[rows]

        w_grad, b_grad = _gradient(X_batch, Y_batch, w, b)

        # The learning rate decays over time as 1 / sqrt(step).
        decayed_rate = learning_rate / np.sqrt(step)
        w = w - decayed_rate * w_grad
        b = b - decayed_rate * b_grad

        step += 1

    # After each epoch, record accuracy and mean loss on the training set
    # and then on the development set.
    for X_eval, Y_eval, n_eval, acc_history, loss_history in (
        (X_train, Y_train, train_size, train_acc, train_loss),
        (X_dev, Y_dev, dev_size, dev_acc, dev_loss),
    ):
        probabilities = _f(X_eval, w, b)
        hard_labels = _predict(X_eval, w, b)
        acc_history.append(_accuracy(hard_labels, Y_eval))
        loss_history.append(_cross_entropy_loss(probabilities, Y_eval) / n_eval)
# Print the metrics of the final epoch, followed by the learned weights.
final_report = (
    ('Training loss: {}', train_loss),
    ('Development loss: {}', dev_loss),
    ('Training accuracy: {}', train_acc),
    ('Development accuracy: {}', dev_acc),
)
for template, history in final_report:
    print(template.format(history[-1]))
print('weight_hw2.npy', w)

# Draw the loss curve and then the accuracy curve, one figure each,
# with the training and development histories overlaid.
curves = (
    ("Loss", train_loss, dev_loss),
    ("Accuracy", train_acc, dev_acc),
)
for title, train_curve, dev_curve in curves:
    plt.plot(train_curve)
    plt.plot(dev_curve)
    plt.title(title)
    plt.legend(['train', 'dev'])
    plt.show()