The Perceptron is one of the most common Machine Learning algorithms used for training classifiers and making predictions.
The code sample below is my implementation of the Perceptron for a competition project published on Kaggle.com while I was taking a Machine Learning class.
Training my Perceptron on 25,000 units of user data and testing it on 12,500 units, the highest accuracy I reached was 88.54%.
import random
# Directory holding the pre-split train/test/eval files.
DATA_FOLD = 'data-splits/'
# Directory holding the unprocessed competition data.
DATA_RAW = 'raw-data/'
# Directory where the prediction CSV files are written.
OUTPUT_FOLD = 'output/'
# Fixed seed so weight initialisation and shuffling are reproducible.
RANDOM_SEED = 888
# Number of training passes over the data.
EPOCH = 30
# define the max dimension here
# (presumably the highest feature index in the data plus one slot for
#  the bias stored at w[0] -- TODO confirm against the data files)
MAX_DIMENSION = 74482
def array_info(arr):
    """Parse one whitespace-split libsvm-style line into a flat list.

    arr[0] is the label token ('0' or '1'), mapped to -1 / +1.  Every
    later token has the form "index:value"; both halves are appended
    as ints, producing [label, idx1, val1, idx2, val2, ...].
    """
    # BUG FIX: '==' instead of 'is' -- identity of string literals is
    # a CPython interning accident, not a guarantee.
    new_arr = [-1 if arr[0] == '0' else 1]
    for token in arr[1:]:
        index, value = token.split(":")
        new_arr.append(int(index))
        new_arr.append(int(value))
    return new_arr
def generic_perceptron(array, w, r, m, a, is_averaged_perceptron, counter):
    """Run one perceptron update step on a single example.

    Shared by all four variants:
      * simple perceptron: fixed rate r, margin m = 0
      * rate-decaying perceptron: caller decays r every time-step, m = 0
      * margin perceptron: caller supplies r and a positive margin m
      * averaged perceptron: is_averaged_perceptron=True; the averaged
        classifier `a` additionally accumulates counter-weighted updates.

    Parameters:
    array: parsed example [label, idx1, val1, idx2, val2, ...]
    w: weight vector, with the bias term b stored in w[0]
    r: learning rate
    m: margin -- update whenever y * (w.x + b) < m
    a: averaged-classifier accumulator (a[0] is its bias part)
    is_averaged_perceptron: whether to also update `a`
    counter: running example count, used to weight averaged updates

    Returns [w, a]; both are updated in place.
    """
    b = w[0]
    beta = a[0]
    # Score the example: w.x + b.
    # BUG FIX: the bias is added exactly once per example; the original
    # added it once per non-zero feature (total += val * w[idx] + b).
    total = b
    for x in range(1, len(array), 2):
        index = array[x]
        total += array[x + 1] * w[index]
    y = array[0]
    predict = total * y
    if predict < m:
        # mistake (or inside the margin): update bias, then weights
        w[0] = b + r * y
        if is_averaged_perceptron:
            a[0] = beta + r * counter * y
        for x in range(1, len(array), 2):
            index = array[x]
            w[index] = w[index] + r * y * array[x + 1]
            if is_averaged_perceptron:
                a[index] = a[index] + r * y * array[x + 1] * counter
    return [w, a]
# return a random w vector whose elements are each in [-0.01, 0.01];
# the length is one more than the dimension of x so that slot 0 can
# hold the bias term, so "b" is also initialed here.
def initial_w(length):
    """Build the initial weight vector (bias at index 0) with small
    random values drawn uniformly from [-0.01, 0.01]."""
    # BUG FIX: seed once, outside the loop.  Re-seeding before every
    # draw made random.uniform return the same value each time, so w
    # was a constant vector rather than a random one.
    random.seed(RANDOM_SEED)
    return [random.uniform(-0.01, 0.01) for _ in range(length)]
# one full training pass of the chosen perceptron variant
def do_epoch(r, data, w, variant, e, m, a, counter):
    """Shuffle the data, then feed every example through the variant
    selected by `variant` (1=simple, 2=rate-decaying, 3=margin,
    4=averaged).  Returns [w, a, counter] with counter advanced by one
    per example seen."""
    # deterministic shuffle so runs are reproducible
    random.seed(RANDOM_SEED)
    random.shuffle(data)
    n_examples = len(data)
    for pos, example in enumerate(data):
        if variant == 1:
            w, a = generic_perceptron(example, w, r, m, a, False, counter)
        elif variant == 2 or variant == 3:
            # decay the rate with the global time-step t = e*N + pos
            step = e * n_examples + pos
            decayed = r / (1 + step)
            w, a = generic_perceptron(example, w, decayed, m, a, False, counter)
        elif variant == 4:
            w, a = generic_perceptron(example, w, r, m, a, True, counter)
        counter += 1
    return [w, a, counter]
