Merge pull request #3 from specklesystems/izzy/transports

🏄‍♀️ Server transport
This commit is contained in:
izzy lyseggen
2020-12-09 12:22:41 +00:00
committed by GitHub
4 changed files with 118 additions and 4 deletions
@@ -194,6 +194,13 @@ class BaseObjectSerializer:
Returns:
Base -- the base object with all its children attached
"""
# make sure an obj was passed and create dict if string was somehow passed
if not obj:
return
if isinstance(obj, str):
obj = json.loads(obj)
# initialise the base object
base = Base()
# get total children count
@@ -214,7 +221,12 @@ class BaseObjectSerializer:
# 2. handle referenced child objects
elif "referencedId" in value:
ref_hash = value["referencedId"]
ref_obj = json.loads(self.read_transport.get_object(id=ref_hash))
ref_obj_str = self.read_transport.get_object(id=ref_hash)
if not ref_obj_str:
raise SpeckleException(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}"
)
ref_obj = json.loads(ref_obj_str)
base[prop] = self.recompose_base(obj=ref_obj)
# 3. handle all other cases (base objects, lists, and dicts)
+1 -1
View File
@@ -70,7 +70,7 @@ class AbstractTransport(Transport):
id {str} -- the hash of the object
Returns:
str -- the full string representation of the object (or null of no object is found)
str -- the full string representation of the object (or null if no object is found)
"""
pass
+2 -2
View File
@@ -1,4 +1,4 @@
import os
import json
from speckle.logging.exceptions import SpeckleException
from speckle.transports.abstract_transport import AbstractTransport
@@ -27,7 +27,7 @@ class MemoryTransport(AbstractTransport):
def get_object(self, id: str) -> str or None:
if id in self.objects:
return self.objects[id]
return json.dumps(self.objects[id])
else:
return None
+102
View File
@@ -0,0 +1,102 @@
import requests
from asyncio import Queue, Task
from typing import Dict, List
from speckle.api.client import SpeckleClient
from speckle.logging.exceptions import SpeckleException
from speckle.transports.abstract_transport import AbstractTransport
class ServerTransport(AbstractTransport):
_name = "RemoteTransport"
url: str = None
stream_id: str = None
saved_obj_count: int = 0
session: requests.Session = None
__queue: Queue = None
__workers: List[Task] = []
def __init__(self, client: SpeckleClient, stream_id: str) -> None:
# TODO: replace client with account or some other auth avenue
self.url = client.url
self.stream_id = stream_id
self.session = requests.Session()
self.session.headers.update(
{"Authorization": f"Bearer {client.me['token']}", "Accept": "text/plain"}
)
def begin_write(self) -> None:
self.saved_obj_count = 0
def end_write(self) -> None:
pass
# TODO: add save task to queue and process as the root is being deserialised
def save_object(self, id: str, serialized_object: str) -> None:
endpoint = f"{self.url}/objects/{self.stream_id}"
r = self.session.post(
url=endpoint,
files={"batch-1": ("batch-1", f"[{serialized_object}]")},
)
if r.status_code != 201:
raise SpeckleException(
message=f"Could not save the object to the server - status code {r.status_code}"
)
def save_object_from_transport(
self, id: str, source_transport: AbstractTransport
) -> None:
obj_string = source_transport.get_object(id=id)
self.save_object(id=id, serialized_object=obj_string)
def get_object(self, id: str) -> str:
# endpoint = f"{self.url}/objects/{self.stream_id}/{id}/single"
# r = self.session.get(endpoint, stream=True)
# _, obj = next(r.iter_lines().decode("utf-8")).split("\t")
# return obj
raise SpeckleException(
"Getting a single object using `ServerTransport.get_object()` is not implemented. To get an object from the server, please use the `SpeckleClient.object.get()` route",
NotImplementedError,
)
def copy_object_and_children(
self, id: str, target_transport: AbstractTransport
) -> str:
endpoint = f"{self.url}/objects/{self.stream_id}/{id}"
r = self.session.get(endpoint, stream=True)
if r.encoding is None:
r.encoding = "utf-8"
lines = r.iter_lines(decode_unicode=True)
# save first (root) obj for return
root_hash, root_obj = next(lines).split("\t")
target_transport.save_object(root_hash, root_obj)
# iter through returned objects saving them as we go
for line in lines:
if line:
hash, obj = line.split("\t")
target_transport.save_object(hash, obj)
return root_obj
# async def stream_res(self, endpoint: str) -> str:
# data = b""
# async with aiohttp.ClientSession() as session:
# session.headers.update(
# {
# "Authorization": f"{self.session.headers['Authorization']}",
# "Accept": "text/plain",
# }
# )
# async with session.get(endpoint) as res:
# while True:
# chunk = await res.content.read(self.chunk_size)
# if not chunk:
# break
# data += chunk
# return data.decode("utf-8")