Refactor parameter actions and sanitisation logic

- Enhanced removal action to handle both dicts and Base objects.
- Improved error handling for dynamic attribute removal in Base objects.
- Updated anonymization action to support Revit-style parameters.
- Streamlined function inputs processing in the main automate function.
- Added new method for processing Revit parameters specifically.
This commit is contained in:
Jonathon Broughton
2025-03-25 00:11:28 +00:00
parent fe80f95a19
commit f289891374
2 changed files with 131 additions and 41 deletions
+68 -27
View File
@@ -1,4 +1,5 @@
"""Module for parameter actions and matching strategies."""
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any
@@ -78,29 +79,60 @@ class RemovalAction(ParameterAction):
return self.matcher.matches(param_name)
def apply(
self,
parameter: dict[str, Any],
parent_object: Base,
containing_dict: dict[str, Any],
parameter_key: str
self, parameter: dict[str, Any], parent_object: Base, containing_dict: dict[str, Any] | Base, parameter_key: str
) -> None:
"""Remove the parameter from the containing dictionary if it matches."""
param_name = parameter.get("name", parameter_key)
"""Remove the parameter from the containing dictionary if it matches.
# Remove from the containing dictionary
containing_dict.pop(parameter_key, None)
This method handles both dictionary-style containers and Base objects with attributes.
Args:
parameter: The parameter dictionary or object
parent_object: The parent Speckle object
containing_dict: The container (dict or Base object) holding the parameter
parameter_key: The key or attribute name of the parameter
"""
param_name = parameter.get("name", parameter_key)
object_id = getattr(parent_object, "id", None)
# Handle removal based on the container type
if isinstance(containing_dict, dict):
# Standard dictionary - just pop the key
containing_dict.pop(parameter_key, None)
elif isinstance(containing_dict, Base):
# For Base objects like Revit parameters, try to remove using __dict__
try:
if hasattr(containing_dict, "__dict__") and parameter_key in containing_dict.__dict__:
containing_dict.__dict__.pop(parameter_key)
else:
# If not in __dict__, try using dynamic attribute removal
containing_dict.__dict__.pop(parameter_key, None)
except (AttributeError, KeyError, TypeError):
# Fallback to alternative methods if direct dict manipulation fails
try:
delattr(containing_dict, parameter_key)
except (AttributeError, TypeError):
try:
setattr(containing_dict, parameter_key, None)
except (AttributeError, TypeError):
# If all removal attempts fail, try one more approach specific to Speckle Base objects
if (
hasattr(containing_dict, "get_dynamic_member_names")
and parameter_key in containing_dict.get_dynamic_member_names()
):
# This is a workaround for dynamic properties in Speckle Base objects
application_name = parameter.get("applicationInternalName", parameter_key)
if application_name in containing_dict.__dict__:
containing_dict.__dict__.pop(application_name)
# Track affected object and parameter
self.affected_parameters[getattr(parent_object, "id", None)].append(param_name)
self.affected_parameters[object_id].append(param_name)
def report(self, automate_context: AutomationContext) -> None:
"""Provide feedback based on the action's results."""
if not self.affected_parameters:
return
removed_params = set(
param for params in self.affected_parameters.values() for param in params
)
removed_params = set(param for params in self.affected_parameters.values() for param in params)
message = f"The following parameters were removed: {', '.join(removed_params)}"
@@ -111,9 +143,6 @@ class RemovalAction(ParameterAction):
)
class AnonymizationAction(ParameterAction):
"""Action to anonymize email addresses in parameter values."""
@@ -136,25 +165,22 @@ class AnonymizationAction(ParameterAction):
return self.email_matcher.contains_email(param_value)
def apply(
self,
parameter: dict[str, Any],
parent_object: Base,
containing_dict: dict[str, Any],
parameter_key: str
self, parameter: dict[str, Any], parent_object: Base, containing_dict: dict[str, Any] | Base, parameter_key: str
) -> None:
"""Anonymize email addresses in the parameter value.
Args:
parameter: The parameter dictionary
parent_object: The parent Speckle object
containing_dict: The dictionary containing the parameter
parameter_key: The key of the parameter in the containing dictionary
containing_dict: The container (dict or Base object) holding the parameter
parameter_key: The key or attribute name of the parameter
"""
if "value" not in parameter or not isinstance(parameter["value"], str):
return
param_name = parameter.get("name", parameter_key)
original_value = parameter["value"]
object_id = getattr(parent_object, "id", None)
# Anonymize email addresses in the parameter value
anonymized_value = self.email_matcher.anonymize_email(original_value)
@@ -164,8 +190,22 @@ class AnonymizationAction(ParameterAction):
# Update the parameter value in place
parameter["value"] = anonymized_value
# If we're dealing with a Base object parameter (like in Revit),
# update the actual value property of the parameter object
if isinstance(containing_dict, Base):
try:
# Try to get the parameter object using __getitem__ first (Revit v2 style)
param_obj = containing_dict.__getitem__(parameter_key)
if param_obj is not None and hasattr(param_obj, "value"):
setattr(param_obj, "value", anonymized_value)
except (AttributeError, KeyError, TypeError):
# Fallback to standard attribute access
param_obj = getattr(containing_dict, parameter_key, None)
if param_obj is not None and hasattr(param_obj, "value"):
setattr(param_obj, "value", anonymized_value)
# Track affected object and parameter
self.affected_parameters[getattr(parent_object, "id", None)].append(param_name)
self.affected_parameters[object_id].append(param_name)
self.anonymized_count += 1
def report(self, automate_context: AutomationContext) -> None:
@@ -177,9 +217,7 @@ class AnonymizationAction(ParameterAction):
if not self.affected_parameters:
return
anonymized_params = set(
param for params in self.affected_parameters.values() for param in params
)
anonymized_params = set(param for params in self.affected_parameters.values() for param in params)
message = f"Email addresses were anonymized in {len(anonymized_params)} parameters"
@@ -189,17 +227,20 @@ class AnonymizationAction(ParameterAction):
message=message,
)
# Factory functions to create specific actions with the right matcher
def create_prefix_removal_action(forbidden_prefix: str, strict_mode: bool = False) -> RemovalAction:
"""Create a removal action that matches by prefix."""
matcher = PrefixMatcher(forbidden_prefix, strict_mode)
return RemovalAction(matcher)
def create_pattern_removal_action(pattern: str, strict_mode: bool = False) -> RemovalAction:
"""Create a removal action that matches by pattern/regex."""
matcher = PatternMatcher(pattern, strict_mode)
return RemovalAction(matcher)
# Factory function to create anonymization action
def create_anonymization_action() -> AnonymizationAction:
"""Create an action that anonymizes email addresses in parameter values."""
+63 -14
View File
@@ -1,4 +1,5 @@
"""Updated main Automate function for parameter sanitization."""
from speckle_automate import AutomationContext
from specklepy.objects import Base
@@ -13,8 +14,8 @@ from data_shield.traversal import get_data_traversal_rules
def automate_function(
automate_context: AutomationContext,
function_inputs: FunctionInputs,
automate_context: AutomationContext,
function_inputs: FunctionInputs,
) -> None:
"""Main function for parameter sanitization.
@@ -30,19 +31,13 @@ def automate_function(
if not function_inputs.parameter_input:
automate_context.mark_run_failed("No parameter prefix has been set for PREFIX_MATCHING mode.")
return
action = create_prefix_removal_action(
function_inputs.parameter_input,
function_inputs.strict_mode
)
action = create_prefix_removal_action(function_inputs.parameter_input, function_inputs.strict_mode)
elif function_inputs.sanitization_mode == SanitizationMode.PATTERN_MATCHING:
if not function_inputs.parameter_input:
automate_context.mark_run_failed("No parameter pattern has been set for PATTERN_MATCHING mode.")
return
action = create_pattern_removal_action(
function_inputs.parameter_input,
function_inputs.strict_mode
)
action = create_pattern_removal_action(function_inputs.parameter_input, function_inputs.strict_mode)
elif function_inputs.sanitization_mode == SanitizationMode.ANONYMIZATION:
# Anonymization doesn't require a parameter input as it automatically detects emails
@@ -94,9 +89,11 @@ def automate_function(
# We can pin the result view to the specific version we created.
automate_context.set_context_view([f"{new_model_id}@{new_version_id}"], False)
automate_context.mark_run_success(f"Parameters processed successfully with shield function "
f"{function_inputs.sanitization_mode}"
f"{' running in strict mode' if function_inputs.strict_mode else ''}.")
automate_context.mark_run_success(
f"Parameters processed successfully with shield function "
f"{function_inputs.sanitization_mode}"
f"{' running in strict mode' if function_inputs.strict_mode else ''}."
)
# Modified ParameterProcessor class imported from processor_update.py
@@ -160,4 +157,56 @@ class ParameterProcessor:
elif isinstance(value, dict):
# Recurse into nested dictionaries
self.process_properties_dict(value, current_object)
self.process_properties_dict(value, current_object)
def process_revit_parameters(self, current_object):
"""Process v2 Revit-style parameters to find and apply the action.
Revit parameters are stored as Base objects with speckle_type 'Objects.BuiltElements.Revit.Parameter'
and can be accessed via current_object.parameters.
Args:
current_object: The current object being processed
"""
if not hasattr(current_object, "parameters") or current_object.parameters is None:
return
parameters = current_object.parameters
# Use get_dynamic_member_names() to get all parameter keys
for parameter_key in parameters.get_dynamic_member_names():
# Get the parameter object using __getitem__
try:
param_obj = parameters.__getitem__(f"{parameter_key}")
except:
continue
# Check if it's a Revit parameter
if not isinstance(param_obj, Base) or getattr(param_obj, "speckle_type", "") != "Objects.BuiltElements.Revit.Parameter":
continue
# For name-based checks, we need to check both the name property and applicationInternalName
name_to_check = getattr(param_obj, "name", "")
value_to_check = getattr(param_obj, "value", "")
# Create a parameter dict to pass to the action
param_dict = {
"name": name_to_check,
"value": value_to_check,
"applicationInternalName": parameter_key,
}
# Check based on mode (name or value)
if self.check_values:
# For value-based actions (like anonymization)
if isinstance(value_to_check, str) and self.action.check(value_to_check):
# Apply the action
self.action.apply(param_dict, current_object, parameters, parameter_key)
self.processed_objects.add(current_object.id)
else:
# For name-based actions (like removal)
if self.action.check(name_to_check):
# Apply the action
self.action.apply(param_dict, current_object, parameters, parameter_key)
self.processed_objects.add(current_object.id)
self.processed_objects.add(current_object.id)