11import numpy as np
22from scipy .interpolate import BSpline
33from sklearn .base import BaseEstimator , TransformerMixin
4+ from sklearn .metrics import pairwise_distances
45from sklearn .preprocessing import SplineTransformer
56from sklearn .tree import DecisionTreeClassifier , DecisionTreeRegressor
7+ from sklearn .utils .validation import check_array
68
79
810class SplineExpansion (BaseEstimator , TransformerMixin ):
@@ -41,6 +43,24 @@ def __init__(
4143 if spline_implementation not in ["scipy" , "sklearn" ]:
4244 raise ValueError ("Invalid spline implementation. Choose 'scipy' or 'sklearn'." )
4345
46+ self .fitted = False
47+
@staticmethod
def knot_identification_using_decision_tree(X, y, task="regression", n_knots=5):
    """
    Derive knot positions from the split thresholds of a decision tree.

    A tree limited to ``n_knots + 1`` leaves is fitted on ``(X, y)``. Such a tree has at most
    ``n_knots`` split nodes, and the threshold of every split node becomes one knot.

    Parameters:
    - X: Training features, passed unchanged to the tree's ``fit``.
    - y: Target values, passed unchanged to the tree's ``fit``.
    - task: 'regression' (uses DecisionTreeRegressor) or 'classification'
            (uses DecisionTreeClassifier).
    - n_knots: Upper bound on the number of knots; the tree may produce fewer.

    Returns:
    - knots: A list holding a single 1-D array with the split thresholds in ascending order.
             The array is empty when the tree does not split at all.

    Raises:
    - ValueError: If ``task`` is neither 'regression' nor 'classification'.
    """
    if task == "classification":
        tree = DecisionTreeClassifier(max_leaf_nodes=n_knots + 1)
    elif task == "regression":
        tree = DecisionTreeRegressor(max_leaf_nodes=n_knots + 1)
    else:
        raise ValueError("Invalid task type. Choose 'regression' or 'classification'.")
    tree.fit(X, y)

    fitted_tree = tree.tree_
    # Select split nodes structurally (a leaf has identical left/right child ids) instead of
    # comparing the float thresholds with the -2 leaf placeholder: a genuine split located
    # exactly at -2.0 would otherwise be discarded.
    is_split_node = fitted_tree.children_left != fitted_tree.children_right
    return [np.sort(fitted_tree.threshold[is_split_node])]
63+
4464 def fit (self , X , y = None ):
4565 """
4666 Fit the preprocessor by determining the knot positions.
@@ -52,43 +72,57 @@ def fit(self, X, y=None):
5272 Returns:
5373 - self: Fitted preprocessor.
5474 """
55- X = np .asarray (X )
75+ if self .use_decision_tree and y is None :
76+ raise ValueError ("Target variable 'y' must be provided when use_decision_tree=True." )
5677
57- if self .use_decision_tree :
58- if y is None :
59- raise ValueError ("Target variable 'y' must be provided when use_decision_tree=True." )
60- y = np .asarray (y )
61-
62- self .knots = []
63- for i in range (X .shape [1 ]):
64- x_col = X [:, i ].reshape (- 1 , 1 )
65-
66- # Use DecisionTreeClassifier for classification tasks
67- if self .task == "classification" :
68- tree = DecisionTreeClassifier (max_leaf_nodes = self .n_knots + 1 )
69- elif self .task == "regression" :
70- tree = DecisionTreeRegressor (max_leaf_nodes = self .n_knots + 1 )
71- else :
72- raise ValueError ("Invalid task type. Choose 'regression' or 'classification'." )
73-
74- tree .fit (x_col , y )
75-
76- # Extract thresholds from the decision tree
77- thresholds = tree .tree_ .threshold [tree .tree_ .threshold != - 2 ] # type: ignore
78- self .knots .append (np .sort (thresholds ))
79- else :
80- # Compute knots based on uniform spacing or quantile
81- self .knots = []
82- for i in range (X .shape [1 ]):
83- if self .strategy == "quantile" :
84- # Use quantile to determine knot locations
85- quantiles = np .linspace (0 , 1 , self .n_knots + 2 )[1 :- 1 ]
86- knots = np .quantile (X [:, i ], quantiles )
87- self .knots .append (knots )
88- elif self .strategy == "uniform" :
89- # Use uniform spacing within the range of the feature
90- knots = np .linspace (np .min (X [:, i ]), np .max (X [:, i ]), self .n_knots + 2 )[1 :- 1 ]
91- self .knots .append (knots )
78+ self .knots = []
79+
80+ if self .use_decision_tree and self .spline_implementation == "scipy" :
81+ self .knots = self .knot_identification_using_decision_tree (X , y , self .task , self .n_knots )
82+ self .fitted = True
83+
84+ elif self .spline_implementation == "scipy" and not self .use_decision_tree :
85+ if self .strategy == "quantile" :
86+ # Use quantile to determine knot locations
87+ quantiles = np .linspace (0 , 1 , self .n_knots + 2 )[1 :- 1 ]
88+ knots = np .quantile (X , quantiles )
89+ self .knots .append (knots )
90+ self .fitted = True
91+ # print("Scipy spline implementation using quantile works in fit phase")
92+ elif self .strategy == "uniform" :
93+ # Use uniform spacing within the range of the feature
94+ knots = np .linspace (np .min (X ), np .max (X ), self .n_knots + 2 )[1 :- 1 ]
95+ self .knots .append (knots )
96+ self .fitted = True
97+ # print("Scipy spline implementation using uniform works in fit phase")
98+
99+ elif self .use_decision_tree and self .spline_implementation == "sklearn" :
100+ self .knots = self .knot_identification_using_decision_tree (X , y , self .task , self .n_knots )
101+ knots = np .vstack (self .knots ).T
102+ self .transformer = SplineTransformer (
103+ n_knots = self .n_knots , degree = self .degree , include_bias = False , knots = knots
104+ )
105+ self .transformer .fit (X )
106+ self .fitted = True
107+
108+ elif self .spline_implementation == "sklearn" and not self .use_decision_tree :
109+ if self .strategy == "quantile" :
110+ # print("Using sklearn spline transformer using quantile")
111+ # print()
112+ self .transformer = SplineTransformer (
113+ n_knots = self .n_knots , degree = self .degree , include_bias = False , knots = "quantile"
114+ )
115+ self .fitted = True
116+ self .transformer .fit (X )
117+
118+ elif self .strategy == "uniform" :
119+ # print("Using sklearn spline transformer using uniform")
120+ # print()
121+ self .transformer = SplineTransformer (
122+ n_knots = self .n_knots , degree = self .degree , include_bias = False , knots = "uniform"
123+ )
124+ self .fitted = True
125+ self .transformer .fit (X )
92126
93127 return self
94128
@@ -105,43 +139,148 @@ def transform(self, X):
105139 if self .knots is None :
106140 raise ValueError ("Knots have not been initialized. Please fit the preprocessor first." )
107141
108- X = np .asarray (X )
109142 transformed_features = []
110143
144+ if self .fitted is False :
145+ raise ValueError ("Model has not been fitted. Please fit the model first." )
146+
111147 if self .spline_implementation == "scipy" :
112- for i in range (X .shape [1 ]):
113- x_col = X [:, i ]
114- knots = self .knots [i ] # type: ignore
148+ # Extend the knots for boundary conditions
149+ t = np .concatenate (([self .knots [0 ]] * self .degree , self .knots , [self .knots [- 1 ]] * self .degree ))
150+
151+ # Create spline basis functions for this feature
152+ spline_basis = [
153+ BSpline .basis_element (t [j : j + self .degree + 2 ])(X ) for j in range (len (t ) - self .degree - 1 )
154+ ]
155+ # Stack and append transformed features
156+ transformed_features .append (np .vstack (spline_basis ).T )
157+ # Concatenate all transformed features
158+ return np .hstack (transformed_features )
159+ elif self .spline_implementation == "sklearn" :
160+ return self .transformer .transform (X )
115161
116- # Extend the knots for boundary conditions
117- t = np .concatenate (([knots [0 ]] * self .degree , knots , [knots [- 1 ]] * self .degree ))
118162
119- # Create spline basis functions for this feature
120- spline_basis = [
121- BSpline .basis_element (t [j : j + self .degree + 2 ])(x_col ) for j in range (len (t ) - self .degree - 1 )
122- ]
def center_identification_using_decision_tree(X, y, task="regression", n_centers=5):
    """
    Derive basis-function centers from the split thresholds of a decision tree.

    A tree limited to ``n_centers + 1`` leaves is fitted on ``(X, y)``. Such a tree has at most
    ``n_centers`` split nodes, and the threshold of every split node becomes one center.

    Parameters:
    - X: Training features, passed unchanged to the tree's ``fit``.
    - y: Target values, passed unchanged to the tree's ``fit``.
    - task: 'regression' (uses DecisionTreeRegressor) or 'classification'
            (uses DecisionTreeClassifier).
    - n_centers: Upper bound on the number of centers; the tree may produce fewer.

    Returns:
    - centers: A list holding a single 1-D array with the split thresholds in ascending order.
               The array is empty when the tree does not split at all.

    Raises:
    - ValueError: If ``task`` is neither 'regression' nor 'classification'.
    """
    if task == "classification":
        tree = DecisionTreeClassifier(max_leaf_nodes=n_centers + 1)
    elif task == "regression":
        tree = DecisionTreeRegressor(max_leaf_nodes=n_centers + 1)
    else:
        raise ValueError("Invalid task type. Choose 'regression' or 'classification'.")
    tree.fit(X, y)

    fitted_tree = tree.tree_
    # Select split nodes structurally (a leaf has identical left/right child ids) instead of
    # comparing the float thresholds with the -2 leaf placeholder: a genuine split located
    # exactly at -2.0 would otherwise be discarded.
    is_split_node = fitted_tree.children_left != fitted_tree.children_right
    return [np.sort(fitted_tree.threshold[is_split_node])]
123177
124- # Stack and append transformed features
125- transformed_features .append (np .vstack (spline_basis ).T )
126178
127- # Concatenate all transformed features
128- return np .hstack (transformed_features )
class RBFExpansion(BaseEstimator, TransformerMixin):
    def __init__(
        self, n_centers=10, gamma: float = 1.0, use_decision_tree=True, task: str = "regression", strategy="uniform"
    ):
        """
        Radial Basis Function Expansion.

        Each input value x is expanded into ``exp(-gamma * (x - c) ** 2)`` for every center c.

        Parameters:
        - n_centers: Number of RBF centers (an upper bound when a decision tree is used).
        - gamma: Width of the RBF kernel.
        - use_decision_tree: If True, use a decision tree to determine RBF centers.
        - task: Task type, 'regression' or 'classification'.
        - strategy: If 'uniform', centers are uniformly spaced. If 'quantile', centers are
                    determined by data quantile. Only used when use_decision_tree is False.

        Raises:
        - ValueError: If ``strategy`` is neither 'uniform' nor 'quantile'.
        """
        self.n_centers = n_centers
        self.gamma = gamma
        self.use_decision_tree = use_decision_tree
        self.strategy = strategy
        self.task = task

        if self.strategy not in ["uniform", "quantile"]:
            raise ValueError("Invalid strategy. Choose 'uniform' or 'quantile'.")

    def fit(self, X, y=None):
        """
        Determine the RBF centers from the training data.

        Parameters:
        - X: Input data, validated with ``check_array`` (2-D: samples x features).
        - y: Target values; required when use_decision_tree=True.

        Returns:
        - self: Fitted transformer. ``centers_`` has shape (1, n_found) when a decision tree is
                used (one set of centers shared by all features) and (n_centers, n_features)
                otherwise (one column of centers per feature).

        Raises:
        - ValueError: If ``y`` is missing while use_decision_tree=True, if the tree yields no
                      split, or if ``strategy`` is invalid.
        """
        X = check_array(X)

        if self.use_decision_tree:
            if y is None:
                raise ValueError("Target variable 'y' must be provided when use_decision_tree=True.")
            centers = np.vstack(center_identification_using_decision_tree(X, y, self.task, self.n_centers))
            if centers.size == 0:
                # Without any split there is no center, so transform could not build a single feature.
                raise ValueError("The decision tree produced no split, so no centers could be determined.")
        elif self.strategy == "quantile":
            centers = np.percentile(X, np.linspace(0, 100, self.n_centers), axis=0)
        elif self.strategy == "uniform":
            centers = np.linspace(X.min(axis=0), X.max(axis=0), self.n_centers)
        else:
            # strategy may have been altered after construction (e.g. through set_params).
            raise ValueError("Invalid strategy. Choose 'uniform' or 'quantile'.")

        self.centers_ = centers
        return self

    def transform(self, X):
        """
        Expand X into RBF features around the fitted centers.

        Parameters:
        - X: Input data, validated with ``check_array`` (2-D: samples x features).

        Returns:
        - 2-D array with one row per sample. With a decision tree the columns are grouped by
          center (all features for the first center, then the second, ...); otherwise they are
          grouped by feature (all centers of the first feature, then the second, ...).

        Raises:
        - NotFittedError: If ``fit`` has not been called.
        - ValueError: If the number of features differs from the one seen by ``fit``
                      (per-feature centers only).
        """
        from sklearn.exceptions import NotFittedError

        if not hasattr(self, "centers_"):
            raise NotFittedError("This RBFExpansion instance is not fitted yet. Call 'fit' first.")

        X = check_array(X)
        # Work on a local array so that transform never rewrites the fitted state.
        centers = np.asarray(self.centers_)

        if self.use_decision_tree:
            # Shared centers: every center is applied to every feature -> (n_samples, n_found, n_features).
            shared_centers = centers.ravel()
            offsets = X[:, np.newaxis, :] - shared_centers[np.newaxis, :, np.newaxis]
        else:
            if centers.ndim != 2 or centers.shape[1] != X.shape[1]:
                raise ValueError("X has a different number of features than the data used in fit.")
            # Per-feature centers: feature j is only compared with column j of centers_
            # -> (n_samples, n_features, n_centers).
            offsets = X[:, :, np.newaxis] - centers.T[np.newaxis, :, :]

        return np.exp(-self.gamma * offsets**2).reshape(X.shape[0], -1)
233+
234+
class SigmoidExpansion(BaseEstimator, TransformerMixin):
    def __init__(
        self, n_centers=10, scale: float = 1.0, use_decision_tree=True, task: str = "regression", strategy="uniform"
    ):
        """
        Sigmoid Basis Expansion.

        Each input value x is expanded into ``1 / (1 + exp(-(x - c) / scale))`` for every center c.

        Parameters:
        - n_centers: Number of sigmoid centers (an upper bound when a decision tree is used).
        - scale: Scale parameter for sigmoid function.
        - use_decision_tree: If True, use a decision tree to determine sigmoid centers.
        - task: Task type, 'regression' or 'classification'.
        - strategy: If 'uniform', centers are uniformly spaced. If 'quantile', centers are
                    determined by data quantile. Only used when use_decision_tree is False.
        """
        self.n_centers = n_centers
        self.scale = scale
        self.use_decision_tree = use_decision_tree
        self.strategy = strategy
        self.task = task

    def fit(self, X, y=None):
        """
        Determine the sigmoid centers from the training data.

        Parameters:
        - X: Input data, validated with ``check_array`` (2-D: samples x features).
        - y: Target values; required when use_decision_tree=True.

        Returns:
        - self: Fitted transformer. ``centers_`` has shape (1, n_found) when a decision tree is
                used (one set of centers shared by all features) and (n_centers, n_features)
                otherwise (one column of centers per feature).

        Raises:
        - ValueError: If ``y`` is missing while use_decision_tree=True, if the tree yields no
                      split, or if ``strategy`` is neither 'uniform' nor 'quantile'.
        """
        X = check_array(X)

        if self.use_decision_tree:
            if y is None:
                raise ValueError("Target variable 'y' must be provided when use_decision_tree=True.")
            centers = np.vstack(center_identification_using_decision_tree(X, y, self.task, self.n_centers))
            if centers.size == 0:
                # Without any split there is no center, so transform could not build a single feature.
                raise ValueError("The decision tree produced no split, so no centers could be determined.")
        elif self.strategy == "quantile":
            centers = np.percentile(X, np.linspace(0, 100, self.n_centers), axis=0)
        elif self.strategy == "uniform":
            centers = np.linspace(X.min(axis=0), X.max(axis=0), self.n_centers)
        else:
            # The constructor does not validate strategy, so reject unknown values here
            # instead of leaving the transformer without centers.
            raise ValueError("Invalid strategy. Choose 'uniform' or 'quantile'.")

        self.centers_ = centers
        return self

    def transform(self, X):
        """
        Expand X into sigmoid features around the fitted centers.

        Parameters:
        - X: Input data, validated with ``check_array`` (2-D: samples x features).

        Returns:
        - 2-D array with one row per sample. With a decision tree the columns are grouped by
          center (all features for the first center, then the second, ...); otherwise they are
          grouped by feature (all centers of the first feature, then the second, ...).

        Raises:
        - NotFittedError: If ``fit`` has not been called.
        - ValueError: If the number of features differs from the one seen by ``fit``
                      (per-feature centers only).
        """
        from scipy.special import expit
        from sklearn.exceptions import NotFittedError

        if not hasattr(self, "centers_"):
            raise NotFittedError("This SigmoidExpansion instance is not fitted yet. Call 'fit' first.")

        X = check_array(X)
        # Work on a local array so that transform never rewrites the fitted state.
        centers = np.asarray(self.centers_)

        if self.use_decision_tree:
            # Shared centers: every center is applied to every feature -> (n_samples, n_found, n_features).
            shared_centers = centers.ravel()
            offsets = X[:, np.newaxis, :] - shared_centers[np.newaxis, :, np.newaxis]
        else:
            if centers.ndim != 2 or centers.shape[1] != X.shape[1]:
                raise ValueError("X has a different number of features than the data used in fit.")
            # Per-feature centers: feature j is only compared with column j of centers_
            # -> (n_samples, n_features, n_centers).
            offsets = X[:, :, np.newaxis] - centers.T[np.newaxis, :, :]

        # expit(z) equals 1 / (1 + exp(-z)) but does not overflow for large negative z.
        return expit(offsets / self.scale).reshape(X.shape[0], -1)
0 commit comments