diff --git a/speckle/serialization/base_object_serializer.py b/speckle/serialization/base_object_serializer.py index a42aa21..3f9d5a1 100644 --- a/speckle/serialization/base_object_serializer.py +++ b/speckle/serialization/base_object_serializer.py @@ -194,6 +194,13 @@ class BaseObjectSerializer: Returns: Base -- the base object with all its children attached """ + # make sure an obj was passed and create dict if string was somehow passed + if not obj: + return + if isinstance(obj, str): + obj = json.loads(obj) + + # initialise the base object base = Base() # get total children count @@ -214,7 +221,12 @@ class BaseObjectSerializer: # 2. handle referenced child objects elif "referencedId" in value: ref_hash = value["referencedId"] - ref_obj = json.loads(self.read_transport.get_object(id=ref_hash)) + ref_obj_str = self.read_transport.get_object(id=ref_hash) + if not ref_obj_str: + raise SpeckleException( + f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}" + ) + ref_obj = json.loads(ref_obj_str) base[prop] = self.recompose_base(obj=ref_obj) # 3. handle all other cases (base objects, lists, and dicts) diff --git a/speckle/transports/abstract_transport.py b/speckle/transports/abstract_transport.py index a88b9e3..317f994 100644 --- a/speckle/transports/abstract_transport.py +++ b/speckle/transports/abstract_transport.py @@ -70,7 +70,7 @@ class AbstractTransport(Transport): id {str} -- the hash of the object Returns: - str -- the full string representation of the object (or null of no object is found) + str -- the full string representation of the object (or null if no object is found) """ pass diff --git a/speckle/transports/memory.py b/speckle/transports/memory.py index 2139445..85b5211 100644 --- a/speckle/transports/memory.py +++ b/speckle/transports/memory.py @@ -1,4 +1,4 @@ -import os +import json from speckle.logging.exceptions import SpeckleException from speckle.transports.abstract_transport import AbstractTransport @@ -27,7 +27,7 @@ class MemoryTransport(AbstractTransport): def get_object(self, id: str) -> str or None: if id in self.objects: - return self.objects[id] + return json.dumps(self.objects[id]) else: return None diff --git a/speckle/transports/server.py b/speckle/transports/server.py new file mode 100644 index 0000000..1af7b02 --- /dev/null +++ b/speckle/transports/server.py @@ -0,0 +1,102 @@ +import requests +from asyncio import Queue, Task +from typing import Dict, List + +from speckle.api.client import SpeckleClient +from speckle.logging.exceptions import SpeckleException +from speckle.transports.abstract_transport import AbstractTransport + + +class ServerTransport(AbstractTransport): + _name = "RemoteTransport" + url: str = None + stream_id: str = None + saved_obj_count: int = 0 + session: requests.Session = None + __queue: Queue = None + __workers: List[Task] = [] + + def __init__(self, client: SpeckleClient, stream_id: str) -> None: + # TODO: replace client with account or some other auth avenue + self.url = client.url + self.stream_id = stream_id + self.session = requests.Session() + self.session.headers.update( + {"Authorization": f"Bearer {client.me['token']}", "Accept": "text/plain"} + ) + + def begin_write(self) -> None: + self.saved_obj_count = 0 + + def end_write(self) -> None: + pass + + # TODO: add save task to queue and process as the root is being deserialised + def save_object(self, id: str, serialized_object: str) -> None: + endpoint = f"{self.url}/objects/{self.stream_id}" + r = self.session.post( + url=endpoint, + files={"batch-1": ("batch-1", f"[{serialized_object}]")}, + ) + if r.status_code != 201: + raise SpeckleException( + message=f"Could not save the object to the server - status code {r.status_code}" + ) + + def save_object_from_transport( + self, id: str, source_transport: AbstractTransport + ) -> None: + obj_string = source_transport.get_object(id=id) + self.save_object(id=id, serialized_object=obj_string) + + def get_object(self, id: str) -> str: + # endpoint = f"{self.url}/objects/{self.stream_id}/{id}/single" + # r = self.session.get(endpoint, stream=True) + + # _, obj = next(r.iter_lines().decode("utf-8")).split("\t") + + # return obj + + raise SpeckleException( + "Getting a single object using `ServerTransport.get_object()` is not implemented. To get an object from the server, please use the `SpeckleClient.object.get()` route", + NotImplementedError, + ) + + def copy_object_and_children( + self, id: str, target_transport: AbstractTransport + ) -> str: + endpoint = f"{self.url}/objects/{self.stream_id}/{id}" + r = self.session.get(endpoint, stream=True) + if r.encoding is None: + r.encoding = "utf-8" + lines = r.iter_lines(decode_unicode=True) + + # save first (root) obj for return + root_hash, root_obj = next(lines).split("\t") + target_transport.save_object(root_hash, root_obj) + + # iter through returned objects saving them as we go + for line in lines: + if line: + hash, obj = line.split("\t") + target_transport.save_object(hash, obj) + + return root_obj + + # async def stream_res(self, endpoint: str) -> str: + # data = b"" + # async with aiohttp.ClientSession() as session: + # session.headers.update( + # { + # "Authorization": f"{self.session.headers['Authorization']}", + # "Accept": "text/plain", + # } + # ) + # async with session.get(endpoint) as res: + # while True: + # chunk = await res.content.read(self.chunk_size) + # if not chunk: + # break + # data += chunk + + # return data.decode("utf-8") \ No newline at end of file