Compare commits

...

5 Commits

Author SHA1 Message Date
Gergő Jedlicska 309c78da37 feat(automate_sdk): support version result reporting (#468)
Publish Python Package / test (push) Has been cancelled
Publish Python Package / Build and Publish Python Package (push) Has been cancelled
* feat(automate_sdk): support version result reporting

* fix(automate_tests): skip test, its not working in CI

* docs(automate_sdk): describe new args
2025-11-05 21:50:40 +01:00
Jedd Morgan ff812d5ad9 add 520 status to retry policy (#467) 2025-11-05 10:49:36 +00:00
Jedd Morgan 8edc0d5d78 feat(ifc)!: Display Value proxies for IFC importer (#466)
Publish Python Package / test (push) Has been cancelled
Publish Python Package / Build and Publish Python Package (push) Has been cancelled
* First pass

* Add applicationId for proxyInstance

* renamed revit instances

* renamed collection again

* again

* reverted main changes for manual testing

* small refactor of function def

* format matix
2025-10-31 13:54:46 +00:00
Jedd Morgan 78b3e99475 Remove unused server envars (#464) 2025-10-28 16:49:32 +00:00
Jedd Morgan ac9e081d49 feat(metrics): Introduce sync metrics (#462)
Publish Python Package / test (push) Has been cancelled
Publish Python Package / Build and Publish Python Package (push) Has been cancelled
* sync metrics tracking

* fix
2025-10-27 13:40:40 +01:00
17 changed files with 271 additions and 133 deletions
-1
View File
@@ -96,7 +96,6 @@ services:
SESSION_SECRET: "TODO:ReplaceWithLongString"
STRATEGY_LOCAL: "true"
DEBUG: "speckle:*"
POSTGRES_URL: "postgres"
POSTGRES_USER: "speckle"
-1
View File
@@ -96,7 +96,6 @@ services:
SESSION_SECRET: "TODO:ReplaceWithLongString"
STRATEGY_LOCAL: "true"
DEBUG: "speckle:*"
POSTGRES_URL: "postgres"
POSTGRES_USER: "speckle"
+39 -15
View File
@@ -245,24 +245,24 @@ class AutomationContext:
"""
)
if self.run_status in [AutomationStatus.SUCCEEDED, AutomationStatus.FAILED]:
object_results = {
"version": 2,
results_dict = self._automation_result.model_dump(by_alias=True)
results = {
"version": 3,
"values": {
"objectResults": self._automation_result.model_dump(by_alias=True)[
"objectResults"
],
"objectResults": results_dict["objectResults"],
"versionResult": results_dict["versionResult"],
"blobIds": self._automation_result.blobs,
},
}
else:
object_results = None
results = None
params = {
"projectId": self.automation_run_data.project_id,
"functionRunId": self.automation_run_data.function_run_id,
"status": self.run_status.value,
"statusMessage": self._automation_result.status_message,
"results": object_results,
"results": results,
"contextView": self._automation_result.result_view,
}
print(f"Reporting run status with content: {params}")
@@ -312,25 +312,49 @@ class AutomationContext:
return upload_response.upload_results[0].blob_id
def mark_run_failed(self, status_message: str) -> None:
"""Mark the current run a failure."""
self._mark_run(AutomationStatus.FAILED, status_message)
def mark_run_failed(
self, status_message: str, version_result: dict[str, Any] | None = None
) -> None:
"""
Mark the current run a failure.
Args:
status_message: Optional message to be displayed.
version_result: Optional data object,
that will be attached to the run results.
The dictionary should be JSON serializable
"""
self._mark_run(AutomationStatus.FAILED, status_message, version_result)
def mark_run_exception(self, status_message: str) -> None:
"""Mark the current run a failure."""
self._mark_run(AutomationStatus.EXCEPTION, status_message)
self._mark_run(AutomationStatus.EXCEPTION, status_message, None)
def mark_run_success(self, status_message: Optional[str]) -> None:
"""Mark the current run a success with an optional message."""
self._mark_run(AutomationStatus.SUCCEEDED, status_message)
def mark_run_success(
self, status_message: str | None, version_result: dict[str, Any] | None = None
) -> None:
"""
Mark the current run a success with an optional message.
Args:
status_message: Optional message to be displayed.
version_result: Optional data object,
that will be attached to the run results.
The dictionary should be JSON serializable
"""
self._mark_run(AutomationStatus.SUCCEEDED, status_message, version_result)
def _mark_run(
self, status: AutomationStatus, status_message: Optional[str]
self,
status: AutomationStatus,
status_message: str | None,
version_result: dict[str, Any] | None,
) -> None:
duration = self.elapsed()
self._automation_result.status_message = status_message
self._automation_result.run_status = status
self._automation_result.elapsed = duration
self._automation_result.version_result = version_result
msg = f"Automation run {status.value} after {duration:.2f} seconds."
print("\n".join([msg, status_message]) if status_message else msg)
+5 -7
View File
@@ -88,10 +88,8 @@ def create_test_automation_run(
print(result)
return (
result.get("projectMutations")
.get("automationMutations")
.get("createTestAutomationRun")
return TestAutomationRunData.model_validate(
result["projectMutations"]["automationMutations"]["createTestAutomationRun"]
)
@@ -123,9 +121,9 @@ def create_test_automation_run_data(
project_id=test_automation_environment.project_id,
speckle_server_url=test_automation_environment.server_url,
automation_id=test_automation_environment.automation_id,
automation_run_id=test_automation_run_data["automationRunId"],
function_run_id=test_automation_run_data["functionRunId"],
triggers=test_automation_run_data["triggers"],
automation_run_id=test_automation_run_data.automation_run_id,
function_run_id=test_automation_run_data.function_run_id,
triggers=test_automation_run_data.triggers,
)
+12 -11
View File
@@ -1,7 +1,7 @@
""""""
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel
@@ -36,7 +36,7 @@ class AutomationRunData(BaseModel):
automation_run_id: str
function_run_id: str
triggers: List[VersionCreationTrigger]
triggers: list[VersionCreationTrigger]
model_config = ConfigDict(
alias_generator=to_camel, populate_by_name=True, protected_namespaces=()
@@ -49,7 +49,7 @@ class TestAutomationRunData(BaseModel):
automation_run_id: str
function_run_id: str
triggers: List[VersionCreationTrigger]
triggers: list[VersionCreationTrigger]
model_config = ConfigDict(
alias_generator=to_camel, populate_by_name=True, protected_namespaces=()
@@ -80,19 +80,20 @@ class ResultCase(AutomateBase):
category: str
level: ObjectResultLevel
object_app_ids: Dict[str, Optional[str]]
message: Optional[str]
metadata: Optional[Dict[str, Any]]
visual_overrides: Optional[Dict[str, Any]]
object_app_ids: dict[str, str | None]
message: str | None
metadata: dict[str, Any] | None
visual_overrides: dict[str, Any] | None
class AutomationResult(AutomateBase):
"""Schema accepted by the Speckle server as a result for an automation run."""
elapsed: float = 0
result_view: Optional[str] = None
result_versions: List[str] = Field(default_factory=list)
blobs: List[str] = Field(default_factory=list)
result_view: str | None = None
result_versions: list[str] = Field(default_factory=list)
blobs: list[str] = Field(default_factory=list)
run_status: AutomationStatus = AutomationStatus.RUNNING
status_message: Optional[str] = None
status_message: str | None = None
object_results: list[ResultCase] = Field(default_factory=list)
version_result: dict[str, Any] | None = None
@@ -4,21 +4,21 @@ from typing import cast
from ifcopenshell.ifcopenshell_wrapper import (
Triangulation,
TriangulationElement,
colour,
style,
)
from speckleifc.render_material_proxy_manager import RenderMaterialProxyManager
from speckleifc.proxy_managers.render_material_proxy_manager import (
RenderMaterialProxyManager,
)
from specklepy.objects.base import Base
from specklepy.objects.geometry import Mesh
from specklepy.objects.other import RenderMaterial
def geometry_to_speckle(
shape: TriangulationElement, render_material_manager: RenderMaterialProxyManager
geometry: Triangulation, render_material_manager: RenderMaterialProxyManager
) -> list[Base]:
geometry = cast(Triangulation, shape.geometry)
materials = cast(Sequence[style], geometry.materials)
MESH_COUNT = max(len(materials), 1)
@@ -33,7 +33,7 @@ def geometry_to_speckle(
# Not really expected, but occasionally some meshes fail to triangulate
return []
mapped_meshes = _pre_alloc_mesh_lists(shape, material_ids, MESH_COUNT)
mapped_meshes = _pre_alloc_mesh_lists(geometry, material_ids, MESH_COUNT)
for i, mesh in enumerate(mapped_meshes):
material = _material_to_speckle(materials[i])
render_material_manager.add_mesh_material_mapping(material, mesh)
@@ -103,14 +103,14 @@ def _color_to_argb(colour: colour) -> int:
def _pre_alloc_mesh_lists(
shape: TriangulationElement, material_ids: Sequence[int], MESH_COUNT: int
geometry: Triangulation, material_ids: Sequence[int], MESH_COUNT: int
) -> list[Mesh]:
"""
This is a performance optimisation to pre-size the lists
since we're expecting potential hundreds of thousands of verts in a single model
This is very much in the hot path, so worth the extra bit of convoluted logic
"""
appId = cast(str, shape.guid)
appId = cast(str, geometry.id)
material_face_counts = defaultdict(int)
for mat_id in material_ids:
+4 -2
View File
@@ -12,8 +12,10 @@ def _create_iterator_settings() -> settings:
ifc_settings.set("triangulation-type", ifcopenshell_wrapper.TRIANGLE_MESH)
# no need to weld verts
ifc_settings.set("weld-vertices", False)
# Speckle meshes are all in world coords
ifc_settings.set("use-world-coords", True)
#
ifc_settings.set("use-world-coords", False)
ifc_settings.set("permissive-shape-reuse", True)
# Tiny performance improvement,
ifc_settings.set("no-wire-intersection-check", True)
# Rendermaterials inherit the material names instead of type + unique id
+63 -12
View File
@@ -1,10 +1,10 @@
import time
from dataclasses import dataclass, field
from typing import cast
from typing import List, cast
from ifcopenshell.entity_instance import entity_instance
from ifcopenshell.geom import file
from ifcopenshell.ifcopenshell_wrapper import TriangulationElement
from ifcopenshell.ifcopenshell_wrapper import Triangulation, TriangulationElement
from speckleifc.converter.data_object_converter import data_object_to_speckle
from speckleifc.converter.geometry_converter import geometry_to_speckle
@@ -12,27 +12,38 @@ from speckleifc.converter.project_converter import project_to_speckle
from speckleifc.converter.spatial_element_converter import spatial_element_to_speckle
from speckleifc.ifc_geometry_processing import create_geometry_iterator
from speckleifc.ifc_openshell_helpers import get_children
from speckleifc.level_proxy_manager import LevelProxyManager
from speckleifc.render_material_proxy_manager import RenderMaterialProxyManager
from speckleifc.proxy_managers.instance_proxy_manager import InstanceProxyManager
from speckleifc.proxy_managers.level_proxy_manager import LevelProxyManager
from speckleifc.proxy_managers.render_material_proxy_manager import (
RenderMaterialProxyManager,
)
from specklepy.logging.exceptions import SpeckleException
from specklepy.objects import Base
from specklepy.objects.data_objects import DataObject
from specklepy.objects.models.collections.collection import Collection
from specklepy.objects.proxies import InstanceProxy
@dataclass
class ImportJob:
ifc_file: file
cached_display_values: dict[int, list[Base]] = field(default_factory=dict) # noqa: F821
_render_material_manager: RenderMaterialProxyManager = field(
default_factory=lambda: RenderMaterialProxyManager()
)
_level_proxy_manager: LevelProxyManager = field(
default_factory=lambda: LevelProxyManager()
)
_instance_proxy_manager: InstanceProxyManager = field(
default_factory=lambda: InstanceProxyManager()
)
geometries_count: int = 0
geometries_used: int = 0
_current_storey_data_object: DataObject | None = field(default=None, init=False)
_display_value_cache: dict[int, list[Base]] = field(default_factory=dict)
"""Maps an instance step ID to a list of instances"""
def convert_element(self, step_element: entity_instance) -> Base:
try:
return self._convert_element(step_element)
@@ -48,14 +59,14 @@ class ImportJob:
previous_storey_data_object = self._current_storey_data_object
if step_element.is_a("IfcBuildingStorey"):
# Convert the building storey to a DataObject for the level proxy
storey_display_value = self.cached_display_values.get(step_element.id(), [])
storey_display_value = self._display_value_cache.get(step_element.id(), [])
self._current_storey_data_object = data_object_to_speckle(
storey_display_value, step_element, []
)
children = self._convert_children(step_element)
id = step_element.id()
display_value = self.cached_display_values.get(id, [])
display_value = self._display_value_cache.get(id, [])
if display_value:
self.geometries_used += 1
@@ -127,12 +138,9 @@ class ImportJob:
shape = cast(TriangulationElement, iterator.get())
self.geometries_count += 1
id = cast(int, shape.id)
try:
display_value = geometry_to_speckle(
shape, self._render_material_manager
)
self.cached_display_values[id] = display_value
display_value = self._create_display_value(shape)
self._display_value_cache[id] = display_value
except Exception as ex:
raise SpeckleException(
f"Failed to convert geometry with id: {id}"
@@ -140,6 +148,37 @@ class ImportJob:
if not iterator.next():
break
def _create_display_value(self, shape: TriangulationElement) -> List[Base]:
geometry = cast(Triangulation, shape.geometry)
display_value_geometry = geometry_to_speckle(
geometry, self._render_material_manager
)
definition_ids = self._instance_proxy_manager.add_display_value_definitions(
display_value_geometry
)
matrix = shape.transformation.matrix
transposed = [
matrix[0], matrix[4], matrix[8], matrix[12],
matrix[1], matrix[5], matrix[9], matrix[13],
matrix[2], matrix[6], matrix[10], matrix[14],
matrix[3], matrix[7], matrix[11], matrix[15],
] # fmt: skip
return [
cast(
Base,
InstanceProxy(
units="m",
definitionId=definition_id,
transform=transposed,
maxDepth=0,
applicationId=f"{shape.guid}:{definition_id}",
),
)
for definition_id in definition_ids
]
def _convert_project_tree(self) -> Base:
projects = self.ifc_file.by_type("IfcProject", False)
if len(projects) != 1:
@@ -147,10 +186,22 @@ class ImportJob:
project = projects[0]
tree = self.convert_element(project)
if not isinstance(tree, Collection):
raise TypeError("Expected root object to convert to a Collection")
tree["renderMaterialProxies"] = list(
self._render_material_manager.render_material_proxies.values()
)
tree["levelProxies"] = list(self._level_proxy_manager.level_proxies.values())
tree["instanceDefinitionProxies"] = list(
self._instance_proxy_manager.instance_definition_proxies.values()
)
tree.elements.append(
Collection(
name="definitionGeometry",
elements=list(self._instance_proxy_manager.instance_geometry.values()),
)
)
tree["version"] = 3
return tree
+1 -1
View File
@@ -56,6 +56,6 @@ def open_and_convert_file(
custom_properties = {"ui": "dui3", "actionSource": "import"}
if project.workspace_id:
custom_properties["workspace_id"] = project.workspace_id
metrics.track(metrics.SEND, account, custom_properties)
metrics.track(metrics.SEND, account, custom_properties, send_sync=True)
return version
@@ -0,0 +1,43 @@
from typing import Sequence
from specklepy.objects.base import Base
from specklepy.objects.proxies import InstanceDefinitionProxy
class InstanceProxyManager:
def __init__(self):
self._instance_definition_proxies: dict[str, InstanceDefinitionProxy] = {}
"""definition proxies to be added directly to the root"""
self._instance_geometry: dict[str, Base] = {}
"""The geometry that will be added in it's own collection under the root"""
@property
def instance_definition_proxies(self) -> dict[str, InstanceDefinitionProxy]:
return self._instance_definition_proxies
@property
def instance_geometry(self) -> dict[str, Base]:
return self._instance_geometry
def add_display_value_definitions(self, geometry: Sequence[Base]) -> list[str]:
result: list[str] = []
for m in geometry:
if not m.applicationId:
raise ValueError("geometry with no applicationId cannot be proxied ")
definition_id = f"DEFINITION:{m.applicationId}"
result.append(definition_id)
self._add_definition(definition_id, [m.applicationId], 0)
self._instance_geometry[m.applicationId] = m
return result
def _add_definition(
self, definition_id: str, objects: list[str], max_depth: int
) -> None:
proxy = InstanceDefinitionProxy(
applicationId=definition_id,
name=definition_id,
objects=objects,
maxDepth=max_depth,
)
self._instance_definition_proxies[definition_id] = proxy
+60 -60
View File
@@ -6,6 +6,7 @@ import platform
import queue
import sys
import threading
from typing import Any
import requests
@@ -29,21 +30,6 @@ CONNECTOR = "Connector Action"
RECEIVE = "Receive"
SEND = "Send"
# not in use since 2.15
ACCOUNTS = "Get Local Accounts"
BRANCH = "Branch Action"
CLIENT = "Speckle Client"
COMMIT = "Commit Action"
DESERIALIZE = "serialization/deserialize"
INVITE = "Invite Action"
OTHER_USER = "Other User Action"
PERMISSION = "Permission Action"
SERIALIZE = "serialization/serialize"
SERVER = "Server Action"
STREAM = "Stream Action"
STREAM_WRAPPER = "Stream Wrapper"
USER = "User Action"
def disable():
global TRACK
@@ -65,43 +51,43 @@ def track(
action: str,
account: Account | None = None,
custom_props: dict | None = None,
send_sync: bool = False,
):
if not TRACK:
return
try:
initialise_tracker(account)
event_params = {
"event": action,
"properties": {
"distinct_id": METRICS_TRACKER.last_user,
"server_id": METRICS_TRACKER.last_server,
"token": METRICS_TRACKER.analytics_token,
"hostApp": HOST_APP,
"hostAppVersion": HOST_APP_VERSION,
"$os": METRICS_TRACKER.platform,
"type": "action",
},
}
if custom_props:
event_params["properties"].update(custom_props)
METRICS_TRACKER.queue.put_nowait(event_params)
except Exception as ex:
# wrapping this whole thing in a try except as we never want a failure here
# to annoy users!
LOG.debug(f"Error queueing metrics request: {str(ex)}")
tracker = initialise_tracker(account)
event_params: dict[str, Any] = {
"event": action,
"properties": {
"distinct_id": tracker.last_user,
"server_id": tracker.last_server,
"token": tracker.analytics_token,
"hostApp": HOST_APP,
"hostAppVersion": HOST_APP_VERSION,
"$os": tracker.platform,
"type": "action",
},
}
if custom_props:
event_params["properties"].update(custom_props)
if send_sync:
tracker.send_event(event_params)
else:
tracker.queue_event(event_params)
def initialise_tracker(account: Account | None = None):
def initialise_tracker(account: Account | None = None) -> "MetricsTracker":
global METRICS_TRACKER
if not METRICS_TRACKER:
METRICS_TRACKER = MetricsTracker()
if not account:
return
if account:
METRICS_TRACKER.set_last_user(account.userInfo.email)
METRICS_TRACKER.set_last_server(account.serverInfo.url)
METRICS_TRACKER.set_last_user(account.userInfo.email)
METRICS_TRACKER.set_last_server(account.serverInfo.url)
return METRICS_TRACKER
class Singleton(type):
@@ -114,48 +100,62 @@ class Singleton(type):
class MetricsTracker(metaclass=Singleton):
analytics_url = "https://analytics.speckle.systems/track?ip=1"
analytics_token = "acd87c5a50b56df91a795e999812a3a4"
last_user = ""
last_server = None
platform = None
sending_thread = None
queue = queue.Queue(1000)
analytics_url: str = "https://analytics.speckle.systems/track?ip=1"
analytics_token: str = "acd87c5a50b56df91a795e999812a3a4"
last_user: str = ""
last_server: str | None = None
platform: str
_sending_thread: threading.Thread
_queue: queue.Queue[dict[str, Any]] = queue.Queue(1000)
_session = requests.Session()
def __init__(self) -> None:
self.sending_thread = threading.Thread(
self._sending_thread = threading.Thread(
target=self._send_tracking_requests, daemon=True
)
self.platform = PLATFORMS.get(sys.platform, "linux")
self.sending_thread.start()
self._sending_thread.start()
with contextlib.suppress(Exception):
node, user = platform.node(), getpass.getuser()
if node and user:
self.last_user = f"@{self.hash(f'{node}-{user}')}"
def set_last_user(self, email: str | None):
def set_last_user(self, email: str | None) -> None:
if not email:
return
self.last_user = f"@{self.hash(email)}"
def set_last_server(self, server: str | None):
def set_last_server(self, server: str | None) -> None:
if not server:
return
self.last_server = self.hash(server)
def hash(self, value: str):
def hash(self, value: str) -> str:
inputList = value.lower().split("://")
input = inputList[len(inputList) - 1].split("/")[0].split("?")[0]
return hashlib.md5(input.encode("utf-8")).hexdigest().upper()
def _send_tracking_requests(self):
session = requests.Session()
def queue_event(self, event_params: dict[str, Any]) -> None:
try:
self._queue.put_nowait(event_params)
except queue.Full:
LOG.warning(
"Metrics event was skipped because the metrics queue was was full",
exc_info=True,
)
def send_event(self, event_params: dict[str, Any]) -> None:
response = self._session.post(self.analytics_url, json=[event_params])
response.raise_for_status()
def _send_tracking_requests(self) -> None:
while True:
event_params = [self.queue.get()]
event_params = self._queue.get()
try:
session.post(self.analytics_url, json=event_params)
except Exception as ex:
LOG.debug(f"Error sending metrics request: {str(ex)}")
self.send_event(event_params)
except Exception:
LOG.warning("Error sending metrics request", exc_info=True)
self.queue.task_done()
self._queue.task_done()
+3 -3
View File
@@ -33,9 +33,9 @@ class InstanceProxy(
IHasUnits,
speckle_type="Speckle.Core.Models.Instances.InstanceProxy",
):
definition_id: str
definitionId: str
transform: List[float]
max_depth: int
maxDepth: int
@dataclass(kw_only=True)
@@ -45,7 +45,7 @@ class InstanceDefinitionProxy(
detachable={"objects"},
):
objects: List[str]
max_depth: int
maxDepth: int
name: str
@@ -21,7 +21,7 @@ def setup_session(auth_token: str | None) -> requests.Session:
read=3,
connect=3,
backoff_factor=0.5,
status_forcelist=(500, 502, 503, 504, 408, 429),
status_forcelist=(500, 502, 503, 504, 520, 408, 429),
allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
raise_on_status=False,
)
@@ -14,10 +14,14 @@ from speckle_automate import (
run_function,
)
from speckle_automate.fixtures import (
TestAutomationEnvironment,
create_test_automation_run_data,
)
from speckle_automate.schema import AutomateBase
from specklepy.api.client import SpeckleClient
from specklepy.core.api.enums import ProjectVisibility
from specklepy.core.api.inputs import ProjectCreateInput
from specklepy.core.api.models import Project
from specklepy.core.api.models.current import Model, Version
from specklepy.core.helpers import crypto_random_string
from specklepy.objects.base import Base
@@ -43,18 +47,33 @@ def test_client(speckle_server_url: str, speckle_token: str) -> SpeckleClient:
return test_client
@pytest.fixture
def project(test_client: SpeckleClient) -> Project:
return test_client.project.create(
ProjectCreateInput(
name="test", description=None, visibility=ProjectVisibility.PRIVATE
)
)
@pytest.fixture
def automation_run_data(
test_client: SpeckleClient, speckle_server_url: str
test_client: SpeckleClient,
speckle_server_url: str,
speckle_token: str,
project: Project,
) -> AutomationRunData:
"""TODO: Set up a test automation for integration testing"""
project_id = crypto_random_string(10)
test_automation_id = crypto_random_string(10)
return create_test_automation_run_data(
test_client, speckle_server_url, project_id, test_automation_id
environment = TestAutomationEnvironment(
token=speckle_token,
server_url=speckle_server_url,
project_id=project.id,
automation_id=test_automation_id,
)
return create_test_automation_run_data(test_client, environment)
@pytest.fixture
def automation_context(
@@ -133,7 +152,7 @@ def automate_function(
raise ValueError("Cannot operate on objects without their id's.")
automation_context.attach_error_to_objects(
"Forbidden speckle_type",
version_root_object.id,
version_root_object,
"This project should not contain the type: "
f"{function_inputs.forbidden_speckle_type}",
)
@@ -164,7 +183,7 @@ def test_function_run(automation_context: AutomationContext) -> None:
assert automation_context.run_status == AutomationStatus.FAILED
status = get_automation_status(
automation_context.automation_run_data.project_id,
automation_context.automation_run_data.model_id,
automation_context.automation_run_data.triggers[0].payload.model_id,
automation_context.speckle_client,
)
assert status["status"] == automation_context.run_status
@@ -205,7 +224,7 @@ def test_create_version_in_project_raises_error_for_same_model(
) -> None:
with pytest.raises(ValueError):
automation_context.create_new_version_in_project(
Base(), automation_context.automation_run_data.branch_name
Base(), automation_context.automation_run_data.triggers[0].payload.model_id
)
@@ -220,8 +239,8 @@ def test_create_version_in_project(
model, version = automation_context.create_new_version_in_project(
root_object, "foobar"
)
isinstance(model, Model)
isinstance(version, Version)
assert isinstance(model, Model)
assert isinstance(version, Version)
@pytest.mark.skip(
@@ -230,9 +249,11 @@ def test_create_version_in_project(
def test_set_context_view(automation_context: AutomationContext) -> None:
automation_context.set_context_view()
trigger = automation_context.automation_run_data.triggers[0].payload
assert automation_context.context_view is not None
assert automation_context.context_view.endswith(
f"models/{automation_context.automation_run_data.model_id}@{automation_context.automation_run_data.version_id}"
f"models/{trigger.model_id}@{trigger.version_id}"
)
automation_context.report_run_status()
@@ -244,7 +265,7 @@ def test_set_context_view(automation_context: AutomationContext) -> None:
assert automation_context.context_view is not None
assert automation_context.context_view.endswith(
f"models/{automation_context.automation_run_data.model_id}@{automation_context.automation_run_data.version_id},{dummy_context}"
f"models/{trigger.model_id}@{trigger.version_id},{dummy_context}"
)
automation_context.report_run_status()