Refactor anonymization logic and improve checks
build and deploy Speckle functions / publish-automate-function-version (push) Has been cancelled

- Simplified the email check method so that it only matches when param_value is a string.
- Streamlined the apply method to handle both dictionary-style parameters and parameters held on Base objects.
- Enhanced error handling when accessing parameters in Base objects.
- Added debug counters for processed objects and processed Revit parameters in the ParameterProcessor class.
- Updated the integration test to use the new function inputs (parameter_input="SPECKLE", strict_mode=False).
This commit is contained in:
Jonathon Broughton
2025-03-25 01:49:29 +00:00
parent eafc75c2f7
commit 5c09e22358
3 changed files with 114 additions and 72 deletions
+35 -45
View File
@@ -150,75 +150,65 @@ class AnonymizationAction(ParameterAction):
"""Initialize the anonymization action with an email matcher."""
super().__init__()
self.email_matcher = EmailMatcher()
# Count of anonymized parameters for reporting
self.anonymized_count = 0
def check(self, param_value: str) -> bool:
"""Check if parameter value contains an email address.
Args:
param_value: The parameter value to check
Returns:
bool: True if the parameter value contains an email address, False otherwise
"""
return self.email_matcher.contains_email(param_value)
"""Check if parameter value contains an email address."""
return isinstance(param_value, str) and self.email_matcher.contains_email(param_value)
def apply(
self, parameter: dict[str, Any], parent_object: Base, containing_dict: dict[str, Any] | Base, parameter_key: str
) -> None:
"""Anonymize email addresses in the parameter value.
Args:
parameter: The parameter dictionary
parent_object: The parent Speckle object
containing_dict: The container (dict or Base object) holding the parameter
parameter_key: The key or attribute name of the parameter
"""
if "value" not in parameter or not isinstance(parameter["value"], str):
return
"""Anonymize email addresses in the parameter value."""
# Get parameter name and object ID - same as RemovalAction
param_name = parameter.get("name", parameter_key)
original_value = parameter["value"]
object_id = getattr(parent_object, "id", None)
# Anonymize email addresses in the parameter value
anonymized_value = self.email_matcher.anonymize_email(original_value)
# Get the value to anonymize
param_value = None
# Only track changes if something was actually anonymized
if anonymized_value != original_value:
# Update the parameter value in place
parameter["value"] = anonymized_value
# For dictionary-style parameters
if isinstance(parameter, dict) and "value" in parameter:
param_value = parameter["value"]
if isinstance(param_value, str) and self.email_matcher.contains_email(param_value):
# Anonymize and update
anonymized_value = self.email_matcher.anonymize_email(param_value)
parameter["value"] = anonymized_value
# If we're dealing with a Base object parameter (like in Revit),
# update the actual value property of the parameter object
if isinstance(containing_dict, Base):
# Track affected parameters - EXACTLY like RemovalAction does
self.affected_parameters[object_id].append(param_name)
self.anonymized_count += 1
# For Base object parameters (like in Revit)
elif isinstance(containing_dict, Base):
try:
# Try to get the parameter object
param_obj = None
try:
# Try to get the parameter object using __getitem__ first (Revit v2 style)
param_obj = containing_dict.__getitem__(parameter_key)
if param_obj is not None and hasattr(param_obj, "value"):
setattr(param_obj, "value", anonymized_value)
except (AttributeError, KeyError, TypeError):
# Fallback to standard attribute access
param_obj = getattr(containing_dict, parameter_key, None)
if param_obj is not None and hasattr(param_obj, "value"):
if param_obj and hasattr(param_obj, "value"):
param_value = getattr(param_obj, "value")
if isinstance(param_value, str) and self.email_matcher.contains_email(param_value):
# Anonymize and update
anonymized_value = self.email_matcher.anonymize_email(param_value)
setattr(param_obj, "value", anonymized_value)
# Track affected object and parameter
self.affected_parameters[object_id].append(param_name)
self.anonymized_count += 1
# Track affected parameters - EXACTLY like RemovalAction does
self.affected_parameters[object_id].append(param_name)
self.anonymized_count += 1
except KeyError:
pass # Skip if any error occurs
def report(self, automate_context: AutomationContext) -> None:
"""Provide feedback based on the action's results.
Args:
automate_context: The automation context
"""
"""Provide feedback based on the action's results."""
if not self.affected_parameters:
return
# Copy the exact pattern from RemovalAction for consistency
anonymized_params = set(param for params in self.affected_parameters.values() for param in params)
message = f"Email addresses were anonymized in {len(anonymized_params)} parameters"
automate_context.attach_info_to_objects(
+75 -25
View File
@@ -5,7 +5,6 @@ from specklepy.objects import Base
from data_shield.actions import ParameterAction
# Modified ParameterProcessor class imported from processor_update.py
class ParameterProcessor:
"""Class to handle parameter processing with various actions."""
@@ -19,6 +18,9 @@ class ParameterProcessor:
self.action = action
self.check_values = check_values
self.processed_objects = set()
# Debug counters
self.total_objects_processed = 0
self.revit_params_processed = 0
def process_context(self, context):
"""Process a traversal context to handle parameters and properties.
@@ -27,8 +29,9 @@ class ParameterProcessor:
context: The traversal context containing the current object
"""
current_object = context.current
self.total_objects_processed += 1
# Prioritise v3
# First handle modern v3 properties
if hasattr(current_object, "properties") and current_object.properties is not None:
properties_dict = (
current_object.properties.__dict__
@@ -37,7 +40,7 @@ class ParameterProcessor:
)
self.process_properties_dict(properties_dict, current_object)
# Handle v2 Revit parameters
# Then handle legacy v2 Revit parameters
if hasattr(current_object, "parameters") and current_object.parameters is not None:
self.process_revit_parameters(current_object)
@@ -48,6 +51,9 @@ class ParameterProcessor:
properties_dict: The properties dictionary to process
current_object: The current object being processed
"""
if not properties_dict:
return
for key, value in list(properties_dict.items()): # Safe iteration during mutation
if isinstance(value, dict) and "value" in value:
param_name = value.get("name", key)
@@ -55,7 +61,8 @@ class ParameterProcessor:
# Check based on mode (name or value)
if self.check_values:
# For value-based actions (like anonymization)
if self.action.check(value.get("value", "")):
param_value = value.get("value", "")
if self.action.check(param_value):
self.action.apply(value, current_object, properties_dict, key)
self.processed_objects.add(current_object.id)
else:
@@ -82,41 +89,84 @@ class ParameterProcessor:
parameters = current_object.parameters
# Use get_dynamic_member_names() to get all parameter keys
for parameter_key in parameters.get_dynamic_member_names():
# Get the parameter object using __getitem__
# If parameters is a dictionary rather than a Base object, use it directly
if isinstance(parameters, dict):
self.process_properties_dict(parameters, current_object)
return
# Get all parameter keys - handle different ways of storing parameters
param_keys = []
# Try get_dynamic_member_names() for Base objects
if hasattr(parameters, "get_dynamic_member_names"):
param_keys.extend(parameters.get_dynamic_member_names())
# Try __dict__ for standard attributes
if hasattr(parameters, "__dict__"):
param_keys.extend(k for k in parameters.__dict__.keys() if not k.startswith("_"))
# Try dir() as a last resort
if not param_keys:
param_keys.extend(k for k in dir(parameters) if not k.startswith("_") and k != "get_dynamic_member_names")
# Process each parameter
for parameter_key in param_keys:
# Track for debugging
self.revit_params_processed += 1
# Skip known non-parameter attributes
if parameter_key in ["speckle_type", "id", "totalChildrenCount"]:
continue
# Get the parameter object using multiple methods
param_obj = None
param_value = None
# Try __getitem__ first (common for Revit parameters)
try:
param_obj = parameters.__getitem__(f"{parameter_key}")
except KeyError:
continue
# Check if it's a Revit parameter
if (
not isinstance(param_obj, Base)
or getattr(param_obj, "speckle_type", "") != "Objects.BuiltElements.Revit.Parameter"
):
except (AttributeError, KeyError, TypeError):
try:
# Try direct attribute access
param_obj = getattr(parameters, parameter_key, None)
except KeyError:
continue
# If we couldn't get the parameter, skip it
if param_obj is None:
continue
# For name-based checks, we need to check both the name property and applicationInternalName
name_to_check = getattr(param_obj, "name", "")
value_to_check = getattr(param_obj, "value", "")
# Prepare a parameter dict with the info we have
param_dict = {}
# Create a parameter dict to pass to the action
param_dict = {
"name": name_to_check,
"value": value_to_check,
"applicationInternalName": parameter_key,
}
# Get the name - try from the parameter object first
param_name = getattr(param_obj, "name", parameter_key) if isinstance(param_obj, Base) else parameter_key
param_dict["name"] = param_name
# Get the value
if isinstance(param_obj, Base) and hasattr(param_obj, "value"):
param_value = getattr(param_obj, "value")
param_dict["value"] = param_value
elif isinstance(param_obj, dict) and "value" in param_obj:
param_value = param_obj["value"]
param_dict["value"] = param_value
else:
# If we can't find a value, this might not be a parameter
continue
# Add any other useful metadata
param_dict["applicationInternalName"] = parameter_key
# Check based on mode (name or value)
if self.check_values:
# For value-based actions (like anonymization)
if isinstance(value_to_check, str) and self.action.check(value_to_check):
if isinstance(param_value, str) and self.action.check(param_value):
# Apply the action
self.action.apply(param_dict, current_object, parameters, parameter_key)
self.processed_objects.add(current_object.id)
else:
# For name-based actions (like removal)
if self.action.check(name_to_check):
if self.action.check(param_name):
# Apply the action
self.action.apply(param_dict, current_object, parameters, parameter_key)
self.processed_objects.add(current_object.id)
+4 -2
View File
@@ -1,4 +1,5 @@
"""Run integration tests with a speckle server."""
from speckle_automate import (
AutomationContext,
AutomationRunData,
@@ -12,6 +13,7 @@ from data_shield.function import FunctionInputs, SanitizationMode, automate_func
class TestFunction:
"""Test the automate function."""
def test_function_run(self, test_automation_run_data: AutomationRunData, test_automation_token: str) -> None:
"""Run an integration test for the automate function."""
automation_context = AutomationContext.initialize(test_automation_run_data, test_automation_token)
@@ -21,8 +23,8 @@ class TestFunction:
automate_function,
FunctionInputs(
sanitization_mode=SanitizationMode.ANONYMIZATION,
parameter_input="",
strict_mode=True,
parameter_input="SPECKLE",
strict_mode=False,
),
)