Compare commits

6 Commits

Author SHA1 Message Date
izzy lyseggen 21b27e2f3b feat(metrics): add ids for unknown users (#194)
* feat(metrics): add `merge_ids` helper method

* fix(metrics): use alias ids instead

* fix(metrics): final cleanup for aliasing

* fix(metrics): lol jk scratch the aliasing
2022-06-28 12:23:19 +01:00
izzy lyseggen 69cd9706cf fix(base): remove default units to Base (#193)
* fix(base): remove default units to `Base`

change in sharp that wasn't propagated to py!

* fix(objects): add `None` to unit encodings
2022-06-22 17:03:50 +01:00
izzy lyseggen 98075fa2cf fix(metrics): remove unused prop (#192) 2022-06-22 14:52:49 +01:00
izzy lyseggen 782f70fb49 chore: drop python 3.6 and update ujson (#190)
* chore: deprecate python 3.6 support & update ujson

after collecting python version info metrics, we found that only 2 users
are still using python 3.6. since it has been eol for 5 months now,
we believe it's safe to let it go.

rest easy 3.6 ⚰- you served us well 🫡

closes #160 (Please upgrade the ujson dependency, which has a CVE)

* chore: upgrade and clean some deps
2022-06-20 12:19:09 +01:00
Gergő Jedlicska 52ab27e60f SQLite write batching (#188)
### SUMMARY
**sqlite transport**

This transport now batches and bulk-inserts objects when writing, resulting in huge performance improvements (100x).
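
The batching idea described above can be sketched roughly as follows. This is a minimal illustration, not the transport itself: `BatchingWriter`, `flush`, and `max_batch_size` are hypothetical names, and the size check uses `len()` on the string as a cheap approximation of bytes.

```python
import sqlite3
from typing import List, Tuple


class BatchingWriter:
    """Hypothetical stand-in for a batching transport (not the specklepy class)."""

    def __init__(self, conn: sqlite3.Connection, max_batch_size: int = 10_000_000):
        self.conn = conn
        # assumption: size is measured in characters via len(), approximating bytes
        self.max_batch_size = max_batch_size
        self.batch: List[Tuple[str, str]] = []
        self.batch_size = 0
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS objects(hash TEXT PRIMARY KEY, content TEXT)"
        )

    def save_object(self, obj_id: str, content: str) -> None:
        size = len(content)
        # write out the pending batch first if this object would overflow it
        if self.batch and self.batch_size + size >= self.max_batch_size:
            self.flush()
        self.batch.append((obj_id, content))
        self.batch_size += size

    def flush(self) -> None:
        """Bulk insert everything collected so far in a single transaction."""
        if not self.batch:
            return
        self.conn.executemany(
            "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)", self.batch
        )
        self.conn.commit()
        self.batch = []
        self.batch_size = 0
```

The gain comes from issuing one `executemany` and one `commit` per batch instead of one of each per object. The caller has to trigger a final `flush()` once writing is done, which is the role `end_write` plays in the transport.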

**base object serializer**

Batching in the sqlite transport necessitated some refactoring here so that `end_write` is called safely when not using `operations.send`/`receive`. `traverse_base` is now a wrapper around `_traverse_base` that takes care of calling `begin_write`/`end_write` and resetting the writer at the top level. This is not breaking: the names of the top-level methods have not changed, and the original method has only been renamed with a leading `_`.
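
The wrapper pattern described above might look something like this in isolation. `RecordingTransport`, `Serializer`, and the dict-based tree are hypothetical stand-ins for illustration, and the `try`/`finally` is an addition of this sketch rather than something taken from the change itself.

```python
from typing import Any, Dict, List


class RecordingTransport:
    """Hypothetical transport that only records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def begin_write(self) -> None:
        self.calls.append("begin")

    def save(self, name: str) -> None:
        self.calls.append(f"save:{name}")

    def end_write(self) -> None:
        self.calls.append("end")


class Serializer:
    def __init__(self, transport: RecordingTransport) -> None:
        self.transport = transport

    def traverse(self, node: Dict[str, Any]) -> int:
        """Public entry point: brackets the whole traversal exactly once."""
        self.transport.begin_write()
        try:
            return self._traverse(node)
        finally:
            self.transport.end_write()

    def _traverse(self, node: Dict[str, Any]) -> int:
        """Recursive worker: never touches begin_write/end_write."""
        count = 1
        for child in node.get("children", []):
            count += self._traverse(child)
        self.transport.save(node["name"])
        return count
```

Because the recursion only ever re-enters `_traverse`, `end_write` runs once per top-level call no matter how deep the tree is.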

Additionally, missing referenced child objects in the read transport used to raise a `SpeckleException`. However, using the gql client to call `objects.get()` returns an object with missing references by design, which threw an error in serialization. This has been resolved by raising a `SpeckleWarning` when child objects can't be found and returning just the reference + id. (This method of interacting with objects is discouraged, so it is not surprising to me that this bug was lurking for so long, but an oopsie nonetheless!)
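
A rough sketch of the warn-and-return-the-reference behaviour, using only the standard library. `MissingReferenceWarning` and `resolve_reference` are hypothetical names standing in for `SpeckleWarning` and the serializer's lookup, and the plain dict stands in for a read transport.

```python
import warnings
from typing import Dict


class MissingReferenceWarning(UserWarning):
    """Hypothetical stand-in for SpeckleWarning."""


def resolve_reference(ref: Dict[str, str], store: Dict[str, dict]) -> dict:
    """Return the referenced object, or the reference itself if it is missing."""
    ref_id = ref["referencedId"]
    found = store.get(ref_id)
    if found is None:
        # warn instead of raising so partially fetched objects still deserialize
        warnings.warn(
            f"Could not find the referenced child object of id `{ref_id}`",
            MissingReferenceWarning,
        )
        return ref
    return found
```

Callers that do want a hard failure can still opt in with `warnings.simplefilter("error", MissingReferenceWarning)`.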

**ci / dev**

Updates for the ci config and the dev container to work with the recent changes in server.

NOTE: dev container seems to be pulling an older version of server -- not resolved yet

---

* quick and hacky sqlite batching

* feat(transports): batching sqlite inserts

* chore: upgrade gql3

also removed py-spy as it's not used and i was getting install errors :/

* ci: bump node version

* ci: formatting

* update CI versions

* update to new circleci redis baseimage

* update test fixture auth to non deprecated token based method

* add start and finish write method calls to base object serialize

* chore: dev container update

* fix(serialization): move end and begin write

* style: formatting

* fix(serializer): warn but don't throw if ref not found

this is _not_ an issue with the transports, but an issue with using the
graphql api to fetch objects. since you are only receiving one obj and none of
the children, the transport has no way to find them and should simply
return the reference as is. idk why anyone would really use `object.get`
so tbh i'm not surprised no one has found this bug yet lol

* fix(client): don't parse obj create response

* fix(serializer): wrap `traverse_base`

moving `begin` and `end_write` to the serializer due to the new
sqlite transport with batched writes necessitates a wrapper around
`traverse_base` so end/begin write can be called once at the top level.
just adding begin/end write to the original traversal method would make
tons of calls to `end_write` since the traversal is recursive

Co-authored-by: izzy lyseggen <izzy.lyseggen@gmail.com>
2022-06-20 12:00:09 +01:00
izzy lyseggen 61e7ebeabd fix(client): add faves count to mutation return (#186) 2022-04-25 11:45:54 +01:00
16 changed files with 720 additions and 782 deletions
+9 -8
@@ -1,16 +1,16 @@
version: 2.1
orbs:
python: circleci/python@1.3.2
python: circleci/python@2.0.3
codecov: codecov/codecov@3.2.2
jobs:
test:
docker:
- image: "cimg/python:<<parameters.tag>>"
- image: 'cimg/node:14.18'
- image: 'circleci/redis:6'
- image: 'cimg/postgres:12.8'
- image: "cimg/node:16.15"
- image: "cimg/redis:6.2"
- image: "cimg/postgres:14.2"
environment:
POSTGRES_DB: speckle2_test
POSTGRES_PASSWORD: speckle
@@ -27,6 +27,7 @@ jobs:
STRATEGY_LOCAL: "true"
CANONICAL_URL: "http://localhost:3000"
WAIT_HOSTS: localhost:5432, localhost:6379
DISABLE_FILE_UPLOADS: "true"
parameters:
tag:
default: "3.8"
@@ -51,7 +52,7 @@ jobs:
deploy:
docker:
- image: "circleci/python:3.8"
- image: "cimg/python:3.8"
steps:
- checkout
- run: python patch_version.py $CIRCLE_TAG
@@ -60,17 +61,17 @@ jobs:
workflows:
main:
jobs:
jobs:
- test:
matrix:
parameters:
tag: ["3.6", "3.7", "3.8", "3.9"]
tag: ["3.7", "3.8", "3.9", "3.10"]
filters:
tags:
only: /.*/
- deploy:
requires:
- test
- test
filters:
tags:
only: /[0-9]+(\.[0-9]+)*/
+3 -2
@@ -1,7 +1,7 @@
version: "3.3" # optional since v1.27.0
services:
postgres:
image: circleci/postgres:12
image: cimg/postgres:14.2
environment:
POSTGRES_DB: speckle2_test
POSTGRES_PASSWORD: speckle
@@ -10,7 +10,7 @@ services:
# - "5432:5432"
network_mode: host
redis:
image: circleci/redis:6
image: cimg/redis:6.2
# ports:
# - "6379:6379"
network_mode: host
@@ -27,6 +27,7 @@ services:
STRATEGY_LOCAL: "true"
CANONICAL_URL: "http://localhost:3000"
WAIT_HOSTS: localhost:5432, localhost:6379
DISABLE_FILE_UPLOADS: "true"
# ports:
# - "3000:3000"
network_mode: host
+6 -2
@@ -4,6 +4,7 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
@@ -13,10 +14,13 @@
"justMyCode": false
},
{
"name": "Python: Test debug config",
"name": "Pytest",
"type": "python",
"request": "test",
"request": "launch",
"program": "poetry",
"args": ["run", "pytest"],
"console": "integratedTerminal",
"justMyCode": true
}
]
}
+63
@@ -0,0 +1,63 @@
from typing import List
from specklepy.objects import Base
from specklepy.api import operations
from specklepy.transports.sqlite import SQLiteTransport
import time
from pathlib import Path
import os
import string
import random
class Sub(Base):
bar: List[str]
def random_string():
letters = string.ascii_lowercase
return "".join(random.choice(letters) for _ in range(10))
BASE_PATH = SQLiteTransport.get_base_path("Speckle")
def clean_db():
os.remove(Path(BASE_PATH, "Objects.db"))
def one_pass(clean: bool, randomize: bool, child_count: int):
foo = Base()
for i in range(child_count):
stuff = random_string() if randomize else "stuff"
foo[f"@child_{i}"] = Sub(bar=["asdf", "bar", i, stuff])
if clean:
clean_db()
transport = SQLiteTransport()
start = time.time()
hash = operations.send(base=foo, transports=[transport])
send_time = time.time() - start
receive_start = time.time()
operations.receive(hash, transport)
receive_time = time.time() - receive_start
return send_time, receive_time
if __name__ == "__main__":
sample_size = 4
test_permutations = [
(True, True),
(False, False),
(False, True),
(True, False),
]
for clean, randomize in test_permutations:
print(f"CLEAN: {clean}, RANDOMIZE: {randomize}")
for child_count in [10, 100, 1000, 10000]:
print(f"\tCHILD COUNT: {child_count}")
for _ in range(sample_size):
send_time, receive_time = one_pass(clean, randomize, child_count)
print(f"\t\tSend: {send_time} Receive: {receive_time}")
+2 -2
@@ -50,10 +50,10 @@ if __name__ == "__main__":
)
# support for dynamic attributes
custom_sub.extra_extra = "what is this?"
debug(custom_sub.json())
debug(custom_sub)
serialized = operations.serialize(custom_sub)
deserialized = operations.deserialize(serialized)
# the only difference should be between the two data is that the deserialized
# instance id attribute is not None.
debug(deserialized.json())
debug(deserialized)
Generated
+531 -667
File diff suppressed because it is too large
+4 -3
@@ -11,11 +11,11 @@ homepage = "https://speckle.systems/"
[tool.poetry.dependencies]
python = "^3.6.5"
python = ">=3.7.0, <4.0"
pydantic = "^1.8.2"
appdirs = "^1.4.4"
gql = {version = ">=3.0.0b1", extras = ["all"], allow-prereleases = true}
ujson = "^4.3.0"
gql = {extras = ["requests", "websockets"], version = "^3.3.0"}
ujson = "^5.3.0"
Deprecated = "^1.2.13"
[tool.poetry.dev-dependencies]
@@ -24,6 +24,7 @@ isort = "^5.7.0"
pytest = "^6.2.2"
pytest-ordering = "^0.6"
pytest-cov = "^3.0.0"
devtools = "^0.8.0"
[tool.black]
-5
@@ -40,13 +40,8 @@ def send(
serializer = BaseObjectSerializer(write_transports=transports)
for t in transports:
t.begin_write()
obj_hash, _ = serializer.write_json(base=base)
for t in transports:
t.end_write()
return obj_hash
+3 -1
@@ -52,7 +52,9 @@ class Resource(ResourceBase):
params = {"stream_id": stream_id, "object_id": object_id}
return self.make_request(
query=query, params=params, return_type=["stream", "object", "data"]
query=query,
params=params,
return_type=["stream", "object", "data"],
)
def create(self, stream_id: str, objects: List[Dict]) -> str:
+1
@@ -327,6 +327,7 @@ class Resource(ResourceBase):
id
name
favoritedDate
favoritesCount
}
}
"""
+14 -10
@@ -1,10 +1,13 @@
import socket
import sys
import queue
import hashlib
import getpass
import logging
import requests
import threading
import platform
import contextlib
"""
Anonymous telemetry to help us understand how to make a better Speckle.
@@ -12,7 +15,7 @@ This really helps us to deliver a better open source project and product!
"""
TRACK = True
HOST_APP = "python"
HOST_APP_VERSION = f"python {'.'.join(map(str, sys.version_info[:3]))}"
HOST_APP_VERSION = f"python {'.'.join(map(str, sys.version_info[:2]))}"
PLATFORMS = {"win32": "Windows", "cygwin": "Windows", "darwin": "Mac OS X"}
LOG = logging.getLogger(__name__)
@@ -75,8 +78,7 @@ def track(action: str, account: "Account" = None, custom_props: dict = None):
METRICS_TRACKER.queue.put_nowait(event_params)
except Exception as ex:
# wrapping this whole thing in a try except as we never want a failure here to annoy users!
LOG.error("Error queueing metrics request: " + str(ex))
LOG.error(f"Error queueing metrics request: {str(ex)}")
def initialise_tracker(account: "Account" = None):
global METRICS_TRACKER
@@ -101,8 +103,7 @@ class Singleton(type):
class MetricsTracker(metaclass=Singleton):
analytics_url = "https://analytics.speckle.systems/track?ip=1"
analytics_token = "acd87c5a50b56df91a795e999812a3a4"
user_ip = None
last_user = None
last_user = ""
last_server = None
platform = None
sending_thread = None
@@ -114,12 +115,15 @@ class MetricsTracker(metaclass=Singleton):
)
self.platform = PLATFORMS.get(sys.platform, "linux")
self.sending_thread.start()
self.user_ip = socket.gethostbyname(socket.gethostname())
with contextlib.suppress(Exception):
node, user = platform.node(), getpass.getuser()
if node and user:
self.last_user = f"@{self.hash(f'{node}-{user}')}"
def set_last_user(self, email: str):
if not email:
return
self.last_user = "@" + self.hash(email)
self.last_user = f"@{self.hash(email)}"
def set_last_server(self, server: str):
if not server:
@@ -137,6 +141,6 @@ class MetricsTracker(metaclass=Singleton):
try:
session.post(self.analytics_url, json=event_params)
except Exception as ex:
LOG.error("Error sending metrics request: " + str(ex))
LOG.error(f"Error sending metrics request: {str(ex)}")
self.queue.task_done()
self.queue.task_done()
+1 -1
@@ -142,7 +142,7 @@ class Base(_RegisteringBase):
id: Optional[str] = None
totalChildrenCount: Optional[int] = None
applicationId: Optional[str] = None
_units: str = "m"
_units: str = None
# dict of chunkable props and their max chunk size
_chunkable: Dict[str, int] = {}
_chunk_size_default: int = 1000
+3 -4
@@ -16,6 +16,7 @@ UNITS_STRINGS = {
}
UNITS_ENCODINGS = {
None: 0,
"none": 0,
"mm": 1,
"cm": 2,
@@ -58,7 +59,5 @@ def get_units_from_encoding(unit: int):
def get_encoding_from_units(unit: str):
try:
return UNITS_ENCODINGS[unit]
except KeyError:
raise SpeckleException(
message=f"No encoding exists for unit {unit}. Please enter a valid unit to encode (eg {UNITS_ENCODINGS})."
)
except KeyError as e:
raise SpeckleException(message=f"No encoding exists for unit {unit}. Please enter a valid unit to encode (eg {UNITS_ENCODINGS}).") from e
@@ -1,17 +1,19 @@
import re
import ujson
import hashlib
import re
import warnings
from uuid import uuid4
from enum import Enum
from warnings import warn
from typing import Any, Dict, List, Tuple
from specklepy.objects.base import Base, DataChunk
from specklepy.logging.exceptions import (
SerializationException,
SpeckleException,
SpeckleWarning,
)
from specklepy.transports.abstract_transport import AbstractTransport
# import for serialization
import specklepy.objects.geometry
import specklepy.objects.other
@@ -50,9 +52,16 @@ class BaseObjectSerializer:
self.read_transport = read_transport
def write_json(self, base: Base):
self.__reset_writer()
self.detach_lineage = [True]
"""Serializes a given base object into a json string
Arguments:
base {Base} -- the base object to be decomposed and serialized
Returns:
(str, str) -- a tuple containing the hash (id) of the base object and the serialized object string
"""
hash, obj = self.traverse_base(base)
return hash, ujson.dumps(obj)
def traverse_base(self, base: Base) -> Tuple[str, Dict]:
@@ -64,6 +73,21 @@ class BaseObjectSerializer:
Returns:
(str, dict) -- a tuple containing the hash (id) of the base object and the constructed serializable dictionary
"""
self.__reset_writer()
if self.write_transports:
for wt in self.write_transports:
wt.begin_write()
hash, obj = self._traverse_base(base)
if self.write_transports:
for wt in self.write_transports:
wt.end_write()
return hash, obj
def _traverse_base(self, base: Base) -> Tuple[str, Dict]:
if not self.detach_lineage:
self.detach_lineage = [True]
@@ -141,7 +165,7 @@ class BaseObjectSerializer:
chunk_refs = []
for c in chunks:
self.detach_lineage.append(detach)
ref_hash, _ = self.traverse_base(c)
ref_hash, _ = self._traverse_base(c)
ref_obj = self.detach_helper(ref_hash=ref_hash)
chunk_refs.append(ref_obj)
object_builder[prop] = chunk_refs
@@ -200,7 +224,7 @@ class BaseObjectSerializer:
for o in obj:
if isinstance(o, Base):
self.detach_lineage.append(detach)
hash, _ = self.traverse_base(o)
hash, _ = self._traverse_base(o)
detached_list.append(self.detach_helper(ref_hash=hash))
else:
detached_list.append(self.traverse_value(o, detach))
@@ -216,7 +240,7 @@ class BaseObjectSerializer:
elif isinstance(obj, Base):
self.detach_lineage.append(detach)
_, base_obj = self.traverse_base(obj)
_, base_obj = self._traverse_base(obj)
return base_obj
else:
@@ -255,7 +279,7 @@ class BaseObjectSerializer:
def __reset_writer(self) -> None:
"""Reinitializes the lineage, and other variables that get used during the json writing process"""
self.detach_lineage = []
self.detach_lineage = [True]
self.lineage = []
self.family_tree = {}
self.closure_table = {}
@@ -321,12 +345,15 @@ class BaseObjectSerializer:
elif "referencedId" in value:
ref_hash = value["referencedId"]
ref_obj_str = self.read_transport.get_object(id=ref_hash)
if not ref_obj_str:
raise SpeckleException(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}"
if ref_obj_str:
ref_obj = safe_json_loads(ref_obj_str, ref_hash)
base.__setattr__(prop, self.recompose_base(obj=ref_obj))
else:
warnings.warn(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}",
SpeckleWarning,
)
ref_obj = safe_json_loads(ref_obj_str, ref_hash)
base.__setattr__(prop, self.recompose_base(obj=ref_obj))
base.__setattr__(prop, self.handle_value(value))
# 3. handle all other cases (base objects, lists, and dicts)
else:
@@ -380,8 +407,10 @@ class BaseObjectSerializer:
ref_hash = obj["referencedId"]
ref_obj_str = self.read_transport.get_object(id=ref_hash)
if not ref_obj_str:
raise SpeckleException(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}"
warnings.warn(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}",
SpeckleWarning,
)
return obj
return safe_json_loads(ref_obj_str, ref_hash)
+35 -61
@@ -3,7 +3,7 @@ import sys
import time
import sched
import sqlite3
from typing import Any, List, Dict
from typing import Any, List, Dict, Tuple
from appdirs import user_data_dir
from contextlib import closing
from specklepy.transports.abstract_transport import AbstractTransport
@@ -14,25 +14,29 @@ class SQLiteTransport(AbstractTransport):
_name = "SQLite"
_base_path: str = None
_root_path: str = None
_is_writing: bool = False
_scheduler = sched.scheduler(time.time, time.sleep)
_polling_interval = 0.5 # seconds
__connection: sqlite3.Connection = None
app_name: str = ""
scope: str = ""
saved_obj_count: int = 0
max_size: int = None
_current_batch: List[Tuple[str, str]] = None
_current_batch_size: int = None
def __init__(
self,
base_path: str = None,
app_name: str = None,
scope: str = None,
max_batch_size_mb: float = 10.0,
**data: Any,
) -> None:
super().__init__(**data)
self.app_name = app_name or "Speckle"
self.scope = scope or "Objects"
self._base_path = base_path or self.get_base_path(self.app_name)
self.max_size = int(max_batch_size_mb * 1000 * 1000)
self._current_batch = []
self._current_batch_size = 0
try:
os.makedirs(self._base_path, exist_ok=True)
@@ -50,12 +54,6 @@ class SQLiteTransport(AbstractTransport):
def __repr__(self) -> str:
return f"SQLiteTransport(app: '{self.app_name}', scope: '{self.scope}')"
# def __write_timer_elapsed(self):
# print("WRITE TIMER ELAPSED")
# proc = Process(target=_run_queue, args=(self.__queue, self._root_path))
# proc.start()
# proc.join()
@staticmethod
def get_base_path(app_name):
# from appdirs https://github.com/ActiveState/appdirs/blob/master/appdirs.py
@@ -74,36 +72,6 @@ class SQLiteTransport(AbstractTransport):
path = os.path.expanduser("~/.config/")
return os.path.join(path, app_name)
# def __consume_queue(self):
# if self._is_writing or self.__queue.empty():
# return
# print("CONSUME QUEUE")
# self._is_writing = True
# while not self.__queue.empty():
# data = self.__queue.get()
# self.save_object(data[0], data[1])
# self._is_writing = False
# self._scheduler.enter(
# delay=self._polling_interval, priority=1, action=self.__consume_queue
# )
# self._scheduler.run(blocking=True)
# def save_object(self, id: str, serialized_object: str) -> None:
# """Adds an object to the queue and schedules it to be saved.
# Arguments:
# id {str} -- the object id
# serialized_object {str} -- the full string representation of the object
# """
# print("SAVE OBJECT")
# self.__queue.put((id, serialized_object))
# self._scheduler.enter(
# delay=self._polling_interval, priority=1, action=self.__consume_queue
# )
# self._scheduler.run(blocking=True)
def save_object_from_transport(
self, id: str, source_transport: AbstractTransport
) -> None:
@@ -117,23 +85,41 @@ class SQLiteTransport(AbstractTransport):
self.save_object(id, serialized_object)
def save_object(self, id: str, serialized_object: str) -> None:
"""Directly saves an object into the database.
"""
Adds an object to the current batch to be written to the db. If the current batch is full,
the batch is written to the db and the current batch is reset.
Arguments:
id {str} -- the object id
serialized_object {str} -- the full string representation of the object
"""
obj_size = len(serialized_object)
if (
not self._current_batch
or self._current_batch_size + obj_size < self.max_size
):
self._current_batch.append((id, serialized_object))
self._current_batch_size += obj_size
return
self.save_current_batch()
self._current_batch = [(id, serialized_object)]
self._current_batch_size = obj_size
def save_current_batch(self) -> None:
"""Save the current batch of objects to the local db"""
self.__check_connection()
try:
with closing(self.__connection.cursor()) as c:
c.execute(
c.executemany(
"INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
(id, serialized_object),
self._current_batch,
)
self.__connection.commit()
except Exception as ex:
raise SpeckleException(
f"Could not save the object to the local db. Inner exception: {ex}", ex
f"Could not save the batch of objects to the local db. Inner exception: {ex}",
ex,
)
def get_object(self, id: str) -> str or None:
@@ -156,10 +142,14 @@ class SQLiteTransport(AbstractTransport):
return ret
def begin_write(self):
self._object_cache = []
self.saved_obj_count = 0
def end_write(self):
pass
if self._current_batch:
self.save_current_batch()
self._current_batch = []
self._current_batch_size = 0
def copy_object_and_children(
self, id: str, target_transport: AbstractTransport
@@ -199,19 +189,3 @@ class SQLiteTransport(AbstractTransport):
def __del__(self):
self.__connection.close()
# def _run_queue(queue: Queue, root_path: str):
# if queue.empty():
# return
# print("RUN QUEUE")
# conn = sqlite3.connect(root_path)
# while not queue.empty():
# data = queue.get()
# with closing(conn.cursor()) as c:
# c.execute(
# "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
# (data[0], data[1]),
# )
# conn.commit()
# conn.close()
+1 -1
@@ -61,7 +61,7 @@ def second_user_dict(host):
@pytest.fixture(scope="session")
def client(host, user_dict):
client = SpeckleClient(host=host, use_ssl=False)
client.authenticate(user_dict["token"])
client.authenticate_with_token(user_dict["token"])
return client