@Pigmon
2017-09-17T20:24:48.000000Z
字数 10310
阅读 941
Python
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 09 17:25:53 2017
@author: Yuan Sheng
"""
#import random
import numpy as np
import random as rnd
def sigmoid(x):
return 1. / (1. + np.e ** (-x))
class Neuron:
"神经元基类"
# layer: 所在层,第一个输入层为0
# index: 在当前层的位置,最上面为0
# gid: 全局位置,从 0 开始
# 整个网络输入在左,输出在右
# 从输入神经元算起,按左上到右下依次排列
# 2
# ->0 5 7->
# 3
# ->1 6 8->
# 4
layer, index, gid = 0, 0, 0
# output: 输出值
# error: 误差项
output, error = 0.0, 0.0
def __init__ (self, _layer, _index, _global_index):
self.layer, self.index, self.gid = _layer, _index, _global_index
self.bias = rnd.uniform(-1.0, 1.0)
def sigmoid_derive(self):
return self.output * (1.0 - self.output)
def count_out(self, _in_w, _in_vec):
"计算输出值"
if len(_in_w) != len(_in_vec):
return 0.0
self.output = sigmoid(self.bias + np.dot(_in_w, _in_vec))
return self.output
def count_error(self, _out_w, _next_layer_error):
"计算误差项"
if len(_out_w) != len(_next_layer_error):
return 0.0
self.error = self.sigmoid_derive() * np.dot(_out_w, _next_layer_error)
return self.error
def update_bais(self, _eta):
"调整 bias 值"
self.bias -= _eta * self.error
def debug(self):
print ('%d: <%d, %d> b=%f' % (self.gid, self.layer, self.index, self.bias))
class OutputNeuron(Neuron):
"输出层神经元"
def __init__ (self, _layer, _index, _global_index):
Neuron.__init__(self, _layer, _index, _global_index)
def count_error(self, _label):
self.error = self.sigmoid_derive() * (self.output - _label)
return self.error
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 09 20:50:49 2017
@author: Yuan Sheng
为了降低阅读代码的难度,并没有采用矩阵乘法,而是直接套用ppt中的公式
"""
import numpy as np
import random as rnd
from neuron import Neuron, OutputNeuron
class BPNN:
"BP前馈神经网络"
# @input_vec_len: 输入层维数
# @output_vec_len: 输出层维数
# @hidden_layer_count: 隐藏层数
# @eta: 学习率,步长
# @threshold: 全局均方差的阈值
input_vec_len, output_vec_len, hidden_layer_count, eta, threshold = 2, 1, 0, 0.2, 0.1
# @hidden_layers_len: 每个隐藏层维数
# @hidden_neurons:隐藏神经元二维数组
# @output_neurons:输出神经元一维数组
# @weights:权重三维数组
hidden_layers_len, hidden_neurons, output_neurons, weights = [], [], [], []
# 全局位置到(layer, index)的映射字典; 当前的输出值
gid_dict = {}
# -----------------输出变量-----------------
_correct_rate, _output_vec = 0.0, []
# -------------------------------------------------------------------------
def __init__ (self, _in_len, _out_len, _hiddens_len, _learning_rate):
self.input_vec_len, self.output_vec_len, self.hidden_layer_count, self.hidden_layers_len, self.eta = \
_in_len, _out_len, len(_hiddens_len), _hiddens_len, _learning_rate
# 生成隐藏层 Nerons
cnter = _in_len
for i in range(0, len(_hiddens_len)):
cnt, layer_list = _hiddens_len[i], []
for j in range(0, cnt):
layer_list.append(Neuron(i, j, cnter))
self.gid_dict[cnter] = (i, j)
cnter += 1
self.hidden_neurons.append(layer_list)
# 生成输出层 Neurons
for i in range(0, _out_len):
self.output_neurons.append(OutputNeuron(self.hidden_layer_count, i, cnter))
self.gid_dict[cnter] = (self.hidden_layer_count, i)
cnter += 1
# 生成权重,例如:
# 形如下面这样的输入层2,隐藏层为(3, 2),输出层为2:
#
# 2
# ->0 5 7->
# 3
# ->1 6 8->
# 4
#
# 权重数组如下所示:
# [
# [ [w_0_2, w_1_2], [w_0_3, w_1_3], [w_0_4, w_1_4] ],
# [ [w_2_5, w_3_5, w_4_5], [w_2_6, w_3_6, w_4_6] ],
# [ [w_5_7, w_6_7], [w_5_8, w_6_8] ]
# ]
#
# 可以在循环最内部,用pair.append(self.debug_weight_index(cnt_list, i, j, k))调试
#
cnt_list = np.append(np.array([_in_len]), np.append(_hiddens_len, _out_len))
#print cnt_list
for i in range(0, len(cnt_list)-1):
layer1_cnt, layer2_cnt, layer_weight = cnt_list[i], cnt_list[i + 1], []
for j in range(0, layer2_cnt):
pair = []
for k in range(0, layer1_cnt):
pair.append(rnd.uniform(-1.0, 1.0))
#pair.append(self.debug_weight_index(cnt_list, i, j, k))
layer_weight.append(pair)
self.weights.append(layer_weight)
# -------------------------------------------------------------------------
def input_weights_of(self, layer, index):
"""
得到在第layer层的第index个神经元的输入权重(左侧)列表。
layer 0 代表第一个隐藏层,index 0代表最上面的神经元
"""
return self.weights[layer][index]
# -------------------------------------------------------------------------
def input_weights_of_gid(self, _gid):
"通过gid得到输入权重列表"
return self.input_weights_of(self.gid_dict[_gid][0], self.gid_dict[_gid][1])
# -------------------------------------------------------------------------
def output_weights_of(self, layer, index):
"""
得到在第layer层的第index个神经元的输出权重(右侧)列表。
layer 0 代表第一个隐藏层,index 0代表最上面的神经元
"""
# 如果是输出层的神经元,返回空数组
if layer == self.hidden_layer_count:
return []
next_layer_weights, ret = self.weights[layer + 1], []
for group in next_layer_weights:
ret.append(group[index])
return ret
# -------------------------------------------------------------------------
def output_weights_of_gid(self, _gid):
"通过gid得到输出权重列表"
return self.output_weights_of(self.gid_dict[_gid][0], self.gid_dict[_gid][1])
# -------------------------------------------------------------------------
def pre_layer_outs(self, layer, input_vec):
"""
得到在第layer层的第index个神经元的<前一层>神经元的值的列表。
layer 0 代表第一个隐藏层,index 0代表最上面的神经元
"""
if layer == 0:
return input_vec
else:
pre_layer_neurons, ret = self.hidden_neurons[layer - 1], []
for neuron in pre_layer_neurons:
ret.append(neuron.output)
return ret
# -------------------------------------------------------------------------
def next_layer_errors(self, layer):
"得到第layer+1层的神经元的误差项列表"
# 如果是输出层的神经元,返回空数组
if layer >= self.hidden_layer_count:
return []
next_layer_neurons, ret = [], []
if layer == self.hidden_layer_count - 1:
next_layer_neurons = self.output_neurons
else:
next_layer_neurons = self.hidden_neurons[layer + 1]
for neuron in next_layer_neurons:
ret.append(neuron.error)
return ret
# -------------------------------------------------------------------------
def forward(self, input_vec):
"前向传播计算值的过程"
assert len(input_vec) == self.input_vec_len
self._output_vec = []
for i in range(0, len(self.hidden_neurons)):
hidden_layer = self.hidden_neurons[i]
for j in range(0, len(hidden_layer)):
neuron, input_values, input_weights = \
hidden_layer[j], self.pre_layer_outs(i, input_vec), self.input_weights_of(i, j)
neuron.count_out(input_weights, input_values)
for j in range(0, len(self.output_neurons)):
neuron, input_values, input_weights = self.output_neurons[j], \
self.pre_layer_outs(self.hidden_layer_count, input_vec), self.input_weights_of(self.hidden_layer_count, j)
neuron.count_out(input_weights, input_values)
self._output_vec.append(neuron.output)
return self._output_vec;
# -------------------------------------------------------------------------
def backward(self, _result_vec, _label_vec):
"后向传播误差项"
assert len(_result_vec) == len(_label_vec)
# 计算输出层误差项
for j in range(0, len(self.output_neurons)):
self.output_neurons[j].count_error(_label_vec[j])
# 从后向前计算隐藏层误差项
arr, length = self.hidden_neurons[::-1], len(self.hidden_neurons)
for i in range(0, length):
hidden_layer, layer_idx = arr[i], length - i - 1
for j in range(0, len(hidden_layer)):
neuron, output_weights, next_layer_errors = \
hidden_layer[j], self.output_weights_of(layer_idx, j), self.next_layer_errors(layer_idx)
neuron.count_error(output_weights, next_layer_errors)
# -------------------------------------------------------------------------
def update_param(self, _inpput_vec):
"调整误差和 bias 的值,可以放到 backward 函数里,单独写出来是为了看起来清晰"
# 调整输出层的输入权重和 bias
for j in range(0, len(self.output_neurons)):
neuron, input_values, input_weights = self.output_neurons[j], \
self.pre_layer_outs(self.hidden_layer_count, []), self.input_weights_of(self.hidden_layer_count, j)
neuron.update_bais(self.eta)
for i in range(0, len(input_weights)):
input_weights[i] -= self.eta * neuron.error * input_values[i]
# 更新 self.weights
self.weights[self.hidden_layer_count][j] = input_weights
# 调整隐藏层的输入权重和 bias
for i in range(0, len(self.hidden_neurons)):
hidden_layer = self.hidden_neurons[i]
for j in range(0, len(hidden_layer)):
neuron, input_values, input_weights =\
hidden_layer[j], self.pre_layer_outs(i, _inpput_vec), self.input_weights_of(i, j)
neuron.update_bais(self.eta)
for k in range(0, len(input_weights)):
input_weights[k] -= self.eta * neuron.error * input_values[k]
# 更新 self.weights
self.weights[i][j] = input_weights
# -------------------------------------------------------------------------
@staticmethod
def correct_rate(_vec1, _vec2):
"正确率"
assert len(_vec1) == len(_vec2)
counter = 0
for i in range(0, len(_vec1)):
if abs(_vec1[i] - _vec2[i]) < 0.5:
counter += 1
return float(counter) / float(len(_vec1))
# -------------------------------------------------------------------------
def total_error_var(self, _train_set):
total = 0
for smpl in _train_set:
total += (self.predict_labeled([smpl])[0] - smpl[1]) ** 2
return total / len(_train_set)
# -------------------------------------------------------------------------
def predict_labeled(self, _test_set):
"_test_set 每个元素,传入之前需要格式化成 [[x1, x2, ...], label]"
assert len(_test_set) > 0 and len(_test_set[0]) == 2
ret_vec, label_vec = [], []
for sample in _test_set:
ret_vec.append(self.forward(sample[0])[0])
label_vec.append(sample[1])
self._correct_rate = BPNN.correct_rate(ret_vec, label_vec)
return ret_vec
# -------------------------------------------------------------------------
@staticmethod
def sigsign(x):
if x >= 0.5:
return 1
else:
return 0
# -------------------------------------------------------------------------
def predict(self, _attr_set):
"只包含特征的测试集,[[x1, x2, ...], [x1, x2,...], ...]"
ret_vec = []
for point in _attr_set:
#ret_vec.append(self.forward(point))
ret_vec.append(BPNN.sigsign(self.forward(point)[0]))
return ret_vec
# -------------------------------------------------------------------------
def fit(self, _train_set, max_epochs = 10000):
"_train_set 每个元素,传入之前需要格式化成 [[x1, x2, ...], label]"
assert len(_train_set) > 0 and len(_train_set[0]) == 2
counter, go_through = 0, False
while (counter <= max_epochs):
if self.total_error_var(_train_set) <= self.threshold:
go_through = True
break
# 1. 随机选取一个测试样本
sample = rnd.choice(_train_set)
input_vec, label = sample[0], sample[1]
# 2. 前向计算值
y_hat = self.forward(input_vec)
# 3. 反向传播误差
self.backward([y_hat], [label])
# 4. 更新参数
self.update_param(input_vec)
counter += 1
return go_through
# -------------------------------------------------------------------------
# Debug Functions
# -------------------------------------------------------------------------
def debug(self):
"输出 debug 信息"
print ('Hidden Layers Length: ' + str(nn.hidden_layers_len))
print ('Hidden Neurons: ')
for layer_neurons in self.hidden_neurons:
for neuron in layer_neurons:
print (neuron.debug())
for neuron in self.output_neurons:
print (neuron.debug())
print (self.weights)
# -------------------------------------------------------------------------
def debug_weight_index(self, cnt_list, layer, next_layer_index, current_layer_index):
"""
测试用,生成权重下标。如连接3和8号神经元的权重返回 'w_3_8'
layer 是从输入层开始为0, 依次+1
调用方式:
在构造函数生成weights的循环最内部 pair.append(self.debug_weight_index(cnt_list, i, j, k))
"""
# 当前层起始位置
current_layer_start_index = 0
for i in range(0, layer):
current_layer_start_index += cnt_list[i]
# 下一层的起始位置
next_layer_start_index = current_layer_start_index + cnt_list[layer]
idx1, idx2 = current_layer_start_index + current_layer_index, \
next_layer_start_index + next_layer_index
#return 'w_%d_%d' % (idx1, idx2)
return float(idx1) + idx2 / 10.0 # temp test
if __name__ == '__main__':
X = [ [[0, 0], 0], [[0, 1], 1], [[1, 0], 1], [[1, 1], 0] ]
T = [[0.1, 0.085], [-0.12, 0.88], [1.13, 0.1], [0.8, 1.2]] # 期望输出:[0, 1, 1, 0]
nn = BPNN(2, 1, [2], 0.2)
# -----test-----
print (nn.fit(X))
print (nn._correct_rate)
print (nn.predict(T))
#nn.debug()