mirror of
https://github.com/freqtrade/freqtrade.git
synced 2025-12-15 20:31:43 +00:00
Tests with seperate thread
This commit is contained in:
@@ -58,7 +58,6 @@ from freqtrade.exchange.exchange_utils import (
|
|||||||
ROUND,
|
ROUND,
|
||||||
ROUND_DOWN,
|
ROUND_DOWN,
|
||||||
ROUND_UP,
|
ROUND_UP,
|
||||||
CcxtModuleType,
|
|
||||||
amount_to_contract_precision,
|
amount_to_contract_precision,
|
||||||
amount_to_contracts,
|
amount_to_contracts,
|
||||||
amount_to_precision,
|
amount_to_precision,
|
||||||
@@ -75,6 +74,7 @@ from freqtrade.exchange.exchange_utils_timeframe import (
|
|||||||
timeframe_to_prev_date,
|
timeframe_to_prev_date,
|
||||||
timeframe_to_seconds,
|
timeframe_to_seconds,
|
||||||
)
|
)
|
||||||
|
from freqtrade.exchange.exchange_ws import ExchangeWS
|
||||||
from freqtrade.exchange.types import OHLCVResponse, OrderBook, Ticker, Tickers
|
from freqtrade.exchange.types import OHLCVResponse, OrderBook, Ticker, Tickers
|
||||||
from freqtrade.misc import (
|
from freqtrade.misc import (
|
||||||
chunks,
|
chunks,
|
||||||
@@ -230,7 +230,11 @@ class Exchange:
|
|||||||
exchange_conf.get("ccxt_async_config", {}), ccxt_async_config
|
exchange_conf.get("ccxt_async_config", {}), ccxt_async_config
|
||||||
)
|
)
|
||||||
self._api_async = self._init_ccxt(exchange_conf, False, ccxt_async_config)
|
self._api_async = self._init_ccxt(exchange_conf, False, ccxt_async_config)
|
||||||
self._has_watch_ohlcv = self.exchange_has('watchOHLCV')
|
self._ws_async = self._init_ccxt(exchange_conf, False, ccxt_async_config)
|
||||||
|
self._has_watch_ohlcv = self.exchange_has("watchOHLCV")
|
||||||
|
self._exchange_ws: Optional[ExchangeWS] = None
|
||||||
|
if self._has_watch_ohlcv:
|
||||||
|
self._exchange_ws = ExchangeWS(self._config, self._ws_async)
|
||||||
|
|
||||||
logger.info(f'Using Exchange "{self.name}"')
|
logger.info(f'Using Exchange "{self.name}"')
|
||||||
self.required_candle_call_count = 1
|
self.required_candle_call_count = 1
|
||||||
@@ -2222,10 +2226,14 @@ class Exchange:
|
|||||||
data = sorted(data, key=lambda x: x[0])
|
data = sorted(data, key=lambda x: x[0])
|
||||||
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
|
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
|
||||||
|
|
||||||
async def _async_watch_ohlcv(self, pair: str, timeframe: str,
|
async def _async_watch_ohlcv(
|
||||||
candle_type: CandleType) -> Tuple[str, str, str, List]:
|
self, pair: str, timeframe: str, candle_type: CandleType
|
||||||
|
) -> Tuple[str, str, str, List]:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
data = await self._api_async.watch_ohlcv(pair, timeframe, )
|
data = await self._api_async.watch_ohlcv(
|
||||||
|
pair,
|
||||||
|
timeframe,
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(f"watch {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s")
|
logger.info(f"watch {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s")
|
||||||
return pair, timeframe, candle_type, data
|
return pair, timeframe, candle_type, data
|
||||||
@@ -2239,19 +2247,28 @@ class Exchange:
|
|||||||
cache: bool,
|
cache: bool,
|
||||||
) -> Coroutine[Any, Any, OHLCVResponse]:
|
) -> Coroutine[Any, Any, OHLCVResponse]:
|
||||||
not_all_data = cache and self.required_candle_call_count > 1
|
not_all_data = cache and self.required_candle_call_count > 1
|
||||||
|
if cache:
|
||||||
|
if self._exchange_ws:
|
||||||
|
# Subscribe to websocket
|
||||||
|
self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type)
|
||||||
if cache and (pair, timeframe, candle_type) in self._klines:
|
if cache and (pair, timeframe, candle_type) in self._klines:
|
||||||
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
|
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
|
||||||
min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp()
|
min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp()
|
||||||
one_date = date_minus_candles(timeframe, 1).timestamp()
|
date_minus_candles(timeframe, 1).timestamp()
|
||||||
last_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
|
last_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
|
||||||
if (self._has_watch_ohlcv
|
# if self._exchange_ws:
|
||||||
and candle_type in (CandleType.SPOT, CandleType.FUTURES)
|
# self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type)
|
||||||
and one_date <= last_refresh):
|
# if (
|
||||||
logger.info(f"Using watch {pair}, {timeframe}, {candle_type}")
|
# self._has_watch_ohlcv
|
||||||
return self._async_watch_ohlcv(pair, timeframe, candle_type)
|
# and candle_type in (CandleType.SPOT, CandleType.FUTURES)
|
||||||
pass
|
# and one_date <= last_refresh
|
||||||
|
# ):
|
||||||
|
# logger.info(f"Using watch {pair}, {timeframe}, {candle_type}")
|
||||||
|
# return self._async_watch_ohlcv(pair, timeframe, candle_type)
|
||||||
|
# pass
|
||||||
# Check if 1 call can get us updated candles without hole in the data.
|
# Check if 1 call can get us updated candles without hole in the data.
|
||||||
elif min_date < last_refresh:
|
# el
|
||||||
|
if min_date < last_refresh:
|
||||||
# Cache can be used - do one-off call.
|
# Cache can be used - do one-off call.
|
||||||
not_all_data = False
|
not_all_data = False
|
||||||
else:
|
else:
|
||||||
|
|||||||
94
freqtrade/exchange/exchange_ws.py
Normal file
94
freqtrade/exchange/exchange_ws.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from threading import Thread
|
||||||
|
from typing import List, Set, Tuple
|
||||||
|
|
||||||
|
from freqtrade.constants import Config
|
||||||
|
from freqtrade.enums.candletype import CandleType
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ExchangeWS():
|
||||||
|
def __init__(self, config: Config, ccxt_object) -> None:
|
||||||
|
self.config = config
|
||||||
|
self.ccxt_object = ccxt_object
|
||||||
|
self._thread = Thread(name="ccxt_ws", target=self.start)
|
||||||
|
self._background_tasks = set()
|
||||||
|
self._pairs_watching: Set[Tuple[str, str, CandleType]] = set()
|
||||||
|
self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set()
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
self._loop = asyncio.new_event_loop()
|
||||||
|
self._loop.run_forever()
|
||||||
|
|
||||||
|
## One task per Watch
|
||||||
|
# async def schedule_schedule(self) -> None:
|
||||||
|
|
||||||
|
# for p in self._pairs_watching:
|
||||||
|
# if p not in self._pairs_scheduled:
|
||||||
|
# self._pairs_scheduled.add(p)
|
||||||
|
# await self.schedule_one_task(p[0], p[1], p[2])
|
||||||
|
|
||||||
|
# async def schedule_one_task(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||||
|
# task = asyncio.create_task(self._async_watch_ohlcv(pair, timeframe, candle_type))
|
||||||
|
|
||||||
|
# # Add task to the set. This creates a strong reference.
|
||||||
|
# self._background_tasks.add(task)
|
||||||
|
# task.add_done_callback(self.reschedule_or_stop)
|
||||||
|
|
||||||
|
# async def _async_watch_ohlcv(self, pair: str, timeframe: str,
|
||||||
|
# candle_type: CandleType) -> Tuple[str, str, str, List]:
|
||||||
|
# start = time.time()
|
||||||
|
# data = await self.ccxt_object.watch_ohlcv(pair, timeframe, )
|
||||||
|
# logger.info(f"watch done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s")
|
||||||
|
# return pair, timeframe, candle_type, data
|
||||||
|
|
||||||
|
# def reschedule_or_stop(self, task: asyncio.Task):
|
||||||
|
# # logger.info(f"Task finished {task}")
|
||||||
|
|
||||||
|
# self._background_tasks.discard(task)
|
||||||
|
# pair, timeframe, candle_type, data = task.result()
|
||||||
|
|
||||||
|
# # reschedule
|
||||||
|
# asyncio.run_coroutine_threadsafe(self.schedule_one_task(
|
||||||
|
# pair, timeframe, candle_type), loop=self._loop)
|
||||||
|
|
||||||
|
## End one task epr watch
|
||||||
|
|
||||||
|
async def schedule_while_true(self) -> None:
|
||||||
|
|
||||||
|
for p in self._pairs_watching:
|
||||||
|
if p not in self._pairs_scheduled:
|
||||||
|
self._pairs_scheduled.add(p)
|
||||||
|
pair, timeframe, candle_type = p
|
||||||
|
task = asyncio.create_task(
|
||||||
|
self.continuously_async_watch_ohlcv(pair, timeframe, candle_type))
|
||||||
|
self._background_tasks.add(task)
|
||||||
|
task.add_done_callback(self.continuous_stopped)
|
||||||
|
|
||||||
|
def continuous_stopped(self, task: asyncio.Task):
|
||||||
|
self._background_tasks.discard(task)
|
||||||
|
pair, timeframe, candle_type, data = task.result()
|
||||||
|
self._pairs_scheduled.discard(p)
|
||||||
|
|
||||||
|
logger.info(f"Task finished {task}")
|
||||||
|
|
||||||
|
async def continuously_async_watch_ohlcv(
|
||||||
|
self, pair: str, timeframe: str, candle_type: CandleType) -> Tuple[str, str, str, List]:
|
||||||
|
|
||||||
|
while (pair, timeframe, candle_type) in self._pairs_watching:
|
||||||
|
start = time.time()
|
||||||
|
data = await self.ccxt_object.watch_ohlcv(pair, timeframe, )
|
||||||
|
logger.info(
|
||||||
|
f"watch1 done {pair}, {timeframe}, data {len(data)} in {time.time() - start:.2f}s")
|
||||||
|
|
||||||
|
def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||||
|
self._pairs_watching.add((pair, timeframe, candle_type))
|
||||||
|
# asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop)
|
||||||
|
asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop)
|
||||||
|
|
||||||
Reference in New Issue
Block a user