Ruff and flake8 fixes

This commit is contained in:
Joe Schr
2024-02-08 19:38:21 +01:00
parent c40f3d91d4
commit 5b264d66e0
2 changed files with 47 additions and 41 deletions

View File

@@ -351,7 +351,6 @@ class Exchange:
return int(self._ft_has.get('ohlcv_candle_limit_per_timeframe', {}).get(
timeframe, self._ft_has.get('ohlcv_candle_limit')))
def trades_candle_limit(
self, timeframe: str, candle_type: CandleType, since_ms: Optional[int] = None) -> int:
"""
@@ -363,9 +362,9 @@ class Exchange:
:param since_ms: Starting timestamp
:return: Candle limit as integer
"""
#TODO: check if there are trades candle limits
# TODO: check if there are trades candle limits
return int(self._ft_has.get('trade_candle_limit_per_timeframe', {}).get(
timeframe, self._ft_has.get('trade_candle_limit',self._max_trades_candle_limit)))
timeframe, self._ft_has.get('trade_candle_limit', self._max_trades_candle_limit)))
def get_markets(self, base_currencies: List[str] = [], quote_currencies: List[str] = [],
spot_only: bool = False, margin_only: bool = False, futures_only: bool = False,
@@ -2020,7 +2019,7 @@ class Exchange:
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
def needed_candle_ms(self, timeframe: str, candle_type:CandleType):
def needed_candle_ms(self, timeframe: str, candle_type: CandleType):
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type)
move_to = one_call * self.required_candle_call_count
@@ -2111,26 +2110,37 @@ class Exchange:
self._klines[(pair, timeframe, c_type)] = ohlcv_df
return ohlcv_df
def _process_trades_df(self,
                       pair: str,
                       timeframe: str,
                       c_type: CandleType,
                       ticks: List[List],
                       cache: bool,
                       drop_incomplete: bool,
                       first_required_candle_date: Optional[int]) -> DataFrame:
    """
    Parse raw public-trades ticks into a DataFrame and optionally merge the
    result into the pair's trades cache (``self._trades``).
    :param pair: Pair the ticks belong to
    :param timeframe: Timeframe the trades are cached for
    :param c_type: CandleType of the cached data
    :param ticks: Raw trades in exchange list-of-lists format
    :param cache: If True, combine with previously cached trades, store the
        result in ``self._trades`` and record the last-refresh timestamp
    :param drop_incomplete: If True, treat the final tick as belonging to a
        still-open candle when recording the last-refresh timestamp
    :param first_required_candle_date: Oldest timestamp (ms) still needed;
        cached trades at or before this are aged out. Skipped when falsy.
    :return: DataFrame of trades (combined with the cache when cache=True)
    """
    trades_df = public_trades_to_dataframe(ticks, pair=pair)
    # Remember the newest complete trade's time as this pair's refresh time.
    if ticks and cache:
        idx = -2 if drop_incomplete and len(ticks) > 1 else -1
        # // is floor division: rounds down, converting ms -> s.
        last_refresh = trades_df['timestamp'].iat[idx] // 1000
        self._trades_last_refresh_time[(pair, timeframe, c_type)] = last_refresh
    if cache:
        if (pair, timeframe, c_type) in self._trades:
            old = self._trades[(pair, timeframe, c_type)]
            # Reassign so we return the updated, combined df
            trades_df = clean_duplicate_trades(
                concat([old, trades_df], axis=0),
                timeframe,
                pair,
                fill_missing=False,
                drop_incomplete=False)
        # Age out trades older than the first required candle.
        if first_required_candle_date:
            # NOTE(review): strict '<' drops trades at exactly the boundary
            # timestamp - confirm that is intended.
            trades_df = trades_df[
                first_required_candle_date < trades_df['timestamp']]
            trades_df = trades_df.reset_index(drop=True)
        self._trades[(pair, timeframe, c_type)] = trades_df
    return trades_df