# do the cross validation using 5-fold
def cross_validation(rate, variant, margin):
    """5-fold cross validation on data.train.

    Splits the training file into five folds of 5000 examples, trains
    on four folds and tests on the held-out fifth, and returns the
    accuracy averaged over the five rotations.

    rate: learning rate; variant: 1..4 (see do_epoch); margin: margin m.
    """
    total_accuracy = 0
    train_data = read_files("data.train", DATA_FOLD)
    # BUG FIX: the fourth condition used to read "20000 <= l < 25000"
    # (instead of 15000 <= l < 20000), which swapped the contents of
    # folds 3 and 4.  Contiguous slices are equivalent and cannot
    # drift; the last fold absorbs any overflow past 25000, matching
    # the original catch-all else branch.
    cvfiles = [
        train_data[0:5000],
        train_data[5000:10000],
        train_data[10000:15000],
        train_data[15000:20000],
        train_data[20000:],
    ]
    # rotate through the five folds
    for i in range(5):
        # -----training part-----
        # initial w vector here (b is also initialed in this function)
        classifier_w = initial_w(MAX_DIMENSION)
        # averaged classifier starts at all zeros
        classifier_a = [0] * MAX_DIMENSION
        updates = 0
        # gather the four training folds, leaving fold i for testing
        data_set = []
        for f in range(len(cvfiles)):
            if f != i:
                data_set.extend(cvfiles[f])
        # train for EPOCH epochs
        for epoch in range(EPOCH):
            [classifier_w, classifier_a, updates] = do_epoch(
                rate, data_set, classifier_w, variant,
                epoch, margin, classifier_a, updates)
        # -----testing part-----
        # the averaged perceptron is scored with its averaged
        # classifier; every other variant uses w directly
        if variant != 4:
            accuracy = test_classifier(cvfiles[i], classifier_w)
        else:
            accuracy = test_classifier(cvfiles[i], classifier_a)
        total_accuracy += accuracy
    # average over the five folds
    return total_accuracy / 5
def read_files(filename, directory):
    """Read a data file and return a list of parsed example rows
    (see array_info for the row layout)."""
    data = []
    # BUG FIX: with-statement guarantees the handle is closed; the
    # original opened the file and never closed it.
    with open(directory + filename, "r") as f:
        for line in f:
            data.append(array_info(line.split(" ")))
    return data
# test the trained classifier. using the left fold to
# compare the predicted label with real label, return
# the accuracy of the classifier.
def test_classifier(test_data, w):
total_predict = 0
correct_predict = 0
# data = reformat_data(test_data, len(w))
for arr in test_data:
total_predict += 1
label = arr[0]
b = w[0]
predict = 0
for x in range(1, len(arr), 2):
index = arr[x]
predict += arr[x + 1] * w[index] + b
if predict > 0:
predict = 1
elif predict <= 0:
predict = -1
if predict is label:
correct_predict += 1
accuracy = correct_predict / total_predict
return accuracy
def eval_classifier(test_data, w):
    """Predict a submission label for every example and return the list.

    Unlike test_classifier this does not compute accuracy: it emits the
    Kaggle label strings "1" (positive score) / "0" (non-positive).
    """
    result = []
    # bias is constant across examples -- hoist it out of the loop
    b = w[0]
    for arr in test_data:
        # BUG FIX: add the bias exactly once per example, not once per
        # non-zero feature.
        score = b
        for x in range(1, len(arr), 2):
            score += arr[x + 1] * w[arr[x]]
        result.append("1" if score > 0 else "0")
    return result
