From c6c7d1731a36bf0953d4a0d872180322eb2c757c Mon Sep 17 00:00:00 2001 From: cristi8 Date: Mon, 22 Mar 2021 14:24:14 +0200 Subject: [PATCH] ServerTransport: Added object batching and sending from multiple background threads --- speckle/transports/server/__init__.py | 1 + speckle/transports/server/batch_sender.py | 117 ++++++++++++++++++++++ speckle/transports/{ => server}/server.py | 26 ++--- 3 files changed, 129 insertions(+), 15 deletions(-) create mode 100644 speckle/transports/server/__init__.py create mode 100644 speckle/transports/server/batch_sender.py rename speckle/transports/{ => server}/server.py (84%) diff --git a/speckle/transports/server/__init__.py b/speckle/transports/server/__init__.py new file mode 100644 index 0000000..60bf56d --- /dev/null +++ b/speckle/transports/server/__init__.py @@ -0,0 +1 @@ +from .server import ServerTransport diff --git a/speckle/transports/server/batch_sender.py b/speckle/transports/server/batch_sender.py new file mode 100644 index 0000000..990cee7 --- /dev/null +++ b/speckle/transports/server/batch_sender.py @@ -0,0 +1,117 @@ + +import logging +import threading +import queue +import time +import gzip + +import requests +from speckle.logging.exceptions import SpeckleException + +LOG = logging.getLogger(__name__) + + +class BatchSender(object): + def __init__(self, endpoint, token, max_batch_size_mb=1, batch_buffer_length=10, thread_count=4): + self.endpoint = endpoint + self._token = token + + self.max_size = int(max_batch_size_mb * 1000 * 1000) + self._batches = queue.Queue(batch_buffer_length) + self._crt_batch = [] + self._crt_batch_size = 0 + + self.thread_count = thread_count + self._send_threads = [] + self._exception = None + + def send_object(self, obj: str): + if not self._send_threads: + self._create_threads() + + crt_obj_size = len(obj) + if not self._crt_batch or self._crt_batch_size + crt_obj_size < self.max_size: + self._crt_batch.append(obj) + self._crt_batch_size += crt_obj_size + return + + self._batches.put(self._crt_batch) + self._crt_batch = [obj] + self._crt_batch_size = crt_obj_size + + def flush(self): + # Add current non-complete batch + if self._crt_batch: + self._batches.put(self._crt_batch) + self._crt_batch = [] + self._crt_batch_size = 0 + # Wait for queued batches to be sent + self._batches.join() + # End the sending threads + self._delete_threads() + # If there was any error, throw the first exception that occurred during upload + if self._exception is not None: + ex = self._exception + self._exception = None + raise ex + + def _sending_thread_main(self): + try: + session = requests.Session() + session.headers.update( + {"Authorization": f"Bearer {self._token}", "Accept": "text/plain"} + ) + + while True: + batch = self._batches.get() + + # None is a sentinel value, meaning the thread should exit gracefully + if batch is None: + self._batches.task_done() + break + + try: + self._bg_send_batch(session, batch) + except Exception as ex: + self._exception = self._exception or ex + LOG.error("Error sending batch of objects to server: " + str(ex)) + + self._batches.task_done() + except Exception as ex: + self._exception = self._exception or ex + LOG.error("ServerTransport sending thread error: " + str(ex)) + + def _bg_send_batch(self, session, batch): + upload_data = "[" + ",".join(batch) + "]" + upload_data_gzip = gzip.compress(upload_data.encode()) + LOG.info("Uploading batch of %s objects (size: %s, compressed size: %s)" % + (len(batch), len(upload_data), len(upload_data_gzip))) + + r = session.post( + url=self.endpoint, + files={"batch-1": ("batch-1", upload_data_gzip, 'application/gzip')}, + ) + if r.status_code != 201: + LOG.warning("Upload server response: %s", r.text) + raise SpeckleException( + message=f"Could not save the object to the server - status code {r.status_code}" + ) + + def _create_threads(self): + for i in range(self.thread_count): + t = threading.Thread(target=self._sending_thread_main, daemon=True) + t.start() + self._send_threads.append(t) + + def _delete_threads(self): + for i in range(len(self._send_threads)): + self._batches.put(None) + + for thread in self._send_threads: + thread.join() + + self._send_threads = [] + + def __del__(self): + self._delete_threads() + diff --git a/speckle/transports/server.py b/speckle/transports/server/server.py similarity index 84% rename from speckle/transports/server.py rename to speckle/transports/server/server.py index cb5c82b..33722b3 100644 --- a/speckle/transports/server.py +++ b/speckle/transports/server/server.py @@ -1,11 +1,13 @@ import requests -from asyncio import Queue, Task + from typing import Any, Dict, List, Type from speckle.api.client import SpeckleClient from speckle.logging.exceptions import SpeckleException from speckle.transports.abstract_transport import AbstractTransport +from .batch_sender import BatchSender + class ServerTransport(AbstractTransport): _name = "RemoteTransport" @@ -13,36 +15,30 @@ class ServerTransport(AbstractTransport): stream_id: str = None saved_obj_count: int = 0 session: requests.Session = None - __queue: Queue = None - __workers: List[Task] = [] def __init__(self, client: SpeckleClient, stream_id: str, **data: Any) -> None: super().__init__(**data) # TODO: replace client with account or some other auth avenue self.url = client.url self.stream_id = stream_id + + token = client.me['token'] + endpoint = f"{self.url}/objects/{self.stream_id}" + self._batch_sender = BatchSender(endpoint, token, max_batch_size_mb=1) + self.session = requests.Session() self.session.headers.update( - {"Authorization": f"Bearer {client.me['token']}", "Accept": "text/plain"} + {"Authorization": f"Bearer {token}", "Accept": "text/plain"} ) def begin_write(self) -> None: self.saved_obj_count = 0 def end_write(self) -> None: - pass + self._batch_sender.flush() - # TODO: add save task to queue and process as the root is being deserialised def save_object(self, id: str, serialized_object: str) -> None: - endpoint = f"{self.url}/objects/{self.stream_id}" - r = self.session.post( - url=endpoint, - files={"batch-1": ("batch-1", f"[{serialized_object}]")}, - ) - if r.status_code != 201: - raise SpeckleException( - message=f"Could not save the object to the server - status code {r.status_code}" - ) + self._batch_sender.send_object(serialized_object) def save_object_from_transport( self, id: str, source_transport: AbstractTransport