update docs, improve the interaction with define_data_pipeline

This commit is contained in:
robcaulk
2023-06-07 18:26:49 +02:00
parent dc577d2a1a
commit 135aaa2be2
7 changed files with 114 additions and 108 deletions

View File

@@ -226,8 +226,10 @@ FreqAI uses the the [`DataSieve`](https://github.com/emergentmethods/datasieve)
This means that users can use/customize any SKLearn modules and easily add them to their FreqAI data pipeline. By default, FreqAI builds the following pipeline:
```py
from datasieve.transforms import SKLearnWrapper, DissimilarityIndex
from datasieve.pipeline import Pipeline
dk.feature_pipeline = Pipeline([
('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1))),
('scaler', SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1))),
('di', ds.DissimilarityIndex(di_threshold=1)),
])
```
@@ -235,10 +237,12 @@ dk.feature_pipeline = Pipeline([
But users will find that they can add PCA and other steps just by changing their configuration settings, for example, if you add `"principal_component_analysis": true` to the `feature_parameters` dict in the `freqai` config, then FreqAI will add the PCA step for you resulting in the following pipeline:
```py
from datasieve.transforms import SKLearnWrapper, DissimilarityIndex, PCA
from datasieve.pipeline import Pipeline
dk.feature_pipeline = Pipeline([
('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1))),
('pca', ds.DataSievePCA()),
('post-pca-scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))
('scaler', SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1)))),
('pca', ds.PCA()),
('post-pca-scaler', ds.MinMaxScaler(feature_range=(-1, 1)))
('di', ds.DissimilarityIndex(di_threshold=1)),
])
```
@@ -247,16 +251,19 @@ The same concept follows if users activate other config options like `"use_SVM_t
## Customizing the pipeline
Users are encouraged to customize the data pipeline to their needs by building their own data pipeline. This can be done by overriding `define_data_pipeline` in their `IFreqaiModel`. For example:
Users are encouraged to customize the data pipeline to their needs by building their own data pipeline. This can be done by simply setting `dk.feature_pipeline` to their desired `Pipeline` object inside their `IFreqaiModel` `train()` function, or if they prefer not to touch the `train()` function, they can override `define_data_pipeline` in their `IFreqaiModel`:
```py
from datasieve.transforms import SKLearnWrapper, DissimilarityIndex
from datasieve.pipeline import Pipeline
from sklearn.preprocessing import QuantileTransformer
def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
"""
User defines their custom eature pipeline here (if they wish)
"""
from sklearn.preprocessing import QuantileTransformer
dk.feature_pipeline = Pipeline([
('qt', SKLearnWrapper(QuantileTransformer(output_distribution='normal')))
('qt', SKLearnWrapper(QuantileTransformer(output_distribution='normal'))),
('di', ds.DissimilarityIndex(di_threshold=1)
])
return

View File

@@ -110,40 +110,37 @@ class BaseReinforcementLearningModel(IFreqaiModel):
training_filter=True,
)
d: Dict[str, Any] = dk.make_train_test_datasets(
dd: Dict[str, Any] = dk.make_train_test_datasets(
features_filtered, labels_filtered)
self.df_raw = copy.deepcopy(d["train_features"])
self.df_raw = copy.deepcopy(dd["train_features"])
dk.fit_labels() # FIXME useless for now, but just satiating append methods
# normalize all data based on train_dataset only
prices_train, prices_test = self.build_ohlc_price_dataframes(dk.data_dictionary, pair, dk)
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
dk.feature_pipeline = self.define_data_pipeline()
dk.label_pipeline = self.define_label_pipeline()
# d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
# d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(dd["train_features"],
dd["train_labels"],
dd["train_weights"]) = dk.feature_pipeline.fit_transform(dd["train_features"],
dd["train_labels"],
dd["train_weights"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
(dd["test_features"],
dd["test_labels"],
dd["test_weights"]) = dk.feature_pipeline.transform(dd["test_features"],
dd["test_labels"],
dd["test_weights"])
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}'
f' features and {len(d["train_features"])} data points'
f' features and {len(dd["train_features"])} data points'
)
self.set_train_and_eval_environments(d, prices_train, prices_test, dk)
self.set_train_and_eval_environments(dd, prices_train, prices_test, dk)
model = self.fit(d, dk)
model = self.fit(dd, dk)
logger.info(f"--------------------done training {pair}--------------------")

View File

@@ -50,30 +50,29 @@ class BaseClassifierModel(IFreqaiModel):
logger.info(f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------")
# split data into train/test data.
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
dk.feature_pipeline = self.define_data_pipeline()
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(dd["train_features"],
dd["train_labels"],
dd["train_weights"]) = dk.feature_pipeline.fit_transform(dd["train_features"],
dd["train_labels"],
dd["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
(dd["test_features"],
dd["test_labels"],
dd["test_weights"]) = dk.feature_pipeline.transform(dd["test_features"],
dd["test_labels"],
dd["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(d['train_features'])} data points")
logger.info(f"Training model on {len(dd['train_features'])} data points")
model = self.fit(d, dk)
model = self.fit(dd, dk)
end_time = time()

View File

@@ -36,6 +36,7 @@ class BasePyTorchClassifier(BasePyTorchModel):
return dataframe
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.class_name_to_index = None
@@ -184,31 +185,30 @@ class BasePyTorchClassifier(BasePyTorchModel):
)
# split data into train/test data.
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
dk.feature_pipeline = self.define_data_pipeline()
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(dd["train_features"],
dd["train_labels"],
dd["train_weights"]) = dk.feature_pipeline.fit_transform(dd["train_features"],
dd["train_labels"],
dd["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
(dd["test_features"],
dd["test_labels"],
dd["test_weights"]) = dk.feature_pipeline.transform(dd["test_features"],
dd["test_labels"],
dd["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(d['train_features'])} data points")
logger.info(f"Training model on {len(dd['train_features'])} data points")
model = self.fit(d, dk)
model = self.fit(dd, dk)
end_time = time()
logger.info(f"-------------------- Done training {pair} "

View File

@@ -18,6 +18,7 @@ class BasePyTorchRegressor(BasePyTorchModel):
A PyTorch implementation of a regressor.
User must implement fit method
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -81,34 +82,33 @@ class BasePyTorchRegressor(BasePyTorchModel):
)
# split data into train/test data.
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
dk.feature_pipeline = self.define_data_pipeline()
dk.label_pipeline = self.define_label_pipeline()
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"])
dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"])
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(dd["train_features"],
dd["train_labels"],
dd["train_weights"]) = dk.feature_pipeline.fit_transform(dd["train_features"],
dd["train_labels"],
dd["train_weights"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
(dd["test_features"],
dd["test_labels"],
dd["test_weights"]) = dk.feature_pipeline.transform(dd["test_features"],
dd["test_labels"],
dd["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(d['train_features'])} data points")
logger.info(f"Training model on {len(dd['train_features'])} data points")
model = self.fit(d, dk)
model = self.fit(dd, dk)
end_time = time()
logger.info(f"-------------------- Done training {pair} "

View File

@@ -49,34 +49,33 @@ class BaseRegressionModel(IFreqaiModel):
logger.info(f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------")
# split data into train/test data.
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
dk.feature_pipeline = self.define_data_pipeline()
dk.label_pipeline = self.define_label_pipeline()
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
(dd["train_features"],
dd["train_labels"],
dd["train_weights"]) = dk.feature_pipeline.fit_transform(dd["train_features"],
dd["train_labels"],
dd["train_weights"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(dd["test_features"],
dd["test_labels"],
dd["test_weights"]) = dk.feature_pipeline.transform(dd["test_features"],
dd["test_labels"],
dd["test_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"])
dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(d['train_features'])} data points")
logger.info(f"Training model on {len(dd['train_features'])} data points")
model = self.fit(d, dk)
model = self.fit(dd, dk)
end_time = time()

View File

@@ -507,43 +507,47 @@ class IFreqaiModel(ABC):
"feature_engineering_* functions"
)
def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
def define_data_pipeline(self) -> Pipeline:
ft_params = self.freqai_info["feature_parameters"]
dk.feature_pipeline = Pipeline([
feature_pipeline = Pipeline([
('const', ds.VarianceThreshold(threshold=0)),
('scaler', SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1))))
])
if ft_params.get("principal_component_analysis", False):
dk.feature_pipeline.append(('pca', ds.PCA()))
dk.feature_pipeline.append(('post-pca-scaler',
feature_pipeline.append(('pca', ds.PCA()))
feature_pipeline.append(('post-pca-scaler',
SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1)))))
if ft_params.get("use_SVM_to_remove_outliers", False):
svm_params = ft_params.get(
"svm_params", {"shuffle": False, "nu": 0.01})
dk.feature_pipeline.append(('svm', ds.SVMOutlierExtractor(**svm_params)))
feature_pipeline.append(('svm', ds.SVMOutlierExtractor(**svm_params)))
di = ft_params.get("DI_threshold", 0)
if di:
dk.feature_pipeline.append(('di', ds.DissimilarityIndex(di_threshold=di)))
feature_pipeline.append(('di', ds.DissimilarityIndex(di_threshold=di)))
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
dk.feature_pipeline.append(('dbscan', ds.DBSCAN()))
feature_pipeline.append(('dbscan', ds.DBSCAN()))
sigma = self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0)
if sigma:
dk.feature_pipeline.append(('noise', ds.Noise(sigma=sigma)))
feature_pipeline.append(('noise', ds.Noise(sigma=sigma)))
dk.feature_pipeline.fitparams = dk.feature_pipeline._validate_fitparams(
{}, dk.feature_pipeline.steps)
feature_pipeline.fitparams = feature_pipeline._validate_fitparams(
{}, feature_pipeline.steps)
def define_label_pipeline(self, dk: FreqaiDataKitchen) -> None:
return feature_pipeline
dk.label_pipeline = Pipeline([
def define_label_pipeline(self) -> Pipeline:
label_pipeline = Pipeline([
('scaler', SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1))))
])
return label_pipeline
def model_exists(self, dk: FreqaiDataKitchen) -> bool:
"""
Given a pair and path, check if a model already exists