From 059c28542548374ed4e2402f0f9e5c945a8295e8 Mon Sep 17 00:00:00 2001
From: robcaulk
Date: Tue, 24 May 2022 12:01:01 +0200
Subject: [PATCH] paying closer attention to managing live retraining on separate thread without affecting prediction of other coins on master thread

---
 freqtrade/freqai/data_drawer.py      |   1 +
 freqtrade/freqai/data_kitchen.py     |   7 +-
 freqtrade/freqai/freqai_interface.py | 198 ++++++++++--------
 .../CatboostPredictionModel.py       |  51 ++---
 4 files changed, 139 insertions(+), 118 deletions(-)

diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py
index a27a4b67f..51f56fae6 100644
--- a/freqtrade/freqai/data_drawer.py
+++ b/freqtrade/freqai/data_drawer.py
@@ -24,6 +24,7 @@ class FreqaiDataDrawer:
         self.pair_dict: Dict[str, Any] = {}
         # dictionary holding all actively inferenced models in memory given a model filename
         self.model_dictionary: Dict[str, Any] = {}
+        self.pair_data_dict: Dict[str, Any] = {}
         self.full_path = full_path
         self.load_drawer_from_disk()

diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py
index f5ddf8462..a4867d7eb 100644
--- a/freqtrade/freqai/data_kitchen.py
+++ b/freqtrade/freqai/data_kitchen.py
@@ -91,14 +91,15 @@ class FreqaiDataKitchen:
         assert config.get('freqai', {}).get('feature_parameters'), ("No Freqai feature_parameters"
                                                                     "found in config file.")

-    def set_paths(self, trained_timestamp: int = None) -> None:
+    def set_paths(self, metadata: dict, trained_timestamp: int = None,) -> None:
         self.full_path = Path(self.config['user_data_dir'] /
                               "models" /
                               str(self.freqai_config.get('live_full_backtestrange') +
                                   self.freqai_config.get('identifier')))

-        self.data_path = Path(self.full_path / str("sub-train" + "-" + self.pair.split("/")[0] +
-                                                   str(trained_timestamp)))
+        self.data_path = Path(self.full_path / str("sub-train" + "-" +
+                                                   metadata['pair'].split("/")[0] +
+                                                   str(trained_timestamp)))

         return

diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py
index 0b1fb3b86..19b7dbb27 100644
--- a/freqtrade/freqai/freqai_interface.py
+++ b/freqtrade/freqai/freqai_interface.py
@@ -108,14 +108,22 @@ class IFreqaiModel(ABC):
         self.live = strategy.dp.runmode in (RunMode.DRY_RUN, RunMode.LIVE)

         # FreqaiDataKitchen is reinstantiated for each coin
-        self.dh = FreqaiDataKitchen(self.config, self.data_drawer, self.live, metadata["pair"])
-
         if self.live:
-            # logger.info('testing live')
-            self.start_live(dataframe, metadata, strategy)
+            if not self.training_on_separate_thread:
+                self.dh = FreqaiDataKitchen(self.config, self.data_drawer,
+                                            self.live, metadata["pair"])
+                dh = self.start_live(dataframe, metadata, strategy, self.dh)
+            else:
+                # we will have at max 2 separate instances of the kitchen at once.
+                self.dh_fg = FreqaiDataKitchen(self.config, self.data_drawer,
+                                               self.live, metadata["pair"])
+                dh = self.start_live(dataframe, metadata, strategy, self.dh_fg)

-            return (self.dh.full_predictions, self.dh.full_do_predict,
-                    self.dh.full_target_mean, self.dh.full_target_std)
+            return (dh.full_predictions, dh.full_do_predict,
+                    dh.full_target_mean, dh.full_target_std)
+
+        # Backtesting only
+        self.dh = FreqaiDataKitchen(self.config, self.data_drawer, self.live, metadata["pair"])

         logger.info(f'Training {len(self.dh.training_timeranges)} timeranges')

@@ -138,8 +146,9 @@ class IFreqaiModel(ABC):
             self.dh.data_path = Path(self.dh.full_path /
                                      str("sub-train" + "-" + metadata['pair'].split("/")[0] +
                                          str(int(trained_timestamp.stopts))))

-            if not self.model_exists(metadata["pair"], trained_timestamp=trained_timestamp.stopts):
-                self.model = self.train(dataframe_train, metadata)
+            if not self.model_exists(metadata["pair"], self.dh,
+                                     trained_timestamp=trained_timestamp.stopts):
+                self.model = self.train(dataframe_train, metadata, self.dh)
                 self.dh.save_data(self.model)
             else:
                 self.model = self.dh.load_data()
@@ -150,7 +159,7 @@ class IFreqaiModel(ABC):
             #     self.model = self.train(dataframe_train, metadata)
             #     self.dh.save_data(self.model)

-            preds, do_preds = self.predict(dataframe_backtest, metadata)
+            preds, do_preds = self.predict(dataframe_backtest, self.dh)

             self.dh.append_predictions(preds, do_preds, len(dataframe_backtest))
             print('predictions', len(self.dh.full_predictions),
@@ -161,7 +170,8 @@ class IFreqaiModel(ABC):
         return (self.dh.full_predictions, self.dh.full_do_predict,
                 self.dh.full_target_mean, self.dh.full_target_std)

-    def start_live(self, dataframe: DataFrame, metadata: dict, strategy: IStrategy) -> None:
+    def start_live(self, dataframe: DataFrame, metadata: dict,
+                   strategy: IStrategy, dh: FreqaiDataKitchen) -> FreqaiDataKitchen:
         """
         The main broad execution for dry/live. This function will check if a retraining should
         be performed, and if so, retrain and reset the model.
@@ -172,52 +182,49 @@ class IFreqaiModel(ABC):
         """
         (model_filename,
          trained_timestamp,
          coin_first) = self.data_drawer.get_pair_dict_info(metadata)

-        if trained_timestamp != 0:
-            self.dh.set_paths(trained_timestamp)
-            # data_drawer thinks the file eixts, verify here
-            file_exists = self.model_exists(metadata['pair'],
-                                            trained_timestamp=trained_timestamp,
-                                            model_filename=model_filename)
-
-        if not self.training_on_separate_thread:
+        file_exists = False
+
+        if trained_timestamp != 0:
+            dh.set_paths(metadata, trained_timestamp)
+            # data_drawer thinks the file exists, verify here
+            file_exists = self.model_exists(metadata['pair'],
+                                            dh,
+                                            trained_timestamp=trained_timestamp,
+                                            model_filename=model_filename)
+
+        # if not self.training_on_separate_thread:
             # this will also prevent other pairs from trying to train simultaneously.
             (self.retrain,
-             new_trained_timerange) = self.dh.check_if_new_training_required(
-                                                                        trained_timestamp)
-            self.dh.set_paths(new_trained_timerange.stopts)
+             new_trained_timerange) = dh.check_if_new_training_required(trained_timestamp)
+            dh.set_paths(metadata, new_trained_timerange.stopts)
+            # if self.training_on_separate_thread:
+            #     logger.info("FreqAI training a new model on background thread.")
+            #     self.retrain = False
+
+            if self.retrain or not file_exists:
+                if coin_first:
+                    self.train_model_in_series(new_trained_timerange, metadata, strategy, dh)
+                else:
+                    self.training_on_separate_thread = True  # acts like a lock
+                    self.retrain_model_on_separate_thread(new_trained_timerange,
+                                                          metadata, strategy, dh)
+            else:
                 logger.info("FreqAI training a new model on background thread.")
-                self.retrain = False

-            if self.retrain or not file_exists:
-                if coin_first:
-                    self.train_model_in_series(new_trained_timerange, metadata, strategy)
-                else:
-                    self.training_on_separate_thread = True  # acts like a lock
-                    self.retrain_model_on_separate_thread(new_trained_timerange,
-                                                          metadata, strategy)
+            self.model = dh.load_data(coin=metadata['pair'])

-            self.model = self.dh.load_data(coin=metadata['pair'])
+            # strategy_provided_features = dh.find_features(dataframe)
+            # if strategy_provided_features != dh.training_features_list:
+            #     self.train_model_in_series(new_trained_timerange, metadata, strategy)

-            strategy_provided_features = self.dh.find_features(dataframe)
-            if strategy_provided_features != self.dh.training_features_list:
-                self.train_model_in_series(new_trained_timerange, metadata, strategy)
+            preds, do_preds = self.predict(dataframe, dh)
+            dh.append_predictions(preds, do_preds, len(dataframe))

-            preds, do_preds = self.predict(dataframe, metadata)
-            self.dh.append_predictions(preds, do_preds, len(dataframe))
+            return dh

-        return
-
-    def make_labels(self, dataframe: DataFrame) -> DataFrame:
-        """
-        User defines the labels here (target values).
-        :params:
-        :dataframe: the full dataframe for the present training period
-        """
-
-        return
-
-    def data_cleaning_train(self) -> None:
+    def data_cleaning_train(self, dh: FreqaiDataKitchen) -> None:
         """
         Base data cleaning method for train
         Any function inside this method should drop training data points from the filtered_dataframe
         based on user decided logic. See FreqaiDataKitchen::remove_outliers() for an example
         of how outlier data points are dropped from the dataframe used for training.
         """
         if self.freqai_info.get('feature_parameters', {}).get('principal_component_analysis'):
-            self.dh.principal_component_analysis()
+            dh.principal_component_analysis()

         # if self.feature_parameters["determine_statistical_distributions"]:
-        #     self.dh.determine_statistical_distributions()
+        #     dh.determine_statistical_distributions()
         # if self.feature_parameters["remove_outliers"]:
-        #     self.dh.remove_outliers(predict=False)
+        #     dh.remove_outliers(predict=False)

         if self.freqai_info.get('feature_parameters', {}).get('use_SVM_to_remove_outliers'):
-            self.dh.use_SVM_to_remove_outliers(predict=False)
+            dh.use_SVM_to_remove_outliers(predict=False)

         if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
-            self.dh.data["avg_mean_dist"] = self.dh.compute_distances()
+            dh.data["avg_mean_dist"] = dh.compute_distances()

-    def data_cleaning_predict(self, filtered_dataframe: DataFrame) -> None:
+    def data_cleaning_predict(self, dh: FreqaiDataKitchen) -> None:
         """
         Base data cleaning method for predict.
-        These functions each modify self.dh.do_predict, which is a dataframe with equal length
+        These functions each modify dh.do_predict, which is a dataframe with equal length
         to the number of candles coming from and returning to the strategy. Inside do_predict,
          1 allows prediction and < 0 signals to the strategy that the model is not confident in
          the prediction.
         See FreqaiDataKitchen::remove_outliers() for an example
        of how the do_predict vector is modified. do_predict is ultimately passed back to strategy
        for buy signals.
         """
         if self.freqai_info.get('feature_parameters', {}).get('principal_component_analysis'):
-            self.dh.pca_transform()
+            dh.pca_transform()

         # if self.feature_parameters["determine_statistical_distributions"]:
-        #     self.dh.determine_statistical_distributions()
+        #     dh.determine_statistical_distributions()
         # if self.feature_parameters["remove_outliers"]:
-        #     self.dh.remove_outliers(predict=True)  # creates dropped index
+        #     dh.remove_outliers(predict=True)  # creates dropped index

         if self.freqai_info.get('feature_parameters', {}).get('use_SVM_to_remove_outliers'):
-            self.dh.use_SVM_to_remove_outliers(predict=True)
+            dh.use_SVM_to_remove_outliers(predict=True)

         if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
-            self.dh.check_if_pred_in_training_spaces()  # sets do_predict
+            dh.check_if_pred_in_training_spaces()  # sets do_predict

-    def model_exists(self, pair: str, trained_timestamp: int = None,
+    def model_exists(self, pair: str, dh: FreqaiDataKitchen, trained_timestamp: int = None,
                      model_filename: str = '') -> bool:
         """
         Given a pair and path, check if a model already exists
         :param pair: pair e.g. BTC/USD
         :param path: path to model
         """
         coin, _ = pair.split("/")

-        if self.live and trained_timestamp is None:
-            self.dh.model_filename = model_filename
-        else:
-            self.dh.model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)
+        # if self.live and trained_timestamp == 0:
+        #     dh.model_filename = model_filename
+        if not self.live:
+            dh.model_filename = model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)

-        path_to_modelfile = Path(self.dh.data_path / str(self.dh.model_filename + "_model.joblib"))
+        path_to_modelfile = Path(dh.data_path / str(model_filename + "_model.joblib"))
         file_exists = path_to_modelfile.is_file()
         if file_exists:
-            logger.info("Found model at %s", self.dh.data_path / self.dh.model_filename)
+            logger.info("Found model at %s", dh.data_path / dh.model_filename)
         else:
-            logger.info("Could not find model at %s", self.dh.data_path / self.dh.model_filename)
+            logger.info("Could not find model at %s", dh.data_path / dh.model_filename)
         return file_exists

     def set_full_path(self) -> None:
@@ -293,58 +300,58 @@ class IFreqaiModel(ABC):
     @threaded
     def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, metadata: dict,
-                                         strategy: IStrategy):
+                                         strategy: IStrategy, dh: FreqaiDataKitchen):

         # with nostdout():
-        self.dh.download_new_data_for_retraining(new_trained_timerange, metadata)
-        corr_dataframes, base_dataframes = self.dh.load_pairs_histories(new_trained_timerange,
-                                                                        metadata)
+        dh.download_new_data_for_retraining(new_trained_timerange, metadata)
+        corr_dataframes, base_dataframes = dh.load_pairs_histories(new_trained_timerange,
+                                                                   metadata)

-        unfiltered_dataframe = self.dh.use_strategy_to_populate_indicators(strategy,
-                                                                           corr_dataframes,
-                                                                           base_dataframes,
-                                                                           metadata)
+        unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
+                                                                      corr_dataframes,
+                                                                      base_dataframes,
+                                                                      metadata)

-        self.model = self.train(unfiltered_dataframe, metadata)
+        self.model = self.train(unfiltered_dataframe, metadata, dh)
         self.data_drawer.pair_dict[metadata['pair']][
             'trained_timestamp'] = new_trained_timerange.stopts

-        self.dh.set_new_model_names(metadata, new_trained_timerange)
+        dh.set_new_model_names(metadata, new_trained_timerange)

-        self.dh.save_data(self.model, coin=metadata['pair'])
+        dh.save_data(self.model, coin=metadata['pair'])

         self.training_on_separate_thread = False
         self.retrain = False

     def train_model_in_series(self, new_trained_timerange: TimeRange, metadata: dict,
-                              strategy: IStrategy):
+                              strategy: IStrategy, dh: FreqaiDataKitchen):

-        self.dh.download_new_data_for_retraining(new_trained_timerange, metadata)
-        corr_dataframes, base_dataframes = self.dh.load_pairs_histories(new_trained_timerange,
-                                                                        metadata)
+        dh.download_new_data_for_retraining(new_trained_timerange, metadata)
+        corr_dataframes, base_dataframes = dh.load_pairs_histories(new_trained_timerange,
+                                                                   metadata)

-        unfiltered_dataframe = self.dh.use_strategy_to_populate_indicators(strategy,
-                                                                           corr_dataframes,
-                                                                           base_dataframes,
-                                                                           metadata)
+        unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
+                                                                      corr_dataframes,
+                                                                      base_dataframes,
+                                                                      metadata)

-        self.model = self.train(unfiltered_dataframe, metadata)
+        self.model = self.train(unfiltered_dataframe, metadata, dh)

         self.data_drawer.pair_dict[metadata['pair']][
             'trained_timestamp'] = new_trained_timerange.stopts
-        self.dh.set_new_model_names(metadata, new_trained_timerange)
+        dh.set_new_model_names(metadata, new_trained_timerange)

         self.data_drawer.pair_dict[metadata['pair']]['first'] = False
-        self.dh.save_data(self.model, coin=metadata['pair'])
+        dh.save_data(self.model, coin=metadata['pair'])

         self.retrain = False

     # Methods which are overridden by user made prediction models.
     # See freqai/prediction_models/CatboostPredictionModlel.py for an example.

     @abstractmethod
-    def train(self, unfiltered_dataframe: DataFrame, metadata: dict) -> Any:
+    def train(self, unfiltered_dataframe: DataFrame, metadata: dict, dh: FreqaiDataKitchen) -> Any:
         """
         Filter the training data and train a model to it. Train makes heavy use of the datahandler
         for storing, saving, loading, and analyzing the data.
@@ -369,7 +376,8 @@ class IFreqaiModel(ABC):
         return

     @abstractmethod
-    def predict(self, dataframe: DataFrame, metadata: dict) -> Tuple[npt.ArrayLike, npt.ArrayLike]:
+    def predict(self, dataframe: DataFrame,
+                dh: FreqaiDataKitchen) -> Tuple[npt.ArrayLike, npt.ArrayLike]:
         """
         Filter the prediction features data and predict with it.
         :param: unfiltered_dataframe: Full dataframe for the current backtest period.
@@ -378,3 +386,13 @@ class IFreqaiModel(ABC):
         :do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
         data (NaNs) or felt uncertain about data (PCA and DI index)
         """
+
+    @abstractmethod
+    def make_labels(self, dataframe: DataFrame, dh: FreqaiDataKitchen) -> DataFrame:
+        """
+        User defines the labels here (target values).
+        :params:
+        :dataframe: the full dataframe for the present training period
+        """
+
+        return

diff --git a/freqtrade/freqai/prediction_models/CatboostPredictionModel.py b/freqtrade/freqai/prediction_models/CatboostPredictionModel.py
index 6349174ad..87ddfdb66 100644
--- a/freqtrade/freqai/prediction_models/CatboostPredictionModel.py
+++ b/freqtrade/freqai/prediction_models/CatboostPredictionModel.py
@@ -4,6 +4,7 @@
 from typing import Any, Dict, Tuple

 from catboost import CatBoostRegressor, Pool
 from pandas import DataFrame

+from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
 from freqtrade.freqai.freqai_interface import IFreqaiModel

@@ -17,7 +18,7 @@ class CatboostPredictionModel(IFreqaiModel):
     has its own DataHandler where data is held, saved, loaded, and managed.
     """

-    def make_labels(self, dataframe: DataFrame) -> DataFrame:
+    def make_labels(self, dataframe: DataFrame, dh: FreqaiDataKitchen) -> DataFrame:
         """
         User defines the labels here (target values).
         :params:
@@ -32,14 +33,15 @@ class CatboostPredictionModel(IFreqaiModel):
             / dataframe["close"]
             - 1
         )

-        self.dh.data["s_mean"] = dataframe["s"].mean()
-        self.dh.data["s_std"] = dataframe["s"].std()
+        dh.data["s_mean"] = dataframe["s"].mean()
+        dh.data["s_std"] = dataframe["s"].std()

-        # logger.info("label mean", self.dh.data["s_mean"], "label std", self.dh.data["s_std"])
+        # logger.info("label mean", dh.data["s_mean"], "label std", dh.data["s_std"])

         return dataframe["s"]

-    def train(self, unfiltered_dataframe: DataFrame, metadata: dict) -> Tuple[DataFrame, DataFrame]:
+    def train(self, unfiltered_dataframe: DataFrame,
+              metadata: dict, dh: FreqaiDataKitchen) -> Tuple[DataFrame, DataFrame]:
         """
         Filter the training data and train a model to it. Train makes heavy use of the datahkitchen
         for storing, saving, loading, and analyzing the data.
         :param: :unfiltered_dataframe: Full dataframe for the current training period
         :param: metadata: pair metadata from strategy.
         :returns:
         :model: Trained model which can be used to inference (self.predict)
         """

         logger.info("--------------------Starting training--------------------")

         # create the full feature list based on user config info
-        self.dh.training_features_list = self.dh.find_features(unfiltered_dataframe)
-        unfiltered_labels = self.make_labels(unfiltered_dataframe)
+        dh.training_features_list = dh.find_features(unfiltered_dataframe)
+        unfiltered_labels = self.make_labels(unfiltered_dataframe, dh)
         # filter the features requested by user in the configuration file and elegantly handle NaNs
-        features_filtered, labels_filtered = self.dh.filter_features(
+        features_filtered, labels_filtered = dh.filter_features(
             unfiltered_dataframe,
-            self.dh.training_features_list,
+            dh.training_features_list,
             unfiltered_labels,
             training_filter=True,
         )

         # split data into train/test data.
-        data_dictionary = self.dh.make_train_test_datasets(features_filtered, labels_filtered)
+        data_dictionary = dh.make_train_test_datasets(features_filtered, labels_filtered)
         # standardize all data based on train_dataset only
-        data_dictionary = self.dh.standardize_data(data_dictionary)
+        data_dictionary = dh.standardize_data(data_dictionary)
         # optional additional data cleaning/analysis
-        self.data_cleaning_train()
+        self.data_cleaning_train(dh)

-        logger.info(f'Training model on {len(self.dh.training_features_list)} features')
+        logger.info(f'Training model on {len(dh.training_features_list)} features')
         logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')

         model = self.fit(data_dictionary)
@@ -107,8 +109,8 @@ class CatboostPredictionModel(IFreqaiModel):

         return model

-    def predict(self, unfiltered_dataframe: DataFrame, metadata: dict) -> Tuple[DataFrame,
-                                                                                 DataFrame]:
+    def predict(self, unfiltered_dataframe: DataFrame,
+                dh: FreqaiDataKitchen) -> Tuple[DataFrame, DataFrame]:
         """
         Filter the prediction features data and predict with it.
         :param: unfiltered_dataframe: Full dataframe for the current backtest period.
         :return:
         :predictions: np.array of predictions
         :do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
         data (NaNs) or felt uncertain about data (PCA and DI index)
         """
@@ -120,23 +122,22 @@ class CatboostPredictionModel(IFreqaiModel):

         # logger.info("--------------------Starting prediction--------------------")

-        original_feature_list = self.dh.find_features(unfiltered_dataframe)
-        filtered_dataframe, _ = self.dh.filter_features(
+        original_feature_list = dh.find_features(unfiltered_dataframe)
+        filtered_dataframe, _ = dh.filter_features(
             unfiltered_dataframe, original_feature_list, training_filter=False
         )
-        filtered_dataframe = self.dh.standardize_data_from_metadata(filtered_dataframe)
-        self.dh.data_dictionary["prediction_features"] = filtered_dataframe
+        filtered_dataframe = dh.standardize_data_from_metadata(filtered_dataframe)
+        dh.data_dictionary["prediction_features"] = filtered_dataframe

         # optional additional data cleaning/analysis
-        self.data_cleaning_predict(filtered_dataframe)
+        self.data_cleaning_predict(dh)

-        predictions = self.model.predict(self.dh.data_dictionary["prediction_features"])
+        predictions = self.model.predict(dh.data_dictionary["prediction_features"])

         # compute the non-standardized predictions
-        self.dh.predictions = (predictions + 1) * (self.dh.data["labels_max"] -
-                                                   self.dh.data["labels_min"]) / 2 + self.dh.data[
-                                                       "labels_min"]
+        dh.predictions = (predictions + 1) * (dh.data["labels_max"] -
+                                              dh.data["labels_min"]) / 2 + dh.data["labels_min"]

         # logger.info("--------------------Finished prediction--------------------")

-        return (self.dh.predictions, self.dh.do_predict)
+        return (dh.predictions, dh.do_predict)
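
Note on the pattern used above (illustrative only, not part of the patch): the `training_on_separate_thread` boolean acts as a coarse lock. The main thread keeps instantiating a per-pair FreqaiDataKitchen and serving predictions, while at most one background thread retrains a model and clears the flag when it finishes. A minimal, self-contained Python sketch of that idea, using hypothetical names (`RetrainCoordinator`, `maybe_retrain`) rather than the real freqtrade API, could look like this:

    import threading
    import time


    def threaded(fn):
        # Run the decorated function on a daemon thread, mirroring the @threaded
        # decorator applied to retrain_model_on_separate_thread in the patch.
        def wrapper(*args, **kwargs):
            threading.Thread(target=fn, args=args, kwargs=kwargs, daemon=True).start()
        return wrapper


    class RetrainCoordinator:
        def __init__(self):
            self.training_on_separate_thread = False  # boolean flag used as a coarse lock

        @threaded
        def retrain_on_separate_thread(self, pair):
            try:
                time.sleep(1)  # stand-in for the expensive train()/save_data() work
            finally:
                self.training_on_separate_thread = False  # release so a later retrain may start

        def maybe_retrain(self, pair, needs_retrain):
            # Called from the main (prediction) thread for every pair.
            if needs_retrain and not self.training_on_separate_thread:
                self.training_on_separate_thread = True  # set before spawning, "acts like a lock"
                self.retrain_on_separate_thread(pair)
            # Predictions for this pair and all others continue on the calling thread.


    coordinator = RetrainCoordinator()
    coordinator.maybe_retrain("BTC/USDT", needs_retrain=True)
    coordinator.maybe_retrain("ETH/USDT", needs_retrain=True)  # skipped: a retrain is already running

Because the flag is a plain attribute rather than a threading.Lock, this only works as long as it is read and written from the single master thread that schedules retrains, which is the assumption the patch appears to make.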