Files
2023-11-12 23:49:51 +03:00

274 lines
10 KiB
Python

"""This module contains the business logic of the function.
use the automation_context module to wrap your function in an Autamate context helper
"""
import os
import subprocess
from archaea.geometry.vector3d import Vector3d
from archaea.geometry.mesh import Mesh
from archaea.geometry.point3d import Point3d
from archaea_simulation.simulation_objects.domain import Domain
from archaea_simulation.speckle.vtk_to_speckle import vtk_to_speckle, Text
from archaea_simulation.cfd.utils.path import get_cfd_export_path
from pydantic import Field
from speckle_automate import (
AutomateBase,
AutomationContext,
execute_automate_function,
)
from specklepy.api import operations
from specklepy.objects.base import Base
from specklepy.objects.other import DisplayStyle
from specklepy.objects.geometry import Brep, Line, Polyline, Point, Plane
from specklepy.transports.server import ServerTransport
from flatten import flatten_base
# # TODO: Use speckle text object when specklepy has new release
class FunctionInputs(AutomateBase):
"""These are function author defined values.
Automate will make sure to supply them matching the types specified here.
Please use the pydantic model schema to define your inputs:
https://docs.pydantic.dev/latest/usage/models/
"""
wind_direction: float = Field(
title="Wind Direction",
description=(
"Wind direction represents as azimuth angle is like a compass direction"
"with North = 0°, East = 90°, South = 180°, West = 270°."
),
)
wind_speed: float = Field(
title="Wind Speed",
description="Wind speed (m/s) in XY plane."
)
reference_height: float = Field(
title="Reference Wind Speed Height",
description="Altitude (m) of the wind speed is measured.",
default=15,
)
number_of_cpus: int = Field(
title="Number of CPUs",
description="Number of CPUs to run simulation parallelly.",
default=4,
)
tunnel_width_scale: float = Field(
title="Tunnel width scale",
description=("Scale value of the tunnel width according to context objects bounding box. "
"It scales the inlet surface which we send wind into tunnel."),
default=5,
)
tunnel_depth_scale: float = Field(
title="Tunnel depth scale",
description="Scale value of the tunnel depth according to context objects bounding box.",
default=5,
)
tunnel_height_scale: float = Field(
title="Tunnel height scale",
description=("Scale value of the tunnel height according to context objects bounding box. "
"It scales from zGround."),
default=3,
)
def automate_function(
automate_context: AutomationContext,
function_inputs: FunctionInputs,
) -> None:
"""This is an example Speckle Automate function.
Args:
automate_context: A context helper object, that carries relevant information
about the runtime context of this function.
It gives access to the Speckle project data, that triggered this run.
It also has conveniece methods attach result data to the Speckle model.
function_inputs: An instance object matching the defined schema.
"""
print("number of cpus", os.cpu_count())
subprocess.run("/bin/bash -c 'source /opt/openfoam9/etc/bashrc'", shell=True)
# the context provides a conveniet way, to receive the triggering version
version_root_object = automate_context.receive_version()
accepted_types = [Brep.speckle_type]
objects_to_create_stl = []
flatten_base_objects = flatten_base(version_root_object)
count = 0
for b in flatten_base_objects:
if b.speckle_type in accepted_types:
if not b.id:
raise ValueError("Cannot operate on objects without their id's.")
objects_to_create_stl.append(b)
count += 1
speckle_meshes = []
for speckle_mesh in objects_to_create_stl:
speckle_meshes += speckle_mesh.displayValue
archaea_meshes = []
for speckle_mesh in speckle_meshes:
archaea_mesh = Mesh.from_ngon_mesh(speckle_mesh.vertices, speckle_mesh.faces)
archaea_meshes.append(archaea_mesh)
x_scale = function_inputs.tunnel_width_scale
y_scale = function_inputs.tunnel_depth_scale
z_scale = function_inputs.tunnel_height_scale
# Init domain
domain = Domain.from_meshes(archaea_meshes, x_scale=x_scale, y_scale=y_scale, z_scale=z_scale, wind_direction=function_inputs.wind_direction, wind_speed=function_inputs.wind_speed)
# Get folder to copy cases
archaea_folder = get_cfd_export_path()
if not os.path.exists(archaea_folder):
os.makedirs(archaea_folder)
# Get case folder
case_folder = os.path.join(archaea_folder, version_root_object.id)
domain.create_case(case_folder, function_inputs.number_of_cpus)
cmd_path = os.path.join(case_folder, './Allrun')
cmd = "/bin/bash -c '{cmd_path}'".format(cmd_path=cmd_path)
# retcode = subprocess.call(cmd, shell=True, stdout=pipefile)
completed_process = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(completed_process.stdout)
blockMesh_log = os.path.join(case_folder, 'log.blockMesh')
add_to_store_if_exist(automate_context, blockMesh_log)
decomposePar_log = os.path.join(case_folder, 'log.decomposePar')
add_to_store_if_exist(automate_context, decomposePar_log)
patchSummary_log = os.path.join(case_folder, 'log.patchSummary')
add_to_store_if_exist(automate_context, patchSummary_log)
reconstructPar_log = os.path.join(case_folder, 'log.reconstructPar')
add_to_store_if_exist(automate_context, reconstructPar_log)
reconstructParMesh_log = os.path.join(case_folder, 'log.reconstructParMesh')
add_to_store_if_exist(automate_context, reconstructParMesh_log)
simpleFoam_log = os.path.join(case_folder, 'log.simpleFoam')
add_to_store_if_exist(automate_context, simpleFoam_log)
snappyHexMesh_log = os.path.join(case_folder, 'log.snappyHexMesh')
add_to_store_if_exist(automate_context, snappyHexMesh_log)
surfaceFeatures_log = os.path.join(case_folder, 'log.surfaceFeatures')
add_to_store_if_exist(automate_context, surfaceFeatures_log)
vtk_file = os.path.join(case_folder, 'postProcessing',
'cutPlaneSurface', '400', 'U_cutPlane.vtk')
result_mesh = vtk_to_speckle(vtk_file, domain.center.move(Vector3d(domain.x / 2 + 2, -domain.y / 2 - 2, 0)))
domain_corner_lines = domain_lines(domain.corners)
subdomain_corner_lines = domain_lines(domain.subdomain_corners)
arrow_line, arrow, text = wind_direction_arrow(domain)
result = Base()
result.data = [result_mesh, domain_corner_lines, subdomain_corner_lines, [arrow_line, arrow, text]]
automate_context.create_new_version_in_project(
result,
"automate_result"
)
if count == 0:
# this is how a run is marked with a failure cause
automate_context.mark_run_failed(
"Automation failed: "
"Not found appropriate object to run CFD simulation."
)
else:
automate_context.mark_run_success("Object found to run simulation!")
def wind_direction_arrow(domain: Domain):
wind_vector = Vector3d.from_azimuth_angle(domain.wind_direction)
vector = wind_vector.reverse()
mid = domain.center.move(vector.scale(domain.y / 2))
mid = mid.move(vector.scale(5))
point_left = mid.move(vector.scale(2)).rotate(Vector3d(0, 0, 1), 45, mid)
point_right = mid.move(vector.scale(2)).rotate(Vector3d(0, 0, 1), -45, mid)
offset_mid_2 = mid.move(vector.scale(2 ** 0.5))
offset_mid_10 = mid.move(vector.scale(10))
arrow_line = Polyline.from_points([Point.from_coords(p.x, p.y, p.z) for p in [offset_mid_2, offset_mid_10]])
arrow = Polyline.from_points([Point.from_coords(p.x, p.y, p.z) for p in [mid, point_left, point_right, mid]])
x_dir = vector.reverse()
y_dir = x_dir.cross_product(Vector3d(0,0,1)).reverse()
text_v = point_left.move(vector.scale(20))
plane = Plane.from_list([text_v.x, text_v.y, text_v.z,
0, 0, 1,
x_dir.x, x_dir.y, 0,
y_dir.x, y_dir.y, 0,
3])
text = Text()
text.height = 2.5
text.value = f"{domain.wind_speed} m/s"
text.plane = plane
text.units = "m"
display_style = DisplayStyle()
display_style.color = -16777216
display_style.linetype = "Continuous"
display_style.units = "m"
display_style.lineweight = 0
text.displayStyle = display_style
return arrow_line, arrow, text
def domain_lines(corners):
speckle_domain_points = [Point.from_coords(corner.x, corner.y, corner.z) for corner in corners]
floor_polyline = Polyline.from_points(speckle_domain_points[0:4])
floor_polyline.closed = True
ceiling_polyline = Polyline.from_points(speckle_domain_points[-4:])
ceiling_polyline.closed = True
line_1 = Line()
line_1.units = 'm'
line_1.start = speckle_domain_points[0]
line_1.end = speckle_domain_points[4]
line_2 = Line()
line_2.units = 'm'
line_2.start = speckle_domain_points[1]
line_2.end = speckle_domain_points[5]
line_3 = Line()
line_3.units = 'm'
line_3.start = speckle_domain_points[2]
line_3.end = speckle_domain_points[6]
line_4 = Line()
line_4.units = 'm'
line_4.start = speckle_domain_points[3]
line_4.end = speckle_domain_points[7]
return [floor_polyline,ceiling_polyline,line_1,line_2,line_3,line_4]
def add_to_store_if_exist(automate_context: AutomationContext, path):
if os.path.exists(path):
automate_context.store_file_result(path)
# make sure to call the function with the executor
if __name__ == "__main__":
# NOTE: always pass in the automate function by its reference, do not invoke it!
# pass in the function reference with the inputs schema to the executor
execute_automate_function(automate_function, FunctionInputs)
# if the function has no arguments, the executor can handle it like so
# execute_automate_function(automate_function_without_inputs)