Code Sample: Perceptron

Perceptron is one of the common Machine Learning algorithms that’s used for training classifiers and making predictions.

The code sample below is my implementation of the Perceptron for a competition project published on Kaggle.com, written while I was taking a Machine Learning class.

After training my Perceptron on 25,000 user records and testing it on 12,500 records, the highest accuracy I reached was 88.54%.


import random

# directory prefix prepended to every data / id file name that is read
DATA_FOLD = 'data-splits/'
# NOTE(review): not referenced by any code visible in this file
DATA_RAW = 'raw-data/'
# directory prefix prepended to the prediction CSV files that are written
OUTPUT_FOLD = 'output/'
# seed passed to random.seed() before weight initialisation and shuffling
RANDOM_SEED = 888
# number of training epochs run by cross_validation() and part2()
EPOCH = 30

# define the max dimension here
# (length of the weight vector and of the averaged accumulator;
# slot 0 of each holds the bias term)
MAX_DIMENSION = 74482


def array_info(arr):
    """Convert one tokenized data line into a flat list of numbers.

    parameters:
    arr: list of string tokens; arr[0] is the label token and every
         following token has the form "index:value".

    returns:
    [label, index1, value1, index2, value2, ...] where label is -1 when
    the label token is "0" and 1 otherwise, and indices/values are ints.
    An empty token list yields an empty list.
    """
    if not arr:
        return []

    # Compare by value (==), not identity (is): identity of equal strings
    # is an interpreter detail. Strip so a trailing newline on a
    # label-only line does not turn a "0" label into +1.
    new_arr = [-1 if arr[0].strip() == '0' else 1]

    for token in arr[1:]:
        chop = token.split(":")
        new_arr.append(int(chop[0]))
        new_arr.append(int(chop[1]))
    return new_arr


# When doing the simple (standard) perceptron, update the weight vector
# when y(wx) <= 0. (margin = 0)
#
# rate decaying perceptron using the same algorithm,
# only the pass-in learning rate r changes for every time-step
#
# margin perceptron using the same algorithm,
# only the pass-in learning rate r and margin m changes for every time-step
#
# averaged perceptron use this function as well. a gets modified in this case.
#
# parameters:
# array: the line data, w: weight classifier, r: learning rate,
# m: margin, a: averaged classifier, is_averaged_perceptron: bool,
# counter: multiplier applied to the updates accumulated in a
def generic_perceptron(array, w, r, m, a, is_averaged_perceptron, counter):
    """Apply one perceptron step for a single example, in place.

    parameters:
    array: [label, index1, value1, index2, value2, ...]
    w: weight list, w[0] is the bias
    r: learning rate
    m: margin; an update happens when label * score < m
    a: accumulator list for the averaged variant, a[0] is its bias slot
    is_averaged_perceptron: when true, a is updated alongside w
    counter: factor multiplied into every update added to a

    returns:
    [w, a] -- the same list objects that were passed in.
    """
    label = array[0]
    bias = w[0]
    avg_bias = a[0]

    # (feature index, feature value) pairs stored after the label
    pairs = [(array[pos], array[pos + 1])
             for pos in range(1, len(array), 2)]

    # NOTE(review): the bias is added once per feature, not once per
    # example; test_classifier/eval_classifier score the same way.
    score = 0
    for feat_index, feat_value in pairs:
        score += feat_value * w[feat_index] + bias

    if not (score * label < m):
        # margin satisfied: nothing to update
        return [w, a]

    # bias update
    w[0] = bias + r * label
    if is_averaged_perceptron:
        a[0] = avg_bias + r * counter * label

    # weight update, one feature at a time
    for feat_index, feat_value in pairs:
        w[feat_index] = w[feat_index] + r * label * feat_value
        if is_averaged_perceptron:
            a[feat_index] = a[feat_index] + r * label * feat_value * counter

    return [w, a]


