From 31e19add2745ec90c66adbd8a5adedbcb1ac7cf3 Mon Sep 17 00:00:00 2001
From: robcaulk
Date: Fri, 26 May 2023 18:40:14 +0200
Subject: [PATCH] start transition toward outsourcing the data pipeline, with
 the objective of improving pipeline flexibility

---
 .../freqai/base_models/BasePyTorchModel.py    |  75 +-
 .../freqai/base_models/BaseRegressionModel.py |  37 +-
 .../freqai/base_models/BaseTensorFlowModel.py |  70 --
 freqtrade/freqai/data_drawer.py               |  15 +
 freqtrade/freqai/data_kitchen.py              | 843 +++++++++---------
 freqtrade/freqai/freqai_interface.py          |  28 +
 requirements-freqai.txt                       |   1 +
 tests/freqai/test_freqai_datakitchen.py       |  96 +-
 8 files changed, 579 insertions(+), 586 deletions(-)
 delete mode 100644 freqtrade/freqai/base_models/BaseTensorFlowModel.py

diff --git a/freqtrade/freqai/base_models/BasePyTorchModel.py b/freqtrade/freqai/base_models/BasePyTorchModel.py
index 82042d24c..21dc4e894 100644
--- a/freqtrade/freqai/base_models/BasePyTorchModel.py
+++ b/freqtrade/freqai/base_models/BasePyTorchModel.py
@@ -7,14 +7,15 @@ import torch
 from pandas import DataFrame
 
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
-from freqtrade.freqai.freqai_interface import IFreqaiModel
+# from freqtrade.freqai.freqai_interface import IFreqaiModel
+from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
 from freqtrade.freqai.torch.PyTorchDataConvertor import PyTorchDataConvertor
 
 
 logger = logging.getLogger(__name__)
 
 
-class BasePyTorchModel(IFreqaiModel, ABC):
+class BasePyTorchModel(BaseRegressionModel):
     """
     Base class for PyTorch type models.
     User *must* inherit from this class and set fit() and predict() and
@@ -29,50 +30,50 @@ class BasePyTorchModel(IFreqaiModel, ABC):
         self.splits = ["train", "test"] if test_size != 0 else ["train"]
         self.window_size = self.freqai_info.get("conv_width", 1)
 
-    def train(
-        self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
-    ) -> Any:
-        """
-        Filter the training data and train a model to it. Train makes heavy use of the datakitchen
-        for storing, saving, loading, and analyzing the data.
-        :param unfiltered_df: Full dataframe for the current training period
-        :return:
-        :model: Trained model which can be used to inference (self.predict)
-        """
+    # def train(
+    #     self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
+    # ) -> Any:
+    #     """
+    #     Filter the training data and train a model on it. Train makes heavy use of the
+    #     datakitchen for storing, saving, loading, and analyzing the data.
+    #     :param unfiltered_df: Full dataframe for the current training period
+    #     :return:
+    #     :model: Trained model which can be used for inference (self.predict)
+    #     """
 
-        logger.info(f"-------------------- Starting training {pair} --------------------")
+    #     logger.info(f"-------------------- Starting training {pair} --------------------")
 
-        start_time = time()
+    #     start_time = time()
 
-        features_filtered, labels_filtered = dk.filter_features(
-            unfiltered_df,
-            dk.training_features_list,
-            dk.label_list,
-            training_filter=True,
-        )
+    #     features_filtered, labels_filtered = dk.filter_features(
+    #         unfiltered_df,
+    #         dk.training_features_list,
+    #         dk.label_list,
+    #         training_filter=True,
+    #     )
 
-        # split data into train/test data.
-        data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
-        if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
-            dk.fit_labels()
-        # normalize all data based on train_dataset only
-        data_dictionary = dk.normalize_data(data_dictionary)
+    #     # split data into train/test data.
+    #     data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
+    #     if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
+    #         dk.fit_labels()
+    #     # normalize all data based on train_dataset only
+    #     data_dictionary = dk.normalize_data(data_dictionary)
 
-        # optional additional data cleaning/analysis
-        self.data_cleaning_train(dk)
+    #     # optional additional data cleaning/analysis
+    #     self.data_cleaning_train(dk)
 
-        logger.info(
-            f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
-        )
-        logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
+    #     logger.info(
+    #         f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
+    #     )
+    #     logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
 
-        model = self.fit(data_dictionary, dk)
-        end_time = time()
+    #     model = self.fit(data_dictionary, dk)
+    #     end_time = time()
 
-        logger.info(f"-------------------- Done training {pair} "
-                    f"({end_time - start_time:.2f} secs) --------------------")
+    #     logger.info(f"-------------------- Done training {pair} "
+    #                 f"({end_time - start_time:.2f} secs) --------------------")
 
-        return model
+    #     return model
 
     @property
    @abstractmethod
diff --git a/freqtrade/freqai/base_models/BaseRegressionModel.py b/freqtrade/freqai/base_models/BaseRegressionModel.py
index 1f9b4f5a6..45660253e 100644
--- a/freqtrade/freqai/base_models/BaseRegressionModel.py
+++ b/freqtrade/freqai/base_models/BaseRegressionModel.py
@@ -49,21 +49,34 @@ class BaseRegressionModel(IFreqaiModel):
         logger.info(f"-------------------- Training on data from {start_date} to "
                     f"{end_date} --------------------")
         # split data into train/test data.
-        data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
+        d = dk.make_train_test_datasets(features_filtered, labels_filtered)
         if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
             dk.fit_labels()
-        # normalize all data based on train_dataset only
-        data_dictionary = dk.normalize_data(data_dictionary)
 
-        # optional additional data cleaning/analysis
-        self.data_cleaning_train(dk)
+        self.define_data_pipeline(dk)
+        self.define_label_pipeline(dk)
+
+        d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
+        d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
+
+        (d["train_features"],
+         d["train_labels"],
+         d["train_weights"]) = dk.pipeline.fit_transform(d["train_features"],
+                                                         d["train_labels"],
+                                                         d["train_weights"])
+
+        (d["test_features"],
+         d["test_labels"],
+         d["test_weights"]) = dk.pipeline.transform(d["test_features"],
+                                                    d["test_labels"],
+                                                    d["test_weights"])
 
         logger.info(
             f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
         )
-        logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
+        logger.info(f"Training model on {len(d['train_features'])} data points")
 
-        model = self.fit(data_dictionary, dk)
+        model = self.fit(d, dk)
 
         end_time = time()
 
@@ -88,11 +101,11 @@ class BaseRegressionModel(IFreqaiModel):
         filtered_df, _ = dk.filter_features(
             unfiltered_df, dk.training_features_list, training_filter=False
         )
-        filtered_df = dk.normalize_data_from_metadata(filtered_df)
+        # filtered_df = dk.normalize_data_from_metadata(filtered_df)
         dk.data_dictionary["prediction_features"] = filtered_df
 
-        # optional additional data cleaning/analysis
-        self.data_cleaning_predict(dk)
+        dk.data_dictionary["prediction_features"], outliers, _ = dk.pipeline.transform(
+            dk.data_dictionary["prediction_features"], outlier_check=True)
 
         predictions = self.model.predict(dk.data_dictionary["prediction_features"])
         if self.CONV_WIDTH == 1:
@@ -100,6 +113,8 @@ class BaseRegressionModel(IFreqaiModel):
 
         pred_df = DataFrame(predictions, columns=dk.label_list)
 
-        pred_df = dk.denormalize_labels_from_metadata(pred_df)
+        pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df)
+        dk.DI_values = dk.pipeline.get_step("di").di_values
+        dk.do_predict = outliers.to_numpy()
 
         return (pred_df, dk.do_predict)
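The rewritten train()/predict() above rely on a small contract from datasieve: every
pipeline step passes the (features, labels, sample_weights) triplet through, transform()
can additionally return an inlier flag, and a label pipeline can be inverted. A minimal
sketch of that round trip, with synthetic arrays standing in for the datakitchen frames
(the data here is hypothetical; the calls mirror the ones used in the diff):

    import numpy as np
    import datasieve.transforms as ds
    from datasieve.pipeline import Pipeline

    X = np.random.uniform(-2, 2, (100, 5))   # stand-in for train_features
    y = np.random.uniform(-1, 1, (100, 1))   # stand-in for train_labels
    w = np.ones(100)                         # stand-in for train_weights

    pipeline = Pipeline([("scaler", ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))])

    # every step receives and returns the full triplet, so an outlier step can
    # drop rows from features, labels, and weights at the same time
    X_t, y_t, w_t = pipeline.fit_transform(X, y, w)

    # at prediction time, outlier_check=True also yields a 0/1 inlier flag,
    # which predict() above turns into dk.do_predict
    X_new_t, outliers, _ = pipeline.transform(np.random.uniform(-2, 2, (10, 5)),
                                              outlier_check=True)

    # label pipelines are invertible, returning predictions in real units
    y_back, _, _ = pipeline.inverse_transform(y_t)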
diff --git a/freqtrade/freqai/base_models/BaseTensorFlowModel.py b/freqtrade/freqai/base_models/BaseTensorFlowModel.py
deleted file mode 100644
index b41ee0175..000000000
--- a/freqtrade/freqai/base_models/BaseTensorFlowModel.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import logging
-from time import time
-from typing import Any
-
-from pandas import DataFrame
-
-from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
-from freqtrade.freqai.freqai_interface import IFreqaiModel
-
-
-logger = logging.getLogger(__name__)
-
-
-class BaseTensorFlowModel(IFreqaiModel):
-    """
-    Base class for TensorFlow type models.
-    User *must* inherit from this class and set fit() and predict().
-    """
-
-    def train(
-        self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
-    ) -> Any:
-        """
-        Filter the training data and train a model to it. Train makes heavy use of the datakitchen
-        for storing, saving, loading, and analyzing the data.
-        :param unfiltered_df: Full dataframe for the current training period
-        :param metadata: pair metadata from strategy.
-        :return:
-        :model: Trained model which can be used to inference (self.predict)
-        """
-
-        logger.info(f"-------------------- Starting training {pair} --------------------")
-
-        start_time = time()
-
-        # filter the features requested by user in the configuration file and elegantly handle NaNs
-        features_filtered, labels_filtered = dk.filter_features(
-            unfiltered_df,
-            dk.training_features_list,
-            dk.label_list,
-            training_filter=True,
-        )
-
-        start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
-        end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
-        logger.info(f"-------------------- Training on data from {start_date} to "
-                    f"{end_date} --------------------")
-        # split data into train/test data.
-        data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
-        if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
-            dk.fit_labels()
-        # normalize all data based on train_dataset only
-        data_dictionary = dk.normalize_data(data_dictionary)
-
-        # optional additional data cleaning/analysis
-        self.data_cleaning_train(dk)
-
-        logger.info(
-            f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
-        )
-        logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
-
-        model = self.fit(data_dictionary, dk)
-
-        end_time = time()
-
-        logger.info(f"-------------------- Done training {pair} "
-                    f"({end_time - start_time:.2f} secs) --------------------")
-
-        return model

diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py
index b68a9dcad..9fdcc2d41 100644
--- a/freqtrade/freqai/data_drawer.py
+++ b/freqtrade/freqai/data_drawer.py
@@ -460,6 +460,13 @@ class FreqaiDataDrawer:
         with (save_path / f"{dk.model_filename}_metadata.json").open("w") as fp:
             rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)
 
+        # save the pipelines to pickle files
+        with (save_path / f"{dk.model_filename}_pipeline.pkl").open("wb") as fp:
+            cloudpickle.dump(dk.pipeline, fp)
+
+        with (save_path / f"{dk.model_filename}_label_pipeline.pkl").open("wb") as fp:
+            cloudpickle.dump(dk.label_pipeline, fp)
+
         # save the train data to file so we can check preds for area of applicability later
         dk.data_dictionary["train_features"].to_pickle(
             save_path / f"{dk.model_filename}_trained_df.pkl"
@@ -482,6 +489,8 @@ class FreqaiDataDrawer:
             self.meta_data_dictionary[coin] = {}
             self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"]
             self.meta_data_dictionary[coin]["meta_data"] = dk.data
+            self.meta_data_dictionary[coin]["pipeline"] = dk.pipeline
+            self.meta_data_dictionary[coin]["label_pipeline"] = dk.label_pipeline
             self.save_drawer_to_disk()
 
         return
@@ -513,6 +522,8 @@ class FreqaiDataDrawer:
         if coin in self.meta_data_dictionary:
             dk.data = self.meta_data_dictionary[coin]["meta_data"]
             dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"]
+            dk.pipeline = self.meta_data_dictionary[coin]["pipeline"]
+            dk.label_pipeline = self.meta_data_dictionary[coin]["label_pipeline"]
         else:
             with (dk.data_path / f"{dk.model_filename}_metadata.json").open("r") as fp:
                 dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
@@ -520,6 +531,10 @@ class FreqaiDataDrawer:
             dk.data_dictionary["train_features"] = pd.read_pickle(
                 dk.data_path / f"{dk.model_filename}_trained_df.pkl"
             )
+            with (dk.data_path / f"{dk.model_filename}_pipeline.pkl").open("rb") as fp:
+                dk.pipeline = cloudpickle.load(fp)
+            with (dk.data_path / f"{dk.model_filename}_label_pipeline.pkl").open("rb") as fp:
+                dk.label_pipeline = cloudpickle.load(fp)
 
         dk.training_features_list = dk.data["training_features_list"]
         dk.label_list = dk.data["label_list"]
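Because the fitted pipelines now travel with the model, they are serialized next to the
other per-pair artifacts. cloudpickle is used rather than the stdlib pickle since it can
also serialize dynamically defined transforms and closures. A hedged sketch of the round
trip performed above (the path and the pipeline object are hypothetical; the hunk shows
no new import, which assumes cloudpickle is already imported in data_drawer.py):

    from pathlib import Path

    import cloudpickle
    from datasieve.pipeline import Pipeline

    save_path = Path("user_data/models/example")   # hypothetical location
    save_path.mkdir(parents=True, exist_ok=True)
    pipeline = Pipeline()                          # stands in for a fitted dk.pipeline

    with (save_path / "model_pipeline.pkl").open("wb") as fp:
        cloudpickle.dump(pipeline, fp)

    with (save_path / "model_pipeline.pkl").open("rb") as fp:
        pipeline = cloudpickle.load(fp)            # ready to transform() again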
diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py
index 21b41db2d..adfeb8dd5 100644
--- a/freqtrade/freqai/data_kitchen.py
+++ b/freqtrade/freqai/data_kitchen.py
@@ -27,6 +27,7 @@ from freqtrade.exceptions import OperationalException
 from freqtrade.exchange import timeframe_to_seconds
 from freqtrade.strategy import merge_informative_pair
 from freqtrade.strategy.interface import IStrategy
+from datasieve.pipeline import Pipeline
 
 
 SECONDS_IN_DAY = 86400
@@ -86,6 +87,8 @@ class FreqaiDataKitchen:
         self.keras: bool = self.freqai_config.get("keras", False)
         self.set_all_pairs()
         self.backtest_live_models = config.get("freqai_backtest_live_models", False)
+        self.pipeline = Pipeline()
+        self.label_pipeline = Pipeline()
 
         if not self.live:
             self.full_path = self.get_full_models_path(self.config)
@@ -307,106 +310,106 @@ class FreqaiDataKitchen:
 
         return self.data_dictionary
 
-    def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
-        """
-        Normalize all data in the data_dictionary according to the training dataset
-        :param data_dictionary: dictionary containing the cleaned and
-        split training/test data/labels
-        :returns:
-        :data_dictionary: updated dictionary with standardized values.
-        """
+    # def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
+    #     """
+    #     Normalize all data in the data_dictionary according to the training dataset
+    #     :param data_dictionary: dictionary containing the cleaned and
+    #     split training/test data/labels
+    #     :returns:
+    #     :data_dictionary: updated dictionary with standardized values.
+    #     """
 
-        # standardize the data by training stats
-        train_max = data_dictionary["train_features"].max()
-        train_min = data_dictionary["train_features"].min()
-        data_dictionary["train_features"] = (
-            2 * (data_dictionary["train_features"] - train_min) / (train_max - train_min) - 1
-        )
-        data_dictionary["test_features"] = (
-            2 * (data_dictionary["test_features"] - train_min) / (train_max - train_min) - 1
-        )
+    #     # standardize the data by training stats
+    #     train_max = data_dictionary["train_features"].max()
+    #     train_min = data_dictionary["train_features"].min()
+    #     data_dictionary["train_features"] = (
+    #         2 * (data_dictionary["train_features"] - train_min) / (train_max - train_min) - 1
+    #     )
+    #     data_dictionary["test_features"] = (
+    #         2 * (data_dictionary["test_features"] - train_min) / (train_max - train_min) - 1
+    #     )
 
-        for item in train_max.keys():
-            self.data[item + "_max"] = train_max[item]
-            self.data[item + "_min"] = train_min[item]
+    #     for item in train_max.keys():
+    #         self.data[item + "_max"] = train_max[item]
+    #         self.data[item + "_min"] = train_min[item]
 
-        for item in data_dictionary["train_labels"].keys():
-            if data_dictionary["train_labels"][item].dtype == object:
-                continue
-            train_labels_max = data_dictionary["train_labels"][item].max()
-            train_labels_min = data_dictionary["train_labels"][item].min()
-            data_dictionary["train_labels"][item] = (
-                2
-                * (data_dictionary["train_labels"][item] - train_labels_min)
-                / (train_labels_max - train_labels_min)
-                - 1
-            )
-            if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
-                data_dictionary["test_labels"][item] = (
-                    2
-                    * (data_dictionary["test_labels"][item] - train_labels_min)
-                    / (train_labels_max - train_labels_min)
-                    - 1
-                )
+    #     for item in data_dictionary["train_labels"].keys():
+    #         if data_dictionary["train_labels"][item].dtype == object:
+    #             continue
+    #         train_labels_max = data_dictionary["train_labels"][item].max()
+    #         train_labels_min = data_dictionary["train_labels"][item].min()
+    #         data_dictionary["train_labels"][item] = (
+    #             2
+    #             * (data_dictionary["train_labels"][item] - train_labels_min)
+    #             / (train_labels_max - train_labels_min)
+    #             - 1
+    #         )
+    #         if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
+    #             data_dictionary["test_labels"][item] = (
+    #                 2
+    #                 * (data_dictionary["test_labels"][item] - train_labels_min)
+    #                 / (train_labels_max - train_labels_min)
+    #                 - 1
+    #             )
 
-            self.data[f"{item}_max"] = train_labels_max
-            self.data[f"{item}_min"] = train_labels_min
-        return data_dictionary
+    #         self.data[f"{item}_max"] = train_labels_max
+    #         self.data[f"{item}_min"] = train_labels_min
+    #     return data_dictionary
 
-    def normalize_single_dataframe(self, df: DataFrame) -> DataFrame:
+    # def normalize_single_dataframe(self, df: DataFrame) -> DataFrame:
 
-        train_max = df.max()
-        train_min = df.min()
-        df = (
-            2 * (df - train_min) / (train_max - train_min) - 1
-        )
+    #     train_max = df.max()
+    #     train_min = df.min()
+    #     df = (
+    #         2 * (df - train_min) / (train_max - train_min) - 1
+    #     )
 
-        for item in train_max.keys():
-            self.data[item + "_max"] = train_max[item]
-            self.data[item + "_min"] = train_min[item]
+    #     for item in train_max.keys():
+    #         self.data[item + "_max"] = train_max[item]
+    #         self.data[item + "_min"] = train_min[item]
 
-        return df
+    #     return df
 
-    def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
-        """
-        Normalize a set of data using the mean and standard deviation from
-        the associated training data.
-        :param df: Dataframe to be standardized
-        """
+    # def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
+    #     """
+    #     Normalize a set of data using the min and max from
+    #     the associated training data.
+    #     :param df: Dataframe to be standardized
+    #     """
 
-        train_max = [None] * len(df.keys())
-        train_min = [None] * len(df.keys())
+    #     train_max = [None] * len(df.keys())
+    #     train_min = [None] * len(df.keys())
 
-        for i, item in enumerate(df.keys()):
-            train_max[i] = self.data[f"{item}_max"]
-            train_min[i] = self.data[f"{item}_min"]
+    #     for i, item in enumerate(df.keys()):
+    #         train_max[i] = self.data[f"{item}_max"]
+    #         train_min[i] = self.data[f"{item}_min"]
 
-        train_max_series = pd.Series(train_max, index=df.keys())
-        train_min_series = pd.Series(train_min, index=df.keys())
+    #     train_max_series = pd.Series(train_max, index=df.keys())
+    #     train_min_series = pd.Series(train_min, index=df.keys())
 
-        df = (
-            2 * (df - train_min_series) / (train_max_series - train_min_series) - 1
-        )
+    #     df = (
+    #         2 * (df - train_min_series) / (train_max_series - train_min_series) - 1
+    #     )
 
-        return df
+    #     return df
 
-    def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
-        """
-        Denormalize a set of data using the mean and standard deviation from
-        the associated training data.
-        :param df: Dataframe of predictions to be denormalized
-        """
+    # def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
+    #     """
+    #     Denormalize a set of data using the min and max from
+    #     the associated training data.
+    #     :param df: Dataframe of predictions to be denormalized
+    #     """
 
-        for label in df.columns:
-            if df[label].dtype == object or label in self.unique_class_list:
-                continue
-            df[label] = (
-                (df[label] + 1)
-                * (self.data[f"{label}_max"] - self.data[f"{label}_min"])
-                / 2
-            ) + self.data[f"{label}_min"]
+    #     for label in df.columns:
+    #         if df[label].dtype == object or label in self.unique_class_list:
+    #             continue
+    #         df[label] = (
+    #             (df[label] + 1)
+    #             * (self.data[f"{label}_max"] - self.data[f"{label}_min"])
+    #             / 2
+    #         ) + self.data[f"{label}_min"]
 
-        return df
+    #     return df
 
     def split_timerange(
         self, tr: str, train_split: int = 28, bt_split: float = 7
@@ -501,398 +504,398 @@ class FreqaiDataKitchen:
 
         return df_predictions
 
-    def principal_component_analysis(self) -> None:
-        """
-        Performs Principal Component Analysis on the data for dimensionality reduction
-        and outlier detection (see self.remove_outliers())
-        No parameters or returns, it acts on the data_dictionary held by the DataHandler.
-        """
+    # def principal_component_analysis(self) -> None:
+    #     """
+    #     Performs Principal Component Analysis on the data for dimensionality reduction
+    #     and outlier detection (see self.remove_outliers())
+    #     No parameters or returns, it acts on the data_dictionary held by the DataHandler.
+    #     """
 
-        from sklearn.decomposition import PCA  # avoid importing if we dont need it
+    #     from sklearn.decomposition import PCA  # avoid importing if we dont need it
 
-        pca = PCA(0.999)
-        pca = pca.fit(self.data_dictionary["train_features"])
-        n_keep_components = pca.n_components_
-        self.data["n_kept_components"] = n_keep_components
-        n_components = self.data_dictionary["train_features"].shape[1]
-        logger.info("reduced feature dimension by %s", n_components - n_keep_components)
-        logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))
+    #     pca = PCA(0.999)
+    #     pca = pca.fit(self.data_dictionary["train_features"])
+    #     n_keep_components = pca.n_components_
+    #     self.data["n_kept_components"] = n_keep_components
+    #     n_components = self.data_dictionary["train_features"].shape[1]
+    #     logger.info("reduced feature dimension by %s", n_components - n_keep_components)
+    #     logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))
 
-        train_components = pca.transform(self.data_dictionary["train_features"])
-        self.data_dictionary["train_features"] = pd.DataFrame(
-            data=train_components,
-            columns=["PC" + str(i) for i in range(0, n_keep_components)],
-            index=self.data_dictionary["train_features"].index,
-        )
-        # normalsing transformed training features
-        self.data_dictionary["train_features"] = self.normalize_single_dataframe(
-            self.data_dictionary["train_features"])
+    #     train_components = pca.transform(self.data_dictionary["train_features"])
+    #     self.data_dictionary["train_features"] = pd.DataFrame(
+    #         data=train_components,
+    #         columns=["PC" + str(i) for i in range(0, n_keep_components)],
+    #         index=self.data_dictionary["train_features"].index,
+    #     )
+    #     # normalising transformed training features
+    #     self.data_dictionary["train_features"] = self.normalize_single_dataframe(
+    #         self.data_dictionary["train_features"])
 
-        # keeping a copy of the non-transformed features so we can check for errors during
-        # model load from disk
-        self.data["training_features_list_raw"] = copy.deepcopy(self.training_features_list)
-        self.training_features_list = self.data_dictionary["train_features"].columns
+    #     # keeping a copy of the non-transformed features so we can check for errors during
+    #     # model load from disk
+    #     self.data["training_features_list_raw"] = copy.deepcopy(self.training_features_list)
+    #     self.training_features_list = self.data_dictionary["train_features"].columns
 
-        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
-            test_components = pca.transform(self.data_dictionary["test_features"])
-            self.data_dictionary["test_features"] = pd.DataFrame(
-                data=test_components,
-                columns=["PC" + str(i) for i in range(0, n_keep_components)],
-                index=self.data_dictionary["test_features"].index,
-            )
-            # normalise transformed test feature to transformed training features
-            self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
-                self.data_dictionary["test_features"])
+    #     if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
+    #         test_components = pca.transform(self.data_dictionary["test_features"])
+    #         self.data_dictionary["test_features"] = pd.DataFrame(
+    #             data=test_components,
+    #             columns=["PC" + str(i) for i in range(0, n_keep_components)],
+    #             index=self.data_dictionary["test_features"].index,
+    #         )
+    #         # normalise transformed test features to the transformed training features
+    #         self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
+    #             self.data_dictionary["test_features"])
 
-        self.data["n_kept_components"] = n_keep_components
-        self.pca = pca
+    #     self.data["n_kept_components"] = n_keep_components
+    #     self.pca = pca
 
-        logger.info(f"PCA reduced total features from {n_components} to {n_keep_components}")
+    #     logger.info(f"PCA reduced total features from {n_components} to {n_keep_components}")
 
-        if not self.data_path.is_dir():
-            self.data_path.mkdir(parents=True, exist_ok=True)
+    #     if not self.data_path.is_dir():
+    #         self.data_path.mkdir(parents=True, exist_ok=True)
 
-        return None
+    #     return None
 
-    def pca_transform(self, filtered_dataframe: DataFrame) -> None:
-        """
-        Use an existing pca transform to transform data into components
-        :param filtered_dataframe: DataFrame = the cleaned dataframe
-        """
-        pca_components = self.pca.transform(filtered_dataframe)
-        self.data_dictionary["prediction_features"] = pd.DataFrame(
-            data=pca_components,
-            columns=["PC" + str(i) for i in range(0, self.data["n_kept_components"])],
-            index=filtered_dataframe.index,
-        )
-        # normalise transformed predictions to transformed training features
-        self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
-            self.data_dictionary["prediction_features"])
+    # def pca_transform(self, filtered_dataframe: DataFrame) -> None:
+    #     """
+    #     Use an existing pca transform to transform data into components
+    #     :param filtered_dataframe: DataFrame = the cleaned dataframe
+    #     """
+    #     pca_components = self.pca.transform(filtered_dataframe)
+    #     self.data_dictionary["prediction_features"] = pd.DataFrame(
+    #         data=pca_components,
+    #         columns=["PC" + str(i) for i in range(0, self.data["n_kept_components"])],
+    #         index=filtered_dataframe.index,
+    #     )
+    #     # normalise transformed predictions to transformed training features
+    #     self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
+    #         self.data_dictionary["prediction_features"])
 
-    def compute_distances(self) -> float:
-        """
-        Compute distances between each training point and every other training
-        point. This metric defines the neighborhood of trained data and is used
-        for prediction confidence in the Dissimilarity Index
-        """
-        # logger.info("computing average mean distance for all training points")
-        pairwise = pairwise_distances(
-            self.data_dictionary["train_features"], n_jobs=self.thread_count)
-        # remove the diagonal distances which are itself distances ~0
-        np.fill_diagonal(pairwise, np.NaN)
-        pairwise = pairwise.reshape(-1, 1)
-        avg_mean_dist = pairwise[~np.isnan(pairwise)].mean()
+    # def compute_distances(self) -> float:
+    #     """
+    #     Compute distances between each training point and every other training
+    #     point. This metric defines the neighborhood of trained data and is used
+    #     for prediction confidence in the Dissimilarity Index
+    #     """
+    #     # logger.info("computing average mean distance for all training points")
+    #     pairwise = pairwise_distances(
+    #         self.data_dictionary["train_features"], n_jobs=self.thread_count)
+    #     # remove the diagonal distances, which are self-distances of ~0
+    #     np.fill_diagonal(pairwise, np.NaN)
+    #     pairwise = pairwise.reshape(-1, 1)
+    #     avg_mean_dist = pairwise[~np.isnan(pairwise)].mean()
 
-        return avg_mean_dist
+    #     return avg_mean_dist
 
-    def get_outlier_percentage(self, dropped_pts: npt.NDArray) -> float:
-        """
-        Check if more than X% of points werer dropped during outlier detection.
-        """
-        outlier_protection_pct = self.freqai_config["feature_parameters"].get(
-            "outlier_protection_percentage", 30)
-        outlier_pct = (dropped_pts.sum() / len(dropped_pts)) * 100
-        if outlier_pct >= outlier_protection_pct:
-            return outlier_pct
-        else:
-            return 0.0
+    # def get_outlier_percentage(self, dropped_pts: npt.NDArray) -> float:
+    #     """
+    #     Check if more than X% of points were dropped during outlier detection.
+    #     """
+    #     outlier_protection_pct = self.freqai_config["feature_parameters"].get(
+    #         "outlier_protection_percentage", 30)
+    #     outlier_pct = (dropped_pts.sum() / len(dropped_pts)) * 100
+    #     if outlier_pct >= outlier_protection_pct:
+    #         return outlier_pct
+    #     else:
+    #         return 0.0
 
-    def use_SVM_to_remove_outliers(self, predict: bool) -> None:
-        """
-        Build/inference a Support Vector Machine to detect outliers
-        in training data and prediction
-        :param predict: bool = If true, inference an existing SVM model, else construct one
-        """
+    # def use_SVM_to_remove_outliers(self, predict: bool) -> None:
+    #     """
+    #     Build/inference a Support Vector Machine to detect outliers
+    #     in training data and prediction
+    #     :param predict: bool = If true, inference an existing SVM model, else construct one
+    #     """
 
-        if self.keras:
-            logger.warning(
-                "SVM outlier removal not currently supported for Keras based models. "
-                "Skipping user requested function."
-            )
-            if predict:
-                self.do_predict = np.ones(len(self.data_dictionary["prediction_features"]))
-            return
+    #     if self.keras:
+    #         logger.warning(
+    #             "SVM outlier removal not currently supported for Keras based models. "
+    #             "Skipping user requested function."
+    #         )
+    #         if predict:
+    #             self.do_predict = np.ones(len(self.data_dictionary["prediction_features"]))
+    #         return
 
-        if predict:
-            if not self.svm_model:
-                logger.warning("No svm model available for outlier removal")
-                return
-            y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
-            do_predict = np.where(y_pred == -1, 0, y_pred)
+    #     if predict:
+    #         if not self.svm_model:
+    #             logger.warning("No svm model available for outlier removal")
+    #             return
+    #         y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
+    #         do_predict = np.where(y_pred == -1, 0, y_pred)
 
-            if (len(do_predict) - do_predict.sum()) > 0:
-                logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")
-            self.do_predict += do_predict
-            self.do_predict -= 1
+    #         if (len(do_predict) - do_predict.sum()) > 0:
+    #             logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")
+    #         self.do_predict += do_predict
+    #         self.do_predict -= 1
 
-        else:
-            # use SGDOneClassSVM to increase speed?
-            svm_params = self.freqai_config["feature_parameters"].get(
-                "svm_params", {"shuffle": False, "nu": 0.1})
-            self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
-                self.data_dictionary["train_features"]
-            )
-            y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
-            kept_points = np.where(y_pred == -1, 0, y_pred)
-            # keep_index = np.where(y_pred == 1)
-            outlier_pct = self.get_outlier_percentage(1 - kept_points)
-            if outlier_pct:
-                logger.warning(
-                    f"SVM detected {outlier_pct:.2f}% of the points as outliers. "
-                    f"Keeping original dataset."
-                )
-                self.svm_model = None
-                return
+    #     else:
+    #         # use SGDOneClassSVM to increase speed?
+    #         svm_params = self.freqai_config["feature_parameters"].get(
+    #             "svm_params", {"shuffle": False, "nu": 0.1})
+    #         self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
+    #             self.data_dictionary["train_features"]
+    #         )
+    #         y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
+    #         kept_points = np.where(y_pred == -1, 0, y_pred)
+    #         # keep_index = np.where(y_pred == 1)
+    #         outlier_pct = self.get_outlier_percentage(1 - kept_points)
+    #         if outlier_pct:
+    #             logger.warning(
+    #                 f"SVM detected {outlier_pct:.2f}% of the points as outliers. "
+    #                 f"Keeping original dataset."
+    #             )
+    #             self.svm_model = None
+    #             return
 
-            self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
-                (y_pred == 1)
-            ]
-            self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
-                (y_pred == 1)
-            ]
-            self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
-                (y_pred == 1)
-            ]
+    #         self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
+    #             (y_pred == 1)
+    #         ]
+    #         self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
+    #             (y_pred == 1)
+    #         ]
+    #         self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
+    #             (y_pred == 1)
+    #         ]
 
-            logger.info(
-                f"SVM tossed {len(y_pred) - kept_points.sum()}"
-                f" train points from {len(y_pred)} total points."
-            )
+    #         logger.info(
+    #             f"SVM tossed {len(y_pred) - kept_points.sum()}"
+    #             f" train points from {len(y_pred)} total points."
+    #         )
 
-            # same for test data
-            # TODO: This (and the part above) could be refactored into a separate function
-            # to reduce code duplication
-            if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
-                y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
-                kept_points = np.where(y_pred == -1, 0, y_pred)
-                self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
-                    (y_pred == 1)
-                ]
-                self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][(
-                    y_pred == 1)]
-                self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
-                    (y_pred == 1)
-                ]
+    #         # same for test data
+    #         # TODO: This (and the part above) could be refactored into a separate function
+    #         # to reduce code duplication
+    #         if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
+    #             y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
+    #             kept_points = np.where(y_pred == -1, 0, y_pred)
+    #             self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
+    #                 (y_pred == 1)
+    #             ]
+    #             self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][(
+    #                 y_pred == 1)]
+    #             self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
+    #                 (y_pred == 1)
+    #             ]
 
-                logger.info(
-                    f"{self.pair}: SVM tossed {len(y_pred) - kept_points.sum()}"
-                    f" test points from {len(y_pred)} total points."
-                )
+    #             logger.info(
+    #                 f"{self.pair}: SVM tossed {len(y_pred) - kept_points.sum()}"
+    #                 f" test points from {len(y_pred)} total points."
+    #             )
 
-        return
+    #     return
 
-    def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
-        """
-        Use DBSCAN to cluster training data and remove "noisy" data (read outliers).
-        User controls this via the config param `DBSCAN_outlier_pct` which indicates the
-        pct of training data that they want to be considered outliers.
-        :param predict: bool = If False (training), iterate to find the best hyper parameters
-                        to match user requested outlier percent target.
-                        If True (prediction), use the parameters determined from
-                        the previous training to estimate if the current prediction point
-                        is an outlier.
-        """
+    # def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
+    #     """
+    #     Use DBSCAN to cluster training data and remove "noisy" data (read outliers).
+    #     User controls this via the config param `DBSCAN_outlier_pct` which indicates the
+    #     pct of training data that they want to be considered outliers.
+    #     :param predict: bool = If False (training), iterate to find the best hyper parameters
+    #                     to match user requested outlier percent target.
+    #                     If True (prediction), use the parameters determined from
+    #                     the previous training to estimate if the current prediction point
+    #                     is an outlier.
+    #     """
 
-        if predict:
-            if not self.data['DBSCAN_eps']:
-                return
-            train_ft_df = self.data_dictionary['train_features']
-            pred_ft_df = self.data_dictionary['prediction_features']
-            num_preds = len(pred_ft_df)
-            df = pd.concat([train_ft_df, pred_ft_df], axis=0, ignore_index=True)
-            clustering = DBSCAN(eps=self.data['DBSCAN_eps'],
-                                min_samples=self.data['DBSCAN_min_samples'],
-                                n_jobs=self.thread_count
-                                ).fit(df)
-            do_predict = np.where(clustering.labels_[-num_preds:] == -1, 0, 1)
+    #     if predict:
+    #         if not self.data['DBSCAN_eps']:
+    #             return
+    #         train_ft_df = self.data_dictionary['train_features']
+    #         pred_ft_df = self.data_dictionary['prediction_features']
+    #         num_preds = len(pred_ft_df)
+    #         df = pd.concat([train_ft_df, pred_ft_df], axis=0, ignore_index=True)
+    #         clustering = DBSCAN(eps=self.data['DBSCAN_eps'],
+    #                             min_samples=self.data['DBSCAN_min_samples'],
+    #                             n_jobs=self.thread_count
+    #                             ).fit(df)
+    #         do_predict = np.where(clustering.labels_[-num_preds:] == -1, 0, 1)
 
-            if (len(do_predict) - do_predict.sum()) > 0:
-                logger.info(f"DBSCAN tossed {len(do_predict) - do_predict.sum()} predictions")
-            self.do_predict += do_predict
-            self.do_predict -= 1
+    #         if (len(do_predict) - do_predict.sum()) > 0:
+    #             logger.info(f"DBSCAN tossed {len(do_predict) - do_predict.sum()} predictions")
+    #         self.do_predict += do_predict
+    #         self.do_predict -= 1
 
-        else:
+    #     else:
 
-            def normalise_distances(distances):
-                normalised_distances = (distances - distances.min()) / \
-                    (distances.max() - distances.min())
-                return normalised_distances
+    #         def normalise_distances(distances):
+    #             normalised_distances = (distances - distances.min()) / \
+    #                 (distances.max() - distances.min())
+    #             return normalised_distances
 
-            def rotate_point(origin, point, angle):
-                # rotate a point counterclockwise by a given angle (in radians)
-                # around a given origin
-                x = origin[0] + cos(angle) * (point[0] - origin[0]) - \
-                    sin(angle) * (point[1] - origin[1])
-                y = origin[1] + sin(angle) * (point[0] - origin[0]) + \
-                    cos(angle) * (point[1] - origin[1])
-                return (x, y)
+    #         def rotate_point(origin, point, angle):
+    #             # rotate a point counterclockwise by a given angle (in radians)
+    #             # around a given origin
+    #             x = origin[0] + cos(angle) * (point[0] - origin[0]) - \
+    #                 sin(angle) * (point[1] - origin[1])
+    #             y = origin[1] + sin(angle) * (point[0] - origin[0]) + \
+    #                 cos(angle) * (point[1] - origin[1])
+    #             return (x, y)
 
-            MinPts = int(len(self.data_dictionary['train_features'].index) * 0.25)
-            # measure pairwise distances to nearest neighbours
-            neighbors = NearestNeighbors(
-                n_neighbors=MinPts, n_jobs=self.thread_count)
-            neighbors_fit = neighbors.fit(self.data_dictionary['train_features'])
-            distances, _ = neighbors_fit.kneighbors(self.data_dictionary['train_features'])
-            distances = np.sort(distances, axis=0).mean(axis=1)
+    #         MinPts = int(len(self.data_dictionary['train_features'].index) * 0.25)
+    #         # measure pairwise distances to nearest neighbours
+    #         neighbors = NearestNeighbors(
+    #             n_neighbors=MinPts, n_jobs=self.thread_count)
+    #         neighbors_fit = neighbors.fit(self.data_dictionary['train_features'])
+    #         distances, _ = neighbors_fit.kneighbors(self.data_dictionary['train_features'])
+    #         distances = np.sort(distances, axis=0).mean(axis=1)
 
-            normalised_distances = normalise_distances(distances)
-            x_range = np.linspace(0, 1, len(distances))
-            line = np.linspace(normalised_distances[0],
-                               normalised_distances[-1], len(normalised_distances))
-            deflection = np.abs(normalised_distances - line)
-            max_deflection_loc = np.where(deflection == deflection.max())[0][0]
-            origin = x_range[max_deflection_loc], line[max_deflection_loc]
-            point = x_range[max_deflection_loc], normalised_distances[max_deflection_loc]
-            rot_angle = np.pi / 4
-            elbow_loc = rotate_point(origin, point, rot_angle)
+    #         normalised_distances = normalise_distances(distances)
+    #         x_range = np.linspace(0, 1, len(distances))
+    #         line = np.linspace(normalised_distances[0],
+    #                            normalised_distances[-1], len(normalised_distances))
+    #         deflection = np.abs(normalised_distances - line)
+    #         max_deflection_loc = np.where(deflection == deflection.max())[0][0]
+    #         origin = x_range[max_deflection_loc], line[max_deflection_loc]
+    #         point = x_range[max_deflection_loc], normalised_distances[max_deflection_loc]
+    #         rot_angle = np.pi / 4
+    #         elbow_loc = rotate_point(origin, point, rot_angle)
 
-            epsilon = elbow_loc[1] * (distances[-1] - distances[0]) + distances[0]
+    #         epsilon = elbow_loc[1] * (distances[-1] - distances[0]) + distances[0]
 
-            clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
-                                n_jobs=int(self.thread_count)).fit(
-                self.data_dictionary['train_features']
-            )
+    #         clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
+    #                             n_jobs=int(self.thread_count)).fit(
+    #             self.data_dictionary['train_features']
+    #         )
 
-            logger.info(f'DBSCAN found eps of {epsilon:.2f}.')
+    #         logger.info(f'DBSCAN found eps of {epsilon:.2f}.')
 
-            self.data['DBSCAN_eps'] = epsilon
-            self.data['DBSCAN_min_samples'] = MinPts
-            dropped_points = np.where(clustering.labels_ == -1, 1, 0)
+    #         self.data['DBSCAN_eps'] = epsilon
+    #         self.data['DBSCAN_min_samples'] = MinPts
+    #         dropped_points = np.where(clustering.labels_ == -1, 1, 0)
 
-            outlier_pct = self.get_outlier_percentage(dropped_points)
-            if outlier_pct:
-                logger.warning(
-                    f"DBSCAN detected {outlier_pct:.2f}% of the points as outliers. "
-                    f"Keeping original dataset."
-                )
-                self.data['DBSCAN_eps'] = 0
-                return
+    #         outlier_pct = self.get_outlier_percentage(dropped_points)
+    #         if outlier_pct:
+    #             logger.warning(
+    #                 f"DBSCAN detected {outlier_pct:.2f}% of the points as outliers. "
+    #                 f"Keeping original dataset."
+    #             )
+    #             self.data['DBSCAN_eps'] = 0
+    #             return
 
-            self.data_dictionary['train_features'] = self.data_dictionary['train_features'][
-                (clustering.labels_ != -1)
-            ]
-            self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
-                (clustering.labels_ != -1)
-            ]
-            self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
-                (clustering.labels_ != -1)
-            ]
+    #         self.data_dictionary['train_features'] = self.data_dictionary['train_features'][
+    #             (clustering.labels_ != -1)
+    #         ]
+    #         self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
+    #             (clustering.labels_ != -1)
+    #         ]
+    #         self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
+    #             (clustering.labels_ != -1)
+    #         ]
 
-            logger.info(
-                f"DBSCAN tossed {dropped_points.sum()}"
-                f" train points from {len(clustering.labels_)}"
-            )
+    #         logger.info(
+    #             f"DBSCAN tossed {dropped_points.sum()}"
+    #             f" train points from {len(clustering.labels_)}"
+    #         )
 
-        return
+    #     return
 
-    def compute_inlier_metric(self, set_='train') -> None:
-        """
-        Compute inlier metric from backwards distance distributions.
-        This metric defines how well features from a timepoint fit
-        into previous timepoints.
-        """
+    # def compute_inlier_metric(self, set_='train') -> None:
+    #     """
+    #     Compute inlier metric from backwards distance distributions.
+    #     This metric defines how well features from a timepoint fit
+    #     into previous timepoints.
+    #     """
 
-        def normalise(dataframe: DataFrame, key: str) -> DataFrame:
-            if set_ == 'train':
-                min_value = dataframe.min()
-                max_value = dataframe.max()
-                self.data[f'{key}_min'] = min_value
-                self.data[f'{key}_max'] = max_value
-            else:
-                min_value = self.data[f'{key}_min']
-                max_value = self.data[f'{key}_max']
-            return (dataframe - min_value) / (max_value - min_value)
+    #     def normalise(dataframe: DataFrame, key: str) -> DataFrame:
+    #         if set_ == 'train':
+    #             min_value = dataframe.min()
+    #             max_value = dataframe.max()
+    #             self.data[f'{key}_min'] = min_value
+    #             self.data[f'{key}_max'] = max_value
+    #         else:
+    #             min_value = self.data[f'{key}_min']
+    #             max_value = self.data[f'{key}_max']
+    #         return (dataframe - min_value) / (max_value - min_value)
 
-        no_prev_pts = self.freqai_config["feature_parameters"]["inlier_metric_window"]
+    #     no_prev_pts = self.freqai_config["feature_parameters"]["inlier_metric_window"]
 
-        if set_ == 'train':
-            compute_df = copy.deepcopy(self.data_dictionary['train_features'])
-        elif set_ == 'test':
-            compute_df = copy.deepcopy(self.data_dictionary['test_features'])
-        else:
-            compute_df = copy.deepcopy(self.data_dictionary['prediction_features'])
+    #     if set_ == 'train':
+    #         compute_df = copy.deepcopy(self.data_dictionary['train_features'])
+    #     elif set_ == 'test':
+    #         compute_df = copy.deepcopy(self.data_dictionary['test_features'])
+    #     else:
+    #         compute_df = copy.deepcopy(self.data_dictionary['prediction_features'])
 
-        compute_df_reindexed = compute_df.reindex(
-            index=np.flip(compute_df.index)
-        )
+    #     compute_df_reindexed = compute_df.reindex(
+    #         index=np.flip(compute_df.index)
+    #     )
 
-        pairwise = pd.DataFrame(
-            np.triu(
-                pairwise_distances(compute_df_reindexed, n_jobs=self.thread_count)
-            ),
-            columns=compute_df_reindexed.index,
-            index=compute_df_reindexed.index
-        )
-        pairwise = pairwise.round(5)
+    #     pairwise = pd.DataFrame(
+    #         np.triu(
+    #             pairwise_distances(compute_df_reindexed, n_jobs=self.thread_count)
+    #         ),
+    #         columns=compute_df_reindexed.index,
+    #         index=compute_df_reindexed.index
+    #     )
+    #     pairwise = pairwise.round(5)
 
-        column_labels = [
-            '{}{}'.format('d', i) for i in range(1, no_prev_pts + 1)
-        ]
-        distances = pd.DataFrame(
-            columns=column_labels, index=compute_df.index
-        )
+    #     column_labels = [
+    #         '{}{}'.format('d', i) for i in range(1, no_prev_pts + 1)
+    #     ]
+    #     distances = pd.DataFrame(
+    #         columns=column_labels, index=compute_df.index
+    #     )
 
-        for index in compute_df.index[no_prev_pts:]:
-            current_row = pairwise.loc[[index]]
-            current_row_no_zeros = current_row.loc[
-                :, (current_row != 0).any(axis=0)
-            ]
-            distances.loc[[index]] = current_row_no_zeros.iloc[
-                :, :no_prev_pts
-            ]
-        distances = distances.replace([np.inf, -np.inf], np.nan)
-        drop_index = pd.isnull(distances).any(axis=1)
-        distances = distances[drop_index == 0]
+    #     for index in compute_df.index[no_prev_pts:]:
+    #         current_row = pairwise.loc[[index]]
+    #         current_row_no_zeros = current_row.loc[
+    #             :, (current_row != 0).any(axis=0)
+    #         ]
+    #         distances.loc[[index]] = current_row_no_zeros.iloc[
+    #             :, :no_prev_pts
+    #         ]
+    #     distances = distances.replace([np.inf, -np.inf], np.nan)
+    #     drop_index = pd.isnull(distances).any(axis=1)
+    #     distances = distances[drop_index == 0]
 
-        inliers = pd.DataFrame(index=distances.index)
-        for key in distances.keys():
-            current_distances = distances[key].dropna()
-            current_distances = normalise(current_distances, key)
-            if set_ == 'train':
-                fit_params = stats.weibull_min.fit(current_distances)
-                self.data[f'{key}_fit_params'] = fit_params
-            else:
-                fit_params = self.data[f'{key}_fit_params']
-            quantiles = stats.weibull_min.cdf(current_distances, *fit_params)
+    #     inliers = pd.DataFrame(index=distances.index)
+    #     for key in distances.keys():
+    #         current_distances = distances[key].dropna()
+    #         current_distances = normalise(current_distances, key)
+    #         if set_ == 'train':
+    #             fit_params = stats.weibull_min.fit(current_distances)
+    #             self.data[f'{key}_fit_params'] = fit_params
+    #         else:
+    #             fit_params = self.data[f'{key}_fit_params']
+    #         quantiles = stats.weibull_min.cdf(current_distances, *fit_params)
 
-            df_inlier = pd.DataFrame(
-                {key: quantiles}, index=distances.index
-            )
-            inliers = pd.concat(
-                [inliers, df_inlier], axis=1
-            )
+    #         df_inlier = pd.DataFrame(
+    #             {key: quantiles}, index=distances.index
+    #         )
+    #         inliers = pd.concat(
+    #             [inliers, df_inlier], axis=1
+    #         )
 
-        inlier_metric = pd.DataFrame(
-            data=inliers.sum(axis=1) / no_prev_pts,
-            columns=['%-inlier_metric'],
-            index=compute_df.index
-        )
+    #     inlier_metric = pd.DataFrame(
+    #         data=inliers.sum(axis=1) / no_prev_pts,
+    #         columns=['%-inlier_metric'],
+    #         index=compute_df.index
+    #     )
 
-        inlier_metric = (2 * (inlier_metric - inlier_metric.min()) /
-                         (inlier_metric.max() - inlier_metric.min()) - 1)
+    #     inlier_metric = (2 * (inlier_metric - inlier_metric.min()) /
+    #                      (inlier_metric.max() - inlier_metric.min()) - 1)
 
-        if set_ in ('train', 'test'):
-            inlier_metric = inlier_metric.iloc[no_prev_pts:]
-            compute_df = compute_df.iloc[no_prev_pts:]
-            self.remove_beginning_points_from_data_dict(set_, no_prev_pts)
-            self.data_dictionary[f'{set_}_features'] = pd.concat(
-                [compute_df, inlier_metric], axis=1)
-        else:
-            self.data_dictionary['prediction_features'] = pd.concat(
-                [compute_df, inlier_metric], axis=1)
-            self.data_dictionary['prediction_features'].fillna(0, inplace=True)
+    #     if set_ in ('train', 'test'):
+    #         inlier_metric = inlier_metric.iloc[no_prev_pts:]
+    #         compute_df = compute_df.iloc[no_prev_pts:]
+    #         self.remove_beginning_points_from_data_dict(set_, no_prev_pts)
+    #         self.data_dictionary[f'{set_}_features'] = pd.concat(
+    #             [compute_df, inlier_metric], axis=1)
+    #     else:
+    #         self.data_dictionary['prediction_features'] = pd.concat(
+    #             [compute_df, inlier_metric], axis=1)
+    #         self.data_dictionary['prediction_features'].fillna(0, inplace=True)
 
-        logger.info('Inlier metric computed and added to features.')
+    #     logger.info('Inlier metric computed and added to features.')
 
-        return None
+    #     return None
 
-    def remove_beginning_points_from_data_dict(self, set_='train', no_prev_pts: int = 10):
-        features = self.data_dictionary[f'{set_}_features']
-        weights = self.data_dictionary[f'{set_}_weights']
-        labels = self.data_dictionary[f'{set_}_labels']
-        self.data_dictionary[f'{set_}_weights'] = weights[no_prev_pts:]
-        self.data_dictionary[f'{set_}_features'] = features.iloc[no_prev_pts:]
-        self.data_dictionary[f'{set_}_labels'] = labels.iloc[no_prev_pts:]
+    # def remove_beginning_points_from_data_dict(self, set_='train', no_prev_pts: int = 10):
+    #     features = self.data_dictionary[f'{set_}_features']
+    #     weights = self.data_dictionary[f'{set_}_weights']
+    #     labels = self.data_dictionary[f'{set_}_labels']
+    #     self.data_dictionary[f'{set_}_weights'] = weights[no_prev_pts:]
+    #     self.data_dictionary[f'{set_}_features'] = features.iloc[no_prev_pts:]
+    #     self.data_dictionary[f'{set_}_labels'] = labels.iloc[no_prev_pts:]
 
     def add_noise_to_training_features(self) -> None:
         """
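The hand-rolled normalization commented out above maps each column into (-1, 1) via
2 * (x - min) / (max - min) - 1, which is exactly min-max scaling to that range. That
equivalence is what lets a single scaler step replace normalize_data() and
denormalize_labels_from_metadata(); a quick check against scikit-learn's scaler (which
DataSieveMinMaxScaler presumably wraps):

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    X = np.random.uniform(0, 10, (50, 3))

    # the formula from the commented-out normalize_data() above
    train_min, train_max = X.min(axis=0), X.max(axis=0)
    manual = 2 * (X - train_min) / (train_max - train_min) - 1

    scaled = MinMaxScaler(feature_range=(-1, 1)).fit_transform(X)

    assert np.allclose(manual, scaled)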
diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py
index 9cfda05ee..cacbfea67 100644
--- a/freqtrade/freqai/freqai_interface.py
+++ b/freqtrade/freqai/freqai_interface.py
@@ -23,6 +23,8 @@ from freqtrade.freqai.data_drawer import FreqaiDataDrawer
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
 from freqtrade.freqai.utils import get_tb_logger, plot_feature_importance, record_params
 from freqtrade.strategy.interface import IStrategy
+from datasieve.pipeline import Pipeline
+import datasieve.transforms as ds
 
 
 pd.options.mode.chained_assignment = None
@@ -566,6 +568,32 @@ class IFreqaiModel(ABC):
         if ft_params.get("use_DBSCAN_to_remove_outliers", False):
             dk.use_DBSCAN_to_remove_outliers(predict=True)
 
+    def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
+        ft_params = self.freqai_info["feature_parameters"]
+        dk.pipeline = Pipeline([('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))])
+
+        if ft_params.get("principal_component_analysis", False):
+            dk.pipeline.steps += [('pca', ds.DataSievePCA())]
+            dk.pipeline.steps += [('post-pca-scaler',
+                                   ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))]
+
+        if ft_params.get("use_SVM_to_remove_outliers", False):
+            dk.pipeline.steps += [('svm', ds.SVMOutlierExtractor())]
+
+        if ft_params.get("DI_threshold", 0):
+            dk.pipeline.steps += [('di', ds.DissimilarityIndex())]
+
+        if ft_params.get("use_DBSCAN_to_remove_outliers", False):
+            dk.pipeline.steps += [('dbscan', ds.DataSieveDBSCAN())]
+
+        dk.pipeline.fitparams = dk.pipeline._validate_fitparams({}, dk.pipeline.steps)
+
+        # if self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0):
+        #     dk.pipeline.extend(('noise', ds.Noise()))
+
+    def define_label_pipeline(self, dk: FreqaiDataKitchen) -> None:
+
+        dk.label_pipeline = Pipeline([('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))])
+
     def model_exists(self, dk: FreqaiDataKitchen) -> bool:
         """
         Given a pair and path, check if a model already exists
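define_data_pipeline() above translates the existing feature_parameters flags into named
pipeline steps, so a config that previously toggled data_kitchen methods now toggles
transforms. For example (config values hypothetical, step names as registered above):

    import datasieve.transforms as ds
    from datasieve.pipeline import Pipeline

    ft_params = {"principal_component_analysis": True, "DI_threshold": 0.9}

    steps = [("scaler", ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))]
    if ft_params.get("principal_component_analysis", False):
        # PCA output is re-scaled, mirroring the old normalize_single_dataframe call
        steps += [("pca", ds.DataSievePCA()),
                  ("post-pca-scaler", ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))]
    if ft_params.get("DI_threshold", 0):
        steps += [("di", ds.DissimilarityIndex())]

    pipeline = Pipeline(steps)
    # -> steps named: scaler, pca, post-pca-scaler, di

Building the step list first and handing it to Pipeline(...) as above would also avoid
reaching into the private _validate_fitparams helper.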
diff --git a/requirements-freqai.txt b/requirements-freqai.txt
index ad069ade2..66da4e873 100644
--- a/requirements-freqai.txt
+++ b/requirements-freqai.txt
@@ -10,3 +10,4 @@ catboost==1.2; 'arm' not in platform_machine and (sys_platform != 'darwin' or py
 lightgbm==3.3.5
 xgboost==1.7.5
 tensorboard==2.13.0
+datasieve==0.0.5

diff --git a/tests/freqai/test_freqai_datakitchen.py b/tests/freqai/test_freqai_datakitchen.py
index 13dc6b4b0..e3ef1612c 100644
--- a/tests/freqai/test_freqai_datakitchen.py
+++ b/tests/freqai/test_freqai_datakitchen.py
@@ -9,9 +9,9 @@ from freqtrade.configuration import TimeRange
 from freqtrade.data.dataprovider import DataProvider
 from freqtrade.exceptions import OperationalException
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
-from tests.conftest import get_patched_exchange, log_has_re
+from tests.conftest import get_patched_exchange  # , log_has_re
 from tests.freqai.conftest import (get_patched_data_kitchen, get_patched_freqai_strategy,
-                                   make_data_dictionary, make_unfiltered_dataframe)
+                                   make_unfiltered_dataframe)  # make_data_dictionary,
 from tests.freqai.test_freqai_interface import is_mac
 
 
@@ -72,66 +72,66 @@ def test_check_if_model_expired(mocker, freqai_conf):
     shutil.rmtree(Path(dk.full_path))
 
 
-def test_use_DBSCAN_to_remove_outliers(mocker, freqai_conf, caplog):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    # freqai_conf['freqai']['feature_parameters'].update({"outlier_protection_percentage": 1})
-    freqai.dk.use_DBSCAN_to_remove_outliers(predict=False)
-    assert log_has_re(r"DBSCAN found eps of 1\.7\d\.", caplog)
+# def test_use_DBSCAN_to_remove_outliers(mocker, freqai_conf, caplog):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     # freqai_conf['freqai']['feature_parameters'].update({"outlier_protection_percentage": 1})
+#     freqai.dk.use_DBSCAN_to_remove_outliers(predict=False)
+#     assert log_has_re(r"DBSCAN found eps of 1\.7\d\.", caplog)
 
 
-def test_compute_distances(mocker, freqai_conf):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    freqai_conf['freqai']['feature_parameters'].update({"DI_threshold": 1})
-    avg_mean_dist = freqai.dk.compute_distances()
-    assert round(avg_mean_dist, 2) == 1.98
+# def test_compute_distances(mocker, freqai_conf):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     freqai_conf['freqai']['feature_parameters'].update({"DI_threshold": 1})
+#     avg_mean_dist = freqai.dk.compute_distances()
+#     assert round(avg_mean_dist, 2) == 1.98
 
 
-def test_use_SVM_to_remove_outliers_and_outlier_protection(mocker, freqai_conf, caplog):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    freqai_conf['freqai']['feature_parameters'].update({"outlier_protection_percentage": 0.1})
-    freqai.dk.use_SVM_to_remove_outliers(predict=False)
-    assert log_has_re(
-        "SVM detected 7.83%",
-        caplog,
-    )
+# def test_use_SVM_to_remove_outliers_and_outlier_protection(mocker, freqai_conf, caplog):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     freqai_conf['freqai']['feature_parameters'].update({"outlier_protection_percentage": 0.1})
+#     freqai.dk.use_SVM_to_remove_outliers(predict=False)
+#     assert log_has_re(
+#         "SVM detected 7.83%",
+#         caplog,
+#     )
 
 
-def test_compute_inlier_metric(mocker, freqai_conf, caplog):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    freqai_conf['freqai']['feature_parameters'].update({"inlier_metric_window": 10})
-    freqai.dk.compute_inlier_metric(set_='train')
-    assert log_has_re(
-        "Inlier metric computed and added to features.",
-        caplog,
-    )
+# def test_compute_inlier_metric(mocker, freqai_conf, caplog):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     freqai_conf['freqai']['feature_parameters'].update({"inlier_metric_window": 10})
+#     freqai.dk.compute_inlier_metric(set_='train')
+#     assert log_has_re(
+#         "Inlier metric computed and added to features.",
+#         caplog,
+#     )
 
 
-def test_add_noise_to_training_features(mocker, freqai_conf):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    freqai_conf['freqai']['feature_parameters'].update({"noise_standard_deviation": 0.1})
-    freqai.dk.add_noise_to_training_features()
+# def test_add_noise_to_training_features(mocker, freqai_conf):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     freqai_conf['freqai']['feature_parameters'].update({"noise_standard_deviation": 0.1})
+#     freqai.dk.add_noise_to_training_features()
 
 
-def test_remove_beginning_points_from_data_dict(mocker, freqai_conf):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    freqai.dk.remove_beginning_points_from_data_dict(set_='train')
+# def test_remove_beginning_points_from_data_dict(mocker, freqai_conf):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     freqai.dk.remove_beginning_points_from_data_dict(set_='train')
 
 
-def test_principal_component_analysis(mocker, freqai_conf, caplog):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    freqai.dk.principal_component_analysis()
-    assert log_has_re(
-        "reduced feature dimension by",
-        caplog,
-    )
+# def test_principal_component_analysis(mocker, freqai_conf, caplog):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     freqai.dk.principal_component_analysis()
+#     assert log_has_re(
+#         "reduced feature dimension by",
+#         caplog,
+#     )
 
 
-def test_normalize_data(mocker, freqai_conf):
-    freqai = make_data_dictionary(mocker, freqai_conf)
-    data_dict = freqai.dk.data_dictionary
-    freqai.dk.normalize_data(data_dict)
-    assert any('_max' in entry for entry in freqai.dk.data.keys())
-    assert any('_min' in entry for entry in freqai.dk.data.keys())
+# def test_normalize_data(mocker, freqai_conf):
+#     freqai = make_data_dictionary(mocker, freqai_conf)
+#     data_dict = freqai.dk.data_dictionary
+#     freqai.dk.normalize_data(data_dict)
+#     assert any('_max' in entry for entry in freqai.dk.data.keys())
+#     assert any('_min' in entry for entry in freqai.dk.data.keys())
 
 
 def test_filter_features(mocker, freqai_conf):