From 8249cd2184255970486612fb2e1fe6546fb672a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= <57442769+gjedlicska@users.noreply.github.com> Date: Mon, 8 Dec 2025 13:25:54 +0100 Subject: [PATCH] Jedd/cxpla 340 specklepy (#475) * First pass * add tests * Add cancellation * fix * status changes * fixes * test fixes * tests(subscriptions): fix model ingestion tests * feat(modelingestion): rename resource and add some more tests * feat(ifcimport): use new modelingestion api * feat: wrap up new ingestion * fix: model ingestion payload and test server url * fix: test port was 3000 * fix: remove version message from model ingestion success input * fix: test subs cancelled * ci: signal public of private envv in ci --------- Co-authored-by: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com> --- .github/workflows/pr.yml | 2 + codebook.toml | 1 + mise.toml | 9 +- pyproject.toml | 1 + src/speckleifc/__main__.py | 9 +- src/speckleifc/main.py | 126 ++++-- src/specklepy/api/client.py | 7 + src/specklepy/api/resources/__init__.py | 5 + .../resources/current/file_import_resource.py | 2 +- .../current/model_ingestion_resource.py | 53 +++ src/specklepy/core/api/client.py | 7 + src/specklepy/core/api/enums.py | 15 + .../core/api/inputs/model_ingestion_inputs.py | 88 ++++ src/specklepy/core/api/models/current.py | 16 +- .../core/api/models/subscription_messages.py | 8 +- src/specklepy/core/api/resources/__init__.py | 5 + .../resources/current/file_import_resource.py | 6 +- .../current/model_ingestion_resource.py | 398 ++++++++++++++++++ .../current/subscription_resource.py | 70 ++- .../api/resources/current/version_resource.py | 2 +- .../resources/current/workspace_resource.py | 2 +- .../current/test_model_ingestion_resource.py | 268 ++++++++++++ .../current/test_subscription_resource.py | 283 +++++++++++-- tests/integration/conftest.py | 5 + uv.lock | 2 +- 25 files changed, 1305 insertions(+), 85 deletions(-) create mode 100644 codebook.toml create mode 100644 src/specklepy/api/resources/current/model_ingestion_resource.py create mode 100644 src/specklepy/core/api/inputs/model_ingestion_inputs.py create mode 100644 src/specklepy/core/api/resources/current/model_ingestion_resource.py create mode 100644 tests/integration/client/current/test_model_ingestion_resource.py diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index d8b205e..56367f0 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -59,6 +59,8 @@ jobs: test-public: # Run integration tests against the public server image name: Test (public) runs-on: ubuntu-latest + env: + IS_PUBLIC: "true" strategy: matrix: python-version: diff --git a/codebook.toml b/codebook.toml new file mode 100644 index 0000000..c81261a --- /dev/null +++ b/codebook.toml @@ -0,0 +1 @@ +words = ["specklepy"] diff --git a/mise.toml b/mise.toml index 42104f0..7c49dbe 100644 --- a/mise.toml +++ b/mise.toml @@ -8,7 +8,7 @@ python.uv_venv_auto = true [tasks.install] -run= "uv sync --all-groups" +run= "uv sync --all-extras --all-groups" [tasks.install_docs] @@ -18,3 +18,10 @@ run= "uv sync --group docs" description = "Build static docs " run = "uv run mkdocs build" depends = ['install_docs'] + +[tasks.test] +run = "uv run pytest" + + +[env] +IS_PUBLIC = "false" diff --git a/pyproject.toml b/pyproject.toml index 1de50f7..1f93992 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ speckleifc = ["ifcopenshell>=0.8.3.post2"] [dependency-groups] + dev = [ "commitizen>=4.1.0", "devtools>=0.12.2", diff --git a/src/speckleifc/__main__.py b/src/speckleifc/__main__.py index ea3e4bc..d0e66a1 100644 --- a/src/speckleifc/__main__.py +++ b/src/speckleifc/__main__.py @@ -18,7 +18,7 @@ def cmd_line_import() -> None: parser.add_argument("output_path") parser.add_argument("project_id") parser.add_argument("version_message") - parser.add_argument("model_id") + parser.add_argument("model_ingestion_id") # parser.add_argument("model_name") # parser.add_argument("region_name") @@ -32,6 +32,8 @@ def cmd_line_import() -> None: "ifc", ) + client: SpeckleClient | None = None + try: client = SpeckleClient(SERVER_URL, use_ssl=not SERVER_URL.startswith("http://")) client.authenticate_with_token(TOKEN) @@ -41,13 +43,14 @@ def cmd_line_import() -> None: args.file_path, project, args.version_message, - args.model_id, + args.model_ingestion_id, client, ) with open(args.output_path, "w") as f: json.dump({"success": True, "commitId": version.id}, f) except Exception as e: - error_msg = f"IFC Importer failed with exception:\n{traceback.format_exc()}" + stack_trace = traceback.format_exc() + error_msg = f"IFC Importer failed with exception:\n{stack_trace}" print(error_msg) # Write error result diff --git a/src/speckleifc/main.py b/src/speckleifc/main.py index e2e9003..e3350d3 100644 --- a/src/speckleifc/main.py +++ b/src/speckleifc/main.py @@ -1,9 +1,19 @@ +import contextlib +import importlib.metadata import time +import traceback +from pathlib import Path from speckleifc.ifc_geometry_processing import open_ifc from speckleifc.importer import ImportJob from specklepy.core.api.client import SpeckleClient -from specklepy.core.api.inputs.version_inputs import CreateVersionInput +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionFailedInput, + ModelIngestionStartProcessingInput, + ModelIngestionSuccessInput, + ModelIngestionUpdateInput, + SourceDataInput, +) from specklepy.core.api.models.current import Project, Version from specklepy.core.api.operations import send from specklepy.logging import metrics @@ -14,48 +24,98 @@ def open_and_convert_file( file_path: str, project: Project, version_message: str | None, - model_id: str, + model_ingestion_id: str, client: SpeckleClient, ) -> Version: - start = time.time() - very_start = start + try: + start = time.time() + very_start = start + path = Path(file_path) - account = client.account - server_url = account.serverInfo.url - assert server_url - remote_transport = ServerTransport(project.id, account=account) + specklepy_version = importlib.metadata.version("specklepy") + client.model_ingestion.start_processing( + ModelIngestionStartProcessingInput( + project_id=project.id, + ingestion_id=model_ingestion_id, + progress_message="Importing IFC file", + source_data=SourceDataInput( + file_name=path.name, + file_size_bytes=path.stat().st_size, + source_application_slug="fileimports-ifc", + source_application_version=specklepy_version, + ), + ) + ) - ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType] + account = client.account + server_url = account.serverInfo.url + assert server_url + remote_transport = ServerTransport(project.id, account=account) - import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType] - data = import_job.convert() + ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType] - print(f"File conversion complete after {(time.time() - start) * 1000}ms") + client.model_ingestion.update_progress( + ModelIngestionUpdateInput( + project_id=project.id, + ingestion_id=model_ingestion_id, + progress_message="File validated, converting", + progress=None, + ) + ) + import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType] + data = import_job.convert() - start = time.time() + print(f"File conversion complete after {(time.time() - start) * 1000}ms") - root_id = send(data, transports=[remote_transport], use_default_cache=False) - print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms") + start = time.time() - start = time.time() + client.model_ingestion.update_progress( + ModelIngestionUpdateInput( + project_id=project.id, + ingestion_id=model_ingestion_id, + progress_message="Conversion complete, sending", + progress=None, + ) + ) + root_id = send(data, transports=[remote_transport], use_default_cache=False) + print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms") - create_version = CreateVersionInput( - object_id=root_id, - model_id=model_id, - project_id=project.id, - message=version_message, - source_application="ifc", - ) - version = client.version.create(create_version) - end = time.time() - print(f"Version committed after: {(end - start) * 1000}ms") + start = time.time() - print(f"Total time (to commit): {(end - very_start) * 1000}ms") - del ifc_file + version_id = client.model_ingestion.complete( + ModelIngestionSuccessInput( + project_id=project.id, + ingestion_id=model_ingestion_id, + root_object_id=root_id, + # version_message=version_message, + ) + ) - custom_properties = {"ui": "dui3", "actionSource": "import"} - if project.workspace_id: - custom_properties["workspace_id"] = project.workspace_id - metrics.track(metrics.SEND, account, custom_properties, send_sync=True) + # needed to query version until ingestion api expands to serve it + version = client.version.get(version_id, project.id) - return version + end = time.time() + print(f"Version committed after: {(end - start) * 1000}ms") + + print(f"Total time (to commit): {(end - very_start) * 1000}ms") + del ifc_file + + custom_properties = {"ui": "dui3", "actionSource": "import"} + if project.workspace_id: + custom_properties["workspace_id"] = project.workspace_id + metrics.track(metrics.SEND, account, custom_properties, send_sync=True) + + return version + except Exception as e: + stack_trace = traceback.format_exc() + with contextlib.suppress(Exception): + # make sure to not report process kills when we're cancelling + client.model_ingestion.fail_with_error( + ModelIngestionFailedInput( + project_id=project.id, + ingestion_id=model_ingestion_id, + error_reason=str(e), + error_stacktrace=stack_trace, + ) + ) + raise e diff --git a/src/specklepy/api/client.py b/src/specklepy/api/client.py index 975400e..71593e2 100644 --- a/src/specklepy/api/client.py +++ b/src/specklepy/api/client.py @@ -4,6 +4,7 @@ from specklepy.api.credentials import Account from specklepy.api.resources import ( ActiveUserResource, FileImportResource, + ModelIngestionResource, ModelResource, OtherUserResource, ProjectInviteResource, @@ -119,6 +120,12 @@ class SpeckleClient(CoreSpeckleClient): client=self.httpclient, server_version=server_version, ) + self.model_ingestion = ModelIngestionResource( + account=self.account, + basepath=self.url, + client=self.httpclient, + server_version=server_version, + ) self.file_import = FileImportResource( account=self.account, basepath=self.url, diff --git a/src/specklepy/api/resources/__init__.py b/src/specklepy/api/resources/__init__.py index de92988..7680d83 100644 --- a/src/specklepy/api/resources/__init__.py +++ b/src/specklepy/api/resources/__init__.py @@ -1,5 +1,8 @@ from specklepy.api.resources.current.active_user_resource import ActiveUserResource from specklepy.api.resources.current.file_import_resource import FileImportResource +from specklepy.api.resources.current.model_ingestion_resource import ( + ModelIngestionResource, +) from specklepy.api.resources.current.model_resource import ModelResource from specklepy.api.resources.current.other_user_resource import OtherUserResource from specklepy.api.resources.current.project_invite_resource import ( @@ -22,4 +25,6 @@ __all__ = [ "SubscriptionResource", "VersionResource", "WorkspaceResource", + "FileImportResource", + "ModelIngestionResource", ] diff --git a/src/specklepy/api/resources/current/file_import_resource.py b/src/specklepy/api/resources/current/file_import_resource.py index f2e2735..514d50c 100644 --- a/src/specklepy/api/resources/current/file_import_resource.py +++ b/src/specklepy/api/resources/current/file_import_resource.py @@ -15,7 +15,7 @@ from specklepy.logging import metrics class FileImportResource(CoreResource): - """API Access class for projects""" + """API Access class for file imports""" def __init__(self, account, basepath, client, server_version) -> None: super().__init__( diff --git a/src/specklepy/api/resources/current/model_ingestion_resource.py b/src/specklepy/api/resources/current/model_ingestion_resource.py new file mode 100644 index 0000000..22ac999 --- /dev/null +++ b/src/specklepy/api/resources/current/model_ingestion_resource.py @@ -0,0 +1,53 @@ +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionCancelledInput, + ModelIngestionCreateInput, + ModelIngestionFailedInput, + ModelIngestionRequeueInput, + ModelIngestionStartProcessingInput, + ModelIngestionSuccessInput, + ModelIngestionUpdateInput, +) +from specklepy.core.api.models.current import ( + ModelIngestion, +) +from specklepy.core.api.resources import ( + ModelIngestionResource as CoreResource, +) +from specklepy.logging import metrics + + +class ModelIngestionResource(CoreResource): + """API Access class for model ingestion""" + + def __init__(self, account, basepath, client, server_version) -> None: + super().__init__(account, basepath, client, server_version) + + def create(self, input: ModelIngestionCreateInput) -> ModelIngestion: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion Create"}) + return super().create(input) + + def update_progress(self, input: ModelIngestionUpdateInput) -> ModelIngestion: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion Update"}) + return super().update_progress(input) + + def start_processing( + self, input: ModelIngestionStartProcessingInput + ) -> ModelIngestion: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion Start Processing"}) + return super().start_processing(input) + + def requeue(self, input: ModelIngestionRequeueInput) -> ModelIngestion: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion Update"}) + return super().requeue(input) + + def complete_successfully(self, input: ModelIngestionSuccessInput) -> str: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion End"}) + return super().complete(input) + + def complete_failed(self, input: ModelIngestionFailedInput) -> ModelIngestion: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion Error"}) + return super().fail_with_error(input) + + def fail_with_cancel(self, input: ModelIngestionCancelledInput) -> ModelIngestion: + metrics.track(metrics.SDK, self.account, {"name": "Ingestion Cancel"}) + return super().fail_with_cancel(input) diff --git a/src/specklepy/core/api/client.py b/src/specklepy/core/api/client.py index 9585c6d..dccee60 100644 --- a/src/specklepy/core/api/client.py +++ b/src/specklepy/core/api/client.py @@ -12,6 +12,7 @@ from specklepy.core.api.credentials import Account from specklepy.core.api.resources import ( ActiveUserResource, FileImportResource, + ModelIngestionResource, ModelResource, OtherUserResource, ProjectInviteResource, @@ -250,6 +251,12 @@ class SpeckleClient: client=self.httpclient, server_version=server_version, ) + self.model_ingestion = ModelIngestionResource( + account=self.account, + basepath=self.url, + client=self.httpclient, + server_version=server_version, + ) self.subscription = SubscriptionResource( account=self.account, basepath=self.ws_url, diff --git a/src/specklepy/core/api/enums.py b/src/specklepy/core/api/enums.py index 0514388..702b365 100644 --- a/src/specklepy/core/api/enums.py +++ b/src/specklepy/core/api/enums.py @@ -30,3 +30,18 @@ class ProjectVersionsUpdatedMessageType(str, Enum): CREATED = "CREATED" DELETED = "DELETED" UPDATED = "UPDATED" + + +class ProjectModelIngestionUpdatedMessageType(str, Enum): + CANCELLATION_REQUESTED = "cancellationRequested" + CREATED = "created" + DELETED = "deleted" + UPDATED = "updated" + + +class ModelIngestionStatus(str, Enum): + CANCELLED = "cancelled" + FAILED = "failed" + PROCESSING = "processing" + QUEUED = "queued" + SUCCESS = "success" diff --git a/src/specklepy/core/api/inputs/model_ingestion_inputs.py b/src/specklepy/core/api/inputs/model_ingestion_inputs.py new file mode 100644 index 0000000..6cd49b6 --- /dev/null +++ b/src/specklepy/core/api/inputs/model_ingestion_inputs.py @@ -0,0 +1,88 @@ +from specklepy.core.api.enums import ProjectModelIngestionUpdatedMessageType +from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel + + +class SourceDataInput(GraphQLBaseModel): + source_application_slug: str + source_application_version: str + file_name: str | None + file_size_bytes: int | None + + +class ModelIngestionCreateInput(GraphQLBaseModel): + model_id: str + project_id: str + progress_message: str + source_data: SourceDataInput + + +class ModelIngestionStartProcessingInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + progress_message: str + source_data: SourceDataInput + + +class ModelIngestionRequeueInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + progress_message: str + + +class ModelIngestionUpdateInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + progress: float | None + progress_message: str + + +class ModelIngestionSuccessInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + root_object_id: str + + +class ModelIngestionFailedInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + error_reason: str + error_stacktrace: str | None + + @staticmethod + def from_exception( + ingestion_id: str, project_id: str, exception: Exception, message: str | None + ) -> "ModelIngestionFailedInput": + """test""" + return ModelIngestionFailedInput( + ingestion_id=ingestion_id, + project_id=project_id, + error_reason=message if message else str(exception), + error_stacktrace=str(exception), + ) + + +class ModelIngestionCancelledInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + cancellation_message: str + + +class ModelIngestionRequestCancellationInput(GraphQLBaseModel): + ingestion_id: str + project_id: str + cancellation_message: str + + +class ModelIngestionReference(GraphQLBaseModel): + """ + `@oneOf` i.e. server expects **either** `ingestion_id` or `model_id`, but not both. + """ + + ingestion_id: str | None + model_id: str | None + + +class ProjectModelIngestionSubscriptionInput(GraphQLBaseModel): + project_id: str + ingestion_reference: ModelIngestionReference + message_type: ProjectModelIngestionUpdatedMessageType | None = None diff --git a/src/specklepy/core/api/models/current.py b/src/specklepy/core/api/models/current.py index a8fe64a..276dcb8 100644 --- a/src/specklepy/core/api/models/current.py +++ b/src/specklepy/core/api/models/current.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Generic, List, TypeVar -from specklepy.core.api.enums import ProjectVisibility +from specklepy.core.api.enums import ModelIngestionStatus, ProjectVisibility from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel from specklepy.logging.exceptions import WorkspacePermissionException @@ -244,3 +244,17 @@ class FileImport(GraphQLBaseModel): class FileUploadUrl(GraphQLBaseModel): url: str file_id: str + + +class ModelIngestionStatusData(GraphQLBaseModel): + status: ModelIngestionStatus + progress_message: str | None = None + + +class ModelIngestion(GraphQLBaseModel): + id: str + created_at: datetime + updated_at: datetime + cancellation_requested: bool + model_id: str + status_data: ModelIngestionStatusData diff --git a/src/specklepy/core/api/models/subscription_messages.py b/src/specklepy/core/api/models/subscription_messages.py index 4ef2989..fa8754d 100644 --- a/src/specklepy/core/api/models/subscription_messages.py +++ b/src/specklepy/core/api/models/subscription_messages.py @@ -1,12 +1,13 @@ from typing import Optional from specklepy.core.api.enums import ( + ProjectModelIngestionUpdatedMessageType, ProjectModelsUpdatedMessageType, ProjectUpdatedMessageType, ProjectVersionsUpdatedMessageType, UserProjectsUpdatedMessageType, ) -from specklepy.core.api.models.current import Model, Project, Version +from specklepy.core.api.models.current import Model, ModelIngestion, Project, Version from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel @@ -33,3 +34,8 @@ class ProjectVersionsUpdatedMessage(GraphQLBaseModel): type: ProjectVersionsUpdatedMessageType model_id: str version: Optional[Version] + + +class ProjectModelIngestionUpdatedMessage(GraphQLBaseModel): + model_ingestion: ModelIngestion + type: ProjectModelIngestionUpdatedMessageType diff --git a/src/specklepy/core/api/resources/__init__.py b/src/specklepy/core/api/resources/__init__.py index b802cd5..7a9d9e2 100644 --- a/src/specklepy/core/api/resources/__init__.py +++ b/src/specklepy/core/api/resources/__init__.py @@ -1,5 +1,8 @@ from specklepy.core.api.resources.current.active_user_resource import ActiveUserResource from specklepy.core.api.resources.current.file_import_resource import FileImportResource +from specklepy.core.api.resources.current.model_ingestion_resource import ( + ModelIngestionResource, +) from specklepy.core.api.resources.current.model_resource import ModelResource from specklepy.core.api.resources.current.other_user_resource import OtherUserResource from specklepy.core.api.resources.current.project_invite_resource import ( @@ -24,4 +27,6 @@ __all__ = [ "SubscriptionResource", "VersionResource", "WorkspaceResource", + "FileImportResource", + "ModelIngestionResource", ] diff --git a/src/specklepy/core/api/resources/current/file_import_resource.py b/src/specklepy/core/api/resources/current/file_import_resource.py index 3215ec6..eed025a 100644 --- a/src/specklepy/core/api/resources/current/file_import_resource.py +++ b/src/specklepy/core/api/resources/current/file_import_resource.py @@ -16,13 +16,15 @@ from specklepy.core.api.resource import ResourceBase from specklepy.core.api.responses import DataResponse from specklepy.logging.exceptions import SpeckleException +NAME = "file_import" + class UploadFileResponse(GraphQLBaseModel): etag: str class FileImportResource(ResourceBase): - """API Access class for project invites""" + """API Access class for file imports""" def __init__( self, @@ -36,7 +38,7 @@ class FileImportResource(ResourceBase): basepath=basepath, client=client, server_version=server_version, - name="file-import", + name=NAME, ) def finish_file_import_job(self, input: FinishFileImportInput) -> bool: diff --git a/src/specklepy/core/api/resources/current/model_ingestion_resource.py b/src/specklepy/core/api/resources/current/model_ingestion_resource.py new file mode 100644 index 0000000..6e6494d --- /dev/null +++ b/src/specklepy/core/api/resources/current/model_ingestion_resource.py @@ -0,0 +1,398 @@ +from typing import Any, Optional, Tuple + +from gql import Client, gql + +from specklepy.api.credentials import Account +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionCancelledInput, + ModelIngestionCreateInput, + ModelIngestionFailedInput, + ModelIngestionRequestCancellationInput, + ModelIngestionRequeueInput, + ModelIngestionStartProcessingInput, + ModelIngestionSuccessInput, + ModelIngestionUpdateInput, +) +from specklepy.core.api.models.current import ( + ModelIngestion, +) +from specklepy.core.api.resource import ResourceBase +from specklepy.core.api.responses import DataResponse + +NAME = "ingestion" + + +class ModelIngestionResource(ResourceBase): + """API Access class for model ingestion""" + + def __init__( + self, + account: Account, + basepath: str, + client: Client, + server_version: Optional[Tuple[Any, ...]], + ) -> None: + super().__init__( + account=account, + basepath=basepath, + client=client, + name=NAME, + server_version=server_version, + ) + + def get_ingestion(self, project_id: str, model_id: str) -> ModelIngestion: + QUERY = gql( + """ + query Query($projectId: String!, $modelId: String!) { + data:project(id: $projectId) { + data:model(id: $modelId) { + data:ingestion { + id + createdAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ # noqa: E501 + ) + + variables = { + "projectId": project_id, + "modelId": model_id, + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], + QUERY, + variables, + ).data.data.data + + def create(self, input: ModelIngestionCreateInput) -> ModelIngestion: + QUERY = gql( + """ + mutation IngestionCreate($input: ModelIngestionCreateInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: create(input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables + ).data.data.data + + def start_processing( + self, input: ModelIngestionStartProcessingInput + ) -> ModelIngestion: + QUERY = gql( + """ + mutation IngestionStartProcessing($input: ModelIngestionStartProcessingInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: startProcessing(input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ # noqa: E501 + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables + ).data.data.data + + def requeue(self, input: ModelIngestionRequeueInput) -> ModelIngestion: + QUERY = gql( + """ + mutation IngestionRequeue($input: ModelIngestionRequeueInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: requeue(input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ # noqa: E501 + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables + ).data.data.data + + def update_progress(self, input: ModelIngestionUpdateInput) -> ModelIngestion: + QUERY = gql( + """ + mutation IngestionUpdateProgress( + $input: ModelIngestionUpdateInput! + ) { + data: projectMutations { + data: modelIngestionMutations { + data: updateProgress(input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables + ).data.data.data + + def complete(self, input: ModelIngestionSuccessInput) -> str: + """ + Request that the server completes the ingestion by creating a version + If successful, the job will be in a terminal "successful" state. + + For failed Ingestions, use `fail_with_error` instead + For user cancellation, use `fail_with_cancelled` instead + + Arguments: + input {ModelIngestionSuccessInput} -- input variable + + Returns: + str -- the id of the version that was just created to complete the ingestion + """ + QUERY = gql( + """ + mutation IngestionComplete($input: ModelIngestionSuccessInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: completeWithVersion(input: $input) { + data:statusData { + ... on ModelIngestionSuccessStatus { + data:versionId + } + } + } + } + } + } + """ + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[DataResponse[DataResponse[str]]]]], + QUERY, + variables, + ).data.data.data.data.data + + def fail_with_error(self, input: ModelIngestionFailedInput) -> ModelIngestion: + """ + Fail the job with an error. + For user requested cancellation, use `fail_with_cancelled` instead + """ + QUERY = gql( + """ + mutation IngestionFailWithError($input: ModelIngestionFailedInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: failWithError(input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], + QUERY, + variables, + ).data.data.data + + def fail_with_cancel(self, input: ModelIngestionCancelledInput) -> ModelIngestion: + """ + Fail the ingestion with a `cancelled` status. + This should only be done if the user has explicitly requested cancellation + Other forms of cancellation use `fail_with_error` + The ingestion should then enter a terminal "canceled" state + """ + QUERY = gql( + """ + mutation IngestionFailWithCancel($input: ModelIngestionCancelledInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: failWithCancel(input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], + QUERY, + variables, + ).data.data.data + + def request_cancellation( + self, input: ModelIngestionRequestCancellationInput + ) -> ModelIngestion: + """ + Request that the ingestion is canceled. + + Note: simply calling this mutation does not immediately cancel, + it doesn't even guarantee it will be canceled at all. + It's up to the client to observe this cancellation request + via `subscription.project_model_ingestion_cancellation_requested` + and report it as cancelled (via `ingestion.fail_with_cancel` + + See "cooperative cancellation pattern" + """ + QUERY = gql( + """ + mutation IngestionRequestCancellation($input: ModelIngestionRequestCancellationInput!) { + data: projectMutations { + data: modelIngestionMutations { + data: requestCancellation (input: $input) { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + } + } + } + """ # noqa: E501 + ) + + variables = { + "input": input.model_dump(warnings="error", by_alias=True), + } + + return self.make_request_and_parse_response( + DataResponse[DataResponse[DataResponse[ModelIngestion]]], + QUERY, + variables, + ).data.data.data diff --git a/src/specklepy/core/api/resources/current/subscription_resource.py b/src/specklepy/core/api/resources/current/subscription_resource.py index da80b1d..c65486e 100644 --- a/src/specklepy/core/api/resources/current/subscription_resource.py +++ b/src/specklepy/core/api/resources/current/subscription_resource.py @@ -6,17 +6,25 @@ from graphql import DocumentNode from pydantic import BaseModel from typing_extensions import TypeVar +from specklepy.core.api.enums import ProjectModelIngestionUpdatedMessageType +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionReference, + ProjectModelIngestionSubscriptionInput, +) from specklepy.core.api.models import ( ProjectModelsUpdatedMessage, ProjectUpdatedMessage, ProjectVersionsUpdatedMessage, UserProjectsUpdatedMessage, ) +from specklepy.core.api.models.subscription_messages import ( + ProjectModelIngestionUpdatedMessage, +) from specklepy.core.api.resource import ResourceBase from specklepy.core.api.responses import DataResponse from specklepy.logging.exceptions import SpeckleException -NAME = "subscribe" +NAME = "subscription" TEventArgs = TypeVar("TEventArgs", bound=BaseModel) @@ -202,6 +210,66 @@ class SubscriptionResource(ResourceBase): callback=lambda d: callback(d.data), ) + async def project_model_ingestion_updated( + self, + callback: Callable[[ProjectModelIngestionUpdatedMessage], None], + input: ProjectModelIngestionSubscriptionInput, + ) -> None: + QUERY = gql( + """ + subscription IngestionUpdated($input: ProjectModelIngestionSubscriptionInput!) { + data: projectModelIngestionUpdated(input: $input) { + modelIngestion { + id + createdAt + updatedAt + modelId + cancellationRequested + statusData { + ... on HasModelIngestionStatus { + status + } + ... on HasProgressMessage { + progressMessage + } + } + } + type + } + } + """ # noqa: E501 + ) + + variables = { + "input": input.model_dump( + warnings="error", by_alias=True, exclude_none=True + ), + } + + await self.subscribe_2( + DataResponse[ProjectModelIngestionUpdatedMessage], + QUERY, + variables, + callback=lambda d: callback(d.data), + ) + + async def project_model_ingestion_cancellation_requested( + self, + callback: Callable[[ProjectModelIngestionUpdatedMessage], None], + project_id: str, + ingestion_id: str, + ) -> None: + await self.project_model_ingestion_updated( + callback, + ProjectModelIngestionSubscriptionInput( + project_id=project_id, + ingestion_reference=ModelIngestionReference( + ingestion_id=ingestion_id, model_id=None + ), + message_type=ProjectModelIngestionUpdatedMessageType.CANCELLATION_REQUESTED, + ), + ) + @check_wsclient async def subscribe_2( self, diff --git a/src/specklepy/core/api/resources/current/version_resource.py b/src/specklepy/core/api/resources/current/version_resource.py index d6f04b9..63e7625 100644 --- a/src/specklepy/core/api/resources/current/version_resource.py +++ b/src/specklepy/core/api/resources/current/version_resource.py @@ -14,7 +14,7 @@ from specklepy.core.api.models import ResourceCollection, Version from specklepy.core.api.resource import ResourceBase from specklepy.core.api.responses import DataResponse -NAME = "model" +NAME = "version" class VersionResource(ResourceBase): diff --git a/src/specklepy/core/api/resources/current/workspace_resource.py b/src/specklepy/core/api/resources/current/workspace_resource.py index 1317a04..9845dac 100644 --- a/src/specklepy/core/api/resources/current/workspace_resource.py +++ b/src/specklepy/core/api/resources/current/workspace_resource.py @@ -16,7 +16,7 @@ NAME = "workspace" class WorkspaceResource(ResourceBase): - """API Access class for models""" + """API Access class for workspaces""" def __init__(self, account, basepath, client, server_version) -> None: super().__init__( diff --git a/tests/integration/client/current/test_model_ingestion_resource.py b/tests/integration/client/current/test_model_ingestion_resource.py new file mode 100644 index 0000000..2699254 --- /dev/null +++ b/tests/integration/client/current/test_model_ingestion_resource.py @@ -0,0 +1,268 @@ +from datetime import datetime + +import pytest + +from specklepy.api import operations +from specklepy.api.client import SpeckleClient +from specklepy.core.api.enums import ModelIngestionStatus +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionCancelledInput, + ModelIngestionCreateInput, + ModelIngestionFailedInput, + ModelIngestionRequeueInput, + ModelIngestionStartProcessingInput, + ModelIngestionSuccessInput, + ModelIngestionUpdateInput, + SourceDataInput, +) +from specklepy.core.api.inputs.model_inputs import CreateModelInput +from specklepy.core.api.inputs.project_inputs import ProjectCreateInput +from specklepy.core.api.models.current import ( + Model, + ModelIngestion, + ModelIngestionStatusData, + Project, + ProjectVisibility, + Version, +) +from specklepy.logging.exceptions import GraphQLException +from specklepy.objects.base import Base +from specklepy.transports.server.server import ServerTransport +from tests.integration.conftest import is_public + + +@pytest.mark.run() +@pytest.mark.skipif(is_public(), reason="The public API does not support these tests") +class TestIngestionResource: + @pytest.fixture + def project(self, client: SpeckleClient): + return client.project.create( + ProjectCreateInput( + name="test", description=None, visibility=ProjectVisibility.PUBLIC + ) + ) + + @pytest.fixture + def model(self, client: SpeckleClient, project: Project): + return client.model.create( + CreateModelInput(name="test", description=None, project_id=project.id) + ) + + @pytest.fixture() + def ingestion( + self, client: SpeckleClient, model: Model, project: Project + ) -> ModelIngestion: + input = ModelIngestionCreateInput( + model_id=model.id, + project_id=project.id, + progress_message="Starting processing", + source_data=SourceDataInput( + source_application_slug="pytest", + source_application_version="0.0.0", + file_name=None, + file_size_bytes=None, + ), + ) + + ingestion = client.model_ingestion.create(input) + assert isinstance(ingestion, ModelIngestion) + assert isinstance(ingestion.id, str) + assert isinstance(ingestion.created_at, datetime) + assert isinstance(ingestion.updated_at, datetime) + assert isinstance(ingestion.cancellation_requested, bool) + assert isinstance(ingestion.model_id, str) + assert isinstance(ingestion.status_data, ModelIngestionStatusData) + assert isinstance(ingestion.status_data.progress_message, str | None) + assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING + assert not ingestion.cancellation_requested + assert ingestion.model_id == model.id + + return ingestion + + def test_update_progress( + self, client: SpeckleClient, ingestion: ModelIngestion, project: Project + ): + def update(progress: float | None, message: str): + input = ModelIngestionUpdateInput( + ingestion_id=ingestion.id, + project_id=project.id, + progress=progress, + progress_message=message, + ) + res = client.model_ingestion.update_progress(input) + + assert isinstance(res, ModelIngestion) + assert res.status_data.progress_message == message + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.PROCESSING + + update(None, "None") + update(0.1, "0.1") + update(0.5, "Whoa-oh! We're half way there!") + update(1, "Finished") + update(0.2, "Back to processing again") + + def test_start_processing_and_requeue( + self, client: SpeckleClient, ingestion: ModelIngestion, project: Project + ): + # just setting the baseline state + assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING + + def requeue(message: str): + input = ModelIngestionRequeueInput( + project_id=project.id, + ingestion_id=ingestion.id, + progress_message=message, + ) + + res = client.model_ingestion.requeue(input) + + assert isinstance(res, ModelIngestion) + assert res.status_data.progress_message == message + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.QUEUED + + def start_processing(message: str): + input = ModelIngestionStartProcessingInput( + ingestion_id=ingestion.id, + project_id=project.id, + progress_message=message, + source_data=SourceDataInput( + source_application_slug="test", + source_application_version="test", + file_name="test", + file_size_bytes=1, + ), + ) + res = client.model_ingestion.start_processing(input) + + assert isinstance(res, ModelIngestion) + assert res.status_data.progress_message == message + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.PROCESSING + + requeue("put it back in there") + start_processing("go for it") + requeue("and again") + start_processing("run forest run") + + def update(progress: float | None, message: str): + input = ModelIngestionUpdateInput( + ingestion_id=ingestion.id, + project_id=project.id, + progress=progress, + progress_message=message, + ) + res = client.model_ingestion.update_progress(input) + + assert isinstance(res, ModelIngestion) + assert res.status_data.progress_message == message + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.PROCESSING + + update(None, "None") + update(0.1, "0.1") + update(0.5, "Whoa-oh! We're half way there!") + update(1, "Finished") + update(0.2, "Back to processing again") + + def test_error( + self, client: SpeckleClient, ingestion: ModelIngestion, project: Project + ): + input = ModelIngestionFailedInput( + ingestion_id=ingestion.id, + project_id=project.id, + error_reason="Failed to integration test an error", + error_stacktrace="over here in test_error", + ) + + res = client.model_ingestion.fail_with_error(input) + + assert isinstance(res, ModelIngestion) + assert res.status_data.progress_message is None + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.FAILED + + # trying to fail for a second time should throw + # with pytest.raises(GraphQLException): + # _ = client.ingestion.fail_with_error(input) + + def test_complete( + self, client: SpeckleClient, ingestion: ModelIngestion, project: Project + ): + remote = ServerTransport(project.id, client) + object_id = operations.send( + Base(applicationId="ASDFGHJKL"), [remote], use_default_cache=False + ) + input = ModelIngestionSuccessInput( + ingestion_id=ingestion.id, + root_object_id=object_id, + project_id=project.id, + # version_message=None, + ) + + res = client.model_ingestion.complete(input) + + assert isinstance(res, str) + version = client.version.get(res, project.id) + assert isinstance(version, Version) + + # trying to complete for a second time should throw + # with pytest.raises(GraphQLException): + # _ = client.ingestion.complete(input) + + def test_cancel( + self, client: SpeckleClient, ingestion: ModelIngestion, project: Project + ): + input = ModelIngestionCancelledInput( + ingestion_id=ingestion.id, + project_id=project.id, + cancellation_message="This was cancelled for testing purposes", + ) + res = client.model_ingestion.fail_with_cancel(input) + assert isinstance(res, ModelIngestion) + assert res.status_data.progress_message is None + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.CANCELLED + + # Cancel again, should be idempotent + res = client.model_ingestion.fail_with_cancel(input) + assert res.status_data.progress_message is None + assert not res.cancellation_requested + assert res.status_data.status == ModelIngestionStatus.CANCELLED + + def test_error_non_existent_ingestion( + self, client: SpeckleClient, project: Project + ): + input = ModelIngestionFailedInput( + ingestion_id="Non-existent-ingestion", + project_id=project.id, + error_reason="Failed to integration test an error", + error_stacktrace="over here in test_error", + ) + with pytest.raises(GraphQLException): + _ = client.model_ingestion.fail_with_error(input) + + def test_complete_failed_non_existent_ingestion( + self, client: SpeckleClient, project: Project + ): + input = ModelIngestionFailedInput( + ingestion_id="Non-existent-ingestion", + project_id=project.id, + error_reason="Failed to integration test an error", + error_stacktrace="over here in test_error", + ) + with pytest.raises(GraphQLException): + _ = client.model_ingestion.fail_with_error(input) + + def test_complete_non_existent_root_object( + self, client: SpeckleClient, ingestion: ModelIngestion, project: Project + ): + input = ModelIngestionSuccessInput( + ingestion_id=ingestion.id, + root_object_id="asdfasdfasdfasfd", + project_id=project.id, + # version_message=None, + ) + with pytest.raises(GraphQLException): + _ = client.model_ingestion.complete(input) diff --git a/tests/integration/client/current/test_subscription_resource.py b/tests/integration/client/current/test_subscription_resource.py index 3e3565a..40ee0a0 100644 --- a/tests/integration/client/current/test_subscription_resource.py +++ b/tests/integration/client/current/test_subscription_resource.py @@ -1,15 +1,26 @@ import asyncio -from typing import Dict, Optional +from sys import platform +from typing import Dict import pytest from specklepy.api.client import SpeckleClient from specklepy.core.api.enums import ( + ModelIngestionStatus, + ProjectModelIngestionUpdatedMessageType, ProjectModelsUpdatedMessageType, ProjectUpdatedMessageType, ProjectVersionsUpdatedMessageType, UserProjectsUpdatedMessageType, ) +from specklepy.core.api.inputs.model_ingestion_inputs import ( + ModelIngestionCreateInput, + ModelIngestionReference, + ModelIngestionRequestCancellationInput, + ModelIngestionUpdateInput, + ProjectModelIngestionSubscriptionInput, + SourceDataInput, +) from specklepy.core.api.inputs.model_inputs import CreateModelInput from specklepy.core.api.inputs.project_inputs import ( ProjectCreateInput, @@ -24,9 +35,16 @@ from specklepy.core.api.models import ( UserProjectsUpdatedMessage, Version, ) -from tests.integration.conftest import create_client, create_version +from specklepy.core.api.models.current import ModelIngestion +from specklepy.core.api.models.subscription_messages import ( + ProjectModelIngestionUpdatedMessage, +) +from tests.integration.conftest import create_client, create_version, is_public -WAIT_PERIOD = 0.4 # time in seconds +# WSL is slow AF, so for local runs, we're being extra generous +# For CI runs on linux,m a much smaller wait time is acceptable +SETUP_TIME_SECONDS = 1 if platform == "linux" else 4 +MAX_WAIT_TIME_SECONDS = 0.75 if platform == "linux" else 5 @pytest.mark.run() @@ -55,48 +73,68 @@ class TestSubscriptionResource: ) return model1 + @pytest.fixture + def test_model_ingestion( + self, + subscription_client: SpeckleClient, + test_project: Project, + test_model: Model, + ) -> ModelIngestion: + project = subscription_client.model_ingestion.create( + ModelIngestionCreateInput( + project_id=test_project.id, + model_id=test_model.id, + progress_message="", + source_data=SourceDataInput( + source_application_slug="pytest", + source_application_version="0.0.0", + file_name=None, + file_size_bytes=None, + ), + ) + ) + return project + @pytest.mark.asyncio async def test_user_projects_updated( self, subscription_client: SpeckleClient, ) -> None: - message: Optional[UserProjectsUpdatedMessage] = None - - task = None + loop = asyncio.get_running_loop() + future: asyncio.Future[UserProjectsUpdatedMessage] = loop.create_future() def callback(d: UserProjectsUpdatedMessage): - nonlocal message - message = d + nonlocal future + future.set_result(d) task = asyncio.create_task( subscription_client.subscription.user_projects_updated(callback) ) - await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup + await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup input = ProjectCreateInput(name=None, description=None, visibility=None) created = subscription_client.project.create(input) - await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered + message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS) assert isinstance(message, UserProjectsUpdatedMessage) assert message.id == created.id assert message.type == UserProjectsUpdatedMessageType.ADDED assert isinstance(message.project, Project) - task.cancel() - await task + if not task.cancel(): + await task @pytest.mark.asyncio async def test_project_models_updated( self, subscription_client: SpeckleClient, test_project: Project ) -> None: - message: Optional[ProjectModelsUpdatedMessage] = None - - task = None + loop = asyncio.get_running_loop() + future: asyncio.Future[ProjectModelsUpdatedMessage] = loop.create_future() def callback(d: ProjectModelsUpdatedMessage): - nonlocal message - message = d + nonlocal future + future.set_result(d) task = asyncio.create_task( subscription_client.subscription.project_models_updated( @@ -104,51 +142,53 @@ class TestSubscriptionResource: ) ) - await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup + await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup input = CreateModelInput( name="my model", description="myDescription", project_id=test_project.id ) created = subscription_client.model.create(input) - await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered + message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS) assert isinstance(message, ProjectModelsUpdatedMessage) assert message.id == created.id assert message.type == ProjectModelsUpdatedMessageType.CREATED assert isinstance(message.model, Model) - task.cancel() - await task + + if not task.cancel(): + await task @pytest.mark.asyncio async def test_project_updated( self, subscription_client: SpeckleClient, test_project: Project ) -> None: - message: Optional[ProjectUpdatedMessage] = None - - task = None + loop = asyncio.get_running_loop() + future: asyncio.Future[ProjectUpdatedMessage] = loop.create_future() def callback(d: ProjectUpdatedMessage): - nonlocal message - message = d + nonlocal future + future.set_result(d) task = asyncio.create_task( subscription_client.subscription.project_updated(callback, test_project.id) ) - await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup + await asyncio.sleep( + SETUP_TIME_SECONDS + ) # Give time for subscription to be triggered input = ProjectUpdateInput(id=test_project.id, name="This is my new name") created = subscription_client.project.update(input) - await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered + message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS) assert isinstance(message, ProjectUpdatedMessage) assert message.id == created.id assert message.type == ProjectUpdatedMessageType.UPDATED assert isinstance(message.project, Project) - task.cancel() - await task + if not task.cancel(): + await task @pytest.mark.asyncio async def test_project_versions_updated( @@ -157,13 +197,12 @@ class TestSubscriptionResource: test_project: Project, test_model: Model, ) -> None: - message: Optional[ProjectVersionsUpdatedMessage] = None - - task = None + loop = asyncio.get_running_loop() + future: asyncio.Future[ProjectVersionsUpdatedMessage] = loop.create_future() def callback(d: ProjectVersionsUpdatedMessage): - nonlocal message - message = d + nonlocal future + future.set_result(d) task = asyncio.create_task( subscription_client.subscription.project_versions_updated( @@ -171,15 +210,181 @@ class TestSubscriptionResource: ) ) - await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup + await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup created = create_version(subscription_client, test_project.id, test_model.id) - await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered + message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS) assert isinstance(message, ProjectVersionsUpdatedMessage) assert message.id == created.id assert message.type == ProjectVersionsUpdatedMessageType.CREATED assert isinstance(message.version, Version) - task.cancel() - await task + if not task.cancel(): + await task + + @pytest.mark.asyncio + @pytest.mark.skipif( + is_public(), reason="The public API does not support these tests" + ) + async def test_project_model_ingestion_cancellation( + self, + subscription_client: SpeckleClient, + test_project: Project, + test_model_ingestion: ModelIngestion, + ) -> None: + assert not test_model_ingestion.cancellation_requested + + loop = asyncio.get_running_loop() + future: asyncio.Future[ProjectModelIngestionUpdatedMessage] = ( + loop.create_future() + ) + + def callback(d: ProjectModelIngestionUpdatedMessage): + nonlocal future + future.set_result(d) + + task = asyncio.create_task( + subscription_client.subscription.project_model_ingestion_cancellation_requested( + callback, test_project.id, ingestion_id=test_model_ingestion.id + ) + ) + + await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup + + cancellation_request = ModelIngestionRequestCancellationInput( + ingestion_id=test_model_ingestion.id, + project_id=test_project.id, + cancellation_message="Please cancel", + ) + created = subscription_client.model_ingestion.request_cancellation( + cancellation_request + ) + assert created.id == test_model_ingestion.id + assert created.cancellation_requested + assert created.status_data.status == ModelIngestionStatus.PROCESSING + + message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS) + + assert isinstance(message, ProjectModelIngestionUpdatedMessage) + assert message.model_ingestion.id == created.id + assert message.model_ingestion.cancellation_requested + assert ( + message.type + == ProjectModelIngestionUpdatedMessageType.CANCELLATION_REQUESTED + ) + assert created.status_data.status == ModelIngestionStatus.PROCESSING + if not task.cancel(): + await task + + @pytest.mark.asyncio + @pytest.mark.skipif( + is_public(), reason="The public API does not support these tests" + ) + async def test_project_model_ingestion_cancellation_isnt_triggered_by_updates( + self, + subscription_client: SpeckleClient, + test_project: Project, + test_model_ingestion: ModelIngestion, + ) -> None: + assert not test_model_ingestion.cancellation_requested + + loop = asyncio.get_running_loop() + future: asyncio.Future[ProjectModelIngestionUpdatedMessage] = ( + loop.create_future() + ) + + def callback(d: ProjectModelIngestionUpdatedMessage): + nonlocal future + future.set_result(d) + + task = asyncio.create_task( + subscription_client.subscription.project_model_ingestion_cancellation_requested( + callback, test_project.id, ingestion_id=test_model_ingestion.id + ) + ) + + await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup + + cancellation_request = ModelIngestionUpdateInput( + ingestion_id=test_model_ingestion.id, + project_id=test_project.id, + progress=None, + progress_message="this is just an ordinary update", + ) + created = subscription_client.model_ingestion.update_progress( + cancellation_request + ) + assert created.id == test_model_ingestion.id + assert not created.cancellation_requested + assert created.status_data.status == ModelIngestionStatus.PROCESSING + + await asyncio.sleep(MAX_WAIT_TIME_SECONDS) + + assert ( + not future.done() + ) # make sure the sub did not call back and resolve the future + + if not task.cancel(): + await task + + @pytest.mark.asyncio + @pytest.mark.skipif( + is_public(), reason="The public API does not support these tests" + ) + async def test_project_model_ingestion_updates( + self, + subscription_client: SpeckleClient, + test_project: Project, + test_model_ingestion: ModelIngestion, + ) -> None: + assert not test_model_ingestion.cancellation_requested + + loop = asyncio.get_running_loop() + future: asyncio.Future[ProjectModelIngestionUpdatedMessage] = ( + loop.create_future() + ) + + def callback(d: ProjectModelIngestionUpdatedMessage): + nonlocal future + future.set_result(d) + + task = asyncio.create_task( + subscription_client.subscription.project_model_ingestion_updated( + callback, + input=ProjectModelIngestionSubscriptionInput( + project_id=test_project.id, + ingestion_reference=ModelIngestionReference( + ingestion_id=test_model_ingestion.id, model_id=None + ), + ), + # ingestion_id=test_model_ingestion.id, + ) + ) + + await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup + + progress_message = "this is just an ordinary update" + cancellation_request = ModelIngestionUpdateInput( + ingestion_id=test_model_ingestion.id, + project_id=test_project.id, + progress=None, + progress_message=progress_message, + ) + created = subscription_client.model_ingestion.update_progress( + cancellation_request + ) + assert created.id == test_model_ingestion.id + assert not created.cancellation_requested + assert created.status_data.status == ModelIngestionStatus.PROCESSING + + message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS) + + assert isinstance(message, ProjectModelIngestionUpdatedMessage) + assert message.model_ingestion.id == created.id + assert not message.model_ingestion.cancellation_requested + assert message.type == ProjectModelIngestionUpdatedMessageType.UPDATED + assert message.model_ingestion.status_data.progress_message == progress_message + assert created.status_data.status == ModelIngestionStatus.PROCESSING + if not task.cancel(): + await task diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ccc8f1e..b5c6e1c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,3 +1,4 @@ +import os import random import uuid from typing import Dict @@ -29,6 +30,10 @@ def host() -> str: return "localhost:3000" +def is_public() -> bool: + return os.getenv("IS_PUBLIC", "false").lower() == "true" + + def seed_user(host: str) -> Dict[str, str]: seed = uuid.uuid4().hex user_dict = { diff --git a/uv.lock b/uv.lock index efd45a2..a412974 100644 --- a/uv.lock +++ b/uv.lock @@ -526,7 +526,7 @@ name = "exceptiongroup" version = "1.3.0" source = { registry = "https://pypi.org/simple/" } dependencies = [ - { name = "typing-extensions", marker = "python_full_version < '3.13'" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" } wheels = [