class AnonymizationAction(ParameterAction):
    """Parameter action that masks email addresses found in parameter values.

    Detection and masking are delegated to an ``EmailMatcher``. The action
    records which objects and parameter names were rewritten so that
    ``report`` can attach a summary to them afterwards.
    """

    def __init__(self) -> None:
        """Set up the anonymization counter and the email matcher."""
        super().__init__()
        # Running total of parameter values that were actually rewritten.
        self.anonymized_count = 0
        self.email_matcher = EmailMatcher()

    def check(self, param_value: str) -> bool:
        """Tell whether a parameter value holds at least one email address.

        Args:
            param_value: The parameter value to inspect

        Returns:
            bool: True when the matcher finds an email address in the value
        """
        matcher = self.email_matcher
        return matcher.contains_email(param_value)

    def apply(
        self,
        parameter: dict[str, Any],
        parent_object: Base,
        containing_dict: dict[str, Any],
        parameter_key: str
    ) -> None:
        """Mask email addresses in the parameter's value, in place.

        Parameters without a string ``"value"`` entry are left untouched, as
        are values the matcher returns unchanged.

        Args:
            parameter: The parameter dictionary
            parent_object: The parent Speckle object
            containing_dict: The dictionary containing the parameter (unused here)
            parameter_key: The key of the parameter in the containing dictionary
        """
        current = parameter.get("value")
        if not isinstance(current, str):
            return

        masked = self.email_matcher.anonymize_email(current)
        if masked == current:
            # Nothing was anonymized, so there is nothing to record.
            return

        parameter["value"] = masked

        # Record the change under the parent object's id (None when the
        # object has no ``id`` attribute), using the parameter's display
        # name when it has one.
        # NOTE(review): ``affected_parameters`` is not defined in this class;
        # presumably the base class provides it as a defaultdict(list).
        label = parameter.get("name", parameter_key)
        object_id = getattr(parent_object, "id", None)
        self.affected_parameters[object_id].append(label)
        self.anonymized_count += 1

    def report(self, automate_context: AutomationContext) -> None:
        """Attach an informational summary to every affected object.

        Does nothing when no parameter was anonymized.

        Args:
            automate_context: The automation context
        """
        touched = self.affected_parameters
        if not touched:
            return

        # The message counts distinct parameter names across all objects.
        distinct_names = {name for names in touched.values() for name in names}
        message = f"Email addresses were anonymized in {len(distinct_names)} parameters"

        automate_context.attach_info_to_objects(
            category="Anonymized_Parameters",
            object_ids=list(touched),
            message=message,
        )
# Factory for the email anonymization action
def create_anonymization_action() -> AnonymizationAction:
    """Build the action that anonymizes email addresses in parameter values.

    Returns:
        AnonymizationAction: A newly constructed anonymization action
    """
    action = AnonymizationAction()
    return action
