diff --git a/freqtrade/data/converter/__init__.py b/freqtrade/data/converter/__init__.py index 3918e49da..3a330554f 100644 --- a/freqtrade/data/converter/__init__.py +++ b/freqtrade/data/converter/__init__.py @@ -1,7 +1,8 @@ -from freqtrade.data.converter.converter import (clean_ohlcv_dataframe, convert_ohlcv_format, +from freqtrade.data.converter.converter import (clean_duplicate_trades, clean_ohlcv_dataframe, convert_ohlcv_format, ohlcv_fill_up_missing_data, ohlcv_to_dataframe, - order_book_to_dataframe, reduce_dataframe_footprint, - trim_dataframe, trim_dataframes) + order_book_to_dataframe, + populate_dataframe_with_trades, public_trades_to_dataframe, + reduce_dataframe_footprint, trim_dataframe, trim_dataframes) from freqtrade.data.converter.trade_converter import (convert_trades_format, convert_trades_to_ohlcv, trades_convert_types, trades_df_remove_duplicates, diff --git a/freqtrade/data/converter/converter.py b/freqtrade/data/converter/converter.py index 0dbfe55a5..b59543fbf 100644 --- a/freqtrade/data/converter/converter.py +++ b/freqtrade/data/converter/converter.py @@ -7,9 +7,11 @@ from typing import Dict import numpy as np import pandas as pd from pandas import DataFrame, to_datetime +import itertools -from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, Config +from freqtrade.constants import DEFAULT_ORDERFLOW_COLUMNS, DEFAULT_DATAFRAME_COLUMNS, Config from freqtrade.enums import CandleType, TradingMode +from freqtrade.data.converter.trade_converter import trades_df_remove_duplicates logger = logging.getLogger(__name__) @@ -74,10 +76,11 @@ def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str): timeframe_frequency, timeframe_minutes = _convert_timeframe_to_pandas_frequency( timeframe) # calculate ohlcv candle start and end - df['datetime'] = pd.to_datetime(df['date'], unit='ms') - df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency) - df['candle_end'] = df['candle_start'] + pd.Timedelta(timeframe_minutes) - df.drop(columns=['datetime'], inplace=True) + if df is not None and not df.empty: + df['datetime'] = pd.to_datetime(df['date'], unit='ms') + df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency) + df['candle_end'] = df['candle_start'] + pd.Timedelta(timeframe_minutes) + df.drop(columns=['datetime'], inplace=True) def populate_dataframe_with_trades(config: Config, dataframe: DataFrame, trades: DataFrame, *, pair: str) -> DataFrame: @@ -168,7 +171,8 @@ def populate_dataframe_with_trades(config: Config, dataframe: DataFrame, trades: max_delta = np.max(deltas_per_trade) df.loc[is_between, 'total_trades'] = len(trades_grouped_df) - dataframe.loc[is_between] = df.loc[is_between].copy() # copy to avoid memory leaks + # copy to avoid memory leaks + dataframe.loc[is_between] = df.loc[is_between].copy() else: logger.debug( f"Found NO candles for trades starting with {candle_start}") @@ -456,7 +460,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) df.reset_index(inplace=True) len_before = len(dataframe) len_after = len(df) - pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0 + pct_missing = (len_after - len_before) / \ + len_before if len_before > 0 else 0 if len_before != len_after: message = (f"Missing data fillup for {pair}: before: {len_before} - after: {len_after}" f" - {pct_missing:.2%}") @@ -501,7 +506,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange, processed: Dict[str, DataFrame] = {} for pair, df in preprocessed.items(): - trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles) + trimed_df = trim_dataframe( + df, timerange, startup_candles=startup_candles) if not trimed_df.empty: processed[pair] = trimed_df else: @@ -557,15 +563,18 @@ def convert_ohlcv_format( candle_types = [CandleType.from_string(ct) for ct in config.get('candle_types', [ c.value for c in CandleType])] logger.info(candle_types) - paircombs = src.ohlcv_get_available_data(config['datadir'], TradingMode.SPOT) - paircombs.extend(src.ohlcv_get_available_data(config['datadir'], TradingMode.FUTURES)) + paircombs = src.ohlcv_get_available_data( + config['datadir'], TradingMode.SPOT) + paircombs.extend(src.ohlcv_get_available_data( + config['datadir'], TradingMode.FUTURES)) if 'pairs' in config: # Filter pairs paircombs = [comb for comb in paircombs if comb[0] in config['pairs']] if 'timeframes' in config: - paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']] + paircombs = [comb for comb in paircombs if comb[1] + in config['timeframes']] paircombs = [comb for comb in paircombs if comb[2] in candle_types] paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value)) @@ -582,7 +591,8 @@ def convert_ohlcv_format( drop_incomplete=False, startup_candles=0, candle_type=candle_type) - logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}") + logger.info( + f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}") if len(data) > 0: trg.ohlcv_store( pair=pair, @@ -592,7 +602,8 @@ def convert_ohlcv_format( ) if erase and convert_from != convert_to: logger.info(f"Deleting source data for {pair} / {timeframe}") - src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type) + src.ohlcv_purge(pair=pair, timeframe=timeframe, + candle_type=candle_type) def reduce_dataframe_footprint(df: DataFrame) -> DataFrame: diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index 7ec4afa35..6c58fe882 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -8,6 +8,7 @@ import logging import signal from copy import deepcopy from datetime import datetime, timedelta, timezone +import arrow from math import floor from threading import Lock from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union, Callable @@ -2319,7 +2320,7 @@ class Exchange: results_df = {} for pair, timeframe, candle_type in set(pair_list): new_ticks = [] - all_stored_ticks = [] + all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ['date']) first_candle_ms = self.needed_candle_ms(timeframe, candle_type) # refresh, if # a. not in _trades @@ -2340,19 +2341,20 @@ class Exchange: else: until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000 - all_stored_ticks = data_handler.trades_load(f"{pair}-cached") - if all_stored_ticks: - if all_stored_ticks[0][0] <= first_candle_ms: - last_cached_ms = all_stored_ticks[-1][0] + all_stored_ticks_df = data_handler.trades_load(f"{pair}-cached") + + if not all_stored_ticks_df.empty: + if all_stored_ticks_df.iloc[0]['timestamp'] <= first_candle_ms: + last_cached_ms = all_stored_ticks_df.iloc[-1]['timestamp'] # only use cached if it's closer than first_candle_ms since_ms = last_cached_ms if last_cached_ms > first_candle_ms else first_candle_ms # doesn't go far enough else: - all_stored_ticks = [] + all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ['date']) # from_id overrules with exchange set to id paginate # TODO: DEBUG: - # since_ms = 1682609520000 + # since_ms = 1695832200000 # from_id = None # TODO: /DEBUG [ticks_pair, new_ticks]=self._download_trades_history(pair, @@ -2367,12 +2369,14 @@ class Exchange: if new_ticks: drop_incomplete = False # TODO: remove, no incomplete trades - all_stored_ticks.extend(new_ticks) + # drop 'date' column from stored ticks + all_stored_ticks_list = all_stored_ticks_df[DEFAULT_TRADES_COLUMNS].values.tolist() + all_stored_ticks_list.extend(new_ticks) # NOTE: only process new trades # self._trades = until_first_candle(stored_trades) + fetch_trades - trades_df = self._process_trades_df(pair, timeframe, candle_type, all_stored_ticks, cache, drop_incomplete, first_candle_ms) + trades_df = self._process_trades_df(pair, timeframe, candle_type, all_stored_ticks_list, cache, drop_incomplete, first_candle_ms) results_df[(pair, timeframe, candle_type)] = trades_df - data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS].values.tolist()) + data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS]) return results_df