Compare commits

...

1 Commits

Author SHA1 Message Date
Jedd Morgan 20a412bc65 feat(speckle_ifc): More granular IFC conversion progress (#492)
Publish Python Package / test (push) Has been cancelled
Publish Python Package / Build and Publish Python Package (push) Has been cancelled
* IFC importer progress

* Fix broken tests after Ingestion state changes

* Tests

* final tweaks

* add timeout to ModelIngestionCreateInput

* print timing in seconds

* default to None

* small tweak

* fix test

* Fix other tests

* Add progress transport
2026-04-14 14:15:53 +01:00
12 changed files with 391 additions and 42 deletions
+1 -1
View File
@@ -61,4 +61,4 @@ def cmd_line_import() -> None:
if __name__ == "__main__":
start = time.time()
cmd_line_import()
print(f"Total time (including cleanup): {(time.time() - start) * 1000}ms")
print(f"Total time (including cleanup): {(time.time() - start):.3f}s")
+26 -2
View File
@@ -22,12 +22,15 @@ from specklepy.objects import Base
from specklepy.objects.data_objects import DataObject
from specklepy.objects.models.collections.collection import Collection
from specklepy.objects.proxies import InstanceProxy
from specklepy.progress.ingestion_progress import IngestionProgressManager
@dataclass
class ImportJob:
ifc_file: file
progress: IngestionProgressManager
_render_material_manager: RenderMaterialProxyManager = field(
default_factory=lambda: RenderMaterialProxyManager()
)
@@ -39,6 +42,7 @@ class ImportJob:
)
geometries_count: int = 0
geometries_used: int = 0
elements_converted: int = 0
_current_storey_data_object: DataObject | None = field(default=None, init=False)
_display_value_cache: dict[int, list[Base]] = field(default_factory=dict)
@@ -108,6 +112,12 @@ class ImportJob:
# Restore previous storey context
self._current_storey_data_object = previous_storey_data_object
self.elements_converted += 1
if self.progress.should_report_progress():
self.progress.report(
f"Converted {self.elements_converted:,} elements", None
)
return result
def _convert_children(self, step_element: entity_instance) -> list[Base]:
@@ -132,12 +142,16 @@ class ImportJob:
def convert(self) -> Base:
start = time.time()
self.pre_process_geometry()
print(f"Geometry conversion complete after {(time.time() - start) * 1000}ms")
print(
f"Geometry conversion complete after {(time.time() - start):.3f}s" # noqa: E501
)
print(f"Created {self.geometries_count} geometries")
start = time.time()
root = self._convert_project_tree()
print(f"Object tree conversion complete after {(time.time() - start) * 1000}ms")
print(
f"Element tree conversion complete after {(time.time() - start):.3f}s" # noqa: E501
)
print(f"Used {self.geometries_used} geometries")
return root
@@ -145,7 +159,10 @@ class ImportJob:
iterator = create_geometry_iterator(self.ifc_file)
if not iterator.initialize():
raise SpeckleException("Failed to find any geometry in file")
self.progress.report("Converting geometries", None)
self.geometries_count = 0
while True:
shape = cast(TriangulationElement, iterator.get())
self.geometries_count += 1
@@ -157,6 +174,11 @@ class ImportJob:
raise SpeckleException(
f"Failed to convert geometry with id: {id}"
) from ex
if self.progress.should_report_progress():
self.progress.report(
f"Converted {self.geometries_count:,} geometries", None
)
if not iterator.next():
break
@@ -197,6 +219,8 @@ class ImportJob:
raise SpeckleException("Expected exactly one IfcProject in file")
project = projects[0]
self.progress.report("Converting elements", None)
tree = self.convert_element(project)
if not isinstance(tree, Collection):
raise TypeError("Expected root object to convert to a Collection")
+30 -25
View File
@@ -11,19 +11,25 @@ from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionFailedInput,
ModelIngestionStartProcessingInput,
ModelIngestionSuccessInput,
ModelIngestionUpdateInput,
SourceDataInput,
)
from specklepy.core.api.models.current import Project, Version
from specklepy.core.api.operations import send
from specklepy.logging import metrics
from specklepy.progress.ingestion_progress import IngestionProgressManager
from specklepy.progress.progress_transport import ProgressTransport
from specklepy.transports.server import ServerTransport
# Since progress messages are currently blocking (no async), we're being extra coarse
# with progress updates to ensure we're not wasting time sending updates.
# We could maybe go a little lower, but for now I'm not risking degrading performance
PROGRESS_INTERVAL_SECONDS = 10
def open_and_convert_file(
file_path: str,
project: Project,
version_message: str,
version_message: str | None,
model_ingestion_id: str,
client: SpeckleClient,
) -> Version:
@@ -33,7 +39,7 @@ def open_and_convert_file(
path = Path(file_path)
specklepy_version = importlib.metadata.version("specklepy")
client.model_ingestion.start_processing(
ingestion = client.model_ingestion.start_processing(
ModelIngestionStartProcessingInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
@@ -46,39 +52,38 @@ def open_and_convert_file(
),
)
)
progress = IngestionProgressManager(
client, ingestion, PROGRESS_INTERVAL_SECONDS
)
account = client.account
server_url = account.serverInfo.url
assert server_url
remote_transport = ServerTransport(project.id, account=account)
progress_transport = ProgressTransport(
progress,
)
progress.report("Opening file", None)
ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType]
client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="Converting file",
progress=None,
)
)
import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType]
import_job = ImportJob(ifc_file, progress) # pyright: ignore[reportUnknownArgumentType]
data = import_job.convert()
print(f"File conversion complete after {(time.time() - start) * 1000}ms")
print(
f"File conversion complete after {(time.time() - start):.3f}s" # noqa: E501
)
start = time.time()
client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="Uploading objects",
progress=None,
)
progress.report("Uploading objects", None)
root_id = send(
data,
transports=[remote_transport, progress_transport],
use_default_cache=False,
)
print(
f"Sending to speckle complete after: {(time.time() - start):.3f}s" # noqa: E501
)
root_id = send(data, transports=[remote_transport], use_default_cache=False)
print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms")
start = time.time()
@@ -95,9 +100,9 @@ def open_and_convert_file(
version = client.version.get(version_id, project.id)
end = time.time()
print(f"Version committed after: {(end - start) * 1000}ms")
print(f"Version committed after: {(end - start):.3f}s")
print(f"Total time (to commit): {(end - very_start) * 1000}ms")
print(f"Total time (to commit): {(end - very_start):.3f}s")
del ifc_file
custom_properties = {"ui": "dui3", "actionSource": "import"}
+51
View File
@@ -0,0 +1,51 @@
import time
from speckleifc.main import open_and_convert_file
from specklepy.api.client import SpeckleClient
from specklepy.core.api.credentials import get_accounts_for_server
from specklepy.logging import metrics
def _manual_import() -> None:
    """
    Run one IFC import end-to-end against hard-coded targets.

    Developer helper: authenticates with the first locally stored account for
    the server, creates a model ingestion for the hard-coded project/model and
    hands the hard-coded file to `open_and_convert_file`.
    """
    from specklepy.core.api.inputs.model_ingestion_inputs import (
        ModelIngestionCreateInput,
        SourceDataInput,
    )

    # Hard-coded targets; edit these locally before running.
    PROJECT_ID = "412a3c3927"
    MODEL_ID = "223e61212d"
    SERVER_URL = "latest.speckle.systems"
    FILE_PATH = r"C:\Test Files\ifc\AC20-FZK-Haus.ifc"  # noqa: E501

    metrics.set_host_app(
        "ifc",
    )

    # Takes the first stored account for the server.
    stored_accounts = get_accounts_for_server(SERVER_URL)
    account = stored_accounts[0]

    # Only an explicit "http://" prefix disables SSL.
    wants_ssl = not SERVER_URL.startswith("http://")
    client = SpeckleClient(SERVER_URL, use_ssl=wants_ssl)
    client.authenticate_with_account(account)

    source_data = SourceDataInput(
        source_application_slug="speckleifc",
        source_application_version="0.0.0",
        file_name=None,
        file_size_bytes=None,
    )
    create_input = ModelIngestionCreateInput(
        model_id=MODEL_ID,
        project_id=PROJECT_ID,
        progress_message="",
        source_data=source_data,
        max_idle_timeout_seconds=2700,  # 45mins
    )
    ingestion = client.model_ingestion.create(create_input)

    project = client.project.get(PROJECT_ID)
    open_and_convert_file(FILE_PATH, project, None, ingestion.id, client)
if __name__ == "__main__":
    # Time the whole manual run, including whatever cleanup happens on return.
    started_at = time.time()
    _manual_import()
    elapsed_seconds = time.time() - started_at
    print(f"Total time (including cleanup): {elapsed_seconds:.3f}s")
@@ -14,6 +14,7 @@ class ModelIngestionCreateInput(GraphQLBaseModel):
project_id: str
progress_message: str
source_data: SourceDataInput
max_idle_timeout_seconds: int | None = None
class ModelIngestionStartProcessingInput(GraphQLBaseModel):
+4 -1
View File
@@ -254,12 +254,15 @@ class FileUploadUrl(GraphQLBaseModel):
class ModelIngestionStatusData(GraphQLBaseModel):
status: ModelIngestionStatus
progress_message: str | None = None
version_id: str | None = None
class ModelIngestion(GraphQLBaseModel):
id: str
created_at: datetime
updated_at: datetime
cancellation_requested: bool
model_id: str
project_id: str
user_id: str
cancellation_requested: bool
status_data: ModelIngestionStatusData
@@ -50,6 +50,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -58,6 +60,10 @@ class ModelIngestionResource(ResourceBase):
... on HasProgressMessage {
progressMessage
}
... on ModelIngestionSuccessStatus
{
versionId
}
}
}
}
@@ -87,6 +93,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -124,6 +132,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -159,6 +169,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -196,6 +208,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -277,6 +291,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -320,6 +336,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -370,6 +388,8 @@ class ModelIngestionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -224,6 +224,8 @@ class SubscriptionResource(ResourceBase):
createdAt
updatedAt
modelId
projectId
userId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
@@ -232,6 +234,10 @@ class SubscriptionResource(ResourceBase):
... on HasProgressMessage {
progressMessage
}
... on ModelIngestionSuccessStatus
{
versionId
}
}
}
type
@@ -0,0 +1,68 @@
from time import monotonic
from specklepy.core.api.client import SpeckleClient
from specklepy.core.api.inputs.model_ingestion_inputs import ModelIngestionUpdateInput
from specklepy.core.api.models.current import ModelIngestion
class IngestionProgressManager:
    """
    Provides a performant way to report ingestion progress.

    Allows callers to throttle ingestion progress messages based on an update interval.
    Throttling prevents callers from overwhelming the server with constant progress updates,
    and minimises blocking high-speed operations with too frequent progress updates.

    Callers can use this pattern for reporting throttled progress
    ```
    if progress.should_report_progress():
        progress.report(f"Converting geometries ({current:,}/{total:,})", current / total)
    ```

    And for unthrottled progress (e.g. between phases)
    ```
    progress.report(f"Next phase has started (0/{total:,})", 0)
    ```

    This class is similar to the `IngestionProgressManager` in the .NET SDK
    Unlike in .NET, we recommend using a very coarse `update_interval_seconds`; since we're not using async messages,
    they are blocking and will degrade performance if used too frequently.
    """  # noqa: E501

    def __init__(
        self,
        speckle_client: SpeckleClient,
        ingestion: ModelIngestion,
        update_interval_seconds: float,
    ):
        """
        Args:
            speckle_client: client used to send the progress updates.
            ingestion: the ingestion being reported on; its `id` and
                `project_id` are sent with every update.
            update_interval_seconds: minimum number of seconds that must pass
                after a report before `should_report_progress` returns True
                again.
        """
        self.speckle_client = speckle_client
        self.ingestion = ingestion
        self.update_interval = update_interval_seconds
        # The reference point of `monotonic()` is undefined, so 0.0 is not a
        # safe "never reported" marker. Negative infinity guarantees the first
        # `should_report_progress()` call is True for any interval.
        self._last_updated_at = float("-inf")

    def report(self, progress_message: str, progress: float | None) -> ModelIngestion:
        """
        Reports a progress update, regardless of the throttle interval.

        Args:
            progress_message: message sent with the update.
            progress: progress value sent with the update (printed locally as
                a percentage), or None when no value is available.

        Returns:
            The result of the client's `model_ingestion.update_progress` call.
        """
        # Restart the throttle window before the (blocking) server call.
        self._last_updated_at = monotonic()

        # Compare against None explicitly: 0 is a valid progress value and
        # must be printed as "0%" rather than dropped.
        formatted_progress = f"{progress:.0%}" if progress is not None else ""
        print(f"Progress update: {progress_message} {formatted_progress}")

        return self.speckle_client.model_ingestion.update_progress(
            ModelIngestionUpdateInput(
                ingestion_id=self.ingestion.id,
                project_id=self.ingestion.project_id,
                progress_message=progress_message,
                progress=progress,
            )
        )

    def should_report_progress(self) -> bool:
        """
        Returns `True` if it's time for an update,
        `False` if it's too soon since the last update
        """
        elapsed = monotonic() - self._last_updated_at
        return elapsed >= self.update_interval
