Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 83313db5e7 | |||
| 7517682456 | |||
| 46b6c0d68c | |||
| 62d5e67bab | |||
| dee3ff2b63 | |||
| 9f1aa11551 | |||
| b538e6d8d3 | |||
| 81365c0e5a | |||
| 1e8dc4dfe5 | |||
| 03f6673cc0 | |||
| d09e5c7133 |
@@ -11,7 +11,7 @@ jobs:
|
||||
FUNCTION_SCHEMA_FILE_NAME: functionSchema.json
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4.1.7
|
||||
- uses: actions/checkout@v4.2.2
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.11'
|
||||
@@ -27,7 +27,7 @@ jobs:
|
||||
- name: Extract functionInputSchema
|
||||
id: extract_schema
|
||||
run: |
|
||||
python main.py generate_schema ${HOME}/${{ env.FUNCTION_SCHEMA_FILE_NAME }}
|
||||
python run.py generate_schema ${HOME}/${{ env.FUNCTION_SCHEMA_FILE_NAME }}
|
||||
- name: Speckle Automate Function - Build and Publish
|
||||
uses: specklesystems/speckle-automate-github-composite-action@0.8.1
|
||||
with:
|
||||
@@ -35,6 +35,6 @@ jobs:
|
||||
speckle_token: ${{ secrets.SPECKLE_FUNCTION_TOKEN }}
|
||||
speckle_function_id: ${{ secrets.SPECKLE_FUNCTION_ID }}
|
||||
speckle_function_input_schema_file_path: ${{ env.FUNCTION_SCHEMA_FILE_NAME }}
|
||||
speckle_function_command: 'python -u main.py run'
|
||||
speckle_function_command: 'python -u run.py run'
|
||||
speckle_function_recommended_cpu_m: 4000
|
||||
speckle_function_recommended_memory_mi: 4000
|
||||
|
||||
@@ -1,130 +0,0 @@
|
||||
from typing import List
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import seaborn as sns
|
||||
from matplotlib.ticker import ScalarFormatter
|
||||
|
||||
sns.set_style("whitegrid")
|
||||
sns.set_context("talk")
|
||||
|
||||
|
||||
class Plotting:
|
||||
"""Class containing methods to plot various data distributions."""
|
||||
|
||||
@staticmethod
|
||||
def plot_density_distribution(densities: List[float], threshold: float) -> None:
|
||||
"""Plot density distribution with a given threshold.
|
||||
|
||||
Args:
|
||||
densities (List[float]): List of densities.
|
||||
threshold (float): Value to differentiate high and low densities.
|
||||
"""
|
||||
plt.figure(figsize=(12, 7))
|
||||
bin_edges = np.linspace(min(densities), max(densities), 51)
|
||||
|
||||
# Plot densities below the threshold in blue
|
||||
sns.histplot(
|
||||
[d for d in densities if d <= threshold],
|
||||
bins=bin_edges,
|
||||
color="green",
|
||||
alpha=0.75,
|
||||
label="Densities <= threshold",
|
||||
)
|
||||
|
||||
# Plot densities above the threshold in red
|
||||
sns.histplot(
|
||||
[d for d in densities if d > threshold],
|
||||
bins=bin_edges,
|
||||
color="red",
|
||||
alpha=0.75,
|
||||
label="Densities > threshold",
|
||||
)
|
||||
|
||||
plt.axvline(x=threshold, color="grey", linestyle="--")
|
||||
plt.xlabel("Density (~vertices/m2)")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Density Distribution")
|
||||
plt.legend()
|
||||
|
||||
# Format the x-axis to avoid scientific notation
|
||||
ax = plt.gca() # Get current axis
|
||||
ax.xaxis.set_major_formatter(ScalarFormatter(useMathText=False))
|
||||
ax.ticklabel_format(style="plain", axis="x")
|
||||
# plt.show()
|
||||
|
||||
@staticmethod
|
||||
def plot_area_density_correlation(
|
||||
areas: List[float], densities: List[float], threshold: float
|
||||
) -> None:
|
||||
"""Plot correlation between area and density with a given threshold.
|
||||
|
||||
Args:
|
||||
areas (List[float]): List of areas.
|
||||
densities (List[float]): List of densities.
|
||||
threshold (float): Value to differentiate high and low densities.
|
||||
"""
|
||||
plt.figure(figsize=(12, 7))
|
||||
|
||||
mask_below_threshold = np.array(densities) <= threshold
|
||||
mask_above_threshold = ~mask_below_threshold
|
||||
|
||||
# Plot points below the threshold in blue
|
||||
sns.scatterplot(
|
||||
x=np.array(areas)[mask_below_threshold],
|
||||
y=np.array(densities)[mask_below_threshold],
|
||||
color="green",
|
||||
label=f"Densities <= {threshold}",
|
||||
edgecolor="w",
|
||||
)
|
||||
|
||||
# Plot points above the threshold in red
|
||||
sns.scatterplot(
|
||||
x=np.array(areas)[mask_above_threshold],
|
||||
y=np.array(densities)[mask_above_threshold],
|
||||
color="red",
|
||||
label=f"Densities > {threshold}",
|
||||
edgecolor="w",
|
||||
)
|
||||
|
||||
plt.axhline(y=threshold, color="grey", linestyle="--")
|
||||
plt.title("Correlation between Area and Density")
|
||||
plt.xlabel("Area")
|
||||
plt.ylabel("Density (~vertices/m2)")
|
||||
plt.legend(title="Density", loc="upper right")
|
||||
|
||||
# Format the y-axis to avoid scientific notation
|
||||
ax = plt.gca() # Get current axis
|
||||
ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=False))
|
||||
ax.ticklabel_format(style="plain", axis="x")
|
||||
# plt.show()
|
||||
|
||||
@staticmethod
|
||||
def plot_size_distribution(sizes: List[float]) -> None:
|
||||
"""Plot distribution of sizes.
|
||||
|
||||
Args:
|
||||
sizes (List[float]): List of sizes.
|
||||
"""
|
||||
plt.figure()
|
||||
plt.hist(sizes, bins=10, alpha=0.75)
|
||||
plt.xlabel("Size")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Size Distribution")
|
||||
plt.grid(True)
|
||||
# plt.show()
|
||||
|
||||
@staticmethod
|
||||
def plot_area_distribution(areas: List[float]) -> None:
|
||||
"""Plot distribution of areas.
|
||||
|
||||
Args:
|
||||
areas (List[float]): List of areas.
|
||||
"""
|
||||
plt.figure()
|
||||
plt.hist(areas, bins=10, alpha=0.75)
|
||||
plt.xlabel("Area")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Area Distribution")
|
||||
plt.grid(True)
|
||||
# plt.show()
|
||||
@@ -1,262 +0,0 @@
|
||||
import io
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import IO, Any, Dict, List, Union
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
from PIL import Image as PILImage
|
||||
from reportlab.lib.colors import green, red
|
||||
from reportlab.lib.pagesizes import A4, portrait
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.lib.units import inch
|
||||
from reportlab.platypus import (
|
||||
Image,
|
||||
PageBreak,
|
||||
Paragraph,
|
||||
SimpleDocTemplate,
|
||||
Spacer,
|
||||
Table,
|
||||
TableStyle,
|
||||
)
|
||||
|
||||
from Objects.objects import HealthObject
|
||||
from Utilities.plotting import Plotting
|
||||
|
||||
|
||||
class Report:
|
||||
@staticmethod
|
||||
def generate_pdf(
|
||||
all_densities: List[float],
|
||||
all_areas: List[float],
|
||||
data: List[List[Union[str, float, int]]],
|
||||
threshold: float,
|
||||
summary_data=None,
|
||||
) -> IO[bytes]:
|
||||
"""
|
||||
Generate a PDF report summarizing the density data and return as a BytesIO object.
|
||||
|
||||
Args:
|
||||
all_densities (List[float]): List of all density values.
|
||||
all_areas (List[float]): List of all area values.
|
||||
data (List[List[Union[str, float, int]]]): Data to be tabulated in the PDF.
|
||||
threshold (float): The threshold for density.
|
||||
summary_data: Data to be tabulated in the summary table.
|
||||
|
||||
Returns:
|
||||
IO[bytes]: BytesIO object containing the PDF data.
|
||||
"""
|
||||
# Create a buffer to store the PDF
|
||||
pdf_buffer = io.BytesIO()
|
||||
|
||||
# Create a buffer to store the plots
|
||||
plot_buffer_density = io.BytesIO()
|
||||
plot_buffer_correlation = io.BytesIO()
|
||||
|
||||
# Determine the available width for the image, considering 1-inch margins on both sides
|
||||
available_width = (
|
||||
A4[0] - 2 * 72
|
||||
) # A4[0] gives the width of the A4 page in points
|
||||
|
||||
# Plot density distribution and save to buffer
|
||||
plt.figure(figsize=(12, 7))
|
||||
Plotting.plot_density_distribution(all_densities, threshold)
|
||||
plt.savefig(plot_buffer_density, format="png")
|
||||
plot_buffer_density.seek(0)
|
||||
|
||||
# Plot area-density correlation and save to buffer
|
||||
plt.figure(figsize=(12, 7))
|
||||
Plotting.plot_area_density_correlation(all_areas, all_densities, threshold)
|
||||
plt.savefig(plot_buffer_correlation, format="png")
|
||||
plot_buffer_correlation.seek(0)
|
||||
|
||||
# Initialize PDF document
|
||||
doc = SimpleDocTemplate(
|
||||
pdf_buffer,
|
||||
pagesize=portrait(A4),
|
||||
rightMargin=72,
|
||||
leftMargin=72,
|
||||
topMargin=72,
|
||||
bottomMargin=128,
|
||||
)
|
||||
story = []
|
||||
styles = getSampleStyleSheet()
|
||||
|
||||
story.append(Paragraph("Model Health", styles["Title"]))
|
||||
|
||||
# Introduction paragraph
|
||||
intro_paragraph = (
|
||||
"The performance and health of a digital model in the AEC "
|
||||
"(Architecture, Engineering, and Construction) domain can be significantly "
|
||||
"impacted by the complexity and density of the mesh objects within it. Heavy "
|
||||
"mesh objects, characterized by a high density of vertices and polygons, often "
|
||||
"result in slower rendering times, increased computational resource consumption, "
|
||||
"and potential crashes or lags in visualization tools. Such objects can be a "
|
||||
"primary contributor to poor model health and can degrade the user experience, "
|
||||
"especially in real-time rendering or simulation scenarios. It's important to note "
|
||||
"that the absolute value of the density, while indicative of object complexity, "
|
||||
"is without meaningful units (~vertices/m2) in and of itself and should be interpreted in the "
|
||||
"context of the model and its intended use. This report analyzes the densities of "
|
||||
"various mesh objects within the model to identify potential performance "
|
||||
"bottlenecks and provide actionable insights for optimization."
|
||||
)
|
||||
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
story.append(Paragraph(intro_paragraph, styles["Normal"]))
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
result_color = green
|
||||
|
||||
# Summary Table
|
||||
if summary_data is not None:
|
||||
summary_table = Table(summary_data["table_data"])
|
||||
|
||||
# if summary_data['result'] contains "Fail", set the result color to red
|
||||
if "Fail" in summary_data["result"]:
|
||||
result_color = red
|
||||
|
||||
summary_table.setStyle(
|
||||
TableStyle(
|
||||
[
|
||||
(
|
||||
"TEXTCOLOR",
|
||||
(1, 6),
|
||||
(1, 7),
|
||||
result_color,
|
||||
) # Targeting only the "Result" cell
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
story.append(summary_table)
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
# Append elements to the story (PDF content)
|
||||
story.append(Paragraph("Density Summary", styles["Heading2"]))
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
# Add data table
|
||||
table = Table(data)
|
||||
table.setStyle(
|
||||
TableStyle(
|
||||
[
|
||||
("BACKGROUND", (0, 0), (-1, 0), "#eeeeee"),
|
||||
("TEXTCOLOR", (0, 0), (-1, 0), "#333333"),
|
||||
("ALIGN", (0, 0), (-1, -1), "CENTER"),
|
||||
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
|
||||
("FONTSIZE", (0, 0), (-1, 0), 14),
|
||||
("BOTTOMPADDING", (0, 0), (-1, 0), 12),
|
||||
("BACKGROUND", (0, 1), (-1, -1), "#f3f3f3"),
|
||||
("GRID", (0, 0), (-1, -1), 1, "#aaaaaa"),
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
story.append(table)
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
story.append(PageBreak()) # Insert a page break
|
||||
|
||||
# For the density distribution plot:
|
||||
story.append(Paragraph("Density Distribution Plot:", styles["Heading2"]))
|
||||
plot_buffer_density.seek(0)
|
||||
width, height = Report.get_resized_dimensions(
|
||||
plot_buffer_density,
|
||||
target_width=available_width,
|
||||
max_height=available_width,
|
||||
)
|
||||
img_density = Image(plot_buffer_density, width=width, height=height)
|
||||
img_density.hAlign = "CENTER"
|
||||
story.append(img_density)
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
# For the area-density correlation plot:
|
||||
story.append(
|
||||
Paragraph("Correlation between Area and Density:", styles["Heading2"])
|
||||
)
|
||||
plot_buffer_correlation.seek(0)
|
||||
width, height = Report.get_resized_dimensions(
|
||||
plot_buffer_correlation,
|
||||
target_width=available_width,
|
||||
max_height=available_width,
|
||||
)
|
||||
img_correlation = Image(plot_buffer_correlation, width=width, height=height)
|
||||
img_correlation.hAlign = "CENTER"
|
||||
story.append(img_correlation)
|
||||
|
||||
# Build the PDF document
|
||||
doc.build(story)
|
||||
|
||||
# Reset the buffer position to the beginning
|
||||
pdf_buffer.seek(0)
|
||||
|
||||
return pdf_buffer
|
||||
|
||||
@staticmethod
|
||||
def get_resized_dimensions(buffer, target_width, max_height):
|
||||
"""Get resized width and height for an image while maintaining aspect ratio."""
|
||||
with PILImage.open(buffer) as img:
|
||||
original_width, original_height = img.size
|
||||
aspect_ratio = original_height / original_width
|
||||
new_height = target_width * aspect_ratio
|
||||
if new_height > max_height:
|
||||
new_height = max_height
|
||||
new_width = new_height / aspect_ratio
|
||||
else:
|
||||
new_width = target_width
|
||||
return new_width, new_height
|
||||
|
||||
@staticmethod
|
||||
def generate_summary(
|
||||
threshold: float,
|
||||
pass_rate_percentage: float,
|
||||
health_objects: Dict[str, HealthObject],
|
||||
commit_details: Dict[str, str],
|
||||
) -> Dict[str, Any]:
|
||||
# Calculate the number of objects above the threshold
|
||||
above_threshold_count = sum(
|
||||
1 for ho in health_objects.values() if ho.aggregate_density > threshold
|
||||
)
|
||||
|
||||
# Calculate the percentage of objects above the threshold
|
||||
above_threshold_percentage = above_threshold_count / len(health_objects)
|
||||
|
||||
# Determine if the result is a pass or fail
|
||||
result_state = (
|
||||
"Pass" if above_threshold_percentage <= pass_rate_percentage else "Fail"
|
||||
)
|
||||
result = f"{result_state} ({above_threshold_percentage * 100:.2f}%)"
|
||||
|
||||
# Create the summary table
|
||||
data = {
|
||||
"table": [
|
||||
["Metric", "Value"],
|
||||
["Server URL", commit_details["server_url"]],
|
||||
["Project ID", commit_details["stream_id"]],
|
||||
["Version ID", commit_details["commit_id"]],
|
||||
["Threshold", threshold],
|
||||
["Pass Rate Percentage", f"{pass_rate_percentage * 100}%"],
|
||||
["Assessment Result", result],
|
||||
],
|
||||
"values": {
|
||||
"pass_rate": pass_rate_percentage,
|
||||
"result": result_state,
|
||||
"fail_count": above_threshold_count,
|
||||
},
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def write_pdf_to_temp(report: IO[bytes]) -> str:
|
||||
temp_file = Path(
|
||||
tempfile.gettempdir(),
|
||||
f"automate_tiles_{datetime.now().timestamp():.0f}",
|
||||
"report.pdf",
|
||||
)
|
||||
temp_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
report.seek(0)
|
||||
temp_file.write_bytes(report.read())
|
||||
|
||||
return str(temp_file)
|
||||
@@ -1,100 +0,0 @@
|
||||
from typing import List, TypeVar, Iterable, Optional, Tuple
|
||||
|
||||
from specklepy.objects.base import Base
|
||||
import sys
|
||||
|
||||
from Utilities.flatten import extract_base_and_transform
|
||||
|
||||
T = TypeVar("T", bound=Base)
|
||||
|
||||
|
||||
class Utilities:
|
||||
@staticmethod
|
||||
def is_displayable_object(speckle_object: Base) -> bool:
|
||||
"""
|
||||
Determines if a given Speckle object is displayable.
|
||||
|
||||
This function checks if the speckle_object has a display value
|
||||
and returns True if it does, otherwise it returns False.
|
||||
|
||||
Args:
|
||||
speckle_object (Base): The Speckle object to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the object has a display value, False otherwise.
|
||||
"""
|
||||
return Utilities.try_get_display_value(speckle_object) is not None
|
||||
|
||||
@staticmethod
|
||||
def try_get_display_value(speckle_object: Base) -> Optional[List[T]]:
|
||||
"""Try fetching the display value from a Speckle object.
|
||||
|
||||
Args:
|
||||
speckle_object (Base): The Speckle object to extract the display value from.
|
||||
|
||||
Returns:
|
||||
Optional[List[T]]: A list containing the display values. If no display value is found,
|
||||
returns None.
|
||||
"""
|
||||
raw_display_value = getattr(speckle_object, "displayValue", None) or getattr(
|
||||
speckle_object, "@displayValue", None
|
||||
)
|
||||
|
||||
if raw_display_value is None:
|
||||
return None
|
||||
|
||||
if isinstance(raw_display_value, Iterable):
|
||||
display_values = list(
|
||||
filter(lambda x: isinstance(x, Base), raw_display_value)
|
||||
)
|
||||
return display_values if display_values else None
|
||||
|
||||
@staticmethod
|
||||
def get_byte_size(speckle_object: Base) -> int:
|
||||
"""Calculate the total byte size of the display values of a Speckle object.
|
||||
Keeps drilling down until it gets to vertices, or it returns 0 if it can't find any.
|
||||
|
||||
Args:
|
||||
speckle_object (Base): The Speckle object for which to compute the byte size.
|
||||
|
||||
Returns:
|
||||
int: The total byte size of all display values that have vertices.
|
||||
"""
|
||||
if speckle_object is None:
|
||||
return 0
|
||||
|
||||
display_values = Utilities.try_get_display_value(speckle_object)
|
||||
|
||||
if display_values is None:
|
||||
display_values = speckle_object
|
||||
|
||||
if isinstance(display_values, Iterable):
|
||||
return sum(
|
||||
[sys.getsizeof(display_value) for display_value in display_values]
|
||||
)
|
||||
|
||||
if not hasattr(display_values, "vertices"):
|
||||
return 0
|
||||
|
||||
return sys.getsizeof(display_values["vertices"])
|
||||
|
||||
@staticmethod
|
||||
def filter_displayable_bases(root_object: Base) -> List[Base]:
|
||||
"""
|
||||
Filters out objects that are not displayable or don't have valid IDs.
|
||||
|
||||
Args:
|
||||
root_object: The root object to start the filtering from.
|
||||
|
||||
Returns:
|
||||
List of displayable bases with valid IDs.
|
||||
"""
|
||||
displayable_objects = [
|
||||
base # 'base' is now the first element of the tuple 'b'
|
||||
for base, instance_id, transform in list(
|
||||
extract_base_and_transform(root_object)
|
||||
)
|
||||
if Utilities.is_displayable_object(base) and getattr(base, "id", None)
|
||||
]
|
||||
|
||||
return displayable_objects
|
||||
Generated
+12
-1
@@ -1616,6 +1616,17 @@ h2 = ["h2 (>=4,<5)"]
|
||||
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
|
||||
zstd = ["zstandard (>=0.18.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "vulture"
|
||||
version = "2.11"
|
||||
description = "Find dead code"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "vulture-2.11-py2.py3-none-any.whl", hash = "sha256:12d745f7710ffbf6aeb8279ba9068a24d4e52e8ed333b8b044035c9d6b823aba"},
|
||||
{file = "vulture-2.11.tar.gz", hash = "sha256:f0fbb60bce6511aad87ee0736c502456737490a82d919a44e6d92262cb35f1c2"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "websockets"
|
||||
version = "11.0.3"
|
||||
@@ -1880,4 +1891,4 @@ multidict = ">=4.0"
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "fa9a19381de95ee7cd6798a3b4707ed3fbe2f2845e449cdcd652d5670e4038cf"
|
||||
content-hash = "d8792ab3203910f2a054288ebb3e8d05e1deca69549c2fdf475aa7ffd2878b40"
|
||||
|
||||
+8
-5
@@ -4,27 +4,30 @@ version = "0.1.0"
|
||||
description = "Examine model health by identifying areas of high mesh density as possible perfomance issues."
|
||||
authors = ["Jonathon Broughton <jonathon@speckle.systems>"]
|
||||
readme = "README.md"
|
||||
packages = [{ include = "src" }]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.11"
|
||||
matplotlib = "^3.8.0"
|
||||
matplotlib = "^3.9.1"
|
||||
seaborn = "^0.13.0"
|
||||
reportlab = "^4.0.6"
|
||||
reportlab = "^4.2.2"
|
||||
mypy = "^1.11.1"
|
||||
pydantic = "^2.8.2"
|
||||
specklepy = "^2.19.5"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
black = "^23.3.0"
|
||||
black = "^23.12.1"
|
||||
ruff = "^0.0.271"
|
||||
pytest = "^7.4.2"
|
||||
python-dotenv = "^1.0.0"
|
||||
pytest = "^7.4.4"
|
||||
python-dotenv = "^1.0.1"
|
||||
vulture = "^2.11"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 88
|
||||
select = [
|
||||
"E", # pycodestyle
|
||||
"F", # pyflakes
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from speckle_automate import execute_automate_function
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add src to the sys.path
|
||||
src_path = Path(__file__).resolve().parent / 'src'
|
||||
sys.path.append(str(src_path))
|
||||
|
||||
from src.main import automate_function, FunctionInputs
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("---------")
|
||||
print("| BEGIN |")
|
||||
print("---------")
|
||||
|
||||
# Entry point: Execute the automate function with defined inputs.
|
||||
execute_automate_function(automate_function, FunctionInputs)
|
||||
+19
-31
@@ -1,29 +1,18 @@
|
||||
"""This module contains the business logic for a Speckle Automate function.
|
||||
|
||||
The purpose is to demonstrate how one can use the automation_context module
|
||||
to process and analyze data in a Speckle project.
|
||||
"""
|
||||
from pydantic import Field
|
||||
from speckle_automate import (
|
||||
AutomateBase,
|
||||
AutomationContext,
|
||||
execute_automate_function,
|
||||
AutomationContext
|
||||
)
|
||||
|
||||
import Objects.objects
|
||||
from Objects.objects import (
|
||||
from objects.objects import (
|
||||
attach_visual_markers,
|
||||
colorise_densities,
|
||||
create_health_objects,
|
||||
density_summary,
|
||||
transport_recolorized_commit
|
||||
)
|
||||
from Utilities.reporting import Report
|
||||
from Utilities.utilities import Utilities
|
||||
|
||||
|
||||
## new render materials for objects passing/failing
|
||||
## swap those into the original commit object
|
||||
## send that back to the server
|
||||
from src.utilities.reporting import generate_pdf, write_pdf_to_temp, generate_summary, safe_store_file_result
|
||||
from src.utilities.utilities import filter_displayable_bases
|
||||
|
||||
class FunctionInputs(AutomateBase):
|
||||
"""Definition of user inputs for this function.
|
||||
@@ -35,7 +24,7 @@ class FunctionInputs(AutomateBase):
|
||||
density_level: float = Field(
|
||||
title="Density Threshold",
|
||||
description=(
|
||||
"Set a density value as the threshold. Objects with "
|
||||
"Set a density value as the threshold. objects with "
|
||||
"densities exceeding this value will be highlighted."
|
||||
),
|
||||
)
|
||||
@@ -53,7 +42,7 @@ class FunctionInputs(AutomateBase):
|
||||
|
||||
|
||||
def automate_function(
|
||||
automate_context: AutomationContext, function_inputs: FunctionInputs
|
||||
automate_context: AutomationContext, function_inputs: FunctionInputs
|
||||
) -> None:
|
||||
"""Analyzes Speckle data and provides visual markers and notifications.
|
||||
|
||||
@@ -70,7 +59,7 @@ def automate_function(
|
||||
version_root_object = automate_context.receive_version()
|
||||
|
||||
# Filter out objects to keep only displayable ones with valid IDs.
|
||||
displayable_bases = Utilities.filter_displayable_bases(version_root_object)
|
||||
displayable_bases = filter_displayable_bases(version_root_object)
|
||||
|
||||
if not displayable_bases:
|
||||
automate_context.mark_run_failed("No displayable mesh objects found.")
|
||||
@@ -95,11 +84,13 @@ def automate_function(
|
||||
|
||||
commit_details = {
|
||||
"stream_id": automate_context.automation_run_data.project_id,
|
||||
"commit_id": automate_context.automation_run_data.triggers[0].payload.version_id,
|
||||
"commit_id": automate_context.automation_run_data.triggers[
|
||||
0
|
||||
].payload.version_id,
|
||||
"server_url": automate_context.automation_run_data.speckle_server_url,
|
||||
}
|
||||
|
||||
summary_data = Report.generate_summary(
|
||||
summary_data = generate_summary(
|
||||
threshold, pass_rate_percentage, health_objects, commit_details
|
||||
)
|
||||
|
||||
@@ -111,18 +102,20 @@ def automate_function(
|
||||
high_density_count = summary_data["values"]["fail_count"]
|
||||
total_displayable_count = len(displayable_bases)
|
||||
|
||||
report = Report.generate_pdf(
|
||||
report = generate_pdf(
|
||||
all_densities, [float(a) for a in all_areas], data, threshold, report_data
|
||||
)
|
||||
|
||||
file_name = Report.write_pdf_to_temp(report)
|
||||
file_name = write_pdf_to_temp(report)
|
||||
|
||||
print(commit_details['server_url'])
|
||||
print("------------------------------------------------")
|
||||
print(f"| {commit_details['server_url']} |")
|
||||
print("------------------------------------------------")
|
||||
|
||||
automate_context.store_file_result(file_name)
|
||||
safe_store_file_result(automate_context, file_name)
|
||||
|
||||
# colorise the objects that pass/fail and send to a new model version
|
||||
Objects.objects.transport_recolorized_commit(
|
||||
transport_recolorized_commit(
|
||||
automate_context, health_objects, version_root_object
|
||||
)
|
||||
|
||||
@@ -136,8 +129,3 @@ def automate_function(
|
||||
automate_context.mark_run_success(
|
||||
"Analysis complete. High-density objects within acceptable limits."
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Entry point: Execute the automate function with defined inputs.
|
||||
execute_automate_function(automate_function, FunctionInputs)
|
||||
@@ -1,4 +1,3 @@
|
||||
import json
|
||||
import statistics
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, Iterable, List, Optional, TypeVar, Union
|
||||
@@ -11,7 +10,7 @@ from specklepy.objects.graph_traversal.traversal import GraphTraversal, Traversa
|
||||
from specklepy.objects.other import RenderMaterial
|
||||
from specklepy.objects.primitive import Interval
|
||||
|
||||
from Utilities.utilities import Utilities
|
||||
from src.utilities.utilities import try_get_display_value, get_byte_size
|
||||
|
||||
T = TypeVar("T", bound=Base)
|
||||
|
||||
@@ -94,7 +93,7 @@ class HealthObject:
|
||||
) # Fetch the parent_type attribute
|
||||
self.speckle_type = base_object.speckle_type
|
||||
self.units = base_object.units
|
||||
display_value = Utilities.try_get_display_value(base_object)
|
||||
display_value = try_get_display_value(base_object)
|
||||
|
||||
if display_value:
|
||||
self.display_values = display_value
|
||||
@@ -121,9 +120,9 @@ class HealthObject:
|
||||
# self.areas[dv.id] = dv.bbox.xSize.length * dv.bbox.ySize.length
|
||||
# elif isinstance(dv, Mesh):
|
||||
if isinstance(dv, Mesh):
|
||||
x_interval = self.interval_from_coordinates_by_offset(dv.vertices, 0)
|
||||
y_interval = self.interval_from_coordinates_by_offset(dv.vertices, 1)
|
||||
z_interval = self.interval_from_coordinates_by_offset(dv.vertices, 2)
|
||||
x_interval = interval_from_coordinates_by_offset(dv.vertices, 0)
|
||||
y_interval = interval_from_coordinates_by_offset(dv.vertices, 1)
|
||||
z_interval = interval_from_coordinates_by_offset(dv.vertices, 2)
|
||||
|
||||
self.bounding_volumes[dv.id] = (
|
||||
x_interval.length() * y_interval.length() * z_interval.length()
|
||||
@@ -152,24 +151,24 @@ class HealthObject:
|
||||
Returns:
|
||||
int: The computed byte size.
|
||||
"""
|
||||
self.sizes.update({dv.id: Utilities.get_byte_size(dv) for dv in display_values})
|
||||
self.sizes.update({dv.id: get_byte_size(dv) for dv in display_values})
|
||||
|
||||
@staticmethod
|
||||
def interval_from_coordinates_by_offset(
|
||||
vertices: List[float], offset: int = 0
|
||||
) -> Interval:
|
||||
"""Compute interval from coordinates by offset.
|
||||
|
||||
Args:
|
||||
vertices (List[float]): List of vertex coordinates.
|
||||
offset (int, optional): Offset to start from. Defaults to 0.
|
||||
def interval_from_coordinates_by_offset(
|
||||
vertices: List[float], offset: int = 0
|
||||
) -> Interval:
|
||||
"""Compute interval from coordinates by offset.
|
||||
|
||||
Returns:
|
||||
Interval: Computed interval.
|
||||
"""
|
||||
axis_coordinates = vertices[offset::3]
|
||||
axis_interval = Interval(start=min(axis_coordinates), end=max(axis_coordinates))
|
||||
return axis_interval
|
||||
Args:
|
||||
vertices (List[float]): List of vertex coordinates.
|
||||
offset (int, optional): Offset to start from. Defaults to 0.
|
||||
|
||||
Returns:
|
||||
Interval: Computed interval.
|
||||
"""
|
||||
axis_coordinates = vertices[offset::3]
|
||||
axis_interval = Interval(start=min(axis_coordinates), end=max(axis_coordinates))
|
||||
return axis_interval
|
||||
|
||||
|
||||
# def colorise_densities(
|
||||
@@ -251,7 +250,7 @@ def colorise_densities(
|
||||
|
||||
def colorize(
|
||||
health_objects
|
||||
) -> tuple[dict[Any, dict[str, Any]], list[Any], dict[Any, str]]:
|
||||
) -> tuple[dict[Any, dict[str, Any]], list[Any], dict[Any, str]] | None:
|
||||
densities = {ho.id: ho.aggregate_density for ho in health_objects.values()}
|
||||
|
||||
if not densities:
|
||||
@@ -405,6 +404,17 @@ def density_summary(
|
||||
|
||||
return data, all_densities, all_areas
|
||||
|
||||
def find_density_branch(automation_context: AutomationContext) -> Optional[Base]:
|
||||
client = automation_context.speckle_client
|
||||
project_id = automation_context.automation_run_data.project_id
|
||||
|
||||
branches = client.branch.list(project_id, 100, 0)
|
||||
for branch in branches:
|
||||
if "density" in branch.name.lower():
|
||||
print(f"Found 'density' branch: {branch.name}, Branch ID: {branch.id}")
|
||||
return branch.id
|
||||
print("No branch with the name 'density' found.")
|
||||
return None
|
||||
|
||||
def transport_recolorized_commit(
|
||||
automate_context: AutomationContext,
|
||||
@@ -415,7 +425,7 @@ def transport_recolorized_commit(
|
||||
# return the commit id of the new commit
|
||||
# create a new commit on a specific branch - we'll use "dirstat" for now
|
||||
|
||||
if automate_context.automation_run_data.branch_name == "density":
|
||||
if find_density_branch(automate_context) is not None:
|
||||
# commits on the density branch cannot be recolored
|
||||
print("------------------------------------------------")
|
||||
print("| CANNOT RECOLOR COMMITS ON THE DENSITY BRANCH |")
|
||||
@@ -441,7 +451,7 @@ def transport_recolorized_commit(
|
||||
and hasattr(current_object, "id")
|
||||
and current_object.id in health_objects.keys()
|
||||
):
|
||||
display_value = Utilities.try_get_display_value(current_object)
|
||||
display_value = try_get_display_value(current_object)
|
||||
|
||||
if display_value:
|
||||
# if display_value is an iterable
|
||||
@@ -453,10 +463,10 @@ def transport_recolorized_commit(
|
||||
].render_material
|
||||
|
||||
# concatenate the names of all the render materials
|
||||
render_material_names = [
|
||||
display_value_object.renderMaterial.name
|
||||
for display_value_object in display_value
|
||||
]
|
||||
# renderder_material_names = [
|
||||
# display_value_object.renderMaterial.name
|
||||
# for display_value_object in display_value
|
||||
# ]
|
||||
|
||||
else:
|
||||
# Apply the render material to the object
|
||||
@@ -465,7 +475,7 @@ def transport_recolorized_commit(
|
||||
].render_material
|
||||
|
||||
# concatenate the names of all the render materials
|
||||
render_material_names = [display_value.renderMaterial.name]
|
||||
# render_material_names = [display_value.renderMaterial.name]
|
||||
|
||||
current_object["density_rendered"] = True
|
||||
current_object["densities"] = health_objects[
|
||||
@@ -0,0 +1,122 @@
|
||||
from typing import List
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import seaborn as sns
|
||||
from matplotlib.ticker import ScalarFormatter
|
||||
|
||||
sns.set_style("whitegrid")
|
||||
sns.set_context("talk")
|
||||
|
||||
|
||||
def plot_density_distribution(densities: List[float], threshold: float) -> None:
|
||||
"""Plot density distribution with a given threshold.
|
||||
|
||||
Args:
|
||||
densities (List[float]): List of densities.
|
||||
threshold (float): Value to differentiate high and low densities.
|
||||
"""
|
||||
plt.figure(figsize=(12, 7))
|
||||
bin_edges = np.linspace(min(densities), max(densities), 51)
|
||||
|
||||
# Plot densities below the threshold in blue
|
||||
sns.histplot(
|
||||
[d for d in densities if d <= threshold],
|
||||
bins=bin_edges,
|
||||
color="green",
|
||||
alpha=0.75,
|
||||
label="Densities <= threshold",
|
||||
)
|
||||
|
||||
# Plot densities above the threshold in red
|
||||
sns.histplot(
|
||||
[d for d in densities if d > threshold],
|
||||
bins=bin_edges,
|
||||
color="red",
|
||||
alpha=0.75,
|
||||
label="Densities > threshold",
|
||||
)
|
||||
|
||||
plt.axvline(x=threshold, color="grey", linestyle="--")
|
||||
plt.xlabel("Density (~vertices/m2)")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Density Distribution")
|
||||
plt.legend()
|
||||
|
||||
# Format the x-axis to avoid scientific notation
|
||||
ax = plt.gca() # Get current axis
|
||||
ax.xaxis.set_major_formatter(ScalarFormatter(useMathText=False))
|
||||
ax.ticklabel_format(style="plain", axis="x")
|
||||
# plt.show()
|
||||
|
||||
def plot_area_density_correlation(
|
||||
areas: List[float], densities: List[float], threshold: float
|
||||
) -> None:
|
||||
"""Plot correlation between area and density with a given threshold.
|
||||
|
||||
Args:
|
||||
areas (List[float]): List of areas.
|
||||
densities (List[float]): List of densities.
|
||||
threshold (float): Value to differentiate high and low densities.
|
||||
"""
|
||||
plt.figure(figsize=(12, 7))
|
||||
|
||||
mask_below_threshold = np.array(densities) <= threshold
|
||||
mask_above_threshold = ~mask_below_threshold
|
||||
|
||||
# Plot points below the threshold in blue
|
||||
sns.scatterplot(
|
||||
x=np.array(areas)[mask_below_threshold],
|
||||
y=np.array(densities)[mask_below_threshold],
|
||||
color="green",
|
||||
label=f"Densities <= {threshold}",
|
||||
edgecolor="w",
|
||||
)
|
||||
|
||||
# Plot points above the threshold in red
|
||||
sns.scatterplot(
|
||||
x=np.array(areas)[mask_above_threshold],
|
||||
y=np.array(densities)[mask_above_threshold],
|
||||
color="red",
|
||||
label=f"Densities > {threshold}",
|
||||
edgecolor="w",
|
||||
)
|
||||
|
||||
plt.axhline(y=threshold, color="grey", linestyle="--")
|
||||
plt.title("Correlation between Area and Density")
|
||||
plt.xlabel("Area")
|
||||
plt.ylabel("Density (~vertices/m2)")
|
||||
plt.legend(title="Density", loc="upper right")
|
||||
|
||||
# Format the y-axis to avoid scientific notation
|
||||
ax = plt.gca() # Get current axis
|
||||
ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=False))
|
||||
ax.ticklabel_format(style="plain", axis="x")
|
||||
# plt.show()
|
||||
|
||||
def plot_size_distribution(sizes: List[float]) -> None:
|
||||
"""Plot distribution of sizes.
|
||||
|
||||
Args:
|
||||
sizes (List[float]): List of sizes.
|
||||
"""
|
||||
plt.figure()
|
||||
plt.hist(sizes, bins=10, alpha=0.75)
|
||||
plt.xlabel("Size")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Size Distribution")
|
||||
plt.grid(True)
|
||||
# plt.show()
|
||||
|
||||
def plot_area_distribution(areas: List[float]) -> None:
|
||||
"""Plot distribution of areas.
|
||||
|
||||
Args:
|
||||
areas (List[float]): List of areas.
|
||||
"""
|
||||
plt.figure()
|
||||
plt.hist(areas, bins=10, alpha=0.75)
|
||||
plt.xlabel("Area")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Area Distribution")
|
||||
plt.grid(True)
|
||||
@@ -0,0 +1,289 @@
|
||||
import io
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import IO, Any, Dict, List, Union
|
||||
|
||||
import httpx
|
||||
import matplotlib.pyplot as plt
|
||||
from PIL import Image as PILImage
|
||||
from reportlab.lib.colors import green, red
|
||||
from reportlab.lib.pagesizes import A4, portrait
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.lib.units import inch
|
||||
from reportlab.platypus import (
|
||||
Image,
|
||||
PageBreak,
|
||||
Paragraph,
|
||||
SimpleDocTemplate,
|
||||
Spacer,
|
||||
Table,
|
||||
TableStyle,
|
||||
)
|
||||
|
||||
from src.objects.objects import HealthObject
|
||||
from src.utilities.plotting import plot_density_distribution, plot_area_density_correlation
|
||||
|
||||
|
||||
def generate_pdf(
|
||||
all_densities: List[float],
|
||||
all_areas: List[float],
|
||||
data: List[List[Union[str, float, int]]],
|
||||
threshold: float,
|
||||
summary_data=None,
|
||||
) -> IO[bytes]:
|
||||
"""
|
||||
Generate a PDF report summarizing the density data and return as a BytesIO object.
|
||||
|
||||
Args:
|
||||
all_densities (List[float]): List of all density values.
|
||||
all_areas (List[float]): List of all area values.
|
||||
data (List[List[Union[str, float, int]]]): Data to be tabulated in the PDF.
|
||||
threshold (float): The threshold for density.
|
||||
summary_data: Data to be tabulated in the summary table.
|
||||
|
||||
Returns:
|
||||
IO[bytes]: BytesIO object containing the PDF data.
|
||||
"""
|
||||
# Create a buffer to store the PDF
|
||||
pdf_buffer = io.BytesIO()
|
||||
|
||||
# Create a buffer to store the plots
|
||||
plot_buffer_density = io.BytesIO()
|
||||
plot_buffer_correlation = io.BytesIO()
|
||||
|
||||
# Determine the available width for the image, considering 1-inch margins on both sides
|
||||
available_width = (
|
||||
A4[0] - 2 * 72
|
||||
) # A4[0] gives the width of the A4 page in points
|
||||
|
||||
# Plot density distribution and save to buffer
|
||||
plt.figure(figsize=(12, 7))
|
||||
plot_density_distribution(all_densities, threshold)
|
||||
plt.savefig(plot_buffer_density, format="png")
|
||||
plot_buffer_density.seek(0)
|
||||
|
||||
# Plot area-density correlation and save to buffer
|
||||
plt.figure(figsize=(12, 7))
|
||||
plot_area_density_correlation(all_areas, all_densities, threshold)
|
||||
plt.savefig(plot_buffer_correlation, format="png")
|
||||
plot_buffer_correlation.seek(0)
|
||||
|
||||
# Initialize PDF document
|
||||
doc = SimpleDocTemplate(
|
||||
pdf_buffer,
|
||||
pagesize=portrait(A4),
|
||||
rightMargin=72,
|
||||
leftMargin=72,
|
||||
topMargin=72,
|
||||
bottomMargin=128,
|
||||
)
|
||||
story = []
|
||||
styles = getSampleStyleSheet()
|
||||
|
||||
story.append(Paragraph("Model Health", styles["Title"]))
|
||||
|
||||
# Introduction paragraph
|
||||
intro_paragraph = (
|
||||
"The performance and health of a digital model in the AEC "
|
||||
"(Architecture, Engineering, and Construction) domain can be significantly "
|
||||
"impacted by the complexity and density of the mesh objects within it. Heavy "
|
||||
"mesh objects, characterized by a high density of vertices and polygons, often "
|
||||
"result in slower rendering times, increased computational resource consumption, "
|
||||
"and potential crashes or lags in visualization tools. Such objects can be a "
|
||||
"primary contributor to poor model health and can degrade the user experience, "
|
||||
"especially in real-time rendering or simulation scenarios. It's important to note "
|
||||
"that the absolute value of the density, while indicative of object complexity, "
|
||||
"is without meaningful units (~vertices/m2) in and of itself and should be interpreted in the "
|
||||
"context of the model and its intended use. This report analyzes the densities of "
|
||||
"various mesh objects within the model to identify potential performance "
|
||||
"bottlenecks and provide actionable insights for optimization."
|
||||
)
|
||||
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
story.append(Paragraph(intro_paragraph, styles["Normal"]))
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
result_color = green
|
||||
|
||||
# Summary Table
|
||||
if summary_data is not None:
|
||||
summary_table = Table(summary_data["table_data"])
|
||||
|
||||
# if summary_data['result'] contains "Fail", set the result color to red
|
||||
if "Fail" in summary_data["result"]:
|
||||
result_color = red
|
||||
|
||||
summary_table.setStyle(
|
||||
TableStyle(
|
||||
[
|
||||
(
|
||||
"TEXTCOLOR",
|
||||
(1, 6),
|
||||
(1, 7),
|
||||
result_color,
|
||||
) # Targeting only the "Result" cell
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
story.append(summary_table)
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
# Append elements to the story (PDF content)
|
||||
story.append(Paragraph("Density Summary", styles["Heading2"]))
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
# Add data table
|
||||
table = Table(data)
|
||||
table.setStyle(
|
||||
TableStyle(
|
||||
[
|
||||
("BACKGROUND", (0, 0), (-1, 0), "#eeeeee"),
|
||||
("TEXTCOLOR", (0, 0), (-1, 0), "#333333"),
|
||||
("ALIGN", (0, 0), (-1, -1), "CENTER"),
|
||||
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
|
||||
("FONTSIZE", (0, 0), (-1, 0), 14),
|
||||
("BOTTOMPADDING", (0, 0), (-1, 0), 12),
|
||||
("BACKGROUND", (0, 1), (-1, -1), "#f3f3f3"),
|
||||
("GRID", (0, 0), (-1, -1), 1, "#aaaaaa"),
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
story.append(table)
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
story.append(PageBreak()) # Insert a page break
|
||||
|
||||
# For the density distribution plot:
|
||||
story.append(Paragraph("Density Distribution Plot:", styles["Heading2"]))
|
||||
plot_buffer_density.seek(0)
|
||||
width, height = get_resized_dimensions(
|
||||
plot_buffer_density,
|
||||
target_width=available_width,
|
||||
max_height=available_width,
|
||||
)
|
||||
img_density = Image(plot_buffer_density, width=width, height=height)
|
||||
img_density.hAlign = "CENTER"
|
||||
story.append(img_density)
|
||||
story.append(Spacer(1, 0.25 * inch))
|
||||
|
||||
# For the area-density correlation plot:
|
||||
story.append(
|
||||
Paragraph("Correlation between Area and Density:", styles["Heading2"])
|
||||
)
|
||||
plot_buffer_correlation.seek(0)
|
||||
width, height = get_resized_dimensions(
|
||||
plot_buffer_correlation,
|
||||
target_width=available_width,
|
||||
max_height=available_width,
|
||||
)
|
||||
img_correlation = Image(plot_buffer_correlation, width=width, height=height)
|
||||
img_correlation.hAlign = "CENTER"
|
||||
story.append(img_correlation)
|
||||
|
||||
# Build the PDF document
|
||||
doc.build(story)
|
||||
|
||||
# Reset the buffer position to the beginning
|
||||
pdf_buffer.seek(0)
|
||||
|
||||
return pdf_buffer
|
||||
|
||||
|
||||
def get_resized_dimensions(buffer, target_width, max_height):
|
||||
"""Get resized width and height for an image while maintaining aspect ratio."""
|
||||
with PILImage.open(buffer) as img:
|
||||
original_width, original_height = img.size
|
||||
aspect_ratio = original_height / original_width
|
||||
new_height = target_width * aspect_ratio
|
||||
if new_height > max_height:
|
||||
new_height = max_height
|
||||
new_width = new_height / aspect_ratio
|
||||
else:
|
||||
new_width = target_width
|
||||
return new_width, new_height
|
||||
|
||||
|
||||
def generate_summary(
|
||||
threshold: float,
|
||||
pass_rate_percentage: float,
|
||||
health_objects: Dict[str, HealthObject],
|
||||
commit_details: Dict[str, str],
|
||||
) -> Dict[str, Any]:
|
||||
# Calculate the number of objects above the threshold
|
||||
above_threshold_count = sum(
|
||||
1 for ho in health_objects.values() if ho.aggregate_density > threshold
|
||||
)
|
||||
|
||||
# Calculate the percentage of objects above the threshold
|
||||
above_threshold_percentage = above_threshold_count / len(health_objects)
|
||||
|
||||
# Determine if the result is a pass or fail
|
||||
result_state = (
|
||||
"Pass" if above_threshold_percentage <= pass_rate_percentage else "Fail"
|
||||
)
|
||||
result = f"{result_state} ({above_threshold_percentage * 100:.2f}%)"
|
||||
|
||||
# Create the summary table
|
||||
data = {
|
||||
"table": [
|
||||
["Metric", "Value"],
|
||||
["Server URL", commit_details["server_url"]],
|
||||
["Project ID", commit_details["stream_id"]],
|
||||
["Version ID", commit_details["commit_id"]],
|
||||
["Threshold", threshold],
|
||||
["Pass Rate Percentage", f"{pass_rate_percentage * 100}%"],
|
||||
["Assessment Result", result],
|
||||
],
|
||||
"values": {
|
||||
"pass_rate": pass_rate_percentage,
|
||||
"result": result_state,
|
||||
"fail_count": above_threshold_count,
|
||||
},
|
||||
}
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def write_pdf_to_temp(report: IO[bytes]) -> str:
|
||||
temp_file = Path(
|
||||
tempfile.gettempdir(),
|
||||
f"automate_tiles_{datetime.now().timestamp():.0f}",
|
||||
"report.pdf",
|
||||
)
|
||||
temp_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
report.seek(0)
|
||||
temp_file.write_bytes(report.read())
|
||||
|
||||
return str(temp_file)
|
||||
|
||||
|
||||
from speckle_automate import AutomationContext
|
||||
|
||||
|
||||
def safe_store_file_result(automate_context: AutomationContext, file_name: str):
|
||||
# Store the original URL
|
||||
original_url = automate_context.automation_run_data.speckle_server_url
|
||||
|
||||
try:
|
||||
# Modify the URL property of the automation_run_data
|
||||
automate_context.automation_run_data.speckle_server_url = original_url.rstrip(
|
||||
"/"
|
||||
)
|
||||
|
||||
# Attempt to store the file
|
||||
automate_context.store_file_result(file_name)
|
||||
except httpx.HTTPStatusError as e:
|
||||
if e.response.status_code != 404:
|
||||
raise
|
||||
|
||||
else:
|
||||
# Handle the 404 error
|
||||
error_message = f"Unable to store file: {file_name}. Error: {str(e)}"
|
||||
print(error_message) # For logging purposes
|
||||
# automate_context.mark_run_exception(error_message)
|
||||
finally:
|
||||
# Restore the original URL
|
||||
automate_context.automation_run_data.speckle_server_url = original_url
|
||||
@@ -0,0 +1,94 @@
|
||||
import sys
|
||||
from typing import List, TypeVar, Iterable, Optional
|
||||
|
||||
from specklepy.objects.base import Base
|
||||
|
||||
from src.utilities.flatten import extract_base_and_transform
|
||||
|
||||
T = TypeVar("T", bound=Base)
|
||||
|
||||
def is_displayable_object(speckle_object: Base) -> bool:
|
||||
"""
|
||||
Determines if a given Speckle object is displayable.
|
||||
|
||||
This function checks if the speckle_object has a display value
|
||||
and returns True if it does, otherwise it returns False.
|
||||
|
||||
Args:
|
||||
speckle_object (Base): The Speckle object to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the object has a display value, False otherwise.
|
||||
"""
|
||||
return try_get_display_value(speckle_object) is not None
|
||||
|
||||
def try_get_display_value(speckle_object: Base) -> Optional[List[T]]:
|
||||
"""Try fetching the display value from a Speckle object.
|
||||
|
||||
Args:
|
||||
speckle_object (Base): The Speckle object to extract the display value from.
|
||||
|
||||
Returns:
|
||||
Optional[List[T]]: A list containing the display values. If no display value is found,
|
||||
returns None.
|
||||
"""
|
||||
raw_display_value = getattr(speckle_object, "displayValue", None) or getattr(
|
||||
speckle_object, "@displayValue", None
|
||||
)
|
||||
|
||||
if raw_display_value is None:
|
||||
return None
|
||||
|
||||
if isinstance(raw_display_value, Iterable):
|
||||
display_values = list(
|
||||
filter(lambda x: isinstance(x, Base), raw_display_value)
|
||||
)
|
||||
return display_values if display_values else None
|
||||
|
||||
def get_byte_size(speckle_object: Base) -> int:
|
||||
"""Calculate the total byte size of the display values of a Speckle object.
|
||||
Keeps drilling down until it gets to vertices, or it returns 0 if it can't find any.
|
||||
|
||||
Args:
|
||||
speckle_object (Base): The Speckle object for which to compute the byte size.
|
||||
|
||||
Returns:
|
||||
int: The total byte size of all display values that have vertices.
|
||||
"""
|
||||
if speckle_object is None:
|
||||
return 0
|
||||
|
||||
display_values = try_get_display_value(speckle_object)
|
||||
|
||||
if display_values is None:
|
||||
display_values = speckle_object
|
||||
|
||||
if isinstance(display_values, Iterable):
|
||||
return sum(
|
||||
[sys.getsizeof(display_value) for display_value in display_values]
|
||||
)
|
||||
|
||||
if not hasattr(display_values, "vertices"):
|
||||
return 0
|
||||
|
||||
return sys.getsizeof(display_values["vertices"])
|
||||
|
||||
def filter_displayable_bases(root_object: Base) -> List[Base]:
|
||||
"""
|
||||
Filters out objects that are not displayable or don't have valid IDs.
|
||||
|
||||
Args:
|
||||
root_object: The root object to start the filtering from.
|
||||
|
||||
Returns:
|
||||
List of displayable bases with valid IDs.
|
||||
"""
|
||||
displayable_objects = [
|
||||
base # 'base' is now the first element of the tuple 'b'
|
||||
for base, instance_id, transform in list(
|
||||
extract_base_and_transform(root_object)
|
||||
)
|
||||
if is_displayable_object(base) and getattr(base, "id", None)
|
||||
]
|
||||
|
||||
return displayable_objects
|
||||
@@ -1,5 +1,8 @@
|
||||
import os
|
||||
import sys
|
||||
from dotenv import load_dotenv
|
||||
# Add the src directory to the Python path
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../src')))
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
|
||||
@@ -14,7 +14,7 @@ from speckle_automate import (
|
||||
from specklepy.api.client import SpeckleClient
|
||||
from specklepy.objects.base import Base
|
||||
|
||||
from main import FunctionInputs, automate_function
|
||||
from src.main import FunctionInputs, automate_function
|
||||
|
||||
|
||||
def crypto_random_string(length: int) -> str:
|
||||
@@ -94,7 +94,6 @@ def test_object() -> Base:
|
||||
return root_object
|
||||
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
# fixture to mock the AutomationRunData that would be generated by a full Automation Run
|
||||
def fake_automation_run_data(request, test_client: SpeckleClient) -> AutomationRunData:
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import pytest
|
||||
from specklepy.objects.base import Base
|
||||
|
||||
from Objects.objects import HealthObject
|
||||
from Utilities.utilities import Utilities
|
||||
from src.objects.objects import HealthObject
|
||||
from src.utilities.utilities import filter_displayable_bases
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -36,10 +36,10 @@ def speckle_server_url(request) -> str:
|
||||
|
||||
|
||||
def test_filter_displayable_bases(mock_base):
|
||||
displayable_bases = Utilities.filter_displayable_bases(mock_base)
|
||||
displayable_bases = filter_displayable_bases(mock_base)
|
||||
assert (
|
||||
len(displayable_bases) == 2
|
||||
) # Only child_1 and child_2 should be considered displayable
|
||||
)
|
||||
|
||||
|
||||
def test_convert_from_base_with_nested_elements(mock_base):
|
||||
@@ -48,7 +48,7 @@ def test_convert_from_base_with_nested_elements(mock_base):
|
||||
assert health_obj.id == "12345"
|
||||
assert (
|
||||
health_obj.speckle_type == "Base"
|
||||
) # Assuming no speckle_type was set in the mock_base
|
||||
)
|
||||
|
||||
|
||||
def test_density_with_nested_elements(mock_base):
|
||||
|
||||
+1
-1
@@ -2,13 +2,13 @@
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
# Speckle is a data platform for AEC; here we're importing essential modules from it
|
||||
from specklepy.api import operations
|
||||
from specklepy.api.client import SpeckleClient
|
||||
from specklepy.objects import Base
|
||||
from specklepy.transports.server import ServerTransport
|
||||
|
||||
|
||||
# Setting up some pytest fixtures for testing
|
||||
# Fixtures are a way to provide consistent test data or configuration for each test
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import pytest
|
||||
from specklepy.objects.base import Base
|
||||
from specklepy.objects.geometry import Mesh
|
||||
|
||||
from Utilities.utilities import Utilities
|
||||
from src.utilities.utilities import is_displayable_object, try_get_display_value, get_byte_size
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -18,20 +18,20 @@ def sample_bases():
|
||||
def test_is_displayable_object(sample_bases):
|
||||
base_with_display_value, base_without_display_value = sample_bases
|
||||
|
||||
assert Utilities.is_displayable_object(base_with_display_value)
|
||||
assert not Utilities.is_displayable_object(base_without_display_value)
|
||||
assert is_displayable_object(base_with_display_value)
|
||||
assert not is_displayable_object(base_without_display_value)
|
||||
|
||||
# assert that teh count of the sample_bases that is displayable is 1
|
||||
assert sum(1 for b in sample_bases if Utilities.is_displayable_object(b)) == 1
|
||||
assert sum(1 for b in sample_bases if is_displayable_object(b)) == 1
|
||||
|
||||
|
||||
def test_try_get_display_value(sample_bases):
|
||||
base_with_display_value, base_without_display_value = sample_bases
|
||||
|
||||
print(Utilities.try_get_display_value(base_with_display_value)[0])
|
||||
print(try_get_display_value(base_with_display_value)[0])
|
||||
|
||||
assert Utilities.try_get_display_value(base_with_display_value) is not None
|
||||
assert Utilities.try_get_display_value(base_without_display_value) is None
|
||||
assert try_get_display_value(base_with_display_value) is not None
|
||||
assert try_get_display_value(base_without_display_value) is None
|
||||
|
||||
|
||||
def test_get_byte_size(sample_bases):
|
||||
@@ -39,5 +39,5 @@ def test_get_byte_size(sample_bases):
|
||||
|
||||
print(base_with_display_value.displayValue)
|
||||
|
||||
assert Utilities.get_byte_size(base_with_display_value) > 0
|
||||
assert Utilities.get_byte_size(base_without_display_value) == 0
|
||||
assert get_byte_size(base_with_display_value) > 0
|
||||
assert get_byte_size(base_without_display_value) == 0
|
||||
|
||||
Reference in New Issue
Block a user