@@ -2214,7 +2224,9 @@ class Exchange:
# b. no cache used
# c. need new data
is_in_cache = (pair, timeframe, candle_type) in self._trades
if ( not is_in_cache or not cache or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)):
if (not is_in_cache
or not cache
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)):
logger.debug(f"Refreshing TRADES data for {pair}")
# fetch trades since latest _trades and
# store together with existing trades
@@ -2223,7 +2235,7 @@ class Exchange:
from_id = None
if is_in_cache:
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]['id']
until = dt_ts() # now
until = dt_ts() # now
else:
until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
@@ -2233,41 +2245,37 @@ class Exchange:
if all_stored_ticks_df.iloc[0]['timestamp'] <= first_candle_ms:
last_cached_ms = all_stored_ticks_df.iloc[-1]['timestamp']
# only use cached if it's closer than first_candle_ms
since_ms = last_cached_ms if last_cached_ms > first_candle_ms else first_candle_ms
since_ms = last_cached_ms if last_cached_ms > first_candle_ms else first_candle_ms # noqa
# cached data doesn't reach far enough back
else:
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ['date'])
all_stored_ticks_df = DataFrame(
columns=DEFAULT_TRADES_COLUMNS + ['date'])
# from_id overrules with exchange set to id paginate
# TODO: DEBUG:
# since_ms = 1698060269000
# from_id = None
# TODO: /DEBUG
[ticks_pair, new_ticks]=self._download_trades_history(pair,
since=since_ms if since_ms else first_candle_ms,
until=until,
from_id=from_id)
[ticks_pair, new_ticks] = self._download_trades_history(pair,
since=since_ms if since_ms else first_candle_ms, # noqa
until=until,
from_id=from_id)
except Exception as e:
logger.error(f"Refreshing TRADES data for {pair} failed")
logger.error(e)
raise e
if new_ticks:
drop_incomplete = False # TODO: remove, no incomplete trades
# drop 'date' column from stored ticks
all_stored_ticks_list = all_stored_ticks_df[DEFAULT_TRADES_COLUMNS].values.tolist() # noqa: E501
all_stored_ticks_list.extend(new_ticks)
# NOTE: only process new trades
# self._trades = until_first_candle(stored_trades) + fetch_trades
trades_df = self._process_trades_df(pair,
timeframe,
candle_type,
all_stored_ticks_list,
cache,
drop_incomplete,
first_candle_ms)
drop_incomplete=False,
first_required_candle_date=first_candle_ms)
results_df[(pair, timeframe, candle_type)] = trades_df
data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS])
@@ -2276,7 +2284,6 @@ class Exchange:
return results_df
def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
@@ -2285,12 +2292,14 @@ class Exchange:
now = int(timeframe_to_prev_date(timeframe).timestamp())
return plr < now
def _now_is_time_to_refresh_trades(self,
                                   pair: str,
                                   timeframe: str,
                                   candle_type: CandleType) -> bool:
    """
    Check whether the cached public-trades data for this pair needs refreshing.
    :param pair: Pair to check
    :param timeframe: Timeframe to check
    :param candle_type: CandleType of the cached data
    :return: True if the last cached candle ended before the start of the
        current (still open) candle, i.e. newer trades should be fetched.
    """
    df = self.klines((pair, timeframe, candle_type), True)
    # Annotate each row with its candle start/end timestamps.
    _calculate_ohlcv_candle_start_and_end(df, timeframe)
    # NOTE(review): assumes df is non-empty - iloc[-1] raises IndexError
    # on an empty klines frame; confirm callers guarantee data exists.
    last_candle_end = round(df.iloc[-1]["candle_end"].timestamp())
    now = int(timeframe_to_prev_date(timeframe).timestamp())
    return last_candle_end < now
@@ -2360,8 +2369,6 @@ class Exchange:
raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
f'for pair {pair}. Message: {e}') from e
async def _fetch_funding_rate_history(
self,
pair: str,
@@ -2394,11 +2401,15 @@ class Exchange:
returns: List of dicts containing trades, the next iteration value (new "since" or trade_id)
"""
try:
candle_limit = self.trades_candle_limit("1m", candle_type=CandleType.FUTURES, since_ms=since)
candle_limit = self.trades_candle_limit("1m",
candle_type=CandleType.FUTURES,
since_ms=since)
# fetch trades asynchronously
if params:
logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
trades = await self._api_async.fetch_trades(pair, params=params, limit=candle_limit)
trades = await self._api_async.fetch_trades(pair,
params=params,
limit=candle_limit)
else:
logger.debug(
"Fetching trades for pair %s, since %s %s...",
@@ -2444,7 +2455,7 @@ class Exchange:
until: Optional[int],
since: Optional[int] = None,
from_id: Optional[str] = None,
stop_on_from_id: Optional[bool] = True) -> Tuple[str, List[List]]:
stop_on_from_id: Optional[bool] = True) -> Tuple[str, List[List]]: # noqa
"""
Asynchronously gets trade history using fetch_trades
use this when exchange uses id-based iteration (check `self._trades_pagination`)

View File

@@ -165,8 +165,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades(
def test_public_trades_put_volume_profile_into_ohlcv_candles(public_trades_list_simple, candles):
df = public_trades_to_dataframe(
public_trades_list_simple, '1m', 'doesntmatter', fill_missing=False, drop_incomplete=False)
df = public_trades_to_dataframe(public_trades_list_simple, 'doesntmatter')
df = trades_to_volumeprofile_with_total_delta_bid_ask(
df, scale=BIN_SIZE_SCALE)
candles['vp'] = np.nan
@@ -178,9 +177,7 @@ def test_public_trades_put_volume_profile_into_ohlcv_candles(public_trades_list_
def test_public_trades_binned_big_sample_list(public_trades_list):
BIN_SIZE_SCALE = 0.05
trades = public_trades_to_dataframe(
public_trades_list, '1m', 'doesntmatter',
fill_missing=False, drop_incomplete=False)
trades = public_trades_to_dataframe(public_trades_list, 'doesntmatter')
df = trades_to_volumeprofile_with_total_delta_bid_ask(
trades, scale=BIN_SIZE_SCALE)
assert df.columns.tolist() == ['bid', 'ask', 'delta',
@@ -207,9 +204,7 @@ def test_public_trades_binned_big_sample_list(public_trades_list):
assert 57.551 == df['delta'].iat[-1] # delta
BIN_SIZE_SCALE = 1
trades = public_trades_to_dataframe(
public_trades_list, '1m', 'doesntmatter',
fill_missing=False, drop_incomplete=False)
trades = public_trades_to_dataframe(public_trades_list, 'doesntmatter')
df = trades_to_volumeprofile_with_total_delta_bid_ask(
trades, scale=BIN_SIZE_SCALE)
assert 2 == len(df)