Compare commits

..

4 Commits

Author SHA1 Message Date
Jonathon Broughton e99efe8105 Add workspaceId to test automation run and schema 2026-04-07 22:03:13 +01:00
Jonathon Broughton 30de4d207a Handle generic exceptions in AutomationContext project retrieval 2026-04-07 22:03:12 +01:00
Jonathon Broughton 69829266a4 extend AutomationRunData model to include workspace_id 2026-04-07 21:59:18 +01:00
Jonathon Broughton ca1b8c52ed add workspace_id property and resolution logic in AutomationContext 2026-04-07 21:59:17 +01:00
15 changed files with 74 additions and 392 deletions
@@ -97,6 +97,33 @@ class AutomationContext:
"""Get the current status message."""
return self._automation_result.status_message
@property
def workspace_id(self) -> Optional[str]:
"""Get the workspace id for the current automation run, if available."""
return self.automation_run_data.workspace_id
def resolve_workspace_id(self) -> Optional[str]:
"""Return workspace id from run data or project lookup fallback."""
workspace_id = self.workspace_id
if workspace_id and workspace_id.strip():
return workspace_id.strip()
project_id = self.automation_run_data.project_id
if not project_id:
return None
try:
project = self.speckle_client.project.get(project_id)
except Exception:
return None
workspace_id = getattr(project, "workspace_id", None)
if isinstance(workspace_id, str) and workspace_id.strip():
resolved_workspace_id = workspace_id.strip()
self.automation_run_data.workspace_id = resolved_workspace_id
return resolved_workspace_id
return None
def elapsed(self) -> float:
"""Return the elapsed time in seconds since the initialization time."""
return time.perf_counter() - self._init_time
+2
View File
@@ -68,6 +68,7 @@ def create_test_automation_run(
createTestAutomationRun(automationId: $automationId) {
automationRunId
functionRunId
workspaceId
triggers {
payload {
modelId
@@ -119,6 +120,7 @@ def create_test_automation_run_data(
return AutomationRunData(
project_id=test_automation_environment.project_id,
workspace_id=test_automation_run_data.workspace_id,
speckle_server_url=test_automation_environment.server_url,
automation_id=test_automation_environment.automation_id,
automation_run_id=test_automation_run_data.automation_run_id,
+3 -1
View File
@@ -1,7 +1,7 @@
""""""
from enum import Enum
from typing import Any, Literal
from typing import Any, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel
@@ -31,6 +31,7 @@ class AutomationRunData(BaseModel):
"""Values of the project / model that triggered the run of this function."""
project_id: str
workspace_id: Optional[str] = None
speckle_server_url: str
automation_id: str
automation_run_id: str
@@ -48,6 +49,7 @@ class TestAutomationRunData(BaseModel):
automation_run_id: str
function_run_id: str
workspace_id: Optional[str] = None
triggers: list[VersionCreationTrigger]
+1 -1
View File
@@ -61,4 +61,4 @@ def cmd_line_import() -> None:
if __name__ == "__main__":
start = time.time()
cmd_line_import()
print(f"Total time (including cleanup): {(time.time() - start):.3f}s")
print(f"Total time (including cleanup): {(time.time() - start) * 1000}ms")
+2 -26
View File
@@ -22,15 +22,12 @@ from specklepy.objects import Base
from specklepy.objects.data_objects import DataObject
from specklepy.objects.models.collections.collection import Collection
from specklepy.objects.proxies import InstanceProxy
from specklepy.progress.ingestion_progress import IngestionProgressManager
@dataclass
class ImportJob:
ifc_file: file
progress: IngestionProgressManager
_render_material_manager: RenderMaterialProxyManager = field(
default_factory=lambda: RenderMaterialProxyManager()
)
@@ -42,7 +39,6 @@ class ImportJob:
)
geometries_count: int = 0
geometries_used: int = 0
elements_converted: int = 0
_current_storey_data_object: DataObject | None = field(default=None, init=False)
_display_value_cache: dict[int, list[Base]] = field(default_factory=dict)
@@ -112,12 +108,6 @@ class ImportJob:
# Restore previous storey context
self._current_storey_data_object = previous_storey_data_object
self.elements_converted += 1
if self.progress.should_report_progress():
self.progress.report(
f"Converted {self.elements_converted:,} elements", None
)
return result
def _convert_children(self, step_element: entity_instance) -> list[Base]:
@@ -142,16 +132,12 @@ class ImportJob:
def convert(self) -> Base:
start = time.time()
self.pre_process_geometry()
print(
f"Geometry conversion complete after {(time.time() - start):.3f}s" # noqa: E501
)
print(f"Geometry conversion complete after {(time.time() - start) * 1000}ms")
print(f"Created {self.geometries_count} geometries")
start = time.time()
root = self._convert_project_tree()
print(
f"Element tree conversion complete after {(time.time() - start):.3f}s" # noqa: E501
)
print(f"Object tree conversion complete after {(time.time() - start) * 1000}ms")
print(f"Used {self.geometries_used} geometries")
return root
@@ -159,10 +145,7 @@ class ImportJob:
iterator = create_geometry_iterator(self.ifc_file)
if not iterator.initialize():
raise SpeckleException("Failed to find any geometry in file")
self.progress.report("Converting geometries", None)
self.geometries_count = 0
while True:
shape = cast(TriangulationElement, iterator.get())
self.geometries_count += 1
@@ -174,11 +157,6 @@ class ImportJob:
raise SpeckleException(
f"Failed to convert geometry with id: {id}"
) from ex
if self.progress.should_report_progress():
self.progress.report(
f"Converted {self.geometries_count:,} geometries", None
)
if not iterator.next():
break
@@ -219,8 +197,6 @@ class ImportJob:
raise SpeckleException("Expected exactly one IfcProject in file")
project = projects[0]
self.progress.report("Converting elements", None)
tree = self.convert_element(project)
if not isinstance(tree, Collection):
raise TypeError("Expected root object to convert to a Collection")
+25 -30
View File
@@ -11,25 +11,19 @@ from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionFailedInput,
ModelIngestionStartProcessingInput,
ModelIngestionSuccessInput,
ModelIngestionUpdateInput,
SourceDataInput,
)
from specklepy.core.api.models.current import Project, Version
from specklepy.core.api.operations import send
from specklepy.logging import metrics
from specklepy.progress.ingestion_progress import IngestionProgressManager
from specklepy.progress.progress_transport import ProgressTransport
from specklepy.transports.server import ServerTransport
# Since progress messages are currently blocking (no async), we're being extra coarse
# with progress updates to ensure we're not waisting time sending updates.
# We could maybe go a little lower, but for now I'm not risking degrading performance
PROGRESS_INTERVAL_SECONDS = 10
def open_and_convert_file(
file_path: str,
project: Project,
version_message: str | None,
version_message: str,
model_ingestion_id: str,
client: SpeckleClient,
) -> Version:
@@ -39,7 +33,7 @@ def open_and_convert_file(
path = Path(file_path)
specklepy_version = importlib.metadata.version("specklepy")
ingestion = client.model_ingestion.start_processing(
client.model_ingestion.start_processing(
ModelIngestionStartProcessingInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
@@ -52,38 +46,39 @@ def open_and_convert_file(
),
)
)
progress = IngestionProgressManager(
client, ingestion, PROGRESS_INTERVAL_SECONDS
)
account = client.account
server_url = account.serverInfo.url
assert server_url
remote_transport = ServerTransport(project.id, account=account)
progress_transport = ProgressTransport(
progress,
)
progress.report("Opening file", None)
ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType]
import_job = ImportJob(ifc_file, progress) # pyright: ignore[reportUnknownArgumentType]
client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="Converting file",
progress=None,
)
)
import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType]
data = import_job.convert()
print(
f"File conversion complete after {(time.time() - start):.3f}s" # noqa: E501
)
print(f"File conversion complete after {(time.time() - start) * 1000}ms")
start = time.time()
progress.report("Uploading objects", None)
root_id = send(
data,
transports=[remote_transport, progress_transport],
use_default_cache=False,
)
print(
f"Sending to speckle complete after: {(time.time() - start):.3f}s" # noqa: E501
client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="Uploading objects",
progress=None,
)
)
root_id = send(data, transports=[remote_transport], use_default_cache=False)
print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms")
start = time.time()
@@ -100,9 +95,9 @@ def open_and_convert_file(
version = client.version.get(version_id, project.id)
end = time.time()
print(f"Version committed after: {(end - start):.3f}s")
print(f"Version committed after: {(end - start) * 1000}ms")
print(f"Total time (to commit): {(end - very_start):.3f}s")
print(f"Total time (to commit): {(end - very_start) * 1000}ms")
del ifc_file
custom_properties = {"ui": "dui3", "actionSource": "import"}
-51
View File
@@ -1,51 +0,0 @@
import time
from speckleifc.main import open_and_convert_file
from specklepy.api.client import SpeckleClient
from specklepy.core.api.credentials import get_accounts_for_server
from specklepy.logging import metrics
def _manual_import() -> None:
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCreateInput,
SourceDataInput,
)
PROJECT_ID = "412a3c3927"
MODEL_ID = "223e61212d"
SERVER_URL = "latest.speckle.systems"
FILE_PATH = r"C:\Test Files\ifc\AC20-FZK-Haus.ifc" # noqa: E501
metrics.set_host_app(
"ifc",
)
account = get_accounts_for_server(SERVER_URL)[0]
client = SpeckleClient(SERVER_URL, use_ssl=not SERVER_URL.startswith("http://"))
client.authenticate_with_account(account)
ingestion = client.model_ingestion.create(
ModelIngestionCreateInput(
model_id=MODEL_ID,
project_id=PROJECT_ID,
progress_message="",
source_data=SourceDataInput(
source_application_slug="speckleifc",
source_application_version="0.0.0",
file_name=None,
file_size_bytes=None,
),
max_idle_timeout_seconds=2700, # 45mins
)
)
project = client.project.get(PROJECT_ID)
open_and_convert_file(FILE_PATH, project, None, ingestion.id, client)
if __name__ == "__main__":
start = time.time()
_manual_import()
print(f"Total time (including cleanup): {(time.time() - start):.3f}s")
@@ -14,7 +14,6 @@ class ModelIngestionCreateInput(GraphQLBaseModel):
project_id: str
progress_message: str
source_data: SourceDataInput
max_idle_timeout_seconds: int | None = None
class ModelIngestionStartProcessingInput(GraphQLBaseModel):
+1 -4
View File
@@ -254,15 +254,12 @@ class FileUploadUrl(GraphQLBaseModel):
class ModelIngestionStatusData(GraphQLBaseModel):
status: ModelIngestionStatus
progress_message: str | None = None
version_id: str | None = None
class ModelIngestion(GraphQLBaseModel):
id: str
created_at: datetime
updated_at: datetime
model_id: str
project_id: str
user_id: str
cancellation_requested: bool
model_id: str
status_data: ModelIngestionStatusData
@@ -50,8 +50,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -60,10 +58,6 @@ class ModelIngestionResource(ResourceBase):
... on HasProgressMessage {
progressMessage
}
... on ModelIngestionSuccessStatus
{
versionId
}
}
}
}
@@ -93,8 +87,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -132,8 +124,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -169,8 +159,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -208,8 +196,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -291,8 +277,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -336,8 +320,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -388,8 +370,6 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -224,8 +224,6 @@ class SubscriptionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -234,10 +232,6 @@ class SubscriptionResource(ResourceBase):
... on HasProgressMessage {
progressMessage
}
... on ModelIngestionSuccessStatus
{
versionId
}
}
}
type
@@ -1,68 +0,0 @@
from time import monotonic
from specklepy.core.api.client import SpeckleClient
from specklepy.core.api.inputs.model_ingestion_inputs import ModelIngestionUpdateInput
from specklepy.core.api.models.current import ModelIngestion
class IngestionProgressManager:
"""
Provides a performant way to report ingestion progress.
Allows callers to throttle ingestion progress messages based on an update interval.
Throttling prevents callers from overwhelming the server with constant progress updates,
and minimises blocking high-speed operations with too frequent progress updates.
Callers can use this pattern for reporting throttled progress
```
if progress.should_report_progress():
progress.report(f"Converting geometries ({current:,}/{total:,})", current - total)
```
And for unthrottled progress (e.g. between phases)
```
progress.report(f"Next phases has started (0/{total:,})", 0)
This class is similar to the `IngestionProgressManager` in the .NET SDK
Unlike in .NET, we recommend using a very coarse `update_interval_seconds`; since we're not using async messages,
they are blocking and will degrade performance if used too frequently.
```
""" # noqa: E501
def __init__(
self,
speckle_client: SpeckleClient,
ingestion: ModelIngestion,
update_interval_seconds: float,
):
self.speckle_client = speckle_client
self.ingestion = ingestion
self.update_interval = update_interval_seconds
self._last_updated_at = 0.0
def report(self, progress_message: str, progress: float | None) -> ModelIngestion:
"""
Reports a progress update
"""
self._last_updated_at = monotonic()
formatted_progress = f"{progress:.0%}" if progress else ""
print(f"Progress update: {progress_message} {formatted_progress}")
return self.speckle_client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
ingestion_id=self.ingestion.id,
project_id=self.ingestion.project_id,
progress_message=progress_message,
progress=progress,
)
)
def should_report_progress(self) -> bool:
"""
Returns `true` if it's time for an update,
`false` if it's too soon since the last update
"""
elapsed = monotonic() - self._last_updated_at
return elapsed >= self.update_interval
@@ -1,64 +0,0 @@
from typing import Dict, List
from specklepy.progress.ingestion_progress import IngestionProgressManager
from specklepy.transports.abstract_transport import AbstractTransport
class ProgressTransport(AbstractTransport):
"""
This transport does not persist objects anywhere,
instead it simply reacts to save_object being called,
and reports throttled progress.
"""
def __init__(
self,
progress: IngestionProgressManager,
name="Progress",
progress_message_template="Uploading objects {:,}",
) -> None:
super().__init__()
self._name = name
self._progress = progress
self._progress_message_template = progress_message_template
self.saved_object_count = 0
def _throttle_progress(self) -> None:
if self._progress.should_report_progress():
self._progress.report(
self._progress_message_template.format(self.saved_object_count), None
)
@property
def name(self) -> str:
return self._name
def __repr__(self) -> str:
return f"ProgressTransport(objects: {self.saved_object_count})"
def save_object(self, id: str, serialized_object: str) -> None:
self.saved_object_count += 1
self._throttle_progress()
def save_object_from_transport(
self, id: str, source_transport: AbstractTransport
) -> None:
self.saved_object_count += 1
self._throttle_progress()
def get_object(self, id: str) -> str | None:
return None
def has_objects(self, id_list: List[str]) -> Dict[str, bool]:
return {id: False for id in id_list}
def begin_write(self) -> None:
self.saved_object_count = 0
def end_write(self) -> None:
pass
def copy_object_and_children(
self, id: str, target_transport: AbstractTransport
) -> str:
raise NotImplementedError
@@ -35,7 +35,7 @@ from tests.integration.conftest import is_public
@pytest.mark.skipif(is_public(), reason="The public API does not support these tests")
class TestIngestionResource:
@pytest.fixture
def project(self, client: SpeckleClient) -> Project:
def project(self, client: SpeckleClient):
return client.project.create(
ProjectCreateInput(
name="test", description=None, visibility=ProjectVisibility.PUBLIC
@@ -43,12 +43,12 @@ class TestIngestionResource:
)
@pytest.fixture
def model(self, client: SpeckleClient, project: Project) -> Model:
def model(self, client: SpeckleClient, project: Project):
return client.model.create(
CreateModelInput(name="test", description=None, project_id=project.id)
)
@pytest.fixture
@pytest.fixture()
def ingestion(
self, client: SpeckleClient, model: Model, project: Project
) -> ModelIngestion:
@@ -71,16 +71,11 @@ class TestIngestionResource:
assert isinstance(ingestion.updated_at, datetime)
assert isinstance(ingestion.cancellation_requested, bool)
assert isinstance(ingestion.model_id, str)
assert isinstance(ingestion.project_id, str)
assert isinstance(ingestion.user_id, str)
assert isinstance(ingestion.status_data, ModelIngestionStatusData)
assert isinstance(ingestion.status_data.progress_message, str | None)
assert ingestion.status_data.version_id is None
assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING
assert not ingestion.cancellation_requested
assert ingestion.model_id == model.id
assert ingestion.project_id == project.id
assert ingestion.user_id == client.account.userInfo.id
return ingestion
@@ -91,7 +86,6 @@ class TestIngestionResource:
project.id, ingestion.id
)
assert queried_ingestion.id == ingestion.id
assert queried_ingestion.status_data.status == ingestion.status_data.status
def test_update_progress(
@@ -216,13 +210,12 @@ class TestIngestionResource:
version_message=None,
)
version_id = client.model_ingestion.complete(input)
res = client.model_ingestion.get_ingestion(project.id, ingestion.id)
res = client.model_ingestion.complete(input)
assert isinstance(version_id, str)
version = client.version.get(version_id, project.id)
assert isinstance(res, str)
version = client.version.get(res, project.id)
assert isinstance(version, Version)
assert res.status_data.version_id == version_id
# trying to complete for a second time should throw
# with pytest.raises(GraphQLException):
# _ = client.ingestion.complete(input)
@@ -241,6 +234,12 @@ class TestIngestionResource:
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.CANCELLED
# Cancel again, should be idempotent
res = client.model_ingestion.fail_with_cancel(input)
assert res.status_data.progress_message is None
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.CANCELLED
def test_error_non_existent_ingestion(
self, client: SpeckleClient, project: Project
):
@@ -265,7 +264,6 @@ class TestIngestionResource:
with pytest.raises(GraphQLException):
_ = client.model_ingestion.fail_with_error(input)
@pytest.mark.skip(reason="TEST FAILS - server behaviour was changed")
def test_complete_non_existent_root_object(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
-105
View File
@@ -1,105 +0,0 @@
import time
import pytest
from specklepy.core.api.client import SpeckleClient
from specklepy.core.api.enums import ProjectVisibility
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCreateInput,
SourceDataInput,
)
from specklepy.core.api.inputs.model_inputs import CreateModelInput
from specklepy.core.api.inputs.project_inputs import ProjectCreateInput
from specklepy.core.api.models.current import Model, ModelIngestion, Project
from specklepy.progress.ingestion_progress import IngestionProgressManager
from tests.integration.conftest import is_public
@pytest.mark.run()
@pytest.mark.skipif(
is_public(), reason="The public API does not support model ingestion api"
)
class TestIngestionProgressManager:
@pytest.fixture
def project(self, client: SpeckleClient) -> Project:
return client.project.create(
ProjectCreateInput(
name="test", description=None, visibility=ProjectVisibility.PUBLIC
)
)
@pytest.fixture
def model(self, client: SpeckleClient, project: Project) -> Model:
return client.model.create(
CreateModelInput(name="test", description=None, project_id=project.id)
)
@pytest.fixture
def model_ingestion(
self, client: SpeckleClient, model: Model, project: Project
) -> ModelIngestion:
input = ModelIngestionCreateInput(
model_id=model.id,
project_id=project.id,
progress_message="Starting processing",
source_data=SourceDataInput(
source_application_slug="pytest",
source_application_version="0.0.0",
file_name=None,
file_size_bytes=None,
),
)
return client.model_ingestion.create(input)
def test_progress_respects_throttle(
self,
client: SpeckleClient,
model_ingestion: ModelIngestion,
) -> None:
EXPECTED_MESSAGE = "This is a test ingestion message"
EXPECTED_PROGRESS = 0.123123
UPDATE_INTERVAL_SECONDS = 1
sut = IngestionProgressManager(
speckle_client=client,
ingestion=model_ingestion,
update_interval_seconds=UPDATE_INTERVAL_SECONDS,
)
assert sut.should_report_progress() is True
res = sut.report(EXPECTED_MESSAGE, EXPECTED_PROGRESS)
assert sut.should_report_progress() is False
time.sleep(UPDATE_INTERVAL_SECONDS + 0.5)
assert sut.should_report_progress() is True
assert sut.should_report_progress() is True
assert res.status_data.progress_message == EXPECTED_MESSAGE
def test_progress_no_throttle(
self,
client: SpeckleClient,
model_ingestion: ModelIngestion,
) -> None:
EXPECTED_MESSAGE = "This is a test ingestion message"
EXPECTED_PROGRESS = 0.123123
UPDATE_INTERVAL_SECONDS = 0
sut = IngestionProgressManager(
speckle_client=client,
ingestion=model_ingestion,
update_interval_seconds=UPDATE_INTERVAL_SECONDS,
)
assert sut.should_report_progress() is True
res = sut.report(EXPECTED_MESSAGE, EXPECTED_PROGRESS)
assert sut.should_report_progress() is True
assert sut.should_report_progress() is True
assert res.status_data.progress_message == EXPECTED_MESSAGE