Compare commits

...

8 Commits

Author SHA1 Message Date
izzy lyseggen 782f70fb49 chore: drop python 3.6 and update ujson (#190)
* chore: depreciate python 3.6 support & upate ujson

after collection python version info metrics, we fount that only 2 users
are still using python 3.6. since it has been eol for 5 months now,
we believe it's safe to let it go.

rest easy 3.6 ⚰- you served us well 🫡

closes Please upgrade the ujson dependency, which has a CVE #160

* chore: upgrade and clean some deps
2022-06-20 12:19:09 +01:00
Gergő Jedlicska 52ab27e60f SQLite write batching (#188)
### SUMMARY
**sqlite transport**

This transport now batches and bulk inserts objects when writing resulting in huge performance improvements (100x).

**base object serializer**

Batching in the sqlite transport necessitated some refactoring here in order to safely call end_write when not using operations.send/receive. This has been resolved by turning traverse_base into a wrapper for _traverse_base which can take care of calling begin/end_write and resetting the writer at the top level. This is not breaking since the top level methods to call have not changed names and the original method has just been prepended with a _

Additionally, missing referenced child objects in the read transport used to raise a SpeckleException. However, using the gql client to call objects.get() will return an object with missing references by design thus throwing an error in serialization. This has been resolved by instead raising a SpeckleWarning when child objects can't be found and just returning the reference + id. ((this method of interacting with objects is discouraged so it is not surprising to me that this bug was lurking for so long - but an oopsie nonetheless!))

**ci / dev**

Updates for the ci config and the dev container to work with the recent changes in server.

NOTE: dev container seems to be pulling an older version of server -- not resolved yet

---

* quick and hacky sqlite batching

* feat(transports): batching sqlite inserts

* chore: upgrade gql3

also removed py-spy as it's not used and i was getting install errors :/

* ci: bump node version

* ci: formatting

* update CI versions

* update to new circleci redis baseimage

* update test fixture auth to non deprecated token based method

* add start and finish write method calls to base object serialize

* chore: dev container update

* fix(serialization): move end and begin write

* style: formatting

* fix(serializer): warn but don't throw if ref not found

this is _not_ an issue with the transports, but an issue with using the
graphql api to fetch objects. since you are only receiving one obj and none of
the children, the transport has no way to find them and should simply
return the reference as is. idk why anyone would really use `object.get`
so tbh i'm not surprised no one has found this bug yet lol

* fix(client): don't parse obj create response

* fix(serializer): wrap `traverse_base`

moving `begin` and `end_write` to the seriazlier due to the new
sqlite transport with batched writes necessitates a wrapper around
`traverse_base` so end/begin write can be called once at the top level.
just adding begin/end write to the original traversal method would make
tons of calls to `end_write` since the traversal is recursive

Co-authored-by: izzy lyseggen <izzy.lyseggen@gmail.com>
2022-06-20 12:00:09 +01:00
izzy lyseggen 61e7ebeabd fix(client): add faves count to mutation return (#186) 2022-04-25 11:45:54 +01:00
izzy lyseggen 3a8121c306 feat(client): stream model update + favoriting (#185)
* feat(models): a quickie lil update

- adds fave and comment count
(mutations to come)
- adds source app to commit within stream query

* feat(client): add favorite mutation
2022-04-25 11:32:24 +01:00
izzy lyseggen 209a95879f fix(metrics): patch app version check 2022-04-22 11:46:27 +01:00
izzy lyseggen 4f829d9908 feat(metrics): some cleanup and updates (#184)
- add metrics for client init / auth
- add server metrics
- remove incompatible server check in client
(at this point, it's been long enough that I think it's fine and will
save time on request / esp in places like blender)
2022-04-22 11:26:28 +01:00
luzpaz ac5345f528 Fix various typos (#181) 2022-04-21 17:56:11 +01:00
izzy lyseggen 1142481d89 fix(geometry): int(index vals) for curve encoding (#183)
* fix(geometry): int(index vals) for curve encoding

* fix(client): update poss invalid token check

server now returns `None` instead of a `GraphqlExcetion` when asking for
the user with an invalid token (or no scopes token)
2022-04-21 17:50:43 +01:00
22 changed files with 800 additions and 797 deletions
+9 -8
View File
@@ -1,16 +1,16 @@
version: 2.1
orbs:
python: circleci/python@1.3.2
python: circleci/python@2.0.3
codecov: codecov/codecov@3.2.2
jobs:
test:
docker:
- image: "cimg/python:<<parameters.tag>>"
- image: 'cimg/node:14.18'
- image: 'circleci/redis:6'
- image: 'cimg/postgres:12.8'
- image: "cimg/node:16.15"
- image: "cimg/redis:6.2"
- image: "cimg/postgres:14.2"
environment:
POSTGRES_DB: speckle2_test
POSTGRES_PASSWORD: speckle
@@ -27,6 +27,7 @@ jobs:
STRATEGY_LOCAL: "true"
CANONICAL_URL: "http://localhost:3000"
WAIT_HOSTS: localhost:5432, localhost:6379
DISABLE_FILE_UPLOADS: "true"
parameters:
tag:
default: "3.8"
@@ -51,7 +52,7 @@ jobs:
deploy:
docker:
- image: "circleci/python:3.8"
- image: "cimg/python:3.8"
steps:
- checkout
- run: python patch_version.py $CIRCLE_TAG
@@ -60,17 +61,17 @@ jobs:
workflows:
main:
jobs:
jobs:
- test:
matrix:
parameters:
tag: ["3.6", "3.7", "3.8", "3.9"]
tag: ["3.7", "3.8", "3.9", "3.10"]
filters:
tags:
only: /.*/
- deploy:
requires:
- test
- test
filters:
tags:
only: /[0-9]+(\.[0-9]+)*/
+3 -2
View File
@@ -1,7 +1,7 @@
version: "3.3" # optional since v1.27.0
services:
postgres:
image: circleci/postgres:12
image: cimg/postgres:14.2
environment:
POSTGRES_DB: speckle2_test
POSTGRES_PASSWORD: speckle
@@ -10,7 +10,7 @@ services:
# - "5432:5432"
network_mode: host
redis:
image: circleci/redis:6
image: cimg/redis:6.2
# ports:
# - "6379:6379"
network_mode: host
@@ -27,6 +27,7 @@ services:
STRATEGY_LOCAL: "true"
CANONICAL_URL: "http://localhost:3000"
WAIT_HOSTS: localhost:5432, localhost:6379
DISABLE_FILE_UPLOADS: "true"
# ports:
# - "3000:3000"
network_mode: host
+6 -2
View File
@@ -4,6 +4,7 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
@@ -13,10 +14,13 @@
"justMyCode": false
},
{
"name": "Python: Test debug config",
"name": "Pytest",
"type": "python",
"request": "test",
"request": "launch",
"program": "poetry",
"args": ["run", "pytest"],
"console": "integratedTerminal",
"justMyCode": true
}
]
}
+63
View File
@@ -0,0 +1,63 @@
from typing import List
from specklepy.objects import Base
from specklepy.api import operations
from specklepy.transports.sqlite import SQLiteTransport
import time
from pathlib import Path
import os
import string
import random
class Sub(Base):
bar: List[str]
def random_string():
letters = string.ascii_lowercase
return "".join(random.choice(letters) for _ in range(10))
BASE_PATH = SQLiteTransport.get_base_path("Speckle")
def clean_db():
os.remove(Path(BASE_PATH, "Objects.db"))
def one_pass(clean: bool, randomize: bool, child_count: int):
foo = Base()
for i in range(child_count):
stuff = random_string() if randomize else "stuff"
foo[f"@child_{i}"] = Sub(bar=["asdf", "bar", i, stuff])
if clean:
clean_db()
transport = SQLiteTransport()
start = time.time()
hash = operations.send(base=foo, transports=[transport])
send_time = time.time() - start
receive_start = time.time()
operations.receive(hash, transport)
receive_time = time.time() - receive_start
return send_time, receive_time
if __name__ == "__main__":
sample_size = 4
test_permutations = [
(True, True),
(False, False),
(False, True),
(True, False),
]
for clean, randomize in test_permutations:
print(f"CLEAN: {clean}, RANDOMIZE: {randomize}")
for child_count in [10, 100, 1000, 10000]:
print(f"\tCHILD COUNT: {child_count}")
for _ in range(sample_size):
send_time, receive_time = one_pass(clean, randomize, child_count)
print(f"\t\tSend: {send_time} Receive: {receive_time}")
+2 -2
View File
@@ -50,10 +50,10 @@ if __name__ == "__main__":
)
# support for dynamic attributes
custom_sub.extra_extra = "what is this?"
debug(custom_sub.json())
debug(custom_sub)
serialized = operations.serialize(custom_sub)
deserialized = operations.deserialize(serialized)
# the only difference should be between the two data is that the deserialized
# instance id attribute is not None.
debug(deserialized.json())
debug(deserialized)
Generated
+531 -667
View File
File diff suppressed because it is too large Load Diff
+4 -3
View File
@@ -11,11 +11,11 @@ homepage = "https://speckle.systems/"
[tool.poetry.dependencies]
python = "^3.6.5"
python = ">=3.7.0, <4.0"
pydantic = "^1.8.2"
appdirs = "^1.4.4"
gql = {version = ">=3.0.0b1", extras = ["all"], allow-prereleases = true}
ujson = "^4.3.0"
gql = {extras = ["requests", "websockets"], version = "^3.3.0"}
ujson = "^5.3.0"
Deprecated = "^1.2.13"
[tool.poetry.dev-dependencies]
@@ -24,6 +24,7 @@ isort = "^5.7.0"
pytest = "^6.2.2"
pytest-ordering = "^0.6"
pytest-cov = "^3.0.0"
devtools = "^0.8.0"
[tool.black]
+17 -11
View File
@@ -2,8 +2,8 @@ import re
from warnings import warn
from deprecated import deprecated
from specklepy.api.credentials import Account, get_account_from_token
from specklepy.logging import metrics
from specklepy.logging.exceptions import (
GraphQLException,
SpeckleException,
SpeckleWarning,
)
@@ -57,6 +57,7 @@ class SpeckleClient:
USE_SSL = True
def __init__(self, host: str = DEFAULT_HOST, use_ssl: bool = USE_SSL) -> None:
metrics.track(metrics.CLIENT, custom_props={"name": "create"})
ws_protocol = "ws"
http_protocol = "http"
@@ -79,15 +80,17 @@ class SpeckleClient:
self._init_resources()
# Check compatibility with the server
try:
serverInfo = self.server.get()
if isinstance(serverInfo, Exception):
raise serverInfo
if not isinstance(serverInfo, ServerInfo):
raise Exception("Couldn't get ServerInfo")
except Exception as ex:
raise SpeckleException(f"{self.url} is not a compatible Speckle Server", ex)
# ? Check compatibility with the server - i think we can skip this at this point? save a request
# try:
# server_info = self.server.get()
# if isinstance(server_info, Exception):
# raise server_info
# if not isinstance(server_info, ServerInfo):
# raise Exception("Couldn't get ServerInfo")
# except Exception as ex:
# raise SpeckleException(
# f"{self.url} is not a compatible Speckle Server", ex
# ) from ex
def __repr__(self):
return f"SpeckleClient( server: {self.url}, authenticated: {self.account.token is not None} )"
@@ -114,6 +117,7 @@ class SpeckleClient:
token {str} -- an api token
"""
self.account = get_account_from_token(token, self.url)
metrics.track(metrics.CLIENT, self.account, {"name": "authenticate with token"})
self._set_up_client()
def authenticate_with_account(self, account: Account) -> None:
@@ -123,10 +127,12 @@ class SpeckleClient:
Arguments:
account {Account} -- the account object which can be found with `get_default_account` or `get_local_accounts`
"""
metrics.track(metrics.CLIENT, account, {"name": "authenticate with account"})
self.account = account
self._set_up_client()
def _set_up_client(self) -> None:
metrics.track(metrics.CLIENT, self.account, {"name": "set up client"})
headers = {
"Authorization": f"Bearer {self.account.token}",
"Content-Type": "application/json",
@@ -143,7 +149,7 @@ class SpeckleClient:
self._init_resources()
if isinstance(self.user.get(), GraphQLException):
if self.user.get() is None:
warn(
SpeckleWarning(
f"Possibly invalid token - could not authenticate Speckle Client for server {self.url}"
+5 -1
View File
@@ -66,14 +66,18 @@ class Branches(BaseModel):
class Stream(BaseModel):
id: Optional[str]
name: Optional[str]
description: Optional[str]
role: Optional[str]
isPublic: Optional[bool]
description: Optional[str]
createdAt: Optional[datetime]
updatedAt: Optional[datetime]
collaborators: List[Collaborator] = []
branches: Optional[Branches]
commit: Optional[Commit]
object: Optional[Object]
commentCount: Optional[int]
favoritedDate: Optional[datetime]
favoritesCount: Optional[int]
def __repr__(self):
return f"Stream( id: {self.id}, name: {self.name}, description: {self.description}, isPublic: {self.isPublic})"
+1 -6
View File
@@ -40,13 +40,8 @@ def send(
serializer = BaseObjectSerializer(write_transports=transports)
for t in transports:
t.begin_write()
obj_hash, _ = serializer.write_json(base=base)
for t in transports:
t.end_write()
return obj_hash
@@ -72,7 +67,7 @@ def receive(
serializer = BaseObjectSerializer(read_transport=local_transport)
# try local transport first. if the parent is there, we assume all the children are there and continue wth deserialisation using the local transport
# try local transport first. if the parent is there, we assume all the children are there and continue with deserialisation using the local transport
obj_string = local_transport.get_object(obj_id)
if obj_string:
return serializer.read_json(obj_string=obj_string)
+1 -1
View File
@@ -164,7 +164,7 @@ class Resource(ResourceBase):
description {str} -- optional: the updated branch description
Returns:
bool -- True if update is successfull
bool -- True if update is successful
"""
metrics.track(metrics.BRANCH, self.account, {"name": "update"})
query = gql(
+3 -1
View File
@@ -52,7 +52,9 @@ class Resource(ResourceBase):
params = {"stream_id": stream_id, "object_id": object_id}
return self.make_request(
query=query, params=params, return_type=["stream", "object", "data"]
query=query,
params=params,
return_type=["stream", "object", "data"],
)
def create(self, stream_id: str, objects: List[Dict]) -> str:
+5
View File
@@ -2,6 +2,7 @@ from typing import Dict, List
from gql import gql
from specklepy.api.models import ServerInfo
from specklepy.api.resource import ResourceBase
from specklepy.logging import metrics
NAME = "server"
@@ -26,6 +27,7 @@ class Resource(ResourceBase):
Returns:
dict -- the server info in dictionary form
"""
metrics.track(metrics.SERVER, self.account, {"name": "get"})
query = gql(
"""
query Server {
@@ -65,6 +67,7 @@ class Resource(ResourceBase):
Returns:
dict -- a dictionary of apps registered on the server
"""
metrics.track(metrics.SERVER, self.account, {"name": "apps"})
query = gql(
"""
query Apps {
@@ -98,6 +101,7 @@ class Resource(ResourceBase):
Returns:
str -- the new API token. note: this is the only time you'll see the token!
"""
metrics.track(metrics.SERVER, self.account, {"name": "create_token"})
query = gql(
"""
mutation TokenCreate($token: ApiTokenCreateInput!) {
@@ -123,6 +127,7 @@ class Resource(ResourceBase):
Returns:
bool -- True if the token was successfully deleted
"""
metrics.track(metrics.SERVER, self.account, {"name": "revoke_token"})
query = gql(
"""
mutation TokenRevoke($token: String!) {
+47 -6
View File
@@ -43,10 +43,13 @@ class Resource(ResourceBase):
stream(id: $id) {
id
name
role
description
isPublic
createdAt
updatedAt
commentCount
favoritesCount
collaborators {
id
name
@@ -65,11 +68,12 @@ class Resource(ResourceBase):
cursor
items {
id
referencedObject
message
authorName
authorId
createdAt
authorName
referencedObject
sourceApplication
}
}
}
@@ -98,11 +102,11 @@ class Resource(ResourceBase):
query User($stream_limit: Int!) {
user {
id
email
name
bio
company
name
email
avatar
company
verified
profiles
role
@@ -112,10 +116,13 @@ class Resource(ResourceBase):
items {
id
name
description
role
isPublic
createdAt
updatedAt
description
commentCount
favoritesCount
collaborators {
id
name
@@ -253,6 +260,7 @@ class Resource(ResourceBase):
items {
id
name
role
description
isPublic
createdAt
@@ -301,6 +309,39 @@ class Resource(ResourceBase):
query=query, params=params, return_type=["streams", "items"]
)
def favorite(self, stream_id: str, favorited: bool = True):
"""Favorite or unfavorite the given stream.
Arguments:
stream_id {str} -- the id of the stream to favorite / unfavorite
favorited {bool} -- whether to favorite (True) or unfavorite (False) the stream
Returns:
Stream -- the stream with its `id`, `name`, and `favoritedDate`
"""
metrics.track(metrics.STREAM, self.account, {"name": "favorite"})
query = gql(
"""
mutation StreamFavorite($stream_id: String!, $favorited: Boolean!) {
streamFavorite(streamId: $stream_id, favorited: $favorited) {
id
name
favoritedDate
favoritesCount
}
}
"""
)
params = {
"stream_id": stream_id,
"favorited": favorited,
}
return self.make_request(
query=query, params=params, return_type=["streamFavorite"]
)
def grant_permission(self, stream_id: str, user_id: str, role: str):
"""Grant permissions to a user on a given stream
+1 -1
View File
@@ -445,7 +445,7 @@ input ServerInfoUpdateInput {
stream( id: String! ): Stream
"""
All the streams of the current user, pass in the `query` parameter to seach by name, description or ID.
All the streams of the current user, pass in the `query` parameter to search by name, description or ID.
"""
streams( query: String, limit: Int = 25, cursor: String ): StreamCollection
@hasScope(scope: "streams:read")
+7 -4
View File
@@ -1,5 +1,3 @@
import json
import os
import socket
import sys
import queue
@@ -14,6 +12,7 @@ This really helps us to deliver a better open source project and product!
"""
TRACK = True
HOST_APP = "python"
HOST_APP_VERSION = f"python {'.'.join(map(str, sys.version_info[:3]))}"
PLATFORMS = {"win32": "Windows", "cygwin": "Windows", "darwin": "Mac OS X"}
LOG = logging.getLogger(__name__)
@@ -27,6 +26,8 @@ PERMISSION = "Permission Action"
COMMIT = "Commit Action"
BRANCH = "Branch Action"
USER = "User Action"
SERVER = "Server Action"
CLIENT = "Speckle Client"
STREAM_WRAPPER = "Stream Wrapper"
ACCOUNTS = "Get Local Accounts"
@@ -45,9 +46,10 @@ def enable():
TRACK = True
def set_host_app(host_app: str):
global HOST_APP
def set_host_app(host_app: str, host_app_version: str = None):
global HOST_APP, HOST_APP_VERSION
HOST_APP = host_app
HOST_APP_VERSION = host_app_version or HOST_APP_VERSION
def track(action: str, account: "Account" = None, custom_props: dict = None):
@@ -62,6 +64,7 @@ def track(action: str, account: "Account" = None, custom_props: dict = None):
"server_id": METRICS_TRACKER.last_server,
"token": METRICS_TRACKER.analytics_token,
"hostApp": HOST_APP,
"hostAppVersion": HOST_APP_VERSION,
"$os": METRICS_TRACKER.platform,
"type": "action",
},
+1 -1
View File
@@ -74,7 +74,7 @@ class ObjectArray:
index = 0
while index < len(data):
item_length = data[index]
item_length = int(data[index])
item_start = index + 1
item_end = item_start + item_length
item_data = data[item_start:item_end]
+4 -4
View File
@@ -293,9 +293,9 @@ class Curve(
@classmethod
def from_list(cls, args: List[Any]) -> "Curve":
point_count = args[7]
weights_count = args[8]
knots_count = args[9]
point_count = int(args[7])
weights_count = int(args[8])
knots_count = int(args[9])
points_start = 10
weights_start = 10 + point_count
@@ -303,7 +303,7 @@ class Curve(
knots_end = knots_start + knots_count
return cls(
degree=args[1],
degree=int(args[1]),
periodic=bool(args[2]),
rational=bool(args[3]),
closed=bool(args[4]),
@@ -1,17 +1,19 @@
import re
import ujson
import hashlib
import re
import warnings
from uuid import uuid4
from enum import Enum
from warnings import warn
from typing import Any, Dict, List, Tuple
from specklepy.objects.base import Base, DataChunk
from specklepy.logging.exceptions import (
SerializationException,
SpeckleException,
SpeckleWarning,
)
from specklepy.transports.abstract_transport import AbstractTransport
# import for serialization
import specklepy.objects.geometry
import specklepy.objects.other
@@ -50,9 +52,16 @@ class BaseObjectSerializer:
self.read_transport = read_transport
def write_json(self, base: Base):
self.__reset_writer()
self.detach_lineage = [True]
"""Serializes a given base object into a json string
Arguments:
base {Base} -- the base object to be decomposed and serialized
Returns:
(str, str) -- a tuple containing the hash (id) of the base object and the serialized object string
"""
hash, obj = self.traverse_base(base)
return hash, ujson.dumps(obj)
def traverse_base(self, base: Base) -> Tuple[str, Dict]:
@@ -64,6 +73,21 @@ class BaseObjectSerializer:
Returns:
(str, dict) -- a tuple containing the hash (id) of the base object and the constructed serializable dictionary
"""
self.__reset_writer()
if self.write_transports:
for wt in self.write_transports:
wt.begin_write()
hash, obj = self._traverse_base(base)
if self.write_transports:
for wt in self.write_transports:
wt.end_write()
return hash, obj
def _traverse_base(self, base: Base) -> Tuple[str, Dict]:
if not self.detach_lineage:
self.detach_lineage = [True]
@@ -141,7 +165,7 @@ class BaseObjectSerializer:
chunk_refs = []
for c in chunks:
self.detach_lineage.append(detach)
ref_hash, _ = self.traverse_base(c)
ref_hash, _ = self._traverse_base(c)
ref_obj = self.detach_helper(ref_hash=ref_hash)
chunk_refs.append(ref_obj)
object_builder[prop] = chunk_refs
@@ -200,7 +224,7 @@ class BaseObjectSerializer:
for o in obj:
if isinstance(o, Base):
self.detach_lineage.append(detach)
hash, _ = self.traverse_base(o)
hash, _ = self._traverse_base(o)
detached_list.append(self.detach_helper(ref_hash=hash))
else:
detached_list.append(self.traverse_value(o, detach))
@@ -216,7 +240,7 @@ class BaseObjectSerializer:
elif isinstance(obj, Base):
self.detach_lineage.append(detach)
_, base_obj = self.traverse_base(obj)
_, base_obj = self._traverse_base(obj)
return base_obj
else:
@@ -255,7 +279,7 @@ class BaseObjectSerializer:
def __reset_writer(self) -> None:
"""Reinitializes the lineage, and other variables that get used during the json writing process"""
self.detach_lineage = []
self.detach_lineage = [True]
self.lineage = []
self.family_tree = {}
self.closure_table = {}
@@ -321,12 +345,15 @@ class BaseObjectSerializer:
elif "referencedId" in value:
ref_hash = value["referencedId"]
ref_obj_str = self.read_transport.get_object(id=ref_hash)
if not ref_obj_str:
raise SpeckleException(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}"
if ref_obj_str:
ref_obj = safe_json_loads(ref_obj_str, ref_hash)
base.__setattr__(prop, self.recompose_base(obj=ref_obj))
else:
warnings.warn(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}",
SpeckleWarning,
)
ref_obj = safe_json_loads(ref_obj_str, ref_hash)
base.__setattr__(prop, self.recompose_base(obj=ref_obj))
base.__setattr__(prop, self.handle_value(value))
# 3. handle all other cases (base objects, lists, and dicts)
else:
@@ -380,8 +407,10 @@ class BaseObjectSerializer:
ref_hash = obj["referencedId"]
ref_obj_str = self.read_transport.get_object(id=ref_hash)
if not ref_obj_str:
raise SpeckleException(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}"
warnings.warn(
f"Could not find the referenced child object of id `{ref_hash}` in the given read transport: {self.read_transport.name}",
SpeckleWarning,
)
return obj
return safe_json_loads(ref_obj_str, ref_hash)
+35 -61
View File
@@ -3,7 +3,7 @@ import sys
import time
import sched
import sqlite3
from typing import Any, List, Dict
from typing import Any, List, Dict, Tuple
from appdirs import user_data_dir
from contextlib import closing
from specklepy.transports.abstract_transport import AbstractTransport
@@ -14,25 +14,29 @@ class SQLiteTransport(AbstractTransport):
_name = "SQLite"
_base_path: str = None
_root_path: str = None
_is_writing: bool = False
_scheduler = sched.scheduler(time.time, time.sleep)
_polling_interval = 0.5 # seconds
__connection: sqlite3.Connection = None
app_name: str = ""
scope: str = ""
saved_obj_count: int = 0
max_size: int = None
_current_batch: List[Tuple[str, str]] = None
_current_batch_size: int = None
def __init__(
self,
base_path: str = None,
app_name: str = None,
scope: str = None,
max_batch_size_mb: float = 10.0,
**data: Any,
) -> None:
super().__init__(**data)
self.app_name = app_name or "Speckle"
self.scope = scope or "Objects"
self._base_path = base_path or self.get_base_path(self.app_name)
self.max_size = int(max_batch_size_mb * 1000 * 1000)
self._current_batch = []
self._current_batch_size = 0
try:
os.makedirs(self._base_path, exist_ok=True)
@@ -50,12 +54,6 @@ class SQLiteTransport(AbstractTransport):
def __repr__(self) -> str:
return f"SQLiteTransport(app: '{self.app_name}', scope: '{self.scope}')"
# def __write_timer_elapsed(self):
# print("WRITE TIMER ELAPSED")
# proc = Process(target=_run_queue, args=(self.__queue, self._root_path))
# proc.start()
# proc.join()
@staticmethod
def get_base_path(app_name):
# from appdirs https://github.com/ActiveState/appdirs/blob/master/appdirs.py
@@ -74,36 +72,6 @@ class SQLiteTransport(AbstractTransport):
path = os.path.expanduser("~/.config/")
return os.path.join(path, app_name)
# def __consume_queue(self):
# if self._is_writing or self.__queue.empty():
# return
# print("CONSUME QUEUE")
# self._is_writing = True
# while not self.__queue.empty():
# data = self.__queue.get()
# self.save_object(data[0], data[1])
# self._is_writing = False
# self._scheduler.enter(
# delay=self._polling_interval, priority=1, action=self.__consume_queue
# )
# self._scheduler.run(blocking=True)
# def save_object(self, id: str, serialized_object: str) -> None:
# """Adds an object to the queue and schedules it to be saved.
# Arguments:
# id {str} -- the object id
# serialized_object {str} -- the full string representation of the object
# """
# print("SAVE OBJECT")
# self.__queue.put((id, serialized_object))
# self._scheduler.enter(
# delay=self._polling_interval, priority=1, action=self.__consume_queue
# )
# self._scheduler.run(blocking=True)
def save_object_from_transport(
self, id: str, source_transport: AbstractTransport
) -> None:
@@ -117,23 +85,41 @@ class SQLiteTransport(AbstractTransport):
self.save_object(id, serialized_object)
def save_object(self, id: str, serialized_object: str) -> None:
"""Directly saves an object into the database.
"""
Adds an object to the current batch to be written to the db. If the current batch is full,
the batch is written to the db and the current batch is reset.
Arguments:
id {str} -- the object id
serialized_object {str} -- the full string representation of the object
"""
obj_size = len(serialized_object)
if (
not self._current_batch
or self._current_batch_size + obj_size < self.max_size
):
self._current_batch.append((id, serialized_object))
self._current_batch_size += obj_size
return
self.save_current_batch()
self._current_batch = [(id, serialized_object)]
self._current_batch_size = obj_size
def save_current_batch(self) -> None:
"""Save the current batch of objects to the local db"""
self.__check_connection()
try:
with closing(self.__connection.cursor()) as c:
c.execute(
c.executemany(
"INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
(id, serialized_object),
self._current_batch,
)
self.__connection.commit()
except Exception as ex:
raise SpeckleException(
f"Could not save the object to the local db. Inner exception: {ex}", ex
f"Could not save the batch of objects to the local db. Inner exception: {ex}",
ex,
)
def get_object(self, id: str) -> str or None:
@@ -156,10 +142,14 @@ class SQLiteTransport(AbstractTransport):
return ret
def begin_write(self):
self._object_cache = []
self.saved_obj_count = 0
def end_write(self):
pass
if self._current_batch:
self.save_current_batch()
self._current_batch = []
self._current_batch_size = 0
def copy_object_and_children(
self, id: str, target_transport: AbstractTransport
@@ -199,19 +189,3 @@ class SQLiteTransport(AbstractTransport):
def __del__(self):
self.__connection.close()
# def _run_queue(queue: Queue, root_path: str):
# if queue.empty():
# return
# print("RUN QUEUE")
# conn = sqlite3.connect(root_path)
# while not queue.empty():
# data = queue.get()
# with closing(conn.cursor()) as c:
# c.execute(
# "INSERT OR IGNORE INTO objects(hash, content) VALUES(?,?)",
# (data[0], data[1]),
# )
# conn.commit()
# conn.close()
+1 -1
View File
@@ -61,7 +61,7 @@ def second_user_dict(host):
@pytest.fixture(scope="session")
def client(host, user_dict):
client = SpeckleClient(host=host, use_ssl=False)
client.authenticate(user_dict["token"])
client.authenticate_with_token(user_dict["token"])
return client
+10
View File
@@ -69,6 +69,16 @@ class TestStream:
assert len(search_results) == 1
assert search_results[0].name == updated_stream.name
def test_stream_favorite(self, client, stream):
favorited = client.stream.favorite(stream.id)
assert isinstance(favorited, Stream)
assert favorited.favoritedDate is not None
unfavorited = client.stream.favorite(stream.id, False)
assert isinstance(favorited, Stream)
assert unfavorited.favoritedDate is None
def test_stream_grant_permission(self, client, stream, second_user_dict):
granted = client.stream.grant_permission(
stream_id=stream.id,