Compare commits

..

4 Commits

Author SHA1 Message Date
Gergő Jedlicska 6c33c61a6d Merge pull request #382 from specklesystems/gergo/fixServerTransportHeader
fix: server transport always accept text/plain
2025-02-12 13:03:00 +01:00
Gergő Jedlicska 71afb1275f fix: server transport always accept text/plain 2025-02-12 12:35:11 +01:00
Chuck Driesler 78c55b787f chore(automate): improve error message when automate fails to receive a model version (#376)
Co-authored-by: Björn Steinhagen <steinhagen.bjoern@gmail.com>
2025-01-24 11:28:15 +00:00
Jedd Morgan 34f2dc2ab6 author now optional (#377) 2025-01-24 11:01:09 +00:00
43 changed files with 1052 additions and 2495 deletions
+5 -3
View File
@@ -4,9 +4,11 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "debugpy",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
@@ -14,9 +16,9 @@
},
{
"name": "Pytest",
"type": "debugpy",
"type": "python",
"request": "launch",
"module": "pytest",
"program": "pytest",
"args": [],
"console": "integratedTerminal",
"justMyCode": true
Generated
+49 -2
View File
@@ -343,6 +343,7 @@ argcomplete = ">=1.12.1,<3.6"
charset-normalizer = ">=2.1.0,<4"
colorama = ">=0.4.1,<0.5.0"
decli = ">=0.6.0,<0.7.0"
importlib_metadata = {version = ">=8.0.0,<9", markers = "python_version < \"3.10\""}
jinja2 = ">=2.10.3"
packaging = ">=19"
pyyaml = ">=3.08"
@@ -582,6 +583,9 @@ files = [
{file = "graphql_core-3.2.5.tar.gz", hash = "sha256:e671b90ed653c808715645e3998b7ab67d382d55467b7e2978549111bbabf8d5"},
]
[package.dependencies]
typing-extensions = {version = ">=4,<5", markers = "python_version < \"3.10\""}
[[package]]
name = "h11"
version = "0.14.0"
@@ -666,6 +670,29 @@ files = [
[package.extras]
all = ["flake8 (>=7.1.1)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "ruff (>=0.6.2)"]
[[package]]
name = "importlib-metadata"
version = "8.5.0"
description = "Read metadata from Python packages"
optional = false
python-versions = ">=3.8"
files = [
{file = "importlib_metadata-8.5.0-py3-none-any.whl", hash = "sha256:45e54197d28b7a7f1559e60b95e7c567032b602131fbd588f1497f47880aa68b"},
{file = "importlib_metadata-8.5.0.tar.gz", hash = "sha256:71522656f0abace1d072b9e5481a48f07c138e00f079c38c8f883823f9c26bd7"},
]
[package.dependencies]
zipp = ">=3.20"
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
enabler = ["pytest-enabler (>=2.2)"]
perf = ["ipython"]
test = ["flufl.flake8", "importlib-resources (>=1.3)", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"]
type = ["pytest-mypy"]
[[package]]
name = "iniconfig"
version = "2.0.0"
@@ -1310,6 +1337,7 @@ mccabe = ">=0.6,<0.8"
platformdirs = ">=2.2.0"
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
tomlkit = ">=0.10.1"
typing-extensions = {version = ">=3.10.0", markers = "python_version < \"3.10\""}
[package.extras]
spelling = ["pyenchant (>=3.2,<4.0)"]
@@ -2069,7 +2097,26 @@ idna = ">=2.0"
multidict = ">=4.0"
propcache = ">=0.2.0"
[[package]]
name = "zipp"
version = "3.21.0"
description = "Backport of pathlib-compatible object wrapper for zip files"
optional = false
python-versions = ">=3.9"
files = [
{file = "zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931"},
{file = "zipp-3.21.0.tar.gz", hash = "sha256:2c9958f6430a2040341a52eb608ed6dd93ef4392e02ffe219417c1b28b5dd1f4"},
]
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"]
type = ["pytest-mypy"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10.0, <4.0"
content-hash = "08167b7b5327a9a96f09b849552eeea1611916f7aa20b825d78f807bb10d83b7"
python-versions = ">=3.9.0, <4.0"
content-hash = "baaeb5d53fa2e60fa2be85853e841f959021600401559cb65d05fd54dadfdab0"
+1 -2
View File
@@ -15,7 +15,7 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.10.0, <4.0"
python = ">=3.9.0, <4.0"
pydantic = "^2.5"
appdirs = "^1.4.4"
gql = { extras = ["requests", "websockets"], version = "^3.3.0" }
@@ -24,7 +24,6 @@ Deprecated = "^1.2.13"
stringcase = "^1.2.0"
attrs = "^23.1.0"
httpx = "^0.25.0"
pydantic-settings = "^2.6.1"
[tool.poetry.group.dev.dependencies]
black = "24.10.0"
+11 -2
View File
@@ -101,8 +101,17 @@ class AutomationContext:
commit = self.speckle_client.commit.get(
self.automation_run_data.project_id, version_id
)
if not commit.referencedObject:
raise ValueError("The commit has no referencedObject, cannot receive it.")
if not commit or not commit.referencedObject:
raise ValueError(
f"""\
Could not receive specified version.
{"The commit has no referencedObject." if not commit.referencedObject else ""}
Is your environment configured correctly?
project_id: {self.automation_run_data.project_id}
model_id: {self.automation_run_data.triggers[0].payload.model_id}
version_id: {self.automation_run_data.triggers[0].payload.version_id}
"""
)
base = operations.receive(
commit.referencedObject, self._server_transport, self._memory_transport
)
+2 -2
View File
@@ -1,3 +1,3 @@
# from specklepy import objects
from specklepy import objects
# __all__ = ["objects"]
__all__ = ["objects"]
+1 -1
View File
@@ -126,7 +126,7 @@ class Version(BaseModel):
class Model(BaseModel):
author: LimitedUser
author: Optional[LimitedUser]
createdAt: datetime
description: Optional[str]
displayName: str
+23
View File
@@ -0,0 +1,23 @@
"""Builtin Speckle object kit."""
from specklepy.objects import (
GIS,
encoding,
geometry,
other,
primitive,
structural,
units,
)
from specklepy.objects.base import Base
__all__ = [
"Base",
"encoding",
"geometry",
"other",
"units",
"structural",
"primitive",
"GIS",
]
+32 -26
View File
@@ -1,5 +1,4 @@
import contextlib
from dataclasses import dataclass, field
from enum import Enum
from inspect import isclass
from typing import (
@@ -19,7 +18,8 @@ from warnings import warn
from stringcase import pascalcase
from specklepy.logging.exceptions import SpeckleException
from specklepy.logging.exceptions import SpeckleException, SpeckleInvalidUnitException
from specklepy.objects.units import Units
from specklepy.transports.memory import MemoryTransport
PRIMITIVES = (int, float, str, bool)
@@ -65,6 +65,7 @@ REMOVE_FROM_DIR = {
"_handle_object_count",
"_type_check",
"_type_registry",
"_units",
"add_chunkable_attrs",
"add_detachable_attrs",
"get_children_count",
@@ -115,7 +116,7 @@ class _RegisteringBase:
@classmethod
def _determine_speckle_type(cls) -> str:
"""
This method brings the speckle_type construction in par with Speckle-sharp/Core.
This method brings the speckle_type construction in par with peckle-sharp/Core.
The implementation differs, because in Core the basis of the speckle_type if
type.FullName, which includes the dotnet namespace name too.
@@ -167,11 +168,8 @@ class _RegisteringBase:
initialization. This is reused to register each subclassing type into a class
level dictionary.
"""
# if not speckle_type:
# raise Exception("no type")
cls._speckle_type_override = speckle_type
cls.speckle_type = cls._determine_speckle_type()
# cls.speckle_type = speckle_type
if cls._full_name() in cls._type_registry:
raise ValueError(
f"The speckle_type: {speckle_type} is already registered for type: "
@@ -321,17 +319,22 @@ def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
return False, value
@dataclass(kw_only=True)
class Base(_RegisteringBase, speckle_type="Base"):
class Base(_RegisteringBase):
id: Union[str, None] = None
# totalChildrenCount: Union[int, None] = None
totalChildrenCount: Union[int, None] = None
applicationId: Union[str, None] = None
_units: Union[None, str] = None
def __init__(self, **kwargs) -> None:
super().__init__()
for k, v in kwargs.items():
self.__setattr__(k, v)
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(id: {self.id}, "
f"speckle_type: {self.speckle_type}, "
# f"totalChildrenCount: {self.totalChildrenCount})"
f"totalChildrenCount: {self.totalChildrenCount})"
)
def __str__(self) -> str:
@@ -459,21 +462,21 @@ class Base(_RegisteringBase, speckle_type="Base"):
"""
self._detachable = self._detachable.union(names)
# @property
# def units(self) -> Union[str, None]:
# return self._units
@property
def units(self) -> Union[str, None]:
return self._units
# @units.setter
# def units(self, value: Union[str, Units, None]):
# """While this property accepts any string value, geometry expects units to be specific strings (see Units enum)"""
# if isinstance(value, str) or value is None:
# self._units = value
# elif isinstance(value, Units):
# self._units = value.value
# else:
# raise SpeckleInvalidUnitException(
# f"Unknown type {type(value)} received for units"
# )
@units.setter
def units(self, value: Union[str, Units, None]):
"""While this property accepts any string value, geometry expects units to be specific strings (see Units enum)"""
if isinstance(value, str) or value is None:
self._units = value
elif isinstance(value, Units):
self._units = value.value
else:
raise SpeckleInvalidUnitException(
f"Unknown type {type(value)} received for units"
)
def get_member_names(self) -> List[str]:
"""Get all of the property names on this object, dynamic or not"""
@@ -565,6 +568,9 @@ class Base(_RegisteringBase, speckle_type="Base"):
Base.update_forward_refs()
@dataclass(kw_only=True)
class DataChunk(Base, speckle_type="Speckle.Core.Models.DataChunk"):
data: List[Any] = field(default_factory=list)
data: Union[List[Any], None] = None
def __init__(self) -> None:
super().__init__()
self.data = []
@@ -16,9 +16,7 @@ CHUNKABLE_PROPS = {
DETACHABLE = {"detach_this", "origin", "detached_list"}
class FakeGeo(
Base, speckle_type="FakeGeo", chunkable={"dots": 50}, detachable={"pointslist"}
):
class FakeGeo(Base, chunkable={"dots": 50}, detachable={"pointslist"}):
pointslist: Optional[List[Base]] = None
dots: Optional[List[int]] = None
@@ -30,9 +28,7 @@ class FakeDirection(Enum):
WEST = 4
class FakeMesh(
FakeGeo, speckle_type="FakeMesh", chunkable=CHUNKABLE_PROPS, detachable=DETACHABLE
):
class FakeMesh(FakeGeo, chunkable=CHUNKABLE_PROPS, detachable=DETACHABLE):
vertices: Optional[List[float]] = None
faces: Optional[List[int]] = None
colors: Optional[List[int]] = None
File diff suppressed because it is too large Load Diff
-110
View File
@@ -1,110 +0,0 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Generic, List, TypeVar
from specklepy.logging.exceptions import SpeckleInvalidUnitException
from specklepy.objects.base import Base
from specklepy.objects.models.units import Units
from specklepy.objects.primitive import Interval
T = TypeVar("T") # define type variable for generic type
# generic interfaces
class ICurve(metaclass=ABCMeta):
@property
@abstractmethod
def length(self) -> float:
pass
@property
@abstractmethod
def _domain(self) -> Interval:
pass
@property
@abstractmethod
def units(self) -> str:
pass
class IDisplayValue(Generic[T], metaclass=ABCMeta):
@property
@abstractmethod
def display_value(self) -> T:
pass
@dataclass(kw_only=True)
class IHasUnits(metaclass=ABCMeta):
units: str | Units
_units: str = field(repr=False, init=False)
@property
def units(self) -> str:
return self._units
@units.setter
def units(self, value: str | Units):
if isinstance(value, str):
self._units = value
elif isinstance(value, Units):
self._units = value.value
else:
raise SpeckleInvalidUnitException(
f"Unknown type {type(value)} received for units"
)
@dataclass(kw_only=True)
class IHasArea(metaclass=ABCMeta):
area: float
_area: float = field(init=False, repr=False)
@property
def area(self) -> float:
return self._area
@area.setter
def area(self, value: float):
if not isinstance(value, (int, float)):
raise ValueError(f"Area must be a number, got {type(value)}")
self._area = float(value)
@dataclass(kw_only=True)
class IHasVolume(metaclass=ABCMeta):
volume: float
_volume: float = field(init=False, repr=False)
@property
def volume(self) -> float:
return self._volume
@volume.setter
def volume(self, value: float):
if not isinstance(value, (int, float)):
raise ValueError(f"Volume must be a number, got {type(value)}")
self._volume = float(value)
# data object interfaces
class IProperties(metaclass=ABCMeta):
@property
@abstractmethod
def properties(self) -> dict[str, object]:
pass
class IDataObject(IProperties, IDisplayValue[List[Base]], metaclass=ABCMeta):
@property
@abstractmethod
def name(self) -> str:
pass
class IBlenderObject(IDataObject, metaclass=ABCMeta):
@property
@abstractmethod
def type(self) -> str:
pass
@@ -1,35 +0,0 @@
from dataclasses import dataclass, field
from typing import List
from specklepy.objects.base import Base
@dataclass(kw_only=True)
class Collection(
Base,
# TODO: add deprecated speckle_types
speckle_type="Speckle.Core.Models.Collections.Collection",
detachable={"elements"},
):
"""
A simple container for organising objects within a model and preserving object hierarchy.
A container is defined by a human-readable name a unique applicationId and its list of contained elements.
The elements can include an unrestricted number of Base objects including additional nested Collections.
Note:
A Collection can be for example a Layer in Rhino/AutoCad, a collection in Blender, or a Category in Revit.
The location of each collection in the hierarchy of collections in a commit will be retrieved through commit traversal.
Attributes:
name: The human-readable name of the Collection. This name is not necessarily unique within a commit. Set the applicationId for a unique identifier.
elements: The elements contained in this Collection. This may include additional nested Collections
"""
name: str
elements: List[Base] = field(default_factory=list)
if __name__ == "__main__":
c = Collection(name="asfd")
print(c)
@@ -1,18 +0,0 @@
from abc import ABCMeta, abstractmethod
class ISpeckleObject(speckle_type="ISpeckleObjects", metaclass=ABCMeta):
@property
@abstractmethod
def id(self) -> str | None:
pass
@property
@abstractmethod
def application_id(self) -> str | None:
pass
@property
@abstractmethod
def speckle_type(self) -> str:
pass
+14 -17
View File
@@ -1,28 +1,25 @@
from dataclasses import dataclass
from typing import List
from typing import Any, List
from specklepy.objects.base import Base
NAMESPACE = "Objects.Primitive"
@dataclass(kw_only=True)
class Interval(Base, speckle_type="Objects.Primitive.Interval"):
start: float = 0.0 # Added default
end: float = 0.0 # Added default
@property
def length(self) -> float:
return abs(self.end - self.start)
class Interval(Base, speckle_type=f"{NAMESPACE}.Interval"):
start: float = 0.0
end: float = 0.0
def __str__(self) -> str:
return f"{super().__str__()}[{self.start}, {self.end}]"
def length(self):
return abs(self.start - self.end)
@classmethod
def unit_interval(cls) -> "Interval":
return cls(start=0, end=1)
def from_list(cls, args: List[Any]) -> "Interval":
return cls(start=args[0], end=args[1])
def to_list(self) -> List[float]:
def to_list(self) -> List[Any]:
return [self.start, self.end]
@classmethod
def from_list(cls, args: List[float]) -> "Interval":
return cls(start=args[0], end=args[1])
class Interval2d(Base, speckle_type=f"{NAMESPACE}.Interval2d"):
u: Interval
v: Interval
-48
View File
@@ -1,48 +0,0 @@
from dataclasses import dataclass, field
from typing import List, Optional
from specklepy.objects.base import Base
from specklepy.objects.interfaces import IHasUnits
@dataclass(kw_only=True)
class ColorProxy(
Base,
speckle_type="Models.Proxies.ColorProxy",
detachable={"objects"},
):
objects: List[str] = field(default_factory=list)
value: int
name: Optional[str] = None
@dataclass(kw_only=True)
class GroupProxy(
Base,
speckle_type="Models.Proxies.GroupProxy",
detachable={"objects"},
):
objects: List[str] = field(default_factory=list)
name: str = field(default="Unnamed Group")
@dataclass(kw_only=True)
class InstanceProxy(
Base,
IHasUnits,
speckle_type="Models.Proxies.InstanceProxy",
):
definition_id: str
transform: List[float] = field(default_factory=list)
max_depth: int = 50
@dataclass(kw_only=True)
class InstanceDefinitionProxy(
Base,
speckle_type="Models.Proxies.InstanceDefinitionProxy",
detachable={"objects"},
):
objects: List[str] = field(default_factory=list)
max_depth: int = 50
name: str = field(default="Unnamed Instance")
-26
View File
@@ -1,26 +0,0 @@
from devtools import debug
from specklepy.api.operations import deserialize, serialize
from specklepy.objects.geometry import Line, Point
from specklepy.objects.models.units import Units
from specklepy.objects.primitive import Interval
# points
p1 = Point(x=1.0, y=2.0, z=3.0, units=Units.m)
p2 = Point(x=4.0, y=6.0, z=8.0, units=Units.m, applicationId="asdf")
# test Line
line = Line(start=p1, end=p2, units=Units.m, domain=Interval(start=0.0, end=1.0))
print(f"\nLine length: {line.length}")
ser_line = serialize(line)
line_again = deserialize(ser_line)
print("\nOriginal line:")
debug(line)
print("\nSerialized line:")
debug(ser_line)
print("\nDeserialized line:")
debug(line_again)
-182
View File
@@ -1,182 +0,0 @@
from devtools import debug
from specklepy.api.operations import deserialize, serialize
from specklepy.objects.geometry import Mesh
# create a speckle cube mesh (but more colorful)
vertices = [
0.0,
0.0,
0.0,
1.0,
0.0,
0.0,
1.0,
1.0,
0.0,
0.0,
1.0,
0.0,
0.0,
0.0,
1.0,
1.0,
0.0,
1.0,
1.0,
1.0,
1.0,
0.0,
1.0,
1.0,
]
# define faces (triangles)
faces = [
3,
0,
1,
2,
3,
0,
2,
3,
3,
4,
5,
6,
3,
4,
6,
7,
3,
0,
4,
7,
3,
0,
7,
3,
3,
1,
5,
6,
3,
1,
6,
2,
3,
3,
2,
6,
3,
3,
6,
7,
3,
0,
1,
5,
3,
0,
5,
4,
]
# create colors (one per vertex)
colors = [
255,
0,
0,
255,
0,
255,
0,
255,
0,
0,
255,
255,
255,
255,
0,
255,
255,
0,
255,
255,
0,
255,
255,
255,
255,
255,
255,
255,
0,
0,
0,
255,
]
texture_coordinates = [
0.0,
0.0,
1.0,
0.0,
1.0,
1.0,
0.0,
1.0,
0.0,
0.0,
1.0,
0.0,
1.0,
1.0,
0.0,
1.0,
]
# create the mesh
cube_mesh = Mesh(
vertices=vertices,
faces=faces,
colors=colors,
textureCoordinates=texture_coordinates,
units="mm",
area=0.0,
volume=0.0,
)
print("\nMesh Details:")
print(f"Number of vertices: {cube_mesh.vertices_count}")
print(f"Number of texture coordinates: {cube_mesh.texture_coordinates_count}")
print("\nSome vertex points:")
for i in range(4):
point = cube_mesh.get_point(i)
print(f"Vertex {i}: ({point.x}, {point.y}, {point.z})")
print("\nSome texture coordinates:")
for i in range(4):
u, v = cube_mesh.get_texture_coordinate(i)
print(f"Texture coordinate {i}: ({u}, {v})")
print("\nTesting serialization...")
ser_mesh = serialize(cube_mesh)
mesh_again = deserialize(ser_mesh)
print("\nOriginal mesh:")
debug(cube_mesh)
print("\nDeserialized mesh:")
debug(mesh_again)
print("\nTesting vertex-texture coordinate alignment...")
cube_mesh.align_vertices_with_texcoords_by_index()
print("Alignment complete.")
print(f"Vertices count after alignment: {cube_mesh.vertices_count}")
print(
f"Texture coordinates count after alignment: {cube_mesh.texture_coordinates_count}"
)
-21
View File
@@ -1,21 +0,0 @@
from devtools import debug
from specklepy.api.operations import deserialize, serialize
from specklepy.objects.geometry import Point
from specklepy.objects.models.units import Units
# test points
p1 = Point(x=1.0, y=2.0, z=3.0, units=Units.m)
p2 = Point(x=4.0, y=6.0, z=8.0, units=Units.m, applicationId="asdf")
print("Distance between points:", p1.distance_to(p2))
ser_p1 = serialize(p1)
p1_again = deserialize(ser_p1)
print("\nOriginal point:")
debug(p1)
print("\nSerialized point:")
debug(ser_p1)
print("\nDeserialized point:")
debug(p1_again)
@@ -1,51 +0,0 @@
from devtools import debug
from specklepy.api.operations import deserialize, serialize
from specklepy.objects.geometry import Polyline
from specklepy.objects.models.units import Units
from specklepy.objects.primitive import Interval
# create points for first polyline - not closed, in meters
points1_coords = [1.0, 1.0, 0.0, 2.0, 1.0, 0.0, 2.0, 2.0, 0.0, 1.0, 2.0, 0.0]
# Create points for second polyline - closed, in ft
points2_coords = [0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0, 3.0, 0.0]
# create polylines
polyline1 = Polyline(
value=points1_coords,
closed=False,
units=Units.m,
domain=Interval(start=0.0, end=1.0),
)
polyline2 = Polyline(
value=points2_coords,
closed=True,
units=Units.feet,
domain=Interval(start=0.0, end=1.0),
applicationId="polyllllineeee",
)
print("Polyline 1 length (meters):", polyline1.length)
print("Polyline 2 length (feet):", polyline2.length)
ser_poly1 = serialize(polyline1)
poly1_again = deserialize(ser_poly1)
print("\nOriginal polyline 1:")
debug(polyline1)
print("\nSerialized polyline 1:")
debug(ser_poly1)
print("\nDeserialized polyline 1:")
debug(poly1_again)
ser_poly2 = serialize(polyline2)
poly2_again = deserialize(ser_poly2)
print("\nOriginal polyline 2:")
debug(polyline2)
print("\nSerialized polyline 2:")
debug(ser_poly2)
print("\nDeserialized polyline 2:")
debug(poly2_again)
-23
View File
@@ -1,23 +0,0 @@
"""Builtin Speckle object kit."""
# from specklepy.objects import (
# GIS,
# encoding,
# geometry,
# other,
# primitive,
# structural,
# units,
# )
from specklepy.objects.base import Base
__all__ = [
"Base",
# "encoding",
# "geometry",
# "other",
# "units",
# "structural",
# "primitive",
# "GIS",
]
-588
View File
@@ -1,588 +0,0 @@
import contextlib
from enum import Enum
from inspect import isclass
from typing import (
Any,
ClassVar,
Dict,
ForwardRef,
List,
Optional,
Set,
Tuple,
Type,
Union,
get_type_hints,
)
from warnings import warn
from stringcase import pascalcase
from specklepy.logging.exceptions import SpeckleException, SpeckleInvalidUnitException
from specklepy.objects.units import Units
from specklepy.transports.memory import MemoryTransport
PRIMITIVES = (int, float, str, bool)
# to remove from dir() when calling get_member_names()
REMOVE_FROM_DIR = {
"Config",
"_Base__dict_helper",
"__annotations__",
"__class__",
"__delattr__",
"__dict__",
"__dir__",
"__doc__",
"__eq__",
"__format__",
"__ge__",
"__getattribute__",
"__getitem__",
"__gt__",
"__hash__",
"__init__",
"__init_subclass__",
"__le__",
"__lt__",
"__module__",
"__ne__",
"__new__",
"__reduce__",
"__reduce_ex__",
"__repr__",
"__setattr__",
"__setitem__",
"__sizeof__",
"__str__",
"__subclasshook__",
"__weakref__",
"_chunk_size_default",
"_chunkable",
"_count_descendants",
"_attr_types",
"_detachable",
"_handle_object_count",
"_type_check",
"_type_registry",
"_units",
"add_chunkable_attrs",
"add_detachable_attrs",
"get_children_count",
"get_dynamic_member_names",
"get_id",
"get_member_names",
"get_registered_type",
"get_typed_member_names",
"to_dict",
"update_forward_refs",
"validate_prop_name",
"from_list",
"to_list",
}
class _RegisteringBase:
"""
Private Base model for Speckle types.
This is an implementation detail, please do not use this outside this module.
This class provides automatic registration of `speckle_type` into a global,
(class level) registry for each subclassing type.
The type registry is a base for accurate type based (de)serialization.
"""
speckle_type: ClassVar[str]
_speckle_type_override: ClassVar[Optional[str]] = None
_speckle_namespace: ClassVar[Optional[str]] = None
_type_registry: ClassVar[Dict[str, Type["Base"]]] = {}
_attr_types: ClassVar[Dict[str, Type]] = {}
# dict of chunkable props and their max chunk size
_chunkable: Dict[str, int] = {}
_chunk_size_default: int = 1000
_detachable: Set[str] = set() # list of defined detachable props
_serialize_ignore: Set[str] = set()
@classmethod
def get_registered_type(cls, speckle_type: str) -> Optional[Type["Base"]]:
"""Get the registered type from the protected mapping via the `speckle_type`"""
for full_name in reversed(speckle_type.split(":")):
maybe_type = cls._type_registry.get(full_name, None)
if maybe_type:
return maybe_type
return None
@classmethod
def _determine_speckle_type(cls) -> str:
"""
This method brings the speckle_type construction in par with Speckle-sharp/Core.
The implementation differs, because in Core the basis of the speckle_type if
type.FullName, which includes the dotnet namespace name too.
Copying that behavior is hard in python, where the concept of namespaces
means something entirely different.
So we enabled a speckle_type override mechanism, that enables
"""
base_name = "Base"
if cls.__name__ == base_name:
return base_name
bases = [
b._full_name()
for b in reversed(cls.mro())
if issubclass(b, Base) and b.__name__ != base_name
]
return ":".join(bases)
@classmethod
def _full_name(cls) -> str:
base_name = "Base"
if cls.__name__ == base_name:
return base_name
if cls._speckle_type_override:
return cls._speckle_type_override
# convert the module names to PascalCase to match c# namespace naming convention
# also drop specklepy from the beginning
namespace = ".".join(
pascalcase(m)
for m in filter(lambda name: name != "specklepy", cls.__module__.split("."))
)
return f"{namespace}.{cls.__name__}"
def __init_subclass__(
cls,
speckle_type: Optional[str] = None,
chunkable: Optional[Dict[str, int]] = None,
detachable: Optional[Set[str]] = None,
serialize_ignore: Optional[Set[str]] = None,
**kwargs: Dict[str, Any],
):
"""
Hook into subclass type creation.
This is provides a mechanism to hook into the event of the subclass type object
initialization. This is reused to register each subclassing type into a class
level dictionary.
"""
# if not speckle_type:
# raise Exception("no type")
cls._speckle_type_override = speckle_type
cls.speckle_type = cls._determine_speckle_type()
# cls.speckle_type = speckle_type
if cls._full_name() in cls._type_registry:
raise ValueError(
f"The speckle_type: {speckle_type} is already registered for type: "
f"{cls._type_registry[cls._full_name()].__name__}. "
"Please choose a different type name."
)
cls._type_registry[cls._full_name()] = cls # type: ignore
try:
cls._attr_types = get_type_hints(cls)
except Exception:
cls._attr_types = getattr(cls, "__annotations__", {})
if chunkable:
chunkable = {k: v for k, v in chunkable.items() if isinstance(v, int)}
cls._chunkable = dict(cls._chunkable, **chunkable)
if detachable:
cls._detachable = cls._detachable.union(detachable)
if serialize_ignore:
cls._serialize_ignore = cls._serialize_ignore.union(serialize_ignore)
# we know, that the super here is object, that takes no args on init subclass
return super().__init_subclass__()
# T = TypeVar("T")
# how i wish the code below would be correct, but we're also parsing into floats
# and converting into strings if the original type is string, but the value isn't
# def _validate_type(t: type, value: T) -> Tuple[bool, T]:
def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
# this should be reworked. Its only ok to return null for Optionals...
# if t is None and value is None:
if value is None:
return True, value
# after fixing the None t above, this should be
# if t is Any:
# if t is None:
if t is None or t is Any:
return True, value
if isclass(t) and issubclass(t, Enum):
if isinstance(value, t):
return True, value
if value in t._value2member_map_:
return True, t(value)
if getattr(t, "__module__", None) == "typing":
if isinstance(t, ForwardRef):
return True, value
origin = getattr(t, "__origin__")
# below is what in nicer for >= py38
# origin = get_origin(t)
# recursive validation for Unions on both types preferring the fist type
if origin is Union:
# below is what in nicer for >= py38
# t_1, t_2 = get_args(t)
args = t.__args__ # type: ignore
for arg_t in args:
t_success, t_value = _validate_type(arg_t, value)
if t_success:
return True, t_value
return False, value
if origin is dict:
if not isinstance(value, dict):
return False, value
if value == {}:
return True, value
if not getattr(t, "__args__", None):
return True, value
t_key, t_value = t.__args__ # type: ignore
if (
getattr(t_key, "__name__", None),
getattr(t_value, "__name__", None),
) == ("KT", "VT"):
return True, value
# we're only checking the first item, but the for loop and return after
# evaluating the first item is the fastest way
for dict_key, dict_value in value.items():
valid_key, _ = _validate_type(t_key, dict_key)
valid_value, _ = _validate_type(t_value, dict_value)
if valid_key and valid_value:
return True, value
return False, value
if origin is list:
if not isinstance(value, list):
return False, value
if value == []:
return True, value
if not hasattr(t, "__args__"):
return True, value
t_items = t.__args__[0] # type: ignore
if getattr(t_items, "__name__", None) == "T":
return True, value
first_item_valid, _ = _validate_type(t_items, value[0])
if first_item_valid:
return True, value
return False, value
if origin is tuple:
if not isinstance(value, tuple):
return False, value
if not hasattr(t, "__args__"):
return True, value
args = t.__args__ # type: ignore
if args == tuple():
return True, value
# we're not checking for empty tuple, cause tuple lengths must match
if len(args) != len(value):
return False, value
values = []
for t_item, v_item in zip(args, value):
item_valid, item_value = _validate_type(t_item, v_item)
if not item_valid:
return False, value
values.append(item_value)
return True, tuple(values)
if origin is set:
if not isinstance(value, set):
return False, value
if not hasattr(t, "__args__"):
return True, value
t_items = t.__args__[0] # type: ignore
first_item_valid, _ = _validate_type(t_items, next(iter(value)))
if first_item_valid:
return True, value
return False, value
if isinstance(value, t):
return True, value
with contextlib.suppress(ValueError, TypeError):
if t is float and value is not None:
return True, float(value)
# TODO: dafuq, i had to add this not list check
# but it would also fail for objects and other complex values
if t is str and value and not isinstance(value, list):
return True, str(value)
return False, value
class Base(_RegisteringBase, speckle_type="Base"):
# id: Union[str, None] = None
# totalChildrenCount: Union[int, None] = None
# applicationId: Union[str, None] = None
_units: Union[None, str] = None
def __init__(
self,
id: str | None = None,
# totalChildrenCount: Union[int, None] = None,
applicationId: str | None = None,
**kwargs,
) -> None:
self.id = id
# self.totalChildrenCount = totalChildrenCount
self.applicationId = applicationId
super().__init__()
for k, v in kwargs.items():
self.__setattr__(k, v)
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(id: {self.id}, "
f"speckle_type: {self.speckle_type}, "
# f"totalChildrenCount: {self.totalChildrenCount})"
)
def __str__(self) -> str:
return self.__repr__()
@classmethod
def of_type(cls, speckle_type: str, **kwargs) -> "Base":
"""
Get a plain Base object with a specified speckle_type.
The speckle_type is protected and cannot be overwritten on a class instance.
This is to prevent problems with receiving in other platforms or connectors.
However, if you really need a base with a different type, here is a helper
to do that for you.
This is used in the deserialisation of unknown types so their speckle_type
can be preserved.
"""
b = cls(**kwargs)
b.__dict__.update(speckle_type=speckle_type)
return b
def __setitem__(self, name: str, value: Any) -> None:
self.validate_prop_name(name)
self.__dict__[name] = value
def __getitem__(self, name: str) -> Any:
return self.__dict__[name]
def __setattr__(self, name: str, value: Any) -> None:
"""
Type checking, guard attribute, and property set mechanism.
The `speckle_type` is a protected class attribute it must not be overridden.
This also performs a type check if the attribute is type hinted.
"""
if name == "speckle_type":
# not sure if we should raise an exception here??
# raise SpeckleException(
# "Cannot override the `speckle_type`. This is set manually by the class or on deserialisation"
# )
return
# if value is not None:
value = self._type_check(name, value)
attr = getattr(self.__class__, name, None)
if isinstance(attr, property):
try:
attr.__set__(self, value)
except AttributeError:
return # the prop probably doesn't have a setter
super().__setattr__(name, value)
@classmethod
def update_forward_refs(cls) -> None:
"""
Attempts to populate the internal defined types dict for type checking
sometime after defining the class.
This is already done when defining the class, but can be called
again if references to undefined types were
included.
See `objects.geometry` for an example of how this is used with
the Brep class definitions.
"""
try:
cls._attr_types = get_type_hints(cls)
except Exception as e:
warn(f"Could not update forward refs for class {cls.__name__}: {e}")
@classmethod
def validate_prop_name(cls, name: str) -> None:
"""Validator for dynamic attribute names."""
if name in {"", "@"}:
raise ValueError("Invalid Name: Base member names cannot be empty strings")
if name.startswith("@@"):
raise ValueError(
"Invalid Name: Base member names cannot start with more than one '@'",
)
if "." in name or "/" in name:
raise ValueError(
"Invalid Name: Base member names cannot contain characters '.' or '/'",
)
def _type_check(self, name: str, value: Any) -> Any:
"""
Lightweight type checking of values before setting them
NOTE: Does not check subscripted types within generics as the performance hit
of checking each item within a given collection isn't worth it.
Eg if you have a type Dict[str, float],
we will only check if the value you're trying to set is a dict.
"""
types = getattr(self, "_attr_types", {})
t = types.get(name, None)
valid, checked_value = _validate_type(t, value)
if valid:
return checked_value
raise SpeckleException(
f"Cannot set '{self.__class__.__name__}.{name}':"
f"it expects type '{str(t)}',"
f"but received type '{type(value).__name__}'"
)
def add_chunkable_attrs(self, **kwargs: int) -> None:
"""
Mark defined attributes as chunkable for serialisation
Arguments:
kwargs {int} -- the name of the attribute as the keyword
and the chunk size as the arg
"""
chunkable = {k: v for k, v in kwargs.items() if isinstance(v, int)}
self._chunkable = dict(self._chunkable, **chunkable)
def add_detachable_attrs(self, names: Set[str]) -> None:
"""
Mark defined attributes as detachable for serialisation
Arguments:
names {Set[str]} -- the names of the attributes to detach as a set of string
"""
self._detachable = self._detachable.union(names)
@property
def units(self) -> Union[str, None]:
return self._units
@units.setter
def units(self, value: Union[str, Units, None]):
"""While this property accepts any string value, geometry expects units to be specific strings (see Units enum)"""
if isinstance(value, str) or value is None:
self._units = value
elif isinstance(value, Units):
self._units = value.value
else:
raise SpeckleInvalidUnitException(
f"Unknown type {type(value)} received for units"
)
def get_member_names(self) -> List[str]:
"""Get all of the property names on this object, dynamic or not"""
attr_dir = list(set(dir(self)) - REMOVE_FROM_DIR)
return [
name
for name in attr_dir
if not name.startswith("_") and not callable(getattr(self, name))
]
def get_serializable_attributes(self) -> List[str]:
"""Get the attributes that should be serialized"""
return sorted(list(set(self.get_member_names()) - self._serialize_ignore))
def get_typed_member_names(self) -> List[str]:
"""Get all of the names of the defined (typed) properties of this object"""
return list(self._attr_types.keys())
def get_dynamic_member_names(self) -> List[str]:
"""Get all of the names of the dynamic properties of this object"""
return list(set(self.__dict__.keys()) - set(self._attr_types.keys()))
def get_children_count(self) -> int:
"""Get the total count of children Base objects"""
parsed = []
return 1 + self._count_descendants(self, parsed)
def get_id(self, decompose: bool = False) -> str:
"""
Gets the id (a unique hash) of this object.
⚠️ This method fully serializes the object which,
in the case of large objects (with many sub-objects), has a tangible cost.
Avoid using it!
Note: the hash of a decomposed object differs from that of a
non-decomposed object
Arguments:
decompose {bool} -- if True, will decompose the object in
the process of hashing it
Returns:
str -- the hash (id) of the fully serialized object
"""
from specklepy.serialization.base_object_serializer import BaseObjectSerializer
serializer = BaseObjectSerializer()
if decompose:
serializer.write_transports = [MemoryTransport()]
return serializer.traverse_base(self)[0]
def _count_descendants(self, base: "Base", parsed: List) -> int:
if base in parsed:
return 0
parsed.append(base)
return sum(
self._handle_object_count(value, parsed)
for name, value in base.get_member_names()
if not name.startswith("@")
)
def _handle_object_count(self, obj: Any, parsed: List) -> int:
# pylint: disable=isinstance-second-argument-not-valid-type
count = 0
if obj is None:
return count
if isinstance(obj, "Base"):
count += 1
count += self._count_descendants(obj, parsed)
return count
elif isinstance(obj, list):
for item in obj:
if isinstance(item, "Base"):
count += 1
count += self._count_descendants(item, parsed)
else:
count += self._handle_object_count(item, parsed)
elif isinstance(obj, dict):
for _, value in obj.items():
if isinstance(value, "Base"):
count += 1
count += self._count_descendants(value, parsed)
else:
count += self._handle_object_count(value, parsed)
return count
Base.update_forward_refs()
class DataChunk(Base, speckle_type="Speckle.Core.Models.DataChunk"):
data: Union[List[Any], None] = None
def __init__(self) -> None:
super().__init__()
self.data = []
-946
View File
@@ -1,946 +0,0 @@
from enum import Enum
from typing import Any, List, Optional
from specklepy.objects.base import Base
from specklepy.objects.encoding import CurveArray, CurveTypeEncoding, ObjectArray
from specklepy.objects.primitive import Interval
from specklepy.objects.units import get_encoding_from_units, get_units_from_encoding
GEOMETRY = "Objects.Geometry."
class Point(Base, speckle_type=GEOMETRY + "Point"):
x: float = 0.0
y: float = 0.0
z: float = 0.0
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(x: {self.x}, y: {self.y}, z: {self.z}, id:"
f" {self.id}, speckle_type: {self.speckle_type})"
)
@classmethod
def from_list(cls, args: List[float]) -> "Point":
"""
Create a new Point from a list of three floats
representing the x, y, and z coordinates
"""
return cls(x=args[0], y=args[1], z=args[2])
def to_list(self) -> List[Any]:
return [self.x, self.y, self.z]
@classmethod
def from_coords(cls, x: float = 0.0, y: float = 0.0, z: float = 0.0):
"""Create a new Point from x, y, and z values"""
pt = Point()
pt.x, pt.y, pt.z = x, y, z
return pt
class Pointcloud(
Base,
speckle_type=GEOMETRY + "Pointcloud",
chunkable={"points": 31250, "colors": 62500, "sizes": 62500},
):
points: Optional[List[float]] = None
colors: Optional[List[int]] = None
sizes: Optional[List[float]] = None
bbox: Optional["Box"] = None
class Vector(Base, speckle_type=GEOMETRY + "Vector"):
x: float = 0.0
y: float = 0.0
z: float = 0.0
applicationId: Optional[str] = None
def __repr__(self) -> str:
return (
f"{self.__class__.__name__} "
"(x: {self.x}, y: {self.y}, z: {self.z}, id: {self.id}, "
"speckle_type: {self.speckle_type})"
)
@classmethod
def from_list(cls, args: List[float]) -> "Vector":
"""
Create from a list of three floats representing the x, y, and z coordinates.
"""
return cls(x=args[0], y=args[1], z=args[2])
def to_list(self) -> List[float]:
return [self.x, self.y, self.z]
@classmethod
def from_coords(cls, x: float = 0.0, y: float = 0.0, z: float = 0.0) -> "Vector":
"""Create a new Point from x, y, and z values"""
v = Vector()
v.x, v.y, v.z = x, y, z
return v
class ControlPoint(Point, speckle_type=GEOMETRY + "ControlPoint"):
weight: Optional[float] = None
class Plane(Base, speckle_type=GEOMETRY + "Plane"):
origin: Point = Point()
normal: Vector = Vector()
xdir: Vector = Vector()
ydir: Vector = Vector()
@classmethod
def from_list(cls, args: List[Any]) -> "Plane":
return cls(
origin=Point.from_list(args[:3]),
normal=Vector.from_list(args[3:6]),
xdir=Vector.from_list(args[6:9]),
ydir=Vector.from_list(args[9:12]),
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
*self.origin.to_list(),
*self.normal.to_list(),
*self.xdir.to_list(),
*self.ydir.to_list(),
get_encoding_from_units(self._units),
]
class Box(Base, speckle_type=GEOMETRY + "Box"):
basePlane: Plane = Plane()
xSize: Interval = Interval()
ySize: Interval = Interval()
zSize: Interval = Interval()
area: Optional[float] = None
volume: Optional[float] = None
class Line(Base, speckle_type=GEOMETRY + "Line"):
start: Point = Point()
end: Optional[Point] = None
domain: Optional[Interval] = None
bbox: Optional[Box] = None
length: Optional[float] = None
@classmethod
def from_list(cls, args: List[Any]) -> "Line":
return cls(
start=Point.from_list(args[1:4]),
end=Point.from_list(args[4:7]),
domain=Interval.from_list(args[7:10]),
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
domain = self.domain.to_list() if self.domain else [0, 1]
return [
CurveTypeEncoding.Line.value,
*self.start.to_list(),
*self.end.to_list(),
*domain,
get_encoding_from_units(self._units),
]
class Arc(Base, speckle_type=GEOMETRY + "Arc"):
radius: Optional[float] = None
startAngle: Optional[float] = None
endAngle: Optional[float] = None
angleRadians: Optional[float] = None
plane: Optional[Plane] = None
domain: Optional[Interval] = None
startPoint: Optional[Point] = None
midPoint: Optional[Point] = None
endPoint: Optional[Point] = None
bbox: Optional[Box] = None
area: Optional[float] = None
length: Optional[float] = None
@classmethod
def from_list(cls, args: List[Any]) -> "Arc":
return cls(
radius=args[1],
startAngle=args[2],
endAngle=args[3],
angleRadians=args[4],
domain=Interval.from_list(args[5:7]),
plane=Plane.from_list(args[7:20]),
startPoint=Point.from_list(args[20:23]),
midPoint=Point.from_list(args[23:26]),
endPoint=Point.from_list(args[26:29]),
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
CurveTypeEncoding.Arc.value,
self.radius,
self.startAngle,
self.endAngle,
self.angleRadians,
*self.domain.to_list(),
*self.plane.to_list(),
*self.startPoint.to_list(),
*self.midPoint.to_list(),
*self.endPoint.to_list(),
get_encoding_from_units(self._units),
]
class Circle(Base, speckle_type=GEOMETRY + "Circle"):
radius: Optional[float] = None
plane: Optional[Plane] = None
domain: Optional[Interval] = None
bbox: Optional[Box] = None
area: Optional[float] = None
length: Optional[float] = None
@classmethod
def from_list(cls, args: List[Any]) -> "Circle":
return cls(
radius=args[1],
domain=Interval.from_list(args[2:4]),
plane=Plane.from_list(args[4:17]),
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
CurveTypeEncoding.Circle.value,
self.radius,
*self.domain.to_list(),
*self.plane.to_list(),
get_encoding_from_units(self._units),
]
class Ellipse(Base, speckle_type=GEOMETRY + "Ellipse"):
firstRadius: Optional[float] = None
secondRadius: Optional[float] = None
plane: Optional[Plane] = None
domain: Optional[Interval] = None
trimDomain: Optional[Interval] = None
bbox: Optional[Box] = None
area: Optional[float] = None
length: Optional[float] = None
@classmethod
def from_list(cls, args: List[Any]) -> "Ellipse":
return cls(
firstRadius=args[1],
secondRadius=args[2],
domain=Interval.from_list(args[3:5]),
plane=Plane.from_list(args[5:18]),
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
CurveTypeEncoding.Ellipse.value,
self.firstRadius,
self.secondRadius,
*self.domain.to_list(),
*self.plane.to_list(),
get_encoding_from_units(self._units),
]
class Polyline(Base, speckle_type=GEOMETRY + "Polyline", chunkable={"value": 20000}):
value: Optional[List[float]] = None
closed: Optional[bool] = None
domain: Optional[Interval] = None
bbox: Optional[Box] = None
area: Optional[float] = None
length: Optional[float] = None
@classmethod
def from_points(cls, points: List[Point]):
"""Create a new Polyline from a list of Points"""
polyline = cls()
polyline.units = points[0].units
polyline.value = []
for point in points:
polyline.value.extend([point.x, point.y, point.z])
return polyline
@classmethod
def from_list(cls, args: List[Any]) -> "Polyline":
point_count = args[4]
return cls(
closed=bool(args[1]),
domain=Interval.from_list(args[2:4]),
value=args[5 : 5 + point_count],
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
CurveTypeEncoding.Polyline.value,
int(self.closed),
*self.domain.to_list(),
len(self.value),
*self.value,
get_encoding_from_units(self._units),
]
def as_points(self) -> List[Point]:
"""Converts the `value` attribute to a list of Points"""
if not self.value:
return
if len(self.value) % 3:
raise ValueError("Points array malformed: length%3 != 0.")
values = iter(self.value)
return [
Point(x=v, y=next(values), z=next(values), units=self.units) for v in values
]
class SpiralType(Enum):
Biquadratic = 0
BiquadraticParabola = 1
Bloss = 2
Clothoid = 3
Cosine = 4
Cubic = 5
CubicParabola = 6
Radioid = 7
Sinusoid = 8
Unknown = 9
class Spiral(Base, speckle_type=GEOMETRY + "Spiral", detachable={"displayValue"}):
startPoint: Optional[Point] = None
endPoint: Optional[Point]
plane: Optional[Plane]
turns: Optional[float]
pitchAxis: Optional[Vector] = Vector()
pitch: float = 0
spiralType: Optional[SpiralType] = None
displayValue: Optional[Polyline] = None
bbox: Optional[Box] = None
length: Optional[float] = None
domain: Optional[Interval] = None
class Curve(
Base,
speckle_type=GEOMETRY + "Curve",
chunkable={"points": 20000, "weights": 20000, "knots": 20000},
):
degree: Optional[int] = None
periodic: Optional[bool] = None
rational: Optional[bool] = None
points: Optional[List[float]] = None
weights: Optional[List[float]] = None
knots: Optional[List[float]] = None
domain: Optional[Interval] = None
displayValue: Optional[Polyline] = None
closed: Optional[bool] = None
bbox: Optional[Box] = None
area: Optional[float] = None
length: Optional[float] = None
def as_points(self) -> List[Point]:
"""Converts the `value` attribute to a list of Points"""
if not self.points:
return
if len(self.points) % 3:
raise ValueError("Points array malformed: length%3 != 0.")
values = iter(self.points)
return [
Point(x=v, y=next(values), z=next(values), units=self.units) for v in values
]
@classmethod
def from_list(cls, args: List[Any]) -> "Curve":
point_count = int(args[7])
weights_count = int(args[8])
knots_count = int(args[9])
points_start = 10
weights_start = 10 + point_count
knots_start = weights_start + weights_count
knots_end = knots_start + knots_count
return cls(
degree=int(args[1]),
periodic=bool(args[2]),
rational=bool(args[3]),
closed=bool(args[4]),
domain=Interval.from_list(args[5:7]),
points=args[points_start:weights_start],
weights=args[weights_start:knots_start],
knots=args[knots_start:knots_end],
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
CurveTypeEncoding.Curve.value,
self.degree,
int(self.periodic),
int(self.rational),
int(self.closed),
*self.domain.to_list(),
len(self.points),
len(self.weights),
len(self.knots),
*self.points,
*self.weights,
*self.knots,
get_encoding_from_units(self._units),
]
class Polycurve(Base, speckle_type=GEOMETRY + "Polycurve"):
segments: Optional[List[Base]] = None
domain: Optional[Interval] = None
closed: Optional[bool] = None
bbox: Optional[Box] = None
area: Optional[float] = None
length: Optional[float] = None
@classmethod
def from_list(cls, args: List[Any]) -> "Polycurve":
curve_arrays = CurveArray(args[5:-1])
return cls(
closed=bool(args[1]),
domain=Interval.from_list(args[2:4]),
segments=curve_arrays.to_curves(),
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
curve_array = CurveArray.from_curves(self.segments).data
return [
CurveTypeEncoding.Polycurve.value,
int(self.closed),
*self.domain.to_list(),
len(curve_array),
*curve_array,
get_encoding_from_units(self._units),
]
class Extrusion(Base, speckle_type=GEOMETRY + "Extrusion"):
capped: Optional[bool] = None
profile: Optional[Base] = None
pathStart: Optional[Point] = None
pathEnd: Optional[Point] = None
pathCurve: Optional[Base] = None
pathTangent: Optional[Base] = None
profiles: Optional[List[Base]] = None
length: Optional[float] = None
area: Optional[float] = None
volume: Optional[float] = None
bbox: Optional[Box] = None
class Mesh(
Base,
speckle_type=GEOMETRY + "Mesh",
chunkable={
"vertices": 2000,
"faces": 2000,
"colors": 2000,
"textureCoordinates": 2000,
},
):
vertices: Optional[List[float]] = None
faces: Optional[List[int]] = None
colors: Optional[List[int]] = None
textureCoordinates: Optional[List[float]] = None
bbox: Optional[Box] = None
area: Optional[float] = None
volume: Optional[float] = None
@classmethod
def create(
cls,
vertices: List[float],
faces: List[int],
colors: Optional[List[int]] = None,
texture_coordinates: Optional[List[float]] = None,
) -> "Mesh":
"""
Create a new Mesh from lists representing its vertices, faces,
colors (optional), and texture coordinates (optional).
This will initialise empty lists for colors and texture coordinates
if you do not provide any.
"""
return cls(
vertices=vertices,
faces=faces,
colors=colors or [],
textureCoordinates=texture_coordinates or [],
)
class Surface(Base, speckle_type=GEOMETRY + "Surface"):
degreeU: Optional[int] = None
degreeV: Optional[int] = None
rational: Optional[bool] = None
area: Optional[float] = None
pointData: Optional[List[float]] = None
countU: Optional[int] = None
countV: Optional[int] = None
bbox: Optional[Box] = None
closedU: Optional[bool] = None
closedV: Optional[bool] = None
domainU: Optional[Interval] = None
domainV: Optional[Interval] = None
knotsU: Optional[List[float]] = None
knotsV: Optional[List[float]] = None
@classmethod
def from_list(cls, args: List[Any]) -> "Surface":
point_count = int(args[11])
knots_u_count = int(args[12])
knots_v_count = int(args[13])
start_point_data = 14
start_knots_u = start_point_data + point_count
start_knots_v = start_knots_u + knots_u_count
return cls(
degreeU=int(args[0]),
degreeV=int(args[1]),
countU=int(args[2]),
countV=int(args[3]),
rational=bool(args[4]),
closedU=bool(args[5]),
closedV=bool(args[6]),
domainU=Interval(start=args[7], end=args[8]),
domainV=Interval(start=args[9], end=args[10]),
pointData=args[start_point_data:start_knots_u],
knotsU=args[start_knots_u:start_knots_v],
knotsV=args[start_knots_v : start_knots_v + knots_v_count],
units=get_units_from_encoding(args[-1]),
)
def to_list(self) -> List[Any]:
return [
self.degreeU,
self.degreeV,
self.countU,
self.countV,
int(self.rational),
int(self.closedU),
int(self.closedV),
*self.domainU.to_list(),
*self.domainV.to_list(),
len(self.pointData),
len(self.knotsU),
len(self.knotsV),
*self.pointData,
*self.knotsU,
*self.knotsV,
get_encoding_from_units(self._units),
]
class BrepFace(Base, speckle_type=GEOMETRY + "BrepFace"):
_Brep: Optional["Brep"] = None
SurfaceIndex: Optional[int] = None
OuterLoopIndex: Optional[int] = None
OrientationReversed: Optional[bool] = None
LoopIndices: Optional[List[int]] = None
@property
def _outer_loop(self):
return self._Brep.Loops[self.OuterLoopIndex] # pylint: disable=no-member
@property
def _surface(self):
return self._Brep.Surfaces[self.SurfaceIndex] # pylint: disable=no-member
@property
def _loops(self):
if self.LoopIndices:
# pylint: disable=not-an-iterable, no-member
return [self._Brep.Loops[i] for i in self.LoopIndices]
@classmethod
def from_list(cls, args: List[Any], brep: "Brep" = None) -> "BrepFace":
return cls(
_Brep=brep,
SurfaceIndex=args[0],
OuterLoopIndex=args[1],
OrientationReversed=bool(args[2]),
LoopIndices=args[3:],
)
def to_list(self) -> List[Any]:
return [
self.SurfaceIndex,
self.OuterLoopIndex,
int(self.OrientationReversed),
*self.LoopIndices,
]
class BrepEdge(Base, speckle_type=GEOMETRY + "BrepEdge"):
_Brep: Optional["Brep"] = None
Curve3dIndex: Optional[int] = None
TrimIndices: Optional[List[int]] = None
StartIndex: Optional[int] = None
EndIndex: Optional[int] = None
ProxyCurveIsReversed: Optional[bool] = None
Domain: Optional[Interval] = None
@property
def _start_vertex(self):
return self._Brep.Vertices[self.StartIndex]
@property
def _end_vertex(self):
return self._Brep.Vertices[self.EndIndex]
@property
def _trims(self):
if self.TrimIndices:
# pylint: disable=not-an-iterable
return [self._Brep.Trims[i] for i in self.TrimIndices]
@property
def _curve(self):
return self._Brep.Curve3D[self.Curve3dIndex]
@classmethod
def from_list(cls, args: List[Any], brep: "Brep" = None) -> "BrepEdge":
domain_start = args[4]
domain_end = args[5]
domain = (
Interval(start=domain_start, end=domain_end)
if None not in (domain_start, domain_end)
else None
)
return cls(
_Brep=brep,
Curve3dIndex=int(args[0]),
TrimIndices=[int(t) for t in args[6:]],
StartIndex=int(args[1]),
EndIndex=int(args[2]),
ProxyCurveIsReversed=bool(args[3]),
Domain=domain,
)
def to_list(self) -> List[Any]:
return [
self.Curve3dIndex,
self.StartIndex,
self.EndIndex,
int(self.ProxyCurveIsReversed),
self.Domain.start,
self.Domain.end,
*self.TrimIndices,
]
class BrepLoopType(int, Enum):
Unknown = 0
Outer = 1
Inner = 2
Slit = 3
CurveOnSurface = 4
PointOnSurface = 5
class BrepLoop(Base, speckle_type=GEOMETRY + "BrepLoop"):
_Brep: Optional["Brep"] = None
FaceIndex: Optional[Optional[int]] = None
TrimIndices: Optional[List[int]] = None
Type: Optional[BrepLoopType] = None
@property
def _face(self):
return self._Brep.Faces[self.FaceIndex]
@property
def _trims(self):
if self.TrimIndices:
# pylint: disable=not-an-iterable
return [self._Brep.Trims[i] for i in self.TrimIndices]
@classmethod
def from_list(cls, args: List[any], brep: "Brep" = None):
return cls(
_Brep=brep,
FaceIndex=args[0],
Type=BrepLoopType(args[1]),
TrimIndices=args[2:],
)
def to_list(self) -> List[int]:
return [
self.FaceIndex,
self.Type.value,
*self.TrimIndices,
]
class BrepTrimType(int, Enum):
Unknown = 0
Boundary = 1
Mated = 2
Seam = 3
Singular = 4
CurveOnSurface = 5
PointOnSurface = 6
Slit = 7
class BrepTrim(Base, speckle_type=GEOMETRY + "BrepTrim"):
_Brep: Optional["Brep"] = None
EdgeIndex: Optional[int] = None
StartIndex: Optional[int] = None
EndIndex: Optional[int] = None
FaceIndex: Optional[int] = None
LoopIndex: Optional[int] = None
CurveIndex: Optional[int] = None
IsoStatus: Optional[int] = None
TrimType: Optional[BrepTrimType] = None
IsReversed: Optional[bool] = None
Domain: Optional[Interval] = None
@property
def _face(self):
if self._Brep:
return self._Brep.Faces[self.FaceIndex] # pylint: disable=no-member
@property
def _loop(self):
if self._Brep:
return self._Brep.Loops[self.LoopIndex] # pylint: disable=no-member
@property
def _edge(self):
if self._Brep:
# pylint: disable=no-member
return self._Brep.Edges[self.EdgeIndex] if self.EdgeIndex != -1 else None
@property
def _curve_2d(self):
if self._Brep:
return self._Brep.Curve2D[self.CurveIndex] # pylint: disable=no-member
@classmethod
def from_list(cls, args: List[Any], brep: "Brep" = None) -> "BrepTrim":
return cls(
_Brep=brep,
EdgeIndex=args[0],
StartIndex=args[1],
EndIndex=args[2],
FaceIndex=args[3],
LoopIndex=args[4],
CurveIndex=args[5],
IsoStatus=args[6],
TrimType=BrepTrimType(args[7]),
IsReversed=bool(args[8]),
)
def to_list(self) -> List[Any]:
return [
self.EdgeIndex,
self.StartIndex,
self.EndIndex,
self.FaceIndex,
self.LoopIndex,
self.CurveIndex,
self.IsoStatus,
self.TrimType.value,
int(self.IsReversed),
]
class Brep(
Base,
speckle_type=GEOMETRY + "Brep",
chunkable={
"SurfacesValue": 31250,
"Curve3DValues": 31250,
"Curve2DValues": 31250,
"VerticesValue": 31250,
"EdgesValue": 62500,
"LoopsValue": 62500,
"FacesValue": 62500,
"TrimsValue": 62500,
},
detachable={"displayValue"},
serialize_ignore={
"Surfaces",
"Curve3D",
"Curve2D",
"Vertices",
"Trims",
"Edges",
"Loops",
"Faces",
},
):
provenance: Optional[str] = None
bbox: Optional[Box] = None
area: Optional[float] = None
volume: Optional[float] = None
_displayValue: Optional[List[Mesh]] = None
Surfaces: Optional[List[Surface]] = None
Curve3D: Optional[List[Base]] = None
Curve2D: Optional[List[Base]] = None
Vertices: Optional[List[Point]] = None
Edges: Optional[List[BrepEdge]] = None
Loops: Optional[List[BrepLoop]] = None
Faces: Optional[List[BrepFace]] = None
Trims: Optional[List[BrepTrim]] = None
IsClosed: Optional[bool] = None
Orientation: Optional[int] = None
def _inject_self_into_children(self, children: Optional[List[Base]]) -> List[Base]:
if children is None:
return children
for child in children:
child._Brep = self # pylint: disable=protected-access
return children
# set as prop for now for backwards compatibility
@property
def displayValue(self) -> List[Mesh]:
return self._displayValue
@displayValue.setter
def displayValue(self, value):
if isinstance(value, Mesh):
self._displayValue = [value]
elif isinstance(value, list):
self._displayValue = value
@property
def EdgesValue(self) -> List[BrepEdge]:
return None if self.Edges is None else ObjectArray.from_objects(self.Edges).data
@EdgesValue.setter
def EdgesValue(self, value: List[float]):
if not value:
return
self.Edges = ObjectArray.decode_data(value, BrepEdge.from_list, brep=self)
@property
def LoopsValue(self) -> List[BrepLoop]:
return None if self.Loops is None else ObjectArray.from_objects(self.Loops).data
@LoopsValue.setter
def LoopsValue(self, value: List[int]):
if not value:
return
self.Loops = ObjectArray.decode_data(value, BrepLoop.from_list, brep=self)
@property
def FacesValue(self) -> List[int]:
return None if self.Faces is None else ObjectArray.from_objects(self.Faces).data
@FacesValue.setter
def FacesValue(self, value: List[int]):
if not value:
return
self.Faces = ObjectArray.decode_data(value, BrepFace.from_list, brep=self)
@property
def SurfacesValue(self) -> List[float]:
return (
None
if self.Surfaces is None
else ObjectArray.from_objects(self.Surfaces).data
)
@SurfacesValue.setter
def SurfacesValue(self, value: List[float]):
if not value:
return
self.Surfaces = ObjectArray.decode_data(value, Surface.from_list)
@property
def Curve3DValues(self) -> List[float]:
return (
None if self.Curve3D is None else CurveArray.from_curves(self.Curve3D).data
)
@Curve3DValues.setter
def Curve3DValues(self, value: List[float]):
crv_array = CurveArray(value)
self.Curve3D = crv_array.to_curves()
@property
def Curve2DValues(self) -> List[Base]:
return (
None if self.Curve2D is None else CurveArray.from_curves(self.Curve2D).data
)
@Curve2DValues.setter
def Curve2DValues(self, value: List[float]):
crv_array = CurveArray(value)
self.Curve2D = crv_array.to_curves()
@property
def VerticesValue(self) -> List[Point]:
if self.Vertices is None:
return None
encoded_unit = get_encoding_from_units(self.Vertices[0].units)
values = [encoded_unit]
for vertex in self.Vertices:
values.extend(vertex.to_list())
return values
@VerticesValue.setter
def VerticesValue(self, value: List[float]):
value = value.copy()
units = get_units_from_encoding(value.pop(0))
vertices = []
for i in range(0, len(value), 3):
vertex = Point.from_list(value[i : i + 3])
vertex.units = units
vertices.append(vertex)
self.Vertices = vertices
# TODO: can this be consistent with loops, edges, faces, curves, etc and prepend with the chunk list? needs to happen in sharp first
@property
def TrimsValue(self) -> List[float]:
# return None if self.Trims is None else ObjectArray.from_objects(self.Trims).data
if not self.Trims:
return
value = []
for trim in self.Trims:
value.extend(trim.to_list())
return value
@TrimsValue.setter
def TrimsValue(self, value: List[float]):
if not value:
return
# self.Trims = ObjectArray.decode_data(value, BrepTrim.from_list, brep=self)
self.Trims = [
BrepTrim.from_list(value[i : i + 9], self) for i in range(0, len(value), 9)
]
BrepEdge.update_forward_refs()
BrepLoop.update_forward_refs()
BrepTrim.update_forward_refs()
BrepFace.update_forward_refs()
-25
View File
@@ -1,25 +0,0 @@
from typing import Any, List
from specklepy.objects.base import Base
NAMESPACE = "Objects.Primitive"
class Interval(Base, speckle_type=f"{NAMESPACE}.Interval"):
start: float = 0.0
end: float = 0.0
def length(self):
return abs(self.start - self.end)
@classmethod
def from_list(cls, args: List[Any]) -> "Interval":
return cls(start=args[0], end=args[1])
def to_list(self) -> List[Any]:
return [self.start, self.end]
class Interval2d(Base, speckle_type=f"{NAMESPACE}.Interval2d"):
u: Interval
v: Interval
-124
View File
@@ -1,124 +0,0 @@
from enum import Enum
from typing import Union
from specklepy.logging.exceptions import SpeckleException, SpeckleInvalidUnitException
__all__ = [
"Units",
"get_encoding_from_units",
"get_units_from_encoding",
"get_units_from_string",
]
class Units(Enum):
mm = "mm"
cm = "cm"
m = "m"
km = "km"
inches = "in"
feet = "ft"
yards = "yd"
miles = "mi"
none = "none"
UNITS_STRINGS = {
Units.mm: ["mm", "mil", "millimeters", "millimetres"],
Units.cm: ["cm", "centimetre", "centimeter", "centimetres", "centimeters"],
Units.m: ["m", "meter", "meters", "metre", "metres"],
Units.km: ["km", "kilometer", "kilometre", "kilometers", "kilometres"],
Units.inches: ["in", "inch", "inches"],
Units.feet: ["ft", "foot", "feet"],
Units.yards: ["yd", "yard", "yards"],
Units.miles: ["mi", "mile", "miles"],
Units.none: ["none", "null"],
}
UNITS_ENCODINGS = {
Units.none: 0,
None: 0,
Units.mm: 1,
Units.cm: 2,
Units.m: 3,
Units.km: 4,
Units.inches: 5,
Units.feet: 6,
Units.yards: 7,
Units.miles: 8,
}
UNIT_SCALE = {
Units.none: 1,
Units.mm: 0.001,
Units.cm: 0.01,
Units.m: 1.0,
Units.km: 1000.0,
Units.inches: 0.0254,
Units.feet: 0.3048,
Units.yards: 0.9144,
Units.miles: 1609.340,
}
"""Unit scaling factor to meters"""
def get_units_from_string(unit: str) -> Units:
if not isinstance(unit, str):
raise SpeckleInvalidUnitException(unit)
unit = str.lower(unit)
for name, alternates in UNITS_STRINGS.items():
if unit in alternates:
return name
raise SpeckleInvalidUnitException(unit)
def get_units_from_encoding(unit: int) -> Units:
for name, encoding in UNITS_ENCODINGS.items():
if unit == encoding:
return name or Units.none
raise SpeckleException(
message=(
f"Could not understand what unit {unit} is referring to."
f"Please enter a valid unit encoding (eg {UNITS_ENCODINGS})."
)
)
def get_encoding_from_units(unit: Union[Units, str, None]):
maybe_sanitized_unit = unit
if isinstance(unit, str):
for unit_enum, aliases in UNITS_STRINGS.items():
if unit in aliases:
maybe_sanitized_unit = unit_enum
try:
return UNITS_ENCODINGS[maybe_sanitized_unit]
except KeyError as e:
raise SpeckleException(
message=(
f"No encoding exists for unit {maybe_sanitized_unit}."
f"Please enter a valid unit to encode (eg {UNITS_ENCODINGS})."
)
) from e
def get_scale_factor_from_string(fromUnits: str, toUnits: str) -> float:
"""Returns a scalar to convert distance values from one unit system to another"""
return get_scale_factor(
get_units_from_string(fromUnits), get_units_from_string(toUnits)
)
def get_scale_factor(fromUnits: Units, toUnits: Units) -> float:
"""Returns a scalar to convert distance values from one unit system to another"""
return get_scale_factor_to_meters(fromUnits) / get_scale_factor_to_meters(toUnits)
def get_scale_factor_to_meters(fromUnits: Units) -> float:
"""Returns a scalar to convert distance values from one unit system to meters"""
if fromUnits not in UNIT_SCALE:
raise ValueError(f"Invalid units provided: {fromUnits}")
return UNIT_SCALE[fromUnits]
@@ -342,11 +342,7 @@ class BaseObjectSerializer:
object_type = Base.get_registered_type(speckle_type)
# initialise the base object using `speckle_type` fall back to base if needed
base = (
object_type.__new__(object_type)
if object_type
else Base.of_type(speckle_type=speckle_type)
)
base = object_type() if object_type else Base.of_type(speckle_type=speckle_type)
# get total children count
if "__closure" in obj:
if not self.read_transport:
+6 -1
View File
@@ -86,6 +86,12 @@ class ServerTransport(AbstractTransport):
self.session = requests.Session()
self.session.headers.update(
{
"Accept": "text/plain",
}
)
if self.account is not None:
self._batch_sender = BatchSender(
self.url, self.stream_id, self.account.token, max_batch_size_mb=1
@@ -93,7 +99,6 @@ class ServerTransport(AbstractTransport):
self.session.headers.update(
{
"Authorization": f"Bearer {self.account.token}",
"Accept": "text/plain",
}
)