first release

This commit is contained in:
NLSA
2026-03-19 16:58:08 +01:00
parent 1cefb95b48
commit b433b91902
17 changed files with 2213 additions and 243 deletions
+456
View File
@@ -0,0 +1,456 @@
# =============================================================================
# geometry.py
# Converts Speckle DataObject geometry → IFC IfcPolygonalFaceSet + IfcLocalPlacement
#
# Key facts:
# - After specklepy receive(), vertices and faces are FLAT Python lists
# - displayValue is an array of Mesh objects
# - Units are in mm (for Revit), scale to metres for IFC
# - Vertices are in absolute world coordinates
# - Uses IfcPolygonalFaceSet (indexed vertices) instead of IfcFacetedBrep
# for compact output — each vertex stored once, not once per face.
# =============================================================================
import ifcopenshell
from specklepy.objects.base import Base
# Scale factors → MILLIMETRES (IFC file is declared as mm)
_UNIT_SCALES = {
"mm": 1.0, "millimeter": 1.0, "millimeters": 1.0,
"cm": 10.0, "centimeter": 10.0, "centimeters": 10.0,
"m": 1000.0, "meter": 1000.0, "meters": 1000.0,
"ft": 304.8, "foot": 304.8, "feet": 304.8,
"in": 25.4, "inch": 25.4, "inches": 25.4,
}
# --------------------------------------------------------------------------- #
# Geometry validation helpers (GEM111 fix)
# --------------------------------------------------------------------------- #
# Minimum distance in mm below which two vertices are considered identical (GEM111).
_VERTEX_MERGE_TOL = 0.01 # 0.01 mm
_INV_TOL = 1.0 / _VERTEX_MERGE_TOL # pre-computed: multiply instead of divide
def build_ifc_facesets(ifc, verts_scaled: list, face_groups: list) -> list:
"""
Build a list of IfcPolygonalFaceSet from scaled (x,y,z) vertices and face index groups.
Uses IfcCartesianPointList3D + IfcIndexedPolygonalFace for compact output.
Vertices are deduplicated via snap grid so each unique position is stored once.
GEM111 fix: skip faces with near-duplicate vertices (snapped to same grid cell).
verts_scaled: flat list of already-scaled floats [x0,y0,z0, x1,y1,z1, ...]
face_groups: list of index lists [[i,j,k], [i,j,k,l], ...]
Returns: list of IfcPolygonalFaceSet (typically one, empty on failure).
"""
snap_to_idx = {} # snap_key → 0-based index in deduped_verts
deduped_verts = [] # [[x, y, z], ...] — lists for direct IFC use
inv_tol = _INV_TOL
# Validate faces and remap indices to deduplicated vertex list
valid_faces = [] # list of (idx0+1, idx1+1, ...) tuples (1-based for IFC)
for indices in face_groups:
try:
remapped = []
seen_snaps = set()
degenerate = False
for i in indices:
i3 = i * 3
x = verts_scaled[i3]
y = verts_scaled[i3 + 1]
z = verts_scaled[i3 + 2]
key = (round(x * inv_tol), round(y * inv_tol), round(z * inv_tol))
if key in seen_snaps:
degenerate = True
break
seen_snaps.add(key)
idx = snap_to_idx.get(key)
if idx is None:
idx = len(deduped_verts)
snap_to_idx[key] = idx
deduped_verts.append([x, y, z])
remapped.append(idx + 1) # 1-based for IFC
if degenerate or len(remapped) < 3:
continue
valid_faces.append(remapped)
except Exception:
continue
if not valid_faces or not deduped_verts:
return []
# Build IFC entities
try:
point_list = ifc.createIfcCartesianPointList3D(deduped_verts)
ifc_faces = [
ifc.createIfcIndexedPolygonalFace(fi) for fi in valid_faces
]
faceset = ifc.createIfcPolygonalFaceSet(point_list, None, ifc_faces, None)
return [faceset]
except Exception:
return []
# --------------------------------------------------------------------------- #
# Safe data access helpers
# --------------------------------------------------------------------------- #
def _get(obj, key, default=None):
"""
Safe access for specklepy Base objects.
Tries attribute access first, then bracket access.
"""
try:
val = getattr(obj, key, None)
if val is not None:
return val
except Exception:
pass
try:
val = obj[key]
if val is not None:
return val
except Exception:
pass
return default
def unwrap_chunks(raw) -> list:
"""
Flatten a Speckle data array into a plain Python list of numbers.
Handles two cases:
1. Already flat list of numbers (after specklepy receive deserializes)
→ returned as-is (fast path)
2. List of DataChunk objects (raw from server before deserialization)
→ each chunk's .data list is concatenated
"""
if not raw:
return []
# Fast path: if first item is a number, assume all items are numbers
first = raw[0]
if isinstance(first, (int, float)):
return raw
# Slow path: DataChunk objects or mixed content
result = []
for item in raw:
if item is None:
continue
if isinstance(item, (int, float)):
result.append(item)
continue
speckle_type = getattr(item, "speckle_type", "") or ""
if "DataChunk" in speckle_type:
chunk_data = _get(item, "data") or _get(item, "@data")
if chunk_data:
result.extend(list(chunk_data))
else:
try:
result.extend(list(item))
except Exception:
pass
return result
def _resolve_scale(obj, stream_scale: float) -> float:
"""Resolve unit scale: obj.units → stream fallback."""
units = _get(obj, "units")
if units and isinstance(units, str):
return _UNIT_SCALES.get(units.lower().strip(), stream_scale)
return stream_scale
# --------------------------------------------------------------------------- #
# Mesh extraction
# --------------------------------------------------------------------------- #
def _is_mesh(item) -> bool:
"""
Detect if a specklepy object is a Mesh.
Uses speckle_type string — more reliable than hasattr on Base objects.
"""
if item is None:
return False
speckle_type = _get(item, "speckle_type") or ""
if "Mesh" in speckle_type:
return True
# Fallback: has both vertices and faces data
verts = _get(item, "vertices")
faces = _get(item, "faces")
return verts is not None and faces is not None
def _collect_meshes_from_display(obj) -> list:
"""
Collect Mesh objects from an object's displayValue.
If an item is not a Mesh (e.g. BrepX, Brep), recursively check
its own displayValue for nested meshes.
"""
meshes = []
for key in ["displayValue", "@displayValue", "_displayValue"]:
display = _get(obj, key)
if display is None:
continue
items = display if isinstance(display, list) else [display]
for item in items:
if item is None:
continue
if _is_mesh(item):
meshes.append(item)
else:
# BrepX / Brep / other geometry types may carry a nested
# displayValue with the tessellated mesh representation
meshes.extend(_collect_meshes_from_display(item))
if meshes:
break
return meshes
def get_display_meshes(obj: Base) -> list:
"""
Extract all Mesh objects from a DataObject's displayValue.
Handles nested geometry types (BrepX, Brep) that wrap meshes
inside their own displayValue.
"""
meshes = _collect_meshes_from_display(obj)
# Fallback: object itself is a Mesh
if not meshes and _is_mesh(obj):
speckle_type = _get(obj, "speckle_type") or ""
if "Mesh" in speckle_type:
meshes.append(obj)
return meshes
def get_display_instances(obj: Base) -> list:
"""
Extract InstanceProxy objects from a DataObject's displayValue.
Per the official speckleifc converter, every IFC element's displayValue
contains InstanceProxy objects (not raw meshes). Each InstanceProxy has:
- transform: 16-float row-major matrix, translation in metres
- definitionId: "DEFINITION:{meshAppId}" string
- units: "m"
Raw meshes do NOT appear in displayValue in IFC→Speckle exports.
"""
instances = []
for key in ["displayValue", "@displayValue", "_displayValue"]:
display = _get(obj, key)
if display is None:
continue
items = display if isinstance(display, list) else [display]
for item in items:
if item is None:
continue
transform = _get(item, "transform")
definition_id = _get(item, "definitionId")
if transform is not None and definition_id is not None:
instances.append(item)
if instances:
break
return instances
# --------------------------------------------------------------------------- #
# Face decoding
# --------------------------------------------------------------------------- #
def decode_faces(faces_raw: list) -> list:
"""
Decode Speckle's run-length encoded face list into vertex index groups.
Format: [n, i0, i1, ..., n, i0, i1, ...]
n=0 → triangle (legacy), n=1 → quad (legacy), n≥3 → n-gon
"""
decoded = []
i = 0
total = len(faces_raw)
# Check if values are already ints (common after unwrap_chunks)
already_int = total > 0 and isinstance(faces_raw[0], int)
while i < total:
n = faces_raw[i] if already_int else int(faces_raw[i])
if n == 0:
n = 3
elif n == 1:
n = 4
end = i + 1 + n
if end > total:
break
if already_int:
decoded.append(faces_raw[i + 1:end])
else:
decoded.append([int(v) for v in faces_raw[i + 1:end]])
i = end
return decoded
# --------------------------------------------------------------------------- #
# Bounding box + placement
# --------------------------------------------------------------------------- #
def compute_origin(flat_verts: list) -> tuple:
"""
Compute placement origin from scaled vertex list (mm).
X, Y = bounding box centroid
Z = minimum Z (bottom face of element — more natural for IFC)
Single-pass to avoid creating 3 sliced copies of a large list.
"""
x0 = flat_verts[0]
y0 = flat_verts[1]
z0 = flat_verts[2]
xmin = xmax = x0
ymin = ymax = y0
zmin = z0
for i in range(3, len(flat_verts) - 2, 3):
x = flat_verts[i]
y = flat_verts[i + 1]
z = flat_verts[i + 2]
if x < xmin:
xmin = x
elif x > xmax:
xmax = x
if y < ymin:
ymin = y
elif y > ymax:
ymax = y
if z < zmin:
zmin = z
return (xmin + xmax) / 2.0, (ymin + ymax) / 2.0, zmin
# Cache for shared IFC direction/point entities (keyed by ifc file id)
_shared_entities: dict[int, dict] = {}
def _get_shared(ifc):
"""Return (or create) shared IfcDirection and IfcCartesianPoint entities for this file."""
fid = id(ifc)
if fid not in _shared_entities:
_shared_entities[fid] = {
"z_axis": ifc.createIfcDirection([0.0, 0.0, 1.0]),
"x_axis": ifc.createIfcDirection([1.0, 0.0, 0.0]),
"origin_0": ifc.createIfcCartesianPoint([0.0, 0.0, 0.0]),
}
return _shared_entities[fid]
def _make_placement(ifc, x: float, y: float, z: float):
"""Create an IfcLocalPlacement at absolute world coordinates (metres)."""
shared = _get_shared(ifc)
origin = ifc.createIfcCartesianPoint([x, y, z])
a2p = ifc.createIfcAxis2Placement3D(origin, shared["z_axis"], shared["x_axis"])
return ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
# --------------------------------------------------------------------------- #
# Main conversion
# --------------------------------------------------------------------------- #
def mesh_to_ifc(
ifc: ifcopenshell.file,
body_context,
obj: Base,
scale: float = 0.001,
material_manager=None,
) -> tuple:
"""
Convert a Speckle DataObject → (IfcShapeRepresentation, IfcLocalPlacement).
Creates one IfcPolygonalFaceSet per mesh so each can carry its own material style.
Returns (None, None) if no usable geometry is found.
"""
meshes = get_display_meshes(obj)
if not meshes:
return None, None
obj_scale = _resolve_scale(obj, scale)
# ------------------------------------------------------------------ #
# Pass 1: unpack vertices once per mesh, collect all scaled coords
# to compute world origin. Cache (verts, ms) for Pass 2.
# ------------------------------------------------------------------ #
mesh_cache = [] # [(verts_list, ms, scaled)] or None per mesh
all_scaled = []
for mesh in meshes:
raw_verts = _get(mesh, "vertices") or []
verts = unwrap_chunks(raw_verts if isinstance(raw_verts, list) else list(raw_verts))
if not verts:
mesh_cache.append(None)
continue
ms = _resolve_scale(mesh, obj_scale)
# Pre-scale vertices once, reuse in Pass 2
scaled = [float(v) * ms for v in verts]
mesh_cache.append((verts, ms, scaled))
all_scaled.extend(scaled)
if not all_scaled:
return None, None
ox, oy, oz = compute_origin(all_scaled)
# ------------------------------------------------------------------ #
# Pass 2: one faceset per mesh — reuse cached verts, only unpack faces
# ------------------------------------------------------------------ #
geom_items = []
for mesh, cached in zip(meshes, mesh_cache):
if cached is None:
continue
verts, ms, scaled = cached
raw_faces = _get(mesh, "faces") or []
faces_raw = unwrap_chunks(raw_faces if isinstance(raw_faces, list) else list(raw_faces))
if not faces_raw:
continue
try:
face_groups = decode_faces(faces_raw)
except Exception as e:
print(f" ⚠️ Face decode error: {e}")
continue
# Offset pre-scaled vertices relative to origin (flat list, no tuples)
n = len(scaled)
verts_scaled = [0.0] * n
for vi in range(0, n, 3):
verts_scaled[vi] = scaled[vi] - ox
verts_scaled[vi + 1] = scaled[vi + 1] - oy
verts_scaled[vi + 2] = scaled[vi + 2] - oz
mesh_facesets = build_ifc_facesets(ifc, verts_scaled, face_groups)
if not mesh_facesets:
continue
# Apply material style to every faceset of this mesh
if material_manager:
mesh_app_id = _get(mesh, "applicationId")
if mesh_app_id:
for fs in mesh_facesets:
material_manager.apply_to_item(fs, str(mesh_app_id))
geom_items.extend(mesh_facesets)
if not geom_items:
return None, None
# ------------------------------------------------------------------ #
# Assemble IfcShapeRepresentation + IfcLocalPlacement
# ------------------------------------------------------------------ #
rep = ifc.createIfcShapeRepresentation(
ContextOfItems=body_context,
RepresentationIdentifier="Body",
RepresentationType="Tessellation",
Items=geom_items,
)
placement = _make_placement(ifc, ox, oy, oz)
return rep, placement
+470
View File
@@ -0,0 +1,470 @@
# =============================================================================
# instances.py
# Handles Speckle InstanceProxy objects from both:
#
# FORMAT A — Revit connector (our actual use case):
# _units = "mm"
# transform = 16 floats, row-major, translation in MM
# definitionId = 64-char uppercase hex hash (matches object id[:32] in tree)
# The definition object lives somewhere in the object tree.
#
# FORMAT B — speckleifc IFC→Speckle converter:
# units = "m"
# transform = 16 floats, row-major, translation in METRES
# definitionId = "DEFINITION:{meshAppId}"
# Definition geometry lives in root → Collection("definitionGeometry")
#
# We detect the format by the definitionId prefix.
#
# Performance: uses IfcRepresentationMap + IfcMappedItem so that all instances
# sharing the same definition reference a single copy of the geometry.
# =============================================================================
import math
from specklepy.objects.base import Base
from utils.geometry import _get, unwrap_chunks, decode_faces, _UNIT_SCALES, build_ifc_facesets, _get_shared
def is_instance(obj) -> bool:
"""Returns True if this object is a Speckle InstanceProxy."""
return _get(obj, "transform") is not None and _get(obj, "definitionId") is not None
def _is_ifc_format(definition_id: str) -> bool:
"""True if this is speckleifc format (definitionId starts with 'DEFINITION:')."""
return definition_id.startswith("DEFINITION:")
def build_definition_map(root: Base) -> dict:
"""
Build a unified definition map that handles both formats.
Returns dict with keys:
"by_id" : {obj_id_lower[:32] → object} for Revit format
"by_app_id" : {applicationId_lower → object} for Revit format
"ifc_proxies" : {"DEFINITION:xxx" → proxy} for IFC format
"ifc_meshes" : {meshAppId → Mesh} for IFC format
"""
by_id = {}
by_app_id = {}
ifc_proxies = {}
ifc_meshes = {}
# --- Walk entire tree for Revit format ---
_collect_all(root, by_id, by_app_id, depth=0)
# --- Extract speckleifc structures for IFC format ---
proxies_raw = _get(root, "instanceDefinitionProxies")
if proxies_raw:
for proxy in (proxies_raw if isinstance(proxies_raw, list) else [proxies_raw]):
app_id = _get(proxy, "applicationId")
if app_id:
ifc_proxies[app_id] = proxy # original case (for IFC format)
ifc_proxies[app_id.lower()] = proxy # lowercase (for Revit format)
elements = _get(root, "elements") or _get(root, "@elements") or []
for child in (elements if isinstance(elements, list) else []):
if (_get(child, "name") or "") == "definitionGeometry":
geom_elements = _get(child, "elements") or _get(child, "@elements") or []
for mesh in (geom_elements if isinstance(geom_elements, list) else []):
mesh_app_id = _get(mesh, "applicationId")
if mesh_app_id:
ifc_meshes[mesh_app_id] = mesh
print(f" Objects indexed by id: {len(by_id)}")
print(f" Objects indexed by appId: {len(by_app_id)}")
print(f" IFC definition proxies: {len(ifc_proxies)}")
print(f" IFC definition meshes: {len(ifc_meshes)}")
return {
"by_id": by_id,
"by_app_id": by_app_id,
"ifc_proxies": ifc_proxies,
"ifc_meshes": ifc_meshes,
}
def _collect_all(obj, by_id: dict, by_app_id: dict, depth: int):
if obj is None or depth > 25:
return
obj_id = _get(obj, "id")
if obj_id and isinstance(obj_id, str):
key = obj_id.lower()
by_id[key] = obj
# Also store truncated — definitionId (64 chars) matches id (32 chars)
if len(key) == 32:
by_id[key] = obj
elif len(key) > 32:
by_id[key[:32]] = obj
app_id = _get(obj, "applicationId")
if app_id and isinstance(app_id, str):
by_app_id[app_id.lower()] = obj
for key in ["elements", "@elements", "_elements",
"displayValue", "@displayValue", "_displayValue",
"objects", "@objects", "definition", "@definition"]:
try:
children = obj[key]
if children is None:
continue
if not isinstance(children, list):
children = [children]
for child in children:
_collect_all(child, by_id, by_app_id, depth + 1)
except Exception:
continue
def _get_revit_meshes(definition_id: str, definition_map: dict) -> list:
"""
Revit format:
definitionId (64-char hex) → InstanceDefinitionProxy.applicationId
proxy.objects[0] is a UUID applicationId → find mesh by applicationId
"""
from utils.geometry import get_display_meshes
# Step 1: find the InstanceDefinitionProxy by its applicationId (case-insensitive)
ifc_proxies = definition_map.get("ifc_proxies", {})
proxy = ifc_proxies.get(definition_id) or ifc_proxies.get(definition_id.lower())
if proxy is None:
return []
# Step 2: get the mesh applicationIds from proxy.objects
object_ids = _get(proxy, "objects") or []
if not isinstance(object_ids, list):
object_ids = list(object_ids)
# Step 3: look up each mesh by applicationId
by_app_id = definition_map.get("by_app_id", {})
meshes = []
for oid in object_ids:
obj = by_app_id.get(str(oid).lower())
if obj is not None:
# The found object may itself be a mesh, or contain displayValue meshes
found_meshes = get_display_meshes(obj)
if found_meshes:
meshes.extend(found_meshes)
else:
# It IS the mesh directly
meshes.append(obj)
return meshes
def _get_ifc_meshes(definition_id: str, definition_map: dict) -> list:
"""
IFC format: definitionId = "DEFINITION:224058_mat0"
Look up proxy → objects list → meshes from ifc_meshes dict.
"""
ifc_proxies = definition_map.get("ifc_proxies", {})
ifc_meshes = definition_map.get("ifc_meshes", {})
proxy = ifc_proxies.get(definition_id)
if proxy is None:
return []
object_ids = _get(proxy, "objects") or []
result = []
for oid in (object_ids if isinstance(object_ids, list) else [object_ids]):
mesh = ifc_meshes.get(str(oid))
if mesh is not None:
result.append(mesh)
return result
def _resolve_instance_scale(obj, stream_scale: float) -> float:
"""
Resolve scale for the transform translation.
Tries bracket access for '_units' (Revit uses underscore).
IFC format instances have units="m" → scale=1.0 (no scaling).
"""
for key in ["units", "_units"]:
try:
units = obj[key]
if units and isinstance(units, str):
s = _UNIT_SCALES.get(units.lower().strip())
if s is not None:
return s
except Exception:
pass
return stream_scale
# Stats
_stats = {"found": 0, "not_found": 0}
# Cache: mesh id → (verts_scaled, face_groups) to avoid re-unpacking
# AND re-scaling the same definition mesh across many instances that share it.
_mesh_data_cache: dict = {}
# Cache: definition_id → IfcRepresentationMap (or None if no geometry)
# All instances sharing the same definition reuse one geometry copy.
_rep_map_cache: dict = {}
# Shared identity placement for all instances (keyed by ifc file id)
_identity_placement_cache: dict[int, object] = {}
_MM_SCALES = {
"mm": 1.0, "millimeter": 1.0, "millimeters": 1.0,
"cm": 10.0, "centimeter": 10.0,
"m": 1000.0, "meter": 1000.0, "meters": 1000.0,
"ft": 304.8, "in": 25.4,
}
# --------------------------------------------------------------------------- #
# IfcRepresentationMap builder — geometry created once per definition
# --------------------------------------------------------------------------- #
def _build_rep_map(ifc, body_context, meshes: list, ifc_format: bool,
material_manager=None):
"""
Build an IfcRepresentationMap from definition meshes.
Geometry is in local coordinates (mm, no instance transform applied).
Returns IfcRepresentationMap or None if no valid geometry.
"""
geom_items = []
for mesh in meshes:
mesh_id = _get(mesh, "id") or _get(mesh, "applicationId")
if mesh_id and mesh_id in _mesh_data_cache:
verts_local, face_groups = _mesh_data_cache[mesh_id]
else:
raw_verts = _get(mesh, "vertices") or []
raw_faces = _get(mesh, "faces") or []
verts = unwrap_chunks(list(raw_verts))
faces_raw = unwrap_chunks(list(raw_faces))
if not verts or not faces_raw:
continue
mesh_units = _get(mesh, "units") or _get(mesh, "_units") or ("m" if ifc_format else "mm")
ms = _MM_SCALES.get(mesh_units.lower().strip(), 1.0)
try:
face_groups = decode_faces(faces_raw)
except Exception as e:
print(f" ⚠️ Instance face decode: {e}")
continue
# Scale vertices once and cache the result
verts_local = [float(v) * ms for v in verts]
if mesh_id:
_mesh_data_cache[mesh_id] = (verts_local, face_groups)
mesh_facesets = build_ifc_facesets(ifc, verts_local, face_groups)
if not mesh_facesets:
continue
# Apply material style to each faceset
if material_manager:
mesh_app_id = _get(mesh, "applicationId")
if mesh_app_id:
for fs in mesh_facesets:
material_manager.apply_to_item(fs, str(mesh_app_id))
geom_items.extend(mesh_facesets)
if not geom_items:
return None
# Mapping origin = identity (local coords origin) — reuse shared origin
shared = _get_shared(ifc)
a2p = ifc.createIfcAxis2Placement3D(shared["origin_0"], None, None)
# The mapped representation holds the actual geometry
mapped_rep = ifc.createIfcShapeRepresentation(
ContextOfItems=body_context,
RepresentationIdentifier="Body",
RepresentationType="Tessellation",
Items=geom_items,
)
return ifc.createIfcRepresentationMap(a2p, mapped_rep)
# --------------------------------------------------------------------------- #
# Transform → IfcCartesianTransformationOperator3D
# --------------------------------------------------------------------------- #
def _vec_magnitude(x, y, z):
return math.sqrt(x*x + y*y + z*z)
def _make_transform_operator(ifc, t: list, ts: float):
"""
Convert a row-major 4x4 matrix + translation scale into an
IfcCartesianTransformationOperator3DnonUniform.
t: 16 floats, row-major [r00,r01,r02,tx, r10,r11,r12,ty, r20,r21,r22,tz, 0,0,0,1]
ts: scale factor for translation components (e.g. 1000.0 for m→mm)
The matrix acts as: p' = M * p + translation, where M rows are:
row0 = (t[0], t[1], t[2])
row1 = (t[4], t[5], t[6])
row2 = (t[8], t[9], t[10])
IfcCartesianTransformationOperator axes represent the COLUMNS of M:
Axis1 = column 0 = where local X maps → (t[0], t[4], t[8])
Axis2 = column 1 = where local Y maps → (t[1], t[5], t[9])
Axis3 = column 2 = where local Z maps → (t[2], t[6], t[10])
Returns the IFC entity, or None if the transform is degenerate.
"""
# Extract COLUMNS of the 3x3 rotation/scale sub-matrix
ax1 = (float(t[0]), float(t[4]), float(t[8])) # column 0: X-axis direction
ax2 = (float(t[1]), float(t[5]), float(t[9])) # column 1: Y-axis direction
ax3 = (float(t[2]), float(t[6]), float(t[10])) # column 2: Z-axis direction
s1 = _vec_magnitude(*ax1)
s2 = _vec_magnitude(*ax2)
s3 = _vec_magnitude(*ax3)
if s1 < 1e-10 or s2 < 1e-10 or s3 < 1e-10:
return None # degenerate transform
# Normalized direction vectors
d1 = ifc.createIfcDirection([ax1[0]/s1, ax1[1]/s1, ax1[2]/s1])
d2 = ifc.createIfcDirection([ax2[0]/s2, ax2[1]/s2, ax2[2]/s2])
d3 = ifc.createIfcDirection([ax3[0]/s3, ax3[1]/s3, ax3[2]/s3])
# Translation, scaled to mm
tx = float(t[3]) * ts
ty = float(t[7]) * ts
tz = float(t[11]) * ts
origin = ifc.createIfcCartesianPoint([tx, ty, tz])
# Use non-uniform variant to handle mirrors and non-uniform scale
return ifc.createIfcCartesianTransformationOperator3DnonUniform(
d1, # Axis1
d2, # Axis2
origin, # LocalOrigin
s1, # Scale
d3, # Axis3
s2, # Scale2
s3, # Scale3
)
# --------------------------------------------------------------------------- #
# Main conversion — IfcMappedItem approach
# --------------------------------------------------------------------------- #
def instance_to_ifc(ifc, body_context, obj: Base, definition_map: dict,
scale: float = 1.0, material_manager=None):
"""
Convert a Speckle InstanceProxy → (IfcShapeRepresentation, IfcLocalPlacement).
Strategy: create geometry once per definition as an IfcRepresentationMap,
then reference it via IfcMappedItem + IfcCartesianTransformationOperator3D
for each instance. This avoids duplicating geometry across instances.
"""
transform_raw = _get(obj, "transform")
if not transform_raw:
return None, None
t = list(transform_raw)
if len(t) != 16:
return None, None
definition_id = _get(obj, "definitionId") or ""
ifc_format = _is_ifc_format(definition_id)
# Translation scale: IFC format transform is in metres → convert to mm
# Revit format transform is already in mm (same as IFC file units)
ts = 1000.0 if ifc_format else _resolve_instance_scale(obj, scale)
# Identity placement (transform is encoded in the MappedItem) — shared across all instances
fid = id(ifc)
if fid not in _identity_placement_cache:
shared = _get_shared(ifc)
a2p = ifc.createIfcAxis2Placement3D(shared["origin_0"], None, None)
_identity_placement_cache[fid] = ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
placement = _identity_placement_cache[fid]
# --- Get or build IfcRepresentationMap (cached per definition_id) ---
if definition_id not in _rep_map_cache:
if ifc_format:
meshes = _get_ifc_meshes(definition_id, definition_map)
else:
meshes = _get_revit_meshes(definition_id, definition_map)
if not meshes:
_stats["not_found"] += 1
_rep_map_cache[definition_id] = None
return None, placement
_stats["found"] += 1
_rep_map_cache[definition_id] = _build_rep_map(
ifc, body_context, meshes, ifc_format, material_manager
)
else:
# Track stats even for cached definitions
if _rep_map_cache[definition_id] is not None:
_stats["found"] += 1
else:
_stats["not_found"] += 1
rep_map = _rep_map_cache[definition_id]
if rep_map is None:
return None, placement
# --- Build transform operator from instance's 4x4 matrix ---
transform_op = _make_transform_operator(ifc, t, ts)
if transform_op is None:
return None, placement
# --- Create IfcMappedItem referencing the shared geometry ---
mapped_item = ifc.createIfcMappedItem(rep_map, transform_op)
rep = ifc.createIfcShapeRepresentation(
ContextOfItems=body_context,
RepresentationIdentifier="Body",
RepresentationType="MappedRepresentation",
Items=[mapped_item],
)
return rep, placement
def get_definition_object(obj: Base, definition_map: dict):
"""
Resolve the definition's source object for an InstanceProxy.
Returns the first object referenced by the definition proxy, which
carries the proper category/type info. Returns None if not found.
"""
definition_id = _get(obj, "definitionId") or ""
if not definition_id:
return None
ifc_proxies = definition_map.get("ifc_proxies", {})
proxy = ifc_proxies.get(definition_id) or ifc_proxies.get(definition_id.lower())
if proxy is None:
return None
object_ids = _get(proxy, "objects") or []
if not isinstance(object_ids, list):
object_ids = list(object_ids)
if not object_ids:
return None
by_app_id = definition_map.get("by_app_id", {})
source = by_app_id.get(str(object_ids[0]).lower())
return source
def print_instance_stats():
total = _stats["found"] + _stats["not_found"]
print(f" Instance resolution: {_stats['found']}/{total} definitions found")
if _stats["not_found"] > 0:
print(f" ⚠️ {_stats['not_found']} instances had no definition geometry")
def reset_caches():
"""Reset module-level caches (call at start of each export run)."""
_mesh_data_cache.clear()
_rep_map_cache.clear()
_identity_placement_cache.clear()
_stats["found"] = 0
_stats["not_found"] = 0
+21
View File
@@ -0,0 +1,21 @@
# =============================================================================
# mapper.py
# Maps Speckle objects → IFC entity classes.
#
# Reads the IFC class from _properties.Attributes.type
# Falls back to IfcBuildingElementProxy if not present.
# =============================================================================
from utils.properties import get_attributes
def classify(obj) -> str:
"""
Determine the IFC class for a Speckle object.
Reads from _properties.Attributes.type. Falls back to IfcBuildingElementProxy.
"""
attrs = get_attributes(obj)
ifc_type = attrs.get("type")
if ifc_type and isinstance(ifc_type, str):
return ifc_type.strip()
return "IfcBuildingElementProxy"
+151
View File
@@ -0,0 +1,151 @@
# =============================================================================
# materials.py
# Reads renderMaterialProxies from the Speckle root object and applies
# IfcSurfaceStyle colours to IFC geometry.
#
# Structure of renderMaterialProxies:
# root.renderMaterialProxies = [
# {
# id: "636259b3..."
# value: RenderMaterial {
# name: "Glass"
# diffuse: -16744256 ← ARGB packed int (A=255, R=0, G=128, B=192)
# opacity: 0.1 ← 0=transparent, 1=opaque
# }
# objects: ["a1a6b0c2-...", "d5dd3127-...", ...] ← mesh applicationIds
# },
# ...
# ]
#
# Usage:
# mgr = MaterialManager(ifc, root)
# mgr.apply_to_item(brep_item, mesh_app_id)
# =============================================================================
import ifcopenshell
import ifcopenshell.api
from specklepy.objects.base import Base
def _argb_to_rgb(argb_int: int) -> tuple[float, float, float]:
"""Unpack a signed ARGB int to normalised (R, G, B) floats 0..1."""
unsigned = argb_int & 0xFFFFFFFF
r = ((unsigned >> 16) & 0xFF) / 255.0
g = ((unsigned >> 8) & 0xFF) / 255.0
b = (unsigned & 0xFF) / 255.0
return r, g, b
def _get(obj, key, default=None):
try:
val = getattr(obj, key, None)
if val is not None:
return val
except Exception:
pass
try:
val = obj[key]
if val is not None:
return val
except Exception:
pass
return default
class MaterialManager:
"""
Builds a lookup from mesh applicationId → IfcSurfaceStyle,
then applies styles to IFC geometry items.
"""
def __init__(self, ifc: ifcopenshell.file, root: Base):
self._ifc = ifc
# mesh applicationId (lowercase) → IfcSurfaceStyle (populated lazily)
self._style_map: dict[str, object] = {}
# name → IfcSurfaceStyle (cache to avoid duplicates)
self._style_cache: dict[str, object] = {}
self._build(root)
def _build(self, root: Base):
"""
Parse renderMaterialProxies and store raw material data keyed by mesh applicationId.
IFC styles are created lazily (only when actually assigned to geometry) to avoid
orphaned IfcSurfaceStyle instances that would fail IFC105 validation.
"""
proxies = _get(root, "renderMaterialProxies") or []
if not isinstance(proxies, list):
proxies = list(proxies) if proxies else []
# mesh applicationId (lowercase) → (name, diffuse_argb, transparency)
self._material_data: dict[str, tuple] = {}
for proxy in proxies:
material = _get(proxy, "value")
if material is None:
continue
name = _get(material, "name") or "Unnamed"
diffuse = _get(material, "diffuse")
opacity = _get(material, "opacity")
if diffuse is None:
continue
opacity_val = float(opacity) if opacity is not None else 1.0
transparency = max(0.0, min(1.0, 1.0 - opacity_val))
objects = _get(proxy, "objects") or []
for app_id in (objects if isinstance(objects, list) else []):
if app_id:
self._material_data[str(app_id).lower()] = (name, int(diffuse), transparency)
print(f" Materials: {len(self._material_data)} mesh mappings (styles created on demand)")
def _get_or_create_style(self, name: str, diffuse_argb: int, transparency: float):
"""Return cached style or create a new IfcSurfaceStyle."""
cache_key = f"{name}|{diffuse_argb}|{transparency:.4f}"
if cache_key in self._style_cache:
return self._style_cache[cache_key]
r, g, b = _argb_to_rgb(diffuse_argb)
style = ifcopenshell.api.run("style.add_style", self._ifc, name=name)
ifcopenshell.api.run(
"style.add_surface_style",
self._ifc,
style=style,
ifc_class="IfcSurfaceStyleRendering",
attributes={
"SurfaceColour": {"Name": None, "Red": r, "Green": g, "Blue": b},
"Transparency": transparency,
"ReflectanceMethod": "NOTDEFINED",
},
)
self._style_cache[cache_key] = style
return style
def get_style(self, mesh_app_id: str):
"""Return the IfcSurfaceStyle for a mesh applicationId (created on demand), or None."""
key = str(mesh_app_id).lower()
# Return already-created style if cached
if key in self._style_map:
return self._style_map[key]
# Create style now only if this mesh has material data
data = self._material_data.get(key)
if data is None:
return None
name, diffuse, transparency = data
style = self._get_or_create_style(name, diffuse, transparency)
self._style_map[key] = style
return style
def apply_to_item(self, item, mesh_app_id: str):
"""Assign the material style to a single IFC geometry item (e.g. IfcPolygonalFaceSet)."""
style = self.get_style(mesh_app_id)
if style is None:
return
try:
ifcopenshell.api.run(
"style.assign_item_style",
self._ifc,
item=item,
style=style,
)
except Exception as e:
pass # Non-fatal — geometry still exports without colour
+308
View File
@@ -0,0 +1,308 @@
# =============================================================================
# properties.py
# Generically clones all properties from a Speckle object into IFC entities.
#
# Source structure (from _properties / properties):
# Attributes → IFC element attributes (GlobalId, Name, Tag, etc.)
# Property Sets → dict of {pset_name: {prop_name: value}}
# Quantities → dict of {qto_name: {qty_name: {name, units, value}}}
# Building Storey → string, used for storey assignment
# Element Type Attributes → used by type_manager to create IfcTypeObject
# Element Type Property Sets → psets written on the IfcTypeObject
# =============================================================================
import ifcopenshell
import ifcopenshell.api
from specklepy.objects.base import Base
# ---------------------------------------------------------------------------
# Safe access helpers
# ---------------------------------------------------------------------------
def _get(obj, key, default=None):
"""Safe access for both dicts and Speckle Base objects."""
if obj is None:
return default
if isinstance(obj, dict):
return obj.get(key, default)
try:
val = getattr(obj, key, None)
if val is not None:
return val
except Exception:
pass
try:
val = obj[key]
if val is not None:
return val
except Exception:
pass
return default
def _to_dict(obj) -> dict:
"""Convert a Speckle Base object or dict to a plain dict."""
if obj is None:
return {}
if isinstance(obj, dict):
return obj
if hasattr(obj, "get_dynamic_member_names"):
result = {}
try:
for n in obj.get_dynamic_member_names():
try:
result[n] = obj[n]
except Exception:
pass
except Exception:
pass
return result
if hasattr(obj, "items"):
try:
return dict(obj.items())
except Exception:
pass
return {}
# ---------------------------------------------------------------------------
# Property extraction from Speckle object
# ---------------------------------------------------------------------------
def get_properties(obj) -> dict:
"""Get the _properties / properties dict from a Speckle object."""
for key in ["_properties", "properties", "@properties"]:
val = _get(obj, key)
if val is not None:
return _to_dict(val)
return {}
def get_building_storey(obj) -> str:
"""Extract Building Storey name from properties."""
props = get_properties(obj)
storey = props.get("Building Storey")
if storey and isinstance(storey, str):
return storey.strip()
return "Unknown Storey"
def get_attributes(obj) -> dict:
"""Get Attributes dict from properties."""
props = get_properties(obj)
return _to_dict(props.get("Attributes")) or {}
def get_element_name(obj) -> str:
"""Get element name from Attributes, falling back to object name."""
attrs = get_attributes(obj)
name = attrs.get("Name")
if name:
return str(name)
# Fallback to object-level name fields
for key in ["name", "_name"]:
val = _get(obj, key)
if val and isinstance(val, str):
return val
return "unnamed"
# ---------------------------------------------------------------------------
# IFC value creation
# ---------------------------------------------------------------------------
def _make_ifc_value(ifc, value):
"""Create an IFC nominal value entity from a Python value, detecting type."""
if isinstance(value, bool):
return ifc.create_entity("IfcBoolean", wrappedValue=value)
if isinstance(value, int):
return ifc.create_entity("IfcInteger", wrappedValue=value)
if isinstance(value, float):
return ifc.create_entity("IfcReal", wrappedValue=value)
if isinstance(value, list):
return ifc.create_entity("IfcLabel", wrappedValue=", ".join(str(v) for v in value))
return ifc.create_entity("IfcLabel", wrappedValue=str(value))
# ---------------------------------------------------------------------------
# Unit → IFC quantity type mapping
# ---------------------------------------------------------------------------
_UNIT_QTY_MAP = {
"millimetre": ("IfcQuantityLength", "LengthValue"),
"millimeter": ("IfcQuantityLength", "LengthValue"),
"centimetre": ("IfcQuantityLength", "LengthValue"),
"centimeter": ("IfcQuantityLength", "LengthValue"),
"metre": ("IfcQuantityLength", "LengthValue"),
"meter": ("IfcQuantityLength", "LengthValue"),
"foot": ("IfcQuantityLength", "LengthValue"),
"feet": ("IfcQuantityLength", "LengthValue"),
"inch": ("IfcQuantityLength", "LengthValue"),
"square metre": ("IfcQuantityArea", "AreaValue"),
"square meter": ("IfcQuantityArea", "AreaValue"),
"square foot": ("IfcQuantityArea", "AreaValue"),
"cubic metre": ("IfcQuantityVolume", "VolumeValue"),
"cubic meter": ("IfcQuantityVolume", "VolumeValue"),
"cubic foot": ("IfcQuantityVolume", "VolumeValue"),
"kilogram": ("IfcQuantityWeight", "WeightValue"),
"pound": ("IfcQuantityWeight", "WeightValue"),
"degree": ("IfcQuantityCount", "CountValue"),
}
# ---------------------------------------------------------------------------
# Set IFC element attributes from _properties.Attributes
# ---------------------------------------------------------------------------
def set_element_attributes(ifc, element, obj):
"""Set IFC element attributes from _properties.Attributes."""
attrs = get_attributes(obj)
if not attrs:
return
for ifc_attr in ["GlobalId", "Name", "Tag", "ObjectType", "Description"]:
val = attrs.get(ifc_attr)
if val is not None:
try:
setattr(element, ifc_attr, str(val))
except Exception:
pass
# PredefinedType requires special handling (enum value)
ptype = attrs.get("PredefinedType")
if ptype:
try:
element.PredefinedType = str(ptype)
except Exception:
pass
# ---------------------------------------------------------------------------
# Write property sets from _properties.Property Sets
# ---------------------------------------------------------------------------
def write_property_sets(ifc, element, obj):
"""Write all property sets from _properties.Property Sets."""
props = get_properties(obj)
properties_section = _to_dict(props.get("Property Sets"))
if not properties_section:
return
for pset_name, pset_data in properties_section.items():
pset_dict = _to_dict(pset_data) if not isinstance(pset_data, dict) else pset_data
if not pset_dict:
continue
ifc_props = []
for prop_name, prop_value in pset_dict.items():
if prop_name == "id":
continue
if prop_value is None:
continue
try:
nominal = _make_ifc_value(ifc, prop_value)
p = ifc.create_entity(
"IfcPropertySingleValue",
Name=str(prop_name),
NominalValue=nominal,
)
ifc_props.append(p)
except Exception:
continue
if ifc_props:
try:
pset = ifcopenshell.api.run(
"pset.add_pset", ifc, product=element, name=pset_name
)
pset.HasProperties = ifc_props
except Exception as e:
print(f" Warning: {pset_name}: {e}")
# ---------------------------------------------------------------------------
# Write quantity sets from _properties.Quantities
# ---------------------------------------------------------------------------
def _try_float(value):
"""Try to convert a value to float. Returns None on failure."""
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
try:
return float(value)
except ValueError:
return None
return None
def write_quantity_sets(ifc, element, obj):
"""Write all quantity sets from _properties.Quantities."""
props = get_properties(obj)
quantities_section = _to_dict(props.get("Quantities"))
if not quantities_section:
return
for qto_name, qto_data in quantities_section.items():
qto_dict = _to_dict(qto_data) if not isinstance(qto_data, dict) else qto_data
if not qto_dict:
continue
quantities = []
for qty_key, qty_entry in qto_dict.items():
if qty_key == "id":
continue
# Quantity entries can be:
# - {name, units, value} dicts (ArchiCAD / IFC-native)
# - plain numbers or numeric strings (Grasshopper)
if isinstance(qty_entry, dict):
name = qty_entry.get("name", qty_key)
units = (qty_entry.get("units") or "").strip().lower()
value = _try_float(qty_entry.get("value"))
else:
value = _try_float(qty_entry)
if value is None:
continue
name = qty_key
units = ""
if value is None:
continue
try:
mapping = _UNIT_QTY_MAP.get(units)
if mapping:
qty_type, value_attr = mapping
qty = ifc.create_entity(
qty_type, Name=name, **{value_attr: value}
)
else:
qty = ifc.create_entity(
"IfcQuantityCount", Name=name, CountValue=value
)
quantities.append(qty)
except Exception as e:
print(f" Warning: quantity {name}: {e}")
continue
if quantities:
try:
qto = ifcopenshell.api.run(
"pset.add_qto", ifc, product=element, name=qto_name
)
qto.Quantities = quantities
except Exception as e:
print(f" Warning: {qto_name}: {e}")
# ---------------------------------------------------------------------------
# Public API — called from main.py
# ---------------------------------------------------------------------------
def write_all_properties(ifc, element, obj):
"""Write all properties, quantities, and attributes from _properties."""
set_element_attributes(ifc, element, obj)
write_property_sets(ifc, element, obj)
write_quantity_sets(ifc, element, obj)
+75
View File
@@ -0,0 +1,75 @@
# =============================================================================
# receiver.py
# Connects to Speckle and receives the root Base object for a given version.
# =============================================================================
import os
from dotenv import load_dotenv
from specklepy.api.client import SpeckleClient
from specklepy.api.credentials import get_default_account
from specklepy.api import operations
from specklepy.transports.server import ServerTransport
load_dotenv()
SPECKLE_HOST = os.getenv("SPECKLE_SERVER_URL", "https://app.speckle.systems")
SPECKLE_TOKEN = os.getenv("SPECKLE_TOKEN", "")
DEFAULT_UNITS = "mm"
def get_client() -> SpeckleClient:
"""
Create and authenticate a SpeckleClient.
Uses a personal access token from the .env file.
To use your local Speckle Manager account instead, swap to get_default_account().
"""
client = SpeckleClient(host=SPECKLE_HOST)
if SPECKLE_TOKEN and SPECKLE_TOKEN != "YOUR_PERSONAL_ACCESS_TOKEN":
client.authenticate_with_token(SPECKLE_TOKEN)
else:
# Fallback: use account from Speckle Manager desktop app
account = get_default_account()
if account is None:
raise RuntimeError(
"No Speckle account found. Either set SPECKLE_TOKEN in .env "
"or log in via Speckle Manager."
)
client.authenticate_with_account(account)
return client
def receive_version(project_id: str, version_id: str):
"""
Receive the root Base object from a Speckle version.
Args:
project_id: The Speckle project (stream) ID.
version_id: The version (commit) ID to receive.
Returns:
A specklepy Base object — the root of the object graph.
"""
client = get_client()
print(f"🔗 Connecting to {SPECKLE_HOST}...")
print(f"📦 Receiving project={project_id} version={version_id}")
# Get version metadata to find the referenced object ID
version = client.version.get(version_id, project_id)
referenced_object_id = version.referenced_object
# Download the full object graph
transport = ServerTransport(stream_id=project_id, client=client)
base = operations.receive(referenced_object_id, transport)
# Read units from the root object
units = getattr(base, "units", DEFAULT_UNITS) or DEFAULT_UNITS
# IFC file is declared in MILLIMETRES — no conversion needed.
# All geometry stays in source units (mm). scale=1.0 means "keep as-is".
scale = 1.0
print(f"✅ Received root object units={units} scale=1.0 (IFC declared as mm)")
return base, scale
+87
View File
@@ -0,0 +1,87 @@
# =============================================================================
# traversal.py
# Walks the nested Speckle Collection tree generically.
#
# Expected structure:
# Root Collection
# └── Collection
# └── Collection
# └── Object (leaf BIM element)
#
# Collections can nest to any depth. Every non-Collection leaf is yielded.
# =============================================================================
from typing import Generator
from specklepy.objects.base import Base
def is_collection(obj) -> bool:
"""Returns True if this object is a Speckle Collection node (not a leaf element)."""
speckle_type = getattr(obj, "speckle_type", "") or ""
return "Collection" in speckle_type
def get_children(obj) -> list:
"""
Safely get the 'elements' list from a Base/Collection object.
Handles 'elements', '@elements', and '_elements' variants.
"""
for key in ["elements", "@elements", "_elements"]:
try:
val = obj[key]
if val is not None:
return list(val)
except Exception:
continue
return []
def traverse(root: Base) -> Generator[Base, None, None]:
"""
Walk the full Speckle object tree from the root Base object.
Yields every non-Collection leaf object found at any depth.
"""
yield from _walk(root)
def _walk(obj):
"""Recursively walk: descend into Collections, yield leaf objects."""
if obj is None:
return
children = get_children(obj)
if is_collection(obj):
for child in children:
yield from _walk(child)
else:
# Leaf object — yield it
yield obj
# Also check for nested children (e.g. curtain wall sub-elements)
for child in children:
if child is not None and not is_collection(child):
yield from _walk(child)
# --------------------------------------------------------------------------- #
# Debug helper
# --------------------------------------------------------------------------- #
def print_tree(obj: Base, indent: int = 0, max_depth: int = 5):
"""Print the object tree structure for debugging."""
if indent > max_depth:
return
prefix = " " * indent
name = getattr(obj, "name", None) or ""
speckle_type = getattr(obj, "speckle_type", "") or ""
children = get_children(obj)
child_count = f" ({len(children)} children)" if children else ""
print(f"{prefix}├─ [{speckle_type}] name={name!r}{child_count}")
for child in children[:5]:
print_tree(child, indent + 1, max_depth)
if len(children) > 5:
print(f"{prefix} ... and {len(children) - 5} more")
+146
View File
@@ -0,0 +1,146 @@
# =============================================================================
# type_manager.py
# Creates and caches IfcTypeObjects from Element Type Attributes and links
# element instances to them via IfcRelDefinesByType.
#
# Type info comes from _properties:
# Element Type Attributes → type class, Name, GlobalId, Tag, PredefinedType
# Element Type Property Sets → psets written on the type object
#
# Type objects are SHARED — keyed by GlobalId or (type_class, Name).
# =============================================================================
import ifcopenshell
import ifcopenshell.api
from specklepy.objects.base import Base
from utils.properties import get_properties, _to_dict, _make_ifc_value
class TypeManager:
"""
Creates IfcTypeObjects on demand and caches them.
Call assign(element, obj) for each exported element.
"""
def __init__(self, ifc: ifcopenshell.file):
self._ifc = ifc
self._cache: dict[str, object] = {} # cache_key → IfcTypeObject
self._pending: dict[int, list] = {} # type_obj.id() → [element, ...]
def assign(self, element, obj: Base, ifc_class: str = ""):
"""Create or retrieve cached type object and queue the assignment."""
props = get_properties(obj)
type_attrs = _to_dict(props.get("Element Type Attributes"))
type_psets = _to_dict(props.get("Element Type Property Sets"))
if not type_attrs and not type_psets:
return
# Determine type class and cache key
type_class = None
if type_attrs:
type_class = type_attrs.get("type")
if type_class:
# Format A: Element Type Attributes has explicit type info
global_id = type_attrs.get("GlobalId")
name = type_attrs.get("Name") or ""
cache_key = global_id if global_id else f"{type_class}:{name}"
if cache_key not in self._cache:
type_obj = self._create_type(type_class, type_attrs, type_psets)
self._cache[cache_key] = type_obj
else:
# Format B: No explicit type class — derive from element IFC class
type_class = ifc_class + "Type" if ifc_class else None
if not type_class:
return
# Merge: if type_attrs has no 'type' key, it contains psets directly
merged_psets = type_psets.copy() if type_psets else {}
if type_attrs:
merged_psets.update(type_attrs)
cache_key = f"{type_class}:{repr(sorted(merged_psets.items()))}"
if cache_key not in self._cache:
type_obj = self._create_type(type_class, {}, merged_psets)
self._cache[cache_key] = type_obj
type_obj = self._cache[cache_key]
type_id = type_obj.id()
if type_id not in self._pending:
self._pending[type_id] = []
self._pending[type_id].append(element)
def flush(self):
"""Write all IfcRelDefinesByType relationships."""
for type_id, elements in self._pending.items():
type_obj = self._ifc.by_id(type_id)
ifcopenshell.api.run(
"type.assign_type", self._ifc,
related_objects=elements,
relating_type=type_obj,
)
self._pending.clear()
print(f" Type objects created: {len(self._cache)}")
def _create_type(self, type_class: str, type_attrs: dict, type_psets: dict):
"""Instantiate the IfcTypeObject with attributes and property sets."""
ifc = self._ifc
name = type_attrs.get("Name") or ""
type_obj = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class=type_class,
name=name,
)
# Set standard attributes
for attr in ["GlobalId", "Tag", "Description", "ElementType"]:
val = type_attrs.get(attr)
if val is not None:
try:
setattr(type_obj, attr, str(val))
except Exception:
pass
# PredefinedType (enum)
ptype = type_attrs.get("PredefinedType")
if ptype:
try:
type_obj.PredefinedType = str(ptype)
except Exception:
pass
# Write type-level property sets
if type_psets:
for pset_name, pset_data in type_psets.items():
pset_dict = _to_dict(pset_data) if not isinstance(pset_data, dict) else pset_data
if not pset_dict:
continue
ifc_props = []
for prop_name, prop_value in pset_dict.items():
if prop_name == "id" or prop_value is None:
continue
try:
nominal = _make_ifc_value(ifc, prop_value)
p = ifc.create_entity(
"IfcPropertySingleValue",
Name=str(prop_name),
NominalValue=nominal,
)
ifc_props.append(p)
except Exception:
continue
if ifc_props:
try:
pset = ifcopenshell.api.run(
"pset.add_pset", ifc, product=type_obj, name=pset_name
)
pset.HasProperties = ifc_props
except Exception:
pass
return type_obj
+160
View File
@@ -0,0 +1,160 @@
# =============================================================================
# writer.py
# Creates and manages the IFC file structure:
# IfcProject → IfcSite → IfcBuilding → IfcBuildingStorey (one per level)
#
# Also provides StoreyManager which lazily creates storeys on demand
# as the traversal encounters new level names.
# =============================================================================
import ifcopenshell
import ifcopenshell.api
def create_ifc_scaffold(
project_name: str = "Default Project",
site_name: str = "Default Site",
building_name: str = "Default Building",
) -> tuple:
"""
Create the IFC file with the required project/site/building hierarchy.
Returns:
(ifc_file, site, building, body_context)
- ifc_file: The ifcopenshell file object
- site: The IfcSite entity
- building: The IfcBuilding entity (storeys are assigned under this)
- body_context: The Body geometry subcontext for shape representations
"""
ifc = ifcopenshell.file(schema="IFC4X3")
# Project
project = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class="IfcProject",
name=project_name,
)
# Units — millimetres (matching Revit/Speckle source data)
# This avoids any mm→m conversion errors and keeps coordinates at full precision
ifcopenshell.api.run(
"unit.assign_unit", ifc,
length={"is_metric": True, "raw": "MILLIMETRES"},
)
# Geometry contexts
model_ctx = ifcopenshell.api.run(
"context.add_context", ifc,
context_type="Model",
)
body_ctx = ifcopenshell.api.run(
"context.add_context", ifc,
context_type="Model",
context_identifier="Body",
target_view="MODEL_VIEW",
parent=model_ctx,
)
# Spatial hierarchy
site = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class="IfcSite",
name=site_name,
)
building = ifcopenshell.api.run(
"root.create_entity", ifc,
ifc_class="IfcBuilding",
name=building_name,
)
ifcopenshell.api.run(
"aggregate.assign_object", ifc,
relating_object=project,
products=[site],
)
ifcopenshell.api.run(
"aggregate.assign_object", ifc,
relating_object=site,
products=[building],
)
return ifc, site, building, body_ctx
class StoreyManager:
"""
Lazily creates IfcBuildingStorey entities as new level names are encountered.
Keeps storeys in insertion order so the IFC file is logically ordered.
Spatial containment is batched — call flush() after all elements are created
to write all IfcRelContainedInSpatialStructure / aggregate relationships at once.
"""
def __init__(self, ifc: ifcopenshell.file, building):
self.ifc = ifc
self.building = building
self._storeys: dict[str, object] = {} # level_name → IfcBuildingStorey
# Batched containment: storey_id → [element, ...]
self._contained: dict[int, list] = {}
# Batched aggregation (IfcSite etc.): storey_id → [element, ...]
self._aggregated: dict[int, list] = {}
def get_or_create(self, level_name: str):
"""Return existing storey or create a new one for this level name."""
if level_name not in self._storeys:
storey = ifcopenshell.api.run(
"root.create_entity", self.ifc,
ifc_class="IfcBuildingStorey",
name=level_name,
)
ifcopenshell.api.run(
"aggregate.assign_object", self.ifc,
relating_object=self.building,
products=[storey],
)
self._storeys[level_name] = storey
print(f" 🏢 Created storey: {level_name}")
return self._storeys[level_name]
def queue_contain(self, storey, element):
"""Queue an element for spatial containment (batched flush)."""
sid = storey.id()
if sid not in self._contained:
self._contained[sid] = []
self._contained[sid].append(element)
def queue_aggregate(self, storey, element):
"""Queue an element for aggregation under storey (e.g. IfcSite)."""
sid = storey.id()
if sid not in self._aggregated:
self._aggregated[sid] = []
self._aggregated[sid].append(element)
def flush(self):
"""Write all batched spatial containment and aggregation relationships."""
ifc = self.ifc
for sid, elements in self._contained.items():
storey = ifc.by_id(sid)
ifcopenshell.api.run(
"spatial.assign_container", ifc,
relating_structure=storey,
products=elements,
)
for sid, elements in self._aggregated.items():
storey = ifc.by_id(sid)
ifcopenshell.api.run(
"aggregate.assign_object", ifc,
relating_object=storey,
products=elements,
)
self._contained.clear()
self._aggregated.clear()
@property
def count(self) -> int:
return len(self._storeys)
@property
def names(self) -> list[str]:
return list(self._storeys.keys())