- - Args: - context: The traversal context containing the current object - """ - current_object = context.current - - # Prioritise v3 - if hasattr(current_object, "properties") and current_object.properties is not None: - properties_dict = ( - current_object.properties.__dict__ - if isinstance(current_object.properties, Base) - else current_object.properties - ) - self.process_properties_dict(properties_dict, current_object) - - # Legacy placeholder for v2, ready for later - if hasattr(current_object, "parameters") and current_object.parameters is not None: - pass # Add v2 handling when ready - - def process_properties_dict(self, properties_dict, current_object): - """Recursively process v3-style properties dictionary to find and apply the action to parameters. - - Args: - properties_dict: The properties dictionary to process - current_object: The current object being processed - """ - for key, value in list(properties_dict.items()): # Safe iteration during mutation - if isinstance(value, dict) and "value" in value: - param_name = value.get("name", key) - - # Check if parameter matches our criteria - if self.action.check(param_name): - self.action.apply(value, current_object, properties_dict, key) - self.processed_objects.add(current_object.id) - - elif isinstance(value, dict): - # Recurse into nested dictionaries - self.process_properties_dict(value, current_object) - - def automate_function( automate_context: AutomationContext, function_inputs: FunctionInputs, @@ -73,6 +24,7 @@ def automate_function( """ # Create appropriate action based on sanitization mode action = None + check_values = False if function_inputs.sanitization_mode == SanitizationMode.PREFIX_MATCHING: if not function_inputs.parameter_input: @@ -93,17 +45,17 @@ def automate_function( ) elif function_inputs.sanitization_mode == SanitizationMode.ANONYMIZATION: - # Anonymization doesn't require a parameter input - # Add anonymization action here when implemented - 
class ParameterProcessor:
    """Walk an object's properties and hand matching parameters to an action."""

    def __init__(self, action: ParameterAction, check_values: bool = False):
        """Store the action and the matching mode.

        Args:
            action: The parameter action to apply
            check_values: If True, the action's check receives each
                parameter's value; otherwise it receives the parameter's name
        """
        self.processed_objects = set()
        self.check_values = check_values
        self.action = action

    def process_context(self, context):
        """Process the object held by a traversal context.

        Args:
            context: The traversal context containing the current object
        """
        current_object = context.current

        # v3 data lives under ``properties`` and is handled first.
        properties = getattr(current_object, "properties", None)
        if properties is not None:
            if isinstance(properties, Base):
                properties_dict = properties.__dict__
            else:
                properties_dict = properties
            self.process_properties_dict(properties_dict, current_object)

        # Legacy placeholder for v2, ready for later
        if getattr(current_object, "parameters", None) is not None:
            pass  # Add v2 handling when ready

    def process_properties_dict(self, properties_dict, current_object):
        """Recursively scan a v3-style properties dictionary and apply the action.

        A dictionary entry with a ``"value"`` key is treated as a parameter;
        any other dictionary entry is searched recursively. Non-dictionary
        entries are ignored.

        Args:
            properties_dict: The properties dictionary to process
            current_object: The current object being processed
        """
        # Iterate over a snapshot: the action may mutate the dictionary.
        for key, entry in list(properties_dict.items()):
            if not isinstance(entry, dict):
                continue

            if "value" not in entry:
                # Not a parameter itself; look for parameters further down.
                self.process_properties_dict(entry, current_object)
                continue

            # Value-based actions (like anonymization) inspect the value,
            # name-based actions (like removal) inspect the parameter name.
            if self.check_values:
                candidate = entry.get("value", "")
            else:
                candidate = entry.get("name", key)

            if self.action.check(candidate):
                self.action.apply(entry, current_object, properties_dict, key)
                self.processed_objects.add(current_object.id)
class EmailMatcher:
    """Class for identifying and anonymizing email addresses in parameter values."""

    # Email regex pattern - basic pattern to identify email addresses; it is
    # not a full address validator.
    EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

    def __init__(self) -> None:
        """Initialize with a compiled regex pattern for email matching."""
        self.pattern: Pattern = re.compile(self.EMAIL_PATTERN)

    def contains_email(self, value: str) -> bool:
        """Check if a string contains an email address.

        Args:
            value: The string to check for email addresses

        Returns:
            bool: True if the string contains an email address, False
                otherwise (including when ``value`` is not a string)
        """
        if not isinstance(value, str):
            return False

        return self.pattern.search(value) is not None

    @staticmethod
    def _mask_local_part(local: str) -> str:
        """Return the masked form of an email's local part.

        Args:
            local: The text before the ``@`` of a matched email address

        Returns:
            str: First and last characters kept with asterisks in between for
                three or more characters; first character plus one asterisk
                for two characters; a single asterisk otherwise
        """
        if len(local) > 2:
            return f"{local[0]}{'*' * (len(local) - 2)}{local[-1]}"
        if len(local) == 2:
            return f"{local[0]}*"
        return "*"

    def anonymize_email(self, value: str) -> str:
        """Anonymize email addresses in a string.

        Every email address found in the string has its local part masked
        while the domain part is preserved. Local parts of three or more
        characters keep their first and last characters, with the characters
        in between replaced by asterisks.

        Examples:
            "email@example.com" becomes "e***l@example.com"
            "ab@example.com" becomes "a*@example.com"
            "a@example.com" becomes "*@example.com"

        Args:
            value: The string containing email addresses to anonymize

        Returns:
            str: The string with anonymized email addresses; a non-string
                ``value`` is returned unchanged
        """
        if not isinstance(value, str):
            return value

        def replace_email(match_obj):
            """Replace function for regex sub to anonymize matched emails."""
            # The pattern guarantees an "@" in the match, so the split
            # always yields exactly the local part and the domain.
            local, domain = match_obj.group(0).split('@', 1)
            return f"{self._mask_local_part(local)}@{domain}"

        # Replace all email addresses in the string
        return self.pattern.sub(replace_email, value)