diff --git a/Geometry/clash.py b/Geometry/clash.py index 86dc368..78ef6ea 100644 --- a/Geometry/clash.py +++ b/Geometry/clash.py @@ -1,3 +1,4 @@ +from collections import defaultdict from concurrent.futures import ProcessPoolExecutor, as_completed from typing import List, Tuple, Any, Optional @@ -63,7 +64,7 @@ def detect_clashes_old( def check_for_clash( ref_element: Element, latest_element: Element -) -> Optional[tuple[Any, Any, Any]]: +) -> Optional[tuple[Any, Any]]: """ Check for a clash between two elements and calculate the severity of the clash. @@ -86,16 +87,14 @@ def check_for_clash( intersection = pymesh.boolean(latest_pymesh, ref_pymesh, operation="intersection") if intersection and intersection.volume > 0: - severity = intersection.volume / min( - ref_pymesh.volume, latest_pymesh.volume - ) - return ref_element.id, latest_element.id, severity + + return ref_element.id, latest_element.id return None def detect_clashes( reference_elements: List[Element], latest_elements: List[Element], _tolerance: float -) -> List[Tuple[str, str, float]]: +) -> List[Tuple[str, str]]: """ Detect clashes between two sets of mesh elements using parallel processing. @@ -127,30 +126,25 @@ def detect_and_report_clashes( latest_elements: list[Element], tolerance: float, automate_context: AutomationContext, -) -> list[tuple[str, str, float]]: - print(f"{len(reference_elements[0].meshes)} reference meshes") - print(f"{len(latest_elements[0].meshes)} latest meshes") +) -> list[tuple[str, str]]: clashes = detect_clashes(reference_elements, latest_elements, tolerance) - total_clashes = len(clashes) - padding_length = len(str(total_clashes)) + grouped_clashes = defaultdict(list) - for i, (ref_id, latest_id, severity) in enumerate(clashes, start=1): - clash_number = str(i).zfill(padding_length) - combined_message = f"Clash {clash_number}: between {ref_id} and {latest_id} with severity {severity:.2f}" - object_ids = [ref_id, latest_id] + for ref, latest in clashes: + if not latest: + continue + grouped_clashes[ref].append(latest) - # Assuming severity levels: Low (<0.25), Medium (0.25-0.75), High (>0.75) TODO: Determine severity levels - if severity > 0.75: - category = "High" - elif severity > 0.25: - category = "Medium" - else: - category = "Low" + for group_number, clashing_objects in enumerate(grouped_clashes.items(), start=1): + + ref_id, latest_elements = clashing_objects + + all_clashing_objects = [ref_id] + [element_id for element_id in latest_elements] automate_context.attach_error_to_objects( - category=category, object_ids=object_ids, message=combined_message + category="Clash", object_ids=all_clashing_objects, message=str(group_number) ) return clashes diff --git a/main.py b/main.py index 7fe9a2b..d73b96c 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,7 @@ use the automation_context module to wrap your function in an Automate context helper """ +from collections import defaultdict from typing import Optional from pydantic import Field @@ -132,17 +133,23 @@ def automate_function( tolerance = function_inputs.tolerance + if len(reference_mesh_elements) == 0 or len(latest_mesh_elements) == 0: + automate_context.mark_run_failed( + status_message="Clash detection failed. No objects to compare." + ) + return + clashes = detect_and_report_clashes( reference_mesh_elements, latest_mesh_elements, tolerance, automate_context ) - + percentage_reference_objects_clashing = ( - len(set([ref_id for ref_id, latest_id, severity in clashes])) + len(set([ref_id for ref_id, latest_id in clashes])) / len(reference_mesh_elements) * 100 ) percentage_latest_objects_clashing = ( - len(set([latest_id for ref_id, latest_id, severity in clashes])) + len(set([latest_id for ref_id, latest_id in clashes])) / len(latest_mesh_elements) * 100 ) @@ -151,11 +158,6 @@ def automate_function( all_objects_count = len(reference_mesh_elements) + len(latest_mesh_elements) all_clashes_count = len(clashes) - print(f"Clash detection report: {all_clashes_count} clashes found between {all_objects_count} objects.") - - print(f"Reference objects: {len([x for x in reference_objects])}.") - print(f"Latest objects: {len([x for x in latest_objects])}.") - clash_report_message = ( f"Clash detection report: {all_clashes_count} clashes found " f"between {all_objects_count} objects. " diff --git a/tests/test_main.py b/tests/test_main.py index d86cdc8..ce33ee6 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -97,8 +97,8 @@ def test_object() -> Base: # fixture to mock the AutomationRunData that would be generated by a full Automation Run def fake_automation_run_data(request, test_client: SpeckleClient) -> AutomationRunData: server_url = request.config.SPECKLE_SERVER_URL - project_id = "4f064f09e6" - model_id = "5a16cf52af" + project_id = "7d8e96669a" + model_id = "efeb71387b" function_name = "Clash Test" @@ -119,7 +119,7 @@ def fake_automation_run_data(request, test_client: SpeckleClient) -> AutomationR project_id=project_id, model_id=model_id, branch_name="main", - version_id="861bbab860", + version_id="2eb06c1034", speckle_server_url=server_url, # These ids would be available with a valid registered Automation definition. automation_id=automation_id, @@ -142,7 +142,7 @@ def test_function_run(fake_automation_run_data: AutomationRunData, speckle_token context, automate_function, FunctionInputs( - tolerance=0.1, tolerance_unit="mm", static_model_name="structures from revit" + tolerance=0.1, tolerance_unit="mm", static_model_name="simple beams" ), )