mirror of
https://github.com/freqtrade/freqtrade.git
synced 2025-11-29 08:33:07 +00:00
Implement ws cleanup
This commit is contained in:
@@ -18,9 +18,11 @@ class ExchangeWS():
|
||||
self.ccxt_object = ccxt_object
|
||||
self._thread = Thread(name="ccxt_ws", target=self.start)
|
||||
self._background_tasks = set()
|
||||
|
||||
self._pairs_watching: Set[Tuple[str, str, CandleType]] = set()
|
||||
self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set()
|
||||
self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], int] = {}
|
||||
self.pairs_last_request: Dict[Tuple[str, str, CandleType], int] = {}
|
||||
self._thread.start()
|
||||
|
||||
def start(self) -> None:
|
||||
@@ -34,39 +36,19 @@ class ExchangeWS():
|
||||
self._thread.join()
|
||||
logger.debug("Stopped")
|
||||
|
||||
# One task per Watch
|
||||
# async def schedule_schedule(self) -> None:
|
||||
|
||||
# for p in self._pairs_watching:
|
||||
# if p not in self._pairs_scheduled:
|
||||
# self._pairs_scheduled.add(p)
|
||||
# await self.schedule_one_task(p[0], p[1], p[2])
|
||||
|
||||
# async def schedule_one_task(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||
# task = asyncio.create_task(self._async_watch_ohlcv(pair, timeframe, candle_type))
|
||||
|
||||
# # Add task to the set. This creates a strong reference.
|
||||
# self._background_tasks.add(task)
|
||||
# task.add_done_callback(self.reschedule_or_stop)
|
||||
|
||||
# async def _async_watch_ohlcv(self, pair: str, timeframe: str,
|
||||
# candle_type: CandleType) -> Tuple[str, str, str, List]:
|
||||
# start = time.time()
|
||||
# data = await self.ccxt_object.watch_ohlcv(pair, timeframe, )
|
||||
# logger.info(f"watch done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s")
|
||||
# return pair, timeframe, candle_type, data
|
||||
|
||||
# def reschedule_or_stop(self, task: asyncio.Task):
|
||||
# # logger.info(f"Task finished {task}")
|
||||
|
||||
# self._background_tasks.discard(task)
|
||||
# pair, timeframe, candle_type, data = task.result()
|
||||
|
||||
# # reschedule
|
||||
# asyncio.run_coroutine_threadsafe(self.schedule_one_task(
|
||||
# pair, timeframe, candle_type), loop=self._loop)
|
||||
|
||||
# End one task epr watch
|
||||
def cleanup_expired(self) -> None:
|
||||
"""
|
||||
Remove pairs from watchlist if they've not been requested within
|
||||
the last timeframe (+ offset)
|
||||
"""
|
||||
from freqtrade.exchange.exchange import timeframe_to_seconds
|
||||
for p in list(self._pairs_watching):
|
||||
_, timeframe, _ = p
|
||||
timeframe_s = timeframe_to_seconds(timeframe)
|
||||
last_refresh = self.pairs_last_request.get(p, 0)
|
||||
if last_refresh > 0 and time.time() - last_refresh > timeframe_s + 20:
|
||||
logger.info(f"Removing {p} from watchlist")
|
||||
self._pairs_watching.discard(p)
|
||||
|
||||
async def schedule_while_true(self) -> None:
|
||||
|
||||
@@ -99,6 +81,7 @@ class ExchangeWS():
|
||||
|
||||
def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||
self._pairs_watching.add((pair, timeframe, candle_type))
|
||||
self.pairs_last_request[(pair, timeframe, candle_type)] = time.time()
|
||||
# asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop)
|
||||
asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop)
|
||||
|
||||
self.cleanup_expired()
|
||||
|
||||
Reference in New Issue
Block a user