-# XGBoost Regressor Example
import numpy as np
-from sklearn.datasets import fetch_california_housing
-from sklearn.metrics import mean_absolute_error, mean_squared_error
-from sklearn.model_selection import train_test_split
-from xgboost import XGBRegressor
+import pandas as pd
+from collections import defaultdict
+import math

+class XGBoostModel():
+    '''XGBoost regressor.
+
+    A simplified implementation of the XGBoost algorithm for regression:
+    gradient boosting with decision trees as base learners.
+    '''
+
+    def __init__(self, params=None, random_seed=None):
+        '''Initialize XGBoostModel.
+
+        Parameters:
+            params (dict): Hyperparameters for the XGBoost model.
+            random_seed (int): Seed for random number generation.
+        '''
+        # Set hyperparameters, falling back to defaults for missing keys
+        self.params = defaultdict(lambda: None, params or {})
+        self.subsample = self.params['subsample'] or 1.0
+        self.learning_rate = self.params['learning_rate'] or 0.3
+        self.base_prediction = self.params['base_score'] or 0.5
+        self.max_depth = self.params['max_depth'] or 5
+        self.rng = np.random.default_rng(seed=random_seed)
+        self.boosters = []
+
+    def fit(self, X, y, objective, num_boost_round, verbose=False):
+        '''Train the XGBoost model.
+
+        Parameters:
+            X (pd.DataFrame): Feature matrix.
+            y (pd.Series): Target values.
+            objective (object): Objective providing loss, gradient, and hessian methods.
+            num_boost_round (int): Number of boosting rounds.
+            verbose (bool): Whether to print training progress.
+        '''
+        # Initialize predictions with the base score
+        current_predictions = self.base_prediction * np.ones(shape=len(y))
+        for i in range(num_boost_round):
+            # Compute gradients and hessians of the loss at the current predictions
+            gradients = objective.gradient(y, current_predictions)
+            hessians = objective.hessian(y, current_predictions)
+            # Apply row subsampling if required
+            sample_idxs = None if self.subsample == 1.0 else self.rng.choice(
+                len(y), size=math.floor(self.subsample * len(y)), replace=False)
+            booster = TreeBooster(X, gradients, hessians, self.params,
+                                  self.max_depth, sample_idxs)
+            # Update predictions using the learning rate and booster predictions
+            current_predictions += self.learning_rate * booster.predict(X)
+            self.boosters.append(booster)
+            if verbose:
+                print(f'[{i}] train loss = {objective.loss(y, current_predictions)}')
+
+    def predict(self, X):
+        '''Make predictions using the trained model.
+
+        Parameters:
+            X (pd.DataFrame): Feature matrix for prediction.
+
+        Returns:
+            np.ndarray: Predicted values.
+        '''
+        # Calculate predictions using all boosters
+        return (self.base_prediction + self.learning_rate *
+                np.sum([booster.predict(X) for booster in self.boosters], axis=0))

-def data_handling(data: dict) -> tuple:
-    # Split dataset into features and target. Data is features.
-    """
-    >>> data_handling((
-    ...  {'data':'[ 8.3252 41. 6.9841269 1.02380952 322. 2.55555556 37.88 -122.23 ]'
-    ...  ,'target':([4.526])}))
-    ('[ 8.3252 41. 6.9841269 1.02380952 322. 2.55555556 37.88 -122.23 ]', [4.526])
-    """
-    return (data["data"], data["target"])

+class TreeBooster():
+    '''Decision tree booster for the XGBoost regressor.'''
+
+    def __init__(self, X, g, h, params, max_depth, idxs=None):
+        '''Initialize a decision tree booster.
+
+        Parameters:
+            X (pd.DataFrame): Feature matrix.
+            g (np.ndarray): Gradient values.
+            h (np.ndarray): Hessian values.
+            params (dict): Hyperparameters for the booster.
+            max_depth (int): Maximum depth of the tree.
+            idxs (np.ndarray): Indices of the samples used in this booster.
+        '''
+        # Set hyperparameters
+        self.params = params
+        self.max_depth = max_depth
+        assert self.max_depth >= 0, 'max_depth must be nonnegative'
+        self.min_child_weight = params['min_child_weight'] or 1.0
+        self.reg_lambda = params['reg_lambda'] or 1.0
+        self.gamma = params['gamma'] or 0.0
+        self.colsample_bynode = params['colsample_bynode'] or 1.0
+
+        # Set data and indices
+        if isinstance(g, pd.Series): g = g.values
+        if isinstance(h, pd.Series): h = h.values
+        if idxs is None: idxs = np.arange(len(g))
+        self.X, self.g, self.h, self.idxs = X, g, h, idxs
+        self.n, self.c = len(idxs), X.shape[1]
+
+        # Initialize node value
+        self.value = -g[idxs].sum() / (h[idxs].sum() + self.reg_lambda)
+        self.best_score_so_far = 0.
+
+        # Recursively build the tree
+        if self.max_depth > 0:
+            self._maybe_insert_child_nodes()

-def xgboost(
-    features: np.ndarray, target: np.ndarray, test_features: np.ndarray
-) -> np.ndarray:
-    """
-    >>> xgboost(np.array([[ 2.3571 , 52. , 6.00813008, 1.06775068,
-    ... 907. , 2.45799458, 40.58 , -124.26]]),np.array([1.114]),
-    ... np.array([[1.97840000e+00, 3.70000000e+01, 4.98858447e+00, 1.03881279e+00,
-    ... 1.14300000e+03, 2.60958904e+00, 3.67800000e+01, -1.19780000e+02]]))
-    array([[1.1139996]], dtype=float32)
-    """
-    xgb = XGBRegressor(
-        verbosity=0, random_state=42, tree_method="exact", base_score=0.5
-    )
-    xgb.fit(features, target)
-    # Predict target for test data
-    predictions = xgb.predict(test_features)
-    predictions = predictions.reshape(len(predictions), 1)
-    return predictions
+    @property
+    def is_leaf(self):
+        '''Check if the node is a leaf.'''
+        return self.best_score_so_far == 0.
+
+    def _maybe_insert_child_nodes(self):
+        '''Recursively insert child nodes to build the tree.'''
+        for i in range(self.c):
+            self._find_better_split(i)
+        if self.is_leaf:
+            return
+        # Split the data based on the best feature and threshold
+        x = self.X.values[self.idxs, self.split_feature_idx]
+        left_idx = np.nonzero(x <= self.threshold)[0]
+        right_idx = np.nonzero(x > self.threshold)[0]
+        # Recur for left and right subtrees
+        self.left = TreeBooster(self.X, self.g, self.h, self.params,
+                                self.max_depth - 1, self.idxs[left_idx])
+        self.right = TreeBooster(self.X, self.g, self.h, self.params,
+                                 self.max_depth - 1, self.idxs[right_idx])

+    def _find_better_split(self, feature_idx):
+        '''Find the best split for a feature.'''
+        x = self.X.values[self.idxs, feature_idx]
+        g, h = self.g[self.idxs], self.h[self.idxs]
+        sort_idx = np.argsort(x)
+        sort_g, sort_h, sort_x = g[sort_idx], h[sort_idx], x[sort_idx]
+        sum_g, sum_h = g.sum(), h.sum()
+        sum_g_right, sum_h_right = sum_g, sum_h
+        sum_g_left, sum_h_left = 0., 0.

-def main() -> None:
-    """
-    >>> main()
-    Mean Absolute Error : 0.30957163379906033
-    Mean Square Error : 0.22611560196662744
+        for i in range(0, self.n - 1):
+            g_i, h_i, x_i, x_i_next = sort_g[i], sort_h[i], sort_x[i], sort_x[i + 1]
+            sum_g_left += g_i
+            sum_g_right -= g_i
+            sum_h_left += h_i
+            sum_h_right -= h_i
+            if sum_h_left < self.min_child_weight or x_i == x_i_next:
+                continue
+            if sum_h_right < self.min_child_weight:
+                break

-    The URL for this algorithm
-    https://xgboost.readthedocs.io/en/stable/
-    California house price dataset is used to demonstrate the algorithm.
-    """
-    # Load California house price dataset
-    california = fetch_california_housing()
-    data, target = data_handling(california)
-    x_train, x_test, y_train, y_test = train_test_split(
-        data, target, test_size=0.25, random_state=1
-    )
-    predictions = xgboost(x_train, y_train, x_test)
-    # Error printing
-    print(f"Mean Absolute Error : {mean_absolute_error(y_test, predictions)}")
-    print(f"Mean Square Error : {mean_squared_error(y_test, predictions)}")
+            gain = 0.5 * ((sum_g_left**2 / (sum_h_left + self.reg_lambda))
+                          + (sum_g_right**2 / (sum_h_right + self.reg_lambda))
+                          - (sum_g**2 / (sum_h + self.reg_lambda))
+                          ) - self.gamma / 2  # Eq(7) in the xgboost paper
+            if gain > self.best_score_so_far:
+                self.split_feature_idx = feature_idx
+                self.best_score_so_far = gain
+                self.threshold = (x_i + x_i_next) / 2
+
+    def predict(self, X):
+        '''Make predictions using the trained booster.'''
+        return np.array([self._predict_row(row) for _, row in X.iterrows()])

-
-if __name__ == "__main__":
-    import doctest
-
-    doctest.testmod(verbose=True)
-    main()
+    def _predict_row(self, row):
+        '''Recursively predict a single data point.'''
+        if self.is_leaf:
+            return self.value
+        child = self.left if row[self.split_feature_idx] <= self.threshold \
+            else self.right
+        return child._predict_row(row)
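
Note: XGBoostModel.fit expects an objective object exposing loss, gradient, and hessian methods, which this commit does not define. Below is a minimal sketch of a compatible squared-error objective; the name SquaredErrorObjective and its exact form are assumptions, not part of the diff.

# Sketch (assumption): a squared-error objective usable with XGBoostModel.fit.
# gradient and hessian are the first and second derivatives of 0.5 * (pred - y)^2
# with respect to the prediction.
import numpy as np

class SquaredErrorObjective():
    def loss(self, y, pred):
        # Mean squared error over all samples
        return np.mean((y - pred) ** 2)

    def gradient(self, y, pred):
        # First derivative with respect to the prediction
        return pred - y

    def hessian(self, y, pred):
        # Second derivative is 1 for every sample
        return np.ones(len(y))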
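
For reference, a usage sketch under the same assumptions (synthetic data; integer column labels so that row[self.split_feature_idx] in TreeBooster._predict_row resolves by label):

# Usage sketch (assumption): train the from-scratch model on synthetic data.
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
X = pd.DataFrame(rng.normal(size=(500, 3)))  # columns labelled 0, 1, 2
y = pd.Series(2.0 * X[0] + np.sin(X[1]) + rng.normal(scale=0.1, size=500))

params = {'learning_rate': 0.1, 'max_depth': 4, 'subsample': 0.8, 'reg_lambda': 1.0}
model = XGBoostModel(params, random_seed=42)
model.fit(X, y, SquaredErrorObjective(), num_boost_round=50)
preds = model.predict(X)
print('train MSE:', SquaredErrorObjective().loss(y, preds))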