test: improve event-loop mocking

This commit is contained in:
Matthias
2025-07-03 20:59:00 +02:00
parent 4421e54cde
commit e92afb74c6

View File

@@ -50,18 +50,18 @@ def test_exchangews_cleanup_error(mocker, caplog):
def patch_eventloop_threading(exchange):
is_init = False
init_event = threading.Event()
def thread_fuck():
nonlocal is_init
def thread_func():
exchange._loop = asyncio.new_event_loop()
is_init = True
init_event.set()
exchange._loop.run_forever()
x = threading.Thread(target=thread_fuck, daemon=True)
x = threading.Thread(target=thread_func, daemon=True)
x.start()
while not is_init:
pass
# Wait for thread to be properly initialized with timeout
if not init_event.wait(timeout=5.0):
raise RuntimeError("Failed to initialize event loop thread")
async def test_exchangews_ohlcv(mocker, time_machine, caplog):