first push

This commit is contained in:
NLSA
2026-03-03 10:06:20 +01:00
parent c2353d5353
commit ea7ea434ab
14 changed files with 1302736 additions and 71 deletions
-4
View File
@@ -1,4 +0,0 @@
SPECKLE_TOKEN="mytoken"
SPECKLE_SERVER_URL="http://127.0.0.1:3000"
SPECKLE_PROJECT_ID=""
SPECKLE_AUTOMATION_ID=""
+177 -64
View File
@@ -1,7 +1,23 @@
"""This module contains the function's business logic. from datetime import datetime
Use the automation_context module to wrap your function in an Automate context helper. import ifcopenshell.api
"""
import utils.config as config
from utils.materials import MaterialManager
from utils.traversal import traverse, print_tree
from utils.mapper import classify
from utils.geometry import mesh_to_ifc, get_display_instances, _make_placement
from utils.instances import is_instance, instance_to_ifc, build_definition_map, print_instance_stats
from utils.properties import write_properties, write_common_properties
from utils.writer import create_ifc_scaffold, StoreyManager
SPATIAL_STRUCTURE_TYPES = {
"IfcSite", "IfcBuilding", "IfcBuildingStorey",
"IfcSpace", "IfcExternalSpatialElement", "IfcSpatialZone",
"IfcGrid", "IfcAnnotation",
}
from pydantic import Field, SecretStr from pydantic import Field, SecretStr
from speckle_automate import ( from speckle_automate import (
@@ -10,9 +26,6 @@ from speckle_automate import (
execute_automate_function, execute_automate_function,
) )
from flatten import flatten_base
class FunctionInputs(AutomateBase): class FunctionInputs(AutomateBase):
"""These are function author-defined values. """These are function author-defined values.
@@ -20,77 +33,177 @@ class FunctionInputs(AutomateBase):
Please use the pydantic model schema to define your inputs: Please use the pydantic model schema to define your inputs:
https://docs.pydantic.dev/latest/usage/models/ https://docs.pydantic.dev/latest/usage/models/
""" """
file_name: str = Field(
# An example of how to use secret values. title="File Name",
whisper_message: SecretStr = Field(title="This is a secret message") description="The name of the IFC file.",
forbidden_speckle_type: str = Field(
title="Forbidden speckle type",
description=(
"If a object has the following speckle_type,"
" it will be marked with an error."
),
) )
IFC_PROJECT_NAME : str = Field(
title="IFC Project Name",
description="The name of the IFC project.",
)
IFC_SITE_NAME : str = Field(
title="IFC Site Name",
description="The name of the IFC site.",
)
IFC_BUILDING_NAME : str = Field(
title="IFC Building Name",
description="The name of the IFC building.",
)
def automate_function( def automate_function(
automate_context: AutomationContext, automate_context: AutomationContext,
function_inputs: FunctionInputs, function_inputs: FunctionInputs,
) -> None: ) -> None:
"""This is an example Speckle Automate function. print("=" * 60)
print(" Speckle -> IFC4.3 Exporter")
print("=" * 60)
Args: #version_root_object = automate_context.receive_version()
automate_context: A context-helper object that carries relevant information
about the runtime context of this function.
It gives access to the Speckle project data that triggered this run.
It also has convenient methods for attaching results to the Speckle model.
function_inputs: An instance object matching the defined schema.
"""
# The context provides a convenient way to receive the triggering version.
version_root_object = automate_context.receive_version()
objects_with_forbidden_speckle_type = [ # ------------------------------------------------------------------ #
b # 1. Receive
for b in flatten_base(version_root_object) # ------------------------------------------------------------------ #
if b.speckle_type == function_inputs.forbidden_speckle_type base = automate_context.receive_version()
] scale = 1.0
count = len(objects_with_forbidden_speckle_type)
if count > 0: # Uncomment to debug object tree:
# This is how a run is marked with a failure cause. # print_tree(base)
automate_context.attach_error_to_objects(
category="Forbidden speckle_type" # ------------------------------------------------------------------ #
f" ({function_inputs.forbidden_speckle_type})", # 2. Build definition map (for instance resolution)
affected_objects=objects_with_forbidden_speckle_type, # ------------------------------------------------------------------ #
message="This project should not contain the type: " print("\n🔍 Building definition map...")
f"{function_inputs.forbidden_speckle_type}", definition_map = build_definition_map(base)
# ------------------------------------------------------------------ #
# 3. Set up IFC
# ------------------------------------------------------------------ #
ifc, building, body_context = create_ifc_scaffold()
storey_manager = StoreyManager(ifc, building)
# ------------------------------------------------------------------ #
# 3b. Build material map from renderMaterialProxies
# ------------------------------------------------------------------ #
print("\n🎨 Building material map...")
material_manager = MaterialManager(ifc, base)
# ------------------------------------------------------------------ #
# 4. Traverse & export
# ------------------------------------------------------------------ #
total = 0
no_geometry = 0
skipped_spatial = 0
instance_count = 0
print(f"\n📐 Processing elements (scale={scale})...\n")
for obj, level_name, category_name in traverse(base):
ifc_class = classify(obj, category_name)
if ifc_class in SPATIAL_STRUCTURE_TYPES:
skipped_spatial += 1
continue
name = getattr(obj, "name", None) or getattr(obj, "applicationId", None) or getattr(obj, "id", "unnamed")
storey = storey_manager.get_or_create(level_name)
# ------------------------------------------------------------------ #
# Path A: Instance object (has transform + definitionId, no displayValue)
# ------------------------------------------------------------------ #
if is_instance(obj):
rep, placement = instance_to_ifc(ifc, body_context, obj, definition_map, scale=scale, material_manager=material_manager)
element = _create_element(ifc, ifc_class, name, rep, placement, storey)
write_common_properties(ifc, element, obj, category_name)
write_properties(ifc, element, obj)
instance_count += 1
total += 1
if not rep:
no_geometry += 1
else:
# ------------------------------------------------------------------ #
# Path B: Normal DataObject — may have:
# B1. Direct mesh geometry in displayValue
# B2. Instance objects in displayValue (the hidden case!)
# ------------------------------------------------------------------ #
# B1: Mesh geometry on the parent object
rep, placement = mesh_to_ifc(ifc, body_context, obj, scale=scale, material_manager=material_manager)
element = _create_element(ifc, ifc_class, name, rep, placement, storey)
write_common_properties(ifc, element, obj, category_name)
write_properties(ifc, element, obj)
total += 1
if not rep:
no_geometry += 1
# B2: Instance objects nested inside displayValue
# Each becomes its own IFC element (same class as parent)
# Use the parent object's name — the InstanceProxy has no meaningful name
nested_instances = get_display_instances(obj)
for inst in nested_instances:
inst_rep, inst_placement = instance_to_ifc(
ifc, body_context, inst, definition_map, scale=scale, material_manager=material_manager
)
inst_element = _create_element(
ifc, ifc_class, name, inst_rep, inst_placement, storey
)
write_common_properties(ifc, inst_element, obj, category_name)
write_properties(ifc, inst_element, obj)
instance_count += 1
total += 1
if not inst_rep:
no_geometry += 1
if total % 100 == 0:
print(f" ... processed {total} elements")
# ------------------------------------------------------------------ #
# 5. Write output
# ------------------------------------------------------------------ #
file_name = function_inputs.file_name
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
ifc_filename = f"{file_name}_{timestamp}.ifc"
ifc.write(ifc_filename)
automate_context.store_file_result(f"./{ifc_filename}")
print(f"\n{'=' * 60}")
print(f" Export complete!")
print(f" Total exported : {total}")
print(f" Instances : {instance_count}")
print(f" Without geometry : {no_geometry}")
print(f" Skipped (spatial) : {skipped_spatial}")
print(f" Storeys created : {storey_manager.count}")
print(f" Levels : {', '.join(storey_manager.names)}")
print_instance_stats()
print(f"{'=' * 60}\n")
def _create_element(ifc, ifc_class, name, rep, placement, storey):
"""Helper: create an IFC element, assign geometry + placement + container."""
element = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class=ifc_class,
name=str(name),
)
if rep and placement:
element.Representation = ifc.createIfcProductDefinitionShape(
Representations=(rep,)
) )
automate_context.mark_run_failed( element.ObjectPlacement = placement
"Automation failed: " elif placement:
f"Found {count} object that have one of the forbidden speckle types: " element.ObjectPlacement = placement
f"{function_inputs.forbidden_speckle_type}"
)
# Set the automation context view to the original model/version view
# to show the offending objects.
automate_context.set_context_view()
else: else:
automate_context.mark_run_success("No forbidden types found.") element.ObjectPlacement = _make_placement(ifc, 0.0, 0.0, 0.0)
# If the function generates file results, this is how it can be
# attached to the Speckle project/model
# automate_context.store_file_result("./report.pdf")
def automate_function_without_inputs(automate_context: AutomationContext) -> None:
"""A function example without inputs.
If your function does not need any input variables,
besides what the automation context provides,
the inputs argument can be omitted.
"""
pass
ifcopenshell.api.run(
"spatial.assign_container", ifc,
relating_structure=storey,
products=[element],
)
return element
# make sure to call the function with the executor # make sure to call the function with the executor
if __name__ == "__main__": if __name__ == "__main__":
+3 -1
View File
@@ -9,13 +9,15 @@ readme = "README.md"
license = "Apache-2.0" license = "Apache-2.0"
keywords = ["speckle", "automate", "bim", "aec"] keywords = ["speckle", "automate", "bim", "aec"]
dependencies = ["specklepy==3.1.0"] dependencies = ["specklepy==3.1.0",
"ifcopenshell==0.8.4.post1",]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"mypy==1.13.0", "mypy==1.13.0",
"pytest==7.4.4", "pytest==7.4.4",
"ruff==0.11.12", "ruff==0.11.12",
] ]
[tool.ruff] [tool.ruff]
File diff suppressed because one or more lines are too long
+4 -2
View File
@@ -23,8 +23,10 @@ def test_function_run(
automation_context, automation_context,
automate_function, automate_function,
FunctionInputs( FunctionInputs(
forbidden_speckle_type="None", file_name="test_output.ifc",
whisper_message=SecretStr("testing automatically"), IFC_PROJECT_NAME = "Speckle Export",
IFC_SITE_NAME = "Site",
IFC_BUILDING_NAME = "Building"
), ),
) )
+34
View File
@@ -0,0 +1,34 @@
# =============================================================================
# config.py
# All user-facing settings. Edit this file before running main.py.
# =============================================================================
# --- Speckle Connection ---
SPECKLE_HOST = "app.speckle.systems" # or your self-hosted server URL
SPECKLE_TOKEN = "40e3222fe7d82ed1796aa4ccd353f38ad098cc84dd" # from app.speckle.systems/profile
# --- Speckle Project ---
PROJECT_ID = "d7d987146d" # the stream/project ID from the URL
VERSION_ID = "d59178f01e" # the specific version/commit to export
# --- IFC Output ---
OUTPUT_PATH = "output3.ifc" # where to write the IFC file
IFC_SCHEMA = "IFC4X3" # IFC4X3 = IFC4.3
# --- Project Metadata (written into the IFC file) ---
IFC_PROJECT_NAME = "Speckle Export"
IFC_SITE_NAME = "Site"
IFC_BUILDING_NAME = "Building"
# --- Units ---
# Speckle unit → metres scale factor
# The exporter reads units from the root object automatically,
# but this is the fallback if units are not set on the stream.
DEFAULT_UNITS = "mm"
UNIT_SCALE = {
"mm": 0.001,
"cm": 0.01,
"m": 1.0,
"ft": 0.3048,
"in": 0.0254,
}
+465
View File
@@ -0,0 +1,465 @@
# =============================================================================
# geometry.py
# Converts Speckle DataObject geometry → IFC IfcFacetedBrep + IfcLocalPlacement
#
# Key facts:
# - After specklepy receive(), vertices and faces are FLAT Python lists
# - displayValue is an array of Mesh objects
# - Units are in mm (for Revit), scale to metres for IFC
# - Vertices are in absolute world coordinates
# =============================================================================
import ifcopenshell
from specklepy.objects.base import Base
# Scale factors → MILLIMETRES (IFC file is declared as mm)
_UNIT_SCALES = {
"mm": 1.0, "millimeter": 1.0, "millimeters": 1.0,
"cm": 10.0, "centimeter": 10.0, "centimeters": 10.0,
"m": 1000.0, "meter": 1000.0, "meters": 1000.0,
"ft": 304.8, "foot": 304.8, "feet": 304.8,
"in": 25.4, "inch": 25.4, "inches": 25.4,
}
# --------------------------------------------------------------------------- #
# Geometry validation helpers (GEM111 + BRP002 fixes)
# --------------------------------------------------------------------------- #
# Minimum distance in mm below which two vertices are considered identical (GEM111).
_VERTEX_MERGE_TOL = 0.01 # 0.01 mm
def snap_coord(v: float) -> int:
"""Snap a coordinate to integer grid at _VERTEX_MERGE_TOL resolution."""
return round(v / _VERTEX_MERGE_TOL)
def _find_connected_components(snapped_faces: list) -> list:
"""
Union-Find: group face indices into connected components.
Two faces are connected if they share an edge (pair of snapped vertex keys).
Returns list of components, each a list of face indices.
BRP002 requires all faces in an IfcClosedShell to form ONE component.
If multiple components exist, each must become a separate IfcClosedShell.
"""
n = len(snapped_faces)
if n == 0:
return []
parent = list(range(n))
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
parent[find(a)] = find(b)
# Map each edge to the first face that used it, then union subsequent faces
edge_to_face = {}
for fi, keys in enumerate(snapped_faces):
for i in range(len(keys)):
edge = frozenset([keys[i], keys[(i + 1) % len(keys)]])
if edge in edge_to_face:
union(fi, edge_to_face[edge])
else:
edge_to_face[edge] = fi
from collections import defaultdict
groups: dict = defaultdict(list)
for fi in range(n):
groups[find(fi)].append(fi)
return list(groups.values())
def build_ifc_breps(ifc, verts_scaled: list, face_groups: list) -> list:
"""
Build a list of IfcFacetedBrep from scaled (x,y,z) vertices and face index groups.
GEM111 fix: skip faces with near-duplicate vertices (snapped to same grid cell).
BRP002 fix: split faces into connected components; each component → its own
IfcClosedShell → IfcFacetedBrep so every shell is arc-wise connected.
verts_scaled: flat list of already-scaled floats [x0,y0,z0, x1,y1,z1, ...]
face_groups: list of index lists [[i,j,k], [i,j,k,l], ...]
Returns: list of IfcFacetedBrep (one per connected component, never empty).
"""
# Pass 1: validate faces and build snapped key lists for connectivity analysis
valid_faces = [] # list of (pts_raw, snapped_keys)
for indices in face_groups:
try:
pts_raw = []
snapped = []
degenerate = False
seen = set()
for i in indices:
x = float(verts_scaled[i * 3])
y = float(verts_scaled[i * 3 + 1])
z = float(verts_scaled[i * 3 + 2])
key = (snap_coord(x), snap_coord(y), snap_coord(z))
if key in seen:
degenerate = True
break
seen.add(key)
pts_raw.append((x, y, z))
snapped.append(key)
if degenerate or len(pts_raw) < 3:
continue
valid_faces.append((pts_raw, snapped))
except Exception:
continue
if not valid_faces:
return []
# Pass 2: split into connected components (BRP002)
snapped_only = [f[1] for f in valid_faces]
components = _find_connected_components(snapped_only)
# Pass 3: build one IfcFacetedBrep per component
breps = []
for component_indices in components:
ifc_faces = []
for fi in component_indices:
pts_raw, _ = valid_faces[fi]
try:
pts = [ifc.createIfcCartesianPoint([x, y, z]) for x, y, z in pts_raw]
poly = ifc.createIfcPolyLoop(pts)
bound = ifc.createIfcFaceOuterBound(poly, True)
ifc_faces.append(ifc.createIfcFace([bound]))
except Exception:
continue
if not ifc_faces:
continue
shell = ifc.createIfcClosedShell(ifc_faces)
breps.append(ifc.createIfcFacetedBrep(shell))
return breps
# Keep old name as alias so instances.py import works unchanged
def build_ifc_faces(ifc, verts_scaled: list, face_groups: list) -> list:
"""Legacy wrapper — returns flat list of IfcFace (no connectivity splitting)."""
# Used only as a fallback; callers should prefer build_ifc_breps directly.
breps = build_ifc_breps(ifc, verts_scaled, face_groups)
# Return the faces from all shells combined (for callers that need face lists)
faces = []
for brep in breps:
faces.extend(brep.Outer.CfsFaces)
return faces
# --------------------------------------------------------------------------- #
# Safe data access helpers
# --------------------------------------------------------------------------- #
def _get(obj, key, default=None):
"""
Safe access for specklepy Base objects.
Tries attribute access first, then bracket access.
"""
try:
val = getattr(obj, key, None)
if val is not None:
return val
except Exception:
pass
try:
val = obj[key]
if val is not None:
return val
except Exception:
pass
return default
def unwrap_chunks(raw) -> list:
"""
Flatten a Speckle data array into a plain Python list of numbers.
Handles two cases:
1. Already flat list of numbers (after specklepy receive deserializes)
→ [3, 0, 1, 2, 3, ...] returned as-is
2. List of DataChunk objects (raw from server before deserialization)
→ each chunk's .data list is concatenated
Both cases are handled so this function is always safe to call.
"""
if not raw:
return []
result = []
for item in raw:
if item is None:
continue
# Plain number — already flat
if isinstance(item, (int, float)):
result.append(item)
continue
# DataChunk — unwrap .data
speckle_type = getattr(item, "speckle_type", "") or ""
if "DataChunk" in speckle_type:
chunk_data = _get(item, "data") or _get(item, "@data")
if chunk_data:
result.extend(list(chunk_data))
else:
# Unknown — try iterating (handles nested lists)
try:
result.extend(list(item))
except Exception:
pass
return result
def _resolve_scale(obj, stream_scale: float) -> float:
"""Resolve unit scale: obj.units → stream fallback."""
units = _get(obj, "units")
if units and isinstance(units, str):
return _UNIT_SCALES.get(units.lower().strip(), stream_scale)
return stream_scale
# --------------------------------------------------------------------------- #
# Mesh extraction
# --------------------------------------------------------------------------- #
def _is_mesh(item) -> bool:
"""
Detect if a specklepy object is a Mesh.
Uses speckle_type string — more reliable than hasattr on Base objects.
"""
if item is None:
return False
speckle_type = _get(item, "speckle_type") or ""
if "Mesh" in speckle_type:
return True
# Fallback: has both vertices and faces data
verts = _get(item, "vertices")
faces = _get(item, "faces")
return verts is not None and faces is not None
def get_display_meshes(obj: Base) -> list:
"""
Extract all Mesh objects from a DataObject's displayValue.
displayValue is always an array per the Speckle schema docs.
"""
meshes = []
for key in ["displayValue", "@displayValue"]:
display = _get(obj, key)
if display is None:
continue
items = display if isinstance(display, list) else [display]
for item in items:
if _is_mesh(item):
meshes.append(item)
if meshes:
break # found meshes, don't check @displayValue too
# Fallback: object itself is a Mesh
if not meshes and _is_mesh(obj):
speckle_type = _get(obj, "speckle_type") or ""
if "Mesh" in speckle_type:
meshes.append(obj)
return meshes
def get_display_instances(obj: Base) -> list:
"""
Extract InstanceProxy objects from a DataObject's displayValue.
Per the official speckleifc converter, every IFC element's displayValue
contains InstanceProxy objects (not raw meshes). Each InstanceProxy has:
- transform: 16-float row-major matrix, translation in metres
- definitionId: "DEFINITION:{meshAppId}" string
- units: "m"
Raw meshes do NOT appear in displayValue in IFC→Speckle exports.
"""
instances = []
for key in ["displayValue", "@displayValue"]:
display = _get(obj, key)
if display is None:
continue
items = display if isinstance(display, list) else [display]
for item in items:
if item is None:
continue
transform = _get(item, "transform")
definition_id = _get(item, "definitionId")
if transform is not None and definition_id is not None:
instances.append(item)
if instances:
break
return instances
# --------------------------------------------------------------------------- #
# Face decoding
# --------------------------------------------------------------------------- #
def decode_faces(faces_raw: list) -> list:
"""
Decode Speckle's run-length encoded face list into vertex index groups.
Format: [n, i0, i1, ..., n, i0, i1, ...]
n=0 → triangle (legacy), n=1 → quad (legacy), n≥3 → n-gon
"""
decoded = []
i = 0
while i < len(faces_raw):
n = int(faces_raw[i])
if n == 0:
n = 3
elif n == 1:
n = 4
end = i + 1 + n
if end > len(faces_raw):
break
indices = [int(faces_raw[i + 1 + j]) for j in range(n)]
decoded.append(indices)
i = end
return decoded
# --------------------------------------------------------------------------- #
# Bounding box + placement
# --------------------------------------------------------------------------- #
def compute_origin(flat_verts: list) -> tuple:
"""
Compute placement origin from scaled vertex list (metres).
X, Y = bounding box centroid
Z = minimum Z (bottom face of element — more natural for IFC)
"""
xs = flat_verts[0::3]
ys = flat_verts[1::3]
zs = flat_verts[2::3]
cx = (min(xs) + max(xs)) / 2.0
cy = (min(ys) + max(ys)) / 2.0
cz = min(zs)
return cx, cy, cz
def _make_placement(ifc, x: float, y: float, z: float):
"""Create an IfcLocalPlacement at absolute world coordinates (metres)."""
origin = ifc.createIfcCartesianPoint([x, y, z])
z_axis = ifc.createIfcDirection([0.0, 0.0, 1.0])
x_axis = ifc.createIfcDirection([1.0, 0.0, 0.0])
a2p = ifc.createIfcAxis2Placement3D(origin, z_axis, x_axis)
return ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
# --------------------------------------------------------------------------- #
# Main conversion
# --------------------------------------------------------------------------- #
def mesh_to_ifc(
ifc: ifcopenshell.file,
body_context,
obj: Base,
scale: float = 0.001,
material_manager=None,
) -> tuple:
"""
Convert a Speckle DataObject → (IfcShapeRepresentation, IfcLocalPlacement).
Creates one IfcFacetedBrep per mesh so each can carry its own material style.
Returns (None, None) if no usable geometry is found.
"""
meshes = get_display_meshes(obj)
if not meshes:
return None, None
obj_scale = _resolve_scale(obj, scale)
# ------------------------------------------------------------------ #
# Pass 1: collect all scaled vertices to compute world origin
# ------------------------------------------------------------------ #
all_scaled = []
for mesh in meshes:
raw_verts = _get(mesh, "vertices") or []
verts = unwrap_chunks(list(raw_verts))
if not verts:
continue
ms = _resolve_scale(mesh, obj_scale)
for i in range(0, len(verts) - 2, 3):
all_scaled.extend([
float(verts[i]) * ms,
float(verts[i+1]) * ms,
float(verts[i+2]) * ms,
])
if not all_scaled:
return None, None
ox, oy, oz = compute_origin(all_scaled)
# ------------------------------------------------------------------ #
# Pass 2: one brep per mesh (so each can have its own material style)
# ------------------------------------------------------------------ #
brep_items = []
for mesh in meshes:
raw_verts = _get(mesh, "vertices") or []
raw_faces = _get(mesh, "faces") or []
verts = unwrap_chunks(list(raw_verts))
faces_raw = unwrap_chunks(list(raw_faces))
if not verts or not faces_raw:
continue
ms = _resolve_scale(mesh, obj_scale)
try:
face_groups = decode_faces(faces_raw)
except Exception as e:
print(f" ⚠️ Face decode error: {e}")
continue
# Build pre-scaled vertex list (relative to origin) for this mesh
verts_scaled = []
for vi in range(0, len(verts) - 2, 3):
verts_scaled.append(float(verts[vi]) * ms - ox)
verts_scaled.append(float(verts[vi+1]) * ms - oy)
verts_scaled.append(float(verts[vi+2]) * ms - oz)
mesh_breps = build_ifc_breps(ifc, verts_scaled, face_groups)
if not mesh_breps:
continue
# Apply material style to every component brep of this mesh
if material_manager:
mesh_app_id = _get(mesh, "applicationId")
if mesh_app_id:
for brep in mesh_breps:
material_manager.apply_to_item(brep, str(mesh_app_id))
brep_items.extend(mesh_breps)
if not brep_items:
return None, None
# ------------------------------------------------------------------ #
# Assemble IfcShapeRepresentation + IfcLocalPlacement
# ------------------------------------------------------------------ #
rep = ifc.createIfcShapeRepresentation(
ContextOfItems=body_context,
RepresentationIdentifier="Body",
RepresentationType="Brep",
Items=brep_items,
)
placement = _make_placement(ifc, ox, oy, oz)
return rep, placement
+373
View File
@@ -0,0 +1,373 @@
# =============================================================================
# instances.py
# Handles Speckle InstanceProxy objects from both:
#
# FORMAT A — Revit connector (our actual use case):
# _units = "mm"
# transform = 16 floats, row-major, translation in MM
# definitionId = 64-char uppercase hex hash (matches object id[:32] in tree)
# The definition object lives somewhere in the object tree.
#
# FORMAT B — speckleifc IFC→Speckle converter:
# units = "m"
# transform = 16 floats, row-major, translation in METRES
# definitionId = "DEFINITION:{meshAppId}"
# Definition geometry lives in root → Collection("definitionGeometry")
#
# We detect the format by the definitionId prefix.
# =============================================================================
from specklepy.objects.base import Base
from utils.geometry import _get, unwrap_chunks, decode_faces, _UNIT_SCALES, build_ifc_breps
def is_instance(obj) -> bool:
"""Returns True if this object is a Speckle InstanceProxy."""
return _get(obj, "transform") is not None and _get(obj, "definitionId") is not None
def _is_ifc_format(definition_id: str) -> bool:
"""True if this is speckleifc format (definitionId starts with 'DEFINITION:')."""
return definition_id.startswith("DEFINITION:")
def build_definition_map(root: Base) -> dict:
"""
Build a unified definition map that handles both formats.
Returns dict with keys:
"by_id" : {obj_id_lower[:32] → object} for Revit format
"by_app_id" : {applicationId_lower → object} for Revit format
"ifc_proxies" : {"DEFINITION:xxx" → proxy} for IFC format
"ifc_meshes" : {meshAppId → Mesh} for IFC format
"""
by_id = {}
by_app_id = {}
ifc_proxies = {}
ifc_meshes = {}
# --- Walk entire tree for Revit format ---
_collect_all(root, by_id, by_app_id, depth=0)
# --- Extract speckleifc structures for IFC format ---
proxies_raw = _get(root, "instanceDefinitionProxies")
if proxies_raw:
for proxy in (proxies_raw if isinstance(proxies_raw, list) else [proxies_raw]):
app_id = _get(proxy, "applicationId")
if app_id:
ifc_proxies[app_id] = proxy # original case (for IFC format)
ifc_proxies[app_id.lower()] = proxy # lowercase (for Revit format)
elements = _get(root, "elements") or _get(root, "@elements") or []
for child in (elements if isinstance(elements, list) else []):
if (_get(child, "name") or "") == "definitionGeometry":
geom_elements = _get(child, "elements") or _get(child, "@elements") or []
for mesh in (geom_elements if isinstance(geom_elements, list) else []):
mesh_app_id = _get(mesh, "applicationId")
if mesh_app_id:
ifc_meshes[mesh_app_id] = mesh
print(f" Objects indexed by id: {len(by_id)}")
print(f" Objects indexed by appId: {len(by_app_id)}")
print(f" IFC definition proxies: {len(ifc_proxies)}")
print(f" IFC definition meshes: {len(ifc_meshes)}")
# Diagnostic: dump first 3 instanceDefinitionProxies to understand structure
print("\n [PROXY DIAG] First 3 instanceDefinitionProxies from root:")
proxies_raw2 = _get(root, "instanceDefinitionProxies")
if proxies_raw2:
sample = proxies_raw2 if isinstance(proxies_raw2, list) else [proxies_raw2]
for i, proxy in enumerate(sample[:3]):
app_id = _get(proxy, "applicationId") or "?"
name = _get(proxy, "name") or "?"
objects = _get(proxy, "objects") or []
obj_ids = list(objects)[:3] if objects else []
print(f" [{i}] appId={app_id}")
print(f" name={name}")
print(f" objects={obj_ids} (len={len(list(objects)) if objects else 0})")
# Check if first object is found in our maps
if obj_ids:
oid = str(obj_ids[0])
in_by_id = oid.lower()[:32] in by_id
in_by_app_id = oid.lower() in by_app_id
print(f" objects[0]='{oid}' → in by_id: {in_by_id}, in by_app_id: {in_by_app_id}")
else:
print(" [PROXY DIAG] No instanceDefinitionProxies found on root!")
# Check where they might be
for key in ["@instanceDefinitionProxies", "instancedefinitionproxies"]:
val = _get(root, key)
if val:
print(f" Found under key '{key}': {type(val)}")
return {
"by_id": by_id,
"by_app_id": by_app_id,
"ifc_proxies": ifc_proxies,
"ifc_meshes": ifc_meshes,
}
def _collect_all(obj, by_id: dict, by_app_id: dict, depth: int):
if obj is None or depth > 25:
return
obj_id = _get(obj, "id")
if obj_id and isinstance(obj_id, str):
key = obj_id.lower()
by_id[key] = obj
# Also store truncated — definitionId (64 chars) matches id (32 chars)
if len(key) == 32:
by_id[key] = obj
elif len(key) > 32:
by_id[key[:32]] = obj
app_id = _get(obj, "applicationId")
if app_id and isinstance(app_id, str):
by_app_id[app_id.lower()] = obj
for key in ["elements", "@elements", "displayValue", "@displayValue",
"objects", "@objects", "definition", "@definition"]:
try:
children = obj[key]
if children is None:
continue
if not isinstance(children, list):
children = [children]
for child in children:
_collect_all(child, by_id, by_app_id, depth + 1)
except Exception:
continue
def _get_revit_meshes(definition_id: str, definition_map: dict) -> list:
"""
Revit format:
definitionId (64-char hex) → InstanceDefinitionProxy.applicationId
proxy.objects[0] is a UUID applicationId → find mesh by applicationId
"""
from utils.geometry import get_display_meshes
# Step 1: find the InstanceDefinitionProxy by its applicationId (case-insensitive)
ifc_proxies = definition_map.get("ifc_proxies", {})
proxy = ifc_proxies.get(definition_id) or ifc_proxies.get(definition_id.lower())
if proxy is None:
return []
# Step 2: get the mesh applicationIds from proxy.objects
object_ids = _get(proxy, "objects") or []
if not isinstance(object_ids, list):
object_ids = list(object_ids)
# Step 3: look up each mesh by applicationId
by_app_id = definition_map.get("by_app_id", {})
meshes = []
for oid in object_ids:
obj = by_app_id.get(str(oid).lower())
if obj is not None:
# The found object may itself be a mesh, or contain displayValue meshes
found_meshes = get_display_meshes(obj)
if found_meshes:
meshes.extend(found_meshes)
else:
# It IS the mesh directly
meshes.append(obj)
return meshes
def _get_ifc_meshes(definition_id: str, definition_map: dict) -> list:
"""
IFC format: definitionId = "DEFINITION:224058_mat0"
Look up proxy → objects list → meshes from ifc_meshes dict.
"""
ifc_proxies = definition_map.get("ifc_proxies", {})
ifc_meshes = definition_map.get("ifc_meshes", {})
proxy = ifc_proxies.get(definition_id)
if proxy is None:
return []
object_ids = _get(proxy, "objects") or []
result = []
for oid in (object_ids if isinstance(object_ids, list) else [object_ids]):
mesh = ifc_meshes.get(str(oid))
if mesh is not None:
result.append(mesh)
return result
def _resolve_instance_scale(obj, stream_scale: float) -> float:
"""
Resolve scale for the transform translation.
Tries bracket access for '_units' (Revit uses underscore).
IFC format instances have units="m" → scale=1.0 (no scaling).
"""
for key in ["units", "_units"]:
try:
units = obj[key]
if units and isinstance(units, str):
s = _UNIT_SCALES.get(units.lower().strip())
if s is not None:
return s
except Exception:
pass
return stream_scale
def _parse_transform(t: list, scale: float) -> tuple:
"""
Row-major 4x4 matrix.
Translation at t[3], t[7], t[11] — scaled to metres.
Local X axis = row 0, Local Z axis = row 2.
"""
tx = float(t[3]) * scale
ty = float(t[7]) * scale
tz = float(t[11]) * scale
x_axis = (float(t[0]), float(t[1]), float(t[2]))
z_axis = (float(t[8]), float(t[9]), float(t[10]))
return (tx, ty, tz), x_axis, z_axis
def _make_ifc_placement(ifc, tx, ty, tz, x_axis, z_axis):
origin = ifc.createIfcCartesianPoint([tx, ty, tz])
x_dir = ifc.createIfcDirection(list(x_axis))
z_dir = ifc.createIfcDirection(list(z_axis))
a2p = ifc.createIfcAxis2Placement3D(origin, z_dir, x_dir)
return ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
# Stats
_stats = {"found": 0, "not_found": 0}
_dbg_cnt = [0]
_MM_SCALES = {
"mm": 1.0, "millimeter": 1.0, "millimeters": 1.0,
"cm": 10.0, "centimeter": 10.0,
"m": 1000.0, "meter": 1000.0, "meters": 1000.0,
"ft": 304.8, "in": 25.4,
}
def _apply_transform(t: list, vx: float, vy: float, vz: float, ts: float) -> tuple:
"""
Apply a row-major 4x4 transform to a single vertex.
ts = scale factor applied to the translation components only (not rotation).
For Revit mm data with IFC in mm: ts=1.0 (no conversion).
For IFC-format transforms (metres): ts=1000.0 (m→mm).
Rotation components are dimensionless and never scaled.
"""
x = t[0]*vx + t[1]*vy + t[2]*vz + t[3] * ts
y = t[4]*vx + t[5]*vy + t[6]*vz + t[7] * ts
z = t[8]*vx + t[9]*vy + t[10]*vz + t[11] * ts
return x, y, z
def instance_to_ifc(ifc, body_context, obj: Base, definition_map: dict,
scale: float = 1.0, material_manager=None):
"""
Convert a Speckle InstanceProxy → (IfcShapeRepresentation, IfcLocalPlacement).
Strategy: BAKE the full 4x4 transform into every vertex (world coordinates).
Creates one IfcFacetedBrep per definition mesh so each can carry its own
material style via renderMaterialProxies.
"""
transform_raw = _get(obj, "transform")
if not transform_raw:
return None, None
t = list(transform_raw)
if len(t) != 16:
return None, None
definition_id = _get(obj, "definitionId") or ""
ifc_format = _is_ifc_format(definition_id)
# Translation scale: IFC format transform is in metres → convert to mm
# Revit format transform is already in mm (same as IFC file units)
ts = 1000.0 if ifc_format else _resolve_instance_scale(obj, scale)
if _dbg_cnt[0] < 6:
_dbg_cnt[0] += 1
fmt = "IFC" if ifc_format else "Revit"
x_axis = (round(t[0],2), round(t[1],2), round(t[2],2))
z_axis = (round(t[8],2), round(t[9],2), round(t[10],2))
print(f" [INST {_dbg_cnt[0]} {fmt}] {definition_id[:40]}")
print(f" t[3]={t[3]:.1f} t[7]={t[7]:.1f} t[11]={t[11]:.1f} x={x_axis} z={z_axis}")
# World-origin placement (geometry is baked to world coords)
origin = ifc.createIfcCartesianPoint([0.0, 0.0, 0.0])
a2p = ifc.createIfcAxis2Placement3D(origin, None, None)
placement = ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
# Get definition meshes
if ifc_format:
meshes = _get_ifc_meshes(definition_id, definition_map)
else:
meshes = _get_revit_meshes(definition_id, definition_map)
if not meshes:
_stats["not_found"] += 1
return None, placement
_stats["found"] += 1
# One brep per mesh so each can have its own material style
brep_items = []
for mesh in meshes:
raw_verts = _get(mesh, "vertices") or []
raw_faces = _get(mesh, "faces") or []
verts = unwrap_chunks(list(raw_verts))
faces_raw = unwrap_chunks(list(raw_faces))
if not verts or not faces_raw:
continue
mesh_units = _get(mesh, "units") or _get(mesh, "_units") or ("m" if ifc_format else "mm")
ms = _MM_SCALES.get(mesh_units.lower().strip(), 1.0)
try:
face_groups = decode_faces(faces_raw)
except Exception as e:
print(f" ⚠️ Instance face decode: {e}")
continue
# Pre-compute world coords for all vertices in this mesh
verts_world = []
for vi in range(0, len(verts) - 2, 3):
lx = float(verts[vi]) * ms
ly = float(verts[vi+1]) * ms
lz = float(verts[vi+2]) * ms
wx, wy, wz = _apply_transform(t, lx, ly, lz, ts)
verts_world.append(wx)
verts_world.append(wy)
verts_world.append(wz)
mesh_breps = build_ifc_breps(ifc, verts_world, face_groups)
if not mesh_breps:
continue
# Apply material style to every component brep of this mesh
if material_manager:
mesh_app_id = _get(mesh, "applicationId")
if mesh_app_id:
for brep in mesh_breps:
material_manager.apply_to_item(brep, str(mesh_app_id))
brep_items.extend(mesh_breps)
if not brep_items:
return None, placement
rep = ifc.createIfcShapeRepresentation(
ContextOfItems=body_context,
RepresentationIdentifier="Body",
RepresentationType="Brep",
Items=brep_items,
)
return rep, placement
def print_instance_stats():
total = _stats["found"] + _stats["not_found"]
print(f" Instance resolution: {_stats['found']}/{total} definitions found")
if _stats["not_found"] > 0:
print(f" ⚠️ {_stats['not_found']} instances had no definition geometry")
+137
View File
@@ -0,0 +1,137 @@
# =============================================================================
# mapper.py
# Maps Speckle speckle_type strings and Revit category names → IFC entity classes.
#
# Strategy:
# 1. Try to match speckle_type exactly or by prefix
# 2. Fall back to Revit category name (e.g. "Floors" → IfcSlab)
# 3. Fall back to IfcBuildingElementProxy if nothing matches
# =============================================================================
# --- speckle_type → IFC class ---
# Covers Objects.BuiltElements.* from the Speckle Objects kit
SPECKLE_TYPE_MAP: dict[str, str] = {
"Objects.BuiltElements.Wall": "IfcWall",
"Objects.BuiltElements.Floor": "IfcSlab",
"Objects.BuiltElements.Roof": "IfcRoof",
"Objects.BuiltElements.Column": "IfcColumn",
"Objects.BuiltElements.Beam": "IfcBeam",
"Objects.BuiltElements.Brace": "IfcMember",
"Objects.BuiltElements.Duct": "IfcDuctSegment",
"Objects.BuiltElements.Pipe": "IfcPipeSegment",
"Objects.BuiltElements.Wire": "IfcCableCarrierSegment",
"Objects.BuiltElements.Opening": "IfcOpeningElement",
"Objects.BuiltElements.Room": "IfcSpace",
"Objects.BuiltElements.Ceiling": "IfcCovering",
"Objects.BuiltElements.Stair": "IfcStair",
"Objects.BuiltElements.Ramp": "IfcRamp",
"Objects.BuiltElements.Foundation": "IfcFooting",
"Objects.BuiltElements.Grid": "IfcGrid",
"Objects.BuiltElements.Level": "IfcBuildingStorey",
"Objects.BuiltElements.Revit.RevitWall": "IfcWall",
"Objects.BuiltElements.Revit.RevitFloor": "IfcSlab",
"Objects.BuiltElements.Revit.RevitRoof": "IfcRoof",
"Objects.BuiltElements.Revit.RevitColumn": "IfcColumn",
"Objects.BuiltElements.Revit.RevitBeam": "IfcBeam",
"Objects.BuiltElements.Revit.RevitBrace": "IfcMember",
"Objects.BuiltElements.Revit.RevitDuct": "IfcDuctSegment",
"Objects.BuiltElements.Revit.RevitPipe": "IfcPipeSegment",
"Objects.BuiltElements.Revit.RevitRoom": "IfcSpace",
"Objects.BuiltElements.Revit.RevitStair": "IfcStair",
"Objects.BuiltElements.Revit.RevitRailing": "IfcRailing",
"Objects.BuiltElements.Revit.RevitCeiling": "IfcCovering",
"Objects.BuiltElements.Revit.RevitTopography": "IfcGeographicElement",
"Objects.BuiltElements.Revit.RevitElementType": "IfcBuildingElementProxy",
"Objects.Geometry.Mesh": "IfcBuildingElementProxy",
"Objects.Geometry.Brep": "IfcBuildingElementProxy",
}
# --- Revit category name → IFC class (fallback) ---
CATEGORY_MAP: dict[str, str] = {
"Walls": "IfcWall",
"Floors": "IfcSlab",
"Roofs": "IfcRoof",
"Structural Columns": "IfcColumn",
"Columns": "IfcColumn",
"Structural Framing": "IfcBeam",
"Beams": "IfcBeam",
"Ducts": "IfcDuctSegment",
"Pipes": "IfcPipeSegment",
"Conduits": "IfcCableCarrierSegment",
"Cable Trays": "IfcCableCarrierSegment",
"Rooms": "IfcSpace",
"Spaces": "IfcSpace",
"Ceilings": "IfcCovering",
"Stairs": "IfcStair",
"Ramps": "IfcRamp",
"Railings": "IfcRailing",
"Curtain Panels": "IfcCurtainWall",
"Curtain Wall Mullions": "IfcMember",
"Doors": "IfcDoor",
"Windows": "IfcWindow",
"Furniture": "IfcFurnishingElement",
"Furniture Systems": "IfcFurnishingElement",
"Casework": "IfcFurnishingElement",
"Plumbing Fixtures": "IfcSanitaryTerminal",
"Electrical Fixtures": "IfcElectricAppliance",
"Lighting Fixtures": "IfcLightFixture",
"Mechanical Equipment": "IfcUnitaryEquipment",
"Electrical Equipment": "IfcElectricDistributionBoard",
"Structural Foundations": "IfcFooting",
"Foundation Slabs": "IfcSlab",
"Topography": "IfcGeographicElement",
"Site": "IfcSite",
"Parking": "IfcSpace",
"Generic Models": "IfcBuildingElementProxy",
"Mass": "IfcBuildingElementProxy",
"Specialty Equipment": "IfcBuildingElementProxy",
}
def classify(obj, category_name: str = "") -> str:
"""
Determine the IFC class for a Speckle object.
With the new Objects.Data.DataObject:Objects.Data.RevitObject speckle_type,
category name is now the primary classification signal.
Args:
obj: A specklepy Base object (leaf element).
category_name: The Revit category string from the traversal context
e.g. "Floors", "Walls", "Structural Columns"
Returns:
An IFC class name string e.g. "IfcWall"
"""
speckle_type = getattr(obj, "speckle_type", "") or ""
# 1. Category name — PRIMARY lookup for RevitObject types
if category_name:
# Exact match
if category_name in CATEGORY_MAP:
return CATEGORY_MAP[category_name]
# Partial match handles Revit appending IDs e.g. "Structural Framing [12345]"
for key, ifc_class in CATEGORY_MAP.items():
if key.lower() in category_name.lower():
return ifc_class
# 2. Read 'category' directly off the object itself
# Per docs: category is a TOP-LEVEL field on RevitObject, not inside properties
obj_category = getattr(obj, "category", None)
if obj_category and isinstance(obj_category, str):
if obj_category in CATEGORY_MAP:
return CATEGORY_MAP[obj_category]
for key, ifc_class in CATEGORY_MAP.items():
if key.lower() in obj_category.lower():
return ifc_class
# 3. speckle_type — fallback for non-RevitObject types (geometry, structural, etc.)
if speckle_type in SPECKLE_TYPE_MAP:
return SPECKLE_TYPE_MAP[speckle_type]
for key, ifc_class in SPECKLE_TYPE_MAP.items():
if speckle_type.startswith(key):
return ifc_class
# 4. Last resort
return "IfcBuildingElementProxy"
+151
View File
@@ -0,0 +1,151 @@
# =============================================================================
# materials.py
# Reads renderMaterialProxies from the Speckle root object and applies
# IfcSurfaceStyle colours to IFC geometry.
#
# Structure of renderMaterialProxies:
# root.renderMaterialProxies = [
# {
# id: "636259b3..."
# value: RenderMaterial {
# name: "Glass"
# diffuse: -16744256 ← ARGB packed int (A=255, R=0, G=128, B=192)
# opacity: 0.1 ← 0=transparent, 1=opaque
# }
# objects: ["a1a6b0c2-...", "d5dd3127-...", ...] ← mesh applicationIds
# },
# ...
# ]
#
# Usage:
# mgr = MaterialManager(ifc, root)
# mgr.apply_to_item(brep_item, mesh_app_id)
# =============================================================================
import ifcopenshell
import ifcopenshell.api
from specklepy.objects.base import Base
def _argb_to_rgb(argb_int: int) -> tuple[float, float, float]:
"""Unpack a signed ARGB int to normalised (R, G, B) floats 0..1."""
unsigned = argb_int & 0xFFFFFFFF
r = ((unsigned >> 16) & 0xFF) / 255.0
g = ((unsigned >> 8) & 0xFF) / 255.0
b = (unsigned & 0xFF) / 255.0
return r, g, b
def _get(obj, key, default=None):
try:
val = getattr(obj, key, None)
if val is not None:
return val
except Exception:
pass
try:
val = obj[key]
if val is not None:
return val
except Exception:
pass
return default
class MaterialManager:
"""
Builds a lookup from mesh applicationId → IfcSurfaceStyle,
then applies styles to IFC geometry items.
"""
def __init__(self, ifc: ifcopenshell.file, root: Base):
self._ifc = ifc
# mesh applicationId (lowercase) → IfcSurfaceStyle (populated lazily)
self._style_map: dict[str, object] = {}
# name → IfcSurfaceStyle (cache to avoid duplicates)
self._style_cache: dict[str, object] = {}
self._build(root)
def _build(self, root: Base):
"""
Parse renderMaterialProxies and store raw material data keyed by mesh applicationId.
IFC styles are created lazily (only when actually assigned to geometry) to avoid
orphaned IfcSurfaceStyle instances that would fail IFC105 validation.
"""
proxies = _get(root, "renderMaterialProxies") or []
if not isinstance(proxies, list):
proxies = list(proxies) if proxies else []
# mesh applicationId (lowercase) → (name, diffuse_argb, transparency)
self._material_data: dict[str, tuple] = {}
for proxy in proxies:
material = _get(proxy, "value")
if material is None:
continue
name = _get(material, "name") or "Unnamed"
diffuse = _get(material, "diffuse")
opacity = _get(material, "opacity")
if diffuse is None:
continue
opacity_val = float(opacity) if opacity is not None else 1.0
transparency = max(0.0, min(1.0, 1.0 - opacity_val))
objects = _get(proxy, "objects") or []
for app_id in (objects if isinstance(objects, list) else []):
if app_id:
self._material_data[str(app_id).lower()] = (name, int(diffuse), transparency)
print(f" Materials: {len(self._material_data)} mesh mappings (styles created on demand)")
def _get_or_create_style(self, name: str, diffuse_argb: int, transparency: float):
"""Return cached style or create a new IfcSurfaceStyle."""
cache_key = f"{name}|{diffuse_argb}|{transparency:.4f}"
if cache_key in self._style_cache:
return self._style_cache[cache_key]
r, g, b = _argb_to_rgb(diffuse_argb)
style = ifcopenshell.api.run("style.add_style", self._ifc, name=name)
ifcopenshell.api.run(
"style.add_surface_style",
self._ifc,
style=style,
ifc_class="IfcSurfaceStyleRendering",
attributes={
"SurfaceColour": {"Name": None, "Red": r, "Green": g, "Blue": b},
"Transparency": transparency,
"ReflectanceMethod": "NOTDEFINED",
},
)
self._style_cache[cache_key] = style
return style
def get_style(self, mesh_app_id: str):
"""Return the IfcSurfaceStyle for a mesh applicationId (created on demand), or None."""
key = str(mesh_app_id).lower()
# Return already-created style if cached
if key in self._style_map:
return self._style_map[key]
# Create style now only if this mesh has material data
data = self._material_data.get(key)
if data is None:
return None
name, diffuse, transparency = data
style = self._get_or_create_style(name, diffuse, transparency)
self._style_map[key] = style
return style
def apply_to_item(self, item, mesh_app_id: str):
"""Assign the material style to a single IFC geometry item (e.g. IfcFacetedBrep)."""
style = self.get_style(mesh_app_id)
if style is None:
return
try:
ifcopenshell.api.run(
"style.assign_item_style",
self._ifc,
item=item,
style=style,
)
except Exception as e:
pass # Non-fatal — geometry still exports without colour
+177
View File
@@ -0,0 +1,177 @@
# =============================================================================
# properties.py
# Extracts Revit data from Speckle DataObjects and writes IFC property sets.
#
# Revit parameter structure from the Speckle connector:
# obj.properties = {
# "elementId": "704282",
# "Parameters": {
# "Type Parameters": {
# "Dimensions": {
# "Thickness": {"name": "Thickness", "value": 25.4, "units": "Millimeters", ...}
# },
# ...
# },
# "Instance Parameters": {
# "Constraints": {
# "Level": {"name": "Level", "value": "Level 1", ...}
# },
# ...
# }
# }
# }
#
# We flatten this into two IFC property sets:
# Pset_RevitTypeParameters — from "Type Parameters"
# Pset_RevitInstanceParameters — from "Instance Parameters"
# =============================================================================
import ifcopenshell.api
from specklepy.objects.base import Base
def _safe_val(value) -> str | None:
"""Convert a value to a clean IFC-safe string."""
if value is None:
return None
if isinstance(value, bool):
return "Yes" if value else "No"
if isinstance(value, float):
# Trim excessive decimals
return f"{value:.6g}"
if isinstance(value, (int, str)):
s = str(value).strip()
return s if s else None
return str(value).strip() or None
def _extract_param(entry) -> tuple[str, str] | None:
"""
Given a Revit parameter entry dict like:
{"name": "Thickness", "value": 25.4, "units": "Millimeters", ...}
Returns (display_name, display_value) or None if unusable.
"""
if not isinstance(entry, dict):
return None
name = entry.get("name")
value = entry.get("value")
if not name or value is None:
return None
units = entry.get("units", "")
# Skip non-informative unit labels
skip_units = {"", "None", "General", "Currency", "Integer"}
val_str = _safe_val(value)
if val_str is None:
return None
if units and units not in skip_units:
display = f"{val_str} {units}"
else:
display = val_str
return str(name), display
def _flatten_param_group(group: dict) -> dict:
"""
Flatten one parameter group (e.g. "Dimensions", "Constraints") dict.
Each value is a Revit parameter entry {"name":..., "value":..., "units":...}.
Returns {display_name: display_value}.
"""
result = {}
if not isinstance(group, dict):
return result
for _internal_key, entry in group.items():
pair = _extract_param(entry)
if pair:
name, val = pair
result[name] = val
return result
def _extract_parameter_block(params_block: dict) -> dict:
"""
Flatten all groups in a parameter block (Type Parameters or Instance Parameters).
Returns a merged {display_name: display_value} dict.
"""
result = {}
if not isinstance(params_block, dict):
return result
for _group_name, group in params_block.items():
result.update(_flatten_param_group(group))
return result
def _get_properties_dict(obj: Base) -> dict:
"""Extract the raw properties dict from a DataObject."""
for key in ["properties", "@properties", "_properties"]:
try:
props = obj[key]
if props is None:
continue
if hasattr(props, "get_dynamic_member_names"):
names = props.get_dynamic_member_names()
return {n: props[n] for n in names}
if isinstance(props, dict):
return props
except Exception:
continue
return {}
def _write_pset(ifc, element, pset_name: str, props: dict):
"""Write a property set if there are any properties."""
if not props:
return
try:
pset = ifcopenshell.api.run("pset.add_pset", ifc, product=element, name=pset_name)
ifcopenshell.api.run("pset.edit_pset", ifc, pset=pset, properties=props)
except Exception as e:
print(f" ⚠️ {pset_name}: {e}")
def write_properties(ifc, element, obj: Base):
"""
Write Revit parameters as IFC property sets.
Creates separate psets for Type and Instance parameters.
"""
props_dict = _get_properties_dict(obj)
parameters = props_dict.get("Parameters") or {}
# Type Parameters → Pset_RevitTypeParameters
type_params = parameters.get("Type Parameters") or {}
type_flat = _extract_parameter_block(type_params)
_write_pset(ifc, element, "RVT_TypeParameters", type_flat)
# Instance Parameters → Pset_RevitInstanceParameters
inst_params = parameters.get("Instance Parameters") or {}
inst_flat = _extract_parameter_block(inst_params)
_write_pset(ifc, element, "RVT_InstanceParameters", inst_flat)
# Top-level semantic fields → Pset_RevitIdentity
identity = {}
for field in ["type", "family", "category", "level"]:
val = getattr(obj, field, None)
if val and isinstance(val, str) and val.strip():
identity[field.capitalize()] = val.strip()
# Also include elementId if present
elem_id = props_dict.get("elementId")
if elem_id:
identity["ElementId"] = str(elem_id)
_write_pset(ifc, element, "RVT_Identity", identity)
def write_common_properties(ifc, element, obj: Base, category_name: str = ""):
"""
Write Pset_SpeckleData for traceability back to the Speckle source object.
"""
props = {}
speckle_id = getattr(obj, "id", None)
app_id = getattr(obj, "applicationId", None)
speckle_type = getattr(obj, "speckle_type", None)
if speckle_id: props["SpeckleId"] = str(speckle_id)
if app_id: props["ApplicationId"] = str(app_id)
if speckle_type: props["SpeckleType"] = str(speckle_type)
if category_name: props["RevitCategory"] = str(category_name)
_write_pset(ifc, element, "RVT_SpeckleData", props)
+68
View File
@@ -0,0 +1,68 @@
# =============================================================================
# receiver.py
# Connects to Speckle and receives the root Base object for a given version.
# =============================================================================
from specklepy.api.client import SpeckleClient
from specklepy.api.credentials import get_default_account
from specklepy.api import operations
from specklepy.transports.server import ServerTransport
import utils.config as config
def get_client() -> SpeckleClient:
"""
Create and authenticate a SpeckleClient.
Uses a personal access token from config.py.
To use your local Speckle Manager account instead, swap to get_default_account().
"""
client = SpeckleClient(host=config.SPECKLE_HOST)
if config.SPECKLE_TOKEN and config.SPECKLE_TOKEN != "YOUR_PERSONAL_ACCESS_TOKEN":
client.authenticate_with_token(config.SPECKLE_TOKEN)
else:
# Fallback: use account from Speckle Manager desktop app
account = get_default_account()
if account is None:
raise RuntimeError(
"No Speckle account found. Either set SPECKLE_TOKEN in config.py "
"or log in via Speckle Manager."
)
client.authenticate_with_account(account)
return client
def receive_version(project_id: str, version_id: str):
"""
Receive the root Base object from a Speckle version.
Args:
project_id: The Speckle project (stream) ID.
version_id: The version (commit) ID to receive.
Returns:
A specklepy Base object — the root of the object graph.
"""
client = get_client()
print(f"🔗 Connecting to {config.SPECKLE_HOST}...")
print(f"📦 Receiving project={project_id} version={version_id}")
# Get version metadata to find the referenced object ID
version = client.version.get(version_id,project_id)
referenced_object_id = version.referenced_object
# Download the full object graph
transport = ServerTransport(stream_id=project_id, client=client)
base = operations.receive(referenced_object_id, transport)
# Read units from the root object
units = getattr(base, "units", config.DEFAULT_UNITS) or config.DEFAULT_UNITS
# IFC file is declared in MILLIMETRES — no conversion needed.
# All geometry stays in source units (mm). scale=1.0 means "keep as-is".
scale = 1.0
print(f"✅ Received root object units={units} scale=1.0 (IFC declared as mm)")
return base, scale
+246
View File
@@ -0,0 +1,246 @@
# =============================================================================
# traversal.py
# Walks the nested Speckle Collection tree from a Revit export.
#
# Expected structure (from your screenshot):
# root
# └── elements[]
# └── Collection (project)
# └── elements[]
# └── Collection (Level 18, Level 19, ...) ← storeys
# └── elements[]
# └── Collection (Floors, Walls, ...) ← categories
# └── elements[]
# └── Base object ← real BIM element
# =============================================================================
from typing import Generator, Tuple
from specklepy.objects.base import Base
# --------------------------------------------------------------------------- #
# Low-level helpers
# --------------------------------------------------------------------------- #
def is_collection(obj) -> bool:
"""Returns True if this object is a Speckle Collection node (not a leaf element)."""
speckle_type = getattr(obj, "speckle_type", "") or ""
return "Collection" in speckle_type
def get_children(obj) -> list:
"""
Safely get the 'elements' list from a Base/Collection object.
Handles both 'elements' and '@elements' (detached) variants.
"""
for key in ["elements", "@elements"]:
try:
val = obj[key]
if val is not None:
return list(val)
except Exception:
continue
return []
def get_prop(obj, key: str, default=None):
"""Safe property access for Speckle Base objects — avoids AttributeError."""
try:
val = getattr(obj, key, None)
if val is None:
val = obj[key]
return val
except Exception:
return default
# speckle_type fragments that mark a non-exportable / spatial-structure object
_SKIP_TYPE_FRAGMENTS = {
"Collection", "Level", "Grid", "View", "RenderMaterial",
"Site", "Building", "Storey",
}
def _is_valid_element(obj) -> bool:
"""
Returns True only for leaf objects that should become IFC elements.
Filters out Collections, spatial structure types, and other non-geometry nodes.
"""
if obj is None:
return False
speckle_type = getattr(obj, "speckle_type", "") or ""
for fragment in _SKIP_TYPE_FRAGMENTS:
if fragment in speckle_type:
return False
return True
# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _element_level(obj) -> str:
"""
Try to read the level/storey name directly from an element's properties.
Handles both flat and deeply nested Revit property structures.
"""
# Top-level field (Revit connector puts it here for parent elements)
level = get_prop(obj, "level") or get_prop(obj, "Level")
if level and isinstance(level, str) and level.strip():
return level.strip()
props = get_prop(obj, "properties")
if isinstance(props, dict):
# Flat key
for key in ["Level", "level", "Building Storey"]:
val = props.get(key)
if val and isinstance(val, str) and val.strip():
return val.strip()
# Nested: properties.Instance Parameters.Constraints.Level.value
# (used by curtain wall children / panels / mullions)
instance_params = props.get("Instance Parameters") or {}
constraints = instance_params.get("Constraints") or {}
level_entry = constraints.get("Level") or {}
if isinstance(level_entry, dict):
val = level_entry.get("value")
if val and isinstance(val, str) and val.strip():
return val.strip()
# Also check Identity Data
identity = props.get("Identity Data") or {}
for key in ["Level", "level"]:
val = identity.get(key)
if val and isinstance(val, str) and val.strip():
return val.strip()
return ""
def _yield_element_and_children(obj, level_name: str, category_name: str):
"""
Yield a leaf element, then recursively yield any DataObject children
from its elements[] list (e.g. curtain wall panels and mullions).
Children have their own level and displayValue geometry.
"""
yield obj, level_name, category_name
children = get_children(obj)
for child in children:
if child is None or is_collection(child):
continue
if not _is_valid_element(child):
continue
# Get child's own level, fall back to parent's level
child_level = _element_level(child) or level_name
if child_level and child_level != "Unknown Level":
child_category = getattr(child, "category", None) or category_name
yield from _yield_element_and_children(child, child_level, child_category)
# --------------------------------------------------------------------------- #
# Main traversal
# --------------------------------------------------------------------------- #
def traverse(
root: Base,
) -> Generator[Tuple[Base, str, str], None, None]:
"""
Walk the full Speckle object tree from the root Base object.
Yields:
(element, level_name, category_name) for every leaf BIM element found.
level_name — e.g. "Level 18"
category_name — e.g. "Floors", "Walls", "Structural Columns"
"""
root_children = get_children(root)
if not root_children:
if _is_valid_element(root):
yield root, "Unknown Level", "Unknown Category"
return
for child in root_children:
if is_collection(child):
yield from _walk_level(child)
else:
if _is_valid_element(child):
level = _element_level(child)
if level:
yield child, level, "Unknown Category"
def _walk_level(project_collection: Base):
"""Walk the project collection → level collections."""
for level_obj in get_children(project_collection):
level_name = getattr(level_obj, "name", None) or ""
if is_collection(level_obj):
# Only walk into this level if it has a real name
if level_name and level_name != "Unknown Level":
yield from _walk_category(level_obj, level_name)
else:
if _is_valid_element(level_obj):
level = _element_level(level_obj) or level_name
if level and level != "Unknown Level":
yield from _yield_element_and_children(level_obj, level, "Unknown Category")
def _walk_category(level_obj: Base, level_name: str):
"""Walk level collection → category collections → leaf elements."""
for category_obj in get_children(level_obj):
category_name = getattr(category_obj, "name", "Unknown Category") or "Unknown Category"
if is_collection(category_obj):
for element in get_children(category_obj):
if is_collection(element):
# One extra nesting level (e.g. sub-families)
for sub_element in get_children(element):
if _is_valid_element(sub_element):
level = _element_level(sub_element) or level_name
if level and level != "Unknown Level":
yield from _yield_element_and_children(sub_element, level, category_name)
else:
if _is_valid_element(element):
level = _element_level(element) or level_name
if level and level != "Unknown Level":
yield from _yield_element_and_children(element, level, category_name)
else:
if _is_valid_element(category_obj):
level = _element_level(category_obj) or level_name
if level and level != "Unknown Level":
yield from _yield_element_and_children(category_obj, level, "Unknown Category")
# --------------------------------------------------------------------------- #
# Debug helper
# --------------------------------------------------------------------------- #
def print_tree(obj: Base, indent: int = 0, max_depth: int = 5):
"""
Print the object tree structure for debugging.
Call this on the root object to understand your data before exporting.
Usage:
from traversal import print_tree
print_tree(base)
"""
if indent > max_depth:
return
prefix = " " * indent
name = getattr(obj, "name", None) or ""
speckle_type = getattr(obj, "speckle_type", "") or ""
children = get_children(obj)
child_count = f" ({len(children)} children)" if children else ""
print(f"{prefix}├─ [{speckle_type}] name={name!r}{child_count}")
for child in children[:5]: # limit to first 5 per level to avoid spam
print_tree(child, indent + 1, max_depth)
if len(children) > 5:
print(f"{prefix} ... and {len(children) - 5} more")
+115
View File
@@ -0,0 +1,115 @@
# =============================================================================
# writer.py
# Creates and manages the IFC file structure:
# IfcProject → IfcSite → IfcBuilding → IfcBuildingStorey (one per level)
#
# Also provides StoreyManager which lazily creates storeys on demand
# as the traversal encounters new level names.
# =============================================================================
import ifcopenshell
import ifcopenshell.api
import utils.config as config
def create_ifc_scaffold() -> tuple:
"""
Create the IFC file with the required project/site/building hierarchy.
Returns:
(ifc_file, building, body_context)
- ifc_file: The ifcopenshell file object
- building: The IfcBuilding entity (storeys are assigned under this)
- body_context: The Body geometry subcontext for shape representations
"""
ifc = ifcopenshell.file(schema="IFC4X3")
# Project
project = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class="IfcProject",
name=config.IFC_PROJECT_NAME,
)
# Units — millimetres (matching Revit/Speckle source data)
# This avoids any mm→m conversion errors and keeps coordinates at full precision
ifcopenshell.api.run(
"unit.assign_unit", ifc,
length={"is_metric": True, "raw": "MILLIMETRES"},
)
# Geometry contexts
model_ctx = ifcopenshell.api.run(
"context.add_context", ifc,
context_type="Model",
)
body_ctx = ifcopenshell.api.run(
"context.add_context", ifc,
context_type="Model",
context_identifier="Body",
target_view="MODEL_VIEW",
parent=model_ctx,
)
# Spatial hierarchy
site = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class="IfcSite",
name=config.IFC_SITE_NAME,
)
building = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class="IfcBuilding",
name=config.IFC_BUILDING_NAME,
)
ifcopenshell.api.run(
"aggregate.assign_object", ifc,
relating_object=project,
products=[site],
)
ifcopenshell.api.run(
"aggregate.assign_object", ifc,
relating_object=site,
products=[building],
)
return ifc, building, body_ctx
class StoreyManager:
"""
Lazily creates IfcBuildingStorey entities as new level names are encountered.
Keeps storeys in insertion order so the IFC file is logically ordered.
"""
def __init__(self, ifc: ifcopenshell.file, building):
self.ifc = ifc
self.building = building
self._storeys: dict[str, object] = {} # level_name → IfcBuildingStorey
def get_or_create(self, level_name: str):
"""Return existing storey or create a new one for this level name."""
if level_name not in self._storeys:
storey = ifcopenshell.api.run(
"root.create_entity", self.ifc,
ifc_class="IfcBuildingStorey",
name=level_name,
)
ifcopenshell.api.run(
"aggregate.assign_object", self.ifc,
relating_object=self.building,
products=[storey],
)
self._storeys[level_name] = storey
print(f" 🏢 Created storey: {level_name}")
return self._storeys[level_name]
@property
def count(self) -> int:
return len(self._storeys)
@property
def names(self) -> list[str]:
return list(self._storeys.keys())