refactor: from dirty poc to a clean solution

→ poc was not so solid 😏
→ refactor for better architecture and maintainability
This commit is contained in:
Björn Steinhagen
2025-02-09 15:11:38 +01:00
parent a81dc28ed1
commit 7f49d0db3b
19 changed files with 448 additions and 154 deletions
+67 -32
View File
@@ -5,7 +5,12 @@ from speckle_automate import (
execute_automate_function,
)
from src.processors.commit_processory import CommitProcessor
from src.processors.material import RevitMaterialProcessor
from src.processors.compliance import RevitComplianceChecker
from src.processors.model import RevitModelProcessor
from src.validators.revit import RevitSourceValidator
from src.aggregators.carbon_totals import MassAggregator
from src.logging.compliance_logger import ComplianceLogger
class FunctionInputs(AutomateBase):
@@ -24,39 +29,69 @@ def automate_function(
automate_context: AutomationContext,
function_inputs: FunctionInputs,
) -> None:
# TODO: Add method to automation_context for sourceApplication
version_id = automate_context.automation_run_data.triggers[0].payload.version_id
commit_root = automate_context.speckle_client.commit.get(
automate_context.automation_run_data.project_id, version_id
"""Program entry point."""
try:
# Get version data
version_id = automate_context.automation_run_data.triggers[0].payload.version_id
commit_root = automate_context.speckle_client.commit.get(
automate_context.automation_run_data.project_id, version_id
)
# Validate source application
source_validator = RevitSourceValidator() # Built for revit, therefore check
if not source_validator.validate(commit_root.sourceApplication):
automate_context.mark_run_failed(
f"Automation requires Revit v3 commits. Received: {commit_root.sourceApplication}"
)
return
# Create processor chain and get logger for results
processor, logger = create_processor_chain()
# Process model
model_root = automate_context.receive_version() # TODO: Line 35 and 36!?
processor.process_elements(model_root)
# Report compliance issues
compliance_summary = logger.get_summary()
for missing_property, elements in compliance_summary.items():
automate_context.attach_warning_to_objects(
category="Missing Revit Material Property",
object_ids=elements,
message=(
f"Missing {missing_property} on the object, preventing mass calculation. "
f"Update Revit object to contain the necessary properties if element is critical."
),
)
automate_context.mark_run_success("Processing completed successfully.")
except Exception as e:
automate_context.mark_run_failed(f"Processing failed: {str(e)}")
raise # Re-raise for proper error tracking
def create_processor_chain() -> tuple[RevitModelProcessor, ComplianceLogger]:
"""Creates and configures the required components."""
# Core components
logger = ComplianceLogger() # For tracking issues
mass_aggregator = MassAggregator() # For collecting computed masses
# Create processors
material_processor = RevitMaterialProcessor(mass_aggregator) # Material calcs
compliance_checker = RevitComplianceChecker(logger) # Validation
# Create and return the main processor with logger
return (
RevitModelProcessor(
material_processor=material_processor,
compliance_checker=compliance_checker,
logger=logger,
),
logger,
)
# ️ sourceApplication value for v2: AppName + Version => Revit2024, Revit2023 etc.
# ️ sourceApplication value for v3: slug => revit
# ⚠️ We're just working with v3 data - adapt commit_processor for v2 data structure if you want
# ⚠️ Alternatively, write a model factory that injects the correct CommitProcessor()
if commit_root.sourceApplication != "revit":
automate_context.mark_run_failed(
f"Automation built for v3 Revit commits. These are commits with a "
f"case-sensitive sourceApplication == 'revit', not {commit_root.sourceApplication})"
)
# Process elements
model_root = automate_context.receive_version() # TODO: This is a waste!
processor = CommitProcessor()
processor.process_elements(model_root)
compliance_summary = processor.logger.get_summary()
for missing_property, elements in compliance_summary.items():
automate_context.attach_warning_to_objects(
category="Missing Revit Material Property",
object_ids=elements,
message=f"Missing {missing_property} on the object, preventing mass calculation. "
f"Update Revit object to contain the necessary properties if element is critical. ",
)
# TODO: create_new_version_in_project
automate_context.mark_run_success("Under development.")
if __name__ == "__main__":
execute_automate_function(automate_function, FunctionInputs)
+13
View File
@@ -0,0 +1,13 @@
from .logger import Logger
from .model_processor import ModelProcessor
from .validator import SourceApplicationValidator
from .material_processor import MaterialProcessor
from .compliance_checker import ComplianceChecker
__all__ = [
"Logger",
"ModelProcessor",
"SourceApplicationValidator",
"MaterialProcessor",
"ComplianceChecker",
]
+9
View File
@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from typing import Any, List
class ComplianceChecker(ABC):
@abstractmethod
def check_compliance(self, element: Any, required_properties: List[str]) -> bool:
"""Check if element meets compliance requirements"""
pass
+19
View File
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod
from typing import Dict
class Logger(ABC):
@abstractmethod
def log_error(self, message: str, **kwargs) -> None:
"""Log an error message"""
pass
@abstractmethod
def log_warning(self, message: str, **kwargs) -> None:
"""Log a warning message"""
pass
@abstractmethod
def get_summary(self) -> Dict:
"""Get summary of logged messages"""
pass
+11
View File
@@ -0,0 +1,11 @@
from abc import ABC, abstractmethod
from typing import Dict, Any
class MaterialProcessor(ABC):
@abstractmethod
def process_material(
self, material_data: Dict[str, Any], level: str, type_name: str
) -> None:
"""Process material data and compute required properties"""
pass
+14
View File
@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from typing import Any
class ModelProcessor(ABC):
@abstractmethod
def process_elements(self, model: Any) -> None:
"""Process all elements in the model"""
pass
@abstractmethod
def process_element(self, level: str, type_name: str, element: Any) -> None:
"""Process a single element"""
pass
+8
View File
@@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
class SourceApplicationValidator(ABC):
@abstractmethod
def validate(self, source_app: str) -> bool:
"""Validate if the source application is supported"""
pass
+24 -13
View File
@@ -2,27 +2,38 @@ import structlog
from typing import Dict, DefaultDict
from collections import defaultdict
logger = structlog.get_logger()
from src.interfaces.logger import Logger # Import the interface
class ComplianceLogger:
class ComplianceLogger(Logger): # Explicitly implement Logger interface
def __init__(self):
self.missing_properties: DefaultDict[str, set] = defaultdict(set)
self._structlog = structlog.get_logger()
def log_missing_properties(self, object_id: str, missing_property: str) -> None:
# Log to our collection for automation results
self.missing_properties[missing_property].add(object_id)
def log_error(self, message: str, **kwargs) -> None:
"""Log an error message"""
self._structlog.error(message, **kwargs)
# Still log individual cases for dev
logger.warn(
"non_compliant_element",
object_id=object_id,
property=missing_property,
message=f"Missing: '{missing_property}' on object {object_id}. No computation on "
f"for this object possible. Skipped.",
)
def log_warning(self, message: str, **kwargs) -> None:
"""
Log a warning message. Also tracks missing properties if they are specified
in the kwargs.
Args:
message: Warning message to log
**kwargs: Additional context. If 'object_id' and 'missing_property' are
provided, they will be tracked for compliance reporting.
"""
self._structlog.warn(message, **kwargs)
# Track missing properties if relevant kwargs are provided
object_id = kwargs.get("object_id")
missing_property = kwargs.get("missing_property")
if object_id and missing_property:
self.missing_properties[missing_property].add(object_id)
def get_summary(self) -> Dict[str, list]:
"""Get summary of logged messages"""
return {
prop: list(elements) for prop, elements in self.missing_properties.items()
}
+3
View File
@@ -0,0 +1,3 @@
from .material import MaterialData
__all__ = ["MaterialData"]
+12
View File
@@ -0,0 +1,12 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class MaterialData:
"""Data class for material properties"""
volume: float
density: float
structural_asset: str
mass: Optional[float] = None
+5
View File
@@ -0,0 +1,5 @@
from .material import RevitMaterialProcessor
from .compliance import RevitComplianceChecker
from .model import RevitModelProcessor
__all__ = ["RevitMaterialProcessor", "RevitComplianceChecker", "RevitModelProcessor"]
-109
View File
@@ -1,109 +0,0 @@
from typing import Dict, Any
from src.aggregators.carbon_totals import MassAggregator
from src.logging.compliance_logger import ComplianceLogger
from src.utils.constants import * # wildcard is a little dangerous
class CommitProcessor:
def __init__(self):
self.logger = ComplianceLogger()
self.mass_aggregator = MassAggregator()
def process_elements(
self, model: "Base"
) -> None: # No return needed, we're modifying in-place
levels = getattr(model, ELEMENTS, None)
if not levels: # First nesting => levels
raise ValueError("Invalid commit: missing elements at the model root.")
for level in levels:
type_groups = getattr(level, ELEMENTS, None)
if not type_groups:
raise ValueError(
f"Invalid level structure: missing elements in {getattr(level,NAME, '!Missing name attribute!')}"
)
for type_group in type_groups:
revit_objects = getattr(type_group, ELEMENTS, None)
if not revit_objects:
raise ValueError(
f"Invalid type structure: missing elements in "
f"{getattr(type_group, NAME, '!Missing name attribute!')}"
)
level_name = getattr(level, NAME, None)
type_name = getattr(type_group, NAME, None)
if level_name is None or type_name is None:
raise ValueError(
f"Every object should be on a level and be of a type."
)
for revit_object in revit_objects:
self.process_element(
level=level_name, type_name=type_name, revit_object=revit_object
)
def process_element(
self, level: str, type_name: str, revit_object: Dict[str, Any]
) -> None: # Mutating in-place
elements = getattr(revit_object, ELEMENTS, None)
if not elements:
self.logger.log_missing_properties(revit_object[ID], ELEMENTS)
for element in elements:
properties = getattr(element, PROPERTIES, None)
if not properties:
self.logger.log_missing_properties(
revit_object[ID], PROPERTIES
) # 🤔 revit_object/element?
return
material_quantities = properties.get(MATERIAL_QUANTITIES, None)
if not material_quantities:
self.logger.log_missing_properties(
revit_object[ID], MATERIAL_QUANTITIES
)
return
for material_name, material_data in material_quantities.items():
if VOLUME not in material_data:
self.logger.log_missing_properties(revit_object[ID], VOLUME)
return
if STRUCTURAL_ASSET not in material_data:
self.logger.log_missing_properties(
revit_object[ID], STRUCTURAL_ASSET
)
return
# ⚠️ This should never hit. No STRUCTURAL_ASSET → no DENSITY
if DENSITY not in material_data:
self.logger.log_missing_properties(revit_object[ID], DENSITY)
return
try:
# Dict structure for numerical properties(e.g.)
# {"name" : "volume", "value" : 100, "units" : "Cubic metres"}
# 🤫 Shouldn't change.
volume = material_data[VOLUME][VALUE]
density = material_data[DENSITY][VALUE]
mass = volume * density
material_data[MASS] = {
NAME: MASS,
VALUE: mass,
UNITS: material_data[DENSITY][UNITS].split()[0],
# TODO: 🫣 Units string operation is super sketchy.
}
self.mass_aggregator.add_mass(
mass, level, type_name, material_data[STRUCTURAL_ASSET]
)
# ❗ We've validated everything. If the computation fails, there's a bug.
# 🤾 Throw.
except (ValueError, TypeError, KeyError) as e:
raise ValueError(
f"Computation failed for {material_name} despite having required properties: {str(e)}"
) from e
+22
View File
@@ -0,0 +1,22 @@
from typing import Any, List
from src.interfaces.compliance_checker import ComplianceChecker
from src.interfaces.logger import Logger
from src.utils.constants import ID
class RevitComplianceChecker(ComplianceChecker):
def __init__(self, logger: Logger):
self._logger = logger
def check_compliance(self, element: Any, required_properties: List[str]) -> bool:
"""
Checks basic element compliance (presence of ID and any custom required properties)
"""
element_id = getattr(element, ID, None)
if not element_id:
self._logger.log_warning(
"Element missing ID", object_id="unknown", missing_property=ID
)
return False
return True # Basic compliance only requires ID
+26
View File
@@ -0,0 +1,26 @@
from typing import Dict, Any
from src.interfaces.material_processor import MaterialProcessor
from src.models.material import (
MaterialData,
) # Import from models instead of defining here
from src.utils.constants import *
class RevitMaterialProcessor(MaterialProcessor):
def __init__(self, mass_aggregator: "MassAggregator"):
self._mass_aggregator = mass_aggregator
def process_material(
self, material_data: Dict[str, Any], level: str, type_name: str
) -> MaterialData:
volume = material_data[VOLUME][VALUE]
density = material_data[DENSITY][VALUE]
mass = volume * density
structural_asset = material_data[STRUCTURAL_ASSET]
mat_data = MaterialData(
volume=volume, density=density, structural_asset=structural_asset, mass=mass
)
self._mass_aggregator.add_mass(mass, level, type_name, structural_asset)
return mat_data
+118
View File
@@ -0,0 +1,118 @@
from typing import Any
from src.interfaces.model_processor import ModelProcessor
from src.interfaces.material_processor import MaterialProcessor
from src.interfaces.compliance_checker import ComplianceChecker
from src.interfaces.logger import Logger
from src.utils.constants import (
ELEMENTS,
NAME,
PROPERTIES,
MATERIAL_QUANTITIES,
ID,
VOLUME,
STRUCTURAL_ASSET,
DENSITY,
)
class RevitModelProcessor(ModelProcessor):
def __init__(
self,
material_processor: MaterialProcessor,
compliance_checker: ComplianceChecker,
logger: Logger,
):
self._material_processor = material_processor
self._compliance_checker = compliance_checker
self._logger = logger
def process_elements(self, model: Any) -> None:
"""Process all elements in the model hierarchically.
Model → Levels → Type Groups → Elements"""
levels = getattr(model, ELEMENTS, None)
if not levels:
raise ValueError("Invalid model: missing elements at root.")
for level in levels:
self._process_level(level)
def _process_level(self, level: Any) -> None:
"""Process a single level in the model"""
type_groups = getattr(level, ELEMENTS, None)
if not type_groups:
level_name = getattr(level, NAME, "Unknown")
raise ValueError(
f"Invalid level structure: missing elements in {level_name}"
)
level_name = getattr(level, NAME)
for type_group in type_groups:
self._process_type_group(type_group, level_name)
def _process_type_group(self, type_group: Any, level_name: str) -> None:
"""Process a group of elements of the same type"""
revit_objects = getattr(type_group, ELEMENTS, None)
if not revit_objects:
type_name = getattr(type_group, NAME, "Unknown")
raise ValueError(f"Invalid type structure: missing elements in {type_name}")
type_name = getattr(type_group, NAME)
for revit_object in revit_objects:
self.process_element(level_name, type_name, revit_object)
def process_element(self, level: str, type_name: str, revit_object: Any) -> None:
"""Process a single element following original logic exactly"""
element_id = getattr(revit_object, ID, None)
if not element_id:
return
# First check elements
elements = getattr(revit_object, ELEMENTS, None)
if not elements:
self._logger.log_warning(
f"Missing elements", object_id=element_id, missing_property=ELEMENTS
)
return
# Process each element
for element in elements:
# Check properties
properties = getattr(element, PROPERTIES, None)
if not properties:
self._logger.log_warning(
f"Missing properties",
object_id=element_id,
missing_property=PROPERTIES,
)
return
# Check Material Quantities
material_quantities = properties.get(MATERIAL_QUANTITIES, None)
if not material_quantities:
self._logger.log_warning(
f"Missing Material Quantities",
object_id=element_id,
missing_property=MATERIAL_QUANTITIES,
)
return
# Process each material
for material_name, material_data in material_quantities.items():
# Check required material properties
for required_prop in [VOLUME, STRUCTURAL_ASSET, DENSITY]:
if required_prop not in material_data:
self._logger.log_warning(
f"Missing {required_prop}",
object_id=element_id,
missing_property=required_prop,
)
return
try:
self._material_processor.process_material(
material_data, level, type_name
)
except Exception as e:
self._logger.log_error(
f"Failed to process element {element_id}", error=str(e)
)
+3
View File
@@ -0,0 +1,3 @@
from .revit import RevitSourceValidator
__all__ = ["RevitSourceValidator"]
+12
View File
@@ -0,0 +1,12 @@
from src.interfaces.validator import SourceApplicationValidator
class RevitSourceValidator(SourceApplicationValidator):
"""Validates that the source application is Revit"""
# ️ sourceApplication value for v2: AppName + Version => Revit2024, Revit2023 etc.
# ️ sourceApplication value for v3: slug => revit
# ⚠️ We're just working with v3 data - adapt commit_processor for v2 data structure if you want
# ⚠️ Alternatively, write a model factory that injects the correct CommitProcessor()
def validate(self, source_app: str) -> bool:
return source_app == "revit"
View File
+82
View File
@@ -0,0 +1,82 @@
from src.processors.material import RevitMaterialProcessor
from src.processors.compliance import RevitComplianceChecker
from src.processors.model import RevitModelProcessor
from src.validators.revit import RevitSourceValidator
from src.aggregators.carbon_totals import MassAggregator
from src.logging.compliance_logger import ComplianceLogger
# Import required libraries
from specklepy.api.client import SpeckleClient
from specklepy.core.api import operations
from specklepy.transports.server import ServerTransport
# Define global variables
HOST = "https://app.speckle.systems/"
AUTHENTICATION_TOKEN = "840e5a18cda38ccc2a9ed8b52e9316530505c14181"
STREAM_ID = "99bdf924fb"
BRANCH_NAME = "2391"
# Setting up SpeckleClient and authenticating
client = SpeckleClient(host=HOST)
client.authenticate_with_token(token=AUTHENTICATION_TOKEN)
# Receving commit
transport = ServerTransport(STREAM_ID, client)
branch = client.branch.get(stream_id=STREAM_ID, name=BRANCH_NAME)
model_data = operations.receive(branch.commits.items[0].referencedObject, transport)
def create_processor_chain() -> tuple[RevitModelProcessor, ComplianceLogger]:
"""
Creates and configures the processing chain with all necessary dependencies.
Returns:
tuple[RevitModelProcessor, ComplianceLogger]:
- Configured processor ready to handle Revit models
- Logger instance for accessing compliance results
"""
# Create core components
logger = ComplianceLogger()
mass_aggregator = MassAggregator()
# Create processors
material_processor = RevitMaterialProcessor(mass_aggregator)
compliance_checker = RevitComplianceChecker(logger)
# Create and return the main processor with logger
return (
RevitModelProcessor(
material_processor=material_processor,
compliance_checker=compliance_checker,
logger=logger,
),
logger,
)
try:
# Get version data
commit_root = branch.commits.items[0]
# Validate source application
source_validator = RevitSourceValidator()
if not source_validator.validate(commit_root.sourceApplication):
print(
f"Automation requires Revit v3 commits. Received: {commit_root.sourceApplication}"
)
# Create processor chain and get logger for results
processor, logger = create_processor_chain()
# Process model
model_root = model_data
processor.process_elements(model_root)
# Report compliance issues
compliance_summary = logger.get_summary()
print("Processing completed successfully.")
except Exception as e:
print(f"Processing failed: {str(e)}")
raise # Re-raise for proper error tracking