diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py
index 56596f2b9..2e3240cfe 100644
--- a/freqtrade/freqtradebot.py
+++ b/freqtrade/freqtradebot.py
@@ -187,7 +187,7 @@ class FreqtradeBot(LoggingMixin):
         if self.get_free_open_trades():
             self.enter_positions()
 
-        Trade.query.session.flush()
+        Trade.commit()
 
     def process_stopped(self) -> None:
         """
@@ -620,7 +620,7 @@ class FreqtradeBot(LoggingMixin):
             self.update_trade_state(trade, order_id, order)
 
         Trade.query.session.add(trade)
-        Trade.query.session.flush()
+        Trade.commit()
 
         # Updating wallets
         self.wallets.update()
@@ -706,6 +706,7 @@ class FreqtradeBot(LoggingMixin):
             if (self.strategy.order_types.get('stoploss_on_exchange') and
                     self.handle_stoploss_on_exchange(trade)):
                 trades_closed += 1
+                Trade.commit()
                 continue
             # Check if we can sell our current pair
             if trade.open_order_id is None and trade.is_open and self.handle_trade(trade):
@@ -1036,6 +1037,7 @@ class FreqtradeBot(LoggingMixin):
 
             elif order['side'] == 'sell':
                 self.handle_cancel_sell(trade, order, constants.CANCEL_REASON['ALL_CANCELLED'])
+        Trade.commit()
 
     def handle_cancel_buy(self, trade: Trade, order: Dict, reason: str) -> bool:
         """
@@ -1233,7 +1235,7 @@ class FreqtradeBot(LoggingMixin):
         # In case of market sell orders the order can be closed immediately
         if order.get('status', 'unknown') == 'closed':
             self.update_trade_state(trade, trade.open_order_id, order)
