Jedd/cxpla 340 specklepy (#475)
Publish Python Package / test (push) Has been cancelled
Publish Python Package / Build and Publish Python Package (push) Has been cancelled

* First pass

* add tests

* Add cancellation

* fix

* status changes

* fixes

* test fixes

* tests(subscriptions): fix model ingestion tests

* feat(modelingestion): rename resource and add some more tests

* feat(ifcimport): use new modelingestion api

* feat: wrap up new ingestion

* fix: model ingestion payload and test server url

* fix: test port was 3000

* fix: remove version message from model ingestion success input

* fix: test subs cancelled

* ci: signal public of private envv in ci

---------

Co-authored-by: Jedd Morgan <45512892+JR-Morgan@users.noreply.github.com>
This commit is contained in:
Gergő Jedlicska
2025-12-08 13:25:54 +01:00
committed by GitHub
parent 7c108a9d43
commit 8249cd2184
25 changed files with 1305 additions and 85 deletions
+2
View File
@@ -59,6 +59,8 @@ jobs:
test-public: # Run integration tests against the public server image test-public: # Run integration tests against the public server image
name: Test (public) name: Test (public)
runs-on: ubuntu-latest runs-on: ubuntu-latest
env:
IS_PUBLIC: "true"
strategy: strategy:
matrix: matrix:
python-version: python-version:
+1
View File
@@ -0,0 +1 @@
words = ["specklepy"]
+8 -1
View File
@@ -8,7 +8,7 @@ python.uv_venv_auto = true
[tasks.install] [tasks.install]
run= "uv sync --all-groups" run= "uv sync --all-extras --all-groups"
[tasks.install_docs] [tasks.install_docs]
@@ -18,3 +18,10 @@ run= "uv sync --group docs"
description = "Build static docs " description = "Build static docs "
run = "uv run mkdocs build" run = "uv run mkdocs build"
depends = ['install_docs'] depends = ['install_docs']
[tasks.test]
run = "uv run pytest"
[env]
IS_PUBLIC = "false"
+1
View File
@@ -22,6 +22,7 @@ dependencies = [
speckleifc = ["ifcopenshell>=0.8.3.post2"] speckleifc = ["ifcopenshell>=0.8.3.post2"]
[dependency-groups] [dependency-groups]
dev = [ dev = [
"commitizen>=4.1.0", "commitizen>=4.1.0",
"devtools>=0.12.2", "devtools>=0.12.2",
+6 -3
View File
@@ -18,7 +18,7 @@ def cmd_line_import() -> None:
parser.add_argument("output_path") parser.add_argument("output_path")
parser.add_argument("project_id") parser.add_argument("project_id")
parser.add_argument("version_message") parser.add_argument("version_message")
parser.add_argument("model_id") parser.add_argument("model_ingestion_id")
# parser.add_argument("model_name") # parser.add_argument("model_name")
# parser.add_argument("region_name") # parser.add_argument("region_name")
@@ -32,6 +32,8 @@ def cmd_line_import() -> None:
"ifc", "ifc",
) )
client: SpeckleClient | None = None
try: try:
client = SpeckleClient(SERVER_URL, use_ssl=not SERVER_URL.startswith("http://")) client = SpeckleClient(SERVER_URL, use_ssl=not SERVER_URL.startswith("http://"))
client.authenticate_with_token(TOKEN) client.authenticate_with_token(TOKEN)
@@ -41,13 +43,14 @@ def cmd_line_import() -> None:
args.file_path, args.file_path,
project, project,
args.version_message, args.version_message,
args.model_id, args.model_ingestion_id,
client, client,
) )
with open(args.output_path, "w") as f: with open(args.output_path, "w") as f:
json.dump({"success": True, "commitId": version.id}, f) json.dump({"success": True, "commitId": version.id}, f)
except Exception as e: except Exception as e:
error_msg = f"IFC Importer failed with exception:\n{traceback.format_exc()}" stack_trace = traceback.format_exc()
error_msg = f"IFC Importer failed with exception:\n{stack_trace}"
print(error_msg) print(error_msg)
# Write error result # Write error result
+93 -33
View File
@@ -1,9 +1,19 @@
import contextlib
import importlib.metadata
import time import time
import traceback
from pathlib import Path
from speckleifc.ifc_geometry_processing import open_ifc from speckleifc.ifc_geometry_processing import open_ifc
from speckleifc.importer import ImportJob from speckleifc.importer import ImportJob
from specklepy.core.api.client import SpeckleClient from specklepy.core.api.client import SpeckleClient
from specklepy.core.api.inputs.version_inputs import CreateVersionInput from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionFailedInput,
ModelIngestionStartProcessingInput,
ModelIngestionSuccessInput,
ModelIngestionUpdateInput,
SourceDataInput,
)
from specklepy.core.api.models.current import Project, Version from specklepy.core.api.models.current import Project, Version
from specklepy.core.api.operations import send from specklepy.core.api.operations import send
from specklepy.logging import metrics from specklepy.logging import metrics
@@ -14,48 +24,98 @@ def open_and_convert_file(
file_path: str, file_path: str,
project: Project, project: Project,
version_message: str | None, version_message: str | None,
model_id: str, model_ingestion_id: str,
client: SpeckleClient, client: SpeckleClient,
) -> Version: ) -> Version:
start = time.time() try:
very_start = start start = time.time()
very_start = start
path = Path(file_path)
account = client.account specklepy_version = importlib.metadata.version("specklepy")
server_url = account.serverInfo.url client.model_ingestion.start_processing(
assert server_url ModelIngestionStartProcessingInput(
remote_transport = ServerTransport(project.id, account=account) project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="Importing IFC file",
source_data=SourceDataInput(
file_name=path.name,
file_size_bytes=path.stat().st_size,
source_application_slug="fileimports-ifc",
source_application_version=specklepy_version,
),
)
)
ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType] account = client.account
server_url = account.serverInfo.url
assert server_url
remote_transport = ServerTransport(project.id, account=account)
import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType] ifc_file = open_ifc(file_path) # pyright: ignore[reportUnknownVariableType]
data = import_job.convert()
print(f"File conversion complete after {(time.time() - start) * 1000}ms") client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="File validated, converting",
progress=None,
)
)
import_job = ImportJob(ifc_file) # pyright: ignore[reportUnknownArgumentType]
data = import_job.convert()
start = time.time() print(f"File conversion complete after {(time.time() - start) * 1000}ms")
root_id = send(data, transports=[remote_transport], use_default_cache=False) start = time.time()
print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms")
start = time.time() client.model_ingestion.update_progress(
ModelIngestionUpdateInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
progress_message="Conversion complete, sending",
progress=None,
)
)
root_id = send(data, transports=[remote_transport], use_default_cache=False)
print(f"Sending to speckle complete after: {(time.time() - start) * 1000}ms")
create_version = CreateVersionInput( start = time.time()
object_id=root_id,
model_id=model_id,
project_id=project.id,
message=version_message,
source_application="ifc",
)
version = client.version.create(create_version)
end = time.time()
print(f"Version committed after: {(end - start) * 1000}ms")
print(f"Total time (to commit): {(end - very_start) * 1000}ms") version_id = client.model_ingestion.complete(
del ifc_file ModelIngestionSuccessInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
root_object_id=root_id,
# version_message=version_message,
)
)
custom_properties = {"ui": "dui3", "actionSource": "import"} # needed to query version until ingestion api expands to serve it
if project.workspace_id: version = client.version.get(version_id, project.id)
custom_properties["workspace_id"] = project.workspace_id
metrics.track(metrics.SEND, account, custom_properties, send_sync=True)
return version end = time.time()
print(f"Version committed after: {(end - start) * 1000}ms")
print(f"Total time (to commit): {(end - very_start) * 1000}ms")
del ifc_file
custom_properties = {"ui": "dui3", "actionSource": "import"}
if project.workspace_id:
custom_properties["workspace_id"] = project.workspace_id
metrics.track(metrics.SEND, account, custom_properties, send_sync=True)
return version
except Exception as e:
stack_trace = traceback.format_exc()
with contextlib.suppress(Exception):
# make sure to not report process kills when we're cancelling
client.model_ingestion.fail_with_error(
ModelIngestionFailedInput(
project_id=project.id,
ingestion_id=model_ingestion_id,
error_reason=str(e),
error_stacktrace=stack_trace,
)
)
raise e
+7
View File
@@ -4,6 +4,7 @@ from specklepy.api.credentials import Account
from specklepy.api.resources import ( from specklepy.api.resources import (
ActiveUserResource, ActiveUserResource,
FileImportResource, FileImportResource,
ModelIngestionResource,
ModelResource, ModelResource,
OtherUserResource, OtherUserResource,
ProjectInviteResource, ProjectInviteResource,
@@ -119,6 +120,12 @@ class SpeckleClient(CoreSpeckleClient):
client=self.httpclient, client=self.httpclient,
server_version=server_version, server_version=server_version,
) )
self.model_ingestion = ModelIngestionResource(
account=self.account,
basepath=self.url,
client=self.httpclient,
server_version=server_version,
)
self.file_import = FileImportResource( self.file_import = FileImportResource(
account=self.account, account=self.account,
basepath=self.url, basepath=self.url,
+5
View File
@@ -1,5 +1,8 @@
from specklepy.api.resources.current.active_user_resource import ActiveUserResource from specklepy.api.resources.current.active_user_resource import ActiveUserResource
from specklepy.api.resources.current.file_import_resource import FileImportResource from specklepy.api.resources.current.file_import_resource import FileImportResource
from specklepy.api.resources.current.model_ingestion_resource import (
ModelIngestionResource,
)
from specklepy.api.resources.current.model_resource import ModelResource from specklepy.api.resources.current.model_resource import ModelResource
from specklepy.api.resources.current.other_user_resource import OtherUserResource from specklepy.api.resources.current.other_user_resource import OtherUserResource
from specklepy.api.resources.current.project_invite_resource import ( from specklepy.api.resources.current.project_invite_resource import (
@@ -22,4 +25,6 @@ __all__ = [
"SubscriptionResource", "SubscriptionResource",
"VersionResource", "VersionResource",
"WorkspaceResource", "WorkspaceResource",
"FileImportResource",
"ModelIngestionResource",
] ]
@@ -15,7 +15,7 @@ from specklepy.logging import metrics
class FileImportResource(CoreResource): class FileImportResource(CoreResource):
"""API Access class for projects""" """API Access class for file imports"""
def __init__(self, account, basepath, client, server_version) -> None: def __init__(self, account, basepath, client, server_version) -> None:
super().__init__( super().__init__(
@@ -0,0 +1,53 @@
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCancelledInput,
ModelIngestionCreateInput,
ModelIngestionFailedInput,
ModelIngestionRequeueInput,
ModelIngestionStartProcessingInput,
ModelIngestionSuccessInput,
ModelIngestionUpdateInput,
)
from specklepy.core.api.models.current import (
ModelIngestion,
)
from specklepy.core.api.resources import (
ModelIngestionResource as CoreResource,
)
from specklepy.logging import metrics
class ModelIngestionResource(CoreResource):
"""API Access class for model ingestion"""
def __init__(self, account, basepath, client, server_version) -> None:
super().__init__(account, basepath, client, server_version)
def create(self, input: ModelIngestionCreateInput) -> ModelIngestion:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion Create"})
return super().create(input)
def update_progress(self, input: ModelIngestionUpdateInput) -> ModelIngestion:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion Update"})
return super().update_progress(input)
def start_processing(
self, input: ModelIngestionStartProcessingInput
) -> ModelIngestion:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion Start Processing"})
return super().start_processing(input)
def requeue(self, input: ModelIngestionRequeueInput) -> ModelIngestion:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion Update"})
return super().requeue(input)
def complete_successfully(self, input: ModelIngestionSuccessInput) -> str:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion End"})
return super().complete(input)
def complete_failed(self, input: ModelIngestionFailedInput) -> ModelIngestion:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion Error"})
return super().fail_with_error(input)
def fail_with_cancel(self, input: ModelIngestionCancelledInput) -> ModelIngestion:
metrics.track(metrics.SDK, self.account, {"name": "Ingestion Cancel"})
return super().fail_with_cancel(input)
+7
View File
@@ -12,6 +12,7 @@ from specklepy.core.api.credentials import Account
from specklepy.core.api.resources import ( from specklepy.core.api.resources import (
ActiveUserResource, ActiveUserResource,
FileImportResource, FileImportResource,
ModelIngestionResource,
ModelResource, ModelResource,
OtherUserResource, OtherUserResource,
ProjectInviteResource, ProjectInviteResource,
@@ -250,6 +251,12 @@ class SpeckleClient:
client=self.httpclient, client=self.httpclient,
server_version=server_version, server_version=server_version,
) )
self.model_ingestion = ModelIngestionResource(
account=self.account,
basepath=self.url,
client=self.httpclient,
server_version=server_version,
)
self.subscription = SubscriptionResource( self.subscription = SubscriptionResource(
account=self.account, account=self.account,
basepath=self.ws_url, basepath=self.ws_url,
+15
View File
@@ -30,3 +30,18 @@ class ProjectVersionsUpdatedMessageType(str, Enum):
CREATED = "CREATED" CREATED = "CREATED"
DELETED = "DELETED" DELETED = "DELETED"
UPDATED = "UPDATED" UPDATED = "UPDATED"
class ProjectModelIngestionUpdatedMessageType(str, Enum):
CANCELLATION_REQUESTED = "cancellationRequested"
CREATED = "created"
DELETED = "deleted"
UPDATED = "updated"
class ModelIngestionStatus(str, Enum):
CANCELLED = "cancelled"
FAILED = "failed"
PROCESSING = "processing"
QUEUED = "queued"
SUCCESS = "success"
@@ -0,0 +1,88 @@
from specklepy.core.api.enums import ProjectModelIngestionUpdatedMessageType
from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel
class SourceDataInput(GraphQLBaseModel):
source_application_slug: str
source_application_version: str
file_name: str | None
file_size_bytes: int | None
class ModelIngestionCreateInput(GraphQLBaseModel):
model_id: str
project_id: str
progress_message: str
source_data: SourceDataInput
class ModelIngestionStartProcessingInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
progress_message: str
source_data: SourceDataInput
class ModelIngestionRequeueInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
progress_message: str
class ModelIngestionUpdateInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
progress: float | None
progress_message: str
class ModelIngestionSuccessInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
root_object_id: str
class ModelIngestionFailedInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
error_reason: str
error_stacktrace: str | None
@staticmethod
def from_exception(
ingestion_id: str, project_id: str, exception: Exception, message: str | None
) -> "ModelIngestionFailedInput":
"""test"""
return ModelIngestionFailedInput(
ingestion_id=ingestion_id,
project_id=project_id,
error_reason=message if message else str(exception),
error_stacktrace=str(exception),
)
class ModelIngestionCancelledInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
cancellation_message: str
class ModelIngestionRequestCancellationInput(GraphQLBaseModel):
ingestion_id: str
project_id: str
cancellation_message: str
class ModelIngestionReference(GraphQLBaseModel):
"""
`@oneOf` i.e. server expects **either** `ingestion_id` or `model_id`, but not both.
"""
ingestion_id: str | None
model_id: str | None
class ProjectModelIngestionSubscriptionInput(GraphQLBaseModel):
project_id: str
ingestion_reference: ModelIngestionReference
message_type: ProjectModelIngestionUpdatedMessageType | None = None
+15 -1
View File
@@ -1,7 +1,7 @@
from datetime import datetime from datetime import datetime
from typing import Generic, List, TypeVar from typing import Generic, List, TypeVar
from specklepy.core.api.enums import ProjectVisibility from specklepy.core.api.enums import ModelIngestionStatus, ProjectVisibility
from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel
from specklepy.logging.exceptions import WorkspacePermissionException from specklepy.logging.exceptions import WorkspacePermissionException
@@ -244,3 +244,17 @@ class FileImport(GraphQLBaseModel):
class FileUploadUrl(GraphQLBaseModel): class FileUploadUrl(GraphQLBaseModel):
url: str url: str
file_id: str file_id: str
class ModelIngestionStatusData(GraphQLBaseModel):
status: ModelIngestionStatus
progress_message: str | None = None
class ModelIngestion(GraphQLBaseModel):
id: str
created_at: datetime
updated_at: datetime
cancellation_requested: bool
model_id: str
status_data: ModelIngestionStatusData
@@ -1,12 +1,13 @@
from typing import Optional from typing import Optional
from specklepy.core.api.enums import ( from specklepy.core.api.enums import (
ProjectModelIngestionUpdatedMessageType,
ProjectModelsUpdatedMessageType, ProjectModelsUpdatedMessageType,
ProjectUpdatedMessageType, ProjectUpdatedMessageType,
ProjectVersionsUpdatedMessageType, ProjectVersionsUpdatedMessageType,
UserProjectsUpdatedMessageType, UserProjectsUpdatedMessageType,
) )
from specklepy.core.api.models.current import Model, Project, Version from specklepy.core.api.models.current import Model, ModelIngestion, Project, Version
from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel from specklepy.core.api.models.graphql_base_model import GraphQLBaseModel
@@ -33,3 +34,8 @@ class ProjectVersionsUpdatedMessage(GraphQLBaseModel):
type: ProjectVersionsUpdatedMessageType type: ProjectVersionsUpdatedMessageType
model_id: str model_id: str
version: Optional[Version] version: Optional[Version]
class ProjectModelIngestionUpdatedMessage(GraphQLBaseModel):
model_ingestion: ModelIngestion
type: ProjectModelIngestionUpdatedMessageType
@@ -1,5 +1,8 @@
from specklepy.core.api.resources.current.active_user_resource import ActiveUserResource from specklepy.core.api.resources.current.active_user_resource import ActiveUserResource
from specklepy.core.api.resources.current.file_import_resource import FileImportResource from specklepy.core.api.resources.current.file_import_resource import FileImportResource
from specklepy.core.api.resources.current.model_ingestion_resource import (
ModelIngestionResource,
)
from specklepy.core.api.resources.current.model_resource import ModelResource from specklepy.core.api.resources.current.model_resource import ModelResource
from specklepy.core.api.resources.current.other_user_resource import OtherUserResource from specklepy.core.api.resources.current.other_user_resource import OtherUserResource
from specklepy.core.api.resources.current.project_invite_resource import ( from specklepy.core.api.resources.current.project_invite_resource import (
@@ -24,4 +27,6 @@ __all__ = [
"SubscriptionResource", "SubscriptionResource",
"VersionResource", "VersionResource",
"WorkspaceResource", "WorkspaceResource",
"FileImportResource",
"ModelIngestionResource",
] ]
@@ -16,13 +16,15 @@ from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse from specklepy.core.api.responses import DataResponse
from specklepy.logging.exceptions import SpeckleException from specklepy.logging.exceptions import SpeckleException
NAME = "file_import"
class UploadFileResponse(GraphQLBaseModel): class UploadFileResponse(GraphQLBaseModel):
etag: str etag: str
class FileImportResource(ResourceBase): class FileImportResource(ResourceBase):
"""API Access class for project invites""" """API Access class for file imports"""
def __init__( def __init__(
self, self,
@@ -36,7 +38,7 @@ class FileImportResource(ResourceBase):
basepath=basepath, basepath=basepath,
client=client, client=client,
server_version=server_version, server_version=server_version,
name="file-import", name=NAME,
) )
def finish_file_import_job(self, input: FinishFileImportInput) -> bool: def finish_file_import_job(self, input: FinishFileImportInput) -> bool:
@@ -0,0 +1,398 @@
from typing import Any, Optional, Tuple
from gql import Client, gql
from specklepy.api.credentials import Account
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCancelledInput,
ModelIngestionCreateInput,
ModelIngestionFailedInput,
ModelIngestionRequestCancellationInput,
ModelIngestionRequeueInput,
ModelIngestionStartProcessingInput,
ModelIngestionSuccessInput,
ModelIngestionUpdateInput,
)
from specklepy.core.api.models.current import (
ModelIngestion,
)
from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse
NAME = "ingestion"
class ModelIngestionResource(ResourceBase):
"""API Access class for model ingestion"""
def __init__(
self,
account: Account,
basepath: str,
client: Client,
server_version: Optional[Tuple[Any, ...]],
) -> None:
super().__init__(
account=account,
basepath=basepath,
client=client,
name=NAME,
server_version=server_version,
)
def get_ingestion(self, project_id: str, model_id: str) -> ModelIngestion:
QUERY = gql(
"""
query Query($projectId: String!, $modelId: String!) {
data:project(id: $projectId) {
data:model(id: $modelId) {
data:ingestion {
id
createdAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
""" # noqa: E501
)
variables = {
"projectId": project_id,
"modelId": model_id,
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]],
QUERY,
variables,
).data.data.data
def create(self, input: ModelIngestionCreateInput) -> ModelIngestion:
QUERY = gql(
"""
mutation IngestionCreate($input: ModelIngestionCreateInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: create(input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables
).data.data.data
def start_processing(
self, input: ModelIngestionStartProcessingInput
) -> ModelIngestion:
QUERY = gql(
"""
mutation IngestionStartProcessing($input: ModelIngestionStartProcessingInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: startProcessing(input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
""" # noqa: E501
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables
).data.data.data
def requeue(self, input: ModelIngestionRequeueInput) -> ModelIngestion:
QUERY = gql(
"""
mutation IngestionRequeue($input: ModelIngestionRequeueInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: requeue(input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
""" # noqa: E501
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables
).data.data.data
def update_progress(self, input: ModelIngestionUpdateInput) -> ModelIngestion:
QUERY = gql(
"""
mutation IngestionUpdateProgress(
$input: ModelIngestionUpdateInput!
) {
data: projectMutations {
data: modelIngestionMutations {
data: updateProgress(input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]], QUERY, variables
).data.data.data
def complete(self, input: ModelIngestionSuccessInput) -> str:
"""
Request that the server completes the ingestion by creating a version
If successful, the job will be in a terminal "successful" state.
For failed Ingestions, use `fail_with_error` instead
For user cancellation, use `fail_with_cancelled` instead
Arguments:
input {ModelIngestionSuccessInput} -- input variable
Returns:
str -- the id of the version that was just created to complete the ingestion
"""
QUERY = gql(
"""
mutation IngestionComplete($input: ModelIngestionSuccessInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: completeWithVersion(input: $input) {
data:statusData {
... on ModelIngestionSuccessStatus {
data:versionId
}
}
}
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[DataResponse[DataResponse[str]]]]],
QUERY,
variables,
).data.data.data.data.data
def fail_with_error(self, input: ModelIngestionFailedInput) -> ModelIngestion:
"""
Fail the job with an error.
For user requested cancellation, use `fail_with_cancelled` instead
"""
QUERY = gql(
"""
mutation IngestionFailWithError($input: ModelIngestionFailedInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: failWithError(input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]],
QUERY,
variables,
).data.data.data
def fail_with_cancel(self, input: ModelIngestionCancelledInput) -> ModelIngestion:
"""
Fail the ingestion with a `cancelled` status.
This should only be done if the user has explicitly requested cancellation
Other forms of cancellation use `fail_with_error`
The ingestion should then enter a terminal "canceled" state
"""
QUERY = gql(
"""
mutation IngestionFailWithCancel($input: ModelIngestionCancelledInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: failWithCancel(input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]],
QUERY,
variables,
).data.data.data
def request_cancellation(
self, input: ModelIngestionRequestCancellationInput
) -> ModelIngestion:
"""
Request that the ingestion is canceled.
Note: simply calling this mutation does not immediately cancel,
it doesn't even guarantee it will be canceled at all.
It's up to the client to observe this cancellation request
via `subscription.project_model_ingestion_cancellation_requested`
and report it as cancelled (via `ingestion.fail_with_cancel`
See "cooperative cancellation pattern"
"""
QUERY = gql(
"""
mutation IngestionRequestCancellation($input: ModelIngestionRequestCancellationInput!) {
data: projectMutations {
data: modelIngestionMutations {
data: requestCancellation (input: $input) {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
}
}
}
""" # noqa: E501
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelIngestion]]],
QUERY,
variables,
).data.data.data
@@ -6,17 +6,25 @@ from graphql import DocumentNode
from pydantic import BaseModel from pydantic import BaseModel
from typing_extensions import TypeVar from typing_extensions import TypeVar
from specklepy.core.api.enums import ProjectModelIngestionUpdatedMessageType
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionReference,
ProjectModelIngestionSubscriptionInput,
)
from specklepy.core.api.models import ( from specklepy.core.api.models import (
ProjectModelsUpdatedMessage, ProjectModelsUpdatedMessage,
ProjectUpdatedMessage, ProjectUpdatedMessage,
ProjectVersionsUpdatedMessage, ProjectVersionsUpdatedMessage,
UserProjectsUpdatedMessage, UserProjectsUpdatedMessage,
) )
from specklepy.core.api.models.subscription_messages import (
ProjectModelIngestionUpdatedMessage,
)
from specklepy.core.api.resource import ResourceBase from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse from specklepy.core.api.responses import DataResponse
from specklepy.logging.exceptions import SpeckleException from specklepy.logging.exceptions import SpeckleException
NAME = "subscribe" NAME = "subscription"
TEventArgs = TypeVar("TEventArgs", bound=BaseModel) TEventArgs = TypeVar("TEventArgs", bound=BaseModel)
@@ -202,6 +210,66 @@ class SubscriptionResource(ResourceBase):
callback=lambda d: callback(d.data), callback=lambda d: callback(d.data),
) )
async def project_model_ingestion_updated(
self,
callback: Callable[[ProjectModelIngestionUpdatedMessage], None],
input: ProjectModelIngestionSubscriptionInput,
) -> None:
QUERY = gql(
"""
subscription IngestionUpdated($input: ProjectModelIngestionSubscriptionInput!) {
data: projectModelIngestionUpdated(input: $input) {
modelIngestion {
id
createdAt
updatedAt
modelId
cancellationRequested
statusData {
... on HasModelIngestionStatus {
status
}
... on HasProgressMessage {
progressMessage
}
}
}
type
}
}
""" # noqa: E501
)
variables = {
"input": input.model_dump(
warnings="error", by_alias=True, exclude_none=True
),
}
await self.subscribe_2(
DataResponse[ProjectModelIngestionUpdatedMessage],
QUERY,
variables,
callback=lambda d: callback(d.data),
)
async def project_model_ingestion_cancellation_requested(
self,
callback: Callable[[ProjectModelIngestionUpdatedMessage], None],
project_id: str,
ingestion_id: str,
) -> None:
await self.project_model_ingestion_updated(
callback,
ProjectModelIngestionSubscriptionInput(
project_id=project_id,
ingestion_reference=ModelIngestionReference(
ingestion_id=ingestion_id, model_id=None
),
message_type=ProjectModelIngestionUpdatedMessageType.CANCELLATION_REQUESTED,
),
)
@check_wsclient @check_wsclient
async def subscribe_2( async def subscribe_2(
self, self,
@@ -14,7 +14,7 @@ from specklepy.core.api.models import ResourceCollection, Version
from specklepy.core.api.resource import ResourceBase from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse from specklepy.core.api.responses import DataResponse
NAME = "model" NAME = "version"
class VersionResource(ResourceBase): class VersionResource(ResourceBase):
@@ -16,7 +16,7 @@ NAME = "workspace"
class WorkspaceResource(ResourceBase): class WorkspaceResource(ResourceBase):
"""API Access class for models""" """API Access class for workspaces"""
def __init__(self, account, basepath, client, server_version) -> None: def __init__(self, account, basepath, client, server_version) -> None:
super().__init__( super().__init__(
@@ -0,0 +1,268 @@
from datetime import datetime
import pytest
from specklepy.api import operations
from specklepy.api.client import SpeckleClient
from specklepy.core.api.enums import ModelIngestionStatus
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCancelledInput,
ModelIngestionCreateInput,
ModelIngestionFailedInput,
ModelIngestionRequeueInput,
ModelIngestionStartProcessingInput,
ModelIngestionSuccessInput,
ModelIngestionUpdateInput,
SourceDataInput,
)
from specklepy.core.api.inputs.model_inputs import CreateModelInput
from specklepy.core.api.inputs.project_inputs import ProjectCreateInput
from specklepy.core.api.models.current import (
Model,
ModelIngestion,
ModelIngestionStatusData,
Project,
ProjectVisibility,
Version,
)
from specklepy.logging.exceptions import GraphQLException
from specklepy.objects.base import Base
from specklepy.transports.server.server import ServerTransport
from tests.integration.conftest import is_public
@pytest.mark.run()
@pytest.mark.skipif(is_public(), reason="The public API does not support these tests")
class TestIngestionResource:
@pytest.fixture
def project(self, client: SpeckleClient):
return client.project.create(
ProjectCreateInput(
name="test", description=None, visibility=ProjectVisibility.PUBLIC
)
)
@pytest.fixture
def model(self, client: SpeckleClient, project: Project):
return client.model.create(
CreateModelInput(name="test", description=None, project_id=project.id)
)
@pytest.fixture()
def ingestion(
self, client: SpeckleClient, model: Model, project: Project
) -> ModelIngestion:
input = ModelIngestionCreateInput(
model_id=model.id,
project_id=project.id,
progress_message="Starting processing",
source_data=SourceDataInput(
source_application_slug="pytest",
source_application_version="0.0.0",
file_name=None,
file_size_bytes=None,
),
)
ingestion = client.model_ingestion.create(input)
assert isinstance(ingestion, ModelIngestion)
assert isinstance(ingestion.id, str)
assert isinstance(ingestion.created_at, datetime)
assert isinstance(ingestion.updated_at, datetime)
assert isinstance(ingestion.cancellation_requested, bool)
assert isinstance(ingestion.model_id, str)
assert isinstance(ingestion.status_data, ModelIngestionStatusData)
assert isinstance(ingestion.status_data.progress_message, str | None)
assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING
assert not ingestion.cancellation_requested
assert ingestion.model_id == model.id
return ingestion
def test_update_progress(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
def update(progress: float | None, message: str):
input = ModelIngestionUpdateInput(
ingestion_id=ingestion.id,
project_id=project.id,
progress=progress,
progress_message=message,
)
res = client.model_ingestion.update_progress(input)
assert isinstance(res, ModelIngestion)
assert res.status_data.progress_message == message
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.PROCESSING
update(None, "None")
update(0.1, "0.1")
update(0.5, "Whoa-oh! We're half way there!")
update(1, "Finished")
update(0.2, "Back to processing again")
def test_start_processing_and_requeue(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
# just setting the baseline state
assert ingestion.status_data.status == ModelIngestionStatus.PROCESSING
def requeue(message: str):
input = ModelIngestionRequeueInput(
project_id=project.id,
ingestion_id=ingestion.id,
progress_message=message,
)
res = client.model_ingestion.requeue(input)
assert isinstance(res, ModelIngestion)
assert res.status_data.progress_message == message
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.QUEUED
def start_processing(message: str):
input = ModelIngestionStartProcessingInput(
ingestion_id=ingestion.id,
project_id=project.id,
progress_message=message,
source_data=SourceDataInput(
source_application_slug="test",
source_application_version="test",
file_name="test",
file_size_bytes=1,
),
)
res = client.model_ingestion.start_processing(input)
assert isinstance(res, ModelIngestion)
assert res.status_data.progress_message == message
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.PROCESSING
requeue("put it back in there")
start_processing("go for it")
requeue("and again")
start_processing("run forest run")
def update(progress: float | None, message: str):
input = ModelIngestionUpdateInput(
ingestion_id=ingestion.id,
project_id=project.id,
progress=progress,
progress_message=message,
)
res = client.model_ingestion.update_progress(input)
assert isinstance(res, ModelIngestion)
assert res.status_data.progress_message == message
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.PROCESSING
update(None, "None")
update(0.1, "0.1")
update(0.5, "Whoa-oh! We're half way there!")
update(1, "Finished")
update(0.2, "Back to processing again")
def test_error(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
input = ModelIngestionFailedInput(
ingestion_id=ingestion.id,
project_id=project.id,
error_reason="Failed to integration test an error",
error_stacktrace="over here in test_error",
)
res = client.model_ingestion.fail_with_error(input)
assert isinstance(res, ModelIngestion)
assert res.status_data.progress_message is None
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.FAILED
# trying to fail for a second time should throw
# with pytest.raises(GraphQLException):
# _ = client.ingestion.fail_with_error(input)
def test_complete(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
remote = ServerTransport(project.id, client)
object_id = operations.send(
Base(applicationId="ASDFGHJKL"), [remote], use_default_cache=False
)
input = ModelIngestionSuccessInput(
ingestion_id=ingestion.id,
root_object_id=object_id,
project_id=project.id,
# version_message=None,
)
res = client.model_ingestion.complete(input)
assert isinstance(res, str)
version = client.version.get(res, project.id)
assert isinstance(version, Version)
# trying to complete for a second time should throw
# with pytest.raises(GraphQLException):
# _ = client.ingestion.complete(input)
def test_cancel(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
input = ModelIngestionCancelledInput(
ingestion_id=ingestion.id,
project_id=project.id,
cancellation_message="This was cancelled for testing purposes",
)
res = client.model_ingestion.fail_with_cancel(input)
assert isinstance(res, ModelIngestion)
assert res.status_data.progress_message is None
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.CANCELLED
# Cancel again, should be idempotent
res = client.model_ingestion.fail_with_cancel(input)
assert res.status_data.progress_message is None
assert not res.cancellation_requested
assert res.status_data.status == ModelIngestionStatus.CANCELLED
def test_error_non_existent_ingestion(
self, client: SpeckleClient, project: Project
):
input = ModelIngestionFailedInput(
ingestion_id="Non-existent-ingestion",
project_id=project.id,
error_reason="Failed to integration test an error",
error_stacktrace="over here in test_error",
)
with pytest.raises(GraphQLException):
_ = client.model_ingestion.fail_with_error(input)
def test_complete_failed_non_existent_ingestion(
self, client: SpeckleClient, project: Project
):
input = ModelIngestionFailedInput(
ingestion_id="Non-existent-ingestion",
project_id=project.id,
error_reason="Failed to integration test an error",
error_stacktrace="over here in test_error",
)
with pytest.raises(GraphQLException):
_ = client.model_ingestion.fail_with_error(input)
def test_complete_non_existent_root_object(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
input = ModelIngestionSuccessInput(
ingestion_id=ingestion.id,
root_object_id="asdfasdfasdfasfd",
project_id=project.id,
# version_message=None,
)
with pytest.raises(GraphQLException):
_ = client.model_ingestion.complete(input)
@@ -1,15 +1,26 @@
import asyncio import asyncio
from typing import Dict, Optional from sys import platform
from typing import Dict
import pytest import pytest
from specklepy.api.client import SpeckleClient from specklepy.api.client import SpeckleClient
from specklepy.core.api.enums import ( from specklepy.core.api.enums import (
ModelIngestionStatus,
ProjectModelIngestionUpdatedMessageType,
ProjectModelsUpdatedMessageType, ProjectModelsUpdatedMessageType,
ProjectUpdatedMessageType, ProjectUpdatedMessageType,
ProjectVersionsUpdatedMessageType, ProjectVersionsUpdatedMessageType,
UserProjectsUpdatedMessageType, UserProjectsUpdatedMessageType,
) )
from specklepy.core.api.inputs.model_ingestion_inputs import (
ModelIngestionCreateInput,
ModelIngestionReference,
ModelIngestionRequestCancellationInput,
ModelIngestionUpdateInput,
ProjectModelIngestionSubscriptionInput,
SourceDataInput,
)
from specklepy.core.api.inputs.model_inputs import CreateModelInput from specklepy.core.api.inputs.model_inputs import CreateModelInput
from specklepy.core.api.inputs.project_inputs import ( from specklepy.core.api.inputs.project_inputs import (
ProjectCreateInput, ProjectCreateInput,
@@ -24,9 +35,16 @@ from specklepy.core.api.models import (
UserProjectsUpdatedMessage, UserProjectsUpdatedMessage,
Version, Version,
) )
from tests.integration.conftest import create_client, create_version from specklepy.core.api.models.current import ModelIngestion
from specklepy.core.api.models.subscription_messages import (
ProjectModelIngestionUpdatedMessage,
)
from tests.integration.conftest import create_client, create_version, is_public
WAIT_PERIOD = 0.4 # time in seconds # WSL is slow AF, so for local runs, we're being extra generous
# For CI runs on linux,m a much smaller wait time is acceptable
SETUP_TIME_SECONDS = 1 if platform == "linux" else 4
MAX_WAIT_TIME_SECONDS = 0.75 if platform == "linux" else 5
@pytest.mark.run() @pytest.mark.run()
@@ -55,48 +73,68 @@ class TestSubscriptionResource:
) )
return model1 return model1
@pytest.fixture
def test_model_ingestion(
self,
subscription_client: SpeckleClient,
test_project: Project,
test_model: Model,
) -> ModelIngestion:
project = subscription_client.model_ingestion.create(
ModelIngestionCreateInput(
project_id=test_project.id,
model_id=test_model.id,
progress_message="",
source_data=SourceDataInput(
source_application_slug="pytest",
source_application_version="0.0.0",
file_name=None,
file_size_bytes=None,
),
)
)
return project
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_user_projects_updated( async def test_user_projects_updated(
self, self,
subscription_client: SpeckleClient, subscription_client: SpeckleClient,
) -> None: ) -> None:
message: Optional[UserProjectsUpdatedMessage] = None loop = asyncio.get_running_loop()
future: asyncio.Future[UserProjectsUpdatedMessage] = loop.create_future()
task = None
def callback(d: UserProjectsUpdatedMessage): def callback(d: UserProjectsUpdatedMessage):
nonlocal message nonlocal future
message = d future.set_result(d)
task = asyncio.create_task( task = asyncio.create_task(
subscription_client.subscription.user_projects_updated(callback) subscription_client.subscription.user_projects_updated(callback)
) )
await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup
input = ProjectCreateInput(name=None, description=None, visibility=None) input = ProjectCreateInput(name=None, description=None, visibility=None)
created = subscription_client.project.create(input) created = subscription_client.project.create(input)
await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS)
assert isinstance(message, UserProjectsUpdatedMessage) assert isinstance(message, UserProjectsUpdatedMessage)
assert message.id == created.id assert message.id == created.id
assert message.type == UserProjectsUpdatedMessageType.ADDED assert message.type == UserProjectsUpdatedMessageType.ADDED
assert isinstance(message.project, Project) assert isinstance(message.project, Project)
task.cancel() if not task.cancel():
await task await task
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_project_models_updated( async def test_project_models_updated(
self, subscription_client: SpeckleClient, test_project: Project self, subscription_client: SpeckleClient, test_project: Project
) -> None: ) -> None:
message: Optional[ProjectModelsUpdatedMessage] = None loop = asyncio.get_running_loop()
future: asyncio.Future[ProjectModelsUpdatedMessage] = loop.create_future()
task = None
def callback(d: ProjectModelsUpdatedMessage): def callback(d: ProjectModelsUpdatedMessage):
nonlocal message nonlocal future
message = d future.set_result(d)
task = asyncio.create_task( task = asyncio.create_task(
subscription_client.subscription.project_models_updated( subscription_client.subscription.project_models_updated(
@@ -104,51 +142,53 @@ class TestSubscriptionResource:
) )
) )
await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup
input = CreateModelInput( input = CreateModelInput(
name="my model", description="myDescription", project_id=test_project.id name="my model", description="myDescription", project_id=test_project.id
) )
created = subscription_client.model.create(input) created = subscription_client.model.create(input)
await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS)
assert isinstance(message, ProjectModelsUpdatedMessage) assert isinstance(message, ProjectModelsUpdatedMessage)
assert message.id == created.id assert message.id == created.id
assert message.type == ProjectModelsUpdatedMessageType.CREATED assert message.type == ProjectModelsUpdatedMessageType.CREATED
assert isinstance(message.model, Model) assert isinstance(message.model, Model)
task.cancel()
await task if not task.cancel():
await task
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_project_updated( async def test_project_updated(
self, subscription_client: SpeckleClient, test_project: Project self, subscription_client: SpeckleClient, test_project: Project
) -> None: ) -> None:
message: Optional[ProjectUpdatedMessage] = None loop = asyncio.get_running_loop()
future: asyncio.Future[ProjectUpdatedMessage] = loop.create_future()
task = None
def callback(d: ProjectUpdatedMessage): def callback(d: ProjectUpdatedMessage):
nonlocal message nonlocal future
message = d future.set_result(d)
task = asyncio.create_task( task = asyncio.create_task(
subscription_client.subscription.project_updated(callback, test_project.id) subscription_client.subscription.project_updated(callback, test_project.id)
) )
await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup await asyncio.sleep(
SETUP_TIME_SECONDS
) # Give time for subscription to be triggered
input = ProjectUpdateInput(id=test_project.id, name="This is my new name") input = ProjectUpdateInput(id=test_project.id, name="This is my new name")
created = subscription_client.project.update(input) created = subscription_client.project.update(input)
await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS)
assert isinstance(message, ProjectUpdatedMessage) assert isinstance(message, ProjectUpdatedMessage)
assert message.id == created.id assert message.id == created.id
assert message.type == ProjectUpdatedMessageType.UPDATED assert message.type == ProjectUpdatedMessageType.UPDATED
assert isinstance(message.project, Project) assert isinstance(message.project, Project)
task.cancel() if not task.cancel():
await task await task
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_project_versions_updated( async def test_project_versions_updated(
@@ -157,13 +197,12 @@ class TestSubscriptionResource:
test_project: Project, test_project: Project,
test_model: Model, test_model: Model,
) -> None: ) -> None:
message: Optional[ProjectVersionsUpdatedMessage] = None loop = asyncio.get_running_loop()
future: asyncio.Future[ProjectVersionsUpdatedMessage] = loop.create_future()
task = None
def callback(d: ProjectVersionsUpdatedMessage): def callback(d: ProjectVersionsUpdatedMessage):
nonlocal message nonlocal future
message = d future.set_result(d)
task = asyncio.create_task( task = asyncio.create_task(
subscription_client.subscription.project_versions_updated( subscription_client.subscription.project_versions_updated(
@@ -171,15 +210,181 @@ class TestSubscriptionResource:
) )
) )
await asyncio.sleep(WAIT_PERIOD) # Give time to subscription to be setup await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup
created = create_version(subscription_client, test_project.id, test_model.id) created = create_version(subscription_client, test_project.id, test_model.id)
await asyncio.sleep(WAIT_PERIOD) # Give time for subscription to be triggered message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS)
assert isinstance(message, ProjectVersionsUpdatedMessage) assert isinstance(message, ProjectVersionsUpdatedMessage)
assert message.id == created.id assert message.id == created.id
assert message.type == ProjectVersionsUpdatedMessageType.CREATED assert message.type == ProjectVersionsUpdatedMessageType.CREATED
assert isinstance(message.version, Version) assert isinstance(message.version, Version)
task.cancel() if not task.cancel():
await task await task
@pytest.mark.asyncio
@pytest.mark.skipif(
is_public(), reason="The public API does not support these tests"
)
async def test_project_model_ingestion_cancellation(
self,
subscription_client: SpeckleClient,
test_project: Project,
test_model_ingestion: ModelIngestion,
) -> None:
assert not test_model_ingestion.cancellation_requested
loop = asyncio.get_running_loop()
future: asyncio.Future[ProjectModelIngestionUpdatedMessage] = (
loop.create_future()
)
def callback(d: ProjectModelIngestionUpdatedMessage):
nonlocal future
future.set_result(d)
task = asyncio.create_task(
subscription_client.subscription.project_model_ingestion_cancellation_requested(
callback, test_project.id, ingestion_id=test_model_ingestion.id
)
)
await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup
cancellation_request = ModelIngestionRequestCancellationInput(
ingestion_id=test_model_ingestion.id,
project_id=test_project.id,
cancellation_message="Please cancel",
)
created = subscription_client.model_ingestion.request_cancellation(
cancellation_request
)
assert created.id == test_model_ingestion.id
assert created.cancellation_requested
assert created.status_data.status == ModelIngestionStatus.PROCESSING
message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS)
assert isinstance(message, ProjectModelIngestionUpdatedMessage)
assert message.model_ingestion.id == created.id
assert message.model_ingestion.cancellation_requested
assert (
message.type
== ProjectModelIngestionUpdatedMessageType.CANCELLATION_REQUESTED
)
assert created.status_data.status == ModelIngestionStatus.PROCESSING
if not task.cancel():
await task
@pytest.mark.asyncio
@pytest.mark.skipif(
is_public(), reason="The public API does not support these tests"
)
async def test_project_model_ingestion_cancellation_isnt_triggered_by_updates(
self,
subscription_client: SpeckleClient,
test_project: Project,
test_model_ingestion: ModelIngestion,
) -> None:
assert not test_model_ingestion.cancellation_requested
loop = asyncio.get_running_loop()
future: asyncio.Future[ProjectModelIngestionUpdatedMessage] = (
loop.create_future()
)
def callback(d: ProjectModelIngestionUpdatedMessage):
nonlocal future
future.set_result(d)
task = asyncio.create_task(
subscription_client.subscription.project_model_ingestion_cancellation_requested(
callback, test_project.id, ingestion_id=test_model_ingestion.id
)
)
await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup
cancellation_request = ModelIngestionUpdateInput(
ingestion_id=test_model_ingestion.id,
project_id=test_project.id,
progress=None,
progress_message="this is just an ordinary update",
)
created = subscription_client.model_ingestion.update_progress(
cancellation_request
)
assert created.id == test_model_ingestion.id
assert not created.cancellation_requested
assert created.status_data.status == ModelIngestionStatus.PROCESSING
await asyncio.sleep(MAX_WAIT_TIME_SECONDS)
assert (
not future.done()
) # make sure the sub did not call back and resolve the future
if not task.cancel():
await task
@pytest.mark.asyncio
@pytest.mark.skipif(
is_public(), reason="The public API does not support these tests"
)
async def test_project_model_ingestion_updates(
self,
subscription_client: SpeckleClient,
test_project: Project,
test_model_ingestion: ModelIngestion,
) -> None:
assert not test_model_ingestion.cancellation_requested
loop = asyncio.get_running_loop()
future: asyncio.Future[ProjectModelIngestionUpdatedMessage] = (
loop.create_future()
)
def callback(d: ProjectModelIngestionUpdatedMessage):
nonlocal future
future.set_result(d)
task = asyncio.create_task(
subscription_client.subscription.project_model_ingestion_updated(
callback,
input=ProjectModelIngestionSubscriptionInput(
project_id=test_project.id,
ingestion_reference=ModelIngestionReference(
ingestion_id=test_model_ingestion.id, model_id=None
),
),
# ingestion_id=test_model_ingestion.id,
)
)
await asyncio.sleep(SETUP_TIME_SECONDS) # Give time to subscription to be setup
progress_message = "this is just an ordinary update"
cancellation_request = ModelIngestionUpdateInput(
ingestion_id=test_model_ingestion.id,
project_id=test_project.id,
progress=None,
progress_message=progress_message,
)
created = subscription_client.model_ingestion.update_progress(
cancellation_request
)
assert created.id == test_model_ingestion.id
assert not created.cancellation_requested
assert created.status_data.status == ModelIngestionStatus.PROCESSING
message = await asyncio.wait_for(future, timeout=MAX_WAIT_TIME_SECONDS)
assert isinstance(message, ProjectModelIngestionUpdatedMessage)
assert message.model_ingestion.id == created.id
assert not message.model_ingestion.cancellation_requested
assert message.type == ProjectModelIngestionUpdatedMessageType.UPDATED
assert message.model_ingestion.status_data.progress_message == progress_message
assert created.status_data.status == ModelIngestionStatus.PROCESSING
if not task.cancel():
await task
+5
View File
@@ -1,3 +1,4 @@
import os
import random import random
import uuid import uuid
from typing import Dict from typing import Dict
@@ -29,6 +30,10 @@ def host() -> str:
return "localhost:3000" return "localhost:3000"
def is_public() -> bool:
return os.getenv("IS_PUBLIC", "false").lower() == "true"
def seed_user(host: str) -> Dict[str, str]: def seed_user(host: str) -> Dict[str, str]:
seed = uuid.uuid4().hex seed = uuid.uuid4().hex
user_dict = { user_dict = {
Generated
+1 -1
View File
@@ -526,7 +526,7 @@ name = "exceptiongroup"
version = "1.3.0" version = "1.3.0"
source = { registry = "https://pypi.org/simple/" } source = { registry = "https://pypi.org/simple/" }
dependencies = [ dependencies = [
{ name = "typing-extensions", marker = "python_full_version < '3.13'" }, { name = "typing-extensions", marker = "python_full_version < '3.11'" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" } sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" }
wheels = [ wheels = [