Enhance rule evaluation with severity handling
- Added optional parameters for rule and case numbers in condition evaluation. - Introduced an Enum for severity levels to standardise reporting. - Created a function to convert string severities into the SeverityLevel enum, handling various input formats. - Refactored metadata generation to use the new severity function.
This commit is contained in:
+74
-20
@@ -1,3 +1,4 @@
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
import pandas as pd
|
||||
@@ -10,7 +11,9 @@ from src.predicates import PREDICATE_METHOD_MAP
|
||||
from src.rules import PropertyRules
|
||||
|
||||
|
||||
def evaluate_condition(speckle_object: Base, condition: pd.Series) -> bool:
|
||||
def evaluate_condition(
|
||||
speckle_object: Base, condition: pd.Series, rule_number: str | None = None, case_number: int | None = None
|
||||
) -> bool:
|
||||
"""Given a Speckle object and a condition, evaluates the condition and returns a boolean value.
|
||||
|
||||
A condition is a pandas Series object with the following keys:
|
||||
@@ -19,6 +22,8 @@ def evaluate_condition(speckle_object: Base, condition: pd.Series) -> bool:
|
||||
- 'Value': The value to compare against.
|
||||
|
||||
Args:
|
||||
rule_number (string): For information the rule number.
|
||||
case_number (int): For information the rule clause number.
|
||||
speckle_object (Base): The Speckle object to evaluate.
|
||||
condition (pd.Series): The condition to evaluate.
|
||||
|
||||
@@ -29,6 +34,9 @@ def evaluate_condition(speckle_object: Base, condition: pd.Series) -> bool:
|
||||
predicate_key = condition["Predicate"]
|
||||
value = condition["Value"]
|
||||
|
||||
_ = rule_number
|
||||
_ = case_number
|
||||
|
||||
if predicate_key in PREDICATE_METHOD_MAP:
|
||||
method_name = PREDICATE_METHOD_MAP[predicate_key]
|
||||
method = getattr(PropertyRules, method_name, None)
|
||||
@@ -60,30 +68,27 @@ def process_rule(
|
||||
|
||||
# get the last row of the rule_group and get the Message and Report Severity
|
||||
rule_info = rule_group.iloc[-1]
|
||||
rule_number = rule_info["Rule Number"]
|
||||
|
||||
# Filter objects based on the 'WHERE' condition
|
||||
filtered_objects = [
|
||||
speckle_object for speckle_object in speckle_objects if evaluate_condition(speckle_object, filter_condition)
|
||||
]
|
||||
|
||||
rule_number = rule_info["Rule Number"]
|
||||
|
||||
# speckle_print(
|
||||
# f"{filter_condition['Logic']} {filter_condition['Property Name']} "
|
||||
# f"{filter_condition['Predicate']} {filter_condition['Value']}"
|
||||
# )
|
||||
|
||||
speckle_print(f"{rule_number}: {len(list(filtered_objects))} objects passed the filter.")
|
||||
if not filtered_objects or len(list(filtered_objects)) == 0:
|
||||
return [], []
|
||||
|
||||
# Initialize lists for passed and failed objects
|
||||
pass_objects, fail_objects = [], []
|
||||
|
||||
# Evaluate each filtered object against the 'AND' conditions
|
||||
for speckle_object in filtered_objects:
|
||||
# if filtered_objects is empty
|
||||
if len(list(filtered_objects)) == 0:
|
||||
return [], []
|
||||
elif all(evaluate_condition(speckle_object, cond) for _, cond in subsequent_conditions.iterrows()):
|
||||
if all(
|
||||
evaluate_condition(
|
||||
speckle_object=speckle_object, condition=condition, rule_number=rule_number, case_number=index
|
||||
)
|
||||
for index, condition in subsequent_conditions.iterrows()
|
||||
):
|
||||
pass_objects.append(speckle_object)
|
||||
else:
|
||||
fail_objects.append(speckle_object)
|
||||
@@ -136,6 +141,60 @@ def apply_rules_to_objects(
|
||||
return grouped_results
|
||||
|
||||
|
||||
class SeverityLevel(Enum):
|
||||
"""Enum for severity levels."""
|
||||
|
||||
INFO = "Info"
|
||||
WARNING = "Warning"
|
||||
ERROR = "Error"
|
||||
|
||||
|
||||
def get_severity(rule_info: pd.Series) -> SeverityLevel:
|
||||
"""Convert a string severity level to the corresponding SeverityLevel enum.
|
||||
|
||||
This function normalizes input strings (because processing user entered dead is hard), handling:
|
||||
- Case insensitivity (e.g., "info", "WARNING" → "Info", "Warning")
|
||||
- Shorthand mappings (e.g., "WARN" → "Warning")
|
||||
- Stripping whitespace
|
||||
- Defaults to SeverityLevel.ERROR if the input is invalid
|
||||
"""
|
||||
severity = rule_info.get("Report Severity") # Extract severity from input data
|
||||
|
||||
# If severity is None or not a string (e.g., numeric input), default to ERROR
|
||||
if not isinstance(severity, str):
|
||||
return SeverityLevel.ERROR
|
||||
|
||||
severity = severity.strip().upper() # Remove leading/trailing spaces & normalize case
|
||||
|
||||
# Define a mapping for shorthand or alternate spellings
|
||||
alias_map = {
|
||||
"WARN": "WARNING", # Treat "WARN" as "WARNING"
|
||||
}
|
||||
|
||||
# Replace shorthand values if applicable
|
||||
severity = alias_map.get(severity, severity)
|
||||
|
||||
# Attempt to match with an existing SeverityLevel enum value (case-insensitive)
|
||||
return next(
|
||||
(level for level in SeverityLevel if level.value.upper() == severity),
|
||||
SeverityLevel.ERROR, # Default to ERROR if no match is found
|
||||
)
|
||||
|
||||
|
||||
def get_metadata(
|
||||
rule_id: str, rule_info: pd.Series, passed: bool, speckle_objects: list[Base]
|
||||
) -> dict[str, str | int | Any]:
|
||||
"""Function that generates metadata with severity validation."""
|
||||
metadata = {
|
||||
"rule_id": rule_id,
|
||||
"status": "PASS" if passed else "FAIL",
|
||||
"severity": get_severity(rule_info).value, # Keep proper casing
|
||||
"rule_message": rule_info["Message"],
|
||||
"object_count": len(speckle_objects),
|
||||
}
|
||||
return metadata
|
||||
|
||||
|
||||
def attach_results(
|
||||
speckle_objects: list[Base],
|
||||
rule_info: pd.Series,
|
||||
@@ -156,13 +215,8 @@ def attach_results(
|
||||
return
|
||||
|
||||
# Create structured metadata for onward data analysis uses
|
||||
metadata = {
|
||||
"rule_id": rule_id,
|
||||
"status": "PASS" if passed else "FAIL",
|
||||
"severity": rule_info["Report Severity"],
|
||||
"rule_message": rule_info["Message"],
|
||||
"object_count": len(speckle_objects),
|
||||
}
|
||||
|
||||
metadata = get_metadata(rule_id, rule_info, passed, speckle_objects)
|
||||
|
||||
message = f"{rule_info['Message']}"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user