Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a0102037c4 |
@@ -0,0 +1,4 @@
|
||||
SPECKLE_TOKEN="mytoken"
|
||||
SPECKLE_SERVER_URL="http://127.0.0.1:3000"
|
||||
SPECKLE_PROJECT_ID=""
|
||||
SPECKLE_AUTOMATION_ID=""
|
||||
@@ -12,7 +12,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v5.0.0
|
||||
- uses: actions/setup-python@v5
|
||||
- uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: "3.13"
|
||||
- name: Install dependencies
|
||||
@@ -30,4 +30,3 @@ jobs:
|
||||
speckle_function_id: ${{ secrets.SPECKLE_FUNCTION_ID }}
|
||||
speckle_function_input_schema_file_path: ${{ env.FUNCTION_SCHEMA_FILE_NAME }}
|
||||
speckle_function_command: "python -u main.py run"
|
||||
speckle_function_recommended_memory_mi: 8000
|
||||
|
||||
@@ -1,117 +1,174 @@
|
||||
# Speckle → IFC 4.3 Exporter (Rhino)
|
||||
# Speckle Automate function template - Python
|
||||
|
||||
## 🚧 Project Status: WIP
|
||||
Hey there! This project is still under active development, so expect changes, bugs, and incomplete features.
|
||||
If you have any questions or suggestions, don’t hesitate to reach out at: **nikos@speckle.systems**
|
||||
This template repository is for a Speckle Automate function written in Python
|
||||
using the [specklepy](https://pypi.org/project/specklepy/) SDK to interact with Speckle data.
|
||||
|
||||
A [Speckle Automate](https://docs.speckle.systems/developers/automate/introduction) function that converts Speckle models from Rhino into IFC 4X3 files.
|
||||
This template contains the full scaffolding required to publish a function to the Automate environment.
|
||||
It also has some sane defaults for development environment setups.
|
||||
|
||||
> ⚠️ **Note on Model Uploads**
|
||||
>
|
||||
> Large models (greater than 200MB) may fail to upload due to current file size limitations. The team is actively working on resolving this issue.
|
||||
## Getting started
|
||||
|
||||
## How It Works
|
||||
1. Use this template repository to create a new repository in your own / organization's profile.
|
||||
1. Register the function
|
||||
|
||||
1. **Receives** a Speckle model version via Speckle Automate.
|
||||
2. **Traverses** the nested Collection tree to find all BIM elements.
|
||||
3. **Classifies** each element into its IFC type (e.g. `IfcColumn`, `IfcWall`) using the `Attributes.type` property.
|
||||
4. **Exports geometry** — meshes, instances (block definitions), and curves — into IFC representations.
|
||||
5. **Clones properties** — attributes, property sets, and quantities — from the Speckle object onto the IFC entity.
|
||||
6. **Writes** the resulting `.ifc` file, compressed as a `.zip`.
|
||||
### Add new dependencies
|
||||
|
||||
## Performance Optimizations
|
||||
To add new Python package dependencies to the project, edit the `pyproject.toml` file:
|
||||
|
||||
The exporter is optimized for file size and speed:
|
||||
|
||||
- **Geometry deduplication** — identical meshes are hashed (MD5 of vertex + face data) and shared via `IfcRepresentationMap` + `IfcMappedItem`, so instances reuse a single geometry copy.
|
||||
- **Shared property sets** — identical `IfcPropertySet` / `IfcElementQuantity` entities are created once and linked to all elements via batched `IfcRelDefinesByProperties`.
|
||||
- **Batched spatial containment** — `IfcRelContainedInSpatialStructure` and type assignments are written in bulk at the end, not per-element.
|
||||
- **Vertex deduplication & rounding** — near-coincident vertices are merged (0.01mm tolerance) and coordinates rounded to 3 decimal places.
|
||||
- **Direction & value caching** — `IfcDirection`, `IfcCartesianPoint`, and `IfcNominalValue` entities are reused across the file.
|
||||
- **Lazy material creation** — `IfcSurfaceStyle` entities are only created when actually assigned to geometry.
|
||||
- **ZIP compression** — output is compressed before upload.
|
||||
|
||||
## Supported Property Formats
|
||||
|
||||
The exporter handles two property formats:
|
||||
|
||||
### Nested (ArchiCAD / IFC-native)
|
||||
Properties are stored as nested dicts under `_properties`:
|
||||
```
|
||||
_properties:
|
||||
Attributes:
|
||||
Name: "MyColumn"
|
||||
type: "IfcColumn"
|
||||
Property Sets:
|
||||
Pset_ColumnCommon:
|
||||
IsExternal: true
|
||||
Quantities:
|
||||
BaseQuantities:
|
||||
Height: {name: "Height", units: "millimetre", value: 3000}
|
||||
**For packages your function needs to run** (like pandas, requests, etc.):
|
||||
```toml
|
||||
dependencies = [
|
||||
"specklepy==3.0.0",
|
||||
"pandas==2.1.0", # Add production dependencies here
|
||||
]
|
||||
```
|
||||
|
||||
### Flat Dot-Notation (Rhino / Speckle)
|
||||
Properties are stored as flat key-value pairs with dot-separated paths under `properties`:
|
||||
```
|
||||
properties:
|
||||
Attributes.Name: "600S162-43-50"
|
||||
Attributes.type: "IfcColumn"
|
||||
Attributes.GlobalId: "yOTS1rnOhBKW4JIfec29TS"
|
||||
Quantities.BaseQuantities.Gross Weight.value: "15.452"
|
||||
Quantities.BaseQuantities.Member Length.value: "118.938"
|
||||
**For development tools** (like testing or formatting tools):
|
||||
```toml
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"black==23.12.1",
|
||||
"pytest-mock==3.11.1", # Add development dependencies here
|
||||
# ... other dev tools
|
||||
]
|
||||
```
|
||||
|
||||
The exporter automatically detects the format and unflattens dot-notation keys into the nested structure before processing.
|
||||
**How to decide which section?**
|
||||
- If your `main.py` (or other function logic) imports it → `dependencies`
|
||||
- If it's just a tool to help you code → `[project.optional-dependencies].dev`
|
||||
|
||||
## Instance Handling
|
||||
Example:
|
||||
```python
|
||||
# In your main.py
|
||||
import pandas as pd # ← This goes in dependencies
|
||||
import specklepy # ← This goes in dependencies
|
||||
|
||||
Speckle InstanceProxy objects (block instances) are exported using the IFC mapped representation pattern:
|
||||
|
||||
- Each unique block definition becomes an `IfcRepresentationMap` (geometry stored once).
|
||||
- Each instance becomes an `IfcMappedItem` with an `IfcCartesianTransformationOperator3DnonUniform` encoding the full 4x4 transform (explicit Axis3 for correct orientation with mirrors and non-orthogonal transforms).
|
||||
- Content-based geometry hashing ensures that different definition IDs with identical geometry share the same `IfcRepresentationMap`.
|
||||
|
||||
Supports both Revit-format instances (hex hash definitionId, mm units) and speckleifc-format instances (`DEFINITION:` prefix, metre units).
|
||||
|
||||
## Function Inputs
|
||||
|
||||
| Input | Description |
|
||||
|---|---|
|
||||
| `file_name` | Output IFC filename (timestamp is appended) |
|
||||
| `IFC_PROJECT_NAME` | Name for the IfcProject entity |
|
||||
| `IFC_SITE_NAME` | Name for the IfcSite entity |
|
||||
| `IFC_BUILDING_NAME` | Name for the IfcBuilding entity |
|
||||
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
main.py # Automate entry point — traversal, export loop, IFC writing
|
||||
utils/
|
||||
traversal.py # Walks the Speckle Collection tree
|
||||
mapper.py # Maps Speckle objects → IFC entity classes
|
||||
properties.py # Extracts & writes attributes, property sets, quantities
|
||||
geometry.py # Mesh → IFC geometry conversion (IfcPolygonalFaceSet)
|
||||
instances.py # Block instance / definition handling (RepMap + MappedItem)
|
||||
curves.py # Curve geometry (Polycurve, Line, Arc → IfcIndexedPolyCurve)
|
||||
writer.py # IFC scaffold creation, storey management
|
||||
type_manager.py # IfcTypeObject creation & assignment
|
||||
materials.py # Material colour mapping (IfcSurfaceStyle)
|
||||
helpers.py # Shared utilities (_get, unit scales)
|
||||
# You won't import these in main.py:
|
||||
# pytest, black, mypy ← These go in [project.optional-dependencies].dev
|
||||
```
|
||||
|
||||
## Getting Started
|
||||
### Change launch variables
|
||||
|
||||
### Using with Speckle Automate
|
||||
Describe how the launch.json should be edited.
|
||||
|
||||
1. Go to the Automations tab in your project
|
||||
2. Click New Automation
|
||||
3. Select your function from the library
|
||||
4. Configure function inputs and parameters
|
||||
5. Choose a Speckle model to trigger the automation
|
||||
6. Name your automation and click Create
|
||||
### GitHub Codespaces
|
||||
|
||||
Create a new repo from this template, and use the create new code.
|
||||
|
||||
### Using this Speckle Function
|
||||
|
||||
1. [Create](https://automate.speckle.dev/) a new Speckle Automation.
|
||||
1. Select your Speckle Project and Speckle Model.
|
||||
1. Select the deployed Speckle Function.
|
||||
1. Enter a phrase to use in the comment.
|
||||
1. Click `Create Automation`.
|
||||
|
||||
## Getting Started with Creating Your Own Speckle Function
|
||||
|
||||
1. [Register](https://automate.speckle.dev/) your Function with [Speckle Automate](https://automate.speckle.dev/) and select the Python template.
|
||||
1. A new repository will be created in your GitHub account.
|
||||
1. Make changes to your Function in `main.py`. See below for the Developer Requirements and instructions on how to test.
|
||||
1. To create a new version of your Function, create a new [GitHub release](https://docs.github.com/en/repositories/releasing-projects-on-github/managing-releases-in-a-repository) in your repository.
|
||||
|
||||
## Developer Requirements
|
||||
|
||||
1. Install the following:
|
||||
- [Python 3.11+](https://www.python.org/downloads/)
|
||||
1. Run the following to set up your development environment:
|
||||
```bash
|
||||
python -m venv .venv
|
||||
# On Windows
|
||||
.venv\Scripts\activate
|
||||
# On macOS/Linux
|
||||
source .venv/bin/activate
|
||||
|
||||
pip install --upgrade pip
|
||||
pip install .[dev]
|
||||
```
|
||||
|
||||
**What this installs:**
|
||||
- All the packages your function needs to run (`dependencies`)
|
||||
- Plus development tools like testing and code formatting (`[project.optional-dependencies].dev`)
|
||||
|
||||
**Why separate sections?**
|
||||
- `dependencies`: Only what gets deployed with your function (lightweight)
|
||||
- `dev` dependencies: Extra tools to help you write better code locally
|
||||
|
||||
## Building and Testing
|
||||
|
||||
The code can be tested locally by running `pytest`.
|
||||
|
||||
### Alternative dependency managers
|
||||
|
||||
This template uses the modern **PEP 621** standard in `pyproject.toml`, which works with all modern Python dependency managers:
|
||||
|
||||
#### Using Poetry
|
||||
```bash
|
||||
poetry install # Automatically reads pyproject.toml
|
||||
```
|
||||
|
||||
#### Using uv
|
||||
```bash
|
||||
uv sync # Automatically reads pyproject.toml
|
||||
```
|
||||
|
||||
#### Using pip-tools
|
||||
```bash
|
||||
pip-compile pyproject.toml # Generate requirements.txt from pyproject.toml
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
#### Using pdm
|
||||
```bash
|
||||
pdm install # Automatically reads pyproject.toml
|
||||
```
|
||||
|
||||
**Advantage**: All tools read the same `pyproject.toml` file, so there's no need to keep multiple files in sync!
|
||||
|
||||
### Building and running the Docker Container Image
|
||||
|
||||
Running and testing your code on your machine is a great way to develop your Function; the following instructions are a bit more in-depth and only required if you are having issues with your Function in GitHub Actions or on Speckle Automate.
|
||||
|
||||
#### Building the Docker Container Image
|
||||
|
||||
The GitHub Action packages your code into the format required by Speckle Automate. This is done by building a Docker Image, which Speckle Automate runs. You can attempt to build the Docker Image locally to test the building process.
|
||||
|
||||
To build the Docker Container Image, you must have [Docker](https://docs.docker.com/get-docker/) installed.
|
||||
|
||||
Once you have Docker running on your local machine:
|
||||
|
||||
1. Open a terminal
|
||||
1. Navigate to the directory in which you cloned this repository
|
||||
1. Run the following command:
|
||||
|
||||
```bash
|
||||
docker build -f ./Dockerfile -t speckle_automate_python_example .
|
||||
```
|
||||
|
||||
#### Running the Docker Container Image
|
||||
|
||||
Once the GitHub Action has built the image, it is sent to Speckle Automate. When Speckle Automate runs your Function as part of an Automation, it will run the Docker Container Image. You can test that your Docker Container Image runs correctly locally.
|
||||
|
||||
1. To then run the Docker Container Image, run the following command:
|
||||
|
||||
```bash
|
||||
docker run --rm speckle_automate_python_example \
|
||||
python -u main.py run \
|
||||
'{"projectId": "1234", "modelId": "1234", "branchName": "myBranch", "versionId": "1234", "speckleServerUrl": "https://speckle.xyz", "automationId": "1234", "automationRevisionId": "1234", "automationRunId": "1234", "functionId": "1234", "functionName": "my function", "functionLogo": "base64EncodedPng"}' \
|
||||
'{}' \
|
||||
yourSpeckleServerAuthenticationToken
|
||||
```
|
||||
|
||||
Let's explain this in more detail:
|
||||
|
||||
`docker run—-rm speckle_automate_python_example` tells Docker to run the Docker Container Image we built earlier. `speckle_automate_python_example` is the name of the Docker Container Image. The `--rm` flag tells Docker to remove the container after it has finished running, freeing up space on your machine.
|
||||
|
||||
The line `python -u main.py run` is the command run inside the Docker Container Image. The rest of the command is the arguments passed to the command. The arguments are:
|
||||
|
||||
- `'{"projectId": "1234", "modelId": "1234", "branchName": "myBranch", "versionId": "1234", "speckleServerUrl": "https://speckle.xyz", "automationId": "1234", "automationRevisionId": "1234", "automationRunId": "1234", "functionId": "1234", "functionName": "my function", "functionLogo": "base64EncodedPng"}'` - the metadata that describes the automation and the function.
|
||||
- `{}` - the input parameters for the function the Automation creator can set. Here, they are blank, but you can add your parameters to test your function.
|
||||
- `yourSpeckleServerAuthenticationToken`—the authentication token for the Speckle Server that the Automation can connect to. This is required to interact with the Speckle Server, for example, to get data from the Model.
|
||||
|
||||
## Resources
|
||||
|
||||
- [SpecklePy SDK](https://speckle.guide/dev/python.html)
|
||||
- [Speckle Automate](https://docs.speckle.systems/developers/automate/introduction)
|
||||
- [IFC 4.3 Documentation](https://standards.buildingsmart.org/IFC/RELEASE/IFC4_3/)
|
||||
- [Learn](https://speckle.guide/dev/python.html) more about SpecklePy and interacting with Speckle from Python.
|
||||
|
||||
+27
@@ -0,0 +1,27 @@
|
||||
"""Helper module for a simple speckle object tree flattening."""
|
||||
|
||||
from collections.abc import Iterable
|
||||
|
||||
from specklepy.objects import Base
|
||||
|
||||
|
||||
def flatten_base(base: Base) -> Iterable[Base]:
|
||||
"""Flatten a base object into an iterable of bases.
|
||||
|
||||
This function recursively traverses the `elements` or `@elements` attribute of the
|
||||
base object, yielding each nested base object.
|
||||
|
||||
Args:
|
||||
base (Base): The base object to flatten.
|
||||
|
||||
Yields:
|
||||
Base: Each nested base object in the hierarchy.
|
||||
"""
|
||||
# Attempt to get the elements attribute, fallback to @elements if necessary
|
||||
elements = getattr(base, "elements", getattr(base, "@elements", None))
|
||||
|
||||
if elements is not None:
|
||||
for element in elements:
|
||||
yield from flatten_base(element)
|
||||
|
||||
yield base
|
||||
@@ -1,38 +1,18 @@
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
"""This module contains the function's business logic.
|
||||
|
||||
import ifcopenshell.api
|
||||
Use the automation_context module to wrap your function in an Automate context helper.
|
||||
"""
|
||||
|
||||
from utils.traversal import traverse, print_tree
|
||||
from utils.mapper import classify
|
||||
from utils.geometry import mesh_to_ifc, get_display_instances, _make_placement
|
||||
from utils.instances import (
|
||||
is_instance, is_definition_source, instance_to_ifc, build_definition_map,
|
||||
print_instance_stats, get_definition_object,
|
||||
)
|
||||
from utils.properties import (
|
||||
get_building_storey, get_element_name, write_all_properties,
|
||||
PropertySetManager,
|
||||
)
|
||||
from utils.curves import curve_to_ifc
|
||||
from utils.writer import create_ifc_scaffold, StoreyManager
|
||||
from utils.type_manager import TypeManager
|
||||
from utils.materials import MaterialManager
|
||||
|
||||
|
||||
SPATIAL_STRUCTURE_TYPES = {
|
||||
"IfcBuilding", "IfcBuildingStorey",
|
||||
"IfcExternalSpatialElement", "IfcSpatialZone",
|
||||
"IfcGrid", "IfcAnnotation",
|
||||
}
|
||||
|
||||
from pydantic import Field
|
||||
from pydantic import Field, SecretStr
|
||||
from speckle_automate import (
|
||||
AutomateBase,
|
||||
AutomationContext,
|
||||
execute_automate_function,
|
||||
)
|
||||
|
||||
from flatten import flatten_base
|
||||
|
||||
|
||||
class FunctionInputs(AutomateBase):
|
||||
"""These are function author-defined values.
|
||||
|
||||
@@ -40,237 +20,77 @@ class FunctionInputs(AutomateBase):
|
||||
Please use the pydantic model schema to define your inputs:
|
||||
https://docs.pydantic.dev/latest/usage/models/
|
||||
"""
|
||||
file_name: str = Field(
|
||||
title="File Name",
|
||||
description="The name of the IFC file.",
|
||||
|
||||
# An example of how to use secret values.
|
||||
whisper_message: SecretStr = Field(title="This is a secret message")
|
||||
forbidden_speckle_type: str = Field(
|
||||
title="Forbidden speckle type",
|
||||
description=(
|
||||
"If a object has the following speckle_type,"
|
||||
" it will be marked with an error."
|
||||
),
|
||||
)
|
||||
IFC_PROJECT_NAME : str = Field(
|
||||
title="IFC Project Name",
|
||||
description="The name of the IFC project.",
|
||||
)
|
||||
IFC_SITE_NAME : str = Field(
|
||||
title="IFC Site Name",
|
||||
description="The name of the IFC site.",
|
||||
)
|
||||
IFC_BUILDING_NAME : str = Field(
|
||||
title="IFC Building Name",
|
||||
description="The name of the IFC building.",
|
||||
)
|
||||
|
||||
|
||||
def automate_function(
|
||||
automate_context: AutomationContext,
|
||||
function_inputs: FunctionInputs,
|
||||
) -> None:
|
||||
print("=" * 60)
|
||||
print(" Speckle -> IFC4.3 Exporter")
|
||||
print("=" * 60)
|
||||
"""This is an example Speckle Automate function.
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 1. Receive
|
||||
# ------------------------------------------------------------------ #
|
||||
base = automate_context.receive_version()
|
||||
Args:
|
||||
automate_context: A context-helper object that carries relevant information
|
||||
about the runtime context of this function.
|
||||
It gives access to the Speckle project data that triggered this run.
|
||||
It also has convenient methods for attaching results to the Speckle model.
|
||||
function_inputs: An instance object matching the defined schema.
|
||||
"""
|
||||
# The context provides a convenient way to receive the triggering version.
|
||||
version_root_object = automate_context.receive_version()
|
||||
|
||||
# Uncomment to debug object tree:
|
||||
# print_tree(base)
|
||||
objects_with_forbidden_speckle_type = [
|
||||
b
|
||||
for b in flatten_base(version_root_object)
|
||||
if b.speckle_type == function_inputs.forbidden_speckle_type
|
||||
]
|
||||
count = len(objects_with_forbidden_speckle_type)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 2. Build definition map (for instance resolution)
|
||||
# ------------------------------------------------------------------ #
|
||||
definition_map = build_definition_map(base)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 3. Set up IFC
|
||||
# ------------------------------------------------------------------ #
|
||||
ifc, _site, building, body_context = create_ifc_scaffold(
|
||||
project_name=function_inputs.IFC_PROJECT_NAME,
|
||||
site_name=function_inputs.IFC_SITE_NAME,
|
||||
building_name=function_inputs.IFC_BUILDING_NAME,
|
||||
)
|
||||
storey_manager = StoreyManager(ifc, building)
|
||||
material_manager = MaterialManager(ifc, base)
|
||||
material_manager.build_definition_material_map(definition_map)
|
||||
type_manager = TypeManager(ifc)
|
||||
property_manager = PropertySetManager(ifc)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 4. Traverse & export
|
||||
# ------------------------------------------------------------------ #
|
||||
total = 0
|
||||
no_geometry = 0
|
||||
skipped_spatial = 0
|
||||
instance_count = 0
|
||||
|
||||
print(f"\nProcessing elements...\n")
|
||||
|
||||
for obj in traverse(base):
|
||||
|
||||
ifc_class = classify(obj)
|
||||
|
||||
if ifc_class in SPATIAL_STRUCTURE_TYPES:
|
||||
skipped_spatial += 1
|
||||
continue
|
||||
|
||||
# Skip objects that serve as instance definition geometry sources
|
||||
if is_definition_source(obj, definition_map):
|
||||
continue
|
||||
|
||||
# Get building storey from properties
|
||||
storey_name = get_building_storey(obj)
|
||||
storey = storey_manager.get_or_create(storey_name)
|
||||
|
||||
# Get element name
|
||||
name = get_element_name(obj)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Path A: Instance object (has transform + definitionId)
|
||||
# ------------------------------------------------------------------ #
|
||||
if is_instance(obj):
|
||||
# Try to get a better IFC class from the definition object
|
||||
if ifc_class == "IfcBuildingElementProxy":
|
||||
def_obj = get_definition_object(obj, definition_map)
|
||||
if def_obj:
|
||||
ifc_class = classify(def_obj)
|
||||
|
||||
rep, placement = instance_to_ifc(
|
||||
ifc, body_context, obj, definition_map,
|
||||
scale=1.0, material_manager=material_manager,
|
||||
)
|
||||
if not rep:
|
||||
no_geometry += 1
|
||||
continue
|
||||
element = _create_element(ifc, ifc_class, name, rep, placement,
|
||||
storey, storey_manager=storey_manager)
|
||||
write_all_properties(ifc, element, obj, property_manager)
|
||||
type_manager.assign(element, obj, ifc_class)
|
||||
instance_count += 1
|
||||
total += 1
|
||||
|
||||
else:
|
||||
# ------------------------------------------------------------------ #
|
||||
# Path B: Normal object — may have:
|
||||
# B1. Direct mesh geometry in displayValue
|
||||
# B2. Instance objects in displayValue
|
||||
# B3. Curve geometry (Polycurve, Line, Arc)
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
# B1: Mesh geometry
|
||||
rep, placement = mesh_to_ifc(
|
||||
ifc, body_context, obj, scale=1.0,
|
||||
material_manager=material_manager,
|
||||
)
|
||||
if rep:
|
||||
element = _create_element(ifc, ifc_class, name, rep, placement,
|
||||
storey, storey_manager=storey_manager)
|
||||
write_all_properties(ifc, element, obj, property_manager)
|
||||
type_manager.assign(element, obj, ifc_class)
|
||||
total += 1
|
||||
|
||||
# B2: Instance objects nested inside displayValue
|
||||
nested_instances = get_display_instances(obj)
|
||||
for inst in nested_instances:
|
||||
inst_rep, inst_placement = instance_to_ifc(
|
||||
ifc, body_context, inst, definition_map,
|
||||
scale=1.0, material_manager=material_manager,
|
||||
)
|
||||
if not inst_rep:
|
||||
no_geometry += 1
|
||||
continue
|
||||
inst_element = _create_element(
|
||||
ifc, ifc_class, name, inst_rep, inst_placement,
|
||||
storey, storey_manager=storey_manager,
|
||||
)
|
||||
write_all_properties(ifc, inst_element, obj, property_manager)
|
||||
type_manager.assign(inst_element, obj, ifc_class)
|
||||
instance_count += 1
|
||||
total += 1
|
||||
|
||||
# B3: Curve geometry (Polycurve, Line, Arc) — fallback if no mesh/instances
|
||||
if not rep and not nested_instances:
|
||||
rep, placement = curve_to_ifc(
|
||||
ifc, body_context, obj, scale=1.0,
|
||||
material_manager=material_manager,
|
||||
)
|
||||
if rep:
|
||||
element = _create_element(ifc, ifc_class, name, rep, placement,
|
||||
storey, storey_manager=storey_manager)
|
||||
write_all_properties(ifc, element, obj, property_manager)
|
||||
type_manager.assign(element, obj, ifc_class)
|
||||
total += 1
|
||||
|
||||
# Track if no path produced geometry
|
||||
if not rep and not nested_instances:
|
||||
no_geometry += 1
|
||||
|
||||
if total % 100 == 0 and total > 0:
|
||||
print(f" ... processed {total} elements")
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# 5. Write output
|
||||
# ------------------------------------------------------------------ #
|
||||
print("\nFlushing spatial containment...")
|
||||
storey_manager.flush()
|
||||
print("Flushing type relationships...")
|
||||
type_manager.flush()
|
||||
print("Flushing shared property sets...")
|
||||
property_manager.flush()
|
||||
|
||||
file_name = function_inputs.file_name
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
ifc_filename = f"{file_name}_{timestamp}.ifc"
|
||||
|
||||
ifc.write(ifc_filename)
|
||||
print(f"\nIFC file written: {ifc_filename}")
|
||||
|
||||
zip_filename = f"{file_name}_{timestamp}.zip"
|
||||
with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||
zf.write(ifc_filename)
|
||||
print(f"Zipped: {zip_filename}")
|
||||
|
||||
try:
|
||||
automate_context.mark_run_success("Success! You can download the IFC file below.")
|
||||
automate_context.store_file_result(f"./{zip_filename}")
|
||||
except Exception as e:
|
||||
print(f" Could not upload file result (network issue?): {e}")
|
||||
automate_context.mark_run_failed(f"Something went wrong when storing file result. Exception detail: {e}")
|
||||
|
||||
print(f"\n{'=' * 60}")
|
||||
print(f" Export complete!")
|
||||
print(f" Total exported : {total}")
|
||||
print(f" Instances : {instance_count}")
|
||||
print(f" Without geometry : {no_geometry}")
|
||||
print(f" Skipped (spatial) : {skipped_spatial}")
|
||||
print(f" Storeys created : {storey_manager.count}")
|
||||
print(f" Levels : {', '.join(storey_manager.names)}")
|
||||
print_instance_stats()
|
||||
material_manager.print_stats()
|
||||
property_manager.print_stats()
|
||||
print(f"{'=' * 60}\n")
|
||||
|
||||
def _create_element(ifc, ifc_class, name, rep, placement, storey,
|
||||
storey_manager=None):
|
||||
"""Helper: create an IFC element, assign geometry + placement, queue containment."""
|
||||
element = ifcopenshell.api.run("root.create_entity", ifc,
|
||||
ifc_class=ifc_class, name=str(name))
|
||||
|
||||
if rep and placement:
|
||||
element.Representation = ifc.createIfcProductDefinitionShape(
|
||||
Representations=(rep,)
|
||||
if count > 0:
|
||||
# This is how a run is marked with a failure cause.
|
||||
automate_context.attach_error_to_objects(
|
||||
category="Forbidden speckle_type"
|
||||
f" ({function_inputs.forbidden_speckle_type})",
|
||||
affected_objects=objects_with_forbidden_speckle_type,
|
||||
message="This project should not contain the type: "
|
||||
f"{function_inputs.forbidden_speckle_type}",
|
||||
)
|
||||
automate_context.mark_run_failed(
|
||||
"Automation failed: "
|
||||
f"Found {count} object that have one of the forbidden speckle types: "
|
||||
f"{function_inputs.forbidden_speckle_type}"
|
||||
)
|
||||
element.ObjectPlacement = placement
|
||||
elif placement:
|
||||
element.ObjectPlacement = placement
|
||||
else:
|
||||
element.ObjectPlacement = _make_placement(ifc, 0.0, 0.0, 0.0)
|
||||
|
||||
# Queue spatial assignment (batched flush at end for performance)
|
||||
if storey_manager:
|
||||
if ifc_class in ("IfcSite", "IfcSpace"):
|
||||
storey_manager.queue_aggregate(storey, element)
|
||||
else:
|
||||
storey_manager.queue_contain(storey, element)
|
||||
return element
|
||||
# Set the automation context view to the original model/version view
|
||||
# to show the offending objects.
|
||||
automate_context.set_context_view()
|
||||
|
||||
else:
|
||||
automate_context.mark_run_success("No forbidden types found.")
|
||||
|
||||
# If the function generates file results, this is how it can be
|
||||
# attached to the Speckle project/model
|
||||
# automate_context.store_file_result("./report.pdf")
|
||||
|
||||
|
||||
def automate_function_without_inputs(automate_context: AutomationContext) -> None:
|
||||
"""A function example without inputs.
|
||||
|
||||
If your function does not need any input variables,
|
||||
besides what the automation context provides,
|
||||
the inputs argument can be omitted.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
# make sure to call the function with the executor
|
||||
if __name__ == "__main__":
|
||||
@@ -280,4 +100,4 @@ if __name__ == "__main__":
|
||||
execute_automate_function(automate_function, FunctionInputs)
|
||||
|
||||
# If the function has no arguments, the executor can handle it like so
|
||||
# execute_automate_function(automate_function_without_inputs)
|
||||
# execute_automate_function(automate_function_without_inputs)
|
||||
|
||||
+4
-6
@@ -7,16 +7,14 @@ maintainers = [{ name = "Speckle Systems", email = "hello@speckle.systems" }]
|
||||
description = "A Speckle Automate function template using specklepy"
|
||||
readme = "README.md"
|
||||
license = "Apache-2.0"
|
||||
keywords = ["speckle", "automate", "bim", "aec", "ifc", "export", "revit"]
|
||||
keywords = ["speckle", "automate", "bim", "aec"]
|
||||
|
||||
dependencies = ["specklepy==3.1.0",
|
||||
"ifcopenshell==0.8.4.post1",
|
||||
"python-dotenv>=1.0.0",]
|
||||
dependencies = ["specklepy==3.1.0"]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"mypy==1.13.0",
|
||||
"pytest==9.0.3",
|
||||
"pytest==7.4.4",
|
||||
"ruff==0.11.12",
|
||||
]
|
||||
|
||||
@@ -36,4 +34,4 @@ select = [
|
||||
convention = "google"
|
||||
|
||||
[tool.setuptools]
|
||||
py-modules = []
|
||||
py-modules = []
|
||||
|
||||
@@ -23,10 +23,8 @@ def test_function_run(
|
||||
automation_context,
|
||||
automate_function,
|
||||
FunctionInputs(
|
||||
file_name="test_output.ifc",
|
||||
IFC_PROJECT_NAME = "Speckle Export",
|
||||
IFC_SITE_NAME = "Site",
|
||||
IFC_BUILDING_NAME = "Building"
|
||||
forbidden_speckle_type="None",
|
||||
whisper_message=SecretStr("testing automatically"),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
-357
@@ -1,357 +0,0 @@
|
||||
# =============================================================================
|
||||
# curves.py
|
||||
# Converts Speckle 2D curve geometry (Polycurve, Line, Arc, Circle, Polyline)
|
||||
# into IFC IfcIndexedPolyCurve representations.
|
||||
#
|
||||
# Curve types in segments:
|
||||
# - Objects.Geometry.Line → start/end Points → IfcLineIndex
|
||||
# - Objects.Geometry.Arc → startPoint/midPoint/endPoint → IfcArcIndex
|
||||
# - Objects.Geometry.Circle → converted to arc segments
|
||||
# - Objects.Geometry.Polyline → point sequence → IfcLineIndex chains
|
||||
#
|
||||
# The result is an IfcIndexedPolyCurve with IfcCartesianPointList3D.
|
||||
# =============================================================================
|
||||
|
||||
import ifcopenshell
|
||||
import ifcopenshell.api
|
||||
from specklepy.objects.base import Base
|
||||
from utils.helpers import _get, MM_SCALES
|
||||
from utils.geometry import _get_shared, _make_placement
|
||||
|
||||
|
||||
# Speckle types that are curve geometry
|
||||
_CURVE_TYPES = {"Line", "Arc", "Circle", "Ellipse", "Polycurve", "Polyline", "Curve"}
|
||||
|
||||
|
||||
def is_curve(obj) -> bool:
|
||||
"""Return True if this object is a Speckle curve type."""
|
||||
speckle_type = _get(obj, "speckle_type") or ""
|
||||
return any(ct in speckle_type for ct in _CURVE_TYPES)
|
||||
|
||||
|
||||
def _resolve_scale(obj, fallback: float) -> float:
|
||||
"""Resolve unit scale for a curve object."""
|
||||
units = _get(obj, "units")
|
||||
if units and isinstance(units, str):
|
||||
return MM_SCALES.get(units.lower().strip(), fallback)
|
||||
return fallback
|
||||
|
||||
|
||||
def _point_coords(pt, scale: float) -> tuple:
|
||||
"""Extract (x, y, z) from a Speckle Point, scaled to mm and rounded."""
|
||||
x = round(float(_get(pt, "x") or 0) * scale, 3)
|
||||
y = round(float(_get(pt, "y") or 0) * scale, 3)
|
||||
z = round(float(_get(pt, "z") or 0) * scale, 3)
|
||||
return x, y, z
|
||||
|
||||
|
||||
def _extract_polycurve(obj, scale: float) -> tuple:
|
||||
"""
|
||||
Extract points and segment indices from a Polycurve.
|
||||
|
||||
Returns (points_3d, segments) where:
|
||||
points_3d: list of [x, y, z] coordinate lists
|
||||
segments: list of IfcLineIndex/IfcArcIndex-compatible tuples
|
||||
("line", [i, j]) or ("arc", [i, mid, j]) (1-based)
|
||||
"""
|
||||
segments_raw = _get(obj, "segments") or []
|
||||
if not isinstance(segments_raw, list):
|
||||
segments_raw = list(segments_raw)
|
||||
if not segments_raw:
|
||||
return [], []
|
||||
|
||||
obj_scale = _resolve_scale(obj, scale)
|
||||
points = [] # list of [x, y, z]
|
||||
point_map = {} # (rounded_x, rounded_y, rounded_z) → 1-based index
|
||||
ifc_segments = []
|
||||
|
||||
def _add_point(pt, seg_scale: float) -> int:
|
||||
"""Add a point and return its 1-based index (deduplicating nearby points)."""
|
||||
x, y, z = _point_coords(pt, seg_scale)
|
||||
# Snap to 0.01mm grid for deduplication
|
||||
key = (round(x * 100), round(y * 100), round(z * 100))
|
||||
if key in point_map:
|
||||
return point_map[key]
|
||||
idx = len(points) + 1 # 1-based for IFC
|
||||
points.append([x, y, z])
|
||||
point_map[key] = idx
|
||||
return idx
|
||||
|
||||
for seg in segments_raw:
|
||||
if seg is None:
|
||||
continue
|
||||
seg_type = (_get(seg, "speckle_type") or "").split(".")[-1]
|
||||
seg_scale = _resolve_scale(seg, obj_scale)
|
||||
|
||||
if seg_type == "Line":
|
||||
start_pt = _get(seg, "start")
|
||||
end_pt = _get(seg, "end")
|
||||
if start_pt is None or end_pt is None:
|
||||
continue
|
||||
i = _add_point(start_pt, seg_scale)
|
||||
j = _add_point(end_pt, seg_scale)
|
||||
if i != j:
|
||||
ifc_segments.append(("line", [i, j]))
|
||||
|
||||
elif seg_type == "Arc":
|
||||
start_pt = _get(seg, "startPoint")
|
||||
mid_pt = _get(seg, "midPoint")
|
||||
end_pt = _get(seg, "endPoint")
|
||||
if start_pt is None or mid_pt is None or end_pt is None:
|
||||
continue
|
||||
i = _add_point(start_pt, seg_scale)
|
||||
m = _add_point(mid_pt, seg_scale)
|
||||
j = _add_point(end_pt, seg_scale)
|
||||
if i != j and i != m and m != j:
|
||||
ifc_segments.append(("arc", [i, m, j]))
|
||||
|
||||
elif seg_type == "Polyline":
|
||||
raw_value = _get(seg, "value") or []
|
||||
if not raw_value:
|
||||
continue
|
||||
values = list(raw_value) if not isinstance(raw_value, list) else raw_value
|
||||
indices = []
|
||||
for vi in range(0, len(values) - 2, 3):
|
||||
x = round(float(values[vi]) * seg_scale, 3)
|
||||
y = round(float(values[vi + 1]) * seg_scale, 3)
|
||||
z = round(float(values[vi + 2]) * seg_scale, 3)
|
||||
key = (round(x * 100), round(y * 100), round(z * 100))
|
||||
if key in point_map:
|
||||
idx = point_map[key]
|
||||
else:
|
||||
idx = len(points) + 1
|
||||
points.append([x, y, z])
|
||||
point_map[key] = idx
|
||||
indices.append(idx)
|
||||
if len(indices) >= 2:
|
||||
ifc_segments.append(("line", indices))
|
||||
|
||||
return points, ifc_segments
|
||||
|
||||
|
||||
def _extract_single_line(obj, scale: float) -> tuple:
|
||||
"""Extract a single Line as points + segment."""
|
||||
obj_scale = _resolve_scale(obj, scale)
|
||||
start_pt = _get(obj, "start")
|
||||
end_pt = _get(obj, "end")
|
||||
if start_pt is None or end_pt is None:
|
||||
return [], []
|
||||
sx, sy, sz = _point_coords(start_pt, obj_scale)
|
||||
ex, ey, ez = _point_coords(end_pt, obj_scale)
|
||||
return [[sx, sy, sz], [ex, ey, ez]], [("line", [1, 2])]
|
||||
|
||||
|
||||
def _extract_single_arc(obj, scale: float) -> tuple:
|
||||
"""Extract a single Arc as points + segment."""
|
||||
obj_scale = _resolve_scale(obj, scale)
|
||||
start_pt = _get(obj, "startPoint")
|
||||
mid_pt = _get(obj, "midPoint")
|
||||
end_pt = _get(obj, "endPoint")
|
||||
if start_pt is None or mid_pt is None or end_pt is None:
|
||||
return [], []
|
||||
sx, sy, sz = _point_coords(start_pt, obj_scale)
|
||||
mx, my, mz = _point_coords(mid_pt, obj_scale)
|
||||
ex, ey, ez = _point_coords(end_pt, obj_scale)
|
||||
return [[sx, sy, sz], [mx, my, mz], [ex, ey, ez]], [("arc", [1, 2, 3])]
|
||||
|
||||
|
||||
def extract_curve_data(obj, scale: float = 1.0) -> tuple:
|
||||
"""
|
||||
Extract curve points and segments from any supported curve type.
|
||||
Returns (points_3d, segments) or ([], []) if not a curve.
|
||||
"""
|
||||
speckle_type = (_get(obj, "speckle_type") or "").split(".")[-1]
|
||||
|
||||
if speckle_type == "Polycurve":
|
||||
return _extract_polycurve(obj, scale)
|
||||
elif speckle_type == "Line":
|
||||
return _extract_single_line(obj, scale)
|
||||
elif speckle_type == "Arc":
|
||||
return _extract_single_arc(obj, scale)
|
||||
return [], []
|
||||
|
||||
|
||||
def build_ifc_curve(ifc, points: list, segments: list):
|
||||
"""
|
||||
Build an IfcIndexedPolyCurve from points and segment descriptors.
|
||||
|
||||
points: list of [x, y, z] coordinates
|
||||
segments: list of ("line", [indices]) or ("arc", [indices])
|
||||
|
||||
Returns IfcIndexedPolyCurve or None.
|
||||
"""
|
||||
if not points or not segments:
|
||||
return None
|
||||
|
||||
point_list = ifc.createIfcCartesianPointList3D(points)
|
||||
|
||||
ifc_segments = []
|
||||
for seg_type, indices in segments:
|
||||
if seg_type == "arc":
|
||||
ifc_segments.append(ifc.create_entity("IfcArcIndex", indices))
|
||||
else:
|
||||
ifc_segments.append(ifc.create_entity("IfcLineIndex", indices))
|
||||
|
||||
if not ifc_segments:
|
||||
return None
|
||||
|
||||
return ifc.createIfcIndexedPolyCurve(
|
||||
Points=point_list,
|
||||
Segments=ifc_segments,
|
||||
SelfIntersect=False,
|
||||
)
|
||||
|
||||
|
||||
def get_display_curves(obj) -> list:
|
||||
"""
|
||||
Collect curve objects from an object's displayValue, or the object itself.
|
||||
Returns a list of curve objects (Polycurve, Line, Arc, etc.).
|
||||
"""
|
||||
curves = []
|
||||
for key in ["displayValue", "@displayValue", "_displayValue"]:
|
||||
display = _get(obj, key)
|
||||
if display is None:
|
||||
continue
|
||||
items = display if isinstance(display, list) else [display]
|
||||
for item in items:
|
||||
if item is not None and is_curve(item):
|
||||
curves.append(item)
|
||||
if curves:
|
||||
break
|
||||
|
||||
# Fallback: the object itself is a curve
|
||||
if not curves and is_curve(obj):
|
||||
curves.append(obj)
|
||||
|
||||
return curves
|
||||
|
||||
|
||||
def curve_to_ifc(
|
||||
ifc: ifcopenshell.file,
|
||||
body_context,
|
||||
obj: Base,
|
||||
scale: float = 1.0,
|
||||
material_manager=None,
|
||||
) -> tuple:
|
||||
"""
|
||||
Convert a Speckle object with curve geometry → (IfcShapeRepresentation, IfcLocalPlacement).
|
||||
Looks for curves in displayValue first, then checks the object itself.
|
||||
Creates one IfcIndexedPolyCurve per curve item.
|
||||
Returns (None, None) if no usable curve geometry.
|
||||
"""
|
||||
curves = get_display_curves(obj)
|
||||
if not curves:
|
||||
return None, None
|
||||
|
||||
obj_app_id = _get(obj, "applicationId")
|
||||
obj_scale = _resolve_scale(obj, scale)
|
||||
|
||||
# Collect curve data and compute origin incrementally (no all_points accumulation)
|
||||
curve_cache = [] # list of (points, segments) or None
|
||||
xmin = ymin = zmin = float("inf")
|
||||
xmax = ymax = float("-inf")
|
||||
has_points = False
|
||||
|
||||
for curve_obj in curves:
|
||||
points, segments = extract_curve_data(curve_obj, obj_scale)
|
||||
if points and segments:
|
||||
curve_cache.append((points, segments))
|
||||
has_points = True
|
||||
for p in points:
|
||||
x, y, z = p[0], p[1], p[2]
|
||||
if x < xmin: xmin = x
|
||||
if x > xmax: xmax = x
|
||||
if y < ymin: ymin = y
|
||||
if y > ymax: ymax = y
|
||||
if z < zmin: zmin = z
|
||||
else:
|
||||
curve_cache.append(None)
|
||||
|
||||
if not has_points:
|
||||
return None, None
|
||||
|
||||
ox = (xmin + xmax) / 2.0
|
||||
oy = (ymin + ymax) / 2.0
|
||||
oz = zmin
|
||||
|
||||
# Build IFC curve entities
|
||||
geom_items = []
|
||||
for i, cached in enumerate(curve_cache):
|
||||
if cached is None:
|
||||
continue
|
||||
points, segments = cached
|
||||
|
||||
offset_points = [
|
||||
[p[0] - ox, p[1] - oy, p[2] - oz] for p in points
|
||||
]
|
||||
|
||||
curve_entity = build_ifc_curve(ifc, offset_points, segments)
|
||||
if curve_entity is None:
|
||||
continue
|
||||
|
||||
# Apply material
|
||||
if material_manager:
|
||||
curve_app_id = _get(curves[i], "applicationId") or obj_app_id
|
||||
if curve_app_id:
|
||||
material_manager.apply_to_item(curve_entity, str(curve_app_id))
|
||||
|
||||
geom_items.append(curve_entity)
|
||||
|
||||
if not geom_items:
|
||||
return None, None
|
||||
|
||||
rep = ifc.createIfcShapeRepresentation(
|
||||
ContextOfItems=body_context,
|
||||
RepresentationIdentifier="Body",
|
||||
RepresentationType="Curve3D",
|
||||
Items=geom_items,
|
||||
)
|
||||
placement = _make_placement(ifc, ox, oy, oz)
|
||||
return rep, placement
|
||||
|
||||
|
||||
def build_curve_rep_map(ifc, body_context, obj, scale: float = 1.0,
|
||||
material_manager=None, fallback_app_ids: list = None,
|
||||
definition_id: str = None):
|
||||
"""
|
||||
Build an IfcRepresentationMap from a curve definition object.
|
||||
Used for instance-based curve geometry (shared across instances).
|
||||
Returns IfcRepresentationMap or None.
|
||||
"""
|
||||
points, segments = extract_curve_data(obj, scale)
|
||||
if not points or not segments:
|
||||
return None
|
||||
|
||||
curve_entity = build_ifc_curve(ifc, points, segments)
|
||||
if curve_entity is None:
|
||||
return None
|
||||
|
||||
# Apply material (3-tier: object app_id → fallbacks → definition)
|
||||
if material_manager:
|
||||
app_id = _get(obj, "applicationId")
|
||||
style = material_manager.get_style_with_fallbacks(
|
||||
primary_app_id=str(app_id) if app_id else None,
|
||||
fallback_app_ids=fallback_app_ids,
|
||||
definition_id=definition_id,
|
||||
)
|
||||
if style:
|
||||
try:
|
||||
ifcopenshell.api.run(
|
||||
"style.assign_item_style", ifc,
|
||||
item=curve_entity, style=style,
|
||||
)
|
||||
material_manager._apply_count += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
shared = _get_shared(ifc)
|
||||
a2p = ifc.createIfcAxis2Placement3D(shared["origin_0"], None, None)
|
||||
|
||||
mapped_rep = ifc.createIfcShapeRepresentation(
|
||||
ContextOfItems=body_context,
|
||||
RepresentationIdentifier="Body",
|
||||
RepresentationType="Curve3D",
|
||||
Items=[curve_entity],
|
||||
)
|
||||
|
||||
return ifc.createIfcRepresentationMap(a2p, mapped_rep)
|
||||
@@ -1,447 +0,0 @@
|
||||
# =============================================================================
|
||||
# geometry.py
|
||||
# Converts Speckle DataObject geometry → IFC IfcPolygonalFaceSet + IfcLocalPlacement
|
||||
#
|
||||
# Key facts:
|
||||
# - After specklepy receive(), vertices and faces are FLAT Python lists
|
||||
# - displayValue is an array of Mesh objects
|
||||
# - Units are in mm (for Revit), scale to metres for IFC
|
||||
# - Vertices are in absolute world coordinates
|
||||
# - Uses IfcPolygonalFaceSet (indexed vertices) instead of IfcFacetedBrep
|
||||
# for compact output — each vertex stored once, not once per face.
|
||||
# =============================================================================
|
||||
|
||||
import ifcopenshell
|
||||
from specklepy.objects.base import Base
|
||||
from utils.helpers import _get, MM_SCALES as _UNIT_SCALES
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Geometry validation helpers (GEM111 fix)
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
# Minimum distance in mm below which two vertices are considered identical (GEM111).
|
||||
_VERTEX_MERGE_TOL = 0.01 # 0.01 mm
|
||||
_INV_TOL = 1.0 / _VERTEX_MERGE_TOL # pre-computed: multiply instead of divide
|
||||
|
||||
|
||||
def build_ifc_facesets(ifc, verts_scaled: list, face_groups: list) -> list:
|
||||
"""
|
||||
Build a list of IfcPolygonalFaceSet from scaled (x,y,z) vertices and face index groups.
|
||||
|
||||
Uses IfcCartesianPointList3D + IfcIndexedPolygonalFace for compact output.
|
||||
Vertices are deduplicated via snap grid so each unique position is stored once.
|
||||
|
||||
GEM111 fix: skip faces with near-duplicate vertices (snapped to same grid cell).
|
||||
|
||||
verts_scaled: flat list of already-scaled floats [x0,y0,z0, x1,y1,z1, ...]
|
||||
face_groups: list of index lists [[i,j,k], [i,j,k,l], ...]
|
||||
Returns: list of IfcPolygonalFaceSet (typically one, empty on failure).
|
||||
"""
|
||||
snap_to_idx = {} # snap_key → 0-based index in deduped_verts
|
||||
deduped_verts = [] # [[x, y, z], ...] — lists for direct IFC use
|
||||
inv_tol = _INV_TOL
|
||||
|
||||
# Validate faces and remap indices to deduplicated vertex list
|
||||
valid_faces = [] # list of (idx0+1, idx1+1, ...) tuples (1-based for IFC)
|
||||
for indices in face_groups:
|
||||
try:
|
||||
remapped = []
|
||||
seen_snaps = set()
|
||||
degenerate = False
|
||||
|
||||
for i in indices:
|
||||
i3 = i * 3
|
||||
x = verts_scaled[i3]
|
||||
y = verts_scaled[i3 + 1]
|
||||
z = verts_scaled[i3 + 2]
|
||||
key = (round(x * inv_tol), round(y * inv_tol), round(z * inv_tol))
|
||||
if key in seen_snaps:
|
||||
degenerate = True
|
||||
break
|
||||
seen_snaps.add(key)
|
||||
idx = snap_to_idx.get(key)
|
||||
if idx is None:
|
||||
idx = len(deduped_verts)
|
||||
snap_to_idx[key] = idx
|
||||
deduped_verts.append([x, y, z])
|
||||
remapped.append(idx + 1) # 1-based for IFC
|
||||
|
||||
if degenerate or len(remapped) < 3:
|
||||
continue
|
||||
valid_faces.append(remapped)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not valid_faces or not deduped_verts:
|
||||
return []
|
||||
|
||||
# Round vertex coordinates to reduce IFC text file size
|
||||
# 3 decimal places = 0.001mm precision (more than sufficient)
|
||||
for v in deduped_verts:
|
||||
v[0] = round(v[0], 3)
|
||||
v[1] = round(v[1], 3)
|
||||
v[2] = round(v[2], 3)
|
||||
|
||||
# Build IFC entities
|
||||
try:
|
||||
point_list = ifc.createIfcCartesianPointList3D(deduped_verts)
|
||||
ifc_faces = [
|
||||
ifc.createIfcIndexedPolygonalFace(fi) for fi in valid_faces
|
||||
]
|
||||
faceset = ifc.createIfcPolygonalFaceSet(point_list, None, ifc_faces, None)
|
||||
return [faceset]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def unwrap_chunks(raw) -> list:
|
||||
"""
|
||||
Flatten a Speckle data array into a plain Python list of numbers.
|
||||
|
||||
Handles two cases:
|
||||
1. Already flat list of numbers (after specklepy receive deserializes)
|
||||
→ returned as-is (fast path)
|
||||
2. List of DataChunk objects (raw from server before deserialization)
|
||||
→ each chunk's .data list is concatenated
|
||||
"""
|
||||
if not raw:
|
||||
return []
|
||||
|
||||
# Fast path: if first item is a number, assume all items are numbers
|
||||
first = raw[0]
|
||||
if isinstance(first, (int, float)):
|
||||
return raw
|
||||
|
||||
# Slow path: DataChunk objects or mixed content
|
||||
result = []
|
||||
for item in raw:
|
||||
if item is None:
|
||||
continue
|
||||
if isinstance(item, (int, float)):
|
||||
result.append(item)
|
||||
continue
|
||||
speckle_type = getattr(item, "speckle_type", "") or ""
|
||||
if "DataChunk" in speckle_type:
|
||||
chunk_data = _get(item, "data") or _get(item, "@data")
|
||||
if chunk_data:
|
||||
result.extend(list(chunk_data))
|
||||
else:
|
||||
try:
|
||||
result.extend(list(item))
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
def _resolve_scale(obj, stream_scale: float) -> float:
|
||||
"""Resolve unit scale: obj.units → stream fallback."""
|
||||
units = _get(obj, "units")
|
||||
if units and isinstance(units, str):
|
||||
return _UNIT_SCALES.get(units.lower().strip(), stream_scale)
|
||||
return stream_scale
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Mesh extraction
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def _is_mesh(item) -> bool:
|
||||
"""
|
||||
Detect if a specklepy object is a Mesh.
|
||||
Uses speckle_type string — more reliable than hasattr on Base objects.
|
||||
"""
|
||||
if item is None:
|
||||
return False
|
||||
speckle_type = _get(item, "speckle_type") or ""
|
||||
if "Mesh" in speckle_type:
|
||||
return True
|
||||
# Fallback: has both vertices and faces data
|
||||
verts = _get(item, "vertices")
|
||||
faces = _get(item, "faces")
|
||||
return verts is not None and faces is not None
|
||||
|
||||
|
||||
def _collect_meshes_from_display(obj) -> list:
|
||||
"""
|
||||
Collect Mesh objects from an object's displayValue.
|
||||
If an item is not a Mesh (e.g. BrepX, Brep), recursively check
|
||||
its own displayValue for nested meshes.
|
||||
"""
|
||||
meshes = []
|
||||
for key in ["displayValue", "@displayValue", "_displayValue"]:
|
||||
display = _get(obj, key)
|
||||
if display is None:
|
||||
continue
|
||||
items = display if isinstance(display, list) else [display]
|
||||
for item in items:
|
||||
if item is None:
|
||||
continue
|
||||
if _is_mesh(item):
|
||||
meshes.append(item)
|
||||
else:
|
||||
# BrepX / Brep / other geometry types may carry a nested
|
||||
# displayValue with the tessellated mesh representation
|
||||
meshes.extend(_collect_meshes_from_display(item))
|
||||
if meshes:
|
||||
break
|
||||
return meshes
|
||||
|
||||
|
||||
def get_display_meshes(obj: Base) -> list:
|
||||
"""
|
||||
Extract all Mesh objects from a DataObject's displayValue.
|
||||
Handles nested geometry types (BrepX, Brep) that wrap meshes
|
||||
inside their own displayValue.
|
||||
"""
|
||||
meshes = _collect_meshes_from_display(obj)
|
||||
|
||||
# Fallback: object itself is a Mesh
|
||||
if not meshes and _is_mesh(obj):
|
||||
speckle_type = _get(obj, "speckle_type") or ""
|
||||
if "Mesh" in speckle_type:
|
||||
meshes.append(obj)
|
||||
|
||||
return meshes
|
||||
|
||||
|
||||
def get_display_instances(obj: Base) -> list:
|
||||
"""
|
||||
Extract InstanceProxy objects from a DataObject's displayValue.
|
||||
|
||||
Per the official speckleifc converter, every IFC element's displayValue
|
||||
contains InstanceProxy objects (not raw meshes). Each InstanceProxy has:
|
||||
- transform: 16-float row-major matrix, translation in metres
|
||||
- definitionId: "DEFINITION:{meshAppId}" string
|
||||
- units: "m"
|
||||
|
||||
Raw meshes do NOT appear in displayValue in IFC→Speckle exports.
|
||||
"""
|
||||
instances = []
|
||||
for key in ["displayValue", "@displayValue", "_displayValue"]:
|
||||
display = _get(obj, key)
|
||||
if display is None:
|
||||
continue
|
||||
items = display if isinstance(display, list) else [display]
|
||||
for item in items:
|
||||
if item is None:
|
||||
continue
|
||||
transform = _get(item, "transform")
|
||||
definition_id = _get(item, "definitionId")
|
||||
if transform is not None and definition_id is not None:
|
||||
instances.append(item)
|
||||
if instances:
|
||||
break
|
||||
return instances
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Face decoding
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def decode_faces(faces_raw: list) -> list:
|
||||
"""
|
||||
Decode Speckle's run-length encoded face list into vertex index groups.
|
||||
Format: [n, i0, i1, ..., n, i0, i1, ...]
|
||||
n=0 → triangle (legacy), n=1 → quad (legacy), n≥3 → n-gon
|
||||
"""
|
||||
decoded = []
|
||||
i = 0
|
||||
total = len(faces_raw)
|
||||
# Check if values are already ints (common after unwrap_chunks)
|
||||
already_int = total > 0 and isinstance(faces_raw[0], int)
|
||||
while i < total:
|
||||
n = faces_raw[i] if already_int else int(faces_raw[i])
|
||||
if n == 0:
|
||||
n = 3
|
||||
elif n == 1:
|
||||
n = 4
|
||||
end = i + 1 + n
|
||||
if end > total:
|
||||
break
|
||||
if already_int:
|
||||
decoded.append(faces_raw[i + 1:end])
|
||||
else:
|
||||
decoded.append([int(v) for v in faces_raw[i + 1:end]])
|
||||
i = end
|
||||
return decoded
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Bounding box + placement
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def compute_origin(flat_verts: list) -> tuple:
|
||||
"""
|
||||
Compute placement origin from scaled vertex list (mm).
|
||||
X, Y = bounding box centroid
|
||||
Z = minimum Z (bottom face of element — more natural for IFC)
|
||||
Single-pass to avoid creating 3 sliced copies of a large list.
|
||||
"""
|
||||
x0 = flat_verts[0]
|
||||
y0 = flat_verts[1]
|
||||
z0 = flat_verts[2]
|
||||
xmin = xmax = x0
|
||||
ymin = ymax = y0
|
||||
zmin = z0
|
||||
for i in range(3, len(flat_verts) - 2, 3):
|
||||
x = flat_verts[i]
|
||||
y = flat_verts[i + 1]
|
||||
z = flat_verts[i + 2]
|
||||
if x < xmin:
|
||||
xmin = x
|
||||
elif x > xmax:
|
||||
xmax = x
|
||||
if y < ymin:
|
||||
ymin = y
|
||||
elif y > ymax:
|
||||
ymax = y
|
||||
if z < zmin:
|
||||
zmin = z
|
||||
return (xmin + xmax) / 2.0, (ymin + ymax) / 2.0, zmin
|
||||
|
||||
|
||||
# Cache for shared IFC direction/point entities (keyed by ifc file id)
|
||||
_shared_entities: dict[int, dict] = {}
|
||||
|
||||
|
||||
def _get_shared(ifc):
|
||||
"""Return (or create) shared IfcDirection and IfcCartesianPoint entities for this file."""
|
||||
fid = id(ifc)
|
||||
if fid not in _shared_entities:
|
||||
_shared_entities[fid] = {
|
||||
"z_axis": ifc.createIfcDirection([0.0, 0.0, 1.0]),
|
||||
"x_axis": ifc.createIfcDirection([1.0, 0.0, 0.0]),
|
||||
"origin_0": ifc.createIfcCartesianPoint([0.0, 0.0, 0.0]),
|
||||
}
|
||||
return _shared_entities[fid]
|
||||
|
||||
|
||||
def _make_placement(ifc, x: float, y: float, z: float):
|
||||
"""Create an IfcLocalPlacement at absolute world coordinates (mm)."""
|
||||
shared = _get_shared(ifc)
|
||||
origin = ifc.createIfcCartesianPoint([round(x, 3), round(y, 3), round(z, 3)])
|
||||
a2p = ifc.createIfcAxis2Placement3D(origin, shared["z_axis"], shared["x_axis"])
|
||||
return ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Main conversion
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def mesh_to_ifc(
|
||||
ifc: ifcopenshell.file,
|
||||
body_context,
|
||||
obj: Base,
|
||||
scale: float = 0.001,
|
||||
material_manager=None,
|
||||
) -> tuple:
|
||||
"""
|
||||
Convert a Speckle DataObject → (IfcShapeRepresentation, IfcLocalPlacement).
|
||||
Creates one IfcPolygonalFaceSet per mesh so each can carry its own material style.
|
||||
Returns (None, None) if no usable geometry is found.
|
||||
"""
|
||||
meshes = get_display_meshes(obj)
|
||||
if not meshes:
|
||||
return None, None
|
||||
|
||||
# Parent object's applicationId — used as fallback for material lookup
|
||||
# when inner meshes (e.g. from BrepX) don't have their own applicationId
|
||||
obj_app_id = _get(obj, "applicationId")
|
||||
|
||||
obj_scale = _resolve_scale(obj, scale)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Pass 1: unpack and scale vertices once per mesh, compute origin
|
||||
# incrementally without accumulating all vertices in memory.
|
||||
# ------------------------------------------------------------------ #
|
||||
mesh_cache = [] # [scaled_verts_list] or None per mesh
|
||||
xmin = ymin = zmin = float("inf")
|
||||
xmax = ymax = float("-inf")
|
||||
has_verts = False
|
||||
|
||||
for mesh in meshes:
|
||||
raw_verts = _get(mesh, "vertices") or []
|
||||
verts = unwrap_chunks(raw_verts if isinstance(raw_verts, list) else list(raw_verts))
|
||||
if not verts:
|
||||
mesh_cache.append(None)
|
||||
continue
|
||||
ms = _resolve_scale(mesh, obj_scale)
|
||||
scaled = [float(v) * ms for v in verts]
|
||||
mesh_cache.append(scaled)
|
||||
has_verts = True
|
||||
|
||||
# Update bounding box from this mesh's scaled vertices
|
||||
for i in range(0, len(scaled) - 2, 3):
|
||||
x, y, z = scaled[i], scaled[i + 1], scaled[i + 2]
|
||||
if x < xmin: xmin = x
|
||||
if x > xmax: xmax = x
|
||||
if y < ymin: ymin = y
|
||||
if y > ymax: ymax = y
|
||||
if z < zmin: zmin = z
|
||||
|
||||
if not has_verts:
|
||||
return None, None
|
||||
|
||||
ox = (xmin + xmax) / 2.0
|
||||
oy = (ymin + ymax) / 2.0
|
||||
oz = zmin
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Pass 2: one faceset per mesh — reuse cached verts, only unpack faces
|
||||
# ------------------------------------------------------------------ #
|
||||
geom_items = []
|
||||
|
||||
for mesh, scaled in zip(meshes, mesh_cache):
|
||||
if scaled is None:
|
||||
continue
|
||||
raw_faces = _get(mesh, "faces") or []
|
||||
faces_raw = unwrap_chunks(raw_faces if isinstance(raw_faces, list) else list(raw_faces))
|
||||
|
||||
if not faces_raw:
|
||||
continue
|
||||
|
||||
try:
|
||||
face_groups = decode_faces(faces_raw)
|
||||
except Exception as e:
|
||||
print(f" Warning: Face decode error: {e}")
|
||||
continue
|
||||
|
||||
# Offset pre-scaled vertices relative to origin (flat list, no tuples)
|
||||
n = len(scaled)
|
||||
verts_scaled = [0.0] * n
|
||||
for vi in range(0, n, 3):
|
||||
verts_scaled[vi] = scaled[vi] - ox
|
||||
verts_scaled[vi + 1] = scaled[vi + 1] - oy
|
||||
verts_scaled[vi + 2] = scaled[vi + 2] - oz
|
||||
|
||||
mesh_facesets = build_ifc_facesets(ifc, verts_scaled, face_groups)
|
||||
|
||||
if not mesh_facesets:
|
||||
continue
|
||||
|
||||
# Apply material style to every faceset of this mesh
|
||||
# Inner meshes (from BrepX) may lack applicationId — fall back to parent's
|
||||
if material_manager:
|
||||
mesh_app_id = _get(mesh, "applicationId") or obj_app_id
|
||||
if mesh_app_id:
|
||||
for fs in mesh_facesets:
|
||||
material_manager.apply_to_item(fs, str(mesh_app_id))
|
||||
|
||||
geom_items.extend(mesh_facesets)
|
||||
|
||||
if not geom_items:
|
||||
return None, None
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Assemble IfcShapeRepresentation + IfcLocalPlacement
|
||||
# ------------------------------------------------------------------ #
|
||||
rep = ifc.createIfcShapeRepresentation(
|
||||
ContextOfItems=body_context,
|
||||
RepresentationIdentifier="Body",
|
||||
RepresentationType="Tessellation",
|
||||
Items=geom_items,
|
||||
)
|
||||
placement = _make_placement(ifc, ox, oy, oz)
|
||||
|
||||
return rep, placement
|
||||
@@ -1,38 +0,0 @@
|
||||
# =============================================================================
|
||||
# helpers.py
|
||||
# Shared utilities used across the exporter modules.
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def _get(obj, key, default=None):
|
||||
"""
|
||||
Safe access for specklepy Base objects, dicts, or any hybrid.
|
||||
Tries attribute access first, then bracket access.
|
||||
"""
|
||||
if obj is None:
|
||||
return default
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(key, default)
|
||||
try:
|
||||
val = getattr(obj, key, None)
|
||||
if val is not None:
|
||||
return val
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
val = obj[key]
|
||||
if val is not None:
|
||||
return val
|
||||
except Exception:
|
||||
pass
|
||||
return default
|
||||
|
||||
|
||||
# Scale factors → MILLIMETRES (IFC file is declared as mm)
|
||||
MM_SCALES = {
|
||||
"mm": 1.0, "millimeter": 1.0, "millimeters": 1.0,
|
||||
"cm": 10.0, "centimeter": 10.0, "centimeters": 10.0,
|
||||
"m": 1000.0, "meter": 1000.0, "meters": 1000.0,
|
||||
"ft": 304.8, "foot": 304.8, "feet": 304.8,
|
||||
"in": 25.4, "inch": 25.4, "inches": 25.4,
|
||||
}
|
||||
@@ -1,640 +0,0 @@
|
||||
# =============================================================================
|
||||
# instances.py
|
||||
# Handles Speckle InstanceProxy objects from both:
|
||||
#
|
||||
# FORMAT A — Revit connector (our actual use case):
|
||||
# _units = "mm"
|
||||
# transform = 16 floats, row-major, translation in MM
|
||||
# definitionId = 64-char uppercase hex hash (matches object id[:32] in tree)
|
||||
# The definition object lives somewhere in the object tree.
|
||||
#
|
||||
# FORMAT B — speckleifc IFC→Speckle converter:
|
||||
# units = "m"
|
||||
# transform = 16 floats, row-major, translation in METRES
|
||||
# definitionId = "DEFINITION:{meshAppId}"
|
||||
# Definition geometry lives in root → Collection("definitionGeometry")
|
||||
#
|
||||
# We detect the format by the definitionId prefix.
|
||||
#
|
||||
# Performance: uses IfcRepresentationMap + IfcMappedItem so that all instances
|
||||
# sharing the same definition reference a single copy of the geometry.
|
||||
# =============================================================================
|
||||
|
||||
import hashlib
|
||||
import math
|
||||
import struct
|
||||
import ifcopenshell.api
|
||||
from specklepy.objects.base import Base
|
||||
from utils.helpers import _get, MM_SCALES
|
||||
from utils.geometry import unwrap_chunks, decode_faces, build_ifc_facesets, _get_shared, _is_mesh
|
||||
from utils.curves import is_curve, build_curve_rep_map
|
||||
|
||||
|
||||
def is_instance(obj) -> bool:
|
||||
"""Returns True if this object is a Speckle InstanceProxy."""
|
||||
return _get(obj, "transform") is not None and _get(obj, "definitionId") is not None
|
||||
|
||||
|
||||
def _is_ifc_format(definition_id: str) -> bool:
|
||||
"""True if this is speckleifc format (definitionId starts with 'DEFINITION:')."""
|
||||
return definition_id.startswith("DEFINITION:")
|
||||
|
||||
|
||||
def build_definition_map(root: Base) -> dict:
|
||||
"""
|
||||
Build a unified definition map that handles both formats.
|
||||
|
||||
Returns dict with keys:
|
||||
"by_id" : {obj_id_lower[:32] → object} for Revit format
|
||||
"by_app_id" : {applicationId_lower → object} for Revit format
|
||||
"ifc_proxies" : {"DEFINITION:xxx" → proxy} for IFC format
|
||||
"ifc_meshes" : {meshAppId → Mesh} for IFC format
|
||||
"definition_sources": set of applicationId (lowercase) that are definition
|
||||
geometry sources — these should be skipped during export
|
||||
"""
|
||||
by_id = {}
|
||||
by_app_id = {}
|
||||
ifc_proxies = {}
|
||||
ifc_meshes = {}
|
||||
definition_sources = set() # applicationIds used as definition geometry (skip during export)
|
||||
|
||||
# --- Walk entire tree for Revit format ---
|
||||
_collect_all(root, by_id, by_app_id, depth=0)
|
||||
|
||||
# --- Extract speckleifc structures for IFC format ---
|
||||
proxies_raw = _get(root, "instanceDefinitionProxies")
|
||||
if proxies_raw:
|
||||
for proxy in (proxies_raw if isinstance(proxies_raw, list) else [proxies_raw]):
|
||||
app_id = _get(proxy, "applicationId")
|
||||
if app_id:
|
||||
ifc_proxies[app_id] = proxy # original case (for IFC format)
|
||||
ifc_proxies[app_id.lower()] = proxy # lowercase (for Revit format)
|
||||
# Collect all objects referenced by this proxy as definition sources
|
||||
object_ids = _get(proxy, "objects") or []
|
||||
for oid in (object_ids if isinstance(object_ids, list) else [object_ids]):
|
||||
if oid:
|
||||
definition_sources.add(str(oid).lower())
|
||||
|
||||
elements = _get(root, "elements") or _get(root, "@elements") or []
|
||||
for child in (elements if isinstance(elements, list) else []):
|
||||
if (_get(child, "name") or "") == "definitionGeometry":
|
||||
geom_elements = _get(child, "elements") or _get(child, "@elements") or []
|
||||
for mesh in (geom_elements if isinstance(geom_elements, list) else []):
|
||||
mesh_app_id = _get(mesh, "applicationId")
|
||||
if mesh_app_id:
|
||||
ifc_meshes[mesh_app_id] = mesh
|
||||
|
||||
print(f" Objects indexed by id: {len(by_id)}")
|
||||
print(f" Objects indexed by appId: {len(by_app_id)}")
|
||||
print(f" IFC definition proxies: {len(ifc_proxies)}")
|
||||
print(f" IFC definition meshes: {len(ifc_meshes)}")
|
||||
print(f" Definition sources: {len(definition_sources)}")
|
||||
|
||||
return {
|
||||
"by_id": by_id,
|
||||
"by_app_id": by_app_id,
|
||||
"ifc_proxies": ifc_proxies,
|
||||
"ifc_meshes": ifc_meshes,
|
||||
"definition_sources": definition_sources,
|
||||
}
|
||||
|
||||
|
||||
def _collect_all(obj, by_id: dict, by_app_id: dict, depth: int):
|
||||
if obj is None or depth > 25:
|
||||
return
|
||||
|
||||
obj_id = _get(obj, "id")
|
||||
if obj_id and isinstance(obj_id, str):
|
||||
key = obj_id.lower()
|
||||
by_id[key] = obj
|
||||
# Also store truncated — definitionId (64 chars) matches id (32 chars)
|
||||
if len(key) == 32:
|
||||
by_id[key] = obj
|
||||
elif len(key) > 32:
|
||||
by_id[key[:32]] = obj
|
||||
|
||||
app_id = _get(obj, "applicationId")
|
||||
if app_id and isinstance(app_id, str):
|
||||
by_app_id[app_id.lower()] = obj
|
||||
|
||||
for key in ["elements", "@elements", "_elements",
|
||||
"displayValue", "@displayValue", "_displayValue",
|
||||
"objects", "@objects", "definition", "@definition"]:
|
||||
try:
|
||||
children = obj[key]
|
||||
if children is None:
|
||||
continue
|
||||
if not isinstance(children, list):
|
||||
children = [children]
|
||||
for child in children:
|
||||
_collect_all(child, by_id, by_app_id, depth + 1)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
||||
def _get_definition_source_object(definition_id: str, definition_map: dict):
|
||||
"""Resolve the first source object referenced by a definition proxy."""
|
||||
ifc_proxies = definition_map.get("ifc_proxies", {})
|
||||
proxy = ifc_proxies.get(definition_id) or ifc_proxies.get(definition_id.lower())
|
||||
if proxy is None:
|
||||
return None
|
||||
object_ids = _get(proxy, "objects") or []
|
||||
if not isinstance(object_ids, list):
|
||||
object_ids = list(object_ids)
|
||||
if not object_ids:
|
||||
return None
|
||||
by_app_id = definition_map.get("by_app_id", {})
|
||||
return by_app_id.get(str(object_ids[0]).lower())
|
||||
|
||||
|
||||
def _get_revit_meshes(definition_id: str, definition_map: dict) -> tuple:
|
||||
"""
|
||||
Revit format:
|
||||
definitionId (64-char hex) → InstanceDefinitionProxy.applicationId
|
||||
proxy.objects[0] is a UUID applicationId → find mesh by applicationId
|
||||
|
||||
Returns (meshes, app_ids) where app_ids are all applicationIds encountered
|
||||
in the resolution chain (definition objects, geometry objects) for material fallback.
|
||||
"""
|
||||
from utils.geometry import get_display_meshes
|
||||
|
||||
# Step 1: find the InstanceDefinitionProxy by its applicationId (case-insensitive)
|
||||
ifc_proxies = definition_map.get("ifc_proxies", {})
|
||||
proxy = ifc_proxies.get(definition_id) or ifc_proxies.get(definition_id.lower())
|
||||
if proxy is None:
|
||||
return [], []
|
||||
|
||||
# Step 2: get the mesh applicationIds from proxy.objects
|
||||
object_ids = _get(proxy, "objects") or []
|
||||
if not isinstance(object_ids, list):
|
||||
object_ids = list(object_ids)
|
||||
|
||||
# Step 3: look up each mesh by applicationId, collecting all encountered app IDs
|
||||
by_app_id = definition_map.get("by_app_id", {})
|
||||
meshes = []
|
||||
encountered_app_ids = []
|
||||
for oid in object_ids:
|
||||
obj = by_app_id.get(str(oid).lower())
|
||||
if obj is not None:
|
||||
# Collect this object's applicationId
|
||||
obj_aid = _get(obj, "applicationId")
|
||||
if obj_aid:
|
||||
encountered_app_ids.append(str(obj_aid))
|
||||
# Also collect applicationIds from displayValue items (BrepX, etc.)
|
||||
for key in ["displayValue", "@displayValue", "_displayValue"]:
|
||||
display = _get(obj, key)
|
||||
if display:
|
||||
items = display if isinstance(display, list) else [display]
|
||||
for item in items:
|
||||
item_aid = _get(item, "applicationId")
|
||||
if item_aid:
|
||||
encountered_app_ids.append(str(item_aid))
|
||||
break
|
||||
# The found object may itself be a mesh, or contain displayValue meshes
|
||||
found_meshes = get_display_meshes(obj)
|
||||
if found_meshes:
|
||||
meshes.extend(found_meshes)
|
||||
elif _is_mesh(obj):
|
||||
# Object itself is a mesh (no displayValue wrapping)
|
||||
meshes.append(obj)
|
||||
return meshes, encountered_app_ids
|
||||
|
||||
|
||||
def _get_ifc_meshes(definition_id: str, definition_map: dict) -> tuple:
|
||||
"""
|
||||
IFC format: definitionId = "DEFINITION:224058_mat0"
|
||||
Look up proxy → objects list → meshes from ifc_meshes dict.
|
||||
Returns (meshes, []) — no extra app_ids needed, mesh applicationIds match directly.
|
||||
"""
|
||||
ifc_proxies = definition_map.get("ifc_proxies", {})
|
||||
ifc_meshes = definition_map.get("ifc_meshes", {})
|
||||
|
||||
proxy = ifc_proxies.get(definition_id)
|
||||
if proxy is None:
|
||||
return [], []
|
||||
|
||||
object_ids = _get(proxy, "objects") or []
|
||||
result = []
|
||||
for oid in (object_ids if isinstance(object_ids, list) else [object_ids]):
|
||||
mesh = ifc_meshes.get(str(oid))
|
||||
if mesh is not None:
|
||||
result.append(mesh)
|
||||
return result, []
|
||||
|
||||
|
||||
def _resolve_instance_scale(obj, stream_scale: float) -> float:
|
||||
"""
|
||||
Resolve scale for the transform translation.
|
||||
Tries bracket access for '_units' (Revit uses underscore).
|
||||
IFC format instances have units="m" → scale=1.0 (no scaling).
|
||||
"""
|
||||
for key in ["units", "_units"]:
|
||||
try:
|
||||
units = obj[key]
|
||||
if units and isinstance(units, str):
|
||||
s = MM_SCALES.get(units.lower().strip())
|
||||
if s is not None:
|
||||
return s
|
||||
except Exception:
|
||||
pass
|
||||
return stream_scale
|
||||
|
||||
|
||||
# Stats
|
||||
_stats = {"found": 0, "not_found": 0}
|
||||
|
||||
# Cache: mesh id → (verts_scaled, face_groups) to avoid re-unpacking
|
||||
# AND re-scaling the same definition mesh across many instances that share it.
|
||||
_mesh_data_cache: dict = {}
|
||||
|
||||
# Cache: definition_id → IfcRepresentationMap (or None if no geometry)
|
||||
# All instances sharing the same definition reuse one geometry copy.
|
||||
_rep_map_cache: dict = {}
|
||||
|
||||
# Cache: geometry content hash → IfcRepresentationMap
|
||||
# Enables sharing across different definitionIds that have identical geometry.
|
||||
_geometry_hash_cache: dict = {}
|
||||
|
||||
# Shared identity placement for all instances (keyed by ifc file id)
|
||||
_identity_placement_cache: dict[int, object] = {}
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Geometry content hashing
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def _hash_mesh_data(mesh_data_list: list, material_key: str = "") -> str:
|
||||
"""Compute a content hash from mesh geometry data for deduplication.
|
||||
|
||||
mesh_data_list: list of (verts_local, face_groups) tuples
|
||||
material_key: string identifying the material (included in hash)
|
||||
Returns: hex digest string
|
||||
"""
|
||||
h = hashlib.md5(usedforsecurity=False)
|
||||
for verts_local, face_groups in mesh_data_list:
|
||||
# Hash rounded vertices as packed floats (faster than str conversion)
|
||||
for i in range(0, len(verts_local), 3):
|
||||
h.update(struct.pack("3f",
|
||||
round(verts_local[i], 3),
|
||||
round(verts_local[i+1], 3),
|
||||
round(verts_local[i+2], 3),
|
||||
))
|
||||
# Hash face indices
|
||||
for face in face_groups:
|
||||
h.update(struct.pack(f"{len(face)}i", *face))
|
||||
# Separator between meshes
|
||||
h.update(b"|")
|
||||
if material_key:
|
||||
h.update(material_key.encode())
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# IfcRepresentationMap builder — geometry created once per definition
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def _collect_mesh_data(meshes: list, ifc_format: bool) -> list:
|
||||
"""Unpack, scale, and cache mesh vertex/face data.
|
||||
|
||||
Returns list of (mesh_obj, verts_local, face_groups) tuples.
|
||||
"""
|
||||
result = []
|
||||
for mesh in meshes:
|
||||
mesh_id = _get(mesh, "id") or _get(mesh, "applicationId")
|
||||
if mesh_id and mesh_id in _mesh_data_cache:
|
||||
verts_local, face_groups = _mesh_data_cache[mesh_id]
|
||||
else:
|
||||
raw_verts = _get(mesh, "vertices") or []
|
||||
raw_faces = _get(mesh, "faces") or []
|
||||
verts = unwrap_chunks(raw_verts if isinstance(raw_verts, list) else list(raw_verts))
|
||||
faces_raw = unwrap_chunks(raw_faces if isinstance(raw_faces, list) else list(raw_faces))
|
||||
if not verts or not faces_raw:
|
||||
continue
|
||||
|
||||
mesh_units = _get(mesh, "units") or _get(mesh, "_units") or ("m" if ifc_format else "mm")
|
||||
ms = MM_SCALES.get(mesh_units.lower().strip(), 1.0)
|
||||
|
||||
try:
|
||||
face_groups = decode_faces(faces_raw)
|
||||
except Exception as e:
|
||||
print(f" Warning: Instance face decode: {e}")
|
||||
continue
|
||||
|
||||
verts_local = [float(v) * ms for v in verts]
|
||||
|
||||
if mesh_id:
|
||||
_mesh_data_cache[mesh_id] = (verts_local, face_groups)
|
||||
|
||||
result.append((mesh, verts_local, face_groups))
|
||||
return result
|
||||
|
||||
|
||||
def _resolve_material_key(meshes_data: list, material_manager, fallback_app_ids, definition_id) -> str:
|
||||
"""Build a material cache key string for geometry hashing."""
|
||||
if not material_manager:
|
||||
return ""
|
||||
parts = []
|
||||
for mesh, _, _ in meshes_data:
|
||||
mesh_app_id = _get(mesh, "applicationId")
|
||||
style = material_manager.get_style_with_fallbacks(
|
||||
primary_app_id=str(mesh_app_id) if mesh_app_id else None,
|
||||
fallback_app_ids=fallback_app_ids,
|
||||
definition_id=definition_id,
|
||||
)
|
||||
parts.append(str(id(style)) if style else "")
|
||||
return "|".join(parts)
|
||||
|
||||
|
||||
def _build_rep_map(ifc, body_context, meshes: list, ifc_format: bool,
|
||||
material_manager=None, fallback_app_ids: list = None,
|
||||
definition_id: str = None):
|
||||
"""
|
||||
Build an IfcRepresentationMap from definition meshes.
|
||||
Uses content-based hashing to reuse identical geometry across different
|
||||
definitionIds. Returns IfcRepresentationMap or None if no valid geometry.
|
||||
"""
|
||||
# Step 1: Collect and cache raw mesh data (no IFC entities created yet)
|
||||
meshes_data = _collect_mesh_data(meshes, ifc_format)
|
||||
if not meshes_data:
|
||||
return None
|
||||
|
||||
# Step 2: Compute content hash to check for identical geometry
|
||||
mat_key = _resolve_material_key(meshes_data, material_manager, fallback_app_ids, definition_id)
|
||||
geom_hash = _hash_mesh_data(
|
||||
[(verts, faces) for _, verts, faces in meshes_data],
|
||||
material_key=mat_key,
|
||||
)
|
||||
|
||||
if geom_hash in _geometry_hash_cache:
|
||||
return _geometry_hash_cache[geom_hash]
|
||||
|
||||
# Step 3: No match — build IFC geometry entities
|
||||
geom_items = []
|
||||
|
||||
for mesh, verts_local, face_groups in meshes_data:
|
||||
mesh_facesets = build_ifc_facesets(ifc, verts_local, face_groups)
|
||||
if not mesh_facesets:
|
||||
continue
|
||||
|
||||
if material_manager:
|
||||
mesh_app_id = _get(mesh, "applicationId")
|
||||
style = material_manager.get_style_with_fallbacks(
|
||||
primary_app_id=str(mesh_app_id) if mesh_app_id else None,
|
||||
fallback_app_ids=fallback_app_ids,
|
||||
definition_id=definition_id,
|
||||
)
|
||||
if style:
|
||||
for fs in mesh_facesets:
|
||||
try:
|
||||
ifcopenshell.api.run(
|
||||
"style.assign_item_style", ifc,
|
||||
item=fs, style=style,
|
||||
)
|
||||
material_manager._apply_count += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
geom_items.extend(mesh_facesets)
|
||||
|
||||
if not geom_items:
|
||||
_geometry_hash_cache[geom_hash] = None
|
||||
return None
|
||||
|
||||
shared = _get_shared(ifc)
|
||||
a2p = ifc.createIfcAxis2Placement3D(shared["origin_0"], None, None)
|
||||
|
||||
mapped_rep = ifc.createIfcShapeRepresentation(
|
||||
ContextOfItems=body_context,
|
||||
RepresentationIdentifier="Body",
|
||||
RepresentationType="Tessellation",
|
||||
Items=geom_items,
|
||||
)
|
||||
|
||||
rep_map = ifc.createIfcRepresentationMap(a2p, mapped_rep)
|
||||
_geometry_hash_cache[geom_hash] = rep_map
|
||||
return rep_map
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Transform → IfcCartesianTransformationOperator3D
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def _vec_magnitude(x, y, z):
|
||||
return math.sqrt(x*x + y*y + z*z)
|
||||
|
||||
|
||||
# Cache: rounded direction tuple → IfcDirection entity (keyed by ifc file id)
|
||||
_direction_cache: dict[int, dict] = {}
|
||||
|
||||
def _get_or_create_direction(ifc, dx, dy, dz):
|
||||
"""Return a cached IfcDirection or create and cache a new one."""
|
||||
fid = id(ifc)
|
||||
if fid not in _direction_cache:
|
||||
_direction_cache[fid] = {}
|
||||
cache = _direction_cache[fid]
|
||||
# Round to 6 decimals — sufficient for unit vectors
|
||||
key = (round(dx, 6), round(dy, 6), round(dz, 6))
|
||||
if key not in cache:
|
||||
cache[key] = ifc.createIfcDirection([key[0], key[1], key[2]])
|
||||
return cache[key]
|
||||
|
||||
|
||||
def _make_transform_operator(ifc, t: list, ts: float):
|
||||
"""
|
||||
Convert a row-major 4x4 matrix + translation scale into an
|
||||
IfcCartesianTransformationOperator3DnonUniform.
|
||||
|
||||
t: 16 floats, row-major [r00,r01,r02,tx, r10,r11,r12,ty, r20,r21,r22,tz, 0,0,0,1]
|
||||
ts: scale factor for translation components (e.g. 1000.0 for m→mm)
|
||||
|
||||
IfcCartesianTransformationOperator axes represent the COLUMNS of M:
|
||||
Axis1 = column 0 = where local X maps → (t[0], t[4], t[8])
|
||||
Axis2 = column 1 = where local Y maps → (t[1], t[5], t[9])
|
||||
Axis3 = column 2 = where local Z maps → (t[2], t[6], t[10])
|
||||
|
||||
Always uses the non-uniform variant with explicit Axis3 to ensure
|
||||
correct orientation for all transform types (mirrors, non-orthogonal, etc.).
|
||||
|
||||
Returns the IFC entity, or None if the transform is degenerate.
|
||||
"""
|
||||
# Extract COLUMNS of the 3x3 rotation/scale sub-matrix
|
||||
ax1 = (float(t[0]), float(t[4]), float(t[8]))
|
||||
ax2 = (float(t[1]), float(t[5]), float(t[9]))
|
||||
ax3 = (float(t[2]), float(t[6]), float(t[10]))
|
||||
|
||||
s1 = _vec_magnitude(*ax1)
|
||||
s2 = _vec_magnitude(*ax2)
|
||||
s3 = _vec_magnitude(*ax3)
|
||||
|
||||
if s1 < 1e-10 or s2 < 1e-10 or s3 < 1e-10:
|
||||
return None # degenerate transform
|
||||
|
||||
# Normalized direction vectors — reuse cached IfcDirection entities
|
||||
d1 = _get_or_create_direction(ifc, ax1[0]/s1, ax1[1]/s1, ax1[2]/s1)
|
||||
d2 = _get_or_create_direction(ifc, ax2[0]/s2, ax2[1]/s2, ax2[2]/s2)
|
||||
d3 = _get_or_create_direction(ifc, ax3[0]/s3, ax3[1]/s3, ax3[2]/s3)
|
||||
|
||||
# Translation, scaled and rounded to mm
|
||||
tx = round(float(t[3]) * ts, 3)
|
||||
ty = round(float(t[7]) * ts, 3)
|
||||
tz = round(float(t[11]) * ts, 3)
|
||||
origin = ifc.createIfcCartesianPoint([tx, ty, tz])
|
||||
|
||||
# Round scales for cleaner output
|
||||
s1 = round(s1, 6)
|
||||
s2 = round(s2, 6)
|
||||
s3 = round(s3, 6)
|
||||
|
||||
return ifc.createIfcCartesianTransformationOperator3DnonUniform(
|
||||
d1, # Axis1
|
||||
d2, # Axis2
|
||||
origin, # LocalOrigin
|
||||
s1, # Scale
|
||||
d3, # Axis3 (explicit — never derived)
|
||||
s2, # Scale2
|
||||
s3, # Scale3
|
||||
)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Main conversion — IfcMappedItem approach
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def instance_to_ifc(ifc, body_context, obj: Base, definition_map: dict,
|
||||
scale: float = 1.0, material_manager=None):
|
||||
"""
|
||||
Convert a Speckle InstanceProxy → (IfcShapeRepresentation, IfcLocalPlacement).
|
||||
|
||||
Strategy: create geometry once per definition as an IfcRepresentationMap,
|
||||
then reference it via IfcMappedItem + IfcCartesianTransformationOperator3D
|
||||
for each instance. This avoids duplicating geometry across instances.
|
||||
"""
|
||||
transform_raw = _get(obj, "transform")
|
||||
if not transform_raw:
|
||||
return None, None
|
||||
t = list(transform_raw)
|
||||
if len(t) != 16:
|
||||
return None, None
|
||||
|
||||
definition_id = _get(obj, "definitionId") or ""
|
||||
ifc_format = _is_ifc_format(definition_id)
|
||||
|
||||
# Translation scale: IFC format transform is in metres → convert to mm
|
||||
# Revit format transform is already in mm (same as IFC file units)
|
||||
ts = 1000.0 if ifc_format else _resolve_instance_scale(obj, scale)
|
||||
|
||||
# Identity placement (transform is encoded in the MappedItem) — shared across all instances
|
||||
fid = id(ifc)
|
||||
if fid not in _identity_placement_cache:
|
||||
shared = _get_shared(ifc)
|
||||
a2p = ifc.createIfcAxis2Placement3D(shared["origin_0"], None, None)
|
||||
_identity_placement_cache[fid] = ifc.createIfcLocalPlacement(PlacementRelTo=None, RelativePlacement=a2p)
|
||||
placement = _identity_placement_cache[fid]
|
||||
|
||||
# --- Get or build IfcRepresentationMap (cached per definition_id) ---
|
||||
if definition_id not in _rep_map_cache:
|
||||
if ifc_format:
|
||||
meshes, extra_app_ids = _get_ifc_meshes(definition_id, definition_map)
|
||||
else:
|
||||
meshes, extra_app_ids = _get_revit_meshes(definition_id, definition_map)
|
||||
|
||||
# Build fallback app_id list: instance's own + definition chain IDs
|
||||
instance_app_id = _get(obj, "applicationId")
|
||||
fallback_ids = []
|
||||
if instance_app_id:
|
||||
fallback_ids.append(str(instance_app_id))
|
||||
fallback_ids.extend(extra_app_ids)
|
||||
|
||||
rep_map_result = None
|
||||
if meshes:
|
||||
rep_map_result = _build_rep_map(
|
||||
ifc, body_context, meshes, ifc_format, material_manager,
|
||||
fallback_app_ids=fallback_ids,
|
||||
definition_id=definition_id,
|
||||
)
|
||||
|
||||
# If no mesh geometry produced, try curve geometry from the definition object
|
||||
if rep_map_result is None:
|
||||
curve_obj = _get_definition_source_object(definition_id, definition_map)
|
||||
if curve_obj and is_curve(curve_obj):
|
||||
curve_scale = _resolve_instance_scale(curve_obj, 1.0)
|
||||
rep_map_result = build_curve_rep_map(
|
||||
ifc, body_context, curve_obj, scale=curve_scale,
|
||||
material_manager=material_manager,
|
||||
fallback_app_ids=fallback_ids,
|
||||
definition_id=definition_id,
|
||||
)
|
||||
|
||||
_rep_map_cache[definition_id] = rep_map_result
|
||||
if rep_map_result is not None:
|
||||
_stats["found"] += 1
|
||||
else:
|
||||
_stats["not_found"] += 1
|
||||
else:
|
||||
# Track stats even for cached definitions
|
||||
if _rep_map_cache[definition_id] is not None:
|
||||
_stats["found"] += 1
|
||||
else:
|
||||
_stats["not_found"] += 1
|
||||
|
||||
rep_map = _rep_map_cache[definition_id]
|
||||
if rep_map is None:
|
||||
return None, placement
|
||||
|
||||
# --- Build transform operator from instance's 4x4 matrix ---
|
||||
transform_op = _make_transform_operator(ifc, t, ts)
|
||||
if transform_op is None:
|
||||
return None, placement
|
||||
|
||||
# --- Create IfcMappedItem referencing the shared geometry ---
|
||||
mapped_item = ifc.createIfcMappedItem(rep_map, transform_op)
|
||||
|
||||
rep = ifc.createIfcShapeRepresentation(
|
||||
ContextOfItems=body_context,
|
||||
RepresentationIdentifier="Body",
|
||||
RepresentationType="MappedRepresentation",
|
||||
Items=[mapped_item],
|
||||
)
|
||||
return rep, placement
|
||||
|
||||
|
||||
def get_definition_object(obj: Base, definition_map: dict):
|
||||
"""
|
||||
Resolve the definition's source object for an InstanceProxy.
|
||||
Returns the first object referenced by the definition proxy, which
|
||||
carries the proper category/type info. Returns None if not found.
|
||||
"""
|
||||
definition_id = _get(obj, "definitionId") or ""
|
||||
if not definition_id:
|
||||
return None
|
||||
return _get_definition_source_object(definition_id, definition_map)
|
||||
|
||||
|
||||
def is_definition_source(obj, definition_map: dict) -> bool:
|
||||
"""Return True if this object is a definition geometry source (should not be exported standalone)."""
|
||||
app_id = _get(obj, "applicationId")
|
||||
if not app_id:
|
||||
return False
|
||||
return str(app_id).lower() in definition_map.get("definition_sources", set())
|
||||
|
||||
|
||||
def print_instance_stats():
|
||||
total = _stats["found"] + _stats["not_found"]
|
||||
print(f" Instance resolution: {_stats['found']}/{total} definitions found")
|
||||
if _stats["not_found"] > 0:
|
||||
print(f" Warning: {_stats['not_found']} instances had no definition geometry")
|
||||
unique_defs = len(_rep_map_cache)
|
||||
unique_geom = len([v for v in _geometry_hash_cache.values() if v is not None])
|
||||
if unique_defs > unique_geom:
|
||||
print(f" Geometry dedup: {unique_defs} definitions -> {unique_geom} unique geometries")
|
||||
|
||||
|
||||
def reset_caches():
|
||||
"""Reset module-level caches (call at start of each export run)."""
|
||||
_mesh_data_cache.clear()
|
||||
_rep_map_cache.clear()
|
||||
_geometry_hash_cache.clear()
|
||||
_identity_placement_cache.clear()
|
||||
_direction_cache.clear()
|
||||
_stats["found"] = 0
|
||||
_stats["not_found"] = 0
|
||||
@@ -1,21 +0,0 @@
|
||||
# =============================================================================
|
||||
# mapper.py
|
||||
# Maps Speckle objects → IFC entity classes.
|
||||
#
|
||||
# Reads the IFC class from _properties.Attributes.type
|
||||
# Falls back to IfcBuildingElementProxy if not present.
|
||||
# =============================================================================
|
||||
|
||||
from utils.properties import get_attributes
|
||||
|
||||
|
||||
def classify(obj) -> str:
|
||||
"""
|
||||
Determine the IFC class for a Speckle object.
|
||||
Reads from _properties.Attributes.type. Falls back to IfcBuildingElementProxy.
|
||||
"""
|
||||
attrs = get_attributes(obj)
|
||||
ifc_type = attrs.get("type")
|
||||
if ifc_type and isinstance(ifc_type, str):
|
||||
return ifc_type.strip()
|
||||
return "IfcBuildingElementProxy"
|
||||
@@ -1,191 +0,0 @@
|
||||
# =============================================================================
|
||||
# materials.py
|
||||
# Reads renderMaterialProxies from the Speckle root object and applies
|
||||
# IfcSurfaceStyle colours to IFC geometry.
|
||||
#
|
||||
# Structure of renderMaterialProxies:
|
||||
# root.renderMaterialProxies = [
|
||||
# {
|
||||
# id: "636259b3..."
|
||||
# value: RenderMaterial {
|
||||
# name: "Glass"
|
||||
# diffuse: -16744256 ← ARGB packed int (A=255, R=0, G=128, B=192)
|
||||
# opacity: 0.1 ← 0=transparent, 1=opaque
|
||||
# }
|
||||
# objects: ["a1a6b0c2-...", "d5dd3127-...", ...] ← mesh applicationIds
|
||||
# },
|
||||
# ...
|
||||
# ]
|
||||
#
|
||||
# Usage:
|
||||
# mgr = MaterialManager(ifc, root)
|
||||
# mgr.apply_to_item(brep_item, mesh_app_id)
|
||||
# =============================================================================
|
||||
|
||||
import ifcopenshell
|
||||
import ifcopenshell.api
|
||||
from specklepy.objects.base import Base
|
||||
from utils.helpers import _get
|
||||
|
||||
|
||||
def _argb_to_rgb(argb_int: int) -> tuple[float, float, float]:
|
||||
"""Unpack a signed ARGB int to normalised (R, G, B) floats 0..1."""
|
||||
unsigned = argb_int & 0xFFFFFFFF
|
||||
r = ((unsigned >> 16) & 0xFF) / 255.0
|
||||
g = ((unsigned >> 8) & 0xFF) / 255.0
|
||||
b = (unsigned & 0xFF) / 255.0
|
||||
return r, g, b
|
||||
|
||||
|
||||
class MaterialManager:
|
||||
"""
|
||||
Builds a lookup from mesh applicationId → IfcSurfaceStyle,
|
||||
then applies styles to IFC geometry items.
|
||||
"""
|
||||
|
||||
def __init__(self, ifc: ifcopenshell.file, root: Base):
|
||||
self._ifc = ifc
|
||||
# mesh applicationId (lowercase) → IfcSurfaceStyle (populated lazily)
|
||||
self._style_map: dict[str, object] = {}
|
||||
# name → IfcSurfaceStyle (cache to avoid duplicates)
|
||||
self._style_cache: dict[str, object] = {}
|
||||
self._apply_count = 0
|
||||
self._miss_count = 0
|
||||
self._build(root)
|
||||
|
||||
def _build(self, root: Base):
|
||||
"""
|
||||
Parse renderMaterialProxies and store raw material data keyed by mesh applicationId.
|
||||
IFC styles are created lazily (only when actually assigned to geometry) to avoid
|
||||
orphaned IfcSurfaceStyle instances that would fail IFC105 validation.
|
||||
"""
|
||||
proxies = _get(root, "renderMaterialProxies") or []
|
||||
if not isinstance(proxies, list):
|
||||
proxies = list(proxies) if proxies else []
|
||||
|
||||
# mesh applicationId (lowercase) → (name, diffuse_argb, transparency)
|
||||
self._material_data: dict[str, tuple] = {}
|
||||
|
||||
for proxy in proxies:
|
||||
material = _get(proxy, "value")
|
||||
if material is None:
|
||||
continue
|
||||
name = _get(material, "name") or "Unnamed"
|
||||
diffuse = _get(material, "diffuse")
|
||||
opacity = _get(material, "opacity")
|
||||
if diffuse is None:
|
||||
continue
|
||||
opacity_val = float(opacity) if opacity is not None else 1.0
|
||||
transparency = max(0.0, min(1.0, 1.0 - opacity_val))
|
||||
|
||||
objects = _get(proxy, "objects") or []
|
||||
for app_id in (objects if isinstance(objects, list) else []):
|
||||
if app_id:
|
||||
self._material_data[str(app_id).lower()] = (name, int(diffuse), transparency)
|
||||
|
||||
print(f" Materials: {len(self._material_data)} mesh mappings (styles created on demand)")
|
||||
|
||||
def _get_or_create_style(self, name: str, diffuse_argb: int, transparency: float):
|
||||
"""Return cached style or create a new IfcSurfaceStyle."""
|
||||
cache_key = f"{name}|{diffuse_argb}|{transparency:.4f}"
|
||||
if cache_key in self._style_cache:
|
||||
return self._style_cache[cache_key]
|
||||
|
||||
r, g, b = _argb_to_rgb(diffuse_argb)
|
||||
style = ifcopenshell.api.run("style.add_style", self._ifc, name=name)
|
||||
ifcopenshell.api.run(
|
||||
"style.add_surface_style",
|
||||
self._ifc,
|
||||
style=style,
|
||||
ifc_class="IfcSurfaceStyleRendering",
|
||||
attributes={
|
||||
"SurfaceColour": {"Name": None, "Red": r, "Green": g, "Blue": b},
|
||||
"Transparency": transparency,
|
||||
"ReflectanceMethod": "NOTDEFINED",
|
||||
},
|
||||
)
|
||||
self._style_cache[cache_key] = style
|
||||
return style
|
||||
|
||||
def get_style(self, mesh_app_id: str):
|
||||
"""Return the IfcSurfaceStyle for a mesh applicationId (created on demand), or None."""
|
||||
key = str(mesh_app_id).lower()
|
||||
# Return already-created style if cached
|
||||
if key in self._style_map:
|
||||
return self._style_map[key]
|
||||
# Create style now only if this mesh has material data
|
||||
data = self._material_data.get(key)
|
||||
if data is None:
|
||||
return None
|
||||
name, diffuse, transparency = data
|
||||
style = self._get_or_create_style(name, diffuse, transparency)
|
||||
self._style_map[key] = style
|
||||
return style
|
||||
|
||||
def get_style_with_fallbacks(self, primary_app_id: str = None,
|
||||
fallback_app_ids: list = None,
|
||||
definition_id: str = None):
|
||||
"""3-tier style lookup: primary → fallbacks → definition mapping."""
|
||||
style = None
|
||||
if primary_app_id:
|
||||
style = self.get_style(primary_app_id)
|
||||
if not style and fallback_app_ids:
|
||||
for fid in fallback_app_ids:
|
||||
style = self.get_style(fid)
|
||||
if style:
|
||||
break
|
||||
if not style and definition_id:
|
||||
style = self.get_style_by_definition(definition_id)
|
||||
return style
|
||||
|
||||
def apply_to_item(self, item, mesh_app_id: str):
|
||||
"""Assign the material style to a single IFC geometry item (e.g. IfcPolygonalFaceSet)."""
|
||||
style = self.get_style(mesh_app_id)
|
||||
if style is None:
|
||||
self._miss_count += 1
|
||||
return
|
||||
try:
|
||||
ifcopenshell.api.run(
|
||||
"style.assign_item_style",
|
||||
self._ifc,
|
||||
item=item,
|
||||
style=style,
|
||||
)
|
||||
self._apply_count += 1
|
||||
except Exception as e:
|
||||
pass # Non-fatal — geometry still exports without colour
|
||||
|
||||
def build_definition_material_map(self, definition_map: dict):
|
||||
"""
|
||||
Build a mapping from definitionId → material data by resolving which
|
||||
InstanceProxy objects the material proxy references and finding their definitionId.
|
||||
This handles the case where renderMaterialProxies.objects references inner
|
||||
InstanceProxy applicationIds rather than the top-level element applicationIds.
|
||||
"""
|
||||
by_app_id = definition_map.get("by_app_id", {})
|
||||
self._definition_material: dict[str, tuple] = {} # definitionId → (name, diffuse, transparency)
|
||||
|
||||
for app_id_key, mat_data in self._material_data.items():
|
||||
obj = by_app_id.get(app_id_key)
|
||||
if obj is None:
|
||||
continue
|
||||
def_id = _get(obj, "definitionId")
|
||||
if def_id and isinstance(def_id, str):
|
||||
self._definition_material[def_id.lower()] = mat_data
|
||||
|
||||
if self._definition_material:
|
||||
print(f" Material definitionId mappings: {len(self._definition_material)}")
|
||||
|
||||
def get_style_by_definition(self, definition_id: str):
|
||||
"""Return IfcSurfaceStyle for a definitionId (created on demand), or None."""
|
||||
if not hasattr(self, '_definition_material'):
|
||||
return None
|
||||
key = str(definition_id).lower()
|
||||
data = self._definition_material.get(key)
|
||||
if data is None:
|
||||
return None
|
||||
name, diffuse, transparency = data
|
||||
return self._get_or_create_style(name, diffuse, transparency)
|
||||
|
||||
def print_stats(self):
|
||||
print(f" Materials applied: {self._apply_count}, missed: {self._miss_count}")
|
||||
@@ -1,540 +0,0 @@
|
||||
# =============================================================================
|
||||
# properties.py
|
||||
# Generically clones all properties from a Speckle object into IFC entities.
|
||||
#
|
||||
# Source structure (from _properties / properties):
|
||||
# Attributes → IFC element attributes (GlobalId, Name, Tag, etc.)
|
||||
# Property Sets → dict of {pset_name: {prop_name: value}}
|
||||
# Quantities → dict of {qto_name: {qty_name: {name, units, value}}}
|
||||
# Building Storey → string, used for storey assignment
|
||||
# Element Type Attributes → used by type_manager to create IfcTypeObject
|
||||
# Element Type Property Sets → psets written on the IfcTypeObject
|
||||
# =============================================================================
|
||||
|
||||
import ifcopenshell
|
||||
import ifcopenshell.api
|
||||
import ifcopenshell.guid
|
||||
from specklepy.objects.base import Base
|
||||
from utils.helpers import _get
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Safe access helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _to_dict(obj) -> dict:
|
||||
"""Convert a Speckle Base object or dict to a plain dict."""
|
||||
if obj is None:
|
||||
return {}
|
||||
if isinstance(obj, dict):
|
||||
return obj
|
||||
if hasattr(obj, "get_dynamic_member_names"):
|
||||
result = {}
|
||||
try:
|
||||
for n in obj.get_dynamic_member_names():
|
||||
try:
|
||||
result[n] = obj[n]
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
return result
|
||||
if hasattr(obj, "items"):
|
||||
try:
|
||||
return dict(obj.items())
|
||||
except Exception:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Property extraction from Speckle object
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _unflatten_dot_keys(flat: dict) -> dict:
|
||||
"""Convert flat dot-notation keys into a nested dict.
|
||||
|
||||
Example:
|
||||
{"Attributes.Name": "X", "Quantities.BaseQuantities.Gross Weight.value": "15"}
|
||||
→ {"Attributes": {"Name": "X"}, "Quantities": {"BaseQuantities": {"Gross Weight": {"value": "15"}}}}
|
||||
|
||||
Keys without dots are kept as-is. If a dict already contains nested
|
||||
dicts (i.e. non-flat), it is returned unchanged.
|
||||
"""
|
||||
# Quick check: if any value is already a dict/Base, assume nested — skip
|
||||
if any(isinstance(v, dict) or hasattr(v, "get_dynamic_member_names") for v in flat.values()):
|
||||
return flat
|
||||
|
||||
nested: dict = {}
|
||||
for dotted_key, value in flat.items():
|
||||
parts = dotted_key.split(".")
|
||||
d = nested
|
||||
for part in parts[:-1]:
|
||||
d = d.setdefault(part, {})
|
||||
d[parts[-1]] = value
|
||||
return nested
|
||||
|
||||
|
||||
def get_properties(obj) -> dict:
|
||||
"""Get the _properties / properties dict from a Speckle object.
|
||||
|
||||
Handles both nested property dicts (ArchiCAD-style) and flat
|
||||
dot-notation dicts (Rhino/Speckle-style) by unflattening on the fly.
|
||||
"""
|
||||
for key in ["_properties", "properties", "@properties"]:
|
||||
val = _get(obj, key)
|
||||
if val is not None:
|
||||
props = _to_dict(val)
|
||||
return _unflatten_dot_keys(props) if props else props
|
||||
return {}
|
||||
|
||||
|
||||
def get_building_storey(obj) -> str:
|
||||
"""Extract Building Storey name from properties."""
|
||||
props = get_properties(obj)
|
||||
storey = props.get("Building Storey")
|
||||
if storey and isinstance(storey, str):
|
||||
return storey.strip()
|
||||
return "Unknown Storey"
|
||||
|
||||
|
||||
def get_attributes(obj) -> dict:
|
||||
"""Get Attributes dict from properties."""
|
||||
props = get_properties(obj)
|
||||
return _to_dict(props.get("Attributes")) or {}
|
||||
|
||||
|
||||
def get_element_name(obj) -> str:
|
||||
"""Get element name from Attributes, falling back to object name."""
|
||||
attrs = get_attributes(obj)
|
||||
name = attrs.get("Name")
|
||||
if name:
|
||||
return str(name)
|
||||
# Fallback to object-level name fields
|
||||
for key in ["name", "_name"]:
|
||||
val = _get(obj, key)
|
||||
if val and isinstance(val, str):
|
||||
return val
|
||||
return "unnamed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# IFC value creation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ifc_value_cache: dict[int, dict] = {}
|
||||
|
||||
|
||||
def _make_ifc_value(ifc, value):
|
||||
"""Create a cached IFC nominal value entity from a Python value, detecting type.
|
||||
|
||||
Identical values reuse the same IFC entity to reduce file size.
|
||||
"""
|
||||
fid = id(ifc)
|
||||
if fid not in _ifc_value_cache:
|
||||
_ifc_value_cache[fid] = {}
|
||||
cache = _ifc_value_cache[fid]
|
||||
|
||||
if isinstance(value, bool):
|
||||
cache_key = ("IfcBoolean", value)
|
||||
elif isinstance(value, int):
|
||||
cache_key = ("IfcInteger", value)
|
||||
elif isinstance(value, float):
|
||||
cache_key = ("IfcReal", value)
|
||||
elif isinstance(value, list):
|
||||
cache_key = ("IfcLabel", ", ".join(str(v) for v in value))
|
||||
else:
|
||||
cache_key = ("IfcLabel", str(value))
|
||||
|
||||
if cache_key not in cache:
|
||||
entity_type, wrapped = cache_key
|
||||
cache[cache_key] = ifc.create_entity(entity_type, wrappedValue=wrapped)
|
||||
return cache[cache_key]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit → IFC quantity type mapping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_UNIT_QTY_MAP = {
|
||||
"millimetre": ("IfcQuantityLength", "LengthValue"),
|
||||
"millimeter": ("IfcQuantityLength", "LengthValue"),
|
||||
"centimetre": ("IfcQuantityLength", "LengthValue"),
|
||||
"centimeter": ("IfcQuantityLength", "LengthValue"),
|
||||
"metre": ("IfcQuantityLength", "LengthValue"),
|
||||
"meter": ("IfcQuantityLength", "LengthValue"),
|
||||
"foot": ("IfcQuantityLength", "LengthValue"),
|
||||
"feet": ("IfcQuantityLength", "LengthValue"),
|
||||
"inch": ("IfcQuantityLength", "LengthValue"),
|
||||
"square metre": ("IfcQuantityArea", "AreaValue"),
|
||||
"square meter": ("IfcQuantityArea", "AreaValue"),
|
||||
"square foot": ("IfcQuantityArea", "AreaValue"),
|
||||
"cubic metre": ("IfcQuantityVolume", "VolumeValue"),
|
||||
"cubic meter": ("IfcQuantityVolume", "VolumeValue"),
|
||||
"cubic foot": ("IfcQuantityVolume", "VolumeValue"),
|
||||
"kilogram": ("IfcQuantityWeight", "WeightValue"),
|
||||
"pound": ("IfcQuantityWeight", "WeightValue"),
|
||||
"degree": ("IfcQuantityCount", "CountValue"),
|
||||
}
|
||||
|
||||
# Name keyword → IFC quantity type (used when no units are provided)
|
||||
_NAME_QTY_MAP = {
|
||||
"length": ("IfcQuantityLength", "LengthValue"),
|
||||
"width": ("IfcQuantityLength", "LengthValue"),
|
||||
"height": ("IfcQuantityLength", "LengthValue"),
|
||||
"depth": ("IfcQuantityLength", "LengthValue"),
|
||||
"perimeter": ("IfcQuantityLength", "LengthValue"),
|
||||
"area": ("IfcQuantityArea", "AreaValue"),
|
||||
"volume": ("IfcQuantityVolume", "VolumeValue"),
|
||||
"volumn": ("IfcQuantityVolume", "VolumeValue"), # common typo
|
||||
"weight": ("IfcQuantityWeight", "WeightValue"),
|
||||
"mass": ("IfcQuantityWeight", "WeightValue"),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Set IFC element attributes from _properties.Attributes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def set_element_attributes(ifc, element, obj):
|
||||
"""Set IFC element attributes from _properties.Attributes."""
|
||||
attrs = get_attributes(obj)
|
||||
if not attrs:
|
||||
return
|
||||
|
||||
for ifc_attr in ["GlobalId", "Name", "Tag", "ObjectType", "Description"]:
|
||||
val = attrs.get(ifc_attr)
|
||||
if val is not None:
|
||||
try:
|
||||
setattr(element, ifc_attr, str(val))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# PredefinedType requires special handling (enum value)
|
||||
ptype = attrs.get("PredefinedType")
|
||||
if ptype:
|
||||
try:
|
||||
element.PredefinedType = str(ptype)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PropertySetManager — shared property sets across elements
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _try_float(value):
|
||||
"""Try to convert a value to float. Returns None on failure."""
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return float(value)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _pset_content_key(pset_name: str, items: list) -> str:
|
||||
"""Build a hashable key from pset name + sorted property name-value pairs."""
|
||||
return repr((pset_name, sorted(items)))
|
||||
|
||||
|
||||
def _qto_content_key(qto_name: str, items: list) -> str:
|
||||
"""Build a hashable key from qto name + sorted quantity tuples."""
|
||||
return repr((qto_name, sorted(items)))
|
||||
|
||||
|
||||
class PropertySetManager:
|
||||
"""Creates shared IfcPropertySet / IfcElementQuantity entities.
|
||||
|
||||
Instead of creating one pset per element, identical psets are created
|
||||
once and linked to all elements that share them via a single
|
||||
IfcRelDefinesByProperties (written at flush time).
|
||||
"""
|
||||
|
||||
def __init__(self, ifc: ifcopenshell.file):
|
||||
self._ifc = ifc
|
||||
# content_key → IfcPropertySet / IfcElementQuantity entity
|
||||
self._pset_cache: dict[str, object] = {}
|
||||
# pset entity id → [element, ...]
|
||||
self._pending: dict[int, list] = {}
|
||||
self._pset_count = 0
|
||||
self._shared_count = 0
|
||||
|
||||
def queue_property_sets(self, element, obj):
|
||||
"""Extract Property Sets from obj and queue shared assignment to element."""
|
||||
props = get_properties(obj)
|
||||
properties_section = _to_dict(props.get("Property Sets"))
|
||||
if not properties_section:
|
||||
return
|
||||
|
||||
ifc = self._ifc
|
||||
for pset_name, pset_data in properties_section.items():
|
||||
pset_dict = _to_dict(pset_data) if not isinstance(pset_data, dict) else pset_data
|
||||
if not pset_dict:
|
||||
continue
|
||||
|
||||
# Build content items for hashing (skip id and None values)
|
||||
content_items = []
|
||||
for prop_name, prop_value in pset_dict.items():
|
||||
if prop_name == "id" or prop_value is None:
|
||||
continue
|
||||
content_items.append((str(prop_name), repr(prop_value)))
|
||||
|
||||
if not content_items:
|
||||
continue
|
||||
|
||||
key = _pset_content_key(pset_name, content_items)
|
||||
|
||||
if key not in self._pset_cache:
|
||||
# Create the shared IfcPropertySet entity
|
||||
ifc_props = []
|
||||
for prop_name, prop_value in pset_dict.items():
|
||||
if prop_name == "id" or prop_value is None:
|
||||
continue
|
||||
try:
|
||||
nominal = _make_ifc_value(ifc, prop_value)
|
||||
p = ifc.create_entity(
|
||||
"IfcPropertySingleValue",
|
||||
Name=str(prop_name),
|
||||
NominalValue=nominal,
|
||||
)
|
||||
ifc_props.append(p)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not ifc_props:
|
||||
continue
|
||||
|
||||
pset = ifc.create_entity(
|
||||
"IfcPropertySet",
|
||||
GlobalId=ifcopenshell.guid.new(),
|
||||
Name=pset_name,
|
||||
HasProperties=ifc_props,
|
||||
)
|
||||
self._pset_cache[key] = pset
|
||||
self._pset_count += 1
|
||||
else:
|
||||
self._shared_count += 1
|
||||
|
||||
pset = self._pset_cache[key]
|
||||
pid = pset.id()
|
||||
if pid not in self._pending:
|
||||
self._pending[pid] = []
|
||||
self._pending[pid].append(element)
|
||||
|
||||
def queue_quantity_sets(self, element, obj):
|
||||
"""Extract Quantities from obj and queue shared assignment to element."""
|
||||
props = get_properties(obj)
|
||||
quantities_raw = props.get("Quantities")
|
||||
if quantities_raw is None:
|
||||
return
|
||||
quantities_section = _to_dict(quantities_raw)
|
||||
if not quantities_section:
|
||||
return
|
||||
|
||||
ifc = self._ifc
|
||||
for qto_name, qto_data in quantities_section.items():
|
||||
qto_dict = _to_dict(qto_data) if not isinstance(qto_data, dict) else qto_data
|
||||
if not qto_dict:
|
||||
continue
|
||||
|
||||
# Build content items for hashing
|
||||
content_items = []
|
||||
for qty_key, qty_entry in qto_dict.items():
|
||||
if qty_key == "id":
|
||||
continue
|
||||
if isinstance(qty_entry, dict):
|
||||
name = qty_entry.get("name", qty_key)
|
||||
units = (qty_entry.get("units") or "").strip().lower()
|
||||
value = _try_float(qty_entry.get("value"))
|
||||
else:
|
||||
value = _try_float(qty_entry)
|
||||
if value is None:
|
||||
continue
|
||||
name = qty_key
|
||||
units = ""
|
||||
if value is not None:
|
||||
content_items.append((name, units, value))
|
||||
|
||||
if not content_items:
|
||||
continue
|
||||
|
||||
key = _qto_content_key(qto_name, content_items)
|
||||
|
||||
if key not in self._pset_cache:
|
||||
quantities = []
|
||||
for qty_key, qty_entry in qto_dict.items():
|
||||
if qty_key == "id":
|
||||
continue
|
||||
if isinstance(qty_entry, dict):
|
||||
name = qty_entry.get("name", qty_key)
|
||||
units = (qty_entry.get("units") or "").strip().lower()
|
||||
value = _try_float(qty_entry.get("value"))
|
||||
else:
|
||||
value = _try_float(qty_entry)
|
||||
if value is None:
|
||||
continue
|
||||
name = qty_key
|
||||
units = ""
|
||||
if value is None:
|
||||
continue
|
||||
|
||||
try:
|
||||
mapping = _UNIT_QTY_MAP.get(units)
|
||||
if not mapping:
|
||||
name_lower = name.lower()
|
||||
for keyword, m in _NAME_QTY_MAP.items():
|
||||
if keyword in name_lower:
|
||||
mapping = m
|
||||
break
|
||||
if mapping:
|
||||
qty_type, value_attr = mapping
|
||||
qty = ifc.create_entity(
|
||||
qty_type, Name=name, **{value_attr: value}
|
||||
)
|
||||
else:
|
||||
qty = ifc.create_entity(
|
||||
"IfcQuantityCount", Name=name, CountValue=int(value)
|
||||
)
|
||||
quantities.append(qty)
|
||||
except Exception as e:
|
||||
print(f" Warning: quantity {name}: {e}")
|
||||
continue
|
||||
|
||||
if not quantities:
|
||||
continue
|
||||
|
||||
qto = ifc.create_entity(
|
||||
"IfcElementQuantity",
|
||||
GlobalId=ifcopenshell.guid.new(),
|
||||
Name=qto_name,
|
||||
Quantities=quantities,
|
||||
)
|
||||
self._pset_cache[key] = qto
|
||||
self._pset_count += 1
|
||||
else:
|
||||
self._shared_count += 1
|
||||
|
||||
qto = self._pset_cache[key]
|
||||
qid = qto.id()
|
||||
if qid not in self._pending:
|
||||
self._pending[qid] = []
|
||||
self._pending[qid].append(element)
|
||||
|
||||
def flush(self):
|
||||
"""Write all batched IfcRelDefinesByProperties relationships."""
|
||||
ifc = self._ifc
|
||||
for pset_id, elements in self._pending.items():
|
||||
pset = ifc.by_id(pset_id)
|
||||
try:
|
||||
ifc.create_entity(
|
||||
"IfcRelDefinesByProperties",
|
||||
GlobalId=ifcopenshell.guid.new(),
|
||||
RelatedObjects=elements,
|
||||
RelatingPropertyDefinition=pset,
|
||||
)
|
||||
except Exception as e:
|
||||
print(f" Warning: pset rel: {e}")
|
||||
self._pending.clear()
|
||||
|
||||
def print_stats(self):
|
||||
total = self._pset_count + self._shared_count
|
||||
print(f" Property sets: {self._pset_count} unique / {total} total ({self._shared_count} shared)")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API — called from main.py
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def write_all_properties(ifc, element, obj, property_manager=None):
|
||||
"""Write all properties, quantities, and attributes from _properties."""
|
||||
set_element_attributes(ifc, element, obj)
|
||||
if property_manager:
|
||||
property_manager.queue_property_sets(element, obj)
|
||||
property_manager.queue_quantity_sets(element, obj)
|
||||
else:
|
||||
# Fallback: direct per-element creation (no sharing)
|
||||
_write_property_sets_direct(ifc, element, obj)
|
||||
_write_quantity_sets_direct(ifc, element, obj)
|
||||
|
||||
|
||||
def _write_property_sets_direct(ifc, element, obj):
|
||||
"""Legacy per-element property set writing (fallback)."""
|
||||
props = get_properties(obj)
|
||||
properties_section = _to_dict(props.get("Property Sets"))
|
||||
if not properties_section:
|
||||
return
|
||||
for pset_name, pset_data in properties_section.items():
|
||||
pset_dict = _to_dict(pset_data) if not isinstance(pset_data, dict) else pset_data
|
||||
if not pset_dict:
|
||||
continue
|
||||
ifc_props = []
|
||||
for prop_name, prop_value in pset_dict.items():
|
||||
if prop_name == "id" or prop_value is None:
|
||||
continue
|
||||
try:
|
||||
nominal = _make_ifc_value(ifc, prop_value)
|
||||
p = ifc.create_entity("IfcPropertySingleValue", Name=str(prop_name), NominalValue=nominal)
|
||||
ifc_props.append(p)
|
||||
except Exception:
|
||||
continue
|
||||
if ifc_props:
|
||||
try:
|
||||
pset = ifcopenshell.api.run("pset.add_pset", ifc, product=element, name=pset_name)
|
||||
pset.HasProperties = ifc_props
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _write_quantity_sets_direct(ifc, element, obj):
|
||||
"""Legacy per-element quantity set writing (fallback)."""
|
||||
props = get_properties(obj)
|
||||
quantities_raw = props.get("Quantities")
|
||||
if quantities_raw is None:
|
||||
return
|
||||
quantities_section = _to_dict(quantities_raw)
|
||||
if not quantities_section:
|
||||
return
|
||||
for qto_name, qto_data in quantities_section.items():
|
||||
qto_dict = _to_dict(qto_data) if not isinstance(qto_data, dict) else qto_data
|
||||
if not qto_dict:
|
||||
continue
|
||||
quantities = []
|
||||
for qty_key, qty_entry in qto_dict.items():
|
||||
if qty_key == "id":
|
||||
continue
|
||||
if isinstance(qty_entry, dict):
|
||||
name = qty_entry.get("name", qty_key)
|
||||
units = (qty_entry.get("units") or "").strip().lower()
|
||||
value = _try_float(qty_entry.get("value"))
|
||||
else:
|
||||
value = _try_float(qty_entry)
|
||||
if value is None:
|
||||
continue
|
||||
name = qty_key
|
||||
units = ""
|
||||
if value is None:
|
||||
continue
|
||||
try:
|
||||
mapping = _UNIT_QTY_MAP.get(units)
|
||||
if not mapping:
|
||||
for keyword, m in _NAME_QTY_MAP.items():
|
||||
if keyword in name.lower():
|
||||
mapping = m
|
||||
break
|
||||
if mapping:
|
||||
qty_type, value_attr = mapping
|
||||
qty = ifc.create_entity(qty_type, Name=name, **{value_attr: value})
|
||||
else:
|
||||
qty = ifc.create_entity("IfcQuantityCount", Name=name, CountValue=int(value))
|
||||
quantities.append(qty)
|
||||
except Exception:
|
||||
continue
|
||||
if quantities:
|
||||
try:
|
||||
qto = ifcopenshell.api.run("pset.add_qto", ifc, product=element, name=qto_name)
|
||||
qto.Quantities = quantities
|
||||
except Exception:
|
||||
pass
|
||||
@@ -1,75 +0,0 @@
|
||||
# =============================================================================
|
||||
# receiver.py
|
||||
# Connects to Speckle and receives the root Base object for a given version.
|
||||
# =============================================================================
|
||||
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
from specklepy.api.client import SpeckleClient
|
||||
from specklepy.api.credentials import get_default_account
|
||||
from specklepy.api import operations
|
||||
from specklepy.transports.server import ServerTransport
|
||||
|
||||
load_dotenv()
|
||||
|
||||
SPECKLE_HOST = os.getenv("SPECKLE_SERVER_URL", "https://app.speckle.systems")
|
||||
SPECKLE_TOKEN = os.getenv("SPECKLE_TOKEN", "")
|
||||
DEFAULT_UNITS = "mm"
|
||||
|
||||
|
||||
def get_client() -> SpeckleClient:
|
||||
"""
|
||||
Create and authenticate a SpeckleClient.
|
||||
Uses a personal access token from the .env file.
|
||||
To use your local Speckle Manager account instead, swap to get_default_account().
|
||||
"""
|
||||
client = SpeckleClient(host=SPECKLE_HOST)
|
||||
|
||||
if SPECKLE_TOKEN and SPECKLE_TOKEN != "YOUR_PERSONAL_ACCESS_TOKEN":
|
||||
client.authenticate_with_token(SPECKLE_TOKEN)
|
||||
else:
|
||||
# Fallback: use account from Speckle Manager desktop app
|
||||
account = get_default_account()
|
||||
if account is None:
|
||||
raise RuntimeError(
|
||||
"No Speckle account found. Either set SPECKLE_TOKEN in .env "
|
||||
"or log in via Speckle Manager."
|
||||
)
|
||||
client.authenticate_with_account(account)
|
||||
|
||||
return client
|
||||
|
||||
|
||||
def receive_version(project_id: str, version_id: str):
|
||||
"""
|
||||
Receive the root Base object from a Speckle version.
|
||||
|
||||
Args:
|
||||
project_id: The Speckle project (stream) ID.
|
||||
version_id: The version (commit) ID to receive.
|
||||
|
||||
Returns:
|
||||
A specklepy Base object — the root of the object graph.
|
||||
"""
|
||||
client = get_client()
|
||||
|
||||
print(f"🔗 Connecting to {SPECKLE_HOST}...")
|
||||
print(f"📦 Receiving project={project_id} version={version_id}")
|
||||
|
||||
# Get version metadata to find the referenced object ID
|
||||
version = client.version.get(version_id, project_id)
|
||||
referenced_object_id = version.referenced_object
|
||||
|
||||
# Download the full object graph
|
||||
transport = ServerTransport(stream_id=project_id, client=client)
|
||||
base = operations.receive(referenced_object_id, transport)
|
||||
|
||||
# Read units from the root object
|
||||
units = getattr(base, "units", DEFAULT_UNITS) or DEFAULT_UNITS
|
||||
|
||||
# IFC file is declared in MILLIMETRES — no conversion needed.
|
||||
# All geometry stays in source units (mm). scale=1.0 means "keep as-is".
|
||||
scale = 1.0
|
||||
|
||||
print(f"✅ Received root object units={units} scale=1.0 (IFC declared as mm)")
|
||||
return base, scale
|
||||
@@ -1,87 +0,0 @@
|
||||
# =============================================================================
|
||||
# traversal.py
|
||||
# Walks the nested Speckle Collection tree generically.
|
||||
#
|
||||
# Expected structure:
|
||||
# Root Collection
|
||||
# └── Collection
|
||||
# └── Collection
|
||||
# └── Object (leaf BIM element)
|
||||
#
|
||||
# Collections can nest to any depth. Every non-Collection leaf is yielded.
|
||||
# =============================================================================
|
||||
|
||||
from typing import Generator
|
||||
from specklepy.objects.base import Base
|
||||
|
||||
|
||||
def is_collection(obj) -> bool:
|
||||
"""Returns True if this object is a Speckle Collection node (not a leaf element)."""
|
||||
speckle_type = getattr(obj, "speckle_type", "") or ""
|
||||
return "Collection" in speckle_type
|
||||
|
||||
|
||||
def get_children(obj) -> list:
|
||||
"""
|
||||
Safely get the 'elements' list from a Base/Collection object.
|
||||
Handles 'elements', '@elements', and '_elements' variants.
|
||||
"""
|
||||
for key in ["elements", "@elements", "_elements"]:
|
||||
try:
|
||||
val = obj[key]
|
||||
if val is not None:
|
||||
return list(val)
|
||||
except Exception:
|
||||
continue
|
||||
return []
|
||||
|
||||
|
||||
def traverse(root: Base) -> Generator[Base, None, None]:
|
||||
"""
|
||||
Walk the full Speckle object tree from the root Base object.
|
||||
Yields every non-Collection leaf object found at any depth.
|
||||
"""
|
||||
yield from _walk(root)
|
||||
|
||||
|
||||
def _walk(obj):
|
||||
"""Recursively walk: descend into Collections, yield leaf objects."""
|
||||
if obj is None:
|
||||
return
|
||||
|
||||
children = get_children(obj)
|
||||
|
||||
if is_collection(obj):
|
||||
for child in children:
|
||||
yield from _walk(child)
|
||||
else:
|
||||
# Leaf object — yield it
|
||||
yield obj
|
||||
# Also check for nested children (e.g. curtain wall sub-elements)
|
||||
for child in children:
|
||||
if child is not None and not is_collection(child):
|
||||
yield from _walk(child)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Debug helper
|
||||
# --------------------------------------------------------------------------- #
|
||||
|
||||
def print_tree(obj: Base, indent: int = 0, max_depth: int = 5):
|
||||
"""Print the object tree structure for debugging."""
|
||||
if indent > max_depth:
|
||||
return
|
||||
|
||||
prefix = " " * indent
|
||||
name = getattr(obj, "name", None) or ""
|
||||
speckle_type = getattr(obj, "speckle_type", "") or ""
|
||||
children = get_children(obj)
|
||||
child_count = f" ({len(children)} children)" if children else ""
|
||||
|
||||
print(f"{prefix}├─ [{speckle_type}] name={name!r}{child_count}")
|
||||
|
||||
for child in children[:5]:
|
||||
print_tree(child, indent + 1, max_depth)
|
||||
|
||||
if len(children) > 5:
|
||||
print(f"{prefix} ... and {len(children) - 5} more")
|
||||
@@ -1,146 +0,0 @@
|
||||
# =============================================================================
|
||||
# type_manager.py
|
||||
# Creates and caches IfcTypeObjects from Element Type Attributes and links
|
||||
# element instances to them via IfcRelDefinesByType.
|
||||
#
|
||||
# Type info comes from _properties:
|
||||
# Element Type Attributes → type class, Name, GlobalId, Tag, PredefinedType
|
||||
# Element Type Property Sets → psets written on the type object
|
||||
#
|
||||
# Type objects are SHARED — keyed by GlobalId or (type_class, Name).
|
||||
# =============================================================================
|
||||
|
||||
import ifcopenshell
|
||||
import ifcopenshell.api
|
||||
from specklepy.objects.base import Base
|
||||
from utils.properties import get_properties, _to_dict, _make_ifc_value
|
||||
|
||||
|
||||
class TypeManager:
|
||||
"""
|
||||
Creates IfcTypeObjects on demand and caches them.
|
||||
Call assign(element, obj) for each exported element.
|
||||
"""
|
||||
|
||||
def __init__(self, ifc: ifcopenshell.file):
|
||||
self._ifc = ifc
|
||||
self._cache: dict[str, object] = {} # cache_key → IfcTypeObject
|
||||
self._pending: dict[int, list] = {} # type_obj.id() → [element, ...]
|
||||
|
||||
def assign(self, element, obj: Base, ifc_class: str = ""):
|
||||
"""Create or retrieve cached type object and queue the assignment."""
|
||||
props = get_properties(obj)
|
||||
type_attrs = _to_dict(props.get("Element Type Attributes"))
|
||||
type_psets = _to_dict(props.get("Element Type Property Sets"))
|
||||
|
||||
if not type_attrs and not type_psets:
|
||||
return
|
||||
|
||||
# Determine type class and cache key
|
||||
type_class = None
|
||||
if type_attrs:
|
||||
type_class = type_attrs.get("type")
|
||||
|
||||
if type_class:
|
||||
# Format A: Element Type Attributes has explicit type info
|
||||
global_id = type_attrs.get("GlobalId")
|
||||
name = type_attrs.get("Name") or ""
|
||||
cache_key = global_id if global_id else f"{type_class}:{name}"
|
||||
|
||||
if cache_key not in self._cache:
|
||||
type_obj = self._create_type(type_class, type_attrs, type_psets)
|
||||
self._cache[cache_key] = type_obj
|
||||
else:
|
||||
# Format B: No explicit type class — derive from element IFC class
|
||||
type_class = ifc_class + "Type" if ifc_class else None
|
||||
if not type_class:
|
||||
return
|
||||
|
||||
# Merge: if type_attrs has no 'type' key, it contains psets directly
|
||||
merged_psets = type_psets.copy() if type_psets else {}
|
||||
if type_attrs:
|
||||
merged_psets.update(type_attrs)
|
||||
|
||||
cache_key = f"{type_class}:{repr(sorted(merged_psets.items()))}"
|
||||
|
||||
if cache_key not in self._cache:
|
||||
type_obj = self._create_type(type_class, {}, merged_psets)
|
||||
self._cache[cache_key] = type_obj
|
||||
|
||||
type_obj = self._cache[cache_key]
|
||||
type_id = type_obj.id()
|
||||
|
||||
if type_id not in self._pending:
|
||||
self._pending[type_id] = []
|
||||
self._pending[type_id].append(element)
|
||||
|
||||
def flush(self):
|
||||
"""Write all IfcRelDefinesByType relationships."""
|
||||
for type_id, elements in self._pending.items():
|
||||
type_obj = self._ifc.by_id(type_id)
|
||||
ifcopenshell.api.run(
|
||||
"type.assign_type", self._ifc,
|
||||
related_objects=elements,
|
||||
relating_type=type_obj,
|
||||
)
|
||||
self._pending.clear()
|
||||
print(f" Type objects created: {len(self._cache)}")
|
||||
|
||||
def _create_type(self, type_class: str, type_attrs: dict, type_psets: dict):
|
||||
"""Instantiate the IfcTypeObject with attributes and property sets."""
|
||||
ifc = self._ifc
|
||||
name = type_attrs.get("Name") or ""
|
||||
|
||||
type_obj = ifcopenshell.api.run(
|
||||
"root.create_entity", ifc,
|
||||
ifc_class=type_class,
|
||||
name=name,
|
||||
)
|
||||
|
||||
# Set standard attributes
|
||||
for attr in ["GlobalId", "Tag", "Description", "ElementType"]:
|
||||
val = type_attrs.get(attr)
|
||||
if val is not None:
|
||||
try:
|
||||
setattr(type_obj, attr, str(val))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# PredefinedType (enum)
|
||||
ptype = type_attrs.get("PredefinedType")
|
||||
if ptype:
|
||||
try:
|
||||
type_obj.PredefinedType = str(ptype)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Write type-level property sets
|
||||
if type_psets:
|
||||
for pset_name, pset_data in type_psets.items():
|
||||
pset_dict = _to_dict(pset_data) if not isinstance(pset_data, dict) else pset_data
|
||||
if not pset_dict:
|
||||
continue
|
||||
ifc_props = []
|
||||
for prop_name, prop_value in pset_dict.items():
|
||||
if prop_name == "id" or prop_value is None:
|
||||
continue
|
||||
try:
|
||||
nominal = _make_ifc_value(ifc, prop_value)
|
||||
p = ifc.create_entity(
|
||||
"IfcPropertySingleValue",
|
||||
Name=str(prop_name),
|
||||
NominalValue=nominal,
|
||||
)
|
||||
ifc_props.append(p)
|
||||
except Exception:
|
||||
continue
|
||||
if ifc_props:
|
||||
try:
|
||||
pset = ifcopenshell.api.run(
|
||||
"pset.add_pset", ifc, product=type_obj, name=pset_name
|
||||
)
|
||||
pset.HasProperties = ifc_props
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return type_obj
|
||||
-160
@@ -1,160 +0,0 @@
|
||||
# =============================================================================
|
||||
# writer.py
|
||||
# Creates and manages the IFC file structure:
|
||||
# IfcProject → IfcSite → IfcBuilding → IfcBuildingStorey (one per level)
|
||||
#
|
||||
# Also provides StoreyManager which lazily creates storeys on demand
|
||||
# as the traversal encounters new level names.
|
||||
# =============================================================================
|
||||
|
||||
import ifcopenshell
|
||||
import ifcopenshell.api
|
||||
|
||||
|
||||
def create_ifc_scaffold(
|
||||
project_name: str = "Default Project",
|
||||
site_name: str = "Default Site",
|
||||
building_name: str = "Default Building",
|
||||
) -> tuple:
|
||||
"""
|
||||
Create the IFC file with the required project/site/building hierarchy.
|
||||
|
||||
Returns:
|
||||
(ifc_file, site, building, body_context)
|
||||
- ifc_file: The ifcopenshell file object
|
||||
- site: The IfcSite entity
|
||||
- building: The IfcBuilding entity (storeys are assigned under this)
|
||||
- body_context: The Body geometry subcontext for shape representations
|
||||
"""
|
||||
ifc = ifcopenshell.file(schema="IFC4X3")
|
||||
|
||||
# Project
|
||||
project = ifcopenshell.api.run(
|
||||
"root.create_entity", ifc,
|
||||
ifc_class="IfcProject",
|
||||
name=project_name,
|
||||
)
|
||||
|
||||
# Units — millimetres (matching Revit/Speckle source data)
|
||||
# This avoids any mm→m conversion errors and keeps coordinates at full precision
|
||||
ifcopenshell.api.run(
|
||||
"unit.assign_unit", ifc,
|
||||
length={"is_metric": True, "raw": "MILLIMETRES"},
|
||||
)
|
||||
|
||||
# Geometry contexts
|
||||
model_ctx = ifcopenshell.api.run(
|
||||
"context.add_context", ifc,
|
||||
context_type="Model",
|
||||
)
|
||||
body_ctx = ifcopenshell.api.run(
|
||||
"context.add_context", ifc,
|
||||
context_type="Model",
|
||||
context_identifier="Body",
|
||||
target_view="MODEL_VIEW",
|
||||
parent=model_ctx,
|
||||
)
|
||||
|
||||
# Spatial hierarchy
|
||||
site = ifcopenshell.api.run(
|
||||
"root.create_entity", ifc,
|
||||
ifc_class="IfcSite",
|
||||
name=site_name,
|
||||
)
|
||||
building = ifcopenshell.api.run(
|
||||
"root.create_entity", ifc,
|
||||
ifc_class="IfcBuilding",
|
||||
name=building_name,
|
||||
)
|
||||
|
||||
ifcopenshell.api.run(
|
||||
"aggregate.assign_object", ifc,
|
||||
relating_object=project,
|
||||
products=[site],
|
||||
)
|
||||
ifcopenshell.api.run(
|
||||
"aggregate.assign_object", ifc,
|
||||
relating_object=site,
|
||||
products=[building],
|
||||
)
|
||||
|
||||
return ifc, site, building, body_ctx
|
||||
|
||||
|
||||
class StoreyManager:
|
||||
"""
|
||||
Lazily creates IfcBuildingStorey entities as new level names are encountered.
|
||||
Keeps storeys in insertion order so the IFC file is logically ordered.
|
||||
|
||||
Spatial containment is batched — call flush() after all elements are created
|
||||
to write all IfcRelContainedInSpatialStructure / aggregate relationships at once.
|
||||
"""
|
||||
|
||||
def __init__(self, ifc: ifcopenshell.file, building):
|
||||
self.ifc = ifc
|
||||
self.building = building
|
||||
self._storeys: dict[str, object] = {} # level_name → IfcBuildingStorey
|
||||
# Batched containment: storey_id → [element, ...]
|
||||
self._contained: dict[int, list] = {}
|
||||
# Batched aggregation (IfcSite etc.): storey_id → [element, ...]
|
||||
self._aggregated: dict[int, list] = {}
|
||||
|
||||
def get_or_create(self, level_name: str):
|
||||
"""Return existing storey or create a new one for this level name."""
|
||||
if level_name not in self._storeys:
|
||||
storey = ifcopenshell.api.run(
|
||||
"root.create_entity", self.ifc,
|
||||
ifc_class="IfcBuildingStorey",
|
||||
name=level_name,
|
||||
)
|
||||
ifcopenshell.api.run(
|
||||
"aggregate.assign_object", self.ifc,
|
||||
relating_object=self.building,
|
||||
products=[storey],
|
||||
)
|
||||
self._storeys[level_name] = storey
|
||||
print(f" Created storey: {level_name}")
|
||||
|
||||
return self._storeys[level_name]
|
||||
|
||||
def queue_contain(self, storey, element):
|
||||
"""Queue an element for spatial containment (batched flush)."""
|
||||
sid = storey.id()
|
||||
if sid not in self._contained:
|
||||
self._contained[sid] = []
|
||||
self._contained[sid].append(element)
|
||||
|
||||
def queue_aggregate(self, storey, element):
|
||||
"""Queue an element for aggregation under storey (e.g. IfcSite)."""
|
||||
sid = storey.id()
|
||||
if sid not in self._aggregated:
|
||||
self._aggregated[sid] = []
|
||||
self._aggregated[sid].append(element)
|
||||
|
||||
def flush(self):
|
||||
"""Write all batched spatial containment and aggregation relationships."""
|
||||
ifc = self.ifc
|
||||
for sid, elements in self._contained.items():
|
||||
storey = ifc.by_id(sid)
|
||||
ifcopenshell.api.run(
|
||||
"spatial.assign_container", ifc,
|
||||
relating_structure=storey,
|
||||
products=elements,
|
||||
)
|
||||
for sid, elements in self._aggregated.items():
|
||||
storey = ifc.by_id(sid)
|
||||
ifcopenshell.api.run(
|
||||
"aggregate.assign_object", ifc,
|
||||
relating_object=storey,
|
||||
products=elements,
|
||||
)
|
||||
self._contained.clear()
|
||||
self._aggregated.clear()
|
||||
|
||||
@property
|
||||
def count(self) -> int:
|
||||
return len(self._storeys)
|
||||
|
||||
@property
|
||||
def names(self) -> list[str]:
|
||||
return list(self._storeys.keys())
|
||||
Reference in New Issue
Block a user