# return a random w vector which each element is in [-0.01, 0.01],
# the length of the vector is one more than the dimension of x
# for storing bias term, so "b" is also initialed here.
def initial_w(length):
    """Build the initial weight vector (bias included at position 0).

    parameters:
    length: number of entries to generate.

    returns:
    a list of `length` floats, each drawn from uniform(-0.01, 0.01).
    The result is reproducible because the generator is seeded with
    RANDOM_SEED before drawing.
    """
    # Seed once, before the loop. Re-seeding on every iteration restarts
    # the generator each time, so every entry would be the same number
    # rather than a random vector.
    random.seed(RANDOM_SEED)
    return [random.uniform(-0.01, 0.01) for _ in range(length)]


# run one epoch of the selected perceptron variant over the data and
# return the trained weights as [w, a, counter]
def do_epoch(r, data, w, variant, e, m, a, counter):
    """Run one pass of the chosen perceptron variant over `data`.

    parameters:
    r: base learning rate
    data: list of examples; it is shuffled in place
    w: weight list, a: averaged accumulator list
    variant: 1 simple, 2 and 3 decaying rate, 4 averaged
    e: zero-based epoch number, used to compute the decay time-step
    m: margin
    counter: running counter, incremented per example for variant 4

    returns:
    [w, a, counter]
    """
    # reseed so the shuffle applies the same permutation on every call
    random.seed(RANDOM_SEED)
    random.shuffle(data)

    example_count = len(data)
    for position, example in enumerate(data):
        if variant == 1:
            w, a = generic_perceptron(example, w, r, m, a, False, counter)
        elif variant == 2 or variant == 3:
            # the learning rate shrinks with the global time-step
            time_step = e * example_count + position
            decayed = r / (1 + time_step)
            w, a = generic_perceptron(
                example, w, decayed, m, a, False, counter)
        elif variant == 4:
            w, a = generic_perceptron(example, w, r, m, a, True, counter)
            counter += 1

    return [w, a, counter]


# do the cross validation using 5-fold
def cross_validation(rate, variant, margin):
    """Estimate accuracy with 5-fold cross validation on "data.train".

    The training file is cut into five contiguous folds: four of 5000
    examples each, with everything from index 20000 on in the fifth.
    Each fold is held out once while a fresh classifier is trained for
    EPOCH epochs on the other four.

    parameters:
    rate: learning rate passed to do_epoch
    variant: perceptron variant (1-4) passed to do_epoch
    margin: margin passed to do_epoch

    returns:
    the mean of the five held-out accuracies.
    """
    fold_count = 5
    fold_size = 5000

    train_data = read_files("data.train", DATA_FOLD)

    # Contiguous folds. The original if/elif chain tested
    # 20000 <= l < 25000 for the fourth fold instead of
    # 15000 <= l < 20000; slicing avoids that typo.
    folds = [train_data[k * fold_size:(k + 1) * fold_size]
             for k in range(fold_count - 1)]
    folds.append(train_data[(fold_count - 1) * fold_size:])

    total_accuracy = 0
    for held_out in range(fold_count):
        # -----training part-----
        # fresh weights (bias included) and averaged accumulator per fold
        classifier_w = initial_w(MAX_DIMENSION)
        classifier_a = [0] * MAX_DIMENSION
        # starts at 1, the same as part2, so the first averaged update
        # is not multiplied by zero
        counter = 1

        # a new list, so do_epoch's in-place shuffle never touches folds
        data_set = []
        for f, fold in enumerate(folds):
            if f != held_out:
                data_set.extend(fold)

        for epoch in range(EPOCH):
            [classifier_w, classifier_a, counter] = do_epoch(
                rate, data_set, classifier_w, variant,
                epoch, margin, classifier_a, counter)

        # -----testing part-----
        if variant != 4:
            model = classifier_w
        else:
            # the averaged classifier is w - a / counter (as in part2);
            # the raw accumulator a alone is not the averaged weights
            model = [wi - 1 / counter * ai
                     for wi, ai in zip(classifier_w, classifier_a)]

        total_accuracy += test_classifier(folds[held_out], model)

    return total_accuracy / fold_count


