feat(speckle_ifc): More granular IFC conversion progress (#492)
* IFC importer progress * Fix broken tests after Ingestion state changes * Tests * final tweaks * add timeout to ModelIngestionCreateInput * print timing in seconds * default to None * small tweak * fix test * Fix other tests * Add progress transport
This commit is contained in:
@@ -61,4 +61,4 @@ def cmd_line_import() -> None:
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
start = time.time()
|
start = time.time()
|
||||||
cmd_line_import()
|
cmd_line_import()
|
||||||
print(f"Total time (including cleanup): {(time.time() - start) * 1000}ms")
|
print(f"Total time (including cleanup): {(time.time() - start):.3f}s")
|
||||||
|
|||||||
@@ -22,12 +22,15 @@ from specklepy.objects import Base
|
|||||||
from specklepy.objects.data_objects import DataObject
|
from specklepy.objects.data_objects import DataObject
|
||||||
from specklepy.objects.models.collections.collection import Collection
|
from specklepy.objects.models.collections.collection import Collection
|
||||||
from specklepy.objects.proxies import InstanceProxy
|
from specklepy.objects.proxies import InstanceProxy
|
||||||
|
from specklepy.progress.ingestion_progress import IngestionProgressManager
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ImportJob:
|
class ImportJob:
|
||||||
ifc_file: file
|
ifc_file: file
|
||||||
|
|
||||||
|
progress: IngestionProgressManager
|
||||||
|
|
||||||
_render_material_manager: RenderMaterialProxyManager = field(
|
_render_material_manager: RenderMaterialProxyManager = field(
|
||||||
default_factory=lambda: RenderMaterialProxyManager()
|
default_factory=lambda: RenderMaterialProxyManager()
|
||||||
)
|
)
|
||||||
@@ -39,6 +42,7 @@ class ImportJob:
|
|||||||
)
|
)
|
||||||
geometries_count: int = 0
|
geometries_count: int = 0
|
||||||
geometries_used: int = 0
|
geometries_used: int = 0
|
||||||
|
elements_converted: int = 0
|
||||||
_current_storey_data_object: DataObject | None = field(default=None, init=False)
|
_current_storey_data_object: DataObject | None = field(default=None, init=False)
|
||||||
|
|
||||||
_display_value_cache: dict[int, list[Base]] = field(default_factory=dict)
|
_display_value_cache: dict[int, list[Base]] = field(default_factory=dict)
|
||||||
@@ -108,6 +112,12 @@ class ImportJob:
|
|||||||
|
|
||||||
# Restore previous storey context
|
# Restore previous storey context
|
||||||
self._current_storey_data_object = previous_storey_data_object
|
self._current_storey_data_object = previous_storey_data_object
|
||||||
|
self.elements_converted += 1
|
||||||
|
if self.progress.should_report_progress():
|
||||||
|
self.progress.report(
|
||||||
|
f"Converted {self.elements_converted:,} elements", None
|
||||||
|
)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def _convert_children(self, step_element: entity_instance) -> list[Base]:
|
def _convert_children(self, step_element: entity_instance) -> list[Base]:
|
||||||
@@ -132,12 +142,16 @@ class ImportJob:
|
|||||||
def convert(self) -> Base:
|
def convert(self) -> Base:
|
||||||
start = time.time()
|
start = time.time()
|
||||||
self.pre_process_geometry()
|
self.pre_process_geometry()
|
||||||
print(f"Geometry conversion complete after {(time.time() - start) * 1000}ms")
|
print(
|
||||||
|
f"Geometry conversion complete after {(time.time() - start):.3f}s" # noqa: E501
|
||||||
|
)
|
||||||
print(f"Created {self.geometries_count} geometries")
|
print(f"Created {self.geometries_count} geometries")
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
root = self._convert_project_tree()
|
root = self._convert_project_tree()
|
||||||
print(f"Object tree conversion complete after {(time.time() - start) * 1000}ms")
|
print(
|
||||||
|
f"Element tree conversion complete after {(time.time() - start):.3f}s" # noqa: E501
|
||||||
|
)
|
||||||
print(f"Used {self.geometries_used} geometries")
|
print(f"Used {self.geometries_used} geometries")
|
||||||
return root
|
return root
|
||||||
|
|
||||||
@@ -145,7 +159,10 @@ class ImportJob:
|
|||||||
iterator = create_geometry_iterator(self.ifc_file)
|
iterator = create_geometry_iterator(self.ifc_file)
|
||||||
if not iterator.initialize():
|
if not iterator.initialize():
|
||||||
raise SpeckleException("Failed to find any geometry in file")
|
raise SpeckleException("Failed to find any geometry in file")
|
||||||
|
|
||||||
|
self.progress.report("Converting geometries", None)
|
||||||
self.geometries_count = 0
|
self.geometries_count = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
shape = cast(TriangulationElement, iterator.get())
|
shape = cast(TriangulationElement, iterator.get())
|
||||||
self.geometries_count += 1
|
self.geometries_count += 1
|
||||||
@@ -157,6 +174,11 @@ class ImportJob:
|
|||||||
raise SpeckleException(
|
raise SpeckleException(
|
||||||
f"Failed to convert geometry with id: {id}"
|
f"Failed to convert geometry with id: {id}"
|
||||||
) from ex
|
) from ex
|
||||||
|
|
||||||
|
if self.progress.should_report_progress():
|
||||||
|
self.progress.report(
|
||||||
|
f"Converted {self.geometries_count:,} geometries", None
|
||||||
|
)
|
||||||
if not iterator.next():
|
if not iterator.next():
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -197,6 +219,8 @@ class ImportJob:
|
|||||||
raise SpeckleException("Expected exactly one IfcProject in file")
|
raise SpeckleException("Expected exactly one IfcProject in file")
|
||||||
project = projects[0]
|
project = projects[0]
|
||||||
|
|
||||||
|
self.progress.report("Converting elements", None)
|
||||||
|
|
||||||
tree = self.convert_element(project)
|
tree = self.convert_element(project)
|
||||||
if not isinstance(tree, Collection):
|
if not isinstance(tree, Collection):
|
||||||
raise TypeError("Expected root object to convert to a Collection")
|
raise TypeError("Expected root object to convert to a Collection")
|
||||||
|
|||||||
+30
-25
@@ -11,19 +11,25 @@ from specklepy.core.api.inputs.model_ingestion_inputs import (
|
|||||||
ModelIngestionFailedInput,
|
ModelIngestionFailedInput,
|
||||||
ModelIngestionStartProcessingInput,
|
ModelIngestionStartProcessingInput,
|
||||||
ModelIngestionSuccessInput,
|
ModelIngestionSuccessInput,
|
||||||
ModelIngestionUpdateInput,
|
|
||||||
SourceDataInput,
|
SourceDataInput,
|
||||||
)
|
)
|
||||||
from specklepy.core.api.models.current import Project, Version
|
from specklepy.core.api.models.current import Project, Version
|
||||||
from specklepy.core.api.operations import send
|
from specklepy.core.api.operations import send
|
||||||
from specklepy.logging import metrics
|
from specklepy.logging import metrics
|
||||||
|
from specklepy.progress.ingestion_progress import IngestionProgressManager
|
||||||
|
from specklepy.progress.progress_transport import ProgressTransport
|
||||||
from specklepy.transports.server import ServerTransport
|
from specklepy.transports.server import ServerTransport
|
||||||
|
|
||||||
|
# Since progress messages are currently blocking (no async), we're being extra coarse
|
||||||
|
# with progress updates to ensure we're not waisting time sending updates.
|
||||||
|
# We could maybe go a little lower, but for now I'm not risking degrading performance
|
||||||
|
PROGRESS_INTERVAL_SECONDS = 10
|
||||||
|
|
||||||
|
|
||||||
def open_and_convert_file(
|
def open_and_convert_file(
|
||||||
file_path: str,
|
file_path: str,
|
||||||
project: Project,
|
project: Project,
|
||||||
version_message: str,
|
version_message: str | None,
|
||||||
model_ingestion_id: str,
|
model_ingestion_id: str,
|
||||||
client: SpeckleClient,
|
client: SpeckleClient,
|
||||||
) -> Version:
|
) -> Version:
|
||||||
@@ -33,7 +39,7 @@ def open_and_convert_file(
|
|||||||
path = Path(file_path)
|
path = Path(file_path)
|
||||||
|
|
||||||
specklepy_version = importlib.metadata.version("specklepy")
|
specklepy_version = importlib.metadata.version("specklepy")
|
||||||
client.model_ingestion.start_processing(
|
ingestion = client.model_ingestion.start_processing(
|
||||||
ModelIngestionStartProcessingInput(
|
ModelIngestionStartProcessingInput(
|
||||||
project_id=project.id,
|
project_id=project.id,
|
||||||
ingestion_id=model_ingestion_id,
|
ingestion_id=model_ingestion_id,
|
||||||
@@ -46,39 +52,38 @@ def open_and_convert_file(
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
progress = IngestionProgressManager(
|
||||||
|
client, ingestion, PROGRESS_INTERVAL_SECONDS
|
||||||
|
)
|
||||||
account = client.account
|
account = client.account
|
||||||
server_url = account.serverInfo.url
|
server_url = account.serverInfo.url
|
||||||
assert server_url
|
assert server_url
|
||||||
remote_transport = ServerTransport(project.id, account=account)
|
remote_transport = ServerTransport(project.id, account=account)
|
||||||
|
progress_transport = ProgressTransport(
|
||||||
|
progress,
|
||||||
|
)
|
||||||
|
|
||||||
|
progress.report("Opening file", None)
|
||||||
ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType]
|
ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType]
|
||||||
|
|
||||||
client.model_ingestion.update_progress(
|
import_job = ImportJob(ifc_file, progress) # pyright: ignore[reportUnknownArgumentType]
|
||||||
ModelIngestionUpdateInput(
|
|
||||||
project_id=project.id,
|
|
||||||
ingestion_id=model_ingestion_id,
|
|
||||||
progress_message="Converting file",
|
|
||||||
progress=None,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType]
|
|
||||||
data = import_job.convert()
|
data = import_job.convert()
|
||||||
|
|
||||||
print(f"File conversion complete after {(time.time() - start) * 1000}ms")
|
print(
|
||||||
|
f"File conversion complete after {(time.time() - start):.3f}s" # noqa: E501
|
||||||
|
)
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
|
||||||
client.model_ingestion.update_progress(
|
progress.report("Uploading objects", None)
|
||||||
ModelIngestionUpdateInput(
|
root_id = send(
|
||||||
project_id=project.id,
|
data,
|
||||||
ingestion_id=model_ingestion_id,
|
transports=[remote_transport, progress_transport],
|
||||||
progress_message="Uploading objects",
|
use_default_cache=False,
|
||||||
progress=None,
|
)
|
||||||
)
|
print(
|
||||||
|
f"Sending to speckle complete after: {(time.time() - start):.3f}s" # noqa: E501
|
||||||
)
|
)
|
||||||
root_id = send(data, transports=[remote_transport], use_default_cache=False)
|
|
||||||
print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms")
|
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
|
||||||
@@ -95,9 +100,9 @@ def open_and_convert_file(
|
|||||||
version = client.version.get(version_id, project.id)
|
version = client.version.get(version_id, project.id)
|
||||||
|
|
||||||
end = time.time()
|
end = time.time()
|
||||||
print(f"Version committed after: {(end - start) * 1000}ms")
|
print(f"Version committed after: {(end - start):.3f}s")
|
||||||
|
|
||||||
print(f"Total time (to commit): {(end - very_start) * 1000}ms")
|
print(f"Total time (to commit): {(end - very_start):.3f}s")
|
||||||
del ifc_file
|
del ifc_file
|
||||||
|
|
||||||
custom_properties = {"ui": "dui3", "actionSource": "import"}
|
custom_properties = {"ui": "dui3", "actionSource": "import"}
|
||||||
|
|||||||
@@ -0,0 +1,51 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
from speckleifc.main import open_and_convert_file
|
||||||
|
from specklepy.api.client import SpeckleClient
|
||||||
|
from specklepy.core.api.credentials import get_accounts_for_server
|
||||||
|
from specklepy.logging import metrics
|
||||||
|
|
||||||
|
|
||||||
|
def _manual_import() -> None:
|
||||||
|
from specklepy.core.api.inputs.model_ingestion_inputs import (
|
||||||
|
ModelIngestionCreateInput,
|
||||||
|
SourceDataInput,
|
||||||
|
)
|
||||||
|
|
||||||
|
PROJECT_ID = "412a3c3927"
|
||||||
|
MODEL_ID = "223e61212d"
|
||||||
|
SERVER_URL = "latest.speckle.systems"
|
||||||
|
FILE_PATH = r"C:\Test Files\ifc\AC20-FZK-Haus.ifc" # noqa: E501
|
||||||
|
|
||||||
|
metrics.set_host_app(
|
||||||
|
"ifc",
|
||||||
|
)
|
||||||
|
|
||||||
|
account = get_accounts_for_server(SERVER_URL)[0]
|
||||||
|
client = SpeckleClient(SERVER_URL, use_ssl=not SERVER_URL.startswith("http://"))
|
||||||
|
client.authenticate_with_account(account)
|
||||||
|
|
||||||
|
ingestion = client.model_ingestion.create(
|
||||||
|
ModelIngestionCreateInput(
|
||||||
|
model_id=MODEL_ID,
|
||||||
|
project_id=PROJECT_ID,
|
||||||
|
progress_message="",
|
||||||
|
source_data=SourceDataInput(
|
||||||
|
source_application_slug="speckleifc",
|
||||||
|
source_application_version="0.0.0",
|
||||||
|
file_name=None,
|
||||||
|
file_size_bytes=None,
|
||||||
|
),
|
||||||
|
max_idle_timeout_seconds=2700, # 45mins
|
||||||
|
)
|
||||||
|
)
|
||||||
|
project = client.project.get(PROJECT_ID)
|
||||||
|
|
||||||
|
open_and_convert_file(FILE_PATH, project, None, ingestion.id, client)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
|
_manual_import()
|
||||||
|
print(f"Total time (including cleanup): {(time.time() - start):.3f}s")
|
||||||
@@ -14,6 +14,7 @@ class ModelIngestionCreateInput(GraphQLBaseModel):
|
|||||||
project_id: str
|
project_id: str
|
||||||
progress_message: str
|
progress_message: str
|
||||||
source_data: SourceDataInput
|
source_data: SourceDataInput
|
||||||
|
max_idle_timeout_seconds: int | None = None
|
||||||
|
|
||||||
|
|
||||||
class ModelIngestionStartProcessingInput(GraphQLBaseModel):
|
class ModelIngestionStartProcessingInput(GraphQLBaseModel):
|
||||||
|
|||||||
@@ -254,12 +254,15 @@ class FileUploadUrl(GraphQLBaseModel):
|
|||||||
class ModelIngestionStatusData(GraphQLBaseModel):
|
class ModelIngestionStatusData(GraphQLBaseModel):
|
||||||
status: ModelIngestionStatus
|
status: ModelIngestionStatus
|
||||||
progress_message: str | None = None
|
progress_message: str | None = None
|
||||||
|
version_id: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class ModelIngestion(GraphQLBaseModel):
|
class ModelIngestion(GraphQLBaseModel):
|
||||||
id: str
|
id: str
|
||||||
created_at: datetime
|
created_at: datetime
|
||||||
updated_at: datetime
|
updated_at: datetime
|
||||||
cancellation_requested: bool
|
|
||||||
model_id: str
|
model_id: str
|
||||||
|
project_id: str
|
||||||
|
user_id: str
|
||||||
|
cancellation_requested: bool
|
||||||
status_data: ModelIngestionStatusData
|
status_data: ModelIngestionStatusData
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -58,6 +60,10 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
... on HasProgressMessage {
|
... on HasProgressMessage {
|
||||||
progressMessage
|
progressMessage
|
||||||
}
|
}
|
||||||
|
... on ModelIngestionSuccessStatus
|
||||||
|
{
|
||||||
|
versionId
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -87,6 +93,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -124,6 +132,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -159,6 +169,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -196,6 +208,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -277,6 +291,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -320,6 +336,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -370,6 +388,8 @@ class ModelIngestionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
|
|||||||
@@ -224,6 +224,8 @@ class SubscriptionResource(ResourceBase):
|
|||||||
createdAt
|
createdAt
|
||||||
updatedAt
|
updatedAt
|
||||||
modelId
|
modelId
|
||||||
|
projectId
|
||||||
|
userId
|
||||||
cancellationRequested
|
cancellationRequested
|
||||||
statusData {
|
statusData {
|
||||||
... on HasModelIngestionStatus {
|
... on HasModelIngestionStatus {
|
||||||
@@ -232,6 +234,10 @@ class SubscriptionResource(ResourceBase):
|
|||||||
... on HasProgressMessage {
|
... on HasProgressMessage {
|
||||||
progressMessage
|
progressMessage
|
||||||
}
|
}
|
||||||
|
... on ModelIngestionSuccessStatus
|
||||||
|
{
|
||||||
|
versionId
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
type
|
type
|
||||||
|
|||||||
@@ -0,0 +1,68 @@
|
|||||||
|
from time import monotonic
|
||||||
|
|
||||||
|
from specklepy.core.api.client import SpeckleClient
|
||||||
|
from specklepy.core.api.inputs.model_ingestion_inputs import ModelIngestionUpdateInput
|
||||||
|
from specklepy.core.api.models.current import ModelIngestion
|
||||||
|
|
||||||
|
|
||||||
|
class IngestionProgressManager:
|
||||||
|
"""
|
||||||
|
Provides a performant way to report ingestion progress.
|
||||||
|
|
||||||
|
Allows callers to throttle ingestion progress messages based on an update interval.
|
||||||
|
Throttling prevents callers from overwhelming the server with constant progress updates,
|
||||||
|
and minimises blocking high-speed operations with too frequent progress updates.
|
||||||
|
|
||||||
|
Callers can use this pattern for reporting throttled progress
|
||||||
|
```
|
||||||
|
if progress.should_report_progress():
|
||||||
|
progress.report(f"Converting geometries ({current:,}/{total:,})", current - total)
|
||||||
|
```
|
||||||
|
|
||||||
|
And for unthrottled progress (e.g. between phases)
|
||||||
|
```
|
||||||
|
progress.report(f"Next phases has started (0/{total:,})", 0)
|
||||||
|
|
||||||
|
This class is similar to the `IngestionProgressManager` in the .NET SDK
|
||||||
|
Unlike in .NET, we recommend using a very coarse `update_interval_seconds`; since we're not using async messages,
|
||||||
|
they are blocking and will degrade performance if used too frequently.
|
||||||
|
```
|
||||||
|
|
||||||
|
""" # noqa: E501
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
speckle_client: SpeckleClient,
|
||||||
|
ingestion: ModelIngestion,
|
||||||
|
update_interval_seconds: float,
|
||||||
|
):
|
||||||
|
self.speckle_client = speckle_client
|
||||||
|
self.ingestion = ingestion
|
||||||
|
self.update_interval = update_interval_seconds
|
||||||
|
|
||||||
|
self._last_updated_at = 0.0
|
||||||
|
|
||||||
|
def report(self, progress_message: str, progress: float | None) -> ModelIngestion:
|
||||||
|
"""
|
||||||
|
Reports a progress update
|
||||||
|
"""
|
||||||
|
self._last_updated_at = monotonic()
|
||||||
|
formatted_progress = f"{progress:.0%}" if progress else ""
|
||||||
|
print(f"Progress update: {progress_message} {formatted_progress}")
|
||||||
|
|
||||||
|
return self.speckle_client.model_ingestion.update_progress(
|
||||||
|
ModelIngestionUpdateInput(
|
||||||
|
ingestion_id=self.ingestion.id,
|
||||||
|
project_id=self.ingestion.project_id,
|
||||||
|
progress_message=progress_message,
|
||||||
|
progress=progress,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def should_report_progress(self) -> bool:
|
||||||
|
"""
|
||||||
|
Returns `true` if it's time for an update,
|
||||||
|
`false` if it's too soon since the last update
|
||||||
|
"""
|
||||||
|
elapsed = monotonic() - self._last_updated_at
|
||||||
|
return elapsed >= self.update_interval
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
from typing import Dict, List
|
||||||
|
|
||||||
|
from specklepy.progress.ingestion_progress import IngestionProgressManager
|
||||||
|
from specklepy.transports.abstract_transport import AbstractTransport
|
||||||
|
|
||||||
|
|
||||||
|
class ProgressTransport(AbstractTransport):
|
||||||
|
"""
|
||||||
|
This transport does not persist objects anywhere,
|
||||||
|
instead it simply reacts to save_object being called,
|
||||||
|
and reports throttled progress.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
progress: IngestionProgressManager,
|
||||||
|
name="Progress",
|
||||||
|
progress_message_template="Uploading objects {:,}",
|
||||||
|
) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self._name = name
|
||||||
|
self._progress = progress
|
||||||
|
self._progress_message_template = progress_message_template
|
||||||
|
self.saved_object_count = 0
|
||||||
|
|
||||||
|
def _throttle_progress(self) -> None:
|
||||||
|
if self._progress.should_report_progress():
|
||||||
|
self._progress.report(
|
||||||
|
self._progress_message_template.format(self.saved_object_count), None
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return f"ProgressTransport(objects: {self.saved_object_count})"
|
||||||
|
|
||||||
|
def save_object(self, id: str, serialized_object: str) -> None:
|
||||||
|
self.saved_object_count += 1
|
||||||
|
self._throttle_progress()
|
||||||
|
|
||||||
|
def save_object_from_transport(
|
||||||
|
self, id: str, source_transport: AbstractTransport
|
||||||
|
) -> None:
|
||||||
|
self.saved_object_count += 1
|
||||||
|
self._throttle_progress()
|
||||||
|
|
||||||
|
def get_object(self, id: str) -> str | None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def has_objects(self, id_list: List[str]) -> Dict[str, bool]:
|
||||||
|
return {id: False for id in id_list}
|
||||||
|
|
||||||
|
def begin_write(self) -> None:
|
||||||
|
self.saved_object_count = 0
|
||||||
|
|
||||||
|
def end_write(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def copy_object_and_children(
|
||||||
|
self, id: str, target_transport: AbstractTransport
|
||||||
|
) -> str:
|
||||||
|
raise NotImplementedError
|
||||||
@@ -35,7 +35,7 @@ from tests.integration.conftest import is_public
|
|||||||
@pytest.mark.skipif(is_public(), reason="The public API does not support these tests")
|
@pytest.mark.skipif(is_public(), reason="The public API does not support these tests")
|
||||||
class TestIngestionResource:
|
class TestIngestionResource:
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def project(self, client: SpeckleClient):
|
def project(self, client: SpeckleClient) -> Project:
|
||||||
return client.project.create(
|
return client.project.create(
|
||||||
ProjectCreateInput(
|
ProjectCreateInput(
|
||||||
name="test", description=None, visibility=ProjectVisibility.PUBLIC
|
name="test", description=None, visibility=ProjectVisibility.PUBLIC
|
||||||
@@ -43,12 +43,12 @@ class TestIngestionResource:
|
|||||||
)
|
)
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def model(self, client: SpeckleClient, project: Project):
|
def model(self, client: SpeckleClient, project: Project) -> Model:
|
||||||
return client.model.create(
|
return client.model.create(
|
||||||
CreateModelInput(name="test", description=None, project_id=project.id)
|
CreateModelInput(name="test", description=None, project_id=project.id)
|
||||||
)
|
)
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture
|
||||||
def ingestion(
|
def ingestion(
|
||||||
self, client: SpeckleClient, model: Model, project: Project
|
self, client: SpeckleClient, model: Model, project: Project
|
||||||
) -> ModelIngestion:
|
) -> ModelIngestion:
|
||||||
@@ -71,11 +71,16 @@ class TestIngestionResource:
|
|||||||
assert isinstance(ingestion.updated_at, datetime)
|
assert isinstance(ingestion.updated_at, datetime)
|
||||||
assert isinstance(ingestion.cancellation_requested, bool)
|
assert isinstance(ingestion.cancellation_requested, bool)
|
||||||
assert isinstance(ingestion.model_id, str)
|
assert isinstance(ingestion.model_id, str)
|
||||||
|
assert isinstance(ingestion.project_id, str)
|
||||||
|
assert isinstance(ingestion.user_id, str)
|
||||||
assert isinstance(ingestion.status_data, ModelIngestionStatusData)
|
assert isinstance(ingestion.status_data, ModelIngestionStatusData)
|
||||||
assert isinstance(ingestion.status_data.progress_message, str | None)
|
assert isinstance(ingestion.status_data.progress_message, str | None)
|
||||||
|
assert ingestion.status_data.version_id is None
|
||||||
assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING
|
assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING
|
||||||
assert not ingestion.cancellation_requested
|
assert not ingestion.cancellation_requested
|
||||||
assert ingestion.model_id == model.id
|
assert ingestion.model_id == model.id
|
||||||
|
assert ingestion.project_id == project.id
|
||||||
|
assert ingestion.user_id == client.account.userInfo.id
|
||||||
|
|
||||||
return ingestion
|
return ingestion
|
||||||
|
|
||||||
@@ -86,6 +91,7 @@ class TestIngestionResource:
|
|||||||
project.id, ingestion.id
|
project.id, ingestion.id
|
||||||
)
|
)
|
||||||
assert queried_ingestion.id == ingestion.id
|
assert queried_ingestion.id == ingestion.id
|
||||||
|
|
||||||
assert queried_ingestion.status_data.status == ingestion.status_data.status
|
assert queried_ingestion.status_data.status == ingestion.status_data.status
|
||||||
|
|
||||||
def test_update_progress(
|
def test_update_progress(
|
||||||
@@ -210,12 +216,13 @@ class TestIngestionResource:
|
|||||||
version_message=None,
|
version_message=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
res = client.model_ingestion.complete(input)
|
version_id = client.model_ingestion.complete(input)
|
||||||
|
res = client.model_ingestion.get_ingestion(project.id, ingestion.id)
|
||||||
|
|
||||||
assert isinstance(res, str)
|
assert isinstance(version_id, str)
|
||||||
version = client.version.get(res, project.id)
|
version = client.version.get(version_id, project.id)
|
||||||
assert isinstance(version, Version)
|
assert isinstance(version, Version)
|
||||||
|
assert res.status_data.version_id == version_id
|
||||||
# trying to complete for a second time should throw
|
# trying to complete for a second time should throw
|
||||||
# with pytest.raises(GraphQLException):
|
# with pytest.raises(GraphQLException):
|
||||||
# _ = client.ingestion.complete(input)
|
# _ = client.ingestion.complete(input)
|
||||||
@@ -234,12 +241,6 @@ class TestIngestionResource:
|
|||||||
assert not res.cancellation_requested
|
assert not res.cancellation_requested
|
||||||
assert res.status_data.status == ModelIngestionStatus.CANCELLED
|
assert res.status_data.status == ModelIngestionStatus.CANCELLED
|
||||||
|
|
||||||
# Cancel again, should be idempotent
|
|
||||||
res = client.model_ingestion.fail_with_cancel(input)
|
|
||||||
assert res.status_data.progress_message is None
|
|
||||||
assert not res.cancellation_requested
|
|
||||||
assert res.status_data.status == ModelIngestionStatus.CANCELLED
|
|
||||||
|
|
||||||
def test_error_non_existent_ingestion(
|
def test_error_non_existent_ingestion(
|
||||||
self, client: SpeckleClient, project: Project
|
self, client: SpeckleClient, project: Project
|
||||||
):
|
):
|
||||||
@@ -264,6 +265,7 @@ class TestIngestionResource:
|
|||||||
with pytest.raises(GraphQLException):
|
with pytest.raises(GraphQLException):
|
||||||
_ = client.model_ingestion.fail_with_error(input)
|
_ = client.model_ingestion.fail_with_error(input)
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="TEST FAILS - server behaviour was changed")
|
||||||
def test_complete_non_existent_root_object(
|
def test_complete_non_existent_root_object(
|
||||||
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
|
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
|
||||||
):
|
):
|
||||||
|
|||||||
@@ -0,0 +1,105 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from specklepy.core.api.client import SpeckleClient
|
||||||
|
from specklepy.core.api.enums import ProjectVisibility
|
||||||
|
from specklepy.core.api.inputs.model_ingestion_inputs import (
|
||||||
|
ModelIngestionCreateInput,
|
||||||
|
SourceDataInput,
|
||||||
|
)
|
||||||
|
from specklepy.core.api.inputs.model_inputs import CreateModelInput
|
||||||
|
from specklepy.core.api.inputs.project_inputs import ProjectCreateInput
|
||||||
|
from specklepy.core.api.models.current import Model, ModelIngestion, Project
|
||||||
|
from specklepy.progress.ingestion_progress import IngestionProgressManager
|
||||||
|
from tests.integration.conftest import is_public
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.run()
|
||||||
|
@pytest.mark.skipif(
|
||||||
|
is_public(), reason="The public API does not support model ingestion api"
|
||||||
|
)
|
||||||
|
class TestIngestionProgressManager:
|
||||||
|
@pytest.fixture
|
||||||
|
def project(self, client: SpeckleClient) -> Project:
|
||||||
|
return client.project.create(
|
||||||
|
ProjectCreateInput(
|
||||||
|
name="test", description=None, visibility=ProjectVisibility.PUBLIC
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def model(self, client: SpeckleClient, project: Project) -> Model:
|
||||||
|
return client.model.create(
|
||||||
|
CreateModelInput(name="test", description=None, project_id=project.id)
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def model_ingestion(
|
||||||
|
self, client: SpeckleClient, model: Model, project: Project
|
||||||
|
) -> ModelIngestion:
|
||||||
|
input = ModelIngestionCreateInput(
|
||||||
|
model_id=model.id,
|
||||||
|
project_id=project.id,
|
||||||
|
progress_message="Starting processing",
|
||||||
|
source_data=SourceDataInput(
|
||||||
|
source_application_slug="pytest",
|
||||||
|
source_application_version="0.0.0",
|
||||||
|
file_name=None,
|
||||||
|
file_size_bytes=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
return client.model_ingestion.create(input)
|
||||||
|
|
||||||
|
def test_progress_respects_throttle(
|
||||||
|
self,
|
||||||
|
client: SpeckleClient,
|
||||||
|
model_ingestion: ModelIngestion,
|
||||||
|
) -> None:
|
||||||
|
EXPECTED_MESSAGE = "This is a test ingestion message"
|
||||||
|
EXPECTED_PROGRESS = 0.123123
|
||||||
|
UPDATE_INTERVAL_SECONDS = 1
|
||||||
|
|
||||||
|
sut = IngestionProgressManager(
|
||||||
|
speckle_client=client,
|
||||||
|
ingestion=model_ingestion,
|
||||||
|
update_interval_seconds=UPDATE_INTERVAL_SECONDS,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert sut.should_report_progress() is True
|
||||||
|
|
||||||
|
res = sut.report(EXPECTED_MESSAGE, EXPECTED_PROGRESS)
|
||||||
|
|
||||||
|
assert sut.should_report_progress() is False
|
||||||
|
|
||||||
|
time.sleep(UPDATE_INTERVAL_SECONDS + 0.5)
|
||||||
|
|
||||||
|
assert sut.should_report_progress() is True
|
||||||
|
assert sut.should_report_progress() is True
|
||||||
|
|
||||||
|
assert res.status_data.progress_message == EXPECTED_MESSAGE
|
||||||
|
|
||||||
|
def test_progress_no_throttle(
|
||||||
|
self,
|
||||||
|
client: SpeckleClient,
|
||||||
|
model_ingestion: ModelIngestion,
|
||||||
|
) -> None:
|
||||||
|
EXPECTED_MESSAGE = "This is a test ingestion message"
|
||||||
|
EXPECTED_PROGRESS = 0.123123
|
||||||
|
UPDATE_INTERVAL_SECONDS = 0
|
||||||
|
|
||||||
|
sut = IngestionProgressManager(
|
||||||
|
speckle_client=client,
|
||||||
|
ingestion=model_ingestion,
|
||||||
|
update_interval_seconds=UPDATE_INTERVAL_SECONDS,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert sut.should_report_progress() is True
|
||||||
|
|
||||||
|
res = sut.report(EXPECTED_MESSAGE, EXPECTED_PROGRESS)
|
||||||
|
|
||||||
|
assert sut.should_report_progress() is True
|
||||||
|
assert sut.should_report_progress() is True
|
||||||
|
|
||||||
|
assert res.status_data.progress_message == EXPECTED_MESSAGE
|
||||||
Reference in New Issue
Block a user