diff --git a/specklepy/transports/sqlite.py b/specklepy/transports/sqlite.py index 2e1610a..ea95f02 100644 --- a/specklepy/transports/sqlite.py +++ b/specklepy/transports/sqlite.py @@ -3,7 +3,7 @@ import sys import time import sched import sqlite3 -from typing import Any, List, Dict +from typing import Any, List, Dict, Tuple from appdirs import user_data_dir from contextlib import closing from specklepy.transports.abstract_transport import AbstractTransport @@ -14,26 +14,29 @@ class SQLiteTransport(AbstractTransport): _name = "SQLite" _base_path: str = None _root_path: str = None - _is_writing: bool = False - _scheduler = sched.scheduler(time.time, time.sleep) - _polling_interval = 0.5 # seconds __connection: sqlite3.Connection = None app_name: str = "" scope: str = "" saved_obj_count: int = 0 - _object_cache: List = None + max_size: int = None + _current_batch: List[Tuple[str, str]] = None + _current_batch_size: int = None def __init__( self, base_path: str = None, app_name: str = None, scope: str = None, + max_batch_size_mb: float = 10.0, **data: Any, ) -> None: super().__init__(**data) self.app_name = app_name or "Speckle" self.scope = scope or "Objects" self._base_path = base_path or self.get_base_path(self.app_name) + self.max_size = int(max_batch_size_mb * 1000 * 1000) + self._current_batch = [] + self._current_batch_size = 0 try: os.makedirs(self._base_path, exist_ok=True) @@ -51,12 +54,6 @@ class SQLiteTransport(AbstractTransport): def __repr__(self) -> str: return f"SQLiteTransport(app: '{self.app_name}', scope: '{self.scope}')" - # def __write_timer_elapsed(self): - # print("WRITE TIMER ELAPSED") - # proc = Process(target=_run_queue, args=(self.__queue, self._root_path)) - # proc.start() - # proc.join() - @staticmethod def get_base_path(app_name): # from appdirs https://github.com/ActiveState/appdirs/blob/master/appdirs.py @@ -75,36 +72,6 @@ class SQLiteTransport(AbstractTransport): path = os.path.expanduser("~/.config/") return os.path.join(path, app_name) - # def __consume_queue(self): - # if self._is_writing or self.__queue.empty(): - # return - # print("CONSUME QUEUE") - # self._is_writing = True - # while not self.__queue.empty(): - # data = self.__queue.get() - # self.save_object(data[0], data[1]) - # self._is_writing = False - - # self._scheduler.enter( - # delay=self._polling_interval, priority=1, action=self.__consume_queue - # ) - # self._scheduler.run(blocking=True) - - # def save_object(self, id: str, serialized_object: str) -> None: - # """Adds an object to the queue and schedules it to be saved. - - # Arguments: - # id {str} -- the object id - # serialized_object {str} -- the full string representation of the object - # """ - # print("SAVE OBJECT") - # self.__queue.put((id, serialized_object)) - - # self._scheduler.enter( - # delay=self._polling_interval, priority=1, action=self.__consume_queue - # ) - # self._scheduler.run(blocking=True) - def save_object_from_transport( self, id: str, source_transport: AbstractTransport ) -> None: @@ -118,25 +85,42 @@ class SQLiteTransport(AbstractTransport): self.save_object(id, serialized_object) def save_object(self, id: str, serialized_object: str) -> None: - """Directly saves an object into the database. + """ + Adds an object to the current batch to be written to the db. If the current batch is full, + the batch is written to the db and the current batch is reset. Arguments: id {str} -- the object id serialized_object {str} -- the full string representation of the object """ - # self.__check_connection() - # try: - # with closing(self.__connection.cursor()) as c: - # c.execute( - # "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)", - # (id, serialized_object), - # ) - # # self.__connection.commit() - # except Exception as ex: - # raise SpeckleException( - # f"Could not save the object to the local db. Inner exception: {ex}", ex - # ) - self._object_cache.append((id, serialized_object)) + obj_size = len(serialized_object) + if ( + not self._current_batch + or self._current_batch_size + obj_size < self.max_size + ): + self._current_batch.append((id, serialized_object)) + self._current_batch_size += obj_size + return + + self.save_current_batch() + self._current_batch = [(id, serialized_object)] + self._current_batch_size = obj_size + + def save_current_batch(self) -> None: + """Save the current batch of objects to the local db""" + self.__check_connection() + try: + with closing(self.__connection.cursor()) as c: + c.executemany( + "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)", + self._current_batch, + ) + self.__connection.commit() + except Exception as ex: + raise SpeckleException( + f"Could not save the batch of objects to the local db. Inner exception: {ex}", + ex, + ) def get_object(self, id: str) -> str or None: self.__check_connection() @@ -162,11 +146,10 @@ class SQLiteTransport(AbstractTransport): self.saved_obj_count = 0 def end_write(self): - self.__connection.executemany( - "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)", - self._object_cache, - ) - self.__connection.commit() + if self._current_batch: + self.save_current_batch() + self._current_batch = [] + self._current_batch_size = 0 def copy_object_and_children( self, id: str, target_transport: AbstractTransport @@ -206,19 +189,3 @@ class SQLiteTransport(AbstractTransport): def __del__(self): self.__connection.close() - - -# def _run_queue(queue: Queue, root_path: str): -# if queue.empty(): -# return -# print("RUN QUEUE") -# conn = sqlite3.connect(root_path) -# while not queue.empty(): -# data = queue.get() -# with closing(conn.cursor()) as c: -# c.execute( -# "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)", -# (data[0], data[1]), -# ) -# conn.commit() -# conn.close()