@@ -0,0 +1,64 @@
from typing import Dict, List
from specklepy.progress.ingestion_progress import IngestionProgressManager
from specklepy.transports.abstract_transport import AbstractTransport
class ProgressTransport(AbstractTransport):
    """
    A transport that stores nothing.

    Each save call only bumps a counter and, when the progress manager allows
    it, sends a throttled progress message containing the current count.
    """

    def __init__(
        self,
        progress: IngestionProgressManager,
        name="Progress",
        progress_message_template="Uploading objects {:,}",
    ) -> None:
        super().__init__()
        self.saved_object_count = 0
        self._progress = progress
        self._progress_message_template = progress_message_template
        self._name = name

    def _throttle_progress(self) -> None:
        """Send a progress message unless the manager says it is too soon."""
        if not self._progress.should_report_progress():
            return
        message = self._progress_message_template.format(self.saved_object_count)
        self._progress.report(message, None)

    def _register_saved_object(self) -> None:
        """Count one saved object, then attempt a throttled report."""
        self.saved_object_count += 1
        self._throttle_progress()

    @property
    def name(self) -> str:
        return self._name

    def __repr__(self) -> str:
        count = self.saved_object_count
        return f"ProgressTransport(objects: {count})"

    def save_object(self, id: str, serialized_object: str) -> None:
        # The payload is ignored; only the call is counted.
        self._register_saved_object()

    def save_object_from_transport(
        self, id: str, source_transport: AbstractTransport
    ) -> None:
        # Nothing is copied from the source transport; only the call is counted.
        self._register_saved_object()

    def get_object(self, id: str) -> str | None:
        # Nothing is ever stored, so there is nothing to return.
        return None

    def has_objects(self, id_list: List[str]) -> Dict[str, bool]:
        # Every requested id is reported as absent.
        return dict.fromkeys(id_list, False)

    def begin_write(self) -> None:
        # Restart the count for each write.
        self.saved_object_count = 0

    def end_write(self) -> None:
        return None

    def copy_object_and_children(
        self, id: str, target_transport: AbstractTransport
    ) -> str:
        raise NotImplementedError
