Compare commits

...

3 Commits

Author SHA1 Message Date
Jedd Morgan 2f4c403229 Added speckleifc 2025-07-10 19:20:13 +01:00
Jedd Morgan f5e024c8ce perf(serializer): Avoid unnecessary serialization of detached objects (#431)
* Avoid unnecessary serialization of detached objects

* camel case variable namings
2025-06-16 16:24:41 +01:00
Dogukan Karatas 3bcdf723b0 feat (api): projects with permissions (#430)
Publish Python Package / test (push) Has been cancelled
Publish Python Package / Build and Publish Python Package (push) Has been cancelled
* adds project with permissions

* removes the project resource with permissions

* fix the tests
2025-06-06 16:07:48 +02:00
25 changed files with 2273 additions and 1056 deletions
+5
View File
@@ -18,6 +18,11 @@ dependencies = [
"ujson>=5.10.0",
]
[project.optional-dependencies]
speckleifc = [
"ifcopenshell>=0.8.2",
]
[dependency-groups]
dev = [
"commitizen>=4.1.0",
View File
+126
View File
@@ -0,0 +1,126 @@
import json
import time
import traceback
from argparse import ArgumentParser
from os import getenv
from speckleifc.ifc_geometry_processing import open_ifc
from speckleifc.importer import ImportJob
from specklepy.core.api.client import SpeckleClient
from specklepy.core.api.credentials import Account, get_accounts_for_server
from specklepy.core.api.inputs.version_inputs import CreateVersionInput
from specklepy.core.api.models.current import Version
from specklepy.core.api.operations import send
from specklepy.transports.server import ServerTransport
def cmd_line_import() -> None:
parser = ArgumentParser(
prog="speckleifc",
description="imports a file",
)
parser.add_argument("file_path")
parser.add_argument("output_path")
parser.add_argument("project_id")
parser.add_argument("version_message")
parser.add_argument("model_id")
# parser.add_argument("model_name")
# parser.add_argument("region_name")
args = parser.parse_args()
TOKEN = getenv("USER_TOKEN")
assert TOKEN is not None
SERVER_URL = getenv("SPECKLE_SERVER_URL") or "http://127.0.0.1:3000"
account = Account.from_token(TOKEN, SERVER_URL)
try:
version = open_and_convert_file(
args.file_path,
args.project_id,
args.version_message,
args.model_id,
account,
)
with open(args.output_path, "w") as f:
json.dump({"success": True, "commitId": version.id}, f)
except Exception as e:
error_msg = f"IFC Importer failed with exception:\n{traceback.format_exc()}"
print(error_msg)
# Write error result
with open(args.output_path, "w") as f:
json.dump({"success": False, "error": str(e)}, f)
def manual_import() -> None:
PROJECT_ID = "f3a42bdf24"
MODEL_ID = "0e23cfdea3"
SERVER_URL = "app.speckle.systems"
# FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\60mins.ifc"
# FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\hillside_house_meters.ifc"
# FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\GRAPHISOFT_Archicad_Sample_Project-S-Office_v1.0_AC25.ifc" # noqa: E501
# FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\GRAPHISOFT_Archicad_Sample_Project-S-Office_v1.0_AC25.ifc" # noqa: E501
# FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\OSS-MJL-B1-ZZ-M3-ME-00002.ifc[P26].ifc" # noqa: E501
FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\AC20-FZK-Haus.ifc" # noqa: E501
# FILE_PATH = "C:\\Users\\Jedd\\Desktop\\ifc\\ENZ-TPL-27-02-Zones.ifc" # noqa: E501
# FILE_PATH = (
# "C:\\Users\\Jedd\\Desktop\\ifc\\22-329-X-CVP-XX-XX-M3-A-3001-G-1 Bedroom.ifc" # noqa: E501
# )
account = get_accounts_for_server(SERVER_URL)[0]
open_and_convert_file(FILE_PATH, PROJECT_ID, None, MODEL_ID, account)
def open_and_convert_file(
file_path: str,
project_id: str,
version_message: str | None,
model_id: str,
account: Account,
) -> Version:
start = time.time()
very_start = start
ifc_file = open_ifc(file_path)
import_job = ImportJob(ifc_file)
data = import_job.convert()
print(f"File conversion complete after {(time.time() - start) * 1000}ms")
start = time.time()
remote_transport = ServerTransport(project_id, account=account)
root_id = send(data, transports=[remote_transport], use_default_cache=False)
print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms")
start = time.time()
server_url = account.serverInfo.url
assert server_url
client = SpeckleClient(host=server_url, use_ssl=server_url.startswith("https"))
client.authenticate_with_account(account)
create_version = CreateVersionInput(
object_id=root_id,
model_id=model_id,
project_id=project_id,
message=version_message,
source_application="IFC",
)
version = client.version.create(create_version)
end = time.time()
print(f"Version committed after: {(end - start) * 1000}ms")
print(f"Total time (to commit): {(end - very_start) * 1000}ms")
del ifc_file
return version
if __name__ == "__main__":
start = time.time()
# cmd_line_import()
manual_import()
print(f"Total time (including cleanup): {(time.time() - start) * 1000}ms")
@@ -0,0 +1,28 @@
from typing import cast
from ifcopenshell.entity_instance import entity_instance
from speckleifc.property_extraction import extract_properties
from specklepy.objects.base import Base
from specklepy.objects.data_objects import DataObject
def data_object_to_speckle(
display_value: list[Base],
step_element: entity_instance,
children: list[Base],
) -> DataObject:
guid = cast(str, step_element.GlobalId)
name = cast(str, step_element.Name or guid)
data_object = DataObject(
applicationId=guid,
properties=extract_properties(step_element),
name=name or guid,
displayValue=display_value,
)
data_object["@elements"] = children
data_object["ifcType"] = step_element.is_a()
return data_object
@@ -0,0 +1,130 @@
from collections import defaultdict
from collections.abc import Sequence
from typing import cast
from ifcopenshell.ifcopenshell_wrapper import (
Triangulation,
TriangulationElement,
colour,
style,
)
from speckleifc.render_material_proxy_manager import RenderMaterialProxyManager
from specklepy.objects.base import Base
from specklepy.objects.geometry import Mesh
from specklepy.objects.other import RenderMaterial
def geometry_to_speckle(
shape: TriangulationElement, render_material_manager: RenderMaterialProxyManager
) -> list[Base]:
geometry = cast(Triangulation, shape.geometry)
materials = cast(Sequence[style], geometry.materials)
MESH_COUNT = max(len(materials), 1)
material_ids = cast(Sequence[int], geometry.material_ids)
faces = cast(Sequence[int], geometry.faces)
verts = cast(Sequence[float], geometry.verts)
normals = cast(Sequence[float], geometry.normals)
FACE_COUNT = len(material_ids)
if len(faces) != FACE_COUNT * 3:
# Not really expected, but occasionally some meshes fail to triangulate
return []
mapped_meshes = _pre_alloc_mesh_lists(shape, material_ids, MESH_COUNT)
for i, mesh in enumerate(mapped_meshes):
material = _material_to_speckle(materials[i])
render_material_manager.add_mesh_material_mapping(material, mesh)
mapped_faces_pointers = [0] * MESH_COUNT
mapped_vertices_pointers = [0] * MESH_COUNT
mapped_index_counters = [0] * MESH_COUNT
i = 0
face_index = 0
while i < FACE_COUNT:
mesh_index = material_ids[i]
mesh: Mesh = mapped_meshes[mesh_index]
face_ptr = mapped_faces_pointers[mesh_index]
vert_ptr = mapped_vertices_pointers[mesh_index]
# Add triangle
mesh.faces[face_ptr] = 3
for j in range(3):
# Add vert
mesh.faces[face_ptr + 1 + j] = mapped_index_counters[mesh_index] + j
vert_index = faces[face_index + j] * 3
mapped_vert_offset = vert_ptr + (j * 3)
mesh.vertices[mapped_vert_offset] = verts[vert_index]
mesh.vertices[mapped_vert_offset + 1] = verts[vert_index + 1]
mesh.vertices[mapped_vert_offset + 2] = verts[vert_index + 2]
mesh.vertexNormals[mapped_vert_offset] = normals[vert_index]
mesh.vertexNormals[mapped_vert_offset + 1] = normals[vert_index + 1]
mesh.vertexNormals[mapped_vert_offset + 2] = normals[vert_index + 2]
i += 1
face_index += 3 # number of items in the faces list we just jumped over
mapped_index_counters[mesh_index] += (
3 # number of verts we just added to the mesh.vertices i.e. the next index
)
mapped_faces_pointers[mesh_index] += (
4 # number of item's we've just added to the mesh.faces list
)
mapped_vertices_pointers[mesh_index] += (
9 # number of item's we've just added to the mesh.vertices list
)
return mapped_meshes # type: ignore
def _material_to_speckle(material: style) -> RenderMaterial:
return RenderMaterial(
applicationId=material.calc_hash(),
name=material.name,
diffuse=_color_to_argb(material.diffuse),
opacity=1 - material.transparency if material.has_transparency() else 1,
)
def _color_to_argb(colour: colour) -> int:
# Clamp values to [0, 1] and convert to 0255
a_int = 255
r_int = max(0, min(255, int(round(colour.r() * 255))))
g_int = max(0, min(255, int(round(colour.g() * 255))))
b_int = max(0, min(255, int(round(colour.b() * 255))))
return (a_int << 24) | (r_int << 16) | (g_int << 8) | b_int
def _pre_alloc_mesh_lists(
shape: TriangulationElement, material_ids: Sequence[int], MESH_COUNT: int
) -> list[Mesh]:
"""
This is a performance optimisation to pre-size the lists
since we're expecting potential hundreds of thousands of verts in a single model
This is very much in the hot path, so worth the extra bit of convoluted logic
"""
appId = cast(str, shape.guid)
material_face_counts = defaultdict(int)
for mat_id in material_ids:
material_face_counts[mat_id] += 1
meshes = []
for mat_id in range(MESH_COUNT):
face_count = material_face_counts.get(mat_id, 0)
mesh = Mesh(
units="m",
vertices=[-1] * (face_count * 9),
vertexNormals=[-1] * (face_count * 9),
faces=[-1] * (face_count * 4), # 1 marker + 3 vertex indices
applicationId=f"{appId}_mat{mat_id}",
)
meshes.append(mesh)
return meshes
@@ -0,0 +1,24 @@
from typing import cast
from ifcopenshell.entity_instance import entity_instance
from specklepy.objects.base import Base
from specklepy.objects.models.collections.collection import Collection
def project_to_speckle(
step_element: entity_instance, children: list[Base]
) -> Collection:
guid = cast(str, step_element.GlobalId)
name = cast(str, step_element.Name or step_element.LongName or guid)
project = Collection(applicationId=guid, name=name, elements=children)
project["ifcType"] = step_element.is_a()
project["description"] = step_element.Description
project["objectType"] = step_element.ObjectType
project["longName"] = step_element.LongName
project["phase"] = step_element.Phase
return project
return project
@@ -0,0 +1,42 @@
from typing import cast
from ifcopenshell.entity_instance import entity_instance
from speckleifc.property_extraction import extract_properties
from specklepy.objects.base import Base
from specklepy.objects.data_objects import DataObject
from specklepy.objects.models.collections.collection import Collection
def spatial_element_to_speckle(
display_value: list[Base],
step_element: entity_instance,
relational_children: list[Base],
) -> Collection:
direct_geometry = _convert_as_data_object(display_value, step_element)
all_children = [direct_geometry] + relational_children
guid = cast(str, step_element.GlobalId)
name = cast(str, step_element.Name or step_element.LongName or guid)
data_object = Collection(applicationId=guid, name=name, elements=all_children)
data_object["ifcType"] = step_element.is_a()
return data_object
def _convert_as_data_object(
display_value: list[Base], step_element: entity_instance
) -> DataObject:
guid = cast(str, step_element.GlobalId)
name = cast(str, step_element.Name or step_element.LongName or guid)
data_object = DataObject(
applicationId=guid,
properties=extract_properties(step_element),
name=name,
displayValue=display_value,
)
data_object["ifcType"] = step_element.is_a()
return data_object
+44
View File
@@ -0,0 +1,44 @@
import multiprocessing
from ifcopenshell import file, ifcopenshell_wrapper, open, sqlite
from ifcopenshell.geom import iterator, settings
from specklepy.logging.exceptions import SpeckleException
def _create_iterator_settings() -> settings:
ifc_settings = settings()
# triangles for now, speckle does support n-gons, but may be less performant
ifc_settings.set("triangulation-type", ifcopenshell_wrapper.TRIANGLE_MESH)
# no need to weld verts
ifc_settings.set("weld-vertices", False)
# Speckle meshes are all in world coords
ifc_settings.set("use-world-coords", True)
# Tiny performance improvement,
ifc_settings.set("no-wire-intersection-check", True)
# IfcOpenshell defaults to 0.001mm here, which leads to very dense meshes.
# lowering the mesh quality a bit here leads to meshes
# that are still much higher quality than webifc
# We still need to experiment with the affect on memory usage
# It may be desirable to lower this further, and increase the angular deflection
# to compensate. This would allow large meshes to be lower quality,
# while keeping small meshes relatively similar.
ifc_settings.set("mesher-linear-deflection", 0.2)
return ifc_settings
def open_ifc(file_path: str) -> file:
ifc_file = open(file_path)
if isinstance(ifc_file, file):
return ifc_file
else:
raise SpeckleException(f"file at {file_path} is not a compatible ifc file type")
def create_geometry_iterator(ifc_file: file | sqlite) -> iterator:
return iterator(_create_iterator_settings(), ifc_file, multiprocessing.cpu_count())
return iterator(_create_iterator_settings(), ifc_file, multiprocessing.cpu_count())
+31
View File
@@ -0,0 +1,31 @@
from collections.abc import Generator, Iterable
from itertools import chain
from typing import cast
from ifcopenshell.entity_instance import entity_instance
def get_children(step_element: entity_instance) -> Generator[entity_instance]:
yield from chain(
get_spatial_children(step_element), get_aggregate_children(step_element)
)
def get_spatial_children(step_element: entity_instance) -> Generator[entity_instance]:
spatial_relations = cast(
Iterable[entity_instance] | None,
getattr(step_element, "ContainsElements", None),
)
if spatial_relations is not None:
for relation in spatial_relations:
yield from cast(Iterable[entity_instance], relation.RelatedElements)
def get_aggregate_children(step_element: entity_instance) -> Generator[entity_instance]:
aggregate_relations = cast(
Iterable[entity_instance] | None,
getattr(step_element, "IsDecomposedBy", None),
)
if aggregate_relations is not None:
for relation in aggregate_relations:
yield from cast(Iterable[entity_instance], relation.RelatedObjects)
+101
View File
@@ -0,0 +1,101 @@
import time
from typing import cast
from ifcopenshell.entity_instance import entity_instance
from ifcopenshell.geom import file
from ifcopenshell.ifcopenshell_wrapper import TriangulationElement
from speckleifc.converter.data_object_converter import data_object_to_speckle
from speckleifc.converter.geometry_converter import geometry_to_speckle
from speckleifc.converter.project_converter import project_to_speckle
from speckleifc.converter.spatial_element_converter import spatial_element_to_speckle
from speckleifc.ifc_geometry_processing import create_geometry_iterator
from speckleifc.ifc_openshell_helpers import get_children
from speckleifc.render_material_proxy_manager import RenderMaterialProxyManager
from specklepy.logging.exceptions import SpeckleException
from specklepy.objects import Base
class ImportJob:
def __init__(self, ifc_file: file):
self._ifc_file = ifc_file
self.cached_display_values: dict[int, list[Base]] = {}
self._render_material_manager = RenderMaterialProxyManager()
self.geometries_count = 0
self.geometries_used = 0
def convert_element(self, step_element: entity_instance) -> Base:
children = self._convert_children(step_element)
display_value = self.cached_display_values.get(step_element.id(), [])
if display_value is not None:
self.geometries_used += 1
if step_element.is_a("IfcProject"):
return project_to_speckle(step_element, children)
elif step_element.is_a("IfcSpatialStructureElement"):
return spatial_element_to_speckle(display_value, step_element, children)
else:
return data_object_to_speckle(display_value, step_element, children)
def _convert_children(self, step_element: entity_instance) -> list[Base]:
return [
self.convert_element(i)
for i in get_children(step_element)
if self._should_convert(i)
]
@staticmethod
def _should_convert(step_element: entity_instance) -> bool:
# We only consider IfcRoot objects convertible
# This is the super class for root level entities that have a GUID...
# This will ignore some types like IfcGridAxis
s = step_element.is_a("IfcRoot")
if not s:
print(
f"Skipping #{step_element.id()} because it's type ({step_element.is_a()}) it not an IfcRoot" # noqa: E501
)
return s
def convert(self) -> Base:
start = time.time()
self.pre_process_geometry()
print(f"Geometry conversion complete after {(time.time() - start) * 1000}ms")
print(f"Created {self.geometries_count} geometries")
start = time.time()
root = self._convert_project_tree()
print(f"Object tree conversion complete after {(time.time() - start) * 1000}ms")
print(f"Used {self.geometries_used} geometries")
return root
def pre_process_geometry(self) -> None:
iterator = create_geometry_iterator(self._ifc_file)
if not iterator.initialize():
raise SpeckleException(
"geometry iterator failed to initialize for the given file"
)
self.geometries_count = 0
while True:
shape = cast(TriangulationElement, iterator.get())
self.geometries_count += 1
id = cast(int, shape.id)
display_value = geometry_to_speckle(shape, self._render_material_manager)
self.cached_display_values[id] = display_value
if not iterator.next():
break
def _convert_project_tree(self) -> Base:
projects = self._ifc_file.by_type("IfcProject", False)
if len(projects) != 1:
raise SpeckleException("Expected exactly one IfcProject in file")
project = projects[0]
tree = self.convert_element(project)
tree["renderMaterialProxies"] = list(
self._render_material_manager.render_material_proxies.values()
)
return tree
+87
View File
@@ -0,0 +1,87 @@
from typing import Any
from ifcopenshell.entity_instance import entity_instance
from ifcopenshell.util.element import get_type
def extract_properties(element: entity_instance) -> dict[str, object]:
properties: dict[str, object] = {
"Attributes": get_attributes(element),
"Property Sets": _get_ifc_object_properties(element),
}
if (ifc_type := get_type(element)) is not None:
properties["Element Type Property Sets"] = _get_ifc_element_type_properties(
ifc_type,
)
return properties
def get_attributes(element: entity_instance) -> dict[str, object]:
return element.get_info(True, False, scalar_only=True)
def _get_ifc_element_type_properties(element: entity_instance) -> dict[str, object]:
result: dict[str, object] = {}
for definition in element.HasPropertySets or []:
if not definition.is_a("IfcPropertySet"):
continue
result[definition.Name] = _get_properties(definition.HasProperties)
return result
def _get_ifc_object_properties(element: entity_instance) -> dict[str, object]:
result: dict[str, object] = {}
for rel in getattr(element, "IsDefinedBy", []):
if not rel.is_a("IfcRelDefinesByProperties"):
continue
definition: entity_instance = rel.RelatingPropertyDefinition
if not definition.is_a("IfcPropertySet"):
continue
set_name = definition.Name
properties = _get_properties(definition.HasProperties)
if properties:
result[set_name] = properties
return result
def _get_properties(properties: entity_instance) -> dict[str, Any]:
"""
There already exists a canonical way to get properties
`ifcopenshell.util.element.get_properties` but it's very verbose
and we don't want to bloat our selves with supporting complex property types
This is a slimmed down version, only supporting a couple of property types
"""
result: dict[str, Any] = {}
for prop in properties:
name = prop.Name
if prop.is_a("IfcPropertySingleValue"):
val = prop.NominalValue
if val is not None:
result[name] = val.wrappedValue if hasattr(val, "wrappedValue") else val
elif prop.is_a("IfcPropertyListValue"):
values = getattr(prop, "ListValues", None)
if values:
result[name] = [
v.wrappedValue if hasattr(v, "wrappedValue") else v for v in values
]
elif prop.is_a("IfcPropertyEnumeratedValue"):
values = getattr(prop, "EnumerationValues", None)
if values:
result[name] = [
v.wrappedValue if hasattr(v, "wrappedValue") else v for v in values
]
# elif prop.is_a("IfcPropertyTableValue"):
# properties[name] = #not sure if we want to support these...
return result
@@ -0,0 +1,28 @@
from specklepy.objects.geometry import Mesh
from specklepy.objects.other import RenderMaterial
from specklepy.objects.proxies import RenderMaterialProxy
class RenderMaterialProxyManager:
def __init__(self):
self._render_material_proxies: dict[str, RenderMaterialProxy] = {}
@property
def render_material_proxies(self):
return self._render_material_proxies
def add_mesh_material_mapping(
self, render_material: RenderMaterial, mesh: Mesh
) -> None:
material_id = render_material.applicationId
assert material_id is not None
mesh_id = mesh.applicationId
assert mesh_id is not None
proxy = self._render_material_proxies.get(material_id, None)
if proxy is not None:
proxy.objects.append(mesh_id)
else:
self._render_material_proxies[material_id] = RenderMaterialProxy(
objects=[mesh_id], value=render_material
)
@@ -11,7 +11,11 @@ from specklepy.core.api.models import (
ResourceCollection,
User,
)
from specklepy.core.api.models.current import PermissionCheckResult, Workspace
from specklepy.core.api.models.current import (
PermissionCheckResult,
ProjectWithPermissions,
Workspace,
)
from specklepy.core.api.resources import ActiveUserResource as CoreResource
from specklepy.logging import metrics
@@ -51,6 +55,22 @@ class ActiveUserResource(CoreResource):
metrics.track(metrics.SDK, self.account, {"name": "Active User Get Projects"})
return super().get_projects(limit=limit, cursor=cursor, filter=filter)
def get_projects_with_permissions(
self,
*,
limit: int = 25,
cursor: Optional[str] = None,
filter: Optional[UserProjectsFilter] = None,
) -> ResourceCollection[ProjectWithPermissions]:
metrics.track(
metrics.SDK,
self.account,
{"name": "Active User Get Projects With Permissions"},
)
return super().get_projects_with_permissions(
limit=limit, cursor=cursor, filter=filter
)
def get_project_invites(self) -> List[PendingStreamCollaborator]:
metrics.track(
metrics.SDK, self.account, {"name": "Active User Get Project Invites"}
@@ -7,7 +7,11 @@ from specklepy.core.api.inputs.project_inputs import (
ProjectUpdateRoleInput,
WorkspaceProjectCreateInput,
)
from specklepy.core.api.models import Project, ProjectWithModels, ProjectWithTeam
from specklepy.core.api.models import (
Project,
ProjectWithModels,
ProjectWithTeam,
)
from specklepy.core.api.models.current import ProjectPermissionChecks
from specklepy.core.api.resources import ProjectResource as CoreResource
from specklepy.logging import metrics
@@ -1,7 +1,12 @@
from typing import Optional
from specklepy.core.api.inputs.project_inputs import WorksaceProjectsFilter
from specklepy.core.api.models.current import Project, ResourceCollection, Workspace
from specklepy.core.api.models.current import (
Project,
ProjectWithPermissions,
ResourceCollection,
Workspace,
)
from specklepy.core.api.resources import WorkspaceResource as CoreResource
from specklepy.logging import metrics
@@ -30,3 +35,19 @@ class WorkspaceResource(CoreResource):
) -> ResourceCollection[Project]:
metrics.track(metrics.SDK, self.account, {"name": "Workspace Get Projects"})
return super().get_projects(workspace_id, limit, cursor, filter)
def get_projects_with_permissions(
self,
workspace_id: str,
limit: int = 25,
cursor: Optional[str] = None,
filter: Optional[WorksaceProjectsFilter] = None,
) -> ResourceCollection[ProjectWithPermissions]:
metrics.track(
metrics.SDK,
self.account,
{"name": "Workspace Get Projects With Permissions"},
)
return super().get_projects_with_permissions(
workspace_id, limit, cursor, filter
)
@@ -8,6 +8,7 @@ from specklepy.core.api.models.current import (
ProjectCollaborator,
ProjectCommentCollection,
ProjectWithModels,
ProjectWithPermissions,
ProjectWithTeam,
ResourceCollection,
ServerConfiguration,
@@ -39,6 +40,7 @@ __all__ = [
"ModelWithVersions",
"Project",
"ProjectWithModels",
"ProjectWithPermissions",
"ProjectWithTeam",
"ProjectCommentCollection",
"UserSearchResultCollection",
+4
View File
@@ -176,6 +176,10 @@ class ProjectWithModels(Project):
models: ResourceCollection[Model]
class ProjectWithPermissions(Project):
permissions: ProjectPermissionChecks
class ProjectWithTeam(Project):
invited_team: List[PendingStreamCollaborator]
team: List[ProjectCollaborator]
@@ -13,7 +13,11 @@ from specklepy.core.api.models import (
ResourceCollection,
User,
)
from specklepy.core.api.models.current import PermissionCheckResult, Workspace
from specklepy.core.api.models.current import (
PermissionCheckResult,
ProjectWithPermissions,
Workspace,
)
from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse
from specklepy.logging.exceptions import GraphQLException
@@ -338,3 +342,84 @@ class ActiveUserResource(ResourceBase):
)
return response.data.data
def get_projects_with_permissions(
self,
*,
limit: int = 25,
cursor: Optional[str] = None,
filter: Optional[UserProjectsFilter] = None,
) -> ResourceCollection[ProjectWithPermissions]:
"""
Gets the currently active user's projects with their permissions.
This is useful for checking what actions can be performed on each project.
"""
QUERY = gql(
"""
query User($limit : Int!, $cursor: String, $filter: UserProjectsFilter) {
data:activeUser {
data:projects(limit: $limit, cursor: $cursor, filter: $filter) {
totalCount
cursor
items {
id
name
description
visibility
allowPublicComments
role
createdAt
updatedAt
sourceApps
workspaceId
permissions {
canCreateModel {
code
authorized
message
}
canDelete {
code
authorized
message
}
canLoad {
code
authorized
message
}
canPublish {
code
authorized
message
}
}
}
}
}
}
"""
)
variables = {
"limit": limit,
"cursor": cursor,
"filter": filter.model_dump(warnings="error", by_alias=True)
if filter
else None,
}
response = self.make_request_and_parse_response(
DataResponse[
Optional[DataResponse[ResourceCollection[ProjectWithPermissions]]]
],
QUERY,
variables,
)
if response.data is None:
raise GraphQLException(
"GraphQL response indicated that the ActiveUser could not be found"
)
return response.data.data
@@ -9,7 +9,11 @@ from specklepy.core.api.inputs.project_inputs import (
ProjectUpdateRoleInput,
WorkspaceProjectCreateInput,
)
from specklepy.core.api.models import Project, ProjectWithModels, ProjectWithTeam
from specklepy.core.api.models import (
Project,
ProjectWithModels,
ProjectWithTeam,
)
from specklepy.core.api.models.current import ProjectPermissionChecks
from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse
@@ -3,7 +3,12 @@ from typing import Optional
from gql import gql
from specklepy.core.api.inputs.project_inputs import WorksaceProjectsFilter
from specklepy.core.api.models.current import Project, ResourceCollection, Workspace
from specklepy.core.api.models.current import (
Project,
ProjectWithPermissions,
ResourceCollection,
Workspace,
)
from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse
@@ -104,3 +109,72 @@ class WorkspaceResource(ResourceBase):
return self.make_request_and_parse_response(
DataResponse[DataResponse[ResourceCollection[Project]]], QUERY, variables
).data.data
def get_projects_with_permissions(
self,
workspace_id: str,
limit: int = 25,
cursor: Optional[str] = None,
filter: Optional[WorksaceProjectsFilter] = None,
) -> ResourceCollection[ProjectWithPermissions]:
QUERY = gql(
"""
query Workspace($workspaceId: String!, $limit: Int!, $cursor: String, $filter: WorkspaceProjectsFilter) {
data:workspace(id: $workspaceId) {
data:projects(limit: $limit, cursor: $cursor, filter: $filter) {
cursor
items {
allowPublicComments
createdAt
description
id
name
role
sourceApps
updatedAt
visibility
workspaceId
permissions {
canCreateModel {
code
authorized
message
}
canDelete {
code
authorized
message
}
canLoad {
code
authorized
message
}
canPublish {
code
authorized
message
}
}
}
totalCount
}
}
}
""" # noqa: E501
)
variables = {
"workspaceId": workspace_id,
"limit": limit,
"cursor": cursor,
"filter": filter.model_dump(warnings="error", by_alias=True)
if filter
else None,
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[ResourceCollection[ProjectWithPermissions]]],
QUERY,
variables,
).data.data
@@ -199,8 +199,9 @@ class BaseObjectSerializer:
# write detached or root objects to transports
if detached and self.write_transports:
serialized_data = ujson.dumps(object_builder)
for t in self.write_transports:
t.save_object(id=obj_id, serialized_object=ujson.dumps(object_builder))
t.save_object(id=obj_id, serialized_object=serialized_data)
del self.lineage[-1]
@@ -0,0 +1,85 @@
import pytest
from specklepy.api.client import SpeckleClient
from specklepy.core.api.inputs.project_inputs import ProjectCreateInput
from specklepy.core.api.inputs.user_inputs import UserProjectsFilter
from specklepy.core.api.models.current import (
Project,
ProjectWithPermissions,
ResourceCollection,
)
@pytest.mark.run()
class TestActiveUserResourcePermissions:
@pytest.fixture()
def test_project(self, client: SpeckleClient) -> Project:
project = client.project.create(
ProjectCreateInput(
name="test project for active user permissions",
description="test description",
visibility=None,
)
)
return project
def test_active_user_get_projects_with_permissions(
self, client: SpeckleClient, test_project: Project
):
result = client.active_user.get_projects_with_permissions()
assert isinstance(result, ResourceCollection)
assert len(result.items) >= 1
test_project_with_permissions = None
for project in result.items:
if project.id == test_project.id:
test_project_with_permissions = project
break
assert test_project_with_permissions is not None
assert isinstance(test_project_with_permissions, ProjectWithPermissions)
assert hasattr(test_project_with_permissions, "permissions")
assert test_project_with_permissions.permissions is not None
assert test_project_with_permissions.id == test_project.id
assert test_project_with_permissions.name == test_project.name
permissions = test_project_with_permissions.permissions
assert hasattr(permissions, "can_create_model")
assert hasattr(permissions, "can_delete")
assert hasattr(permissions, "can_load")
assert hasattr(permissions, "can_publish")
assert permissions.can_create_model.authorized is True
assert permissions.can_delete.authorized is True
assert permissions.can_load.authorized is True
assert permissions.can_publish.authorized is True
def test_active_user_get_projects_with_permissions_with_filter(
self, client: SpeckleClient, test_project: Project
):
"""test getting active user's projects with permissions using a filter."""
filter = UserProjectsFilter(search=test_project.name)
result = client.active_user.get_projects_with_permissions(filter=filter)
assert isinstance(result, ResourceCollection)
assert len(result.items) >= 1
assert result.total_count >= 1
project_with_permissions = result.items[0]
assert isinstance(project_with_permissions, ProjectWithPermissions)
assert project_with_permissions.id == test_project.id
assert hasattr(project_with_permissions, "permissions")
assert project_with_permissions.permissions is not None
def test_active_user_projects_with_permissions_method_exists(
self, client: SpeckleClient
):
"""test that the method exists and is callable on active user resource."""
assert hasattr(client.active_user, "get_projects_with_permissions")
method = client.active_user.get_projects_with_permissions
assert callable(method)
@@ -0,0 +1,19 @@
import pytest
from specklepy.api.client import SpeckleClient
from specklepy.logging.exceptions import GraphQLException
@pytest.mark.run()
class TestWorkspaceResourcePermissions:
def test_get_projects_with_permissions(self, client: SpeckleClient):
with pytest.raises(GraphQLException):
client.workspace.get_projects_with_permissions("not a real id")
def test_get_projects_with_permissions_method_exists(self, client: SpeckleClient):
"""
test that the method exists with the correct signature.
"""
assert hasattr(client.workspace, "get_projects_with_permissions")
method = client.workspace.get_projects_with_permissions
assert callable(method)
Generated
+1301 -1049
View File
File diff suppressed because it is too large Load Diff