feat(transports): batching sqlite inserts

This commit is contained in:
izzy lyseggen
2022-06-17 16:15:47 +01:00
parent 739d8bc189
commit e22bfd72ea
+43 -76
View File
@@ -3,7 +3,7 @@ import sys
import time
import sched
import sqlite3
from typing import Any, List, Dict
from typing import Any, List, Dict, Tuple
from appdirs import user_data_dir
from contextlib import closing
from specklepy.transports.abstract_transport import AbstractTransport
@@ -14,26 +14,29 @@ class SQLiteTransport(AbstractTransport):
_name = "SQLite"
_base_path: str = None
_root_path: str = None
_is_writing: bool = False
_scheduler = sched.scheduler(time.time, time.sleep)
_polling_interval = 0.5 # seconds
__connection: sqlite3.Connection = None
app_name: str = ""
scope: str = ""
saved_obj_count: int = 0
_object_cache: List = None
max_size: int = None
_current_batch: List[Tuple[str, str]] = None
_current_batch_size: int = None
def __init__(
self,
base_path: str = None,
app_name: str = None,
scope: str = None,
max_batch_size_mb: float = 10.0,
**data: Any,
) -> None:
super().__init__(**data)
self.app_name = app_name or "Speckle"
self.scope = scope or "Objects"
self._base_path = base_path or self.get_base_path(self.app_name)
self.max_size = int(max_batch_size_mb * 1000 * 1000)
self._current_batch = []
self._current_batch_size = 0
try:
os.makedirs(self._base_path, exist_ok=True)
@@ -51,12 +54,6 @@ class SQLiteTransport(AbstractTransport):
def __repr__(self) -> str:
return f"SQLiteTransport(app: '{self.app_name}', scope: '{self.scope}')"
# def __write_timer_elapsed(self):
# print("WRITE TIMER ELAPSED")
# proc = Process(target=_run_queue, args=(self.__queue, self._root_path))
# proc.start()
# proc.join()
@staticmethod
def get_base_path(app_name):
# from appdirs https://github.com/ActiveState/appdirs/blob/master/appdirs.py
@@ -75,36 +72,6 @@ class SQLiteTransport(AbstractTransport):
path = os.path.expanduser("~/.config/")
return os.path.join(path, app_name)
# def __consume_queue(self):
# if self._is_writing or self.__queue.empty():
# return
# print("CONSUME QUEUE")
# self._is_writing = True
# while not self.__queue.empty():
# data = self.__queue.get()
# self.save_object(data[0], data[1])
# self._is_writing = False
# self._scheduler.enter(
# delay=self._polling_interval, priority=1, action=self.__consume_queue
# )
# self._scheduler.run(blocking=True)
# def save_object(self, id: str, serialized_object: str) -> None:
# """Adds an object to the queue and schedules it to be saved.
# Arguments:
# id {str} -- the object id
# serialized_object {str} -- the full string representation of the object
# """
# print("SAVE OBJECT")
# self.__queue.put((id, serialized_object))
# self._scheduler.enter(
# delay=self._polling_interval, priority=1, action=self.__consume_queue
# )
# self._scheduler.run(blocking=True)
def save_object_from_transport(
self, id: str, source_transport: AbstractTransport
) -> None:
@@ -118,25 +85,42 @@ class SQLiteTransport(AbstractTransport):
self.save_object(id, serialized_object)
def save_object(self, id: str, serialized_object: str) -> None:
"""Directly saves an object into the database.
"""
Adds an object to the current batch to be written to the db. If the current batch is full,
the batch is written to the db and the current batch is reset.
Arguments:
id {str} -- the object id
serialized_object {str} -- the full string representation of the object
"""
# self.__check_connection()
# try:
# with closing(self.__connection.cursor()) as c:
# c.execute(
# "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
# (id, serialized_object),
# )
# # self.__connection.commit()
# except Exception as ex:
# raise SpeckleException(
# f"Could not save the object to the local db. Inner exception: {ex}", ex
# )
self._object_cache.append((id, serialized_object))
obj_size = len(serialized_object)
if (
not self._current_batch
or self._current_batch_size + obj_size < self.max_size
):
self._current_batch.append((id, serialized_object))
self._current_batch_size += obj_size
return
self.save_current_batch()
self._current_batch = [(id, serialized_object)]
self._current_batch_size = obj_size
def save_current_batch(self) -> None:
"""Save the current batch of objects to the local db"""
self.__check_connection()
try:
with closing(self.__connection.cursor()) as c:
c.executemany(
"INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
self._current_batch,
)
self.__connection.commit()
except Exception as ex:
raise SpeckleException(
f"Could not save the batch of objects to the local db. Inner exception: {ex}",
ex,
)
def get_object(self, id: str) -> str or None:
self.__check_connection()
@@ -162,11 +146,10 @@ class SQLiteTransport(AbstractTransport):
self.saved_obj_count = 0
def end_write(self):
self.__connection.executemany(
"INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
self._object_cache,
)
self.__connection.commit()
if self._current_batch:
self.save_current_batch()
self._current_batch = []
self._current_batch_size = 0
def copy_object_and_children(
self, id: str, target_transport: AbstractTransport
@@ -206,19 +189,3 @@ class SQLiteTransport(AbstractTransport):
def __del__(self):
self.__connection.close()
# def _run_queue(queue: Queue, root_path: str):
# if queue.empty():
# return
# print("RUN QUEUE")
# conn = sqlite3.connect(root_path)
# while not queue.empty():
# data = queue.get()
# with closing(conn.cursor()) as c:
# c.execute(
# "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
# (data[0], data[1]),
# )
# conn.commit()
# conn.close()