本文共 17170 字,大约阅读时间需要 57 分钟。
以下是定义了一个损失函数以后,参数theta对应的损失函数J的值对应的示例图,我们需要找到一个使得损失函数值J取得最小值的对应的theta(选用二维平面,即参数只有一个)
1.η
a.η的介绍
b.η取值太小,影响收敛学习速度
c.η太大,甚至导致不收敛
2.注意事项
并不是所有的函数都有唯一的极值点
解决方案:
1.绘制一个简单的损失函数
import numpy as npimport matplotlib.pyplot as plt%matplotlib inline# 简单模拟一个损失函数plot_x = np.linspace(-1,6,144)plot_y = (plot_x-2.5)**2-1plt.plot(plot_x,plot_y)
[]
2.定义梯度下降函数
def dJ(theta): """损失函数的导数""" return 2*(theta-2.5)def J(theta): """损失函数""" try: return (theta-2.5)**2-1 except: return float("inf")def gradient_descent(initial_theta,eta,n_iters=1e4,epsilon=1e-8): """梯度下降法封装""" """ initial_theta:初始化的theta值 eta:学习率 n_iters:最大循环次数 epsilon:精度 """ theta = initial_theta theta_history.append(initial_theta) # 保存theta的变化值 i_iters = 0 while i_iters < n_iters: """ 如果theta两次变化之间的损失函数值的变化小于我们定义的精度 则可以说明我们已经找到了最低的损失函数值和对应的theta 如果循环次数超过了我们设置的循环次数, 则说明可能由于η设置的过大导致无止境的循环 """ gradient = dJ(theta) # 导数 last_theta = theta # 保存上一步的theta theta = theta-eta*gradient # 新的theta theta_history.append(theta) # 判断是否小于精度 if (abs(J(theta)-J(last_theta))
3.使用不同的η学习率进行测试
a.η=0.1
eta = 0.1theta_history = []gradient_descent(0.,eta)plot_theta_history()
the size of theta_history is 46
b.η=0.01
eta = 0.01theta_history = []gradient_descent(0.,eta)plot_theta_history()
the size of theta_history is 424
c.η=0.001
eta = 0.001theta_history = []gradient_descent(0.,eta)plot_theta_history()
the size of theta_history is 3682
d.η=0.8
eta = 0.8theta_history = []gradient_descent(0.,eta)plot_theta_history()
the size of theta_history is 22
e.η=1.1
eta = 1.1theta_history = []gradient_descent(0.,eta,n_iters=10)plot_theta_history()
the size of theta_history is 11
1.模拟数据的准备
import numpy as npimport matplotlib.pyplot as pltnp.random.seed(666)x = 2*np.random.random(size=100)y = x*3.+4+np.random.normal(size=100)X = x.reshape(-1,1)plt.scatter(X,y)
2.梯度下降法的实现
a.简单的直接实现
class LinearRegression: def __init__(self): """初始化Simple Linear Regression模型""" self.a_ = None self.b_ = None def fit(self, x_train, y_train): """根据训练数据集x_train,y_train训练Simple Linear Regression模型""" assert x_train.ndim == 1, \ "Simple Linear Regressor can only solve single feature training data." assert len(x_train) == len(y_train), \ "the size of x_train must be equal to the size of y_train" # 均值 x_mean = np.mean(x_train) y_mean = np.mean(y_train) # 使用向量化点乘计算参数a和b self.a_ = (x_train - x_mean).dot(y_train - y_mean) / (x_train - x_mean).dot(x_train - x_mean) self.b_ = y_mean - self.a_ * x_mean return self def predict(self, x_predict): """给定待预测数据集x_predict,返回表示x_predict的结果向量""" assert x_predict.ndim == 1, \ "Simple Linear Regressor can only solve single feature training data." assert self.a_ is not None and self.b_ is not None, \ "must fit before predict!" return np.array([self._predict(x) for x in x_predict]) def _predict(self, x_single): """给定单个待预测数据x_single,返回x_single的预测结果值""" return self.a_ * x_single + self.b_ def __repr__(self): return "LinearRegression()" def fit_gd(self, X_train, y_train, eta=0.01, n_iters = 1e4): """根据训练数据集X_train,y_train, 使用梯度下降法训练Linear Regression 模型""" assert X_train.shape[0] == y_train.shape[0], \ "the size of X_train must be equal to the size of y_train" def J(theta, X_b, y): try: return np.sum((y - X_b.dot(theta))**2) / len(X_b) except: return float('inf') def dJ(theta, X_b, y): res = np.empty(len(theta)) res[0] = np.sum(X_b.dot(theta) - y) for i in range(1, len(theta)): res[i] = np.sum((X_b.dot(theta) - y).dot(X_b[:, i])) return res * 2 / len(X_b) def gradient_descent(X_b, y, initial_theta, eta, n_iters=n_iters, epsilon=1e-8): """ 梯度下降法封装 X_b: X特征矩阵 y: 结果向量 initial_theta:初始化的theta值 eta:学习率η n_iters: 最大循环次数 epsilon: 精度 """ theta = initial_theta i_iters = 0 while i_iters < n_iters: """ 如果theta两次变化之间的损失函数值的变化小于我们定义的精度 则可以说明我们已经找到了最低的损失函数值和对应的theta 如果循环次数超过了我们设置的循环次数, 则说明可能由于η设置的过大导致无止境的循环 """ gradient = dJ(theta, X_b, y) last_theta = theta theta = theta - eta * gradient if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon: break i_iters += 1 return theta X_b = np.hstack([np.ones((len(X_train), 1)), X_train]) initial_theta = np.zeros(X_b.shape[1]) self._theta = gradient_descent(X_b, y_train, initial_theta, eta) self.interception_ = self._theta[0] self.coef_ = self._theta[1:] return self
lin_reg = LinearRegression()lin_reg.fit_gd(X,y)# 查看系数print(lin_reg.coef_)# 查看截距print(lin_reg.interception_)
[3.00706277]4.021457858204859
b.向量化的处理
import numpy as npfrom math import sqrtdef accuracy_score(y_true, y_predict): """计算y_true和y_predict之间的准确率""" assert len(y_true) == len(y_predict), \ "the size of y_true must be equal to the size of y_predict" return np.sum(y_true == y_predict) / len(y_true)def mean_squared_error(y_true, y_predict): """计算y_true和y_predict之间的MSE""" assert len(y_true) == len(y_predict), \ "the size of y_true must be equal to the size of y_predict" return np.sum((y_true - y_predict)**2) / len(y_true)def root_mean_squared_error(y_true, y_predict): """计算y_true和y_predict之间的RMSE""" return sqrt(mean_squared_error(y_true, y_predict))def mean_absolute_error(y_true, y_predict): """计算y_true和y_predict之间的MAE""" assert len(y_true) == len(y_predict), \ "the size of y_true must be equal to the size of y_predict" return np.sum(np.absolute(y_true - y_predict)) / len(y_true)def r2_score(y_true, y_predict): """计算y_true和y_predict之间的R Square""" return 1 - mean_squared_error(y_true, y_predict)/np.var(y_true)
class LinearRegression: def __init__(self): """初始化Linear Regression模型""" self.coef_ = None self.intercept_ = None self._theta = None def fit_normal(self, X_train, y_train): """根据训练数据集X_train, y_train训练Linear Regression模型""" assert X_train.shape[0] == y_train.shape[0], \ "the size of X_train must be equal to the size of y_train" X_b = np.hstack([np.ones((len(X_train), 1)), X_train]) self._theta = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train) self.intercept_ = self._theta[0] self.coef_ = self._theta[1:] return self def fit_gd(self, X_train, y_train, eta=0.01, n_iters=1e4): """根据训练数据集X_train, y_train, 使用梯度下降法训练Linear Regression模型""" assert X_train.shape[0] == y_train.shape[0], \ "the size of X_train must be equal to the size of y_train" def J(theta, X_b, y): try: return np.sum((y - X_b.dot(theta)) ** 2) / len(y) except: return float('inf') def dJ(theta, X_b, y): return X_b.T.dot(X_b.dot(theta) - y) * 2. / len(y) def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8): theta = initial_theta cur_iter = 0 while cur_iter < n_iters: gradient = dJ(theta, X_b, y) last_theta = theta theta = theta - eta * gradient if (abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon): break cur_iter += 1 return theta X_b = np.hstack([np.ones((len(X_train), 1)), X_train]) initial_theta = np.zeros(X_b.shape[1]) self._theta = gradient_descent(X_b, y_train, initial_theta, eta, n_iters) self.intercept_ = self._theta[0] self.coef_ = self._theta[1:] return self def predict(self, X_predict): """给定待预测数据集X_predict,返回表示X_predict的结果向量""" assert self.intercept_ is not None and self.coef_ is not None, \ "must fit before predict!" assert X_predict.shape[1] == len(self.coef_), \ "the feature number of X_predict must be equal to X_train" X_b = np.hstack([np.ones((len(X_predict), 1)), X_predict]) return X_b.dot(self._theta) def accuracy_score(y_true, y_predict): """计算y_true和y_predict之间的准确率""" assert len(y_true) == len(y_predict), \ "the size of y_true must be equal to the size of y_predict" return np.sum(y_true == y_predict) / len(y_true) def score(self, X_test, y_test): """根据测试数据集 X_test 和 y_test 确定当前模型的准确度""" y_predict = self.predict(X_test) return r2_score(y_test, y_predict) def __repr__(self): return "LinearRegression()"
lin_reg = LinearRegression()lin_reg.fit_gd(X,y)print(lin_reg.coef_)print(lin_reg.intercept_)
[3.00706277]4.021457858204859
3.波士顿房价预测
a.准备数据集
from sklearn import datasetsfrom sklearn.model_selection import train_test_splitboston = datasets.load_boston()x = boston.datay = boston.targetx = x[y<50.0]y = y[y<50.0]x_train,x_test,y_train,y_test = train_test_split(x,y,test_size=0.2,random_state=666)
b.数据归一化
由于数据的规模在不同的特征上不同,所以需要对数据进行归一化
from sklearn.preprocessing import StandardScalerstandardScaler = StandardScaler()standardScaler.fit(x_train)x_train_standard = standardScaler.transform(x_train)x_test_standard = standardScaler.transform(x_test)
c.调用梯度下降法
lin_reg = LinearRegression()lin_reg.fit_gd(x_train_standard,y_train)lin_reg.score(x_test_standard,y_test)
0.8129873310487505
然而
,如果样本数非常多的情况下,那么即使使用梯度下降法也会导致速度非常慢,因为在梯度下降法中,每一个样本都需要运算,这时候就需要随机梯度下降法。
1.随机梯度下降法介绍
a.批量梯度下降法
批量梯度下降法带来的一个问题是
η的值需要设置的比较小,在样本数比较多的时候导致不是速度特别慢,这时候观察随机梯度下降法损失函数的求导公式,可以发现,我们对每一个Xb都做了求和操作,又在最外面除以了m,那么可以考虑将求和和除以m的两个运算约掉,采用每次使用一个随机的Xb
b.随机梯度下降法
由于我们使用的是随机梯度下降法
,所以导致我们的最终结果不会像批量梯度下降法一样准确的朝着一个方向运算,而是曲线行下降,这时候我们就希望,越到下面,η值相应减小,事运算次数变多,从而精确计算结果
2.模拟数据进行测试
a.数据准备:
import numpy as npimport matplotlib.pyplot as pltm = 100000x = np.random.normal(size=m)y = 4.*x+3.+np.random.normal(0,3,size=m)X = x.reshape(-1,1)
b.批量随机梯度法:
def J(theta, X_b, y): try: return np.sum((y - X_b.dot(theta)) ** 2) / len(y) except: return float('inf') def dJ(theta, X_b, y): return X_b.T.dot(X_b.dot(theta) - y) * 2. / len(y)def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8): theta = initial_theta cur_iter = 0 while cur_iter < n_iters: gradient = dJ(theta, X_b, y) last_theta = theta theta = theta - eta * gradient if (abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon): break cur_iter += 1 return theta
%%timeX_b = np.hstack([np.ones((len(X),1)),X])initial_theta = np.zeros(X_b.shape[1])eta = 0.01theta = gradient_descent(X_b,y,initial_theta,eta)
Wall time: 1.27 s
print(theta)
[3.00590902 4.00776602]
c.随机梯度下降法:
def dJ_sgd(theta,X_b_i,y_i): return X_b_i.T.dot(X_b_i.dot(theta) - y_i) * 2def sgd(X_b,y,initial_theta,n_iters): t0 = 5 t1 = 50 def learning_rate(t): return t0 / (t + t1) theta = initial_theta for cur_iter in range(n_iters): rand_i = np.random.randint(len(X_b)) gradient = dJ_sgd(theta,X_b[rand_i],y[rand_i]) theta = theta - learning_rate(cur_iter) * gradient return theta
%%timeX_b = np.hstack([np.ones((len(X), 1)), X])initial_theta = np.zeros(X_b.shape[1])theta = sgd(X_b, y, initial_theta, n_iters=m//3)
Wall time: 325 ms
3.对随机梯度函数进行封装
class LinearRegression(): def fit_sgd(self, X_train, y_train, n_iters=5, t0=5, t1=50): """ 根据训练数据集X_train, y_train, 使用随机梯度下降法训练Linear Regression模型 :param X_train: :param y_train: :param n_iters: 在随机梯度下降法中,n_iters代表所有的样本会被循环几次 :param t0: :param t1: :return: """ assert X_train.shape[0] == y_train.shape[0], \ "the size of X_train must be equal to the size of y_train" assert n_iters >= 1 def dJ_sgd(theta, X_b_i, y_i): """ X_b,y 中的随机一个元素进行导数公式的计算 :param theta: :param X_b_i: :param y_i: :return: """ return X_b_i * (X_b_i.dot(theta) - y_i) * 2. def sgd(X_b, y, initial_theta, n_iters, t0=5, t1=50): def learning_rate(t): """ 计算学习率,t1 为了减慢变化速度,t0为了增加随机性 :param t: 第t次循环 :return: """ return t0 / (t + t1) theta = initial_theta m = len(X_b) for cur_iter in range(n_iters): # 对X_b进行一个乱序的排序 indexes = np.random.permutation(m) X_b_new = X_b[indexes] y_new = y[indexes] # 对整个数据集看一遍 for i in range(m): gradient = dJ_sgd(theta, X_b_new[i], y_new[i]) theta = theta - learning_rate(cur_iter * m + i) * gradient return theta X_b = np.hstack([np.ones((len(X_train), 1)), X_train]) initial_theta = np.random.randn(X_b.shape[1]) self._theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1) self.interception_ = self._theta[0] self.coef_ = self._theta[1:] return self
1.数据集获取
from sklearn import datasetsboston = datasets.load_boston()x = boston.datay = boston.targetx = x[y<50.0]y = y[y<50.0]
2.数据集的划分
from sklearn.model_selection import train_test_splitx_train,x_test,y_train,y_test = train_test_split(x,y,random_state=666)
3.数据集归一化处理
from sklearn.preprocessing import StandardScalerstandardScaler = StandardScaler()standardScaler.fit(x_train)x_train_standard = standardScaler.transform(x_train)x_test_standard = standardScaler.transform(x_test)
4.调用SGD方法
from sklearn.linear_model import SGDRegressorsgd_reg = SGDRegressor(n_iter=100)
%%timesgd_reg.fit(x_train_standard,y_train)
Wall time: 5.98 msD:\software\Anaconda\workplace\lib\site-packages\sklearn\linear_model\stochastic_gradient.py:152: DeprecationWarning: n_iter parameter is deprecated in 0.19 and will be removed in 0.21. Use max_iter and tol instead. DeprecationWarning)SGDRegressor(alpha=0.0001, average=False, early_stopping=False, epsilon=0.1, eta0=0.01, fit_intercept=True, l1_ratio=0.15, learning_rate='invscaling', loss='squared_loss', max_iter=None, n_iter=100, n_iter_no_change=5, penalty='l2', power_t=0.25, random_state=None, shuffle=True, tol=None, validation_fraction=0.1, verbose=0, warm_start=False)
sgd_reg.score(x_test_standard,y_test)
0.7997484818787682
# 打印theta参数print(sgd_reg.coef_)# 打印截距print(sgd_reg.intercept_)
[-0.972382 0.6864189 -0.34498804 -0.0125929 -1.26078593 2.27274023 -0.40157318 -2.30498603 1.96081398 -1.83022108 -1.83893614 0.74687792 -2.81437963][21.53228681]
小批量梯度下降法:
我们每一次不看全部样本那么多,也不是只看一次样本那么少,每次只看k个样本。
def fit_lit_sgd(self, X_train, y_train, n_iters=5, t0=5, t1=50,k=10): """ 根据训练数据集X_train, y_train, 使用随机梯度下降法训练Linear Regression模型 :param X_train: :param y_train: :param n_iters: 在随机梯度下降法中,n_iters代表所有的样本会被看几圈 :param t0: :param t1: :param k: 小批量随机下降法的超参数k :return: """ assert X_train.shape[0] == y_train.shape[0], \ "the size of X_train must be equal to the size of y_train" assert n_iters >= 1 def dJ_sgd(theta, X_b_k, y_k): """ 去X_b,y 中的随机选择k个元素进行导数公式的计算 :param theta: :param X_b_i: :param y_i: :return: """ return np.sum((X_b_k * (X_b_k.dot(theta) - y_k) ))* 2/len(X_b_k) def sgd(X_b, y, initial_theta, n_iters, t0=5, t1=50): def learning_rate(t): """ 计算学习率,t1 为了减慢变化速度,t0为了增加随机性 :param t: 第t次循环 :return: """ return t0 / (t + t1) theta = initial_theta m = len(X_b) for cur_iter in range(n_iters): # 每次看k个元素 i =0 while i < m: X_b_new = X_b[i:i+k] y_new = y[i:i+k] gradient = dJ_sgd(theta, X_b_new, y_new) theta = theta - learning_rate(cur_iter * m + i+k) * gradient i = i+k return theta X_b = np.hstack([np.ones((len(X_train), 1)), X_train]) initial_theta = np.random.randn(X_b.shape[1]) self._theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1) self.interception_ = self._theta[0] self.coef_ = self._theta[1:] return self
转载地址:http://gdzdf.baihongyu.com/