feat: add automation context implementation

This commit is contained in:
Gergő Jedlicska
2023-09-18 13:31:53 +02:00
parent 7d7d6666d0
commit 08c189d247
10 changed files with 1230 additions and 435 deletions
-45
View File
@@ -1,45 +0,0 @@
from pydantic import BaseModel, ConfigDict
from stringcase import camelcase
from specklepy.transports.memory import MemoryTransport
from specklepy.transports.server import ServerTransport
from specklepy.api.operations import receive
from specklepy.api.client import SpeckleClient
from flatten import flatten_base
from speckle_project_data import SpeckleProjectData
class FunctionInputs(BaseModel):
"""
These are function author defined values, automate will make sure to supply them.
"""
speckle_type_to_count: str
model_config = ConfigDict(alias_generator=camelcase)
def automate_function(
project_data: SpeckleProjectData,
function_inputs: FunctionInputs,
speckle_token: str,
):
client = SpeckleClient(project_data.speckle_server_url)
client.authenticate_with_token(speckle_token)
commit = client.commit.get(project_data.project_id, project_data.version_id)
memory_transport = MemoryTransport()
server_transport = ServerTransport(project_data.project_id, client)
if not commit.referencedObject:
raise ValueError("The commit has no root referencedObject.")
base = receive(commit.referencedObject, server_transport, memory_transport)
count = 0
for b in flatten_base(base):
if b.speckle_type == function_inputs.speckle_type_to_count:
count += 1
print(
f"Found {count} object that match the queried speckle type: ",
f"{function_inputs.speckle_type_to_count}",
)
+483
View File
@@ -0,0 +1,483 @@
"""WIP module for an automate python sdk."""
import json
import os
import sys
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Callable, TypeVar, overload
import httpx
from gql import gql
from pydantic import BaseModel, ConfigDict
from specklepy.api import operations
from specklepy.api.client import SpeckleClient
from specklepy.objects.base import Base
from specklepy.transports.memory import MemoryTransport
from specklepy.transports.server import ServerTransport
from stringcase import camelcase
class AutomateBase(BaseModel):
"""Use this class as a base model for automate related DTO."""
model_config = ConfigDict(alias_generator=camelcase, populate_by_name=True)
class AutomationRunData(BaseModel):
"""Values of the project / model that triggered the run of this function."""
project_id: str
model_id: str
branch_name: str
version_id: str
speckle_server_url: str
automation_id: str
automation_revision_id: str
automation_run_id: str
function_id: str
function_revision: str
model_config = ConfigDict(
alias_generator=camelcase, populate_by_name=True, protected_namespaces=()
)
class AutomationStatus(str, Enum):
"""Set the status of the automation."""
INITIALIZING = "INITIALIZING"
RUNNING = "RUNNING"
FAILED = "FAILED"
SUCCEEDED = "SUCCEEDED"
class ObjectResultLevel(str, Enum):
"""Possible status message levels for object reports."""
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
class ObjectResult(AutomateBase):
"""An object level result."""
level: ObjectResultLevel
status_message: str
class AutomationResult(AutomateBase):
"""Schema accepted by the Speckle server as a result for an automation run."""
elapsed: float = 0
result_view: str | None = None
result_versions: list[str] = field(default_factory=list)
blobs: list[str] = field(default_factory=list)
run_status: AutomationStatus = AutomationStatus.RUNNING
status_message: str | None = None
object_results: dict[str, list[ObjectResult]] = field(
default_factory=lambda: defaultdict(list) # typing: ignore
)
T = TypeVar("T", bound=AutomateBase)
@dataclass
class AutomationContext:
"""A WIP umbrella class for automate sdk functionality.
Potentially turn this into a context manager, to handle function enter exit status
changes.
"""
automation_run_data: AutomationRunData
speckle_client: SpeckleClient
_server_transport: ServerTransport
_speckle_token: str
#: keep a memory transponrt at hand, to speed up things if needed
_memory_transport: MemoryTransport = field(default_factory=MemoryTransport)
#: added for performance measuring
_init_time: float = field(default_factory=time.perf_counter)
_automation_result: AutomationResult = field(default_factory=AutomationResult)
@classmethod
def initialize(
cls, automation_run_data: str | AutomationRunData, speckle_token: str
) -> "AutomationContext":
"""Bootstrap the AutomateSDK from raw data.
Todo:
----
* bootstrap a structlog logger instance
* expose a logger, that ppl can use instead of print
* log an initialization message
"""
# parse the json value if its not an initialized project data instance
automation_run_data = (
automation_run_data
if isinstance(automation_run_data, AutomationRunData)
else AutomationRunData.model_validate_json(automation_run_data)
)
speckle_client = SpeckleClient(
automation_run_data.speckle_server_url,
automation_run_data.speckle_server_url.startswith("https"),
)
speckle_client.authenticate_with_token(speckle_token)
if not speckle_client.account:
msg = (
f"Could not autenticate to {automation_run_data.speckle_server_url}",
"with the provided token",
)
raise ValueError(msg)
server_transport = ServerTransport(
automation_run_data.project_id, speckle_client
)
return cls(automation_run_data, speckle_client, server_transport, speckle_token)
@property
def run_status(self) -> AutomationStatus:
"""Get the status of the automation run."""
return self._automation_result.run_status
def elapsed(self) -> float:
"""Return the elapsed time in seconds since the initialization time."""
return time.perf_counter() - self._init_time
def receive_version(self) -> Base:
"""Receive the Speckle project version that triggered this automation run."""
commit = self.speckle_client.commit.get(
self.automation_run_data.project_id, self.automation_run_data.version_id
)
if not commit.referencedObject:
raise ValueError("The commit has no referencedObject, cannot receive it.")
base = operations.receive(
commit.referencedObject, self._server_transport, self._memory_transport
)
print(
f"It took {self.elapsed():2f} seconds to receive",
f" the speckle version {self.automation_run_data.version_id}",
)
return base
def create_new_version_in_project(
self, root_object: Base, model_id: str, version_message: str = ""
) -> None:
"""Save a base model to a new version on the project.
Args:
root_object (Base): The Speckle base object for the new version.
model_id (str): For now please use a `branchName`!
version_message (str): The message for the new version.
"""
if model_id == self.automation_run_data.model_id:
raise ValueError(
f"The target model id: {model_id} cannot match the model id"
f" that triggered this automation: {self.automation_run_data.model_id}"
)
root_object_id = operations.send(
root_object,
[self._server_transport, self._memory_transport],
use_default_cache=False,
)
version_id = self.speckle_client.commit.create(
stream_id=self.automation_run_data.project_id,
object_id=root_object_id,
branch_name=model_id,
message=version_message,
source_application="SpeckleAutomate",
)
self._automation_result.result_versions.append(version_id)
def report_run_status(self) -> None:
"""Report the current run status to the Speckle server triggered the automation.
Once the automation function exits, send the status to the speckle server.
Return the result from the server, it should be a link to the stored automation
result.
"""
query = gql(
"""
mutation ReportFunctionRunStatus(
$automationId: String!,
$automationRevisionId: String!,
$automationRunId: String!,
$versionId: String!,
$functionId: String!,
$runStatus: AutomationRunStatus!
$elapsed: Float!
$resultVersionIds: [String!]!
$statusMessage: String
$objectResults: JSONObject
){
automationMutations {
functionRunStatusReport(input: {
automationId: $automationId
automationRevisionId: $automationRevisionId
automationRunId: $automationRunId
versionId: $versionId
functionRuns: [
{
functionId: $functionId
status: $runStatus,
elapsed: $elapsed,
resultVersionIds: $resultVersionIds,
statusMessage: $statusMessage
results: $objectResults
}]
})
}
}
"""
)
if self.run_status in [AutomationStatus.SUCCEEDED, AutomationStatus.FAILED]:
object_results = {
"version": "1.0.0",
"values": {
"speckleObjects": self._automation_result.model_dump(by_alias=True)[
"objectResults"
],
"blobs": self._automation_result.blobs,
},
}
else:
object_results = None
params = {
"automationId": self.automation_run_data.automation_id,
"automationRevisionId": self.automation_run_data.automation_revision_id,
"automationRunId": self.automation_run_data.automation_run_id,
"versionId": self.automation_run_data.version_id,
"functionId": self.automation_run_data.function_id,
"runStatus": self.run_status.value,
"elapsed": self.elapsed(),
"resultVersionIds": self._automation_result.result_versions,
"objectResults": object_results,
}
self.speckle_client.httpclient.execute(query, params)
def store_file_result(self, file_path: Path | str) -> None:
"""Save a file attached to the project of this automation."""
path_obj = (
Path(file_path).resolve() if isinstance(file_path, str) else file_path
)
class UploadResult(AutomateBase):
blob_id: str
file_name: str
upload_status: int
class BlobUploadResponse(AutomateBase):
upload_results: list[UploadResult]
if not path_obj.exists():
raise ValueError("The given file path doesn't exist")
files = {path_obj.name: open(str(path_obj), "rb")}
url = (
f"{self.automation_run_data.speckle_server_url}/api/stream/"
f"{self.automation_run_data.project_id}/blob"
)
data = (
httpx.post(
url,
files=files,
headers={"authorization": f"Bearer {self._speckle_token}"},
)
.raise_for_status()
.json()
)
upload_response = BlobUploadResponse.model_validate(data)
if len(upload_response.upload_results) != 1:
raise ValueError("Expecting one upload result.")
for upload_result in upload_response.upload_results:
self._automation_result.blobs.append(upload_result.blob_id)
def mark_run_failed(self, status_message: str) -> None:
"""Mark the current run a failure."""
self._mark_run(AutomationStatus.FAILED, status_message)
def mark_run_success(self, status_message: str | None) -> None:
"""Mark the current run a success with an optional message."""
self._mark_run(AutomationStatus.SUCCEEDED, status_message)
def _mark_run(self, status: AutomationStatus, status_message: str | None) -> None:
duration = self.elapsed()
self._automation_result.status_message = status_message
self._automation_result.run_status = status
self._automation_result.elapsed = duration
msg = f"Automation run {status.value} after {duration:2f} seconds."
print("\n".join([msg, status_message]) if status_message else msg)
def add_object_error(self, object_id: str, error_cause: str) -> None:
"""Add an error to a given objec id."""
self._add_object_result(object_id, ObjectResultLevel.ERROR, error_cause)
def add_object_warning(self, object_id: str, warning: str) -> None:
"""Add a warning to a given object id."""
self._add_object_result(object_id, ObjectResultLevel.WARNING, warning)
def add_object_info(self, object_id: str, info: str) -> None:
"""Add an info message to a given object."""
self._add_object_result(object_id, ObjectResultLevel.INFO, info)
def _add_object_result(
self, object_id: str, level: ObjectResultLevel, status_message: str
) -> None:
print(
f"Object {object_id} was marked with {level.value.upper()}",
f" cause: {status_message}",
)
self._automation_result.object_results[object_id].append(
ObjectResult(level=level, status_message=status_message)
)
AutomateFunction = Callable[[AutomationContext, T], None]
AutomateFunctionWithoutInputs = Callable[[AutomationContext], None]
@overload
def execute_automate_function(
automate_function: AutomateFunction[T],
input_schema: type[T],
) -> None:
...
@overload
def execute_automate_function(automate_function: AutomateFunctionWithoutInputs) -> None:
...
def execute_automate_function(
automate_function: AutomateFunction[T] | AutomateFunctionWithoutInputs,
input_schema: type[T] | None = None,
):
"""Runs the provided automate function with the input schema."""
# first arg is the python file name, we do not need that
args = sys.argv[1:]
if len(args) < 2:
raise ValueError("too few arguments specified need minimum 2")
if len(args) > 4:
raise ValueError("too many arguments specified, max supported is 4")
# we rely on a command name convention to decide what to do.
# this is here, so that the function authors do not see any of this
command = args[0]
if command == "generate_schema":
path = Path(args[1])
schema = json.dumps(
input_schema.model_json_schema(by_alias=True) if input_schema else {}
)
path.write_text(schema)
elif command == "run":
automation_run_data = args[1]
function_inputs = args[2]
speckle_token = os.environ.get("SPECKLE_TOKEN", None)
if not speckle_token and len(args) != 4:
raise ValueError("Cannot get speckle token from arguments or environment")
speckle_token = speckle_token if speckle_token else args[3]
inputs = (
input_schema.model_validate_json(function_inputs)
if input_schema
else input_schema
)
if inputs:
automate_sdk = run_function(
automate_function, # type: ignore
automation_run_data,
speckle_token,
inputs,
)
else:
automate_sdk = run_function(
automate_function, # type: ignore
automation_run_data,
speckle_token,
)
exit_code = 0 if automate_sdk.run_status == AutomationStatus.SUCCEEDED else 1
exit(exit_code)
else:
raise NotImplementedError(f"Command: '{command}' is not supported.")
@overload
def run_function(
automate_function: AutomateFunction[T],
automation_run_data: AutomationRunData | str,
speckle_token: str,
inputs: T,
) -> AutomationContext:
...
@overload
def run_function(
automate_function: AutomateFunctionWithoutInputs,
automation_run_data: AutomationRunData | str,
speckle_token: str,
) -> AutomationContext:
...
def run_function(
automate_function: AutomateFunction[T] | AutomateFunctionWithoutInputs,
automation_run_data: AutomationRunData | str,
speckle_token: str,
inputs: T | None = None,
) -> AutomationContext:
"""Run the provided function with the automate sdk context."""
automate_sdk = AutomationContext.initialize(automation_run_data, speckle_token)
automate_sdk.report_run_status()
try:
# avoiding complex type gymnastics here on the internals.
# the external type overloads make this correct
if inputs:
automate_function(automate_sdk, inputs) # type: ignore
else:
automate_function(automate_sdk) # type: ignore
# the function author forgot to mark the function success
if automate_sdk.run_status not in [
AutomationStatus.FAILED,
AutomationStatus.SUCCEEDED,
]:
automate_sdk.mark_run_success(
"WARNING: Automate assumed a success status,"
" but it was not marked as so by the function."
)
except Exception:
trace = traceback.format_exc()
print(trace)
automate_sdk.mark_run_failed(
"Function error. Check the automation run logs for details."
)
finally:
automate_sdk.report_run_status()
return automate_sdk
+5 -1
View File
@@ -1,8 +1,12 @@
from typing import Iterable
"""Helper module for a simple speckle object tree flattening."""
from collections.abc import Iterable
from specklepy.objects import Base
def flatten_base(base: Base) -> Iterable[Base]:
"""Take a base and flatten it to an iterable of bases."""
if hasattr(base, "elements"):
for element in base["elements"]:
yield from flatten_base(element)
+50 -20
View File
@@ -1,26 +1,56 @@
import typer
import os
from speckle_project_data import SpeckleProjectData
from automate_function import FunctionInputs, automate_function
from typing_extensions import Annotated
from typing import Optional
"""This module contains the business logic of the function.
Make sure that this module exposes a `FunctionInputs` class
and an `automate_function` function definition.
"""
from specklepy.objects.geometry import Mesh
from automate_sdk import (
AutomateBase,
AutomationContext,
execute_automate_function,
)
from flatten import flatten_base
def main(
speckle_project_data: str,
function_inputs: str,
speckle_token: Annotated[Optional[str], typer.Argument()] = None,
):
speckle_token = (
speckle_token if speckle_token else os.environ.get("SPECKLE_TOKEN", None)
)
if not speckle_token:
raise ValueError("The supplied speckle token is not valid")
class FunctionInputs(AutomateBase):
"""These are function author defined values.
project_data = SpeckleProjectData.model_validate_json(speckle_project_data)
inputs = FunctionInputs.model_validate_json(function_inputs)
automate_function(project_data, inputs, speckle_token)
Automate will make sure to supply them matching the types specified here.
"""
forbidden_speckle_type: str
def automate_function(
automate_context: AutomationContext,
function_inputs: FunctionInputs,
) -> None:
"""Hey, trying the automate sdk experience here."""
version_root_object = automate_context.receive_version()
count = 0
for b in flatten_base(version_root_object):
if b.speckle_type == function_inputs.forbidden_speckle_type:
if not b.id:
raise ValueError("Cannot operate on objects without their id's.")
automate_context.add_object_error(
b.id,
"This project should not contain the type: "
f"{function_inputs.forbidden_speckle_type}",
)
count += 1
if count > 0:
automate_context.mark_run_failed(
"Automation failed: "
f"Found {count} object that have a forbidden speckle type: "
f"{function_inputs.forbidden_speckle_type}"
)
else:
automate_context.mark_run_success("No forbidden types found.")
if __name__ == "__main__":
typer.run(main)
execute_automate_function(automate_function, FunctionInputs)
+10
View File
@@ -0,0 +1,10 @@
from automate_sdk import AutomationResult, ObjectResult, ObjectResultLevel
res = AutomationResult()
res.object_results["foobar"].append(
ObjectResult(level=ObjectResultLevel.ERROR, status_message="foobar")
)
print(res.model_dump(by_alias=True))
Generated
+494 -348
View File
File diff suppressed because it is too large Load Diff
+15 -2
View File
@@ -8,16 +8,29 @@ packages = [{include = "src/speckle_automate_py"}]
[tool.poetry.dependencies]
python = "^3.10"
specklepy = "^2.16.0"
typer = "^0.9.0"
specklepy = "^2.16.2"
pydantic = "^2.1.1"
stringcase = "^1.2.0"
httpx = "^0.25.0"
[tool.poetry.group.dev.dependencies]
black = "^23.3.0"
mypy = "^1.3.0"
ruff = "^0.0.271"
pytest = "^7.4.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.ruff]
select = [
"E", # pycodestyle
"F", # pyflakes
"UP", # pyupgrade
"D", # pydocstyle
"I", # isort
]
[tool.ruff.pydocstyle]
convention = "google"
-6
View File
@@ -1,6 +0,0 @@
import json
from main import FunctionInputs
if __name__ == "__main__":
print(json.dumps(FunctionInputs.model_json_schema()))
-13
View File
@@ -1,13 +0,0 @@
from pydantic import BaseModel, ConfigDict
from stringcase import camelcase
class SpeckleProjectData(BaseModel):
"""Values of the project / model that triggered the run of this function."""
project_id: str
model_id: str
version_id: str
speckle_server_url: str
model_config = ConfigDict(alias_generator=camelcase, protected_namespaces=())
+173
View File
@@ -0,0 +1,173 @@
"""Run integration tests with a speckle server."""
import os
import secrets
import string
from pathlib import Path
import pytest
from gql import gql
from specklepy.api import operations
from specklepy.api.client import SpeckleClient
from specklepy.objects.base import Base
from specklepy.transports.server import ServerTransport
from automate_sdk import (
AutomationContext,
AutomationRunData,
AutomationStatus,
run_function,
)
from main import FunctionInputs, automate_function
def crypto_random_string(length: int) -> str:
"""Generate a semi crypto random string of a given length."""
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
def register_new_automation(
project_id: str,
model_id: str,
speckle_client: SpeckleClient,
automation_id: str,
automation_name: str,
automation_revision_id: str,
):
"""Register a new automation in the speckle server."""
query = gql(
"""
mutation CreateAutomation(
$projectId: String!
$modelId: String!
$automationName: String!
$automationId: String!
$automationRevisionId: String!
) {
automationMutations {
create(
input: {
projectId: $projectId
modelId: $modelId
automationName: $automationName
automationId: $automationId
automationRevisionId: $automationRevisionId
}
)
}
}
"""
)
params = {
"projectId": project_id,
"modelId": model_id,
"automationName": automation_name,
"automationId": automation_id,
"automationRevisionId": automation_revision_id,
}
speckle_client.httpclient.execute(query, params)
@pytest.fixture()
def speckle_token() -> str:
"""Provide a speckle token for the test suite."""
env_var = "SPECKLE_TOKEN"
token = os.getenv(env_var)
if not token:
raise ValueError(f"Cannot run tests without a {env_var} environment variable")
return token
@pytest.fixture()
def speckle_server_url() -> str:
"""Provide a speckle server url for the test suite, default to localhost."""
return os.getenv("SPECKLE_SERVER_URL", "http://127.0.0.1:3000")
@pytest.fixture()
def test_client(speckle_server_url: str, speckle_token: str) -> SpeckleClient:
"""Initialize a SpeckleClient for testing."""
test_client = SpeckleClient(
speckle_server_url, speckle_server_url.startswith("https")
)
test_client.authenticate_with_token(speckle_token)
return test_client
@pytest.fixture()
def test_object() -> Base:
"""Create a Base model for testing."""
root_object = Base()
root_object.foo = "bar"
return root_object
@pytest.fixture()
def automation_run_data(
test_object: Base, test_client: SpeckleClient, speckle_server_url: str
) -> AutomationRunData:
"""Set up an automation context for testing."""
project_id = test_client.stream.create("Automate function e2e test")
branch_name = "main"
model = test_client.branch.get(project_id, branch_name, commits_limit=1)
model_id: str = model.id
root_obj_id = operations.send(
test_object, [ServerTransport(project_id, test_client)]
)
version_id = test_client.commit.create(project_id, root_obj_id)
automation_name = crypto_random_string(10)
automation_id = crypto_random_string(10)
automation_revision_id = crypto_random_string(10)
register_new_automation(
project_id,
model_id,
test_client,
automation_id,
automation_name,
automation_revision_id,
)
automation_run_id = crypto_random_string(10)
function_id = crypto_random_string(10)
function_revision = crypto_random_string(10)
return AutomationRunData(
project_id=project_id,
model_id=model_id,
branch_name=branch_name,
version_id=version_id,
speckle_server_url=speckle_server_url,
automation_id=automation_id,
automation_revision_id=automation_revision_id,
automation_run_id=automation_run_id,
function_id=function_id,
function_revision=function_revision,
)
def test_function_run(automation_run_data: AutomationRunData, speckle_token: str):
"""Run an integration test for the automate function."""
automate_sdk = run_function(
automate_function,
automation_run_data,
speckle_token,
FunctionInputs(forbidden_speckle_type="Base"),
)
assert automate_sdk.run_status == AutomationStatus.FAILED
def test_file_uploads(automation_run_data: AutomationRunData, speckle_token: str):
"""Test file store capabilities of the automate sdk."""
automate_context = AutomationContext.initialize(automation_run_data, speckle_token)
path = Path(f"./{crypto_random_string(10)}").resolve()
path.write_text("foobar")
automate_context.store_file_result(path)
os.remove(path)
assert len(automate_context._automation_result.blobs) == 1