Files
freqtrade/tests/exchange/test_exchange_ws.py
2025-01-20 19:23:04 +01:00

190 lines
6.1 KiB
Python

import asyncio
import threading
from datetime import timedelta
from time import sleep
from unittest.mock import AsyncMock, MagicMock
from freqtrade.enums import CandleType
from freqtrade.exchange.exchange_ws import ExchangeWS
from ft_client.test_client.test_rest_client import log_has_re
def test_exchangews_init(mocker):
config = MagicMock()
ccxt_object = MagicMock()
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
exchange_ws = ExchangeWS(config, ccxt_object)
sleep(0.1)
assert exchange_ws.config == config
assert exchange_ws.ccxt_object == ccxt_object
assert exchange_ws._thread.name == "ccxt_ws"
assert exchange_ws._background_tasks == set()
assert exchange_ws._klines_watching == set()
assert exchange_ws._klines_scheduled == set()
assert exchange_ws.klines_last_refresh == {}
assert exchange_ws.klines_last_request == {}
# Cleanup
exchange_ws.cleanup()
def test_exchangews_cleanup_error(mocker, caplog):
config = MagicMock()
ccxt_object = MagicMock()
ccxt_object.close = AsyncMock(side_effect=Exception("Test"))
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
exchange_ws = ExchangeWS(config, ccxt_object)
patch_eventloop_threading(exchange_ws)
sleep(0.1)
exchange_ws.reset_connections()
assert log_has_re("Exception in _cleanup_async", caplog)
exchange_ws.cleanup()
def patch_eventloop_threading(exchange):
is_init = False
def thread_fuck():
nonlocal is_init
exchange._loop = asyncio.new_event_loop()
is_init = True
exchange._loop.run_forever()
x = threading.Thread(target=thread_fuck, daemon=True)
x.start()
while not is_init:
pass
async def test_exchangews_ohlcv(mocker, time_machine):
config = MagicMock()
ccxt_object = MagicMock()
async def sleeper(*args, **kwargs):
# pass
await asyncio.sleep(0.12)
return MagicMock()
ccxt_object.watch_ohlcv = AsyncMock(side_effect=sleeper)
ccxt_object.close = AsyncMock()
time_machine.move_to("2024-11-01 01:00:02 +00:00")
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
exchange_ws = ExchangeWS(config, ccxt_object)
patch_eventloop_threading(exchange_ws)
try:
assert exchange_ws._klines_watching == set()
assert exchange_ws._klines_scheduled == set()
exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT)
exchange_ws.schedule_ohlcv("XRP/BTC", "1m", CandleType.SPOT)
await asyncio.sleep(0.2)
assert exchange_ws._klines_watching == {
("ETH/BTC", "1m", CandleType.SPOT),
("XRP/BTC", "1m", CandleType.SPOT),
}
assert exchange_ws._klines_scheduled == {
("ETH/BTC", "1m", CandleType.SPOT),
("XRP/BTC", "1m", CandleType.SPOT),
}
await asyncio.sleep(0.1)
assert ccxt_object.watch_ohlcv.call_count == 6
ccxt_object.watch_ohlcv.reset_mock()
time_machine.shift(timedelta(minutes=5))
exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT)
await asyncio.sleep(1)
# XRP/BTC should be cleaned up.
assert exchange_ws._klines_watching == {
("ETH/BTC", "1m", CandleType.SPOT),
}
# Cleanup happened.
exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT)
assert exchange_ws._klines_watching == {
("ETH/BTC", "1m", CandleType.SPOT),
}
assert exchange_ws._klines_scheduled == {
("ETH/BTC", "1m", CandleType.SPOT),
}
finally:
# Cleanup
exchange_ws.cleanup()
async def test_exchangews_get_ohlcv(mocker, caplog):
config = MagicMock()
ccxt_object = MagicMock()
ccxt_object.ohlcvs = {
"ETH/USDT": {
"1m": [
[1635840000000, 100, 200, 300, 400, 500],
[1635840060000, 101, 201, 301, 401, 501],
[1635840120000, 102, 202, 302, 402, 502],
],
"5m": [
[1635840000000, 100, 200, 300, 400, 500],
[1635840300000, 105, 201, 301, 401, 501],
[1635840600000, 102, 202, 302, 402, 502],
],
}
}
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
exchange_ws = ExchangeWS(config, ccxt_object)
exchange_ws.klines_last_refresh = {
("ETH/USDT", "1m", CandleType.SPOT): 1635840120000,
("ETH/USDT", "5m", CandleType.SPOT): 1635840600000,
}
# Matching last candle time - drop hint is true
resp = await exchange_ws.get_ohlcv("ETH/USDT", "1m", CandleType.SPOT, 1635840120000)
assert resp[0] == "ETH/USDT"
assert resp[1] == "1m"
assert resp[3] == [
[1635840000000, 100, 200, 300, 400, 500],
[1635840060000, 101, 201, 301, 401, 501],
[1635840120000, 102, 202, 302, 402, 502],
]
assert resp[4] is True
# expected time > last candle time - drop hint is false
resp = await exchange_ws.get_ohlcv("ETH/USDT", "1m", CandleType.SPOT, 1635840180000)
assert resp[0] == "ETH/USDT"
assert resp[1] == "1m"
assert resp[3] == [
[1635840000000, 100, 200, 300, 400, 500],
[1635840060000, 101, 201, 301, 401, 501],
[1635840120000, 102, 202, 302, 402, 502],
]
assert resp[4] is False
# Change "received" times to be before the candle starts.
# This should trigger the "time sync" warning.
exchange_ws.klines_last_refresh = {
("ETH/USDT", "1m", CandleType.SPOT): 1635840110000,
("ETH/USDT", "5m", CandleType.SPOT): 1635840600000,
}
msg = r".*Candle date > last refresh.*"
assert not log_has_re(msg, caplog)
resp = await exchange_ws.get_ohlcv("ETH/USDT", "1m", CandleType.SPOT, 1635840120000)
assert resp[0] == "ETH/USDT"
assert resp[1] == "1m"
assert resp[3] == [
[1635840000000, 100, 200, 300, 400, 500],
[1635840060000, 101, 201, 301, 401, 501],
[1635840120000, 102, 202, 302, 402, 502],
]
assert resp[4] is True
assert log_has_re(msg, caplog)
exchange_ws.cleanup()