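"""Fit a small one-hidden-layer neural network, implemented directly with
NumPy, to a sine wave (or a linear function) using several flavours of
gradient descent: per-sample updates, minibatch updates, and SGD with
momentum. The resulting fit is plotted and saved to results.png.
"""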
import copy
import random

import matplotlib.pyplot as plt
import numpy as np
import numpy.random as npr
from sklearn import preprocessing  # only used by the commented-out scaling in Model.__init__

DATA_TYPE = np.float32


def dataset_get_sin():
    """Build a shuffled sine-wave dataset and split it into training and test sets."""
    NUM = 1000
    RATIO = 0.5
    SPLIT = int(NUM * RATIO)
    data = np.zeros((NUM, 2), DATA_TYPE)
    data[:, 0] = np.linspace(0.0, 4 * np.pi, num=NUM)  # inputs
    data[:, 1] = np.sin(data[:, 0])  # outputs
    npr.shuffle(data)
    training, test = data[:SPLIT, :], data[SPLIT:, :]
    return training, test


def dataset_get_linear():
    """Build a shuffled linear dataset (y = 2x) and split it into training and test sets."""
    NUM = 100
    RATIO = 0.8
    SPLIT = int(NUM * RATIO)
    data = np.zeros((NUM, 2), DATA_TYPE)
    data[:, 0] = np.linspace(0.0, 2 * np.pi, num=NUM)  # inputs
    data[:, 1] = 2 * data[:, 0]  # outputs
    npr.shuffle(data)
    training, test = data[:SPLIT, :], data[SPLIT:, :]
    return training, test


def relu(x):
    """Apply a rectified linear unit to x"""
    return np.maximum(0, x)


def d_relu(x):
    """Derivative of the rectifier.

    A small slope (0.01) is used for negative inputs, as in a leaky ReLU,
    so that units with negative pre-activations still receive some gradient.
    """
    res = np.ones_like(x)
    res[x < 0] = 0.01
    return res


def sigmoid(vec):
    """Apply sigmoid to vec"""
    return 1.0 / (1.0 + np.exp(-vec))


def d_sigmoid(vec):
    """Derivative of the sigmoid: sigmoid(vec) * (1 - sigmoid(vec))."""
    s = sigmoid(vec)
    return s * (1 - s)


def L(x, y):
    """Squared-error loss between a prediction x and a target y."""
    return 0.5 * (x - y) * (x - y)


class Model(object):
    """A one-hidden-layer network mapping a scalar input to a scalar output:

        f(x) = np.dot(w2, h(w1 * x + b1)) + b2

    where h is the activation function passed to the constructor.
    """

    def __init__(self, layer_size, h, dh, data_type):
        # Hidden-layer and output-layer parameters, initialised uniformly in [0, 1)
        self.w1 = npr.uniform(0, 1, layer_size)
        self.b1 = npr.uniform(0, 1, layer_size)
        self.w2 = npr.uniform(0, 1, (1, layer_size))
        self.b2 = npr.uniform(0, 1, 1)

        # self.w1 = preprocessing.scale(self.w1)
        # self.w2 = preprocessing.scale(self.w2)
        # self.b1 = preprocessing.scale(self.b1)
        # self.b2 = preprocessing.scale(self.b2)

        # Activation function h and its derivative dh
        self.h = h
        self.dh = dh

        # Velocity terms for SGD with momentum (see SGDm)
        self.w1_v = np.zeros(self.w1.shape)
        self.w2_v = np.zeros(self.w2.shape)
        self.b1_v = np.zeros(self.b1.shape)
        self.b2_v = np.zeros(self.b2.shape)

    def L(self, x, y):
        """Squared-error loss of the network output for input x against target y."""
        f_x = self.f(x)
        return 0.5 * (f_x - y) * (f_x - y)

    def z1(self, x):
        """Hidden-layer pre-activation for an input x."""
        return self.w1 * x + self.b1

    def a(self, x):
        """Hidden-layer activation for an input x."""
        return self.h(self.z1(x))

    def f(self, x):
        """Network output for an input x."""
        return np.dot(self.w2, self.a(x)) + self.b2

    def dLdf(self, x, y):
        """Compute dL/df for an input x and expected output y."""
        return self.f(x) - y

    def dLdb2(self, x, y):
        """Compute dL/db2; df/db2 = 1, so this equals dL/df."""
        return self.dLdf(x, y)

    def dfda(self):
        """Compute df/da: how f changes with the ith element of a."""
        return self.w2

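    # The gradients below follow from applying the chain rule per hidden unit i
    # to f(x) = np.dot(w2, h(w1 * x + b1)) + b2 and L = 0.5 * (f(x) - y) ** 2:
    #   dL/dz1[i] = dL/df * w2[i] * h'(z1[i])
    #   dL/dw1[i] = dL/dz1[i] * x
    #   dL/db1[i] = dL/dz1[i]
    #   dL/dw2[i] = dL/df * a[i]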
    def dadz1(self, x):
        """Compute da/dz1 for an input x: how a[i] changes with z1[i]."""
        return self.dh(self.z1(x))

    def dLdz1(self, x, y):
        """Compute dL/dz1 for an input x and expected output y."""
        return self.dLdf(x, y) * self.dfda().flatten() * self.dadz1(x)

    def dLdw1(self, x, y):
        """Compute dL/dw1 for an input x and expected output y."""
        return self.dLdz1(x, y) * x

    def dLdw2(self, x, y):
        """Compute dL/dw2 for an input x and expected output y."""
        return self.dLdf(x, y) * self.a(x)  # df/dw2 = a

    def dLdb1(self, x, y):
        """Compute dL/db1 for an input x and expected output y."""
        return self.dLdz1(x, y)

    def backward(self, training_samples, ETA):
        """Plain stochastic gradient descent: update after every sample."""
        for sample in training_samples:
            sample_input = sample[0]
            sample_output = sample[1]

            b2_grad = self.dLdb2(sample_input, sample_output)
            w2_grad = self.dLdw2(sample_input, sample_output)
            b1_grad = self.dLdb1(sample_input, sample_output)
            w1_grad = self.dLdw1(sample_input, sample_output)

            self.b2 -= ETA * b2_grad
            self.b1 -= ETA * b1_grad
            self.w2 -= ETA * w2_grad
            self.w1 -= ETA * w1_grad
        return

    def backward_minibatch(self, batch, ETA):
        """Minibatch gradient descent: average the gradients over the batch, then update."""
        b2_grad = np.zeros(self.b2.shape)
        b1_grad = np.zeros(self.b1.shape)
        w2_grad = np.zeros(self.w2.shape)
        w1_grad = np.zeros(self.w1.shape)

        for sample in batch:
            sample_input = sample[0]
            sample_output = sample[1]

            # Numerically check the analytic gradients for this sample
            self.grad_checker(10e-4, sample_input, sample_output)

            b2_grad += self.dLdb2(sample_input, sample_output)
            w2_grad += self.dLdw2(sample_input, sample_output)
            b1_grad += self.dLdb1(sample_input, sample_output)
            w1_grad += self.dLdw1(sample_input, sample_output)

        self.b2 -= ETA * b2_grad / len(batch)
        self.b1 -= ETA * b1_grad / len(batch)
        self.w2 -= ETA * w2_grad / len(batch)
        self.w1 -= ETA * w1_grad / len(batch)
        return

    def SGDm(self, training_samples, ETA):
        """Stochastic gradient descent with momentum.

        The velocity terms accumulate an exponentially decaying sum of past
        gradients, and each parameter is moved along its velocity.
        """
        alpha = 0.99  # momentum coefficient
        for sample in training_samples:
            sample_input = sample[0]
            sample_output = sample[1]

            self.b2_v = alpha * self.b2_v + ETA * \
                self.dLdb2(sample_input, sample_output)
            self.w2_v = alpha * self.w2_v + ETA * \
                self.dLdw2(sample_input, sample_output)
            self.b1_v = alpha * self.b1_v + ETA * \
                self.dLdb1(sample_input, sample_output)
            self.w1_v = alpha * self.w1_v + ETA * \
                self.dLdw1(sample_input, sample_output)

            self.b2 -= self.b2_v
            self.b1 -= self.b1_v
            self.w2 -= self.w2_v
            self.w1 -= self.w1_v
        return

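    # Gradient check: shift a parameter tensor by +/- eps and compare the
    # central-difference estimate (L(p + eps) - L(p - eps)) / (2 * eps)
    # against the analytic gradient computed above.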
    def grad_checker(self, eps, x, y):
        """Compare analytic gradients against central finite-difference estimates.

        Only the w2 check is currently enabled; the others are left commented out.
        """
        # Check b2
        # inc_model = copy.deepcopy(self)
        # dec_model = copy.deepcopy(self)
        # inc_model.b2 = self.b2 + eps
        # dec_model.b2 = self.b2 - eps
        # grad_estimate = (inc_model.L(x, y) - dec_model.L(x, y)) / (2 * eps)
        # grad_actual = self.dLdb2(x, y)
        # print("b2:", np.linalg.norm(grad_estimate - grad_actual))

        # Check b1
        # inc_model = copy.deepcopy(self)
        # dec_model = copy.deepcopy(self)
        # inc_model.b1 = self.b1 + eps
        # dec_model.b1 = self.b1 - eps
        # grad_estimate = (inc_model.L(x, y) - dec_model.L(x, y)) / (2 * eps)
        # grad_actual = self.dLdb1(x, y)
        # print("b1:", np.linalg.norm(grad_estimate - grad_actual))

        # Check w2
        inc_model = copy.deepcopy(self)
        dec_model = copy.deepcopy(self)
        inc_model.w2 = self.w2 + eps
        dec_model.w2 = self.w2 - eps
        grad_estimate = (inc_model.L(x, y) - dec_model.L(x, y)) / (2 * eps)
        grad_actual = self.dLdw2(x, y)
        print("w2:", np.linalg.norm(grad_estimate - grad_actual))

        # Check w1
        # inc_model = copy.deepcopy(self)
        # dec_model = copy.deepcopy(self)
        # inc_model.w1 = self.w1 + eps
        # dec_model.w1 = self.w1 - eps
        # grad_estimate = (inc_model.L(x, y) - dec_model.L(x, y)) / (2 * eps)
        # grad_actual = self.dLdw1(x, y)
        # print("w1:", np.linalg.norm(grad_estimate - grad_actual))


def evaluate(model, samples):
    """Return the average loss of the model over the given samples."""
    cost_acc = 0.0
    for sample in samples:
        cost_acc += model.L(sample[0], sample[1])
    return cost_acc / len(samples)


TRAIN_DATA, TEST_DATA = dataset_get_sin()
# TRAIN_DATA, TEST_DATA = dataset_get_linear()

MODEL = Model(6, sigmoid, d_sigmoid, DATA_TYPE)
# MODEL = Model(10, relu, d_relu, DATA_TYPE)

# Train the model with some training data
TRAINING_ITERS = 5000
LEARNING_RATE = 0.005
TRAINING_SUBSET_SIZE = len(TRAIN_DATA)
PATIENCE = 100

print(TRAINING_SUBSET_SIZE)

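# Early stopping: training halts once PATIENCE consecutive iterations pass
# without an improvement in the training cost.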
best_rate = np.inf
patience = PATIENCE
for training_iter in range(TRAINING_ITERS):
    # Draw a training subset for this iteration
    training_subset_indices = npr.choice(
        range(len(TRAIN_DATA)), size=TRAINING_SUBSET_SIZE, replace=False)
    training_subset = [TRAIN_DATA[i] for i in training_subset_indices]
    random.shuffle(training_subset)

    # Apply backpropagation with per-sample updates
    # MODEL.backward(training_subset, LEARNING_RATE)

    # Apply backpropagation with momentum
    # MODEL.SGDm(training_subset, LEARNING_RATE)

    # Apply backpropagation with minibatches
    BATCH_SIZE = 1
    for i in range(0, len(training_subset), BATCH_SIZE):
        batch = training_subset[i:min(i + BATCH_SIZE, len(training_subset))]
        # print(batch)
        MODEL.backward_minibatch(batch, LEARNING_RATE)

    # Evaluate the average loss on the training subset and the test data
    training_rate = evaluate(MODEL, training_subset)
    test_rate = evaluate(MODEL, TEST_DATA)

    print(training_iter, "cost:", training_rate, test_rate, end=" ")

    # If this is the best training cost so far, keep it and reset the patience counter
    if training_rate < best_rate:
        print("(new best)")
        best_rate = training_rate
        patience = PATIENCE
    else:
        patience -= 1
        print(patience)

    if patience <= 0:
        print(PATIENCE, "iterations without improvement")
        break

TEST_OUTPUT = np.vectorize(MODEL.f)(TEST_DATA[:, 0])
TRAIN_OUTPUT = np.vectorize(MODEL.f)(TRAIN_DATA[:, 0])

scatter_train, = plt.plot(
    TRAIN_DATA[:, 0], TRAIN_DATA[:, 1], 'ro', label="Real Data")
scatter_train_out, = plt.plot(
    TRAIN_DATA[:, 0], TRAIN_OUTPUT, 'go', label="Network output on training data")
scatter_test_out, = plt.plot(
    TEST_DATA[:, 0], TEST_OUTPUT, 'bo', label="Network output on test data")
plt.legend(handles=[scatter_train, scatter_train_out, scatter_test_out])
plt.savefig("results.png", bbox_inches="tight")
plt.show()