def read_files(filename, directory):
    """Read a data file and parse every line with array_info.

    parameters:
    filename: name of the file to read
    directory: prefix concatenated in front of filename

    returns:
    a list with one parsed example (see array_info) per non-blank line.
    """
    data = []
    # `with` guarantees the handle is closed, even if parsing fails
    with open(directory + filename, "r") as f:
        for line in f:
            # split() without an argument tolerates repeated or trailing
            # whitespace and drops the newline
            tokens = line.split()
            if not tokens:
                # a blank line carries no example
                continue
            data.append(array_info(tokens))

    return data


# test the trained classifier. using the left fold to
# compare the predicted label with real label, return
# the accuracy of the classifier.
def test_classifier(test_data, w):
    total_predict = 0
    correct_predict = 0
    # data = reformat_data(test_data, len(w))

    for arr in test_data:
        total_predict += 1
        label = arr[0]
        b = w[0]
        predict = 0
        for x in range(1, len(arr), 2):
            index = arr[x]
            predict += arr[x + 1] * w[index] + b

        if predict > 0:
            predict = 1
        elif predict <= 0:
            predict = -1
        if predict is label:
            correct_predict += 1
    accuracy = correct_predict / total_predict
    return accuracy


def eval_classifier(test_data, w):
    """Predict a label string for every example in `test_data`.

    parameters:
    test_data: iterable of [label, index1, value1, ...] examples; the
               label slot is ignored
    w: weight list, w[0] is the bias

    returns:
    a list with "1" for each positive score and "0" for each score that
    is zero or negative, in input order.
    """
    labels = []
    for example in test_data:
        bias = w[0]
        score = 0
        # the bias is added once per feature, as in training
        for pos in range(1, len(example), 2):
            score += example[pos + 1] * w[example[pos]] + bias

        # a score that is neither > 0 nor <= 0 is passed through as is
        outcome = score
        if score > 0:
            outcome = "1"
        elif score <= 0:
            outcome = "0"
        labels.append(outcome)
    return labels


# Run the experiments described in 4.3.2:
# train for EPOCH epochs, recording the accuracy and the classifier
# after each epoch.
def part2(variant):
    """Train one perceptron variant on "data.train", score on "data.test".

    parameters:
    variant: 1 simple, 2 decaying rate, 3 margin, 4 averaged

    returns:
    [accuracy_result, record_classifier] where
    accuracy_result has EPOCH accuracies (for variant 4 the single final
    accuracy of the averaged classifier repeated EPOCH times) and
    record_classifier has one [w, a] snapshot per epoch. For variant 4
    the last snapshot's w is the averaged classifier.
    """
    accuracy_result = []
    record_classifier = []

    r = 0.01
    # only the margin perceptron uses a non-zero margin
    m = 0.01 if variant == 3 else 0

    counter = 1
    train_data = read_files("data.train", DATA_FOLD)
    test_data = read_files("data.test", DATA_FOLD)

    w = initial_w(MAX_DIMENSION)
    a = [0] * MAX_DIMENSION

    for e in range(EPOCH):
        [w, a, counter] = do_epoch(r, train_data, w, variant, e, m, a, counter)
        # Store copies: training mutates w and a in place, so appending
        # the lists themselves would make every recorded epoch alias the
        # final classifier.
        record_classifier.append([list(w), list(a)])
        if variant != 4:
            accuracy_result.append(test_classifier(test_data, w))

    if variant == 4:
        # averaged classifier: w - a / counter
        averaged = [wi - 1 / counter * ai for wi, ai in zip(w, a)]
        accuracy = test_classifier(test_data, averaged)
        accuracy_result.extend([accuracy] * EPOCH)
        if record_classifier:
            # find_best_classifier reads the last record for variant 4
            record_classifier[-1][0] = averaged

    return [accuracy_result, record_classifier]


