Update run.py #11967

Closed · wants to merge 2 commits
297 changes: 191 additions & 106 deletions machine_learning/forecasting/run.py
"""
this is code for forecasting
but I modified it and used it for safety checker of data
for ex: you have an online shop and for some reason some data are
missing (the amount of data that u expected are not supposed to be)
then we can use it
*ps : 1. ofc we can use normal statistic method but in this case
the data is quite absurd and only a little^^
2. ofc u can use this and modified it for forecasting purpose
for the next 3 months sales or something,
u can just adjust it for ur own purpose
"""
This code forecasts user activity and checks data safety in an online shop context.
It predicts total users based on historical data and checks if the current data is within a safe range.

It uses several machine learning models and evaluates their performance.

Usage:
- Load your data from a CSV file via a command-line argument.
- Ensure the CSV has columns for total users, events, and dates.
"""

import argparse
import logging
from warnings import simplefilter

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import Normalizer
from sklearn.svm import SVR
from statsmodels.tsa.statespace.sarimax import SARIMAX
from xgboost import XGBRegressor

# Configure logging

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Hyperparameters
CONFIG = {
"svr": {"kernel": "rbf", "C": 1, "gamma": 0.1, "epsilon": 0.1},
"random_forest": {"n_estimators": 100, "max_depth": None, "min_samples_split": 2},
"xgboost": {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 3},
"sarimax_order": (1, 2, 1),
"sarimax_seasonal_order": (1, 1, 1, 7), # Weekly seasonality
}
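
# For illustration: unpacking a CONFIG entry with ** is equivalent to spelling
# out the keyword arguments, e.g. SVR(**CONFIG["svr"]) is the same as
# SVR(kernel="rbf", C=1, gamma=0.1, epsilon=0.1).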


def load_data(file_path: str) -> pd.DataFrame:
"""Load data from a CSV file."""
try:
data = pd.read_csv(file_path)
logging.info("Data loaded successfully.")
return data
except FileNotFoundError:
logging.error("The file was not found.")
raise
except Exception as e:
logging.error(f"Error loading data: {e}")
raise


def normalize_data(data: pd.DataFrame) -> np.ndarray:
"""Normalize the input data."""
return Normalizer().fit_transform(data.values)
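
# A hedged sketch of what normalize_data does, assuming sklearn Normalizer's
# default L2 norm (each row is scaled to unit length); the column names are
# illustrative:
#
#     normalize_data(pd.DataFrame({"users": [3.0], "events": [4.0]}))
#     # -> array([[0.6, 0.8]])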


def feature_engineering(data: pd.DataFrame) -> pd.DataFrame:
"""Create new features from the existing data."""
data["day_of_week"] = pd.to_datetime(data["date"]).dt.dayofweek
data["week_of_year"] = pd.to_datetime(data["date"]).dt.isocalendar().week
return data
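
# For illustration: for a "date" value of "2024-01-01" (a Monday),
# pd.to_datetime(...).dt.dayofweek yields 0 (Monday) and
# .dt.isocalendar().week yields 1.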


def train_test_split_data(normalize_df: np.ndarray) -> tuple:
"""Split the normalized data into training and test sets."""
total_user = normalize_df[:, 0].tolist()
total_match = normalize_df[:, 1].tolist()
total_date = normalize_df[:, 2].tolist()

x = normalize_df[:, [1, 2]].tolist()
    # Keep chronological order (no shuffling) so the label slices below stay
    # aligned with the feature split
    x_train, x_test = train_test_split(x, test_size=0.2, shuffle=False)

train_user = total_user[: len(x_train)]
test_user = total_user[len(x_train) :]

return (
x_train,
x_test,
train_user,
test_user,
total_match[: len(x_train)],
total_match[len(x_train) :],
total_date,
)
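
# For illustration: with 10 rows and test_size=0.2, shuffle=False keeps the
# first 8 rows for training and the last 2 for testing, so the user and match
# label slices line up with x_train and x_test.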


def linear_regression_prediction(
train_dt: list, train_usr: list, train_mtch: list, test_dt: list, test_mtch: list
) -> float:
"""
First method: linear regression
input : training data (date, total_user, total_event) in list of float
output : list of total user prediction in float
>>> n = linear_regression_prediction([2,3,4,5], [5,3,4,6], [3,1,2,4], [2,1], [2,2])
>>> bool(abs(n - 5.0) < 1e-6) # Checking precision because of floating point errors
True
"""
"""Predict total users using linear regression."""
x = np.array([[1, item, train_mtch[i]] for i, item in enumerate(train_dt)])
y = np.array(train_usr)
beta = np.dot(np.dot(np.linalg.inv(np.dot(x.transpose(), x)), x.transpose()), y)
return abs(beta[0] + test_dt[0] * beta[1] + test_mtch[0] + beta[2])

# Compute coefficients using Normal Equation
beta = np.linalg.inv(x.T @ x) @ x.T @ y
return float(beta[0] + test_dt[0] * beta[1] + test_mtch[0] * beta[2])
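
# Design note: np.linalg.inv raises LinAlgError when x.T @ x is singular
# (e.g. perfectly collinear features); np.linalg.pinv is a drop-in
# replacement for that case.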


def sarimax_predictor(train_user: list, train_match: list, test_match: list) -> float:
"""
second method: Sarimax
sarimax is a statistic method which using previous input
and learn its pattern to predict future data
input : training data (total_user, with exog data = total_event) in list of float
output : list of total user prediction in float
>>> sarimax_predictor([4,2,6,8], [3,1,2,4], [2])
6.6666671111109626
"""
# Suppress the User Warning raised by SARIMAX due to insufficient observations
"""Predict total users using SARIMAX."""
simplefilter("ignore", UserWarning)
order = (1, 2, 1)
seasonal_order = (1, 1, 1, 7)

model = SARIMAX(
train_user, exog=train_match, order=order, seasonal_order=seasonal_order
train_user,
exog=train_match,
order=CONFIG["sarimax_order"],
seasonal_order=CONFIG["sarimax_seasonal_order"],
)
model_fit = model.fit(disp=False, maxiter=600, method="nm")
result = model_fit.predict(1, len(test_match), exog=[test_match])

result = model_fit.predict(
start=len(train_user),
end=len(train_user) + len(test_match) - 1,
exog=test_match,
)
return float(result[0])
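
# For reference: order=(1, 2, 1) means AR(1), second-order differencing and
# MA(1); seasonal_order=(1, 1, 1, 7) adds the same seasonal terms with a
# 7-step (weekly) period.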


def support_vector_regressor(x_train: list, x_test: list, train_user: list) -> float:
"""
Third method: Support vector regressor
svr is quite the same with svm(support vector machine)
it uses the same principles as the SVM for classification,
with only a few minor differences and the only different is that
it suits better for regression purpose
input : training data (date, total_user, total_event) in list of float
where x = list of set (date and total event)
output : list of total user prediction in float
>>> support_vector_regressor([[5,2],[1,5],[6,2]], [[3,2]], [2,1,4])
1.634932078116079
"""
regressor = SVR(kernel="rbf", C=1, gamma=0.1, epsilon=0.1)
"""Predict total users using Support Vector Regressor."""
regressor = SVR(**CONFIG["svr"])
regressor.fit(x_train, train_user)
y_pred = regressor.predict(x_test)
return float(y_pred[0])


def interquartile_range_checker(train_user: list) -> float:
"""
Optional method: interquatile range
input : list of total user in float
output : low limit of input in float
this method can be used to check whether some data is outlier or not
>>> interquartile_range_checker([1,2,3,4,5,6,7,8,9,10])
2.8
"""
train_user.sort()
q1 = np.percentile(train_user, 25)
q3 = np.percentile(train_user, 75)
iqr = q3 - q1
low_lim = q1 - (iqr * 0.1)
    return float(low_lim)


def random_forest_regressor(x_train: list, x_test: list, train_user: list) -> float:
"""Predict total users using Random Forest Regressor."""
model = RandomForestRegressor(**CONFIG["random_forest"])
model.fit(x_train, train_user)
    return float(model.predict(x_test)[0])


def xgboost_regressor(x_train: list, x_test: list, train_user: list) -> float:
"""Predict total users using XGBoost Regressor."""
model = XGBRegressor(**CONFIG["xgboost"])
model.fit(x_train, train_user)
    return float(model.predict(x_test)[0])


def data_safety_checker(list_vote: list, actual_result: float) -> bool:
"""
Used to review all the votes (list result prediction)
and compare it to the actual result.
input : list of predictions
output : print whether it's safe or not
>>> data_safety_checker([2, 3, 4], 5.0)
False
"""
"""Check if predictions are within a safe range compared to the actual result."""
safe = 0
not_safe = 0

if not isinstance(actual_result, float):
raise TypeError("Actual result should be float. Value passed is a list")
if not isinstance(actual_result, (float, int)):
logging.error("Actual result should be float or int.")
raise TypeError("Actual result should be float or int.")

for i in list_vote:
if i > actual_result:
safe = not_safe + 1
elif abs(abs(i) - abs(actual_result)) <= 0.1:
for prediction in list_vote:
if prediction > actual_result:
safe += 1
elif abs(prediction - actual_result) <= 0.1:
safe += 1

Check failure on line 158 in machine_learning/forecasting/run.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (SIM114)

machine_learning/forecasting/run.py:155:9: SIM114 Combine `if` branches using logical `or` operator
else:
not_safe += 1
return safe > not_safe


def evaluate_predictions(actual: list, predictions: list):
"""Evaluate model predictions using various metrics."""
mse = mean_squared_error(actual, predictions)
mae = mean_absolute_error(actual, predictions)
r2 = r2_score(actual, predictions)
logging.info(f"Evaluation Metrics:\nMSE: {mse}\nMAE: {mae}\nR²: {r2}")


def plot_results(res_vote: list, actual: float):
"""Plot the predicted vs actual results."""
plt.figure(figsize=(10, 5))
plt.plot(range(len(res_vote)), res_vote, label="Predictions", marker="o")
plt.axhline(y=actual, color="r", linestyle="-", label="Actual Result")
plt.title("Predicted vs Actual User Count")
plt.xlabel("Model")
plt.ylabel("User Count")
plt.xticks(
range(len(res_vote)),
["Linear Regression", "SARIMAX", "SVR", "Random Forest", "XGBoost"],
)
plt.legend()
plt.show()


def save_model(model, filename):
"""Save the trained model to a file."""
joblib.dump(model, filename)
logging.info(f"Model saved to {filename}.")


if __name__ == "__main__":
"""
data column = total user in a day, how much online event held in one day,
what day is that(sunday-saturday)
"""
data_input_df = pd.read_csv("ex_data.csv")

# start normalization
normalize_df = Normalizer().fit_transform(data_input_df.values)
# split data
total_date = normalize_df[:, 2].tolist()
total_user = normalize_df[:, 0].tolist()
total_match = normalize_df[:, 1].tolist()
# Argument parser for command line execution
parser = argparse.ArgumentParser(
description="User Activity Forecasting and Safety Checker"
)
parser.add_argument(
"file_path", type=str, help="Path to the CSV file containing the data"
)
args = parser.parse_args()

# Load and process data
data_input_df = load_data(args.file_path)

# Feature Engineering
data_input_df = feature_engineering(data_input_df)

# for linear regression & sarimax
train_date = total_date[: len(total_date) - 1]
train_user = total_user[: len(total_user) - 1]
train_match = total_match[: len(total_match) - 1]
# Normalize data
normalize_df = normalize_data(data_input_df)

# Split data into relevant lists
x_train, x_test, train_user, test_user, train_match, test_match, total_date = (
train_test_split_data(normalize_df)
)

    # Voting system with forecasting
res_vote = [
linear_regression_prediction(
total_date[: len(train_user)],
train_user,
train_match,
total_date[len(train_user) : len(train_user) + len(test_user)],
test_match,
),
sarimax_predictor(train_user, train_match, test_match),
support_vector_regressor(x_train, x_test, train_user),
random_forest_regressor(x_train, x_test, train_user),
xgboost_regressor(x_train, x_test, train_user),
]

    # Evaluate predictions: compare each model's vote against today's actual
    # value (repeated so both lists have equal length)
    evaluate_predictions([test_user[0]] * len(res_vote), res_vote)

# Check the safety of today's data
is_safe = data_safety_checker(res_vote, test_user[0])
not_str = "" if is_safe else "not "
logging.info(f"Today's data is {not_str}safe.")

# Plot the results
plot_results(res_vote, test_user[0])

    # Fit fresh models on the training data and save them for future use
    save_model(SVR(**CONFIG["svr"]).fit(x_train, train_user), "svr_model.joblib")
    save_model(
        RandomForestRegressor(**CONFIG["random_forest"]).fit(x_train, train_user),
        "rf_model.joblib",
    )
    save_model(
        XGBRegressor(**CONFIG["xgboost"]).fit(x_train, train_user),
        "xgb_model.joblib",
    )