# Do all the experiment described in 4.3.2: train for EPOCH epochs,
# recording the accuracy and a snapshot of the classifiers per epoch.
def part2(variant):
    """Train the given perceptron variant for EPOCH epochs.

    Returns [accuracy_per_epoch, classifier_snapshots]; each snapshot
    is a [w, a] pair copied after an epoch.  For variant 4 (averaged
    perceptron) the accuracy list holds the final averaged-classifier
    accuracy repeated EPOCH times, matching the original layout.
    """
    accuracy_result = []
    record_classifier = []
    info = []
    # learning-rate hyper-parameter
    r = 0.01
    # margin hyper-parameter (only the margin perceptron uses m > 0)
    m = 0.01 if variant == 3 else 0
    counter = 1
    train_data = read_files("data.train", DATA_FOLD)
    test_data = read_files("data.test", DATA_FOLD)
    w = initial_w(MAX_DIMENSION)
    a = [0] * MAX_DIMENSION
    for e in range(EPOCH):
        [w, a, counter] = do_epoch(r, train_data, w, variant, e, m, a, counter)
        # BUG FIX: snapshot copies.  do_epoch mutates w and a in place,
        # so appending the live lists made every recorded "per-epoch"
        # classifier identical to the final one.
        record_classifier.append([list(w), list(a)])
        if variant != 4:
            accuracy_result.append(test_classifier(test_data, w))
    # BUG FIX: '==' instead of 'is' for small-int comparison
    if variant == 4:
        # averaged perceptron: final classifier is w - a / counter
        for i in range(len(w)):
            w[i] = w[i] - 1 / counter * a[i]
        accuracy = test_classifier(test_data, w)
        accuracy_result = [accuracy] * EPOCH
    info.append(accuracy_result)
    info.append(record_classifier)
    return info
# Experiment 4.3.3: score the chosen classifier on the evaluation split.
def part3(classifier):
    """Return the accuracy of `classifier` on data.eval.anon."""
    eval_rows = read_files("data.eval.anon", DATA_FOLD)
    return test_classifier(eval_rows, classifier)
def eval_(classifier):
    """Return the predicted label strings for data.eval.anon."""
    eval_rows = read_files("data.eval.anon", DATA_FOLD)
    return eval_classifier(eval_rows, classifier)
# learning-rate decay used by the Decaying Rate perceptron
def modify_r(r, t):
    """Return the decayed learning rate r / (1 + t) for time-step t."""
    return r / (1 + t)
# find the best classifier according to the index of the best accuracy
def find_best_classifier(info, variant):
    """Pick the classifier to submit.

    info: [accuracies, classifier_snapshots] as returned by part2.
    For variants 1-3, return the w of the best-accuracy epoch (the
    last such epoch on ties); for the averaged perceptron (4), the
    final snapshot's w is used.
    """
    if variant == 4:
        return info[1][-1][0]
    acc = info[0]
    classifiers = info[1]
    highest_acc = max(acc)
    the_classifier = []
    for i in range(len(acc)):
        # BUG FIX: '==' instead of 'is' -- float identity is an
        # implementation accident and can miss equal values.
        if acc[i] == highest_acc:
            the_classifier = classifiers[i][0]
            print("(The epoch of the best accuracy: ", i + 1, "th epoch)")
    return the_classifier
# run part 2 and the evaluation for one perceptron variant, then write
# the Kaggle submission csv for it
def process_variant(variant):
    """Train variant `variant`, pick its best classifier, predict the
    evaluation set, and write output/perceptron-variantN.csv."""
    # BUG FIX: '==' instead of 'is' for small-int comparison
    if variant == 1:
        print("\n1. Simple Perceptron")
    elif variant == 2:
        print("\n2. Rate Decaying Perceptron")
    elif variant == 3:
        print("\n3. Margin Perceptron")
    elif variant == 4:
        print("\n4. Averaged Perceptron")
    result = part2(variant)
    print("d. Accuracies through 20 epochs on development set: ", result[0])
    best_classifier = find_best_classifier(result, variant)
    predict = eval_(best_classifier)
    # collect the evaluation example ids (first column of the id file)
    # BUG FIX: with-statements close both files; the output file was
    # never closed before, so the tail of the csv could stay unflushed.
    id_list = []
    with open(DATA_FOLD + "data.eval.anon.id", "r") as id_file:
        for line in id_file:
            id_list.append(line.split()[0])
    out_name = "perceptron" + "-variant" + str(variant) + ".csv"
    with open(OUTPUT_FOLD + out_name, "w") as outfile:
        outfile.write("example_id,label\n")
        for i in range(len(id_list)):
            outfile.write(id_list[i] + "," + predict[i] + "\n")
# run all variants
def run():
    """Entry point: run every perceptron variant in turn."""
    print("\nProgram starts.")
    print("Seed for Random in this program:", RANDOM_SEED)
    for variant in (1, 2, 3, 4):
        process_variant(variant)
    print("\nProgram finished.")