Compare commits

..

1 Commits

Author SHA1 Message Date
Jedd Morgan c6fc5c6bd4 deprecate file upload api 2026-01-13 13:10:49 +00:00
20 changed files with 145 additions and 426 deletions
-4
View File
@@ -13,14 +13,12 @@ jobs:
name: Test (internal)
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version:
- "3.10"
- "3.11"
- "3.12"
- "3.13"
- "3.14"
steps:
- uses: actions/checkout@v4
@@ -64,14 +62,12 @@ jobs:
env:
IS_PUBLIC: "true"
strategy:
fail-fast: false
matrix:
python-version:
- "3.10"
- "3.11"
- "3.12"
- "3.13"
- "3.14"
steps:
- uses: actions/checkout@v4
+9 -9
View File
@@ -493,29 +493,29 @@ class AutomationContext:
Args:
level: Result level.
category (str): A short tag for the event type.
affected_objects (Union[Base, List[Base]]): A single object, a list of
objects, or an empty list. When empty, a result case is still
appended with no object IDs (e.g. for skipped rules or version-level
messages).
affected_objects (Union[Base, List[Base]]): A single object or a list of
objects that are causing the info case.
message (Optional[str]): Optional message.
metadata: User provided metadata key value pairs
visual_overrides: Case specific 3D visual overrides.
"""
if isinstance(affected_objects, list):
if len(affected_objects) < 1:
raise ValueError(
f"Need atleast one object to report a(n) {level.value.upper()}"
)
object_list = affected_objects
else:
object_list = [affected_objects]
ids: Dict[str, Optional[str]] = {}
# When objects are provided, each must have an id (empty list allowed for
# version-level/skipped results).
for o in object_list:
if not getattr(o, "id", None):
# validate that the Base.id is not None. If its a None, throw an Exception
if not o.id:
raise Exception(
f"You can only attach {level} results to objects with an id."
)
ids[o.id] = getattr(o, "applicationId", None)
ids[o.id] = o.applicationId
print(
f"Created new {level.value.upper()}"
f" category: {category} caused by: {message}"
@@ -1,5 +1,6 @@
from pathlib import Path
from deprecated import deprecated
from typing_extensions import override
from specklepy.core.api.inputs import (
@@ -10,7 +11,10 @@ from specklepy.core.api.inputs import (
from specklepy.core.api.models import FileImport, FileUploadUrl
from specklepy.core.api.models.current import ResourceCollection
from specklepy.core.api.resources import FileImportResource as CoreResource
from specklepy.core.api.resources.current.file_import_resource import UploadFileResponse
from specklepy.core.api.resources.current.file_import_resource import (
FILE_UPLOAD_DEPRECATION_WARNING,
UploadFileResponse,
)
from specklepy.logging import metrics
@@ -25,11 +29,6 @@ class FileImportResource(CoreResource):
server_version=server_version,
)
@override
def start_file_import(self, input: StartFileImportInput) -> FileImport:
metrics.track(metrics.SDK, self.account, {"name": "File Import Start"})
return super().start_file_import(input)
@override
def generate_upload_url(self, input: GenerateFileUploadUrlInput) -> FileUploadUrl:
"""
@@ -61,6 +60,13 @@ class FileImportResource(CoreResource):
metrics.track(metrics.SDK, self.account, {"name": "File Import Download File"})
return super().download_file(project_id, file_id, target_file)
@deprecated(**FILE_UPLOAD_DEPRECATION_WARNING)
@override
def start_file_import(self, input: StartFileImportInput) -> FileImport:
metrics.track(metrics.SDK, self.account, {"name": "File Import Start"})
return super().start_file_import(input)
@deprecated(**FILE_UPLOAD_DEPRECATION_WARNING)
@override
def finish_file_import_job(self, input: FinishFileImportInput) -> bool:
"""
@@ -72,6 +78,7 @@ class FileImportResource(CoreResource):
metrics.track(metrics.SDK, self.account, {"name": "File Import Finish Job"})
return super().finish_file_import_job(input)
@deprecated(**FILE_UPLOAD_DEPRECATION_WARNING)
@override
def get_model_file_import_jobs(
self,
@@ -8,10 +8,6 @@ from specklepy.core.api.inputs.model_inputs import (
)
from specklepy.core.api.inputs.project_inputs import ProjectModelsFilter
from specklepy.core.api.models import Model, ModelWithVersions, ResourceCollection
from specklepy.core.api.models.current import (
ModelPermissionChecks,
PermissionCheckResult,
)
from specklepy.core.api.resources import ModelResource as CoreResource
from specklepy.logging import metrics
@@ -76,17 +72,3 @@ class ModelResource(CoreResource):
def update(self, input: UpdateModelInput) -> Model:
metrics.track(metrics.SDK, self.account, {"name": "Model Update"})
return super().update(input)
def get_permissions(self, model_id: str, project_id: str) -> ModelPermissionChecks:
metrics.track(metrics.SDK, self.account, {"name": "Model Get Permissions"})
return super().get_permissions(model_id, project_id)
def can_create_model_ingestion(
self, model_id: str, project_id: str
) -> PermissionCheckResult:
metrics.track(
metrics.SDK,
self.account,
{"name": "Model Get Permissions canCreateIngestion"},
)
return super().can_create_model_ingestion(model_id, project_id)
-1
View File
@@ -7,7 +7,6 @@ class ProjectVisibility(str, Enum):
PRIVATE = "PRIVATE"
PUBLIC = "PUBLIC"
UNLISTED = "UNLISTED"
"""Deprecated, use PUBLIC instead"""
WORKSPACE = "WORKSPACE"
@@ -34,8 +34,4 @@ class MarkReceivedVersionInput(GraphQLBaseModel):
version_id: str
project_id: str
source_application: str
"""
IMPORTANT: this is meant to be the slug of the application that has done the
receiving, not to be confused with `Version.sourceApplication`
"""
message: Optional[str] = None
+2 -7
View File
@@ -105,7 +105,7 @@ class PendingStreamCollaborator(GraphQLBaseModel):
project_name: str
title: str
role: str
invited_by: LimitedUser | None = None
invited_by: LimitedUser
user: LimitedUser | None = None
token: str | None
@@ -137,12 +137,6 @@ class Version(GraphQLBaseModel):
source_application: str | None
class ModelPermissionChecks(GraphQLBaseModel):
can_update: "PermissionCheckResult"
can_delete: "PermissionCheckResult"
can_create_version: "PermissionCheckResult"
class Model(GraphQLBaseModel):
author: LimitedUser | None
created_at: datetime
@@ -162,6 +156,7 @@ class ProjectPermissionChecks(GraphQLBaseModel):
can_create_model: "PermissionCheckResult"
can_delete: "PermissionCheckResult"
can_load: "PermissionCheckResult"
can_publish: "PermissionCheckResult"
class Project(GraphQLBaseModel):
@@ -2,6 +2,7 @@ from pathlib import Path
from typing import Any
import httpx
from deprecated import deprecated
from gql import Client, gql
from specklepy.core.api.credentials import Account
@@ -23,6 +24,16 @@ class UploadFileResponse(GraphQLBaseModel):
etag: str
FILE_UPLOAD_DEPRECATION_WARNING: dict[str, Any] = {
"version": "3.2.4",
"reason": (
"Part of the old API surface"
"and will be removed in the future. Use the new ingestion API instead."
"Field will be deleted on June 1st, 2026"
),
}
class FileImportResource(ResourceBase):
"""API Access class for file imports"""
@@ -41,59 +52,6 @@ class FileImportResource(ResourceBase):
name=NAME,
)
def finish_file_import_job(self, input: FinishFileImportInput) -> bool:
"""
This is mostly an internal api, that marks a file import job finished.
Only use this if you are writing a file importer, that is responsible for
processing file import jobs.
"""
QUERY = gql(
"""
mutation FinishFileImport($input: FinishFileImportInput!) {
data:fileUploadMutations {
data:finishFileImport(input: $input)
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[bool]], QUERY, variables
).data.data
def start_file_import(self, input: StartFileImportInput) -> FileImport:
QUERY = gql(
"""
mutation StartFileImport($input: StartFileImportInput!) {
data:fileUploadMutations {
data:startFileImport(input: $input) {
id
projectId
convertedVersionId
userId
convertedStatus
convertedMessage
modelId
updatedAt
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[FileImport]], QUERY, variables
).data.data
def generate_upload_url(self, input: GenerateFileUploadUrlInput) -> FileUploadUrl:
"""
Get a file upload url from the Speckle server.
@@ -161,6 +119,62 @@ class FileImportResource(ResourceBase):
_ = f.write(chunk)
return target_file
@deprecated(**FILE_UPLOAD_DEPRECATION_WARNING)
def start_file_import(self, input: StartFileImportInput) -> FileImport:
QUERY = gql(
"""
mutation StartFileImport($input: StartFileImportInput!) {
data:fileUploadMutations {
data:startFileImport(input: $input) {
id
projectId
convertedVersionId
userId
convertedStatus
convertedMessage
modelId
updatedAt
}
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[FileImport]], QUERY, variables
).data.data
@deprecated(**FILE_UPLOAD_DEPRECATION_WARNING)
def finish_file_import_job(self, input: FinishFileImportInput) -> bool:
"""
This is mostly an internal api, that marks a file import job finished.
Only use this if you are writing a file importer, that is responsible for
processing file import jobs.
"""
QUERY = gql(
"""
mutation FinishFileImport($input: FinishFileImportInput!) {
data:fileUploadMutations {
data:finishFileImport(input: $input)
}
}
"""
)
variables = {
"input": input.model_dump(warnings="error", by_alias=True),
}
return self.make_request_and_parse_response(
DataResponse[DataResponse[bool]], QUERY, variables
).data.data
@deprecated(**FILE_UPLOAD_DEPRECATION_WARNING)
def get_model_file_import_jobs(
self,
*,
@@ -10,10 +10,6 @@ from specklepy.core.api.inputs.model_inputs import (
)
from specklepy.core.api.inputs.project_inputs import ProjectModelsFilter
from specklepy.core.api.models import Model, ModelWithVersions, ResourceCollection
from specklepy.core.api.models.current import (
ModelPermissionChecks,
PermissionCheckResult,
)
from specklepy.core.api.resource import ResourceBase
from specklepy.core.api.responses import DataResponse
@@ -303,71 +299,3 @@ class ModelResource(ResourceBase):
return self.make_request_and_parse_response(
DataResponse[DataResponse[Model]], QUERY, variables
).data.data
def get_permissions(self, project_id: str, model_id: str) -> ModelPermissionChecks:
QUERY = gql(
"""
query ModelPermissions($projectId: String!, $modelId: String!) {
data:project(id: $projectId) {
data:model(id: $modelId) {
data:permissions {
canUpdate {
authorized
code
message
}
canDelete {
authorized
code
message
}
canCreateVersion {
authorized
code
message
}
}
}
}
}
"""
)
variables = {"projectId": project_id, "modelId": model_id}
return self.make_request_and_parse_response(
DataResponse[DataResponse[DataResponse[ModelPermissionChecks]]],
QUERY,
variables,
).data.data.data
def can_create_model_ingestion(
self, project_id: str, model_id: str
) -> PermissionCheckResult:
QUERY = gql(
"""
query ModelPermissions($projectId: String!, $modelId: String!) {
data:project(id: $projectId) {
data:model(id: $modelId) {
data:permissions {
data:canCreateIngestion {
authorized
code
message
}
}
}
}
}
"""
)
variables = {"projectId": project_id, "modelId": model_id}
return self.make_request_and_parse_response(
DataResponse[
DataResponse[DataResponse[DataResponse[PermissionCheckResult]]]
],
QUERY,
variables,
).data.data.data.data
+30 -31
View File
@@ -1,7 +1,7 @@
import contextlib
from dataclasses import dataclass, field
from enum import Enum
from inspect import isclass
from types import UnionType
from typing import (
Any,
ClassVar,
@@ -13,13 +13,11 @@ from typing import (
Tuple,
Type,
Union,
get_origin,
get_type_hints,
)
from warnings import warn
from pydantic.alias_generators import to_pascal
from typing_extensions import get_args
from specklepy.logging.exceptions import SpeckleException
from specklepy.transports.memory import MemoryTransport
@@ -222,28 +220,32 @@ def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
if value in t._value2member_map_:
return True, t(value)
if isinstance(t, ForwardRef):
return True, value
if getattr(t, "__module__", None) == "typing":
if isinstance(t, ForwardRef):
return True, value
if getattr(t, "__module__", None) in ["typing", "types"]:
origin = get_origin(t)
args = get_args(t)
origin = t.__origin__
# below is what in nicer for >= py38
# origin = get_origin(t)
# recursive validation for Unions on both types preferring the fist type
if origin is Union or isinstance(t, UnionType):
if origin is Union:
# below is what in nicer for >= py38
# t_1, t_2 = get_args(t)
args = t.__args__ # type: ignore
for arg_t in args:
ok, v = _validate_type(arg_t, value)
if ok:
return True, v
t_success, t_value = _validate_type(arg_t, value)
if t_success:
return True, t_value
return False, value
if origin is dict:
if not isinstance(value, dict):
return False, value
if not value:
if value == {}:
return True, value
if not args:
if not getattr(t, "__args__", None):
return True, value
t_key, t_value = args
t_key, t_value = t.__args__ # type: ignore
if (
getattr(t_key, "__name__", None),
@@ -263,11 +265,11 @@ def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
if origin is list:
if not isinstance(value, list):
return False, value
if not value:
if value == []:
return True, value
if not args:
if not hasattr(t, "__args__"):
return True, value
t_items = args[0]
t_items = t.__args__[0] # type: ignore
if getattr(t_items, "__name__", None) == "T":
return True, value
first_item_valid, _ = _validate_type(t_items, value[0])
@@ -278,10 +280,10 @@ def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
if origin is tuple:
if not isinstance(value, tuple):
return False, value
if not args:
if not hasattr(t, "__args__"):
return True, value
args = t.__args__ # type: ignore
if not args:
if args == tuple():
return True, value
# we're not checking for empty tuple, cause tuple lengths must match
if len(args) != len(value):
@@ -297,7 +299,7 @@ def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
if origin is set:
if not isinstance(value, set):
return False, value
if not args:
if not hasattr(t, "__args__"):
return True, value
t_items = t.__args__[0] # type: ignore
first_item_valid, _ = _validate_type(t_items, next(iter(value)))
@@ -308,16 +310,13 @@ def _validate_type(t: Optional[type], value: Any) -> Tuple[bool, Any]:
if isinstance(value, t):
return True, value
if t is float and type(value) is int:
return True, float(value)
# with contextlib.suppress(ValueError, TypeError):
# if t is float and value is not None:
# return True, float(value)
# # TODO: dafuq, i had to add this not list check
# # but it would also fail for objects and other complex values
# if t is str and value and not isinstance(value, list):
# return True, str(value)
with contextlib.suppress(ValueError, TypeError):
if t is float and value is not None:
return True, float(value)
# TODO: dafuq, i had to add this not list check
# but it would also fail for objects and other complex values
if t is str and value and not isinstance(value, list):
return True, str(value)
return False, value
@@ -50,10 +50,12 @@ class TestActiveUserResourcePermissions:
assert hasattr(permissions, "can_create_model")
assert hasattr(permissions, "can_delete")
assert hasattr(permissions, "can_load")
assert hasattr(permissions, "can_publish")
assert permissions.can_create_model.authorized is True
assert permissions.can_delete.authorized is True
assert permissions.can_load.authorized is True
assert permissions.can_publish.authorized is True
def test_active_user_get_projects_with_permissions_with_filter(
self, client: SpeckleClient, test_project: Project
@@ -264,7 +264,6 @@ class TestIngestionResource:
with pytest.raises(GraphQLException):
_ = client.model_ingestion.fail_with_error(input)
@pytest.mark.skip(reason="TEST FAILS - server behaviour was changed")
def test_complete_non_existent_root_object(
self, client: SpeckleClient, ingestion: ModelIngestion, project: Project
):
@@ -1,7 +1,6 @@
import pytest
from specklepy.api.client import SpeckleClient
from specklepy.core.api.enums import ProjectVisibility
from specklepy.core.api.inputs.model_inputs import (
CreateModelInput,
DeleteModelInput,
@@ -13,14 +12,11 @@ from specklepy.core.api.inputs.project_inputs import (
)
from specklepy.core.api.models.current import (
Model,
ModelPermissionChecks,
PermissionCheckResult,
Project,
ProjectWithModels,
ResourceCollection,
)
from specklepy.logging.exceptions import GraphQLException
from tests.integration.conftest import is_internal, is_public
@pytest.mark.run()
@@ -28,9 +24,7 @@ class TestModelResource:
@pytest.fixture()
def test_project(self, client: SpeckleClient) -> Project:
project = client.project.create(
ProjectCreateInput(
name="Test project", description="", visibility=ProjectVisibility.PUBLIC
)
ProjectCreateInput(name="Test project", description="", visibility=None)
)
return project
@@ -155,52 +149,3 @@ class TestModelResource:
with pytest.raises(GraphQLException):
client.model.delete(delete_data)
def test_model_get_permissions(
self,
client: SpeckleClient,
second_client: SpeckleClient,
test_project: Project,
test_model: Model,
):
result = client.model.get_permissions(test_project.id, test_model.id)
assert isinstance(result, ModelPermissionChecks)
assert result.can_update.authorized is True
assert result.can_create_version.authorized is True
assert result.can_delete.authorized is True
guest = second_client.model.get_permissions(test_project.id, test_model.id)
assert isinstance(guest, ModelPermissionChecks)
assert guest.can_update.authorized is False
assert guest.can_create_version.authorized is False
assert guest.can_delete.authorized is False
@pytest.mark.skipif(
is_public(), reason="API only available on server versions 3.0.11 or greater"
)
def test_can_create_model_ingestion_internal_server(
self,
client: SpeckleClient,
test_project: Project,
test_model: Model,
):
result = client.model.can_create_model_ingestion(test_project.id, test_model.id)
assert isinstance(result, PermissionCheckResult)
assert result.authorized is True
@pytest.mark.skipif(
is_internal(),
reason="API only available on server versions 3.0.11 or greater",
)
def test_can_create_model_ingestion_public_server(
self,
client: SpeckleClient,
test_project: Project,
test_model: Model,
):
with pytest.raises(GraphQLException) as ex:
_ = client.model.can_create_model_ingestion(test_project.id, test_model.id)
assert "GRAPHQL_VALIDATION_FAILED" in str(ex.value)
@@ -24,17 +24,6 @@ class TestProjectResource:
)
return project
@pytest.fixture()
def test_public_project(self, client: SpeckleClient) -> Project:
project = client.project.create(
ProjectCreateInput(
name="test project123",
description="desc",
visibility=ProjectVisibility.PUBLIC,
)
)
return project
@pytest.mark.parametrize(
"name, description, visibility",
[
@@ -61,7 +50,7 @@ class TestProjectResource:
assert result.id is not None
assert result.name == name
assert result.description == (description or "")
# we've disabled creation of unlisted projects for now, they fall back to public
# we've disabled creation of public projects for now, they fall back to unlisted
if visibility == ProjectVisibility.UNLISTED:
assert result.visibility == ProjectVisibility.PUBLIC
else:
@@ -78,32 +67,13 @@ class TestProjectResource:
assert result.created_at == test_project.created_at
def test_project_get_permissions(
self,
client: SpeckleClient,
second_client: SpeckleClient,
test_project: Project,
test_public_project: Project,
self, client: SpeckleClient, test_project: Project
):
result_private = client.project.get_permissions(test_project.id)
assert isinstance(result_private, ProjectPermissionChecks)
assert result_private.can_create_model.authorized is True
assert result_private.can_delete.authorized is True
assert result_private.can_load.authorized is True
result = client.project.get_permissions(test_public_project.id)
result = client.project.get_permissions(test_project.id)
assert isinstance(result, ProjectPermissionChecks)
assert result.can_create_model.authorized is True
assert result.can_delete.authorized is True
assert result.can_load.authorized is True
guest = second_client.project.get_permissions(test_public_project.id)
assert isinstance(result, ProjectPermissionChecks)
assert guest.can_create_model.authorized is False
assert guest.can_delete.authorized is False
assert guest.can_load.authorized is False
def test_project_update(self, client: SpeckleClient, test_project: Project):
new_name = "MY new name"
-4
View File
@@ -34,10 +34,6 @@ def is_public() -> bool:
return os.getenv("IS_PUBLIC", "false").lower() == "true"
def is_internal() -> bool:
return not is_public()
def seed_user(host: str) -> Dict[str, str]:
seed = uuid.uuid4().hex
user_dict = {
+9 -9
View File
@@ -157,10 +157,10 @@ def test_parse_project():
def test_parse_model():
wrap = StreamWrapper(
"https://app.speckle.systems/projects/8be1007be1/models/cc7578012d"
"https://latest.speckle.systems/projects/843d07eb10/models/d9eb4918c8"
)
assert wrap.branch_name == "speckle tower revit 2025"
assert wrap.branch_name == "building wrapper"
assert wrap.type == "branch"
@@ -191,10 +191,10 @@ def test_parse_object_fe2():
def test_parse_version():
wrap = StreamWrapper(
"https://app.speckle.systems/projects/8be1007be1/models/cc7578012d@7199443eff"
"https://latest.speckle.systems/projects/843d07eb10/models/4e7345c838@c42d5cbac1"
)
wrap_quoted = StreamWrapper(
"https://app.speckle.systems/projects/8be1007be1/models/cc7578012d%407199443eff"
"https://latest.speckle.systems/projects/843d07eb10/models/4e7345c838%40c42d5cbac1"
)
assert wrap.type == "commit"
assert wrap_quoted.type == "commit"
@@ -208,11 +208,11 @@ def test_to_string():
"https://testing.speckle.dev/streams/0c6ad366c4/globals/abd3787893",
"https://testing.speckle.dev/streams/4c3ce1459c/commits/8b9b831792",
"https://testing.speckle.dev/streams/a75ab4f10f/objects/5530363e6d51c904903dafc3ea1d2ec6",
"https://app.speckle.systems/projects/8be1007be1",
"https://app.speckle.systems/projects/8be1007be1/models/cc7578012d",
"https://app.speckle.systems/projects/8be1007be1/models/cc7578012d@7199443eff",
"https://app.speckle.systems/projects/8be1007be1/models/cc7578012d%407199443eff",
"https://app.speckle.systems/projects/8be1007be1/models/9b5e57dca804a923a8d42d55dcc0191a",
"https://latest.speckle.systems/projects/843d07eb10",
"https://latest.speckle.systems/projects/843d07eb10/models/4e7345c838",
"https://latest.speckle.systems/projects/843d07eb10/models/4e7345c838@c42d5cbac1",
"https://latest.speckle.systems/projects/843d07eb10/models/4e7345c838%40c42d5cbac1",
"https://latest.speckle.systems/projects/24c3741255/models/b48d1b10f5a732f4ca4144286391282c",
]
for url in urls:
wrap = StreamWrapper(url)
+2 -2
View File
@@ -35,7 +35,7 @@ def sample_text_all_properties(sample_point: Point, sample_plane: Plane) -> Text
alignmentH=AlignmentHorizontal.Center,
alignmentV=AlignmentVertical.Center,
plane=sample_plane,
maxWidth=20.0,
maxWidth=20,
units=Units.m,
)
@@ -56,7 +56,7 @@ def test_text_creation_minimal(sample_point: Point):
def test_text_creation_extended(sample_point: Point, sample_plane: Plane):
text_value = "text"
max_width = 20.0
max_width = 20
text_obj = Text(
value=text_value,
@@ -1,110 +0,0 @@
"""Unit tests for AutomationContext.attach_result_to_objects contract."""
from unittest.mock import MagicMock
import pytest
from speckle_automate import AutomationContext
from speckle_automate.schema import (
AutomationRunData,
ObjectResultLevel,
VersionCreationTrigger,
VersionCreationTriggerPayload,
)
from specklepy.objects.base import Base
def _minimal_automation_context() -> AutomationContext:
run_data = AutomationRunData(
project_id="p",
speckle_server_url="http://localhost",
automation_id="a",
automation_run_id="r",
function_run_id="f",
triggers=[
VersionCreationTrigger(
trigger_type="versionCreation",
payload=VersionCreationTriggerPayload(model_id="m", version_id="v"),
)
],
)
return AutomationContext(
automation_run_data=run_data,
speckle_client=MagicMock(),
_server_transport=MagicMock(),
_speckle_token="",
)
def test_attach_result_to_objects_accepts_empty_list() -> None:
"""Empty affected_objects appends one result case with no object IDs."""
ctx = _minimal_automation_context()
assert len(ctx._automation_result.object_results) == 0
ctx.attach_result_to_objects(
ObjectResultLevel.WARNING,
"SkippedRule",
[],
message="No elements to check.",
)
assert len(ctx._automation_result.object_results) == 1
case = ctx._automation_result.object_results[0]
assert case.level == ObjectResultLevel.WARNING
assert case.category == "SkippedRule"
assert case.object_app_ids == {}
assert case.message == "No elements to check."
def test_attach_result_to_objects_with_objects_appends_case_with_ids() -> None:
"""Single or multiple objects with id produce result case with object_app_ids."""
ctx = _minimal_automation_context()
obj1 = Base()
obj1.id = "id-one"
obj1.applicationId = "app-one"
obj2 = Base()
obj2.id = "id-two"
ctx.attach_result_to_objects(
ObjectResultLevel.ERROR,
"BadType",
[obj1, obj2],
message="Invalid type.",
)
assert len(ctx._automation_result.object_results) == 1
case = ctx._automation_result.object_results[0]
assert case.level == ObjectResultLevel.ERROR
assert case.category == "BadType"
assert case.object_app_ids == {"id-one": "app-one", "id-two": None}
assert case.message == "Invalid type."
def test_attach_result_to_objects_raises_when_object_has_no_id() -> None:
"""At least one object without id raises."""
ctx = _minimal_automation_context()
obj = Base()
obj.id = None
with pytest.raises(Exception, match="results to objects with an id"):
ctx.attach_result_to_objects(
ObjectResultLevel.ERROR,
"Bad",
obj,
message="No id.",
)
assert len(ctx._automation_result.object_results) == 0
def test_attach_info_to_objects_accepts_empty_list() -> None:
"""attach_info_to_objects (convenience method) also accepts empty list."""
ctx = _minimal_automation_context()
ctx.attach_info_to_objects("VersionLevel", [], message="No levels in model.")
assert len(ctx._automation_result.object_results) == 1
case = ctx._automation_result.object_results[0]
assert case.level == ObjectResultLevel.INFO
assert case.category == "VersionLevel"
assert case.object_app_ids == {}
+1 -1
View File
@@ -143,7 +143,7 @@ def test_type_checking() -> None:
order = FrozenYoghurt()
order.servings = 2
order.price = 7
order.price = "7" # type: ignore - it will get converted
order.customer = "izzy"
order.dietary = DietaryRestrictions.VEGAN
order.tag = "preorder"
+6 -5
View File
@@ -31,13 +31,13 @@ fake_bases = [FakeBase("foo"), FakeBase("bar")]
@pytest.mark.parametrize(
"input_type, value, is_valid, return_value",
[
(str, 10, False, 10),
(str, 10, True, "10"),
(str, "foo_bar", True, "foo_bar"),
(
str,
{"foo": "bar"},
False,
{"foo": "bar"},
True,
"{'foo': 'bar'}",
),
(float, 1, True, 1),
# why are we allowing this??? We're lying to our users and ourselves too.
@@ -85,8 +85,9 @@ fake_bases = [FakeBase("foo"), FakeBase("bar")]
(Dict[int, Base], {1: test_base}, True, {1: test_base}),
(Tuple[int, str, str], (1, "foo", "bar"), True, (1, "foo", "bar")),
(Tuple, (1, "foo", "bar"), True, (1, "foo", "bar")),
(Tuple[str, str, str], (1, "foo", "bar"), False, (1, "foo", "bar")),
(Tuple[str, Optional[str], str], (1, None, "bar"), False, (1, None, "bar")),
# given our current rules, this is the reality. Its just sad...
(Tuple[str, str, str], (1, "foo", "bar"), True, ("1", "foo", "bar")),
(Tuple[str, Optional[str], str], (1, None, "bar"), True, ("1", None, "bar")),
(Set[bool], set([1, 2]), False, set([1, 2])),
(Set[int], set([1, 2]), True, set([1, 2])),
(Set[int], set([None, 2]), True, set([None, 2])),