# Do all the experiment described in 4.3.3
# using the best classifier to test the test set
def part3(classifier):
    """Return the accuracy of `classifier` on "data.eval.anon".

    The file is read from DATA_FOLD and scored with test_classifier.
    """
    eval_examples = read_files("data.eval.anon", DATA_FOLD)
    return test_classifier(eval_examples, classifier)


def eval_(classifier):
    """Return the predicted label strings for "data.eval.anon".

    The file is read from DATA_FOLD and labelled with eval_classifier.
    """
    eval_examples = read_files("data.eval.anon", DATA_FOLD)
    return eval_classifier(eval_examples, classifier)


# modification to r for Decaying Rate perceptron
def modify_r(r, t):
    """Return the decayed learning rate r / (1 + t) for time-step t."""
    return r / (1 + t)


# find the best classifier according to the index of the best accuracy
def find_best_classifier(info, variant):
    """Pick the weight vector to use from the result of part2.

    parameters:
    info: [accuracies, classifiers] where classifiers[i] is [w, a]
    variant: perceptron variant; 4 selects the last recorded w

    returns:
    for variant 4, the w of the last record; otherwise the w recorded
    at the first epoch that reached the highest accuracy (its 1-based
    epoch number is printed).

    raises:
    ValueError when variant is not 4 and the accuracy list is empty.
    """
    if variant == 4:
        return info[1][-1][0]

    acc = info[0]
    classifiers = info[1]

    # list.index compares by value and returns the first match. The
    # previous `is` test compared float object identity, which depends
    # on how the floats happened to be created.
    best_epoch = acc.index(max(acc))
    print("(The epoch of the best accuracy: ", best_epoch + 1, "th epoch)")
    return classifiers[best_epoch][0]


# run part 2 for one perceptron variant and write its predictions
# for the evaluation set to a CSV file
def process_variant(variant):
    """Train one variant, report accuracies and write its predictions.

    Prints a heading for variants 1-4, runs part2, prints the recorded
    accuracies, labels "data.eval.anon" with the selected classifier and
    writes "perceptron-variant<variant>.csv" into OUTPUT_FOLD with one
    "example_id,label" row per id read from "data.eval.anon.id".

    parameters:
    variant: 1 simple, 2 decaying rate, 3 margin, 4 averaged
    """
    # value comparison (==); `is` on int literals is an identity test
    if variant == 1:
        print("\n1. Simple Perceptron")
    elif variant == 2:
        print("\n2. Rate Decaying Perceptron")
    elif variant == 3:
        print("\n3. Margin Perceptron")
    elif variant == 4:
        print("\n4. Averaged Perceptron")

    result = part2(variant)
    print("d. Accuracies through 20 epochs on development set: ", result[0])

    best_classifier = find_best_classifier(result, variant)
    predict = eval_(best_classifier)

    # the first whitespace-separated field of each line is the example id
    id_list = []
    with open(DATA_FOLD + "data.eval.anon.id", "r") as id_source:
        for line in id_source:
            fields = line.split()
            if fields:
                # a blank line carries no id
                id_list.append(fields[0])

    out = "perceptron" + "-variant" + str(variant) + ".csv"
    # `with` closes the file, so every row is flushed to disk
    with open(OUTPUT_FOLD + out, "w") as outfile:
        outfile.write("example_id,label\n")
        for i, example_id in enumerate(id_list):
            outfile.write(example_id + "," + predict[i] + "\n")


# run all variants
def run():
    """Process perceptron variants 1 through 4 in order.

    Prints a start banner and the random seed first, and a finish
    banner at the end.
    """
    print("\nProgram starts.")
    print("Seed for Random in this program:", RANDOM_SEED)

    for variant_id in (1, 2, 3, 4):
        process_variant(variant_id)

    print("\nProgram finished.")

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.