From 5c09e2235887f034b6c3b9fa6407088e5433995d Mon Sep 17 00:00:00 2001 From: Jonathon Broughton Date: Tue, 25 Mar 2025 01:49:29 +0000 Subject: [PATCH] Refactor anonymization logic and improve checks - Simplified email check method to ensure param_value is a string. - Streamlined apply method for better handling of parameter values. - Enhanced error handling when accessing parameters in Base objects. - Added debug counters for processed objects in the ParameterProcessor class. - Updated test cases to reflect changes in input parameters. --- src/data_shield/actions.py | 80 +++++++++++++---------------- src/data_shield/helpers.py | 100 +++++++++++++++++++++++++++---------- tests/test_function.py | 6 ++- 3 files changed, 114 insertions(+), 72 deletions(-) diff --git a/src/data_shield/actions.py b/src/data_shield/actions.py index 171928f..a8794b8 100644 --- a/src/data_shield/actions.py +++ b/src/data_shield/actions.py @@ -150,75 +150,65 @@ class AnonymizationAction(ParameterAction): """Initialize the anonymization action with an email matcher.""" super().__init__() self.email_matcher = EmailMatcher() - # Count of anonymized parameters for reporting self.anonymized_count = 0 def check(self, param_value: str) -> bool: - """Check if parameter value contains an email address. - - Args: - param_value: The parameter value to check - - Returns: - bool: True if the parameter value contains an email address, False otherwise - """ - return self.email_matcher.contains_email(param_value) + """Check if parameter value contains an email address.""" + return isinstance(param_value, str) and self.email_matcher.contains_email(param_value) def apply( self, parameter: dict[str, Any], parent_object: Base, containing_dict: dict[str, Any] | Base, parameter_key: str ) -> None: - """Anonymize email addresses in the parameter value. - - Args: - parameter: The parameter dictionary - parent_object: The parent Speckle object - containing_dict: The container (dict or Base object) holding the parameter - parameter_key: The key or attribute name of the parameter - """ - if "value" not in parameter or not isinstance(parameter["value"], str): - return - + """Anonymize email addresses in the parameter value.""" + # Get parameter name and object ID - same as RemovalAction param_name = parameter.get("name", parameter_key) - original_value = parameter["value"] object_id = getattr(parent_object, "id", None) - # Anonymize email addresses in the parameter value - anonymized_value = self.email_matcher.anonymize_email(original_value) + # Get the value to anonymize + param_value = None - # Only track changes if something was actually anonymized - if anonymized_value != original_value: - # Update the parameter value in place - parameter["value"] = anonymized_value + # For dictionary-style parameters + if isinstance(parameter, dict) and "value" in parameter: + param_value = parameter["value"] + if isinstance(param_value, str) and self.email_matcher.contains_email(param_value): + # Anonymize and update + anonymized_value = self.email_matcher.anonymize_email(param_value) + parameter["value"] = anonymized_value - # If we're dealing with a Base object parameter (like in Revit), - # update the actual value property of the parameter object - if isinstance(containing_dict, Base): + # Track affected parameters - EXACTLY like RemovalAction does + self.affected_parameters[object_id].append(param_name) + self.anonymized_count += 1 + + # For Base object parameters (like in Revit) + elif isinstance(containing_dict, Base): + try: + # Try to get the parameter object + param_obj = None try: - # Try to get the parameter object using __getitem__ first (Revit v2 style) param_obj = containing_dict.__getitem__(parameter_key) - if param_obj is not None and hasattr(param_obj, "value"): - setattr(param_obj, "value", anonymized_value) except (AttributeError, KeyError, TypeError): - # Fallback to standard attribute access param_obj = getattr(containing_dict, parameter_key, None) - if param_obj is not None and hasattr(param_obj, "value"): + + if param_obj and hasattr(param_obj, "value"): + param_value = getattr(param_obj, "value") + if isinstance(param_value, str) and self.email_matcher.contains_email(param_value): + # Anonymize and update + anonymized_value = self.email_matcher.anonymize_email(param_value) setattr(param_obj, "value", anonymized_value) - # Track affected object and parameter - self.affected_parameters[object_id].append(param_name) - self.anonymized_count += 1 + # Track affected parameters - EXACTLY like RemovalAction does + self.affected_parameters[object_id].append(param_name) + self.anonymized_count += 1 + except KeyError: + pass # Skip if any error occurs def report(self, automate_context: AutomationContext) -> None: - """Provide feedback based on the action's results. - - Args: - automate_context: The automation context - """ + """Provide feedback based on the action's results.""" if not self.affected_parameters: return + # Copy the exact pattern from RemovalAction for consistency anonymized_params = set(param for params in self.affected_parameters.values() for param in params) - message = f"Email addresses were anonymized in {len(anonymized_params)} parameters" automate_context.attach_info_to_objects( diff --git a/src/data_shield/helpers.py b/src/data_shield/helpers.py index 00b29bc..5c59848 100644 --- a/src/data_shield/helpers.py +++ b/src/data_shield/helpers.py @@ -5,7 +5,6 @@ from specklepy.objects import Base from data_shield.actions import ParameterAction -# Modified ParameterProcessor class imported from processor_update.py class ParameterProcessor: """Class to handle parameter processing with various actions.""" @@ -19,6 +18,9 @@ class ParameterProcessor: self.action = action self.check_values = check_values self.processed_objects = set() + # Debug counters + self.total_objects_processed = 0 + self.revit_params_processed = 0 def process_context(self, context): """Process a traversal context to handle parameters and properties. @@ -27,8 +29,9 @@ class ParameterProcessor: context: The traversal context containing the current object """ current_object = context.current + self.total_objects_processed += 1 - # Prioritise v3 + # First handle modern v3 properties if hasattr(current_object, "properties") and current_object.properties is not None: properties_dict = ( current_object.properties.__dict__ @@ -37,7 +40,7 @@ class ParameterProcessor: ) self.process_properties_dict(properties_dict, current_object) - # Handle v2 Revit parameters + # Then handle legacy v2 Revit parameters if hasattr(current_object, "parameters") and current_object.parameters is not None: self.process_revit_parameters(current_object) @@ -48,6 +51,9 @@ class ParameterProcessor: properties_dict: The properties dictionary to process current_object: The current object being processed """ + if not properties_dict: + return + for key, value in list(properties_dict.items()): # Safe iteration during mutation if isinstance(value, dict) and "value" in value: param_name = value.get("name", key) @@ -55,7 +61,8 @@ class ParameterProcessor: # Check based on mode (name or value) if self.check_values: # For value-based actions (like anonymization) - if self.action.check(value.get("value", "")): + param_value = value.get("value", "") + if self.action.check(param_value): self.action.apply(value, current_object, properties_dict, key) self.processed_objects.add(current_object.id) else: @@ -82,41 +89,84 @@ class ParameterProcessor: parameters = current_object.parameters - # Use get_dynamic_member_names() to get all parameter keys - for parameter_key in parameters.get_dynamic_member_names(): - # Get the parameter object using __getitem__ + # If parameters is a dictionary rather than a Base object, use it directly + if isinstance(parameters, dict): + self.process_properties_dict(parameters, current_object) + return + + # Get all parameter keys - handle different ways of storing parameters + param_keys = [] + + # Try get_dynamic_member_names() for Base objects + if hasattr(parameters, "get_dynamic_member_names"): + param_keys.extend(parameters.get_dynamic_member_names()) + + # Try __dict__ for standard attributes + if hasattr(parameters, "__dict__"): + param_keys.extend(k for k in parameters.__dict__.keys() if not k.startswith("_")) + + # Try dir() as a last resort + if not param_keys: + param_keys.extend(k for k in dir(parameters) if not k.startswith("_") and k != "get_dynamic_member_names") + + # Process each parameter + for parameter_key in param_keys: + # Track for debugging + self.revit_params_processed += 1 + + # Skip known non-parameter attributes + if parameter_key in ["speckle_type", "id", "totalChildrenCount"]: + continue + + # Get the parameter object using multiple methods + param_obj = None + param_value = None + + # Try __getitem__ first (common for Revit parameters) try: param_obj = parameters.__getitem__(f"{parameter_key}") - except KeyError: - continue - # Check if it's a Revit parameter - if ( - not isinstance(param_obj, Base) - or getattr(param_obj, "speckle_type", "") != "Objects.BuiltElements.Revit.Parameter" - ): + except (AttributeError, KeyError, TypeError): + try: + # Try direct attribute access + param_obj = getattr(parameters, parameter_key, None) + except KeyError: + continue + + # If we couldn't get the parameter, skip it + if param_obj is None: continue - # For name-based checks, we need to check both the name property and applicationInternalName - name_to_check = getattr(param_obj, "name", "") - value_to_check = getattr(param_obj, "value", "") + # Prepare a parameter dict with the info we have + param_dict = {} - # Create a parameter dict to pass to the action - param_dict = { - "name": name_to_check, - "value": value_to_check, - "applicationInternalName": parameter_key, - } + # Get the name - try from the parameter object first + param_name = getattr(param_obj, "name", parameter_key) if isinstance(param_obj, Base) else parameter_key + param_dict["name"] = param_name + + # Get the value + if isinstance(param_obj, Base) and hasattr(param_obj, "value"): + param_value = getattr(param_obj, "value") + param_dict["value"] = param_value + elif isinstance(param_obj, dict) and "value" in param_obj: + param_value = param_obj["value"] + param_dict["value"] = param_value + else: + # If we can't find a value, this might not be a parameter + continue + + # Add any other useful metadata + param_dict["applicationInternalName"] = parameter_key # Check based on mode (name or value) if self.check_values: # For value-based actions (like anonymization) - if isinstance(value_to_check, str) and self.action.check(value_to_check): + if isinstance(param_value, str) and self.action.check(param_value): # Apply the action self.action.apply(param_dict, current_object, parameters, parameter_key) self.processed_objects.add(current_object.id) else: # For name-based actions (like removal) - if self.action.check(name_to_check): + if self.action.check(param_name): # Apply the action self.action.apply(param_dict, current_object, parameters, parameter_key) self.processed_objects.add(current_object.id) diff --git a/tests/test_function.py b/tests/test_function.py index ba98243..c7d3df8 100644 --- a/tests/test_function.py +++ b/tests/test_function.py @@ -1,4 +1,5 @@ """Run integration tests with a speckle server.""" + from speckle_automate import ( AutomationContext, AutomationRunData, @@ -12,6 +13,7 @@ from data_shield.function import FunctionInputs, SanitizationMode, automate_func class TestFunction: """Test the automate function.""" + def test_function_run(self, test_automation_run_data: AutomationRunData, test_automation_token: str) -> None: """Run an integration test for the automate function.""" automation_context = AutomationContext.initialize(test_automation_run_data, test_automation_token) @@ -21,8 +23,8 @@ class TestFunction: automate_function, FunctionInputs( sanitization_mode=SanitizationMode.ANONYMIZATION, - parameter_input="", - strict_mode=True, + parameter_input="SPECKLE", + strict_mode=False, ), )