# -*- coding: utf-8 -*-
"""
Created on Fri May 4 10:54:49 2018
@author: Wang
platform: win_10_x64
editor: Notepad++
"""
# Modules required for working with pydub
# pydub installation tutorial: https://blog.csdn.net/qq_25883823/article/details/52749279
# pydub API on GitHub: https://github.com/jiaaro/pydub/blob/master/API.markdown
# Chinese-language guide: https://blog.csdn.net/tyfbhlxd/article/details/72046552
# Data source: https://bhichallenge.med.auth.gr/ (recording no. 110 is used here)
from pydub import AudioSegment
import os, re
# Input recording, its annotation text file, and the output directory for the
# sliced audio (absolute Windows paths).
wav_path = r"C:\Users\aixin\Desktop\lungsound\LungSoundFromICBHIchallenge\ICBHI_final_database\110_1p1_Al_sc_Meditron.wav"
txt_path = r"C:\Users\aixin\Desktop\lungsound\LungSoundFromICBHIchallenge\ICBHI_final_database\110_1p1_Al_sc_Meditron.txt"
save_path = r"C:\Users\aixin\Desktop\lungsound\LungSoundFromICBHIchallenge\database_segmentation"
# Load the whole recording with pydub.
wav = AudioSegment.from_wav(wav_path)
# NOTE(review): root_wav_path and root_txt_path are not defined anywhere in the
# visible source, so these two listings look like they raise NameError - confirm.
filename_wav = os.listdir(root_wav_path)
filename_txt = os.listdir(root_txt_path)
# 得到音频基本信息
from pydub import AudioSegment
# sound = AudioSegment.from_file("sound1.wav")
# loudness = sound.dBFS
# Loudness of the audio file in dBFS
# channel_count = sound.channels
# Number of channels of the audio file
# bytes_per_sample = sound.sample_width
# Sample width of the audio file
# frames_per_second = sound.frame_rate
# Sampling rate of the audio file
# loudness = sound.rms
# RMS loudness of the audio; usually used to compute decibels (dB = 20 * log10(X))
# assert sound.duration_seconds == (len(sound) / 1000.0)
# Duration of the audio; equivalent to len(), which is in milliseconds
# number_of_frames_in_sound = sound.frame_count()
# number_of_frames_in_200ms_of_sound = sound.frame_count(ms=200)
# Number of frames in the audio
# Split the recording into two concatenated tracks driven by the annotation
# file: rows whose label columns are both 0 go to the "00" track, every other
# row goes to the "11" track.
# Each annotation row is tab-separated: start (s), end (s), then the labels.
wav_name = os.path.splitext(os.path.basename(wav_path))[0]
parts_00 = []
parts_11 = []
with open(txt_path, 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            # Tolerate blank / trailing lines in the annotation file.
            continue
        t = [float(i) for i in line.split('\t')]
        ts = [round(i * 1000) for i in t]  # convert to milliseconds, pydub's time unit
        segment = wav[ts[0]:ts[1]]
        if ts[2:] == [0, 0]:
            parts_00.append(segment)  # rows labelled 0,0
        else:
            parts_11.append(segment)  # every other label combination

# sum() starts from the int 0, which AudioSegment accepts on the left-hand side,
# so this concatenates the slices exactly like the running `+=` did.
# A class with no rows is skipped instead of calling .export on the int 0.
if parts_00:
    sum(parts_00).export(save_path + "\\" + str(wav_name) + '_' + str('00') + '.wav', format="wav")
if parts_11:
    sum(parts_11).export(save_path + "\\" + str(wav_name) + '_' + str('11') + '.wav', format="wav")
# pydub `append` usage examples; `crossfade` is given in milliseconds.
# NOTE(review): sound1 and sound2 are not defined in the visible source - these
# lines look like pasted reference snippets rather than live code; confirm.
# default 100 ms crossfade
combined = sound1.append(sound2)
# 5000 ms crossfade
combined_with_5_sec_crossfade = sound1.append(sound2, crossfade=5000)
# no crossfade
no_crossfade1 = sound1.append(sound2, crossfade=0)
# Count the .wav recordings in the database directory.
root_path = r"C:\Users\aixin\Desktop\lungsound\LungSoundFromICBHIchallenge\ICBHI_final_database"
i = 0
for each in os.listdir(root_path):
    # Anchor the pattern at the end of the name so that only a real ".wav"
    # extension matches (an unanchored pattern also matches e.g. "x.wav.bak").
    filename_wav = re.findall(r"(.*?)\.wav$", each)
    if filename_wav:
        i += 1
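# Each input file should end up saved in two places:
# here "00" means no abnormality at all and "11" means everything else;
# I do not want to split into four classes - once two classes work, four will follow easily.
# save_path + each_filename + str("00") + '.wav'
# save_path + each_filename + str("11") + '.wav'
# Helper functions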
def endWith(*endstring):
    """Build a predicate that keeps strings ending with any given suffix.

    Args:
        *endstring: one or more suffix strings, e.g. ``'.wav'``.

    Returns:
        A function ``run(s)`` that returns ``s`` itself when ``s`` ends with
        one of the suffixes and ``None`` otherwise, so it can be passed
        directly to ``filter``.
    """
    def run(s):
        # str.endswith accepts a tuple of suffixes, so no per-suffix map is needed.
        if s.endswith(endstring):
            return s
        return None
    return run
def lss_seg(wav_path, wav_name):
    """Cut one recording into three 5-second clips and export them as WAV.

    The clips are the first 5 s (``_fi``), seconds 5-10 (``_mi``) and the last
    5 s (``_la``). They are written into the module-level ``seg_path``
    directory, named after ``wav_name`` with its last four characters (the
    extension) removed.

    Args:
        wav_path: path of the audio file to read.
        wav_name: file name of the recording, used to build the output names.
    """
    lung_sound = AudioSegment.from_file(wav_path)
    five_seconds = 5 * 1000  # slice positions are in milliseconds
    ten_seconds = 10 * 1000
    stem = wav_name[:-4]
    clips = (
        # The first clip must stop at 5 s; slicing to 10 s produced a 10-second
        # clip that overlapped the middle one.
        ('fi', lung_sound[:five_seconds]),
        ('mi', lung_sound[five_seconds:ten_seconds]),
        ('la', lung_sound[-five_seconds:]),
    )
    for tag, clip in clips:
        clip.export(seg_path + '\\' + stem + '_' + tag + '.wav', format="wav")
# fi.export(seg_path+'\\'+ wav_name[:-4] + '_'+ 'fi.wav', format="wav")
# mi.export(seg_path+'\\'+ wav_name[:-4] + '_'+ 'mi.wav', format="wav")
# la.export(seg_path+'\\'+ wav_name[:-4] + '_'+ 'la.wav', format="wav")
def get_wavedata(wav_name):
    """Read a WAV file from ``filepath`` and return peak-normalised samples.

    Args:
        wav_name: file name inside the module-level ``filepath`` directory.

    Returns:
        ``(waveData, framerate)`` where ``waveData`` is a float array of shape
        ``(nchannels, nframes)`` scaled by the largest absolute sample, and
        ``framerate`` is the sampling rate reported by the file.
    """
    # The context manager closes the file even if reading fails.
    with wave.open(filepath + '\\' + wav_name, 'rb') as f:
        params = f.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        strData = f.readframes(nframes)
    # NOTE(review): samples are always decoded as int16; sampwidth is read but
    # never checked - confirm all inputs are 16-bit.
    # np.frombuffer replaces the deprecated binary mode of np.fromstring; the
    # float conversion also avoids int16 overflow in abs(-32768).
    waveData = np.frombuffer(strData, dtype=np.int16).astype(np.float64)
    peak = np.max(np.abs(waveData)) if waveData.size else 0.0
    if peak > 0:
        # Skip the division for silent or empty input instead of producing NaNs.
        waveData = waveData / peak
    waveData = np.reshape(waveData, [nframes, nchannels]).T
    return waveData, framerate
def save_new_img(spec_path):
    """Save the spectrogram of the current ``waveData`` as a borderless image.

    Reads the module-level ``waveData`` (first channel) and ``framerate``.

    Args:
        spec_path: destination path of the image file.
    """
    fig = plt.figure()
    try:
        fig.set_size_inches(0.5, 0.5)
        plt.axis('off')  # no axis
        # Axes spanning the whole figure, without frame or ticks: removes the white edge.
        plt.axes([0., 0., 1., 1.], frameon=False, xticks=[], yticks=[])
        plt.specgram(waveData[0], Fs=framerate, scale_by_freq=True, sides='default')
        plt.savefig(spec_path, bbox_inches=None, pad_inches=0)
    finally:
        # Close the figure so repeated calls in a loop do not accumulate
        # open figures and their memory.
        plt.close(fig)
# NOTE(review): `files` and `path` are not defined in the visible source - confirm
# where this fragment gets them from.
for f in files:
    if not f.endswith('.mp3'):
        # Skip any non-MP3 files
        continue
    mp3_file = os.path.join(path, f)
import os, re
import wave
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from pydub import AudioSegment
## Read the spectrogram images and build the X / y NumPy arrays
image_base_path = r"C:\Users\aixin\Desktop\lungsound\LSS_seg_img"
filenames= os.listdir(image_base_path)
# Seed the stack with the first listed image, given a new leading axis.
# NOTE(review): the loop below also reads filenames[0], so the first image
# appears to be stacked twice - confirm.
img0 = mpimg.imread(image_base_path +'\\'+ filenames[0])
img0 = np.expand_dims(img0,axis=0)
label = []
i = 0
j = 0
k = 0
# NOTE(review): the loop body has lost its indentation in this copy, and nothing
# visible ever appends to `label` - confirm against the original script.
for jpg_name in filenames:
if jpg_name.startswith('0_'):
i = i+1
j = j+1
img = mpimg.imread(image_base_path +'\\'+ jpg_name)
img = np.expand_dims(img,axis=0)
# Grow the stack by one image along axis 0.
X = np.concatenate((img0,img),axis=0)
img0 = X
k = k+1
# Persist the image stack and the label array as .npy files.
X_path = r"C:\Users\aixin\Desktop\lungsound\LSS_x_y\X.npy"
np.save(X_path, X)
yarray = np.array(label)
y_path = r"C:\Users\aixin\Desktop\lungsound\LSS_x_y\y.npy"
np.save(y_path, yarray)
## Read the 5-second audio files, generate spectrograms, and save them as borderless jpg images
filepath = r"C:\Users\aixin\Desktop\lungsound\LSS_seg"
save_path = r"C:\Users\aixin\Desktop\lungsound\LSS_seg_img"
filenames= os.listdir(filepath)
i = 0
j = 0
# Keep only the .wav entries of the directory listing.
a = endWith('.wav')
f_file = filter(a,filenames)
# NOTE(review): the loop body has lost its indentation in this copy and no
# visible statement uses the spec_path* names it builds - confirm against the
# original script.
for wav_name in f_file:
waveData,framerate = get_wavedata(wav_name)
# Names containing '正常' (Chinese for "normal") get the '0_' prefix and counter i;
# the second group of paths below uses the '1_' prefix and counter j.
if re.findall(r'正常',wav_name):
if re.findall(r'_fi',wav_name):
spec_path0_fi = save_path+'\\'+ '0_' + str(i) + '_fi' + '.jpg'
elif re.findall(r'_mi',wav_name):
spec_path0_mi = save_path+'\\'+ '0_' + str(i) + '_mi' + '.jpg'
spec_path0_la = save_path+'\\'+ '0_' + str(i) + '_la' + '.jpg'
i = i+1
if re.findall(r'_fi',wav_name):
spec_path1_fi = save_path+'\\'+ '1_' + str(j) + '_fi' + '.jpg'
elif re.findall(r'_mi',wav_name):
spec_path1_mi = save_path+'\\'+ '1_' + str(j) + '_mi' + '.jpg'
spec_path1_la = save_path+'\\'+ '1_' + str(j) + '_la' + '.jpg'
j = j+1
print('len of filenames:', len(filenames))
## Re-split the data into 5-second clips; the originals are 15 seconds long,
## so there are now three times as many samples.
filepath = r"C:\Users\aixin\Desktop\lungsound\LSS_Nname_545_both"
seg_path = r"C:\Users\aixin\Desktop\lungsound\LSS_seg"
filenames = os.listdir(filepath)
a = endWith('.wav')
f_file = filter(a, filenames)
for wav_name in f_file:
    wav_path = filepath + '\\' + wav_name
    # The loop built the path but never used it; hand it to the segmenter,
    # which writes its clips into seg_path.
    lss_seg(wav_path, wav_name)
# 读取频谱图数据--得到 x,y, numpy数据
import os,re
import cv2
import matplotlib.image as mpimg
import numpy as np
# Read every image in the directory and save image / label arrays as .npy files.
image_base_path = r"C:\Users\aixin\Desktop\lungsound\minst_fake"
filenames= os.listdir(image_base_path)
n = filenames.__len__()
# Number of images
l = []
label = []
i = 0
j = 0
k = 0
# NOTE(review): the loop body has lost its indentation in this copy and nothing
# visible appends to `l` or `label`, so both saved arrays would be empty -
# confirm against the original script.
for jpg_name in filenames:
# img = cv2.imread(image_base_path +'\\'+ filenames[k])
# k is incremented once per iteration, so filenames[k] is the current entry.
img = mpimg.imread(image_base_path +'\\'+ filenames[k])
if re.findall(r'0_',jpg_name):
i = i+1
j = j+1
k = k+1
Xarray = np.array(l)
X_path = r"C:\Users\aixin\Desktop\lungsound\minst_fake\X.npy"
np.save(X_path, Xarray)
yarray = np.array(label)
y_path = r"C:\Users\aixin\Desktop\lungsound\minst_fake\y.npy"
np.save(y_path, yarray)
# Demo: give each image a new leading axis first, then merge along that axis.
a = np.random.randint(2, size=(2, 3))[np.newaxis, ...]
b = np.random.randint(2, size=(2, 3))[np.newaxis, ...]
c = np.concatenate([a, b])
# Read the spectrogram file names and build the y (label) NumPy array
filepath = r"C:\Users\aixin\Desktop\lungsound\LSS"
filenames = os.listdir(filepath)
# Keep only the .jpg entries of the directory listing.
a = endWith('.jpg')
f_file = filter(a,filenames)
i = 0
j = 0
label = []
# NOTE(review): the loop body has lost its indentation in this copy and nothing
# visible appends to `label`, so the saved array would be empty - confirm.
for jpg_name in f_file:
if re.findall(r'0_',jpg_name):
i = i+1
j = j+1
y_label = np.array(label)
label_path = r"C:\Users\aixin\Desktop\lungsound\LSS\label.npy"
np.save(label_path, y_label)
########################## 以下部分使用待定 ############################
########################## 以下部分使用待定 ############################
# 读取音频文件--保存图像数据,并且切割成正方形
import os, re
import cv2
import wave
import matplotlib.pyplot as plt
import numpy as np
# Build the spectrogram image path for every .wav file in the directory.
filepath = r"C:\Users\aixin\Desktop\lungsound\LSS"
filenames= os.listdir(filepath)
i = 0
j = 0
a = endWith('.wav')
f_file = filter(a,filenames)
# NOTE(review): the loop body has lost its indentation in this copy and no
# visible statement uses spec_path0 / spec_path1 - confirm against the original.
for wav_name in f_file:
waveData,framerate = get_wavedata(wav_name)
# Names containing '正常' (Chinese for "normal") use the '0_' prefix and counter i;
# the other path uses the '1_' prefix and counter j.
if re.findall(r'正常',wav_name):
spec_path0 = filepath+'\\'+ '0_' + str(i) + '.jpg'
i = i+1
spec_path1 = filepath+'\\'+ '1_' + str(j) + '.jpg'
j = j+1
# Crop every .jpg in the directory, passing the same path as source and destination.
filepath = r"C:\Users\aixin\Desktop\lungsound\LSS"
filenames = os.listdir(filepath)
a = endWith('.jpg')
f_file = filter(a,filenames)
# NOTE(review): save_cropped_img is not defined in the visible source, and the
# call below has lost its indentation in this copy - confirm.
for jpg_name in f_file:
save_cropped_img(filepath+'\\'+jpg_name, filepath+'\\'+jpg_name)
# 画语谱图
# 获取音频信息:Wave_read.getparams用法:
import wave
import matplotlib.pyplot as plt
import numpy as np
import os
wav_path = r"C:\Users\aixin\Desktop\lungsound\LSS\FT___鼾音__李庆_男_74_000_000_现在吸烟_26_20150418101410_000163_025.wav"
filepath = r"C:\Users\aixin\Desktop\lungsound\LSS"
filenames = os.listdir(filepath)  # names of all files in the directory
# f = wave.open(filepath+'\\' + filenames[1],'rb')
# The context manager closes the file even if reading fails.
with wave.open(wav_path, 'rb') as f:
    params = f.getparams()
    nchannels, sampwidth, framerate, nframes = params[:4]
    strData = f.readframes(nframes)  # raw audio bytes
# np.frombuffer replaces the deprecated binary mode of np.fromstring; the float
# conversion also avoids int16 overflow in abs(-32768).
# NOTE(review): samples are always decoded as int16; sampwidth is not checked.
waveData = np.frombuffer(strData, dtype=np.int16).astype(np.float64)
peak = np.max(np.abs(waveData)) if waveData.size else 0.0
if peak > 0:
    waveData = waveData / peak  # normalise the amplitude; skipped for silent input
waveData = np.reshape(waveData, [nframes, nchannels]).T
# plot the wave
spec_path = filepath + '\\' + '0000000000000000' + '.jpg'
plt.specgram(waveData[0], Fs=framerate, scale_by_freq=True, sides='default')
# plt.axis('tight')
plt.savefig(spec_path, dpi=100)
# plt.savefig(spec_path, dpi=80)
# plt.savefig(spec_path, dpi=50)
import audiosegment
# Print basic properties of a recording and plot its spectrogram using the
# `audiosegment` package.
# NOTE(review): `filename` is not defined in the visible source - confirm.
print("Reading in the wave file...")
seg = audiosegment.from_file(filename)
print("Channels:", seg.channels)
print("Bits per sample:", seg.sample_width * 8)
print("Sampling frequency:", seg.frame_rate)
print("Length:", seg.duration_seconds, "seconds")
freqs, times, amplitudes = seg.spectrogram(window_length_s=0.03, overlap=0.5)
# Convert to a log scale; the 1e-9 offset keeps log10 away from zero.
amplitudes = 10 * np.log10(amplitudes + 1e-9)
# Plot
plt.pcolormesh(times, freqs, amplitudes)
plt.xlabel("Time in Seconds")
plt.ylabel("Frequency in Hz")
# Some code was borrowed from
# https://github.com/petewarden/tensorflow_makefile/blob/master/tensorflow/models/image/mnist/convolutional.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
import tensorflow.contrib.slim as slim
import ls_data
import cnn_model
# Checkpoint file and TensorBoard log directory used by train().
MODEL_DIRECTORY = "model/model.ckpt"
LOGS_DIRECTORY = "logs/train"
# Params for Train
# NOTE(review): train() also reads TRAIN_BATCH_SIZE and TEST_BATCH_SIZE, which are
# not defined in the visible source - confirm.
training_epochs = 10# 10 for augmented training data, 20 for training data
display_step = 100
validation_step = 500
# Params for test
def train():
    """Train the CNN, checkpoint the best model, then report test accuracy.

    Data comes from ``ls_data.prepare_ls_data(True)``. After every
    ``validation_step`` batches the model is scored on the validation set and
    saved to ``MODEL_DIRECTORY`` whenever that accuracy improves. Summaries
    are written to ``LOGS_DIRECTORY``. When training ends, the saved model is
    restored and its accuracy over the whole test set is printed.
    """
    # Some parameters
    batch_size = TRAIN_BATCH_SIZE
    num_labels = ls_data.NUM_LABELS

    # Prepare ls_data data
    (train_total_data, train_size, validation_data, validation_labels,
     test_data, test_labels) = ls_data.prepare_ls_data(True)

    # Boolean for MODE of train or test
    is_training = tf.placeholder(tf.bool, name='MODE')

    # tf Graph input
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 2])  # answer

    # Predict. The mode placeholder is passed to the model, as the test
    # functions already do, so feeding is_training=False really switches
    # dropout / batch norm to inference behaviour during validation.
    y = cnn_model.CNN(x, is_training=is_training)

    # Get loss of model
    with tf.name_scope("LOSS"):
        loss = slim.losses.softmax_cross_entropy(y, y_)

    # Create a summary to monitor loss tensor
    # (tf.summary.* replaces the pre-1.0 tf.scalar_summary family)
    tf.summary.scalar('loss', loss)

    # Define optimizer
    with tf.name_scope("ADAM"):
        # Optimizer: set up a variable that's incremented once per batch and
        # controls the learning rate decay.
        batch = tf.Variable(0)

        learning_rate = tf.train.exponential_decay(
            1e-4,                # Base learning rate.
            batch * batch_size,  # Current index into the dataset.
            train_size,          # Decay step.
            0.95)                # Decay rate.
        train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss, global_step=batch)

    # Create a summary to monitor learning_rate tensor
    tf.summary.scalar('learning_rate', learning_rate)

    # Get accuracy of model
    with tf.name_scope("ACC"):
        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    # Create a summary to monitor accuracy tensor
    tf.summary.scalar('acc', accuracy)

    # Merge all summaries into a single op
    merged_summary_op = tf.summary.merge_all()

    # Add ops to save and restore all the variables
    saver = tf.train.Saver()
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer(), feed_dict={is_training: True})

    # Training cycle
    total_batch = int(train_size / batch_size)

    # op to write logs to Tensorboard
    summary_writer = tf.summary.FileWriter(LOGS_DIRECTORY, graph=tf.get_default_graph())

    # Save the maximum accuracy value for validation data
    max_acc = 0.

    # Loop for epoch
    for epoch in range(training_epochs):

        # Random shuffling: rows hold data and labels side by side, so
        # shuffling rows keeps every image paired with its label.
        numpy.random.shuffle(train_total_data)
        train_data_ = train_total_data[:, :-num_labels]
        train_labels_ = train_total_data[:, -num_labels:]

        # Loop over all batches
        for i in range(total_batch):

            # Compute the offset of the current minibatch in the data.
            offset = (i * batch_size) % (train_size)
            batch_xs = train_data_[offset:(offset + batch_size), :]
            batch_ys = train_labels_[offset:(offset + batch_size), :]

            # Run optimization op (backprop), loss op (to get loss value)
            # and summary nodes
            _, train_accuracy, summary = sess.run(
                [train_step, accuracy, merged_summary_op],
                feed_dict={x: batch_xs, y_: batch_ys, is_training: True})

            # Write logs at every iteration
            summary_writer.add_summary(summary, epoch * total_batch + i)

            # Display logs
            if i % display_step == 0:
                print("Epoch:", '%04d,' % (epoch + 1),
                      "batch_index %4d/%4d, training accuracy %.5f" % (i, total_batch, train_accuracy))

            # Get accuracy for validation data
            if i % validation_step == 0:
                # Calculate accuracy
                validation_accuracy = sess.run(
                    accuracy,
                    feed_dict={x: validation_data, y_: validation_labels, is_training: False})

                print("Epoch:", '%04d,' % (epoch + 1),
                      "batch_index %4d/%4d, validation accuracy %.5f" % (i, total_batch, validation_accuracy))

            # Save the current model if the maximum accuracy is updated
            if validation_accuracy > max_acc:
                max_acc = validation_accuracy
                save_path = saver.save(sess, MODEL_DIRECTORY)
                print("Model updated and saved in file: %s" % save_path)

    print("Optimization Finished!")

    # Restore variables from disk
    saver.restore(sess, MODEL_DIRECTORY)

    # Calculate accuracy for all ls_data test images
    test_size = test_labels.shape[0]
    batch_size = TEST_BATCH_SIZE

    # Count correct predictions over every sample, including a final partial
    # batch. Averaging per-batch ratios dropped the remainder and produced
    # NaN when the test set was smaller than one batch.
    num_correct = 0
    for offset in range(0, test_size, batch_size):
        batch_xs = test_data[offset:(offset + batch_size), :]
        batch_ys = test_labels[offset:(offset + batch_size), :]

        y_final = sess.run(y, feed_dict={x: batch_xs, y_: batch_ys, is_training: False})
        correct_prediction = numpy.equal(numpy.argmax(y_final, 1), numpy.argmax(batch_ys, 1))
        num_correct += numpy.sum(correct_prediction)

    test_accuracy = num_correct / float(test_size) if test_size else float('nan')
    print("test accuracy for the stored model: %g" % test_accuracy)
if __name__ == '__main__':
    # The guard had no body; run the training entry point defined above.
    train()
# Some code was borrowed from
# https://github.com/petewarden/tensorflow_makefile/blob/master/tensorflow/models/image/mnist/convolutional.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
import tensorflow.contrib.slim as slim
# Create model of CNN with slim api
def CNN(inputs, is_training=True, num_classes=2):
    """Build a two-conv-layer CNN with the slim API and return its logits.

    Args:
        inputs: batch of flattened images; reshaped to ``[-1, 28, 28, 1]``.
        is_training: Python bool or boolean tensor selecting training
            behaviour for batch normalisation and dropout.
        num_classes: width of the output layer. Defaults to 2 because every
            label placeholder in the train / test scripts is ``[None, 2]``;
            the previous fixed width of 10 could not be compared with them.

    Returns:
        The un-activated output of the final fully connected layer.
    """
    batch_norm_params = {'is_training': is_training, 'decay': 0.9, 'updates_collections': None}
    # Apply batch normalisation to every conv / fully connected layer in scope.
    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        normalizer_fn=slim.batch_norm,
                        normalizer_params=batch_norm_params):
        x = tf.reshape(inputs, [-1, 28, 28, 1])

        # For slim.conv2d, default argument values are like
        # normalizer_fn = None, normalizer_params = None, <== slim.arg_scope changes these arguments
        # padding='SAME', activation_fn=nn.relu,
        # weights_initializer = initializers.xavier_initializer(),
        # biases_initializer = init_ops.zeros_initializer,
        net = slim.conv2d(x, 32, [5, 5], scope='conv1')
        net = slim.max_pool2d(net, [2, 2], scope='pool1')
        net = slim.conv2d(net, 64, [5, 5], scope='conv2')
        net = slim.max_pool2d(net, [2, 2], scope='pool2')
        net = slim.flatten(net, scope='flatten3')

        # For slim.fully_connected, default argument values are like
        # activation_fn = nn.relu,
        # normalizer_fn = None, normalizer_params = None, <== slim.arg_scope changes these arguments
        # weights_initializer = initializers.xavier_initializer(),
        # biases_initializer = init_ops.zeros_initializer,
        net = slim.fully_connected(net, 1024, scope='fc3')
        net = slim.dropout(net, is_training=is_training, scope='dropout3')  # 0.5 by default
        # The output layer opts out of the activation and normaliser set by the scope.
        outputs = slim.fully_connected(net, num_classes, activation_fn=None, normalizer_fn=None, scope='fco')
    return outputs
# Some code was borrowed from https://github.com/petewarden/tensorflow_makefile/blob/master/tensorflow/models/image/mnist/convolutional.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import os
import tensorflow as tf
import tensorflow.contrib.slim as slim
import ls_data
import cnn_model
# user input
from argparse import ArgumentParser
# refernce argument values
# build parser
def build_parser():
    """Build the command-line parser for the test script.

    Returns:
        An ``ArgumentParser`` with three required options, stored as
        ``model_directory`` (str), ``batch_size`` (int) and ``ensemble``
        (str; the caller compares it with ``'True'``).
    """
    parser = ArgumentParser()
    # NOTE(review): the option strings '--model-dir' and '--use-ensemble' were
    # missing from this copy and are restored to match their `dest` names -
    # confirm against the original script.
    parser.add_argument('--model-dir',
                        dest='model_directory', help='directory where model to be tested is stored',
                        metavar='MODEL_DIRECTORY', required=True)
    parser.add_argument('--batch-size', type=int,
                        dest='batch_size', help='batch size for test',
                        metavar='TEST_BATCH_SIZE', required=True)
    parser.add_argument('--use-ensemble',
                        dest='ensemble', help='boolean for usage of ensemble',
                        metavar='ENSEMBLE', required=True)
    return parser
# test with test data given by ls_data.py
def test(model_directory, batch_size):
    """Score one stored model on the test split returned by ``input_data``.

    Args:
        model_directory: checkpoint path handed to ``saver.restore``.
        batch_size: number of test samples evaluated per step.

    Prints the mean of the per-batch accuracies.
    """
    # Import data
    ls = input_data.read_data_sets('data/', one_hot=True)

    # Graph inputs: train/test mode switch, flattened images, one-hot answers
    is_training = tf.placeholder(tf.bool, name='MODE')
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 2])
    y = cnn_model.CNN(x, is_training=is_training)

    # Session with freshly initialised variables
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer(), feed_dict={is_training: True})

    # Restore variables from disk
    saver = tf.train.Saver()
    test_size = ls.test.num_examples
    total_batch = int(test_size / batch_size)
    saver.restore(sess, model_directory)

    # One accuracy value per full batch
    acc_buffer = []
    for _ in range(total_batch):
        batch = ls.test.next_batch(batch_size)
        # make zero-centered distribution as in ls_data.extract_data()
        batch_xs = (batch[0] - (PIXEL_DEPTH / 2.0) / PIXEL_DEPTH)
        batch_ys = batch[1]

        y_final = sess.run(y, feed_dict={x: batch_xs, y_: batch_ys, is_training: False})
        hits = numpy.equal(numpy.argmax(y_final, 1), numpy.argmax(batch_ys, 1))
        acc_buffer.append(numpy.sum(hits) / batch_size)

    print("test accuracy for the stored model: %g" % numpy.mean(acc_buffer))
# test with test data given by ls_data.py
def test_org(model_directory, batch_size):
    """Score one stored model on the test split produced by ``ls_data``.

    Args:
        model_directory: checkpoint path handed to ``saver.restore``.
        batch_size: number of test samples evaluated per step.

    Prints the accuracy over the whole test set.
    """
    # Import data. Only the test arrays are used here, so augmentation of the
    # training data is switched off to avoid the needless work.
    (train_total_data, train_size, validation_data, validation_labels,
     test_data, test_labels) = ls_data.prepare_ls_data(False)

    is_training = tf.placeholder(tf.bool, name='MODE')

    # tf Graph input
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 2])  # answer
    y = cnn_model.CNN(x, is_training=is_training)

    # Add ops to save and restore all the variables
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer(), feed_dict={is_training: True})

    # Restore variables from disk
    saver = tf.train.Saver()
    saver.restore(sess, model_directory)

    # Calculate accuracy for all ls_data test images. Correct predictions are
    # counted over every sample, including a final partial batch, so the
    # result is not NaN when the test set is smaller than one batch.
    test_size = test_labels.shape[0]
    num_correct = 0
    for offset in range(0, test_size, batch_size):
        batch_xs = test_data[offset:(offset + batch_size), :]
        batch_ys = test_labels[offset:(offset + batch_size), :]

        y_final = sess.run(y, feed_dict={x: batch_xs, y_: batch_ys, is_training: False})
        correct_prediction = numpy.equal(numpy.argmax(y_final, 1), numpy.argmax(batch_ys, 1))
        num_correct += numpy.sum(correct_prediction)

    test_accuracy = num_correct / float(test_size) if test_size else float('nan')
    print("test accuracy for the stored model: %g" % test_accuracy)
# For a given matrix, each row is converted into a one-hot row vector
def one_hot_matrix(a):
    """Convert each row of a 2-D array into a one-hot row vector.

    Args:
        a: 2-D array of scores, one row per sample.

    Returns:
        An array with the shape and dtype of ``a`` holding 1 at each row's
        argmax (the first maximum on ties) and 0 elsewhere.
    """
    a_ = numpy.zeros_like(a)
    # One vectorised assignment instead of a Python loop over the rows.
    a_[numpy.arange(a.shape[0]), numpy.argmax(a, 1)] = 1
    return a_
# test with test data given by ls_data.py
def test_ensemble(model_directory_list, batch_size):
    """Score a majority-vote ensemble of stored models on the test split.

    Args:
        model_directory_list: directories that each hold a ``model.ckpt``.
        batch_size: number of test samples evaluated per step.

    Every model casts one one-hot vote per sample; the class with the most
    votes is the ensemble answer. Prints the mean of the per-batch accuracies.
    """
    # Import data
    ls = input_data.read_data_sets('data/', one_hot=True)

    is_training = tf.placeholder(tf.bool, name='MODE')

    # tf Graph input
    x = tf.placeholder(tf.float32, [None, 784])
    y_ = tf.placeholder(tf.float32, [None, 2])  # answer
    y = cnn_model.CNN(x, is_training=is_training)

    # Add ops to save and restore all the variables
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer(), feed_dict={is_training: True})

    # Restore variables from disk
    saver = tf.train.Saver()

    # Calculate accuracy for all ls_data test images
    test_size = ls.test.num_examples
    total_batch = int(test_size / batch_size)

    # Draw all batches up front so each checkpoint is restored once, instead of
    # once per batch.
    batches = [ls.test.next_batch(batch_size) for _ in range(total_batch)]
    # make zero-centered distribution as in ls_data.extract_data()
    inputs = [(batch[0] - (PIXEL_DEPTH / 2.0) / PIXEL_DEPTH) for batch in batches]
    answers = [batch[1] for batch in batches]
    votes = [numpy.zeros_like(batch_ys) for batch_ys in answers]

    # `model_dir` avoids shadowing the builtin dir().
    for model_dir in model_directory_list:
        saver.restore(sess, model_dir + '/model.ckpt')
        for k in range(total_batch):
            pred = sess.run(y, feed_dict={x: inputs[k], y_: answers[k], is_training: False})
            votes[k] += one_hot_matrix(pred)

    # take a majority vote as an answer
    acc_buffer = []
    for y_final, batch_ys in zip(votes, answers):
        correct_prediction = numpy.equal(numpy.argmax(y_final, 1), numpy.argmax(batch_ys, 1))
        acc_buffer.append(numpy.sum(correct_prediction) / batch_size)

    print("test accuracy for the stored model: %g" % numpy.mean(acc_buffer))
if __name__ == '__main__':
    # Parse argument
    parser = build_parser()
    options = parser.parse_args()
    ensemble = options.ensemble
    model_directory = options.model_directory
    batch_size = options.batch_size

    # Select ensemble test or a single model test
    if ensemble == 'True':  # use ensemble model
        # os.walk yields the root first; the sub-directories after it hold the models.
        model_directory_list = [x[0] for x in os.walk(model_directory)]
        test_ensemble(model_directory_list[1:], batch_size)
    else:  # test a single model
        # test_org(model_directory,
        #          batch_size)  # test with test data given by ls_data.py
        test(model_directory,
             batch_size)  # test with test data given by tensorflow.examples.tutorials.ls.input_data()
# Some code was borrowed from
# https://github.com/petewarden/tensorflow_makefile/blob/master/tensorflow/models/image/2/convolutional.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gzip
import os
import numpy
from scipy import ndimage
from six.moves import urllib
from sklearn.model_selection import train_test_split
import tensorflow as tf
# Params for ls_data
# NOTE(review): IMAGE_SIZE, NUM_CHANNELS, PIXEL_DEPTH and NUM_LABELS are read by
# the functions below but are not defined in the visible source - confirm.
VALIDATION_SIZE = 5000 # Size of the validation set.
# Extract the images
def extract_data(filename, num_images):
    """Extract images from a gzip file into a 2D array [image index, pixels].

    Each byte is read as one unsigned pixel value and rescaled as
    ``(value - PIXEL_DEPTH / 2) / PIXEL_DEPTH``.

    Args:
        filename: path of the gzip-compressed image file.
        num_images: number of images to read.

    Returns:
        A float32 array of shape
        ``(num_images, IMAGE_SIZE * IMAGE_SIZE * NUM_CHANNELS)``.
    """
    print('Extracting', filename)
    pixels_per_image = IMAGE_SIZE * IMAGE_SIZE * NUM_CHANNELS
    with gzip.open(filename) as bytestream:
        # NOTE(review): reading starts at byte 0; if the file carries a header
        # it has to be skipped first - confirm the file format.
        buf = bytestream.read(pixels_per_image * num_images)
    data = numpy.frombuffer(buf, dtype=numpy.uint8).astype(numpy.float32)
    data = (data - (PIXEL_DEPTH / 2.0)) / PIXEL_DEPTH
    # One reshape straight to the flattened layout; it still fails when the
    # byte count does not match num_images full images.
    data = data.reshape(num_images, pixels_per_image)
    return data
# Extract the labels
def extract_labels(filename, num_images):
    """Read label bytes from a gzip file and return them one-hot encoded.

    Args:
        filename: path of the gzip-compressed label file.
        num_images: number of one-byte labels to read.

    Returns:
        A float array of shape ``(number of labels read, NUM_LABELS)``.
    """
    print('Extracting', filename)
    with gzip.open(filename) as bytestream:
        raw = bytestream.read(1 * num_images)
        label_ids = numpy.frombuffer(raw, dtype=numpy.uint8).astype(numpy.int64)
    count = len(label_ids)
    encoded = numpy.zeros((count, NUM_LABELS))
    # Set the column named by each label id in that sample's row.
    rows = numpy.arange(count)
    encoded[rows, label_ids] = 1
    return numpy.reshape(encoded, [-1, NUM_LABELS])
# Augment training data
def expend_training_data(images, labels):
    """Augment training data with randomly rotated and shifted copies.

    For every input image the original plus four perturbed copies are
    emitted, each paired with the original label.

    Args:
        images: sequence of flattened 28x28 images (784 values each).
        labels: sequence of label rows, aligned with ``images``.

    Returns:
        A 2-D array with five rows per input sample; each row is the 784
        image values followed by the label values.
    """
    expanded_images = []
    expanded_labels = []

    total = numpy.size(images, 0)
    for j, (x, y) in enumerate(zip(images, labels), 1):
        if j % 100 == 0:
            print('expanding data : %03d / %03d' % (j, total))

        # register original data
        # (labels were never registered, so the final concatenate could not
        # pair images with labels)
        expanded_images.append(x)
        expanded_labels.append(y)

        # get a value for the background
        # zero is the expected value, but median() is used to estimate background's value
        bg_value = numpy.median(x)  # this is regarded as background's value
        image = numpy.reshape(x, (-1, 28))

        for _ in range(4):
            # rotate the image with random degree (scalar angle in [-15, 15))
            angle = int(numpy.random.randint(-15, 15))
            new_img = ndimage.rotate(image, angle, reshape=False, cval=bg_value)

            # shift the image with random distance
            shift = numpy.random.randint(-2, 2, 2)
            new_img_ = ndimage.shift(new_img, shift, cval=bg_value)

            # register new training data
            expanded_images.append(numpy.reshape(new_img_, 784))
            expanded_labels.append(y)

    # images and labels are concatenated for random-shuffle at each epoch
    # notice that pair of image and label should not be broken
    expanded_train_total_data = numpy.concatenate((expanded_images, expanded_labels), axis=1)
    return expanded_train_total_data
# Prepare data
def prepare_ls_data(use_data_augmentation=True):
    """Load the saved X / y arrays and split them into train, validation and test.

    Args:
        use_data_augmentation: when True the training rows are expanded with
            ``expend_training_data``; otherwise data and labels are simply
            concatenated column-wise.

    Returns:
        ``(train_total_data, train_size, validation_data, validation_labels,
        test_data, test_labels)`` where ``train_total_data`` holds data and
        labels side by side and ``train_size`` is its number of rows.
    """
    # Get the data. (`numpy` is the name the rest of this function uses.)
    X = numpy.load(r"C:\Users\aixin\Desktop\lungsound\LSS_x_y\X.npy")
    y = numpy.load(r"C:\Users\aixin\Desktop\lungsound\LSS_x_y\y.npy")

    # Drop the last entry of both arrays.
    X = X[:-1]
    y = y[:-1]
    print('Cutted data shape: ', X.shape)
    print('Cutted data shape: ', y.shape)
    # Output recorded by the author:
    #   Cutted data shape: (230, 36, 36, 3)
    #   Cutted data shape: (230,)

    # To limit overfitting, hold out about 30% of the data with a fixed random
    # seed (random_state).
    # train_test_split returns (X_train, X_test, y_train, y_test); the previous
    # unpacking order put the test images into train_labels.
    train_data, test_data, train_labels, test_labels = train_test_split(
        X, y, test_size=0.302, random_state=40)

    # As a sanity check, we print out the size of the training and test data.
    print('Training data shape: ', train_data.shape)
    print('Training labels shape: ', train_labels.shape)
    print('Test data shape: ', test_data.shape)
    print('Test labels shape: ', test_labels.shape)

    # Generate a validation set.
    # NOTE(review): the recorded shapes above show 1-D labels and 4-D images,
    # while the slicing and concatenation below expect 2-D arrays, and
    # VALIDATION_SIZE (5000) exceeds the 230 recorded samples - confirm the
    # intended flattening / one-hot step and validation size.
    validation_data = train_data[:VALIDATION_SIZE, :]
    validation_labels = train_labels[:VALIDATION_SIZE, :]
    train_data = train_data[VALIDATION_SIZE:, :]
    train_labels = train_labels[VALIDATION_SIZE:, :]

    # Concatenate train_data & train_labels for random shuffle
    if use_data_augmentation:
        train_total_data = expend_training_data(train_data, train_labels)
    else:
        # Without this branch the augmented data was always overwritten.
        train_total_data = numpy.concatenate((train_data, train_labels), axis=1)

    train_size = train_total_data.shape[0]

    return train_total_data, train_size, validation_data, validation_labels, test_data, test_labels
import skimage.data
import numpy
import matplotlib
import numpycnn
# The project is tested using Python 3.5.2 installed inside Anaconda 4.2.0 (64-bit)
# NumPy version used is 1.14.0
# Load the sample image (alternatives kept for reference)
#img = skimage.io.imread("test.jpg")
#img = skimage.data.checkerboard()
img = skimage.data.chelsea()
#img = skimage.data.camera()

# Convert the image to grayscale.
img = skimage.color.rgb2gray(img)

# --- First conv layer: two fixed 3x3 filters ---
#l1_filter = numpy.random.rand(2,7,7)*20
l1_filter = numpy.zeros((2, 3, 3))
l1_filter[0] = [[-1, 0, 1],
                [-1, 0, 1],
                [-1, 0, 1]]
l1_filter[1] = [[1, 1, 1],
                [0, 0, 0],
                [-1, -1, -1]]

print("\n**Working with conv layer 1**")
l1_feature_map = numpycnn.conv(img, l1_filter)
l1_feature_map_relu = numpycnn.relu(l1_feature_map)
l1_feature_map_relu_pool = numpycnn.pooling(l1_feature_map_relu, 2, 2)
print("**End of conv layer 1**\n")

# --- Second conv layer: three random 5x5 filters, depth taken from layer 1 ---
l2_depth = l1_feature_map_relu_pool.shape[-1]
l2_filter = numpy.random.rand(3, 5, 5, l2_depth)
print("\n**Working with conv layer 2**")
l2_feature_map = numpycnn.conv(l1_feature_map_relu_pool, l2_filter)
l2_feature_map_relu = numpycnn.relu(l2_feature_map)
l2_feature_map_relu_pool = numpycnn.pooling(l2_feature_map_relu, 2, 2)
print("**End of conv layer 2**\n")

# --- Third conv layer: one random 7x7 filter, depth taken from layer 2 ---
l3_depth = l2_feature_map_relu_pool.shape[-1]
l3_filter = numpy.random.rand(1, 7, 7, l3_depth)
print("\n**Working with conv layer 3**")
l3_feature_map = numpycnn.conv(l2_feature_map_relu_pool, l3_filter)
l3_feature_map_relu = numpycnn.relu(l3_feature_map)
l3_feature_map_relu_pool = numpycnn.pooling(l3_feature_map_relu, 2, 2)
print("**End of conv layer 3**\n")
# Graphing
# results
# Input image figure
fig0, ax0 = matplotlib.pyplot.subplots(nrows=1, ncols=1)
# Draw the image before saving; without this the saved figure held only a title.
ax0.imshow(img).set_cmap("gray")
ax0.set_title("Input Image")
matplotlib.pyplot.savefig("in_img.png", bbox_inches="tight")
# Layer 1: one row per stage (conv, ReLU, ReLU + pooling), one column per feature map
fig1, ax1 = matplotlib.pyplot.subplots(nrows=3, ncols=2)
l1_rows = (
    (l1_feature_map, "L1-Map%d"),
    (l1_feature_map_relu, "L1-Map%dReLU"),
    (l1_feature_map_relu_pool, "L1-Map%dReLUPool"),
)
for row, (maps, title) in enumerate(l1_rows):
    for col in range(2):
        axis = ax1[row, col]
        axis.imshow(maps[:, :, col]).set_cmap("gray")
        # Every panel clears its own ticks; the hand-written version cleared
        # ax1[2, 0] twice and left the ticks on ax1[2, 1].
        axis.get_xaxis().set_ticks([])
        axis.get_yaxis().set_ticks([])
        axis.set_title(title % (col + 1))
matplotlib.pyplot.savefig("L1.png", bbox_inches="tight")
# Layer 2: one row per stage (conv, ReLU, ReLU + pooling), one column per feature map
fig2, ax2 = matplotlib.pyplot.subplots(nrows=3, ncols=3)
l2_rows = (
    (l2_feature_map, "L2-Map%d"),
    (l2_feature_map_relu, "L2-Map%dReLU"),
    (l2_feature_map_relu_pool, "L2-Map%dReLUPool"),
)
for row, (maps, title) in enumerate(l2_rows):
    for col in range(3):
        axis = ax2[row, col]
        axis.imshow(maps[:, :, col]).set_cmap("gray")
        axis.get_xaxis().set_ticks([])
        axis.get_yaxis().set_ticks([])
        axis.set_title(title % (col + 1))
matplotlib.pyplot.savefig("L2.png", bbox_inches="tight")
# Layer 3: the single feature map after conv, ReLU, and ReLU + pooling
fig3, ax3 = matplotlib.pyplot.subplots(nrows=1, ncols=3)
l3_stages = (l3_feature_map, l3_feature_map_relu, l3_feature_map_relu_pool)
for col, maps in enumerate(l3_stages):
    ax3[col].imshow(maps[:, :, 0]).set_cmap("gray")
matplotlib.pyplot.savefig("L3.png", bbox_inches="tight")
# -*- coding: utf-8 -*-
"""
Created on Fri May 4 10:54:49 2018
@author: aixin
"""
import numpy as np
from sklearn.model_selection import train_test_split
# Load the pre-computed feature array and the matching label vector.
X = np.load(r"C:\Users\aixin\Desktop\lungsound\LSS_x_y\X.npy")
y = np.load(r"C:\Users\aixin\Desktop\lungsound\LSS_x_y\y.npy")
# Drop the last sample from both arrays so data and labels stay aligned.
X = X[:-1]
y = y[:-1]
print('Cutted data shape: ', X.shape)
print('Cutted data shape: ', y.shape)
# Output recorded from an earlier run:
#   Cutted data shape: (230, 36, 36, 3)
#   Cutted data shape: (230,)

# Hold out roughly 30% of the samples to guard against over-fitting, with a
# fixed random seed (random_state) so the split is reproducible.
# train_test_split returns the train part and then the test part of each
# input array in turn, i.e. (X_train, X_test, y_train, y_test).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.302, random_state=40)

# As a sanity check, we print out the size of the training and test data.
print('Training data shape: ', X_train.shape)
print('Training labels shape: ', y_train.shape)
print('Test data shape: ', X_test.shape)
print('Test labels shape: ', y_test.shape)
# Split the data into train, val, and test sets. In addition we create a
# small development set as a subset of the training data; we can use this
# for development so our code runs faster.
num_training = 140  # training samples
num_validation = 20  # validation samples
num_test = 10  # test samples
num_dev = 5  # small development subset

# Our validation set is the num_validation points that follow the first
# num_training points of the original training set.
mask = range(num_training, num_training + num_validation)
X_val = X_train[mask]
y_val = y_train[mask]

# Our training set is the first num_training points of the original
# training set.
mask = range(num_training)
X_train = X_train[mask]
y_train = y_train[mask]

# We also make a development set, which is a small random subset of the
# training set (sampled without replacement).
mask = np.random.choice(num_training, num_dev, replace=False)
X_dev = X_train[mask]
y_dev = y_train[mask]

# We use the first num_test points of the original test set as our test set.
mask = range(num_test)
X_test = X_test[mask]
y_test = y_test[mask]

print('Train data shape: ', X_train.shape)
print('Train labels shape: ', y_train.shape)
print('Validation data shape: ', X_val.shape)
print('Validation labels shape: ', y_val.shape)
print('Test data shape: ', X_test.shape)
print('Test labels shape: ', y_test.shape)

# Flatten every sample into a single row. np.hstack below needs 2-D inputs;
# without this step it raises on the multi-dimensional sample arrays.
X_train = np.reshape(X_train, (X_train.shape[0], -1))
X_val = np.reshape(X_val, (X_val.shape[0], -1))
X_test = np.reshape(X_test, (X_test.shape[0], -1))
X_dev = np.reshape(X_dev, (X_dev.shape[0], -1))

# Bias trick: append a constant column of ones so the classifier's bias can
# be learned as one extra row of the weight matrix.
X_train = np.hstack([X_train, np.ones((X_train.shape[0], 1))])
X_val = np.hstack([X_val, np.ones((X_val.shape[0], 1))])
X_test = np.hstack([X_test, np.ones((X_test.shape[0], 1))])
X_dev = np.hstack([X_dev, np.ones((X_dev.shape[0], 1))])
print(X_train.shape, X_val.shape, X_test.shape, X_dev.shape)
# Output recorded from an earlier run:
# (140, 3889) (20, 3889) (10, 3889) (5, 3889)
# This section uses time.time() and plt; import them here so it runs on its own.
import time

import matplotlib.pyplot as plt

# Generate a random SVM weight matrix of small numbers and evaluate the
# naive loss once on the development subset.
from classifiers.linear_svm import svm_loss_naive
W = np.random.randn(X_train.shape[1], 10) * 0.0001
loss, grad = svm_loss_naive(W, X_dev, y_dev, 0.00005)
print('loss: %f' % (loss, ))

# In the file linear_classifier.py, implement SGD in the function
# LinearClassifier.train() and then run it with the code below.
from classifiers import LinearSVM
svm = LinearSVM()
tic = time.time()
loss_hist = svm.train(X_train, y_train, learning_rate=1e-7, reg=2.5e4,
                      num_iters=1500, verbose=True)
toc = time.time()
print('That took %fs' % (toc - tic))

# Plot the loss recorded by train() against the iteration number; the axis
# labels alone would leave an empty figure.
plt.plot(loss_hist)
plt.xlabel('Iteration number')
plt.ylabel('Loss value')
plt.show()

# Evaluate the trained classifier on the training and validation sets.
y_train_pred = svm.predict(X_train)
print('training accuracy: %f' % (np.mean(y_train == y_train_pred), ))
y_val_pred = svm.predict(X_val)
print('validation accuracy: %f' % (np.mean(y_val == y_val_pred), ))
# Output recorded from an earlier run:
#   training accuracy: 0.814286
#   validation accuracy: 0.850000
import tensorflow as tf
import numpy as np
import image
def weight_variable(shape, dtype, name):
    """Create a tf.Variable initialised from a truncated normal (stddev 0.1).

    `shape` and `dtype` describe the tensor. `name` is given to the
    truncated-normal initialiser op, not to the Variable itself.
    """
    return tf.Variable(
        tf.truncated_normal(shape=shape, stddev=0.1, dtype=dtype, name=name)
    )
def bias_variable(shape, dtype, name):
    """Create a tf.Variable whose entries all start at the constant 0.1.

    `shape` and `dtype` describe the tensor. `name` is given to the
    constant initialiser op, not to the Variable itself.
    """
    return tf.Variable(
        tf.constant(0.1, shape=shape, dtype=dtype, name=name)
    )
def conv2d(x, W):
    """Convolve `x` with filter `W` using stride 1 in every dimension and 'SAME' padding."""
    unit_strides = [1, 1, 1, 1]
    return tf.nn.conv2d(x, W, strides=unit_strides, padding='SAME')
def max_pool_2x2(x):
    """Max-pool `x` with a 2x2 window, stride 2 and 'SAME' padding."""
    window = [1, 2, 2, 1]
    step = [1, 2, 2, 1]
    return tf.nn.max_pool(x, ksize=window, strides=step, padding='SAME')
# Location of the data set read by input_data.read_data_sets.
# NOTE(review): `input_data` is not imported anywhere in the visible code;
# confirm where it is expected to come from.
Lungsound_path = r"C:\Users\aixin\Desktop\lungsound"
Lungsound = input_data.read_data_sets(Lungsound_path, one_hot = True)

# Network inputs: flattened 784-value images and 2-column one-hot labels.
x = tf.placeholder("float", [None, 784])
y = tf.placeholder("float", [None, 2])
# Restore the 28x28 single-channel layout expected by the convolutions.
x_image = tf.reshape(x, [-1, 28, 28, 1])

# convolution 1: 5x5 filters, 1 -> 32 channels, ReLU, then 2x2 max-pool
weight_conv1 = weight_variable([5, 5, 1, 32], dtype = "float", name = 'weight_conv1')
bias_conv1 = bias_variable([32], dtype = "float", name = 'bias_conv1')
hidden_conv1 = tf.nn.relu(conv2d(x_image, weight_conv1) + bias_conv1)
hidden_pool1 = max_pool_2x2(hidden_conv1)

# convolution 2: 5x5 filters, 32 -> 64 channels, ReLU, then 2x2 max-pool
weight_conv2 = weight_variable([5, 5, 32, 64], dtype = "float", name = 'weight_conv2')
bias_conv2 = bias_variable([64], dtype = "float", name = 'bias_conv2')
hidden_conv2 = tf.nn.relu(conv2d(hidden_pool1, weight_conv2) + bias_conv2)
hidden_pool2 = max_pool_2x2(hidden_conv2)

# fully connected layer 1: two 2x2 pools reduce 28x28 to 7x7, so the
# flattened input has 7 * 7 * 64 values per sample.
hidden_pool2_flat = tf.reshape(hidden_pool2, [-1, 7 * 7 * 64])
weight_fc1 = weight_variable([7 * 7 * 64, 1024], dtype = "float", name = 'weight_fc1')
bias_fc1 = bias_variable([1024], dtype = "float", name = 'bias_fc1')
hidden_fc1 = tf.nn.relu(tf.matmul(hidden_pool2_flat, weight_fc1) + bias_fc1)
# Dropout keep-probability is fed at run time (0.5 for training, 1 for evaluation).
keep_prob = tf.placeholder("float")
hidden_fc1_dropout = tf.nn.dropout(hidden_fc1, keep_prob)

# fully connected layer 2: 1024 -> 2 class probabilities via softmax
weight_fc2 = weight_variable([1024, 2], dtype = "float", name = 'weight_fc2')
# The bias initialiser is named 'bias_fc2' (previously a copy-pasted 'weight_fc2').
bias_fc2 = bias_variable([2], dtype = "float", name = 'bias_fc2')
y_fc2 = tf.nn.softmax(tf.matmul(hidden_fc1_dropout, weight_fc2) + bias_fc2)

# create tensorflow structure: cross-entropy loss, Adam optimiser, accuracy
cross_entropy = -tf.reduce_sum(y * tf.log(y_fc2))
optimize = tf.train.AdamOptimizer(0.0001)
train = optimize.minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_fc2, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

# initial all variables
# NOTE(review): `init` is only created here; the visible code never runs it,
# so it must be run (or restore() called) before the variables are used.
init = tf.initialize_all_variables()
session = tf.Session()
# train
def Train() :
    """Run 5000 training steps on mini-batches of 50 samples.

    Every 100 steps the step number and the accuracy on the current batch
    are printed (with dropout disabled). After the loop the accuracy on the
    test set is printed once.
    """
    step = 0
    while step < 5000:
        minibatch = Lungsound.train.next_batch(50)
        train_feed = {x:minibatch[0], y:minibatch[1], keep_prob:0.5}
        session.run(train, feed_dict = train_feed)
        if step % 100 == 0:
            print("step %4d: " % step)
            eval_feed = {x:minibatch[0], y:minibatch[1], keep_prob:1}
            print(session.run(accuracy, feed_dict = eval_feed))
        step += 1
    test_feed = {x:Lungsound.test.images, y:Lungsound.test.labels, keep_prob:1}
    print(session.run(accuracy, feed_dict = test_feed))
# save variables
def save() :
    """Write the session's variables to the checkpoint at `save_path`."""
    tf.train.Saver().save(session, save_path)
# restore variables
def restore() :
    """Load the session's variables from the checkpoint at `save_path`."""
    tf.train.Saver().restore(session, save_path)
def getTestPicArray(filename) :
    """Turn an image file into a (1, 784) float32 row scaled to [0, 1].

    The image is resized to 28x28 and converted to 8-bit grayscale. If more
    pixels are above the brightness threshold (100) than not, the image is
    inverted and every inverted pixel below the threshold is set to 0. The
    28x28 result is also saved under '<first path part>/28pix/<second path
    part>', so `filename` must contain a '/' separator.

    Relies on a module-level name `Image` providing open / resize /
    fromarray.
    """
    side = 28
    threshold = 100
    resized = Image.open(filename).resize((side, side), Image.ANTIALIAS)
    im_arr = np.array(resized.convert('L'))

    # Count bright vs. dark pixels to decide whether the image is inverted.
    num255 = int(np.count_nonzero(im_arr > threshold))
    num0 = side * side - num255
    if num255 > num0:
        # Invert, then clear everything that ends up below the threshold.
        im_arr = 255 - im_arr
        im_arr[im_arr < threshold] = 0

    path_parts = filename.split('/')
    Image.fromarray(np.uint8(im_arr)).save(path_parts[0] + '/28pix/' + path_parts[1])

    flat = im_arr.reshape((1, 784)).astype(np.float32)
    return np.multiply(flat, 1.0 / 255.0)
def testMyPicture() :
    """Interactively classify user-supplied pictures with the current network.

    Asks for the number of pictures, then for each one asks for its path,
    converts it with getTestPicArray and prints the predicted class index.

    Raises:
        ValueError: if the entered number of pictures is not an integer.
    """
    # Read raw text on both Python 2 (raw_input) and Python 3 (input).
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    # range() needs an int; Python 3's input() returns a string.
    testNum = int(read_line("input the number of test picture:"))

    # Build the argmax op once instead of adding a new op to the graph on
    # every loop iteration.
    ans = tf.argmax(y_fc2, 1)
    for _ in range(testNum) :
        testPicture = read_line("input the test picture's path:")
        oneTestx = getTestPicArray(testPicture)
        print("The prediction answer is:")
        print(session.run(ans, feed_dict = {x:oneTestx, keep_prob:1}))
# Checkpoint path read by save() and restore().
save_path = "network/cnn.ckpt"
# train...........model
#save ...........model
import numpy as np
from random import shuffle
def softmax_loss_naive(W, X, y, reg):
    """
    Softmax loss function, naive implementation (with loops)

    Inputs have dimension D, there are C classes, and we operate on minibatches
    of N examples.

    Inputs:
    - W: A numpy array of shape (D, C) containing weights.
    - X: A numpy array of shape (N, D) containing a minibatch of data.
    - y: A numpy array of shape (N,) containing training labels; y[i] = c means
      that X[i] has label c, where 0 <= c < C.
    - reg: (float) regularization strength

    Returns a tuple of:
    - loss as single float
    - gradient with respect to weights W; an array of same shape as W
    """
    # Initialize the loss and gradient to zero.
    loss = 0.0
    dW = np.zeros_like(W)

    num_classes = W.shape[1]
    num_train = X.shape[0]
    for i in range(num_train):
        scores = X[i].dot(W)
        # Shift the scores so the largest is 0. Softmax is unchanged by a
        # constant shift, and this keeps np.exp from overflowing.
        scores = scores - np.max(scores)
        exp_scores = np.exp(scores)
        exp_sum = np.sum(exp_scores)
        # Cross-entropy for sample i: log(sum_j e^{s_j}) - s_{y_i}.
        loss += np.log(exp_sum) - scores[y[i]]
        # Gradient wrt scores is (softmax - one_hot); apply the one-hot part
        # first, then the softmax part for every class.
        dW[:, y[i]] -= X[i]
        for j in range(num_classes):
            dW[:, j] += (exp_scores[j] / exp_sum) * X[i]

    # Average over the minibatch, then add L2 regularization.
    loss /= num_train
    dW /= num_train
    loss += 0.5 * reg * np.sum(W * W)
    dW += reg * W
    return loss, dW
def softmax_loss_vectorized(W, X, y, reg):
    """
    Softmax loss function, vectorized version.

    Inputs and outputs are the same as softmax_loss_naive: W is (D, C),
    X is (N, D), y holds N integer labels, reg is the L2 strength, and the
    result is the tuple (loss, dW) with dW shaped like W.
    """
    n_samples = X.shape[0]
    row_index = range(n_samples)
    labels = list(y)

    # (N, C) class scores, shifted per row so the maximum is 0 to keep the
    # exponentials from overflowing.
    logits = X.dot(W)
    shifted = logits - np.max(logits, axis=1).reshape(-1, 1)
    probs = np.exp(shifted) / np.sum(np.exp(shifted), axis=1).reshape(-1, 1)

    # Mean negative log-likelihood of the true classes plus L2 penalty.
    loss = -np.sum(np.log(probs[row_index, labels]))
    loss /= n_samples
    loss += 0.5 * reg * np.sum(W * W)

    # Gradient wrt the scores is (softmax - one_hot); back-propagate through
    # the linear map, average, and add the regularization gradient.
    grad_scores = probs.copy()
    grad_scores[row_index, labels] += -1
    dW = X.T.dot(grad_scores)
    dW = dW / n_samples + reg * W
    return loss, dW
from __future__ import print_function
import numpy as np
import matplotlib.pyplot as plt
class TwoLayerNet(object):
    """
    A two-layer fully-connected neural network. The net has an input dimension
    of D, a hidden layer dimension of H, and performs classification over C
    classes. We train the network with a softmax loss function and L2
    regularization on the weight matrices. The network uses a ReLU
    nonlinearity after the first fully connected layer.

    In other words, the network has the following architecture:

    input - fully connected layer - ReLU - fully connected layer - softmax

    The outputs of the second fully-connected layer are the scores for each
    class.
    """

    def __init__(self, input_size, hidden_size, output_size, std=1e-4):
        """
        Initialize the model. Weights are initialized to small random values
        and biases are initialized to zero. Weights and biases are stored in
        the variable self.params, which is a dictionary with the following keys:

        W1: First layer weights; has shape (D, H)
        b1: First layer biases; has shape (H,)
        W2: Second layer weights; has shape (H, C)
        b2: Second layer biases; has shape (C,)

        Inputs:
        - input_size: The dimension D of the input data.
        - hidden_size: The number of neurons H in the hidden layer.
        - output_size: The number of classes C.
        - std: Scale applied to the random normal weight initialization.
        """
        self.params = {}
        self.params['W1'] = std * np.random.randn(input_size, hidden_size)
        self.params['b1'] = np.zeros(hidden_size)
        self.params['W2'] = std * np.random.randn(hidden_size, output_size)
        self.params['b2'] = np.zeros(output_size)

    def loss(self, X, y=None, reg=0.0):
        """
        Compute the loss and gradients for a two layer fully connected neural
        network.

        Inputs:
        - X: Input data of shape (N, D). Each X[i] is a training sample.
        - y: Vector of training labels. y[i] is the label for X[i], and each
          y[i] is an integer in the range 0 <= y[i] < C. This parameter is
          optional; if it is not passed then we only return scores, and if it
          is passed then we instead return the loss and gradients.
        - reg: Regularization strength.

        Returns:
        If y is None, return a matrix scores of shape (N, C) where
        scores[i, c] is the score for class c on input X[i].

        If y is not None, instead return a tuple of:
        - loss: Loss (data loss and regularization loss) for this batch of
          training samples.
        - grads: Dictionary mapping parameter names to gradients of those
          parameters with respect to the loss function; has the same keys as
          self.params.
        """
        # Unpack variables from the params dictionary
        W1, b1 = self.params['W1'], self.params['b1']
        W2, b2 = self.params['W2'], self.params['b2']
        N, D = X.shape

        # Forward pass: hidden ReLU activations (N, H), then class scores (N, C).
        H_out = np.maximum(0, X.dot(W1) + b1)
        scores = H_out.dot(W2) + b2

        # If the targets are not given then jump out, we're done.
        if y is None:
            return scores

        # Softmax loss. Scores are shifted per row so the maximum is 0,
        # which keeps the exponentials from overflowing.
        prevent_explo_scores = scores - np.max(scores, axis=1).reshape(-1, 1)
        exp_scores = np.exp(prevent_explo_scores)
        softmax_output = exp_scores / np.sum(exp_scores, axis=1).reshape(-1, 1)
        loss = -np.sum(np.log(softmax_output[range(N), list(y)]))
        loss /= N
        # L2 regularization on both weight matrices (biases are not penalized).
        loss += 0.5 * reg * (np.sum(W1 * W1) + np.sum(W2 * W2))

        # Backward pass: compute gradients.
        grads = {}
        # Gradient of the mean data loss wrt the scores: (softmax - one_hot) / N.
        dscores = softmax_output.copy()
        dscores[range(N), list(y)] -= 1
        dscores /= N

        # Second layer; do not forget the regularization term on W2.
        grads['W2'] = H_out.T.dot(dscores) + reg * W2
        grads['b2'] = np.sum(dscores, axis=0)

        # Back-propagate through the ReLU: the gradient only flows where the
        # hidden activation was positive.
        dH = dscores.dot(W2.T)
        dH_Relu = (H_out > 0) * dH

        # First layer.
        grads['W1'] = X.T.dot(dH_Relu) + reg * W1
        grads['b1'] = np.sum(dH_Relu, axis=0)

        return loss, grads

    def train(self, X, y, X_val, y_val,
              learning_rate=1e-3, learning_rate_decay=0.95,
              reg=5e-6, num_iters=100,
              batch_size=200, verbose=False):
        """
        Train this neural network using stochastic gradient descent.

        Inputs:
        - X: A numpy array of shape (N, D) giving training data.
        - y: A numpy array of shape (N,) giving training labels; y[i] = c
          means that X[i] has label c, where 0 <= c < C.
        - X_val: A numpy array of shape (N_val, D) giving validation data.
        - y_val: A numpy array of shape (N_val,) giving validation labels.
        - learning_rate: Scalar giving learning rate for optimization.
        - learning_rate_decay: Scalar giving factor used to decay the learning
          rate after each epoch.
        - reg: Scalar giving regularization strength.
        - num_iters: Number of steps to take when optimizing.
        - batch_size: Number of training examples to use per step.
        - verbose: boolean; if true print progress during optimization.

        Returns:
        A dictionary with the keys 'loss_history' (one loss per iteration),
        'train_acc_history' and 'val_acc_history' (one accuracy per epoch).
        """
        num_train = X.shape[0]
        # Integer division: a float here would make the modulo test below
        # succeed only by accident.
        iterations_per_epoch = max(num_train // batch_size, 1)

        # Use SGD to optimize the parameters in self.params
        loss_history = []
        train_acc_history = []
        val_acc_history = []

        for it in range(num_iters):
            # Random minibatch, sampled with replacement.
            idx = np.random.choice(num_train, batch_size, replace=True)
            X_batch = X[idx]
            y_batch = y[idx]

            # Compute loss and gradients using the current minibatch
            loss, grads = self.loss(X_batch, y=y_batch, reg=reg)
            loss_history.append(loss)

            # Vanilla SGD update of every parameter.
            for param_name in ('W2', 'b2', 'W1', 'b1'):
                self.params[param_name] += -learning_rate * grads[param_name]

            if verbose and it % 10 == 0:
                print('iteration %d / %d: loss %f' % (it, num_iters, loss))

            # Every epoch, check train and val accuracy and decay learning rate.
            if it % iterations_per_epoch == 0:
                # Check accuracy
                train_acc = (self.predict(X_batch) == y_batch).mean()
                val_acc = (self.predict(X_val) == y_val).mean()
                train_acc_history.append(train_acc)
                val_acc_history.append(val_acc)

                # Decay learning rate
                learning_rate *= learning_rate_decay

        return {
            'loss_history': loss_history,
            'train_acc_history': train_acc_history,
            'val_acc_history': val_acc_history,
        }

    def predict(self, X):
        """
        Use the trained weights of this two-layer network to predict labels
        for data points. For each data point we predict scores for each of the
        C classes, and assign each data point to the class with the highest
        score.

        Inputs:
        - X: A numpy array of shape (N, D) giving N D-dimensional data points
          to classify.

        Returns:
        - y_pred: A numpy array of shape (N,) giving predicted labels for each
          of the elements of X. For all i, y_pred[i] = c means that X[i] is
          predicted to have class c, where 0 <= c < C.
        """
        H = np.maximum(0, X.dot(self.params['W1']) + self.params['b1'])
        final_scores = H.dot(self.params['W2']) + self.params['b2']
        y_pred = np.argmax(final_scores, axis=1)
        return y_pred
import numpy as np
from random import shuffle
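# This is the naive loss function: "naive" because it uses explicit loops.
# The gradient can be derived in two ways: 1. from the definition of the
# derivative (numerically), 2. by direct analytic differentiation.
######################## Inputs: ###########################
# #### W   numpy array (D, C): weight dimension x number of classes, e.g. 3073x10 #
# #### X   numpy array (N, D): N samples of dimension D, e.g. 100x3073 #
# #### y   numpy array (N,): y[i] = c means X[i] has class c, with c < C #
# #### reg float: regularization strength (regularization coefficient) #
######################## Outputs: ##########################
# #### loss (a float) and dW, which has the same shape as W #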
def svm_loss_naive(W, X, y, reg):
    """
    Structured SVM loss function, naive implementation (with loops).

    Inputs have dimension D, there are C classes, and we operate on minibatches
    of N examples.

    Inputs:
    - W: A numpy array of shape (D, C) containing weights.
    - X: A numpy array of shape (N, D) containing a minibatch of data.
    - y: A numpy array of shape (N,) containing training labels; y[i] = c means
      that X[i] has label c, where 0 <= c < C.
    - reg: (float) regularization strength

    Returns a tuple of:
    - loss as single float
    - gradient with respect to weights W; an array of same shape as W
    """
    dW = np.zeros(W.shape)  # initialize the gradient as zero

    # compute the loss and the gradient
    num_classes = W.shape[1]
    num_train = X.shape[0]
    loss = 0.0
    for i in range(num_train):
        scores = X[i].dot(W)  # (D,) . (D, C) -> (C,)
        correct_class_score = scores[y[i]]
        for j in range(num_classes):
            # The correct class contributes no margin; without this skip
            # every sample would add a constant 1 (the delta) to the loss.
            if j == y[i]:
                continue
            margin = scores[j] - correct_class_score + 1  # note delta = 1
            # Every positive margin contributes twice to the gradient:
            if margin > 0:
                loss += margin
                # +x_i for the violating (wrong) class column ...
                dW[:, j] += X[i]
                # ... and -x_i for the correct class column.
                dW[:, y[i]] -= X[i]

    # Right now the loss is a sum over all training examples, but we want it
    # to be an average instead so we divide by num_train (the 1/N factor).
    loss /= num_train
    dW /= num_train

    # Add regularization to the loss and its gradient.
    loss += 0.5 * reg * np.sum(W * W)
    dW += reg * W

    return loss, dW
# Vectorized SVM loss function; its output is meant to match the
# non-vectorized version.
# Compute and store the scores and the loss first, then compute dW.
def svm_loss_vectorized(W, X, y, reg):
    """
    Structured SVM loss function, vectorized implementation.

    Inputs and outputs are the same as svm_loss_naive: W is (D, C), X is
    (N, D), y holds N integer labels, reg is the L2 strength, and the result
    is the tuple (loss, dW) with dW shaped like W.
    """
    num_train = X.shape[0]
    rows = np.arange(num_train)
    labels = np.asarray(y)

    # Hinge margins for every (sample, class) pair, delta = 1.
    scores = X.dot(W)  # (N, C)
    correct_class_score = scores[rows, labels].reshape(-1, 1)
    margin = np.maximum(0, scores - correct_class_score + 1)
    # The correct class always yields s_y - s_y + 1 = 1; it must not count
    # towards the loss or the gradient.
    margin[rows, labels] = 0
    loss = np.sum(margin) / num_train + 0.5 * reg * np.sum(W * W)

    # Gradient coefficients: +1 for every violating class, and minus the
    # number of violations in the row for the correct class.
    coeff = (margin > 0).astype(float)
    coeff[rows, labels] = -np.sum(coeff, axis=1)
    dW = X.T.dot(coeff) / num_train + reg * W  # (D, C)

    return loss, dW
from __future__ import print_function
import numpy as np
from cs231n.classifiers.linear_svm import *
from cs231n.classifiers.softmax import *
# One note: `verbose` controls whether progress is printed during training.
class LinearClassifier(object):
    """Base class for linear classifiers trained with minibatch SGD.

    Subclasses provide the loss/gradient by overriding `loss`.
    """

    def __init__(self):
        # Weight matrix of shape (D, C); created lazily by train().
        self.W = None

    def train(self, X, y, learning_rate=1e-3, reg=1e-5, num_iters=100,
              batch_size=200, verbose=False):
        """
        Train this linear classifier using stochastic gradient descent.

        Inputs:
        - X: A numpy array of shape (N, D) containing training data; there are
          N training samples each of dimension D.
        - y: A numpy array of shape (N,) containing training labels; y[i] = c
          means that X[i] has label 0 <= c < C for C classes.
        - learning_rate: (float) learning rate for optimization.
        - reg: (float) regularization strength.
        - num_iters: (integer) number of steps to take when optimizing
        - batch_size: (integer) number of training examples to use at each step.
        - verbose: (boolean) If true, print progress during optimization.

        Outputs:
        A list containing the value of the loss function at each training
        iteration.
        """
        num_train, dim = X.shape
        # assume y takes values 0...K-1 where K is number of classes
        num_classes = np.max(y) + 1
        if self.W is None:
            # lazily initialize W
            self.W = 0.001 * np.random.randn(dim, num_classes)

        # Run stochastic gradient descent to optimize W
        loss_history = []
        for it in range(num_iters):
            # Sample a minibatch of row indices (with replacement).
            mask = np.random.choice(num_train, batch_size, replace=True)
            X_batch = X[mask]
            y_batch = y[mask]

            # evaluate loss and gradient
            loss, grad = self.loss(X_batch, y_batch, reg)
            # Record the loss so the documented return value is populated.
            loss_history.append(loss)

            # perform parameter update
            self.W += -learning_rate * grad

            if verbose and it % 100 == 0:
                print('iteration %d / %d: loss %f' % (it, num_iters, loss))

        return loss_history

    def predict(self, X):
        """
        Use the trained weights of this linear classifier to predict labels for
        data points.

        Inputs:
        - X: A numpy array of shape (N, D) containing data; there are N
          samples each of dimension D.

        Returns:
        - y_pred: Predicted labels for the data in X. y_pred is a 1-dimensional
          array of length N, and each element is an integer giving the
          predicted class.
        """
        # The predicted class is the column with the highest score.
        return np.argmax(X.dot(self.W), axis=1)

    def loss(self, X_batch, y_batch, reg):
        """
        Compute the loss function and its derivative.
        Subclasses will override this.

        Inputs:
        - X_batch: A numpy array of shape (N, D) containing a minibatch of N
          data points; each point has dimension D.
        - y_batch: A numpy array of shape (N,) containing labels for the
          minibatch.
        - reg: (float) regularization strength.

        Returns: A tuple containing:
        - loss as a single float
        - gradient with respect to self.W; an array of the same shape as W
        """
        pass
class LinearSVM(LinearClassifier):
    """ A subclass that uses the Multiclass SVM loss function """

    def loss(self, X_batch, y_batch, reg):
        """Return whatever svm_loss_vectorized yields for the current weights and this minibatch."""
        weights = self.W
        return svm_loss_vectorized(weights, X_batch, y_batch, reg)
class Softmax(LinearClassifier):
    """ A subclass that uses the Softmax + Cross-entropy loss function """

    def loss(self, X_batch, y_batch, reg):
        """Return whatever softmax_loss_vectorized yields for the current weights and this minibatch."""
        weights = self.W
        return softmax_loss_vectorized(weights, X_batch, y_batch, reg)
import numpy as np
from past.builtins import xrange
# `pass` is an empty statement; it keeps the program structure complete and is generally used as a placeholder.
class KNearestNeighbor(object):
    """ a kNN classifier with L2 distance """

    def __init__(self):
        pass

    def train(self, X, y):
        """
        Train the classifier. For k-nearest neighbors this is just
        memorizing the training data.

        Inputs:
        - X: A numpy array of shape (num_train, D) containing the training data
          consisting of num_train samples each of dimension D.
        - y: A numpy array of shape (N,) containing the training labels, where
          y[i] is the label for X[i].
        """
        self.X_train = X
        self.y_train = y

    def predict(self, X, k=1, num_loops=0):
        """
        Predict labels for test data using this classifier.

        Inputs:
        - X: A numpy array of shape (num_test, D) containing test data
          consisting of num_test samples each of dimension D.
        - k: The number of nearest neighbors that vote for the predicted labels.
        - num_loops: Determines which implementation to use to compute
          distances between training points and testing points
          (0: no loops, 1: one loop, 2: two loops).

        Returns:
        - y: A numpy array of shape (num_test,) containing predicted labels for
          the test data, where y[i] is the predicted label for the test point
          X[i].

        Raises:
        - ValueError: if num_loops is not 0, 1 or 2.
        """
        if num_loops == 0:
            dists = self.compute_distances_no_loops(X)
        elif num_loops == 1:
            dists = self.compute_distances_one_loop(X)
        elif num_loops == 2:
            dists = self.compute_distances_two_loops(X)
        else:
            # Only reject unknown values; valid ones fall through to the vote.
            raise ValueError('Invalid value %d for num_loops' % num_loops)

        return self.predict_labels(dists, k=k)

    def compute_distances_two_loops(self, X):
        """
        Compute the L2 distance between each test point in X and each training
        point in self.X_train using a nested loop over both the training data
        and the test data.

        Inputs:
        - X: A numpy array of shape (num_test, D) containing test data.

        Returns:
        - dists: A numpy array of shape (num_test, num_train) where dists[i, j]
          is the Euclidean distance between the ith test point and the jth
          training point.
        """
        num_test = X.shape[0]
        num_train = self.X_train.shape[0]
        dists = np.zeros((num_test, num_train))
        for i in range(num_test):
            for j in range(num_train):
                # Equivalent to np.linalg.norm(self.X_train[j] - X[i]).
                dists[i, j] = np.sqrt(np.sum(np.square(X[i] - self.X_train[j])))
        return dists

    def compute_distances_one_loop(self, X):
        """
        Compute the distance between each test point in X and each training
        point in self.X_train using a single loop over the test data.

        Input / Output: Same as compute_distances_two_loops
        """
        num_test = X.shape[0]
        num_train = self.X_train.shape[0]
        dists = np.zeros((num_test, num_train))
        for i in range(num_test):
            # Broadcast X[i] against every training row at once.
            dists[i] = np.sqrt(np.sum(np.square(self.X_train - X[i]), axis=1))
        return dists

    def compute_distances_no_loops(self, X):
        """
        Compute the distance between each test point in X and each training
        point in self.X_train using no explicit loops.

        Input / Output: Same as compute_distances_two_loops
        """
        # Basic idea: (a - b)^2 = a^2 + b^2 - 2ab, evaluated with one matrix
        # product and two broadcast sums.
        train_sq = np.sum(np.square(self.X_train), axis=1)   # (num_train,)
        test_sq = np.sum(np.square(X), axis=1).reshape(-1, 1)  # (num_test, 1)
        cross = 2 * np.dot(X, self.X_train.T)                # (num_test, num_train)
        sq_dists = train_sq + test_sq - cross
        # Round-off can leave tiny negative values for (near-)identical
        # points; clamp them so the square root never produces NaN.
        return np.sqrt(np.maximum(sq_dists, 0))

    def predict_labels(self, dists, k=1):
        """
        Given a matrix of distances between test points and training points,
        predict a label for each test point.

        Inputs:
        - dists: A numpy array of shape (num_test, num_train) where dists[i, j]
          gives the distance between the ith test point and the jth training
          point.
        - k: The number of nearest neighbors that vote.

        Returns:
        - y: A numpy array of shape (num_test,) containing predicted labels for
          the test data, where y[i] is the predicted label for the test point
          X[i].
        """
        num_test = dists.shape[0]
        y_pred = np.zeros(num_test)
        for i in range(num_test):
            # Labels of the k training points closest to the ith test point.
            closest_y = self.y_train[np.argsort(dists[i])[:k]]
            # Majority vote. np.argmax returns the first maximum, so ties are
            # broken in favour of the smaller label. np.bincount requires
            # non-negative integer labels.
            y_pred[i] = np.argmax(np.bincount(closest_y))
        return y_pred
