Refactor local_weighted_learning.py to use np.array #8069

Merged 4 commits on Jan 2, 2023
3 changes: 2 additions & 1 deletion DIRECTORY.md
@@ -123,6 +123,7 @@
 * [Huffman](compression/huffman.py)
 * [Lempel Ziv](compression/lempel_ziv.py)
 * [Lempel Ziv Decompress](compression/lempel_ziv_decompress.py)
+* [Lz77](compression/lz77.py)
 * [Peak Signal To Noise Ratio](compression/peak_signal_to_noise_ratio.py)
 * [Run Length Encoding](compression/run_length_encoding.py)

@@ -1162,7 +1163,7 @@
 * [Get Amazon Product Data](web_programming/get_amazon_product_data.py)
 * [Get Imdb Top 250 Movies Csv](web_programming/get_imdb_top_250_movies_csv.py)
 * [Get Imdbtop](web_programming/get_imdbtop.py)
-* [Get Top Billioners](web_programming/get_top_billioners.py)
+* [Get Top Billionaires](web_programming/get_top_billionaires.py)
 * [Get Top Hn Posts](web_programming/get_top_hn_posts.py)
 * [Get User Tweets](web_programming/get_user_tweets.py)
 * [Giphy](web_programming/giphy.py)
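The substantive change in this PR is the move from `np.mat` (the legacy `np.matrix` class, which NumPy no longer recommends) to plain `np.array`. The two types overload operators differently: on `np.matrix`, `*` means matrix multiplication and `.I` means inverse, while on `np.ndarray`, `*` is elementwise and matrix products use `@`, with inversion via `np.linalg.inv`. A minimal sketch of the difference (illustrative, not part of the diff):

```python
import numpy as np

a = np.array([[1.0, 2.0], [3.0, 4.0]])
m = np.matrix(a)  # legacy type that this PR removes

print(m * m)             # np.matrix: * is matrix multiplication
print(a * a)             # np.ndarray: * is elementwise multiplication
print(a @ a)             # np.ndarray: @ is matrix multiplication, equal to m * m
print(m.I)               # np.matrix: .I is the inverse
print(np.linalg.inv(a))  # np.ndarray equivalent used in the refactor
```

This is why, in the diff below, every `*` between matrices becomes `@` and `(...).I` becomes `np.linalg.inv(...)`.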
116 changes: 66 additions & 50 deletions machine_learning/local_weighted_learning/local_weighted_learning.py
@@ -1,116 +1,128 @@
 # Required imports to run this file
 import matplotlib.pyplot as plt
 import numpy as np
 
 
-# weighted matrix
-def weighted_matrix(point: np.mat, training_data_x: np.mat, bandwidth: float) -> np.mat:
+def weighted_matrix(
+    point: np.array, training_data_x: np.array, bandwidth: float
+) -> np.array:
     """
-    Calculate the weight for every point in the
-    data set. It takes training_point , query_point, and tau
-    Here Tau is not a fixed value it can be varied depends on output.
-    tau --> bandwidth
-    xmat -->Training data
-    point --> the x where we want to make predictions
-    >>> weighted_matrix(np.array([1., 1.]),np.mat([[16.99, 10.34], [21.01,23.68],
-    ... [24.59,25.69]]), 0.6)
-    matrix([[1.43807972e-207, 0.00000000e+000, 0.00000000e+000],
-            [0.00000000e+000, 0.00000000e+000, 0.00000000e+000],
-            [0.00000000e+000, 0.00000000e+000, 0.00000000e+000]])
+    Calculate the weight for every point in the data set.
+    point --> the x value at which we want to make predictions
+    >>> weighted_matrix(
+    ...     np.array([1., 1.]),
+    ...     np.array([[16.99, 10.34], [21.01,23.68], [24.59,25.69]]),
+    ...     0.6
+    ... )
+    array([[1.43807972e-207, 0.00000000e+000, 0.00000000e+000],
+           [0.00000000e+000, 0.00000000e+000, 0.00000000e+000],
+           [0.00000000e+000, 0.00000000e+000, 0.00000000e+000]])
     """
-    # m is the number of training samples
-    m, n = np.shape(training_data_x)
-    # Initializing weights as identity matrix
-    weights = np.mat(np.eye(m))
+    m, _ = np.shape(training_data_x)  # m is the number of training samples
+    weights = np.eye(m)  # Initializing weights as identity matrix
 
     # calculating weights for all training examples [x(i)'s]
     for j in range(m):
         diff = point - training_data_x[j]
-        weights[j, j] = np.exp(diff * diff.T / (-2.0 * bandwidth**2))
+        weights[j, j] = np.exp(diff @ diff.T / (-2.0 * bandwidth**2))
     return weights


 def local_weight(
-    point: np.mat, training_data_x: np.mat, training_data_y: np.mat, bandwidth: float
-) -> np.mat:
+    point: np.array,
+    training_data_x: np.array,
+    training_data_y: np.array,
+    bandwidth: float,
+) -> np.array:
     """
     Calculate the local weights using the weight_matrix function on training data.
     Return the weighted matrix.
-    >>> local_weight(np.array([1., 1.]),np.mat([[16.99, 10.34], [21.01,23.68],
-    ... [24.59,25.69]]),np.mat([[1.01, 1.66, 3.5]]), 0.6)
-    matrix([[0.00873174],
-            [0.08272556]])
+    >>> local_weight(
+    ...     np.array([1., 1.]),
+    ...     np.array([[16.99, 10.34], [21.01,23.68], [24.59,25.69]]),
+    ...     np.array([[1.01, 1.66, 3.5]]),
+    ...     0.6
+    ... )
+    array([[0.00873174],
+           [0.08272556]])
     """
     weight = weighted_matrix(point, training_data_x, bandwidth)
-    w = (training_data_x.T * (weight * training_data_x)).I * (
-        training_data_x.T * weight * training_data_y.T
+    w = np.linalg.inv(training_data_x.T @ (weight @ training_data_x)) @ (
+        training_data_x.T @ weight @ training_data_y.T
     )
 
     return w


 def local_weight_regression(
-    training_data_x: np.mat, training_data_y: np.mat, bandwidth: float
-) -> np.mat:
+    training_data_x: np.array, training_data_y: np.array, bandwidth: float
+) -> np.array:
     """
-    Calculate predictions for each data point on axis.
-    >>> local_weight_regression(np.mat([[16.99, 10.34], [21.01,23.68],
-    ... [24.59,25.69]]),np.mat([[1.01, 1.66, 3.5]]), 0.6)
+    Calculate predictions for each data point on axis
+    >>> local_weight_regression(
+    ...     np.array([[16.99, 10.34], [21.01, 23.68], [24.59, 25.69]]),
+    ...     np.array([[1.01, 1.66, 3.5]]),
+    ...     0.6
+    ... )
     array([1.07173261, 1.65970737, 3.50160179])
     """
-    m, n = np.shape(training_data_x)
+    m, _ = np.shape(training_data_x)
     ypred = np.zeros(m)
 
     for i, item in enumerate(training_data_x):
-        ypred[i] = item * local_weight(
+        ypred[i] = item @ local_weight(
             item, training_data_x, training_data_y, bandwidth
         )
 
     return ypred


-def load_data(dataset_name: str, cola_name: str, colb_name: str) -> np.mat:
+def load_data(
+    dataset_name: str, cola_name: str, colb_name: str
+) -> tuple[np.array, np.array, np.array, np.array]:
     """
-    Function used for loading data from the seaborn splitting into x and y points
+    Load data from seaborn and split it into x and y points
     """
     import seaborn as sns
 
     data = sns.load_dataset(dataset_name)
     col_a = np.array(data[cola_name])  # total_bill
     col_b = np.array(data[colb_name])  # tip
 
-    mcol_a = np.mat(col_a)
-    mcol_b = np.mat(col_b)
+    mcol_a = col_a.copy()
+    mcol_b = col_b.copy()
 
-    m = np.shape(mcol_b)[1]
-    one = np.ones((1, m), dtype=int)
+    one = np.ones(np.shape(mcol_b)[0], dtype=int)
 
-    # horizontal stacking
-    training_data_x = np.hstack((one.T, mcol_a.T))
+    # pairing elements of one and mcol_a
+    training_data_x = np.column_stack((one, mcol_a))
 
     return training_data_x, mcol_b, col_a, col_b


-def get_preds(training_data_x: np.mat, mcol_b: np.mat, tau: float) -> np.ndarray:
+def get_preds(training_data_x: np.array, mcol_b: np.array, tau: float) -> np.array:
     """
     Get predictions with minimum error for each training data
-    >>> get_preds(np.mat([[16.99, 10.34], [21.01,23.68],
-    ... [24.59,25.69]]),np.mat([[1.01, 1.66, 3.5]]), 0.6)
+    >>> get_preds(
+    ...     np.array([[16.99, 10.34], [21.01, 23.68], [24.59, 25.69]]),
+    ...     np.array([[1.01, 1.66, 3.5]]),
+    ...     0.6
+    ... )
     array([1.07173261, 1.65970737, 3.50160179])
     """
     ypred = local_weight_regression(training_data_x, mcol_b, tau)
     return ypred


 def plot_preds(
-    training_data_x: np.mat,
-    predictions: np.ndarray,
-    col_x: np.ndarray,
-    col_y: np.ndarray,
+    training_data_x: np.array,
+    predictions: np.array,
+    col_x: np.array,
+    col_y: np.array,
     cola_name: str,
     colb_name: str,
 ) -> plt.plot:
     """
-    This function used to plot predictions and display the graph
+    Plot predictions and display the graph
     """
     xsort = training_data_x.copy()
     xsort.sort(axis=0)
@@ -128,6 +140,10 @@ def plot_preds(


 if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
+
     training_data_x, mcol_b, col_a, col_b = load_data("tips", "total_bill", "tip")
     predictions = get_preds(training_data_x, mcol_b, 0.5)
     plot_preds(training_data_x, predictions, col_a, col_b, "total_bill", "tip")
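For reference, locally weighted regression fits separate coefficients for each query point: `w(x) = inv(X.T @ W @ X) @ (X.T @ W @ y.T)`, where `W` is the diagonal Gaussian weight matrix built by `weighted_matrix`. A minimal usage sketch of the refactored module on synthetic data (the import path assumes running from the repository root; the data and bandwidth are illustrative, not from the PR):

```python
import numpy as np

from machine_learning.local_weighted_learning.local_weighted_learning import (
    local_weight_regression,
)

# Build a design matrix the same way load_data does: a column of ones
# paired with a single feature column.
x = np.linspace(0.0, 3.0, 10)
training_data_x = np.column_stack((np.ones_like(x), x))
training_data_y = np.array([np.sin(x)])  # row vector of targets, shape (1, m)

# One locally weighted prediction per training sample.
preds = local_weight_regression(training_data_x, training_data_y, 0.6)
print(preds.shape)  # (10,)
```

A smaller bandwidth tracks the sine curve more closely; a larger one flattens the fit toward ordinary least squares, since all Gaussian weights approach 1.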