diff --git a/src/speckleifc/__main__.py b/src/speckleifc/__main__.py index d0e66a1..7f947ed 100644 --- a/src/speckleifc/__main__.py +++ b/src/speckleifc/__main__.py @@ -61,4 +61,4 @@ def cmd_line_import() -> None: if __name__ == "__main__": start = time.time() cmd_line_import() - print(f"Total time (including cleanup): {(time.time() - start) * 1000}ms") + print(f"Total time (including cleanup): {(time.time() - start):.3f}s") diff --git a/src/speckleifc/importer.py b/src/speckleifc/importer.py index 5a57e7b..34608c8 100644 --- a/src/speckleifc/importer.py +++ b/src/speckleifc/importer.py @@ -22,12 +22,15 @@ from specklepy.objects import Base from specklepy.objects.data_objects import DataObject from specklepy.objects.models.collections.collection import Collection from specklepy.objects.proxies import InstanceProxy +from specklepy.progress.ingestion_progress import IngestionProgressManager @dataclass class ImportJob: ifc_file: file + progress: IngestionProgressManager + _render_material_manager: RenderMaterialProxyManager = field( default_factory=lambda: RenderMaterialProxyManager() ) @@ -39,6 +42,7 @@ class ImportJob: ) geometries_count: int = 0 geometries_used: int = 0 + elements_converted: int = 0 _current_storey_data_object: DataObject | None = field(default=None, init=False) _display_value_cache: dict[int, list[Base]] = field(default_factory=dict) @@ -108,6 +112,12 @@ class ImportJob: # Restore previous storey context self._current_storey_data_object = previous_storey_data_object + self.elements_converted += 1 + if self.progress.should_report_progress(): + self.progress.report( + f"Converted {self.elements_converted:,} elements", None + ) + return result def _convert_children(self, step_element: entity_instance) -> list[Base]: @@ -132,12 +142,16 @@ class ImportJob: def convert(self) -> Base: start = time.time() self.pre_process_geometry() - print(f"Geometry conversion complete after {(time.time() - start) * 1000}ms") + print( + f"Geometry conversion complete 
after {(time.time() - start):.3f}s" # noqa: E501 + ) print(f"Created {self.geometries_count} geometries") start = time.time() root = self._convert_project_tree() - print(f"Object tree conversion complete after {(time.time() - start) * 1000}ms") + print( + f"Element tree conversion complete after {(time.time() - start):.3f}s" # noqa: E501 + ) print(f"Used {self.geometries_used} geometries") return root @@ -145,7 +159,10 @@ class ImportJob: iterator = create_geometry_iterator(self.ifc_file) if not iterator.initialize(): raise SpeckleException("Failed to find any geometry in file") + + self.progress.report("Converting geometries", None) self.geometries_count = 0 + while True: shape = cast(TriangulationElement, iterator.get()) self.geometries_count += 1 @@ -157,6 +174,11 @@ class ImportJob: raise SpeckleException( f"Failed to convert geometry with id: {id}" ) from ex + + if self.progress.should_report_progress(): + self.progress.report( + f"Converted {self.geometries_count:,} geometries", None + ) if not iterator.next(): break @@ -197,6 +219,8 @@ class ImportJob: raise SpeckleException("Expected exactly one IfcProject in file") project = projects[0] + self.progress.report("Converting elements", None) + tree = self.convert_element(project) if not isinstance(tree, Collection): raise TypeError("Expected root object to convert to a Collection") diff --git a/src/speckleifc/main.py b/src/speckleifc/main.py index bd010a3..609e06a 100644 --- a/src/speckleifc/main.py +++ b/src/speckleifc/main.py @@ -11,19 +11,25 @@ from specklepy.core.api.inputs.model_ingestion_inputs import ( ModelIngestionFailedInput, ModelIngestionStartProcessingInput, ModelIngestionSuccessInput, - ModelIngestionUpdateInput, SourceDataInput, ) from specklepy.core.api.models.current import Project, Version from specklepy.core.api.operations import send from specklepy.logging import metrics +from specklepy.progress.ingestion_progress import IngestionProgressManager +from specklepy.progress.progress_transport 
import ProgressTransport from specklepy.transports.server import ServerTransport +# Since progress messages are currently blocking (no async), we're being extra coarse +# with progress updates to ensure we're not wasting time sending updates. +# We could maybe go a little lower, but for now I'm not risking degrading performance +PROGRESS_INTERVAL_SECONDS = 10 + def open_and_convert_file( file_path: str, project: Project, - version_message: str, + version_message: str | None, model_ingestion_id: str, client: SpeckleClient, ) -> Version: @@ -33,7 +39,7 @@ def open_and_convert_file( path = Path(file_path) specklepy_version = importlib.metadata.version("specklepy") - client.model_ingestion.start_processing( + ingestion = client.model_ingestion.start_processing( ModelIngestionStartProcessingInput( project_id=project.id, ingestion_id=model_ingestion_id, @@ -46,39 +52,38 @@ def open_and_convert_file( ), ) ) - + progress = IngestionProgressManager( + client, ingestion, PROGRESS_INTERVAL_SECONDS + ) account = client.account server_url = account.serverInfo.url assert server_url remote_transport = ServerTransport(project.id, account=account) + progress_transport = ProgressTransport( + progress, + ) + progress.report("Opening file", None) ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType] - client.model_ingestion.update_progress( - ModelIngestionUpdateInput( - project_id=project.id, - ingestion_id=model_ingestion_id, - progress_message="Converting file", - progress=None, - ) - ) - import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType] + import_job = ImportJob(ifc_file, progress) # pyright: ignore[reportUnknownArgumentType] data = import_job.convert() - print(f"File conversion complete after {(time.time() - start) * 1000}ms") + print( + f"File conversion complete after {(time.time() - start):.3f}s" # noqa: E501 + ) start = time.time() - client.model_ingestion.update_progress( - ModelIngestionUpdateInput( - project_id=project.id,
- ingestion_id=model_ingestion_id, - progress_message="Uploading objects", - progress=None, - ) + progress.report("Uploading objects", None) + root_id = send( + data, + transports=[remote_transport, progress_transport], + use_default_cache=False, + ) + print( + f"Sending to speckle complete after: {(time.time() - start):.3f}s" # noqa: E501 ) - root_id = send(data, transports=[remote_transport], use_default_cache=False) - print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms") start = time.time() @@ -95,9 +100,9 @@ def open_and_convert_file( version = client.version.get(version_id, project.id) end = time.time() - print(f"Version committed after: {(end - start) * 1000}ms") + print(f"Version committed after: {(end - start):.3f}s") - print(f"Total time (to commit): {(end - very_start) * 1000}ms") + print(f"Total time (to commit): {(end - very_start):.3f}s") del ifc_file custom_properties = {"ui": "dui3", "actionSource": "import"} diff --git a/src/speckleifc/manual_import.py b/src/speckleifc/manual_import.py new file mode 100644 index 0000000..e80f731 --- /dev/null +++ b/src/speckleifc/manual_import.py @@ -0,0 +1,51 @@ +import time + +from speckleifc.main import open_and_convert_file +from specklepy.api.client import SpeckleClient +from specklepy.core.api.credentials import get_accounts_for_server +from specklepy.logging import metrics + + +def _manual_import() -> None: + from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionCreateInput, + SourceDataInput, + ) + + PROJECT_ID = "412a3c3927" + MODEL_ID = "223e61212d" + SERVER_URL = "latest.speckle.systems" + FILE_PATH = r"C:\Test Files\ifc\AC20-FZK-Haus.ifc" # noqa: E501 + + metrics.set_host_app( + "ifc", + ) + + account = get_accounts_for_server(SERVER_URL)[0] + client = SpeckleClient(SERVER_URL, use_ssl=not SERVER_URL.startswith("http://")) + client.authenticate_with_account(account) + + ingestion = client.model_ingestion.create( + ModelIngestionCreateInput( + 
model_id=MODEL_ID, + project_id=PROJECT_ID, + progress_message="", + source_data=SourceDataInput( + source_application_slug="speckleifc", + source_application_version="0.0.0", + file_name=None, + file_size_bytes=None, + ), + max_idle_timeout_seconds=2700, # 45mins + ) + ) + project = client.project.get(PROJECT_ID) + + open_and_convert_file(FILE_PATH, project, None, ingestion.id, client) + + +if __name__ == "__main__": + start = time.time() + + _manual_import() + print(f"Total time (including cleanup): {(time.time() - start):.3f}s") diff --git a/src/specklepy/core/api/inputs/model_ingestion_inputs.py b/src/specklepy/core/api/inputs/model_ingestion_inputs.py index 852456f..041eb96 100644 --- a/src/specklepy/core/api/inputs/model_ingestion_inputs.py +++ b/src/specklepy/core/api/inputs/model_ingestion_inputs.py @@ -14,6 +14,7 @@ class ModelIngestionCreateInput(GraphQLBaseModel): project_id: str progress_message: str source_data: SourceDataInput + max_idle_timeout_seconds: int | None = None class ModelIngestionStartProcessingInput(GraphQLBaseModel): diff --git a/src/specklepy/core/api/models/current.py b/src/specklepy/core/api/models/current.py index e9c6389..e48d257 100644 --- a/src/specklepy/core/api/models/current.py +++ b/src/specklepy/core/api/models/current.py @@ -254,12 +254,15 @@ class FileUploadUrl(GraphQLBaseModel): class ModelIngestionStatusData(GraphQLBaseModel): status: ModelIngestionStatus progress_message: str | None = None + version_id: str | None = None class ModelIngestion(GraphQLBaseModel): id: str created_at: datetime updated_at: datetime - cancellation_requested: bool model_id: str + project_id: str + user_id: str + cancellation_requested: bool status_data: ModelIngestionStatusData diff --git a/src/specklepy/core/api/resources/current/model_ingestion_resource.py b/src/specklepy/core/api/resources/current/model_ingestion_resource.py index 6531b96..3dad506 100644 --- a/src/specklepy/core/api/resources/current/model_ingestion_resource.py +++ 
b/src/specklepy/core/api/resources/current/model_ingestion_resource.py @@ -50,6 +50,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -58,6 +60,10 @@ class ModelIngestionResource(ResourceBase): ... on HasProgressMessage { progressMessage } + ... on ModelIngestionSuccessStatus + { + versionId + } } } } @@ -87,6 +93,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -124,6 +132,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -159,6 +169,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -196,6 +208,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -277,6 +291,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -320,6 +336,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -370,6 +388,8 @@ class ModelIngestionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... 
on HasModelIngestionStatus { diff --git a/src/specklepy/core/api/resources/current/subscription_resource.py b/src/specklepy/core/api/resources/current/subscription_resource.py index c65486e..99c4771 100644 --- a/src/specklepy/core/api/resources/current/subscription_resource.py +++ b/src/specklepy/core/api/resources/current/subscription_resource.py @@ -224,6 +224,8 @@ class SubscriptionResource(ResourceBase): createdAt updatedAt modelId + projectId + userId cancellationRequested statusData { ... on HasModelIngestionStatus { @@ -232,6 +234,10 @@ class SubscriptionResource(ResourceBase): ... on HasProgressMessage { progressMessage } + ... on ModelIngestionSuccessStatus + { + versionId + } } } type diff --git a/src/specklepy/progress/ingestion_progress.py b/src/specklepy/progress/ingestion_progress.py new file mode 100644 index 0000000..b0ffafc --- /dev/null +++ b/src/specklepy/progress/ingestion_progress.py @@ -0,0 +1,68 @@ +from time import monotonic + +from specklepy.core.api.client import SpeckleClient +from specklepy.core.api.inputs.model_ingestion_inputs import ModelIngestionUpdateInput +from specklepy.core.api.models.current import ModelIngestion + + +class IngestionProgressManager: + """ + Provides a performant way to report ingestion progress. + + Allows callers to throttle ingestion progress messages based on an update interval. + Throttling prevents callers from overwhelming the server with constant progress updates, + and minimises blocking high-speed operations with too frequent progress updates. + + Callers can use this pattern for reporting throttled progress + ``` + if progress.should_report_progress(): + progress.report(f"Converting geometries ({current:,}/{total:,})", current / total) + ``` + + And for unthrottled progress (e.g.
between phases) + ``` + progress.report(f"Next phase has started (0/{total:,})", 0) + ``` + + This class is similar to the `IngestionProgressManager` in the .NET SDK + Unlike in .NET, we recommend using a very coarse `update_interval_seconds`; since we're not using async messages, + they are blocking and will degrade performance if used too frequently. + + """ # noqa: E501 + + def __init__( + self, + speckle_client: SpeckleClient, + ingestion: ModelIngestion, + update_interval_seconds: float, + ): + self.speckle_client = speckle_client + self.ingestion = ingestion + self.update_interval = update_interval_seconds + + self._last_updated_at = 0.0 + + def report(self, progress_message: str, progress: float | None) -> ModelIngestion: + """ + Reports a progress update + """ + self._last_updated_at = monotonic() + formatted_progress = f"{progress:.0%}" if progress else "" + print(f"Progress update: {progress_message} {formatted_progress}") + + return self.speckle_client.model_ingestion.update_progress( + ModelIngestionUpdateInput( + ingestion_id=self.ingestion.id, + project_id=self.ingestion.project_id, + progress_message=progress_message, + progress=progress, + ) + ) + + def should_report_progress(self) -> bool: + """ + Returns `True` if it's time for an update, + `False` if it's too soon since the last update + """ + elapsed = monotonic() - self._last_updated_at + return elapsed >= self.update_interval diff --git a/src/specklepy/progress/progress_transport.py b/src/specklepy/progress/progress_transport.py new file mode 100644 index 0000000..e3a3b8e --- /dev/null +++ b/src/specklepy/progress/progress_transport.py @@ -0,0 +1,64 @@ +from typing import Dict, List + +from specklepy.progress.ingestion_progress import IngestionProgressManager +from specklepy.transports.abstract_transport import AbstractTransport + + +class ProgressTransport(AbstractTransport): + """ + This transport does not persist objects anywhere, + instead it simply reacts to save_object being called, +
and reports throttled progress. + """ + + def __init__( + self, + progress: IngestionProgressManager, + name="Progress", + progress_message_template="Uploading objects {:,}", + ) -> None: + super().__init__() + self._name = name + self._progress = progress + self._progress_message_template = progress_message_template + self.saved_object_count = 0 + + def _throttle_progress(self) -> None: + if self._progress.should_report_progress(): + self._progress.report( + self._progress_message_template.format(self.saved_object_count), None + ) + + @property + def name(self) -> str: + return self._name + + def __repr__(self) -> str: + return f"ProgressTransport(objects: {self.saved_object_count})" + + def save_object(self, id: str, serialized_object: str) -> None: + self.saved_object_count += 1 + self._throttle_progress() + + def save_object_from_transport( + self, id: str, source_transport: AbstractTransport + ) -> None: + self.saved_object_count += 1 + self._throttle_progress() + + def get_object(self, id: str) -> str | None: + return None + + def has_objects(self, id_list: List[str]) -> Dict[str, bool]: + return {id: False for id in id_list} + + def begin_write(self) -> None: + self.saved_object_count = 0 + + def end_write(self) -> None: + pass + + def copy_object_and_children( + self, id: str, target_transport: AbstractTransport + ) -> str: + raise NotImplementedError diff --git a/tests/integration/client/current/test_model_ingestion_resource.py b/tests/integration/client/current/test_model_ingestion_resource.py index 551ae52..155c1ca 100644 --- a/tests/integration/client/current/test_model_ingestion_resource.py +++ b/tests/integration/client/current/test_model_ingestion_resource.py @@ -35,7 +35,7 @@ from tests.integration.conftest import is_public @pytest.mark.skipif(is_public(), reason="The public API does not support these tests") class TestIngestionResource: @pytest.fixture - def project(self, client: SpeckleClient): + def project(self, client: SpeckleClient) -> 
Project: return client.project.create( ProjectCreateInput( name="test", description=None, visibility=ProjectVisibility.PUBLIC @@ -43,12 +43,12 @@ class TestIngestionResource: ) @pytest.fixture - def model(self, client: SpeckleClient, project: Project): + def model(self, client: SpeckleClient, project: Project) -> Model: return client.model.create( CreateModelInput(name="test", description=None, project_id=project.id) ) - @pytest.fixture() + @pytest.fixture def ingestion( self, client: SpeckleClient, model: Model, project: Project ) -> ModelIngestion: @@ -71,11 +71,16 @@ class TestIngestionResource: assert isinstance(ingestion.updated_at, datetime) assert isinstance(ingestion.cancellation_requested, bool) assert isinstance(ingestion.model_id, str) + assert isinstance(ingestion.project_id, str) + assert isinstance(ingestion.user_id, str) assert isinstance(ingestion.status_data, ModelIngestionStatusData) assert isinstance(ingestion.status_data.progress_message, str | None) + assert ingestion.status_data.version_id is None assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING assert not ingestion.cancellation_requested assert ingestion.model_id == model.id + assert ingestion.project_id == project.id + assert ingestion.user_id == client.account.userInfo.id return ingestion @@ -86,6 +91,7 @@ class TestIngestionResource: project.id, ingestion.id ) assert queried_ingestion.id == ingestion.id + assert queried_ingestion.status_data.status == ingestion.status_data.status def test_update_progress( @@ -210,12 +216,13 @@ class TestIngestionResource: version_message=None, ) - res = client.model_ingestion.complete(input) + version_id = client.model_ingestion.complete(input) + res = client.model_ingestion.get_ingestion(project.id, ingestion.id) - assert isinstance(res, str) - version = client.version.get(res, project.id) + assert isinstance(version_id, str) + version = client.version.get(version_id, project.id) assert isinstance(version, Version) - + assert 
res.status_data.version_id == version_id # trying to complete for a second time should throw # with pytest.raises(GraphQLException): # _ = client.ingestion.complete(input) @@ -234,12 +241,6 @@ class TestIngestionResource: assert not res.cancellation_requested assert res.status_data.status == ModelIngestionStatus.CANCELLED - # Cancel again, should be idempotent - res = client.model_ingestion.fail_with_cancel(input) - assert res.status_data.progress_message is None - assert not res.cancellation_requested - assert res.status_data.status == ModelIngestionStatus.CANCELLED - def test_error_non_existent_ingestion( self, client: SpeckleClient, project: Project ): @@ -264,6 +265,7 @@ class TestIngestionResource: with pytest.raises(GraphQLException): _ = client.model_ingestion.fail_with_error(input) + @pytest.mark.skip(reason="TEST FAILS - server behaviour was changed") def test_complete_non_existent_root_object( self, client: SpeckleClient, ingestion: ModelIngestion, project: Project ): diff --git a/tests/integration/test_progress.py b/tests/integration/test_progress.py new file mode 100644 index 0000000..955c3bf --- /dev/null +++ b/tests/integration/test_progress.py @@ -0,0 +1,105 @@ +import time + +import pytest + +from specklepy.core.api.client import SpeckleClient +from specklepy.core.api.enums import ProjectVisibility +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionCreateInput, + SourceDataInput, +) +from specklepy.core.api.inputs.model_inputs import CreateModelInput +from specklepy.core.api.inputs.project_inputs import ProjectCreateInput +from specklepy.core.api.models.current import Model, ModelIngestion, Project +from specklepy.progress.ingestion_progress import IngestionProgressManager +from tests.integration.conftest import is_public + + +@pytest.mark.run() +@pytest.mark.skipif( + is_public(), reason="The public API does not support model ingestion api" +) +class TestIngestionProgressManager: + @pytest.fixture + def project(self, 
client: SpeckleClient) -> Project: + return client.project.create( + ProjectCreateInput( + name="test", description=None, visibility=ProjectVisibility.PUBLIC + ) + ) + + @pytest.fixture + def model(self, client: SpeckleClient, project: Project) -> Model: + return client.model.create( + CreateModelInput(name="test", description=None, project_id=project.id) + ) + + @pytest.fixture + def model_ingestion( + self, client: SpeckleClient, model: Model, project: Project + ) -> ModelIngestion: + input = ModelIngestionCreateInput( + model_id=model.id, + project_id=project.id, + progress_message="Starting processing", + source_data=SourceDataInput( + source_application_slug="pytest", + source_application_version="0.0.0", + file_name=None, + file_size_bytes=None, + ), + ) + + return client.model_ingestion.create(input) + + def test_progress_respects_throttle( + self, + client: SpeckleClient, + model_ingestion: ModelIngestion, + ) -> None: + EXPECTED_MESSAGE = "This is a test ingestion message" + EXPECTED_PROGRESS = 0.123123 + UPDATE_INTERVAL_SECONDS = 1 + + sut = IngestionProgressManager( + speckle_client=client, + ingestion=model_ingestion, + update_interval_seconds=UPDATE_INTERVAL_SECONDS, + ) + + assert sut.should_report_progress() is True + + res = sut.report(EXPECTED_MESSAGE, EXPECTED_PROGRESS) + + assert sut.should_report_progress() is False + + time.sleep(UPDATE_INTERVAL_SECONDS + 0.5) + + assert sut.should_report_progress() is True + assert sut.should_report_progress() is True + + assert res.status_data.progress_message == EXPECTED_MESSAGE + + def test_progress_no_throttle( + self, + client: SpeckleClient, + model_ingestion: ModelIngestion, + ) -> None: + EXPECTED_MESSAGE = "This is a test ingestion message" + EXPECTED_PROGRESS = 0.123123 + UPDATE_INTERVAL_SECONDS = 0 + + sut = IngestionProgressManager( + speckle_client=client, + ingestion=model_ingestion, + update_interval_seconds=UPDATE_INTERVAL_SECONDS, + ) + + assert sut.should_report_progress() is True + + res = 
sut.report(EXPECTED_MESSAGE, EXPECTED_PROGRESS) + + assert sut.should_report_progress() is True + assert sut.should_report_progress() is True + + assert res.status_data.progress_message == EXPECTED_MESSAGE