Ensure the data kitchen thread count is propagated to the feature and label pipelines

This commit is contained in:
robcaulk
2023-06-08 12:33:08 +02:00
parent 88337b6c5e
commit 33b028b104
7 changed files with 12 additions and 32 deletions

View File

@@ -118,8 +118,8 @@ class BaseReinforcementLearningModel(IFreqaiModel):
# normalize all data based on train_dataset only # normalize all data based on train_dataset only
prices_train, prices_test = self.build_ohlc_price_dataframes(dk.data_dictionary, pair, dk) prices_train, prices_test = self.build_ohlc_price_dataframes(dk.data_dictionary, pair, dk)
dk.feature_pipeline = self.define_data_pipeline() dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
dk.label_pipeline = self.define_label_pipeline() dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count)
(dd["train_features"], (dd["train_features"],
dd["train_labels"], dd["train_labels"],

View File

@@ -53,7 +53,7 @@ class BaseClassifierModel(IFreqaiModel):
dd = dk.make_train_test_datasets(features_filtered, labels_filtered) dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live: if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels() dk.fit_labels()
dk.feature_pipeline = self.define_data_pipeline() dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
(dd["train_features"], (dd["train_features"],
dd["train_labels"], dd["train_labels"],

View File

@@ -189,7 +189,7 @@ class BasePyTorchClassifier(BasePyTorchModel):
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live: if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels() dk.fit_labels()
dk.feature_pipeline = self.define_data_pipeline() dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
(dd["train_features"], (dd["train_features"],
dd["train_labels"], dd["train_labels"],

View File

@@ -85,8 +85,8 @@ class BasePyTorchRegressor(BasePyTorchModel):
dd = dk.make_train_test_datasets(features_filtered, labels_filtered) dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live: if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels() dk.fit_labels()
dk.feature_pipeline = self.define_data_pipeline() dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
dk.label_pipeline = self.define_label_pipeline() dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count)
dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"]) dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"])
dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"]) dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"])

View File

@@ -52,8 +52,8 @@ class BaseRegressionModel(IFreqaiModel):
dd = dk.make_train_test_datasets(features_filtered, labels_filtered) dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live: if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels() dk.fit_labels()
dk.feature_pipeline = self.define_data_pipeline() dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
dk.label_pipeline = self.define_label_pipeline() dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count)
(dd["train_features"], (dd["train_features"],
dd["train_labels"], dd["train_labels"],

View File

@@ -507,7 +507,7 @@ class IFreqaiModel(ABC):
"feature_engineering_* functions" "feature_engineering_* functions"
) )
def define_data_pipeline(self) -> Pipeline: def define_data_pipeline(self, threads=-1) -> Pipeline:
ft_params = self.freqai_info["feature_parameters"] ft_params = self.freqai_info["feature_parameters"]
feature_pipeline = Pipeline([ feature_pipeline = Pipeline([
('const', ds.VarianceThreshold(threshold=0)), ('const', ds.VarianceThreshold(threshold=0)),
@@ -526,10 +526,10 @@ class IFreqaiModel(ABC):
di = ft_params.get("DI_threshold", 0) di = ft_params.get("DI_threshold", 0)
if di: if di:
feature_pipeline.append(('di', ds.DissimilarityIndex(di_threshold=di))) feature_pipeline.append(('di', ds.DissimilarityIndex(di_threshold=di, n_jobs=threads)))
if ft_params.get("use_DBSCAN_to_remove_outliers", False): if ft_params.get("use_DBSCAN_to_remove_outliers", False):
feature_pipeline.append(('dbscan', ds.DBSCAN())) feature_pipeline.append(('dbscan', ds.DBSCAN(n_jobs=threads)))
sigma = self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0) sigma = self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0)
if sigma: if sigma:
@@ -540,7 +540,7 @@ class IFreqaiModel(ABC):
return feature_pipeline return feature_pipeline
def define_label_pipeline(self) -> Pipeline: def define_label_pipeline(self, threads=-1) -> Pipeline:
label_pipeline = Pipeline([ label_pipeline = Pipeline([
('scaler', SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1)))) ('scaler', SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1))))

View File

@@ -52,23 +52,3 @@ class XGBoostRegressor(BaseRegressionModel):
model.set_params(callbacks=[]) model.set_params(callbacks=[])
return model return model
# def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
# """
# User defines their custom feature pipeline here (if they wish)
# """
# dk.feature_pipeline = Pipeline([
# ('qt', SKLearnWrapper(QuantileTransformer(output_distribution='normal')))
# ])
# return
# def define_label_pipeline(self, dk: FreqaiDataKitchen) -> None:
# """
# User defines their custom label pipeline here (if they wish)
# """
# dk.label_pipeline = Pipeline([
# ('qt', SKLearnWrapper(QuantileTransformer(output_distribution='normal')))
# ])
# return