start transition toward outsourcing the data pipeline with objective of improving pipeline flexibility

This commit is contained in:
robcaulk
2023-05-26 18:40:14 +02:00
parent c23a045de4
commit 31e19add27
8 changed files with 579 additions and 586 deletions

View File

@@ -7,14 +7,15 @@ import torch
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
# from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.base_models import BaseRegressionModel
from freqtrade.freqai.torch.PyTorchDataConvertor import PyTorchDataConvertor
logger = logging.getLogger(__name__)
class BasePyTorchModel(IFreqaiModel, ABC):
class BasePyTorchModel(BaseRegressionModel):
"""
Base class for PyTorch type models.
User *must* inherit from this class and set fit() and predict() and
@@ -29,50 +30,50 @@ class BasePyTorchModel(IFreqaiModel, ABC):
self.splits = ["train", "test"] if test_size != 0 else ["train"]
self.window_size = self.freqai_info.get("conv_width", 1)
def train(
self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datakitchen
for storing, saving, loading, and analyzing the data.
:param unfiltered_df: Full dataframe for the current training period
:return:
:model: Trained model which can be used to inference (self.predict)
"""
# def train(
# self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
# ) -> Any:
# """
# Filter the training data and train a model to it. Train makes heavy use of the datakitchen
# for storing, saving, loading, and analyzing the data.
# :param unfiltered_df: Full dataframe for the current training period
# :return:
# :model: Trained model which can be used to inference (self.predict)
# """
logger.info(f"-------------------- Starting training {pair} --------------------")
# logger.info(f"-------------------- Starting training {pair} --------------------")
start_time = time()
# start_time = time()
features_filtered, labels_filtered = dk.filter_features(
unfiltered_df,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
# features_filtered, labels_filtered = dk.filter_features(
# unfiltered_df,
# dk.training_features_list,
# dk.label_list,
# training_filter=True,
# )
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# # split data into train/test data.
# data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
# if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
# dk.fit_labels()
# # normalize all data based on train_dataset only
# data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
# # optional additional data cleaning/analysis
# self.data_cleaning_train(dk)
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
# logger.info(
# f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
# )
# logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
model = self.fit(data_dictionary, dk)
end_time = time()
# model = self.fit(data_dictionary, dk)
# end_time = time()
logger.info(f"-------------------- Done training {pair} "
f"({end_time - start_time:.2f} secs) --------------------")
# logger.info(f"-------------------- Done training {pair} "
# f"({end_time - start_time:.2f} secs) --------------------")
return model
# return model
@property
@abstractmethod

View File

@@ -49,21 +49,34 @@ class BaseRegressionModel(IFreqaiModel):
logger.info(f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
logger.info(f"Training model on {len(d['train_features'])} data points")
model = self.fit(data_dictionary, dk)
model = self.fit(d, dk)
end_time = time()
@@ -88,11 +101,11 @@ class BaseRegressionModel(IFreqaiModel):
filtered_df, _ = dk.filter_features(
unfiltered_df, dk.training_features_list, training_filter=False
)
filtered_df = dk.normalize_data_from_metadata(filtered_df)
# filtered_df = dk.normalize_data_from_metadata(filtered_df)
dk.data_dictionary["prediction_features"] = filtered_df
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk)
dk.data_dictionary["prediction_features"], outliers, _ = dk.pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
if self.CONV_WIDTH == 1:
@@ -100,6 +113,8 @@ class BaseRegressionModel(IFreqaiModel):
pred_df = DataFrame(predictions, columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df)
pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df)
dk.DI_values = dk.label_pipeline.get_step("di").di_values
dk.do_predict = outliers.to_numpy()
return (pred_df, dk.do_predict)

View File

@@ -1,70 +0,0 @@
import logging
from time import time
from typing import Any
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
logger = logging.getLogger(__name__)
class BaseTensorFlowModel(IFreqaiModel):
"""
Base class for TensorFlow type models.
User *must* inherit from this class and set fit() and predict().
"""
def train(
self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datakitchen
for storing, saving, loading, and analyzing the data.
:param unfiltered_df: Full dataframe for the current training period
:param metadata: pair metadata from strategy.
:return:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info(f"-------------------- Starting training {pair} --------------------")
start_time = time()
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_df,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
logger.info(f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
model = self.fit(data_dictionary, dk)
end_time = time()
logger.info(f"-------------------- Done training {pair} "
f"({end_time - start_time:.2f} secs) --------------------")
return model