@@ -35,7 +35,7 @@ from tests.integration.conftest import is_public
@pytest.mark.skipif(is_public(), reason="The public API does not support these tests")
class TestIngestionResource:
@pytest.fixture
def project(self, client: SpeckleClient):
def project(self, client: SpeckleClient) -> Project:
return client.project.create(
ProjectCreateInput(
name="test", description=None, visibility=ProjectVisibility.PUBLIC
@@ -43,12 +43,12 @@ class TestIngestionResource:
)
@pytest.fixture
def model(self, client: SpeckleClient, project: Project):
def model(self, client: SpeckleClient, project: Project) -> Model:
return client.model.create(
CreateModelInput(name="test", description=None, project_id=project.id)
)
@pytest.fixture()
@pytest.fixture
def ingestion(
self, client: SpeckleClient, model: Model, project: Project
) -> ModelIngestion:
@@ -71,11 +71,16 @@ class TestIngestionResource:
assert isinstance(ingestion.updated_at, datetime)
assert isinstance(ingestion.cancellation_requested, bool)
assert isinstance(ingestion.model_id, str)
assert isinstance(ingestion.project_id, str)
assert isinstance(ingestion.user_id, str)
assert isinstance(ingestion.status_data, ModelIngestionStatusData)
assert isinstance(ingestion.status_data.progress_message, str | None)
assert ingestion.status_data.version_id is None
assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING
assert not ingestion.cancellation_requested
assert ingestion.model_id == model.id
assert ingestion.project_id == project.id
assert ingestion.user_id == client.account.userInfo.id
return ingestion
@@ -86,6 +91,7 @@ class TestIngestionResource:
project.id, ingestion.id
)
assert queried_ingestion.id == ingestion.id
assert queried_ingestion.status_data.status == ingestion.status_data.status
def test_update_progress(
@@ -210,12 +216,13 @@ class TestIngestionResource:
version_message=None,
)
res = client.model_ingestion.complete(input)
version_id = client.model_ingestion.complete(input)
res = client.model_ingestion.get_ingestion(project.id, ingestion.id)
assert isinstance(res, str)
version = client.version.get(res, project.id)
assert isinstance(version_id, str)
version = client.version.get(version_id, project.id)
assert isinstance(version, Version)
assert res.status_data.version_id == version_id
# trying to complete for a second time should throw
# with pytest.raises(GraphQLException):
# _ = client.ingestion.complete(input)
@@ -234,12 +241,6 @@ class TestIngestionResource:
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.CANCELLED
# Cancel again, should be idempotent
res = client.model_ingestion.fail_with_cancel(input)
assert res.status_data.progress_message is None
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.CANCELLED
def test_error_non_existent_ingestion(
self, client: SpeckleClient, project: Project
):
@@ -264,6 +265,7 @@ class TestIngestionResource:
with pytest.raises(GraphQLException):
_ = client.model_ingestion.fail_with_error(input)
@pytest.mark.skip(reason="TEST FAILS - server behaviour was changed")
def test_complete_non_existent_root_object(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
+105
View File
@@ -0,0 +1,105 @@
import time
import pytest
from specklepy.core.api.client import SpeckleClient
from specklepy.core.api.enums import ProjectVisibility
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCreateInput,
SourceDataInput,
)
from specklepy.core.api.inputs.model_inputs import CreateModelInput
from specklepy.core.api.inputs.project_inputs import ProjectCreateInput
from specklepy.core.api.models.current import Model, ModelIngestion, Project
from specklepy.progress.ingestion_progress import IngestionProgressManager
from tests.integration.conftest import is_public
@pytest.mark.run()
@pytest.mark.skipif(
    is_public(), reason="The public API does not support model ingestion api"
)
class TestIngestionProgressManager:
    """Integration tests for the throttling of `IngestionProgressManager`."""

    @pytest.fixture
    def project(self, client: SpeckleClient) -> Project:
        """A freshly created public project."""
        project_input = ProjectCreateInput(
            name="test", description=None, visibility=ProjectVisibility.PUBLIC
        )
        return client.project.create(project_input)

    @pytest.fixture
    def model(self, client: SpeckleClient, project: Project) -> Model:
        """A freshly created model inside the test project."""
        model_input = CreateModelInput(
            name="test", description=None, project_id=project.id
        )
        return client.model.create(model_input)

    @pytest.fixture
    def model_ingestion(
        self, client: SpeckleClient, model: Model, project: Project
    ) -> ModelIngestion:
        """A freshly created ingestion for the test model."""
        source_data = SourceDataInput(
            source_application_slug="pytest",
            source_application_version="0.0.0",
            file_name=None,
            file_size_bytes=None,
        )
        create_input = ModelIngestionCreateInput(
            model_id=model.id,
            project_id=project.id,
            progress_message="Starting processing",
            source_data=source_data,
        )
        return client.model_ingestion.create(create_input)

    def test_progress_respects_throttle(
        self,
        client: SpeckleClient,
        model_ingestion: ModelIngestion,
    ) -> None:
        message = "This is a test ingestion message"
        progress_value = 0.123123
        interval_seconds = 1

        sut = IngestionProgressManager(
            speckle_client=client,
            ingestion=model_ingestion,
            update_interval_seconds=interval_seconds,
        )

        # Nothing has been reported yet, so a report is due.
        assert sut.should_report_progress() is True
        response = sut.report(message, progress_value)

        # Immediately after a report the throttle window is closed.
        assert sut.should_report_progress() is False

        # Once the interval has passed, a report is due again, and checking
        # repeatedly does not reset the window.
        time.sleep(interval_seconds + 0.5)
        assert sut.should_report_progress() is True
        assert sut.should_report_progress() is True

        assert response.status_data.progress_message == message

    def test_progress_no_throttle(
        self,
        client: SpeckleClient,
        model_ingestion: ModelIngestion,
    ) -> None:
        message = "This is a test ingestion message"
        progress_value = 0.123123
        interval_seconds = 0

        sut = IngestionProgressManager(
            speckle_client=client,
            ingestion=model_ingestion,
            update_interval_seconds=interval_seconds,
        )

        assert sut.should_report_progress() is True
        response = sut.report(message, progress_value)

        # With a zero interval, a report is always due.
        assert sut.should_report_progress() is True
        assert sut.should_report_progress() is True

        assert response.status_data.progress_message == message