-        Trade.query.session.flush()
+        Trade.commit()
 
         # Lock pair for one candle to prevent immediate re-buys
         self.strategy.lock_pair(trade.pair, datetime.now(timezone.utc),
@@ -1374,6 +1376,7 @@ class FreqtradeBot(LoggingMixin):
             # Handling of this will happen in check_handle_timeout.
             return True
         trade.update(order)
+        Trade.commit()
 
         # Updating wallets when order is closed
         if not trade.is_open:
diff --git a/freqtrade/persistence/migrations.py b/freqtrade/persistence/migrations.py
index d89256baf..00c9b91eb 100644
--- a/freqtrade/persistence/migrations.py
+++ b/freqtrade/persistence/migrations.py
@@ -1,7 +1,7 @@
 import logging
 from typing import List
 
-from sqlalchemy import inspect
+from sqlalchemy import inspect, text
 
 
 logger = logging.getLogger(__name__)
@@ -62,15 +62,17 @@ def migrate_trades_table(decl_base, inspector, engine, table_back_name: str, col
     amount_requested = get_column_def(cols, 'amount_requested', 'amount')
 
     # Schema migration necessary
-    engine.execute(f"alter table trades rename to {table_back_name}")
-    # drop indexes on backup table
-    for index in inspector.get_indexes(table_back_name):
-        engine.execute(f"drop index {index['name']}")
+    with engine.begin() as connection:
+        connection.execute(text(f"alter table trades rename to {table_back_name}"))
+        # drop indexes on backup table
+        for index in inspector.get_indexes(table_back_name):
+            connection.execute(text(f"drop index {index['name']}"))
     # let SQLAlchemy create the schema as required
     decl_base.metadata.create_all(engine)
 
     # Copy data back - following the correct schema
-    engine.execute(f"""insert into trades
+    with engine.begin() as connection:
+        connection.execute(text(f"""insert into trades
             (id, exchange, pair, is_open,
             fee_open, fee_open_cost, fee_open_currency,
             fee_close, fee_close_cost, fee_close_currency, open_rate,
@@ -104,11 +106,12 @@
             {strategy} strategy, {timeframe} timeframe,
             {open_trade_value} open_trade_value, {close_profit_abs} close_profit_abs
             from {table_back_name}
-            """)
+            """))
 
 
 def migrate_open_orders_to_trades(engine):
-    engine.execute("""
+    with engine.begin() as connection:
+        connection.execute(text("""
         insert into orders (ft_trade_id, ft_pair, order_id, ft_order_side, ft_is_open)
         select id ft_trade_id, pair ft_pair, open_order_id,
             case when close_rate_requested is null then 'buy'
@@ -120,28 +123,30 @@ def migrate_open_orders_to_trades(engine):
             'stoploss' ft_order_side, 1 ft_is_open
         from trades
         where stoploss_order_id is not null
-        """)
+        """))
 
 
 def migrate_orders_table(decl_base, inspector, engine, table_back_name: str, cols: List):
     # Schema migration necessary
-    engine.execute(f"alter table orders rename to {table_back_name}")
-    # drop indexes on backup table
-    for index in inspector.get_indexes(table_back_name):
-        engine.execute(f"drop index {index['name']}")
+
+    with engine.begin() as connection:
+        connection.execute(text(f"alter table orders rename to {table_back_name}"))
+        # drop indexes on backup table
+        for index in inspector.get_indexes(table_back_name):
+            connection.execute(text(f"drop index {index['name']}"))
     # let SQLAlchemy create the schema as required
     decl_base.metadata.create_all(engine)
-
-    engine.execute(f"""
-        insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
-        symbol, order_type, side, price, amount, filled, average, remaining, cost, order_date,
-        order_filled_date, order_update_date)
-        select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
-        symbol, order_type, side, price, amount, filled, null average, remaining, cost, order_date,
-        order_filled_date, order_update_date
-        from {table_back_name}
-        """)
+    with engine.begin() as connection:
+        connection.execute(text(f"""
+            insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id,
+            status, symbol, order_type, side, price, amount, filled, average, remaining, cost,
+            order_date, order_filled_date, order_update_date)
+            select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id,
+            status, symbol, order_type, side, price, amount, filled, null average, remaining, cost,
+            order_date, order_filled_date, order_update_date
+            from {table_back_name}
+            """))
 
 
 def check_migrate(engine, decl_base, previous_tables) -> None:
diff --git a/freqtrade/persistence/models.py b/freqtrade/persistence/models.py
index f2e7a10c4..ee934f657 100644
--- a/freqtrade/persistence/models.py
+++ b/freqtrade/persistence/models.py
@@ -9,10 +9,7 @@ from typing import Any, Dict, List, Optional
 from sqlalchemy import (Boolean, Column, DateTime, Float, ForeignKey, Integer, String,
                         create_engine, desc, func, inspect)
 from sqlalchemy.exc import NoSuchModuleError
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import Query, relationship
-from sqlalchemy.orm.scoping import scoped_session
-from sqlalchemy.orm.session import sessionmaker
+from sqlalchemy.orm import Query, declarative_base, relationship, scoped_session, sessionmaker
 from sqlalchemy.pool import StaticPool
 from sqlalchemy.sql.schema import UniqueConstraint
 
@@ -41,16 +38,18 @@ def init_db(db_url: str, clean_open_orders: bool = False) -> None:
     """
     kwargs = {}
 
-    # Take care of thread ownership if in-memory db
     if db_url == 'sqlite://':
         kwargs.update({
-            'connect_args': {'check_same_thread': False},
             'poolclass': StaticPool,
-            'echo': False,
+        })
+    # Take care of thread ownership
+    if db_url.startswith('sqlite://'):
+        kwargs.update({
+            'connect_args': {'check_same_thread': False},
         })
 
     try:
-        engine = create_engine(db_url, **kwargs)
+        engine = create_engine(db_url, future=True, **kwargs)
     except NoSuchModuleError:
         raise OperationalException(f"Given value for db_url: '{db_url}' "
                                    f"is no valid database URL! (See {_SQL_DOCS_URL})")
@@ -58,7 +57,7 @@ def init_db(db_url: str, clean_open_orders: bool = False) -> None:
     # https://docs.sqlalchemy.org/en/13/orm/contextual.html#thread-local-scope
     # Scoped sessions proxy requests to the appropriate thread-local session.
    # We should use the scoped_session object - not a seperately initialized version
-    Trade._session = scoped_session(sessionmaker(bind=engine, autoflush=True, autocommit=True))
+    Trade._session = scoped_session(sessionmaker(bind=engine, autoflush=True))
     Trade.query = Trade._session.query_property()
     Order.query = Trade._session.query_property()
     PairLock.query = Trade._session.query_property()
@@ -77,7 +76,7 @@ def cleanup_db() -> None:
     Flushes all pending operations to disk.
     :return: None
     """
-    Trade.query.session.flush()
+    Trade.commit()
 
 
 def clean_dry_run_db() -> None:
@@ -89,6 +88,7 @@ def clean_dry_run_db() -> None:
         # Check we are updating only a dry_run order not a prod one
         if 'dry_run' in trade.open_order_id:
             trade.open_order_id = None
+    Trade.commit()
 
 
 class Order(_DECL_BASE):
@@ -177,6 +177,7 @@ class Order(_DECL_BASE):
         if filtered_orders:
             oobj = filtered_orders[0]
             oobj.update_from_ccxt_object(order)
+            Order.query.session.commit()
         else:
             logger.warning(f"Did not find order for {order}.")
 
@@ -712,7 +713,11 @@ class Trade(_DECL_BASE, LocalTrade):
             Order.query.session.delete(order)
 
         Trade.query.session.delete(self)
-        Trade.query.session.flush()
+        Trade.commit()
+
+    @staticmethod
+    def commit():
+        Trade.query.session.commit()
 
     @staticmethod
     def get_trades_proxy(*, pair: str = None, is_open: bool = None,
diff --git a/freqtrade/persistence/pairlock_middleware.py b/freqtrade/persistence/pairlock_middleware.py
index 245f7cdab..af904f693 100644
--- a/freqtrade/persistence/pairlock_middleware.py
+++ b/freqtrade/persistence/pairlock_middleware.py
@@ -49,7 +49,7 @@ class PairLocks():
         )
         if PairLocks.use_db:
             PairLock.query.session.add(lock)
-            PairLock.query.session.flush()
+            PairLock.query.session.commit()
         else:
             PairLocks.locks.append(lock)
 
@@ -99,7 +99,7 @@ class PairLocks():
         for lock in locks:
             lock.active = False
         if PairLocks.use_db:
-            PairLock.query.session.flush()
+            PairLock.query.session.commit()
 
     @staticmethod
     def is_global_lock(now: Optional[datetime] = None) -> bool:
diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py
index 3f26619a9..c609bccb8 100644
--- a/freqtrade/rpc/rpc.py
+++ b/freqtrade/rpc/rpc.py
@@ -569,7 +569,7 @@ class RPC:
                 # Execute sell for all open orders
                 for trade in Trade.get_open_trades():
                     _exec_forcesell(trade)
-                Trade.query.session.flush()
+                Trade.commit()
                 self._freqtrade.wallets.update()
                 return {'result': 'Created sell orders for all open trades.'}
 
@@ -582,7 +582,7 @@ class RPC:
                 raise RPCException('invalid argument')
 
             _exec_forcesell(trade)
-            Trade.query.session.flush()
+            Trade.commit()
             self._freqtrade.wallets.update()
             return {'result': f'Created sell order for trade {trade_id}.'}
 
@@ -615,6 +615,7 @@ class RPC:
 
         # execute buy
         if self._freqtrade.execute_buy(pair, stakeamount, price, forcebuy=True):
+            Trade.commit()
             trade = Trade.get_trades([Trade.is_open.is_(True), Trade.pair == pair]).first()
             return trade
         else:
@@ -705,8 +706,7 @@ class RPC:
             lock.active = False
             lock.lock_end_time = datetime.now(timezone.utc)
 
-        # session is always the same
-        PairLock.query.session.flush()
+        PairLock.query.session.commit()
 
         return self._rpc_locks()
diff --git a/tests/test_persistence.py b/tests/test_persistence.py
index 669f220bb..1576aaa5a 100644
--- a/tests/test_persistence.py
+++ b/tests/test_persistence.py
@@ -7,7 +7,7 @@ from unittest.mock import MagicMock
 
 import arrow
 import pytest
-from sqlalchemy import create_engine, inspect
+from sqlalchemy import create_engine, inspect, text
 
 from freqtrade import constants
 from freqtrade.exceptions import DependencyException, OperationalException
@@ -486,9 +486,10 @@ def test_migrate_old(mocker, default_conf, fee):
     mocker.patch('freqtrade.persistence.models.create_engine', lambda *args, **kwargs: engine)
 
     # Create table using the old format
-    engine.execute(create_table_old)
-    engine.execute(insert_table_old)
-    engine.execute(insert_table_old2)
+    with engine.begin() as connection:
+        connection.execute(text(create_table_old))
+        connection.execute(text(insert_table_old))
+        connection.execute(text(insert_table_old2))
 
     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -579,15 +580,16 @@ def test_migrate_new(mocker, default_conf, fee, caplog):
     mocker.patch('freqtrade.persistence.models.create_engine', lambda *args, **kwargs: engine)
 
     # Create table using the old format
-    engine.execute(create_table_old)
-    engine.execute("create index ix_trades_is_open on trades(is_open)")
-    engine.execute("create index ix_trades_pair on trades(pair)")
-    engine.execute(insert_table_old)
+    with engine.begin() as connection:
+        connection.execute(text(create_table_old))
+        connection.execute(text("create index ix_trades_is_open on trades(is_open)"))
+        connection.execute(text("create index ix_trades_pair on trades(pair)"))
+        connection.execute(text(insert_table_old))
 
-    # fake previous backup
-    engine.execute("create table trades_bak as select * from trades")
+        # fake previous backup
+        connection.execute(text("create table trades_bak as select * from trades"))
 
-    engine.execute("create table trades_bak1 as select * from trades")
+        connection.execute(text("create table trades_bak1 as select * from trades"))
 
     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -629,47 +631,49 @@ def test_migrate_new(mocker, default_conf, fee, caplog):
     caplog.clear()
 
     # Drop latest column
-    engine.execute("alter table orders rename to orders_bak")
+    with engine.begin() as connection:
+        connection.execute(text("alter table orders rename to orders_bak"))
     inspector = inspect(engine)
 
-    for index in inspector.get_indexes('orders_bak'):
-        engine.execute(f"drop index {index['name']}")
-    # Recreate table
-    engine.execute("""
-        CREATE TABLE orders (
-            id INTEGER NOT NULL,
-            ft_trade_id INTEGER,
-            ft_order_side VARCHAR NOT NULL,
-            ft_pair VARCHAR NOT NULL,
-            ft_is_open BOOLEAN NOT NULL,
-            order_id VARCHAR NOT NULL,
-            status VARCHAR,
-            symbol VARCHAR,
-            order_type VARCHAR,
-            side VARCHAR,
-            price FLOAT,
-            amount FLOAT,
-            filled FLOAT,
-            remaining FLOAT,
-            cost FLOAT,
-            order_date DATETIME,
-            order_filled_date DATETIME,
-            order_update_date DATETIME,
-            PRIMARY KEY (id),
-            CONSTRAINT _order_pair_order_id UNIQUE (ft_pair, order_id),
-            FOREIGN KEY(ft_trade_id) REFERENCES trades (id)
-        )
-        """)
+    with engine.begin() as connection:
+        for index in inspector.get_indexes('orders_bak'):
+            connection.execute(text(f"drop index {index['name']}"))
+        # Recreate table
+        connection.execute(text("""
+            CREATE TABLE orders (
+                id INTEGER NOT NULL,
+                ft_trade_id INTEGER,
+                ft_order_side VARCHAR NOT NULL,
+                ft_pair VARCHAR NOT NULL,
+                ft_is_open BOOLEAN NOT NULL,
+                order_id VARCHAR NOT NULL,
+                status VARCHAR,
+                symbol VARCHAR,
+                order_type VARCHAR,
+                side VARCHAR,
+                price FLOAT,
+                amount FLOAT,
+                filled FLOAT,
+                remaining FLOAT,
+                cost FLOAT,
+                order_date DATETIME,
+                order_filled_date DATETIME,
+                order_update_date DATETIME,
+                PRIMARY KEY (id),
+                CONSTRAINT _order_pair_order_id UNIQUE (ft_pair, order_id),
+                FOREIGN KEY(ft_trade_id) REFERENCES trades (id)
+            )
+            """))
 
-    engine.execute("""
-        insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
-        symbol, order_type, side, price, amount, filled, remaining, cost, order_date,
-        order_filled_date, order_update_date)
-        select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
-        symbol, order_type, side, price, amount, filled, remaining, cost, order_date,
-        order_filled_date, order_update_date
-        from orders_bak
-        """)
+        connection.execute(text("""
+            insert into orders ( id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
+            symbol, order_type, side, price, amount, filled, remaining, cost, order_date,
+            order_filled_date, order_update_date)
+            select id, ft_trade_id, ft_order_side, ft_pair, ft_is_open, order_id, status,
+            symbol, order_type, side, price, amount, filled, remaining, cost, order_date,
+            order_filled_date, order_update_date
+            from orders_bak
+            """))
 
     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -722,8 +726,9 @@ def test_migrate_mid_state(mocker, default_conf, fee, caplog):
     mocker.patch('freqtrade.persistence.models.create_engine', lambda *args, **kwargs: engine)
 
     # Create table using the old format
-    engine.execute(create_table_old)
-    engine.execute(insert_table_old)
+    with engine.begin() as connection:
+        connection.execute(text(create_table_old))
+        connection.execute(text(insert_table_old))
 
     # Run init to test migration
     init_db(default_conf['db_url'], default_conf['dry_run'])
@@ -1288,6 +1293,7 @@ def test_Trade_object_idem():
     excludes = (
         'delete',
         'session',
+        'commit',
         'query',
         'open_date',
         'get_best_pair',