This commit is contained in:
KatKatKateryna
2024-08-20 23:25:26 +01:00
parent 6a5484fead
commit e2c123529d
11 changed files with 673 additions and 608 deletions
+38 -606
View File
@@ -28,10 +28,9 @@
#
# =================================================================
import copy
from datetime import datetime
import json
import logging
import math
import os
import sys
from typing import Any, Dict, List, Optional, Tuple, Union
@@ -42,11 +41,7 @@ from pygeoapi.util import crs_transform
LOGGER = logging.getLogger(__name__)
DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150
_user_data_env_var = "SPECKLE_USERDATA_PATH"
_application_name = "Speckle"
_host_application = "pygeoapi"
HOST_APP = "pygeoapi"
class SpeckleProvider(BaseProvider):
"""Provider class for Speckle server data
@@ -80,9 +75,7 @@ class SpeckleProvider(BaseProvider):
# )
from subprocess import run
from pygeoapi.provider.speckle_utils.patch_specklepy import patch_specklepy
# path = str(self.connector_installation_path(_host_application))
from pygeoapi.provider.speckle_utils.patch.patch_specklepy import patch_specklepy
try:
import specklepy
@@ -121,9 +114,6 @@ class SpeckleProvider(BaseProvider):
patch_specklepy()
# assign global values
global DEFAULT_COLOR
DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150
self.url: str = self.data # to store the value and check if self.data has changed
self.speckle_url = self.url.lower().split("speckleurl=")[-1].split("&")[0].split("@")[0].split("?")[0]
@@ -169,6 +159,8 @@ class SpeckleProvider(BaseProvider):
def _load(self, skip_geometry=None, properties=[], select_properties=[]):
"""Validate Speckle data"""
from pygeoapi.provider.speckle_utils.crs_utils import create_crs_from_authid
if self.data == "":
return
raise ValueError(
@@ -208,7 +200,7 @@ class SpeckleProvider(BaseProvider):
# if CRS assigned, create one:
if len(crs_authid)>3:
self.create_crs_from_authid()
create_crs_from_authid(self)
# check if it's a new request (self.data was updated and doesn't match self.url)
new_request = False
@@ -363,7 +355,7 @@ class SpeckleProvider(BaseProvider):
from specklepy.logging.metrics import set_host_app
from specklepy.transports.server import ServerTransport
set_host_app("pygeoapi", "0.0.99")
set_host_app(HOST_APP, "0.0.99")
# get URL that will not trigget Client init
url_proj: str = self.speckle_url.split("models")[0]
@@ -422,7 +414,6 @@ class SpeckleProvider(BaseProvider):
def traverse_data(self, commit_obj):
from specklepy.objects.geometry import Base
from specklepy.objects.geometry import Point, Line, Polyline, Curve, Mesh, Brep
from specklepy.objects.GIS.CRS import CRS
from specklepy.objects.GIS.layers import VectorLayer
@@ -432,6 +423,11 @@ class SpeckleProvider(BaseProvider):
GraphTraversal,
TraversalRule,
)
from pygeoapi.provider.speckle_utils.crs_utils import create_crs_from_wkt, create_crs_default, create_crs_dict
from pygeoapi.provider.speckle_utils.coords_utils import reproject_bulk
from pygeoapi.provider.speckle_utils.props_utils import assign_props, assign_missing_props
from pygeoapi.provider.speckle_utils.converter_utils import assign_geometry
from pygeoapi.provider.speckle_utils.display_utils import find_display_obj, set_default_color, assign_display_properties, get_display_units
supported_classes = [GisFeature, GisPolygonElement, Mesh, Brep, Point, Line, Polyline, Curve]
supported_types = [y().speckle_type for y in supported_classes]
@@ -451,7 +447,7 @@ class SpeckleProvider(BaseProvider):
"features": [],
"model_crs": "-",
}
self.assign_crs_to_geojson(data)
self.assign_coordinate_system_to_geojson(data)
rule = TraversalRule(
[lambda _: True],
lambda x: [
@@ -463,7 +459,7 @@ class SpeckleProvider(BaseProvider):
)
context_list = [x for x in GraphTraversal([rule]).traverse(commit_obj)]
# iterate Speckle objects to get "crs" property
# iterate Speckle objects to get CRS, DisplayUnits, offsets, rotation
crs = None
displayUnits = None
offset_x = 0
@@ -480,7 +476,7 @@ class SpeckleProvider(BaseProvider):
offset_x = crs["offset_x"]
offset_y = crs["offset_y"]
self.north_degrees = crs["rotation"]
self.create_crs_from_wkt(crs["wkt"])
create_crs_from_wkt(self, crs["wkt"])
if self.crs.to_authority() is not None:
data["model_crs"] = f"{self.crs.to_authority()}, {self.crs.name} "
@@ -488,62 +484,35 @@ class SpeckleProvider(BaseProvider):
data["model_crs"] = f"{self.crs.to_proj4()}"
break
elif displayUnits is None and type(item) in supported_types:
displayUnits = item.units
except AttributeError as ex:
pass # old commit structure
# if CRS not found, create default one and get model units for scaling
if self.crs is None:
self.create_crs_default()
for item in context_list:
if hasattr(item.current, "displayValue"):
try:
displayVal = item.current["displayValue"]
except:
displayVal = item.current.displayValue
if isinstance(displayVal, list) and len(displayVal)>0:
displayUnits = displayVal[0].units
break
elif isinstance(displayVal, Base):
displayUnits = item.current.units
break
else:
if item.current.units is not None:
displayUnits = item.current.units
break
create_crs_default(self)
# if displayUnits not found, get from displayable object
if displayUnits is None:
displayUnits = get_display_units(context_list)
self.create_crs_dict(offset_x, offset_y, displayUnits)
# iterate to get features
#list_len = len(context_list)
#load = 0
#print(f"{load}% loaded")
create_crs_dict(self, offset_x, offset_y, displayUnits)
# get coordinates in bulk
all_coords = []
all_coord_counts = []
print(f"Loading..")
all_props = []
all_props = []
set_default_color(context_list)
print(f"Loading features..")
time1 = datetime.now()
for item in context_list:
# for GIS-commits, use default blue color
if isinstance(item.current, VectorLayer):
global DEFAULT_COLOR
DEFAULT_COLOR = (255 << 24) + (10 << 16) + (132 << 8) + 255
#new_load = round(i / list_len * 10, 1) * 10
#if new_load % 10 == 0 and new_load != load:
# load = round(i / list_len * 100)
# print(f"{load}% loaded")
f_base = item.current
f_id = item.current.id
f_fid = len(data["features"]) + 1
# feature
# initialize feature
feature: Dict = {
"type": "Feature",
# "bbox": [-180.0, -90.0, 180.0, 90.0],
@@ -555,323 +524,35 @@ class SpeckleProvider(BaseProvider):
},
}
# feature geometry
obj_display = self.find_display_obj(f_base)
coords, coord_counts = self.assign_geometry(feature, obj_display)
# feature geometry, props and displayProps
obj_display, obj_get_color = find_display_obj(f_base)
coords, coord_counts = assign_geometry(feature, obj_display)
if len(coords)!=0:
all_coords.extend(coords)
all_coord_counts.append(coord_counts)
self.assign_props(f_base, feature["properties"])
assign_props(f_base, feature["properties"])
# update list of all properties
for prop in feature["properties"]:
if prop not in all_props:
all_props.append(prop)
feature["displayProperties"] = {}
self.assign_color(obj_display, feature["displayProperties"])
# other properties for rendering
if isinstance(f_base, Mesh) or isinstance(f_base, Brep):
feature["displayProperties"]['lineWidth'] = 0.3
elif "Line" in feature["geometry"]["type"]:
feature["displayProperties"]['lineWidth'] = 3
else:
feature["displayProperties"]['lineWidth'] = 1
# if "Point" in feature["geometry"]["type"]:
try:
feature["displayProperties"]["radius"] = feature["properties"]["weight"]
except:
feature["displayProperties"]["radius"] = 10
assign_display_properties(feature, f_base, obj_get_color)
data["features"].append(feature)
# assign all props to all features
for feat in data["features"]:
for prop in all_props:
if prop not in list(feat["properties"].keys()):
feat["properties"][prop] = "N/A"
assign_missing_props(data["features"], all_props)
if len(all_coords)==0:
if len(data["features"])==0:
raise ValueError("No supported features found")
time2 = datetime.now()
print(f"Loading features before reprojecting time: {(time2-time1).total_seconds()}")
self.reproject_bulk(all_coords, all_coord_counts, [f["geometry"] for f in data["features"]])
reproject_bulk(self, all_coords, all_coord_counts, [f["geometry"] for f in data["features"]])
return data
def reproject_bulk(self, all_coords, all_coord_counts: List[List[None| List[int]]], geometries):
from datetime import datetime
# reproject all coords
time1 = datetime.now()
flat_coords = self.reproject_2d_coords_list(
all_coords
)
time2 = datetime.now()
print((time2-time1).total_seconds())
# define type of features
feat_coord_group_is_multi = [True if None in x else False for x in all_coord_counts]
feat_coord_group_counts = [[ y for y in x if y is not None] for x in all_coord_counts]
feat_coord_group_counts_per_part = [[ sum(y) for y in x if y is not None] for x in all_coord_counts]
feat_coord_group_flat_counts: List[int] = [sum([ sum(y) for y in x if y is not None]) for x in all_coord_counts]
feat_coord_groups = [flat_coords[sum(feat_coord_group_flat_counts[:i]):sum(feat_coord_group_flat_counts[:i])+x] for i, x in enumerate(feat_coord_group_flat_counts)]
for i, geometry in enumerate(geometries):
geometry["coordinates"] = []
if feat_coord_group_is_multi[i] is False:
if geometry["type"] == "Point":
geometry["coordinates"].extend(feat_coord_groups[i][0])
else:
geometry["coordinates"].extend(feat_coord_groups[i])
else:
polygon_parts = []
local_coords_count: List[List[int]] = feat_coord_group_counts[i]
local_coords_count_flat: List[int] = feat_coord_group_counts_per_part[i]
local_flat_coords: List[int] = feat_coord_groups[i]
for c, poly_part_count_lists in enumerate(local_coords_count):
poly_part = []
start_index = sum(local_coords_count_flat[:c]) if c!=0 else 0 # all used coords in all parts
for part_count in poly_part_count_lists:
range_coords_indices = range(start_index, start_index + part_count)
if geometry["type"] == "MultiPoint":
poly_part.extend([local_flat_coords[ind] for ind in range_coords_indices])
else:
poly_part.append([local_flat_coords[ind] for ind in range_coords_indices])
start_index += part_count
if geometry["type"] in ["MultiPoint","MultiLineString"] :
polygon_parts.extend(poly_part)
else:
polygon_parts.append(poly_part)
geometry["coordinates"].extend(polygon_parts)
time3 = datetime.now()
print((time3-time2).total_seconds())
def create_crs_from_wkt(self, wkt: str | None):
from pyproj import CRS
self.crs = CRS.from_user_input(wkt)
def create_crs_from_authid(self, authid: str | None):
from pyproj import CRS
crs_obj = CRS.from_string(authid)
self.crs = crs_obj
def create_crs_default(self):
from pyproj import CRS
wkt = f'PROJCS["SpeckleCRS_latlon_{self.lat}_{self.lon}", GEOGCS["GCS_WGS_1984", DATUM["D_WGS_1984", SPHEROID["WGS_1984", 6378137.0, 298.257223563]], PRIMEM["Greenwich", 0.0], UNIT["Degree", 0.0174532925199433]], PROJECTION["Transverse_Mercator"], PARAMETER["False_Easting", 0.0], PARAMETER["False_Northing", 0.0], PARAMETER["Central_Meridian", {self.lon}], PARAMETER["Scale_Factor", 1.0], PARAMETER["Latitude_Of_Origin", {self.lat}], UNIT["Meter", 1.0]]'
crs_obj = CRS.from_user_input(wkt)
self.crs = crs_obj
def create_crs_dict(self, offset_x, offset_y, displayUnits: str | None):
if self.crs is not None:
self.crs_dict = {
"wkt": self.crs.to_wkt(),
"offset_x": offset_x,
"offset_y": offset_y,
"rotation": self.north_degrees,
"units_native": displayUnits,
"obj": self.crs,
}
def assign_geometry(self, feature: Dict, f_base):
from specklepy.objects.geometry import Point, Line, Polyline, Curve, Mesh, Brep
from specklepy.objects.GIS.geometry import GisPolygonGeometry
from specklepy.objects.GIS.GisFeature import GisFeature
geometry = feature["geometry"]
coords = []
coord_counts = []
if isinstance(f_base, Point):
geometry["type"] = "MultiPoint"
coord_counts.append(None)
coords.append([f_base.x, f_base.y])
coord_counts.append([1])
elif isinstance(f_base, Mesh) or isinstance(f_base, Brep):
geometry["type"] = "MultiPolygon"
coord_counts.append(None) # as an indicator of a MultiPolygon
faces = []
vertices = []
if isinstance(f_base, Mesh):
faces = f_base.faces
vertices = f_base.vertices
elif isinstance(f_base, Brep):
if f_base.displayValue is None or (
isinstance(f_base.displayValue, list)
and len(f_base.displayValue) == 0
):
geometry = {}
return
elif isinstance(f_base.displayValue, list):
faces = f_base.displayValue[0].faces
vertices = f_base.displayValue[0].vertices
else:
faces = f_base.displayValue.faces
vertices = f_base.displayValue.vertices
count: int = 0
for i, pt_count in enumerate(faces):
if i != count:
continue
# old encoding
if pt_count == 0:
pt_count = 3
elif pt_count == 1:
pt_count = 4
coord_counts.append([pt_count])
for vertex_index in faces[count + 1 : count + 1 + pt_count]:
x = vertices[vertex_index * 3]
y = vertices[vertex_index * 3 + 1]
coords.append([x, y])
count += pt_count + 1
elif f_base.speckle_type.endswith(".GisFeature") and len(f_base["geometry"]) > 0: # isinstance(f_base, GisFeature) and len(f_base.geometry) > 0:
# GisFeature doesn't deserialize properly, need to check for speckle_type
if isinstance(f_base.geometry[0], Point):
geometry["type"] = "MultiPoint"
coord_counts.append(None)
for geom in f_base.geometry:
coords.append([geom.x, geom.y])
coord_counts.append([1])
elif isinstance(f_base.geometry[0], Polyline):
geometry["type"] = "MultiLineString"
coord_counts.append(None)
for geom in f_base.geometry:
coord_counts.append([])
local_poly_count = 0
for pt in geom.as_points():
coords.append([pt.x, pt.y])
local_poly_count += 1
if len(coords)>2 and geom.closed is True and coords[0] != coords[-1]:
coords.append(coords[0])
local_poly_count += 1
coord_counts[-1].append(local_poly_count)
elif isinstance(f_base.geometry[0], GisPolygonGeometry):
geometry["type"] = "MultiPolygon"
coord_counts.append(None)
for polygon in f_base.geometry:
coord_counts.append([])
boundary_count = 0
for pt in polygon.boundary.as_points():
coords.append([pt.x, pt.y])
boundary_count += 1
coord_counts[-1].append(boundary_count)
for void in polygon.voids:
void_count = 0
for pt_void in void.as_points():
coords.append([pt_void.x, pt_void.y])
void_count += 1
coord_counts[-1].append(void_count)
elif isinstance(f_base, Line):
geometry["type"] = "LineString"
start = [f_base.start.x, f_base.start.y]
end = [f_base.end.x, f_base.end.y]
coords.extend([start, end])
coord_counts.append([2])
elif isinstance(f_base, Polyline):
geometry["type"] = "LineString"
for pt in f_base.as_points():
coords.append([pt.x, pt.y])
if len(coords)>2 and f_base.closed is True and coords[0] != coords[-1]:
coords.append(coords[0])
coord_counts.append([len(coords)])
elif isinstance(f_base, Curve):
geometry["type"] = "LineString"
#geometry["coordinates"] = []
for pt in f_base.displayValue.as_points():
#geometry["coordinates"].append([pt.x, pt.y])
coords.append([pt.x, pt.y])
if len(coords)>2 and f_base.displayValue.closed is True and coords[0] != coords[-1]:
coords.append(coords[0])
coord_counts.append([len(coords)])
#geometry["coordinates"] = self.reproject_2d_coords_list(
# geometry["coordinates"]
#)
else:
geometry = {}
# print(f"Unsupported geometry type: {f_base.speckle_type}")
return coords, coord_counts
def reproject_2d_coords_list(self, coords_in: List[list]):
from pyproj import Transformer
from pyproj import CRS
coords_offset = self.offset_rotate(copy.deepcopy(coords_in))
transformer = Transformer.from_crs(
self.crs,
CRS.from_user_input(4326),
always_xy=True,
)
return [[pt[0], pt[1]] for pt in transformer.itransform(coords_offset)]
def offset_rotate(self, coords_in: List[list]):
from specklepy.objects.units import get_scale_factor_from_string
scale_factor = 1
if isinstance(self.crs_dict["units_native"], str):
scale_factor = get_scale_factor_from_string(self.crs_dict["units_native"], "m")
final_coords = []
for coord in coords_in:
a = self.crs_dict["rotation"] * math.pi / 180
x2 = coord[0] * math.cos(a) - coord[1] * math.sin(a)
y2 = coord[0] * math.sin(a) + coord[1] * math.cos(a)
final_coords.append(
[
scale_factor * (x2 + self.crs_dict["offset_x"]),
scale_factor * (y2 + self.crs_dict["offset_y"]),
]
)
return final_coords
def assign_crs_to_geojson(self, data: Dict):
def assign_coordinate_system_to_geojson(self, data: Dict):
crs = {
"crs": {
@@ -882,183 +563,6 @@ class SpeckleProvider(BaseProvider):
data["crs"] = crs
def assign_props(self, obj, props):
from specklepy.objects.geometry import Base
from specklepy.objects.other import RevitParameter
all_prop_names = obj.get_member_names()
dynamic_prop_names = obj.get_dynamic_member_names()
typed_prop_names = obj.get_typed_member_names()
# check if GIS object
if "attributes" in all_prop_names and isinstance(obj["attributes"], Base):
all_prop_names = obj["attributes"].get_dynamic_member_names()
for prop_name in all_prop_names:
value = getattr(obj["attributes"], prop_name)
if (prop_name
in [
"geometry",
"Speckle_ID",
"id",
]
):
pass
else:
if (
isinstance(value, Base)
or isinstance(value, List)
or isinstance(value, Dict)
):
props[prop_name] = str(value)
else:
props[prop_name] = value
return
# if Rhino:
elif "userStrings" in dynamic_prop_names and isinstance(obj["userStrings"], Base):
all_prop_names = obj["userStrings"].get_dynamic_member_names()
for prop_name in all_prop_names:
if prop_name in ["id"]:
continue
value = getattr(obj["userStrings"], prop_name)
if not isinstance(value, str):
props[prop_name] = str(value)
else:
props[prop_name] = value
return
for prop_name in obj.get_dynamic_member_names():
if (
prop_name
in [
"displayValue",
"displayStyle",
"renderMaterial",
"revitLinkedModelPath",
"id",
]
):
pass
else:
value = getattr(obj, prop_name)
if (
isinstance(value, Base)
or isinstance(value, List)
or isinstance(value, Dict)
):
props[prop_name] = str(value)
else:
props[prop_name] = value
# if Revit:
if "parameters" in all_prop_names and isinstance(obj.parameters, Base):
for prop_name in obj.parameters.get_dynamic_member_names():
if prop_name in ["id","revitLinkedModelPath"]:
continue
param = getattr(obj.parameters, prop_name)
if isinstance(param, RevitParameter):
if not isinstance(param.value, str):
props[prop_name] = str(param.value)
else:
props[prop_name] = param.value
# add after dynamic parameters
def find_display_obj(self, obj):
from specklepy.objects.geometry import Base, Mesh
displayVal = None
if hasattr(obj, 'displayValue'):
displayVal = getattr(obj, 'displayValue')
elif hasattr(obj, '@displayValue'):
displayVal = getattr(obj, '@displayValue')
if isinstance(displayVal, Base):
return displayVal
elif isinstance(displayVal, List):
faces = []
verts = []
colors = []
for item in displayVal:
if isinstance(item, Mesh):
start_vert_count = int(len(verts)/3)
# only add colors if existing and incoming colors are valid (same length as vertices)
if len(colors) == start_vert_count and isinstance(item.colors, List) and len(item.colors)== int(len(item.vertices)/3)>0:
colors.extend(item.colors)
else:
colors = []
verts.extend(item.vertices)
count = 0
for _ in item.faces:
try:
vert_num = item.faces[count]
faces.append(vert_num)
faces.extend([ x+start_vert_count for x in item.faces[count+1 : count+1+vert_num]])
count += vert_num+1
except IndexError:
break
else:
return item
mesh = Mesh.create(faces= faces, vertices=verts, colors=colors)
for prop in displayVal[0].get_member_names():
if prop not in ["colors", "vertices", "faces"]:
mesh[prop] = getattr(displayVal[0], prop)
return mesh
return obj
def assign_color(self, obj_display, props):
from specklepy.objects.geometry import Mesh
# initialize Speckle Blue color
color = DEFAULT_COLOR
try:
if hasattr(obj_display, 'renderMaterial'):
color = obj_display['renderMaterial']['diffuse']
elif hasattr(obj_display, '@renderMaterial'):
color = obj_display['@renderMaterial']['diffuse']
elif hasattr(obj_display, 'displayStyle'):
color = obj_display['displayStyle']['color']
elif hasattr(obj_display, '@displayStyle'):
color = obj_display['@displayStyle']['color']
elif isinstance(obj_display, Mesh) and isinstance(obj_display.colors, List):
sameColors = True
color1 = obj_display.colors[0]
for c in obj_display.colors:
if c != color1:
sameColors = False
break
if sameColors is True:
color = color1
except Exception as e:
print(e)
r, g, b = self.get_r_g_b(color)
hex_color = '#%02x%02x%02x' % (r, g, b)
props['color'] = hex_color
def get_r_g_b(self, rgb: int) -> Tuple[int, int, int]:
r = g = b = 0
try:
r = (rgb & 0xFF0000) >> 16
g = (rgb & 0xFF00) >> 8
b = rgb & 0xFF
except Exception as e:
r = g = b = 150
return r, g, b
def get_python_path(self):
if sys.platform.startswith("linux"):
return sys.executable
@@ -1068,75 +572,3 @@ class SpeckleProvider(BaseProvider):
else:
pythonExec += "/bin/python3"
return pythonExec
def user_application_data_path(self) -> "Path":
"""Get the platform specific user configuration folder path"""
from pathlib import Path
path_override = self._path()
if path_override:
return path_override
try:
if sys.platform.startswith("win"):
app_data_path = os.getenv("APPDATA")
if not app_data_path:
raise Exception("Cannot get appdata path from environment.")
return Path(app_data_path)
else:
# try getting the standard XDG_DATA_HOME value
# as that is used as an override
app_data_path = os.getenv("XDG_DATA_HOME")
if app_data_path:
return Path(app_data_path)
else:
return self.ensure_folder_exists(Path.home(), ".config")
except Exception as ex:
raise Exception("Failed to initialize user application data path.", ex)
def ensure_folder_exists(self, base_path: "Path", folder_name: str) -> "Path":
from pathlib import Path
path = base_path.joinpath(folder_name)
path.mkdir(exist_ok=True, parents=True)
return path
def _path(self) -> Optional["Path"]:
from pathlib import Path
"""Read the user data path override setting."""
path_override = os.environ.get(_user_data_env_var)
if path_override:
return Path(path_override)
return None
def connector_installation_path(self, host_application: str) -> "Path":
connector_installation_path = self.user_speckle_connector_installation_path(
host_application
)
connector_installation_path.mkdir(exist_ok=True, parents=True)
# set user modules path at beginning of paths for earlier hit
if sys.path[0] != connector_installation_path:
sys.path.insert(0, str(connector_installation_path))
# print(f"Using connector installation path {connector_installation_path}")
return connector_installation_path
def user_speckle_connector_installation_path(self, host_application: str) -> "Path":
"""
Gets a connector specific installation folder.
In this folder we can put our connector installation and all python packages.
"""
return self.ensure_folder_exists(
self.ensure_folder_exists(
self.user_speckle_folder_path(), "connector_installations"
),
host_application,
)
def user_speckle_folder_path(self) -> "Path":
"""Get the folder where the user's Speckle data should be stored."""
return self.ensure_folder_exists(
self.user_application_data_path(), _application_name
)
@@ -0,0 +1,144 @@
from typing import Dict, List
def assign_geometry(feature: Dict, f_base) -> ( List[List[List[float]]], List[List[None| List[int]]] ):
"""Assign geom type and convert object coords into flat lists of coordinates and schema."""
from specklepy.objects.geometry import Point, Line, Polyline, Curve, Mesh, Brep
from specklepy.objects.GIS.geometry import GisPolygonGeometry
from specklepy.objects.GIS.GisFeature import GisFeature
geometry = feature["geometry"]
coords = []
coord_counts = []
if isinstance(f_base, Point):
geometry["type"] = "MultiPoint"
coord_counts.append(None)
coords.append([f_base.x, f_base.y])
coord_counts.append([1])
elif isinstance(f_base, Mesh) or isinstance(f_base, Brep):
geometry["type"] = "MultiPolygon"
coord_counts.append(None) # as an indicator of a MultiPolygon
faces = []
vertices = []
if isinstance(f_base, Mesh):
faces = f_base.faces
vertices = f_base.vertices
elif isinstance(f_base, Brep):
if f_base.displayValue is None or (
isinstance(f_base.displayValue, list)
and len(f_base.displayValue) == 0
):
geometry = {}
return
elif isinstance(f_base.displayValue, list):
faces = f_base.displayValue[0].faces
vertices = f_base.displayValue[0].vertices
else:
faces = f_base.displayValue.faces
vertices = f_base.displayValue.vertices
count: int = 0
for i, pt_count in enumerate(faces):
if i != count:
continue
# old encoding
if pt_count == 0:
pt_count = 3
elif pt_count == 1:
pt_count = 4
coord_counts.append([pt_count])
for vertex_index in faces[count + 1 : count + 1 + pt_count]:
x = vertices[vertex_index * 3]
y = vertices[vertex_index * 3 + 1]
coords.append([x, y])
count += pt_count + 1
elif f_base.speckle_type.endswith(".GisFeature") and len(f_base["geometry"]) > 0: # isinstance(f_base, GisFeature) and len(f_base.geometry) > 0:
# GisFeature doesn't deserialize properly, need to check for speckle_type
if isinstance(f_base.geometry[0], Point):
geometry["type"] = "MultiPoint"
coord_counts.append(None)
for geom in f_base.geometry:
coords.append([geom.x, geom.y])
coord_counts.append([1])
elif isinstance(f_base.geometry[0], Polyline):
geometry["type"] = "MultiLineString"
coord_counts.append(None)
for geom in f_base.geometry:
coord_counts.append([])
local_poly_count = 0
for pt in geom.as_points():
coords.append([pt.x, pt.y])
local_poly_count += 1
if len(coords)>2 and geom.closed is True and coords[0] != coords[-1]:
coords.append(coords[0])
local_poly_count += 1
coord_counts[-1].append(local_poly_count)
elif isinstance(f_base.geometry[0], GisPolygonGeometry):
geometry["type"] = "MultiPolygon"
coord_counts.append(None)
for polygon in f_base.geometry:
coord_counts.append([])
boundary_count = 0
for pt in polygon.boundary.as_points():
coords.append([pt.x, pt.y])
boundary_count += 1
coord_counts[-1].append(boundary_count)
for void in polygon.voids:
void_count = 0
for pt_void in void.as_points():
coords.append([pt_void.x, pt_void.y])
void_count += 1
coord_counts[-1].append(void_count)
elif isinstance(f_base, Line):
geometry["type"] = "LineString"
start = [f_base.start.x, f_base.start.y]
end = [f_base.end.x, f_base.end.y]
coords.extend([start, end])
coord_counts.append([2])
elif isinstance(f_base, Polyline):
geometry["type"] = "LineString"
for pt in f_base.as_points():
coords.append([pt.x, pt.y])
if len(coords)>2 and f_base.closed is True and coords[0] != coords[-1]:
coords.append(coords[0])
coord_counts.append([len(coords)])
elif isinstance(f_base, Curve):
geometry["type"] = "LineString"
for pt in f_base.displayValue.as_points():
coords.append([pt.x, pt.y])
if len(coords)>2 and f_base.displayValue.closed is True and coords[0] != coords[-1]:
coords.append(coords[0])
coord_counts.append([len(coords)])
else:
geometry = {}
# print(f"Unsupported geometry type: {f_base.speckle_type}")
return coords, coord_counts
@@ -0,0 +1,102 @@
import copy
import math
from typing import List
def reproject_bulk(self, all_coords: List[List[List[float]]], all_coord_counts: List[List[None| List[int]]], geometries) -> None:
"""Reproject coordinates and assign to corresponding geometries."""
from datetime import datetime
# reproject all coords
time1 = datetime.now()
flat_coords = reproject_2d_coords_list(self, all_coords)
time2 = datetime.now()
print(f"Reproject time: {(time2-time1).total_seconds()}")
# define type of features
feat_coord_group_is_multi = [True if None in x else False for x in all_coord_counts]
feat_coord_group_counts = [[ y for y in x if y is not None] for x in all_coord_counts]
feat_coord_group_counts_per_part = [[ sum(y) for y in x if y is not None] for x in all_coord_counts]
feat_coord_group_flat_counts: List[int] = [sum([ sum(y) for y in x if y is not None]) for x in all_coord_counts]
feat_coord_groups = [flat_coords[sum(feat_coord_group_flat_counts[:i]):sum(feat_coord_group_flat_counts[:i])+x] for i, x in enumerate(feat_coord_group_flat_counts)]
for i, geometry in enumerate(geometries):
geometry["coordinates"] = []
if feat_coord_group_is_multi[i] is False:
if geometry["type"] == "Point":
geometry["coordinates"].extend(feat_coord_groups[i][0])
else:
geometry["coordinates"].extend(feat_coord_groups[i])
else:
polygon_parts = []
local_coords_count: List[List[int]] = feat_coord_group_counts[i]
local_coords_count_flat: List[int] = feat_coord_group_counts_per_part[i]
local_flat_coords: List[int] = feat_coord_groups[i]
for c, poly_part_count_lists in enumerate(local_coords_count):
poly_part = []
start_index = sum(local_coords_count_flat[:c]) if c!=0 else 0 # all used coords in all parts
for part_count in poly_part_count_lists:
range_coords_indices = range(start_index, start_index + part_count)
if geometry["type"] == "MultiPoint":
poly_part.extend([local_flat_coords[ind] for ind in range_coords_indices])
else:
poly_part.append([local_flat_coords[ind] for ind in range_coords_indices])
start_index += part_count
if geometry["type"] in ["MultiPoint","MultiLineString"] :
polygon_parts.extend(poly_part)
else:
polygon_parts.append(poly_part)
geometry["coordinates"].extend(polygon_parts)
time3 = datetime.now()
print(f"Construct back geometry time: {(time3-time2).total_seconds()}")
def reproject_2d_coords_list(self, coords_in: List[List[float]]) -> List[List[float]]:
"""Return coordinates in a CRS of SpeckleProvider."""
from pyproj import Transformer
from pyproj import CRS
coords_offset = offset_rotate(self, copy.deepcopy(coords_in))
transformer = Transformer.from_crs(
self.crs,
CRS.from_user_input(4326),
always_xy=True,
)
return [[pt[0], pt[1]] for pt in transformer.itransform(coords_offset)]
def offset_rotate(self, coords_in: List[list]) -> List[List[float]]:
"""Apply offset and rotation to coordinates, according to SpeckleProvider CRS_dict."""
from specklepy.objects.units import get_scale_factor_from_string
scale_factor = 1
if isinstance(self.crs_dict["units_native"], str):
scale_factor = get_scale_factor_from_string(self.crs_dict["units_native"], "m")
final_coords = []
for coord in coords_in:
a = self.crs_dict["rotation"] * math.pi / 180
x2 = coord[0] * math.cos(a) - coord[1] * math.sin(a)
y2 = coord[0] * math.sin(a) + coord[1] * math.cos(a)
final_coords.append(
[
scale_factor * (x2 + self.crs_dict["offset_x"]),
scale_factor * (y2 + self.crs_dict["offset_y"]),
]
)
return final_coords
@@ -0,0 +1,39 @@
def create_crs_from_wkt(self: "SpeckleProvider", wkt: str | None) -> None:
"""Create and assign CRS object from WKT string."""
from pyproj import CRS
self.crs = CRS.from_user_input(wkt)
def create_crs_from_authid(self: "SpeckleProvider", authid: str | None) -> None:
"""Create and assign CRS object from Authority ID."""
from pyproj import CRS
crs_obj = CRS.from_string(authid)
self.crs = crs_obj
def create_crs_default(self: "SpeckleProvider") -> None:
"""Create and assign custom CRS using SpeckleProvider Lat & Lon."""
from pyproj import CRS
wkt = f'PROJCS["SpeckleCRS_latlon_{self.lat}_{self.lon}", GEOGCS["GCS_WGS_1984", DATUM["D_WGS_1984", SPHEROID["WGS_1984", 6378137.0, 298.257223563]], PRIMEM["Greenwich", 0.0], UNIT["Degree", 0.0174532925199433]], PROJECTION["Transverse_Mercator"], PARAMETER["False_Easting", 0.0], PARAMETER["False_Northing", 0.0], PARAMETER["Central_Meridian", {self.lon}], PARAMETER["Scale_Factor", 1.0], PARAMETER["Latitude_Of_Origin", {self.lat}], UNIT["Meter", 1.0]]'
crs_obj = CRS.from_user_input(wkt)
self.crs = crs_obj
def create_crs_dict(self: "SpeckleProvider", offset_x, offset_y, displayUnits: str | None) -> None:
"""Create and assign CRS object from WKT string."""
if self.crs is not None:
self.crs_dict = {
"wkt": self.crs.to_wkt(),
"offset_x": offset_x,
"offset_y": offset_y,
"rotation": self.north_degrees,
"units_native": displayUnits,
"obj": self.crs,
}
@@ -0,0 +1,167 @@
from typing import Dict, List, Tuple
DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150
def find_display_obj(obj) -> Tuple["Base", "Base"]:
"""Get displayable object."""
from specklepy.objects.geometry import Base, Mesh
displayVal = obj
displayValForColor = obj
if hasattr(obj, 'displayValue'):
displayValForColor = getattr(obj, 'displayValue')
elif hasattr(obj, '@displayValue'):
displayValForColor = getattr(obj, '@displayValue')
if isinstance(displayValForColor, List):
faces = []
verts = []
colors = []
for item in displayValForColor:
if isinstance(item, Mesh):
start_vert_count = int(len(verts)/3)
# only add colors if existing and incoming colors are valid (same length as vertices)
if len(colors) == start_vert_count and isinstance(item.colors, List) and len(item.colors)== int(len(item.vertices)/3)>0:
colors.extend(item.colors)
else:
colors = []
verts.extend(item.vertices)
count = 0
for _ in item.faces:
try:
vert_num = item.faces[count]
faces.append(vert_num)
faces.extend([ x+start_vert_count for x in item.faces[count+1 : count+1+vert_num]])
count += vert_num+1
except IndexError:
break
elif item is not None:
displayValForColor = item
mesh = Mesh.create(faces= faces, vertices=verts, colors=colors)
for prop in displayValForColor[0].get_member_names():
if prop not in ["colors", "vertices", "faces"]:
mesh[prop] = getattr(displayValForColor[0], prop)
displayValForColor = mesh
displayVal = displayValForColor
print(displayValForColor)
# if not searching for colored object, return GisFeatures as is
if obj.speckle_type.endswith(".GisFeature"):
displayVal = obj
return displayVal, displayValForColor
def get_display_units(context_list: List["TraversalContext"]) -> None | str:
"""Get units from either of displayable objects."""
from specklepy.objects.geometry import Base
displayUnits = None
for item in context_list:
if hasattr(item.current, "displayValue"):
try:
displayVal = item.current["displayValue"]
except:
displayVal = item.current.displayValue
if isinstance(displayVal, list) and len(displayVal)>0:
displayUnits = displayVal[0].units
break
elif isinstance(displayVal, Base):
displayUnits = item.current.units
break
else:
if item.current.units is not None:
displayUnits = item.current.units
break
return displayUnits
def set_default_color(context_list: List["TraversalContext"]) -> None:
"""Get and set the default color."""
from specklepy.objects.GIS.layers import VectorLayer
global DEFAULT_COLOR
DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150
for item in context_list:
# for GIS-commits, use default blue color
if isinstance(item.current, VectorLayer):
DEFAULT_COLOR = (255 << 24) + (10 << 16) + (132 << 8) + 255
break
def assign_color(obj_display, props) -> None:
"""Get and assign color to feature displayProperties."""
from specklepy.objects.geometry import Mesh
# initialize Speckle Blue color
color = DEFAULT_COLOR
try:
if hasattr(obj_display, 'renderMaterial'):
color = obj_display['renderMaterial']['diffuse']
elif hasattr(obj_display, '@renderMaterial'):
color = obj_display['@renderMaterial']['diffuse']
elif hasattr(obj_display, 'displayStyle'):
color = obj_display['displayStyle']['color']
elif hasattr(obj_display, '@displayStyle'):
color = obj_display['@displayStyle']['color']
elif isinstance(obj_display, Mesh) and isinstance(obj_display.colors, List):
sameColors = True
color1 = obj_display.colors[0]
for c in obj_display.colors:
if c != color1:
sameColors = False
break
if sameColors is True:
color = color1
except Exception as e:
print(e)
r, g, b = get_r_g_b(color)
hex_color = '#%02x%02x%02x' % (r, g, b)
props['color'] = hex_color
def get_r_g_b(rgb: int) -> Tuple[int, int, int]:
"""Get R, G, B values from int."""
r = g = b = 0
try:
r = (rgb & 0xFF0000) >> 16
g = (rgb & 0xFF00) >> 8
b = rgb & 0xFF
except Exception as e:
r = g = b = 150
return r, g, b
def assign_display_properties(feature: Dict, f_base: "Base", obj_display: "Base") -> None:
"""Assign displayProperties to the feature."""
from specklepy.objects.geometry import Mesh, Brep
feature["displayProperties"] = {}
assign_color(obj_display, feature["displayProperties"])
# other properties for rendering
if isinstance(f_base, Mesh) or isinstance(f_base, Brep):
feature["displayProperties"]['lineWidth'] = 0.3
elif "Line" in feature["geometry"]["type"]:
feature["displayProperties"]['lineWidth'] = 3
else:
feature["displayProperties"]['lineWidth'] = 1
# if "Point" in feature["geometry"]["type"]:
try:
feature["displayProperties"]["radius"] = feature["properties"]["weight"]
except:
feature["displayProperties"]["radius"] = 10
@@ -0,0 +1,79 @@
import os
import sys
from typing import Optional
_user_data_env_var = "SPECKLE_USERDATA_PATH"
_application_name = "Speckle"
def user_application_data_path() -> "Path":
"""Get the platform specific user configuration folder path"""
from pathlib import Path
path_override = _path()
if path_override:
return path_override
try:
if sys.platform.startswith("win"):
app_data_path = os.getenv("APPDATA")
if not app_data_path:
raise Exception("Cannot get appdata path from environment.")
return Path(app_data_path)
else:
# try getting the standard XDG_DATA_HOME value
# as that is used as an override
app_data_path = os.getenv("XDG_DATA_HOME")
if app_data_path:
return Path(app_data_path)
else:
return ensure_folder_exists(Path.home(), ".config")
except Exception as ex:
raise Exception("Failed to initialize user application data path.", ex)
def ensure_folder_exists(base_path: "Path", folder_name: str) -> "Path":
from pathlib import Path
path = base_path.joinpath(folder_name)
path.mkdir(exist_ok=True, parents=True)
return path
def _path() -> Optional["Path"]:
from pathlib import Path
"""Read the user data path override setting."""
path_override = os.environ.get(_user_data_env_var)
if path_override:
return Path(path_override)
return None
def connector_installation_path(host_application: str) -> "Path":
connector_installation_path = user_speckle_connector_installation_path(
host_application
)
connector_installation_path.mkdir(exist_ok=True, parents=True)
# set user modules path at beginning of paths for earlier hit
if sys.path[0] != connector_installation_path:
sys.path.insert(0, str(connector_installation_path))
# print(f"Using connector installation path {connector_installation_path}")
return connector_installation_path
def user_speckle_connector_installation_path(host_application: str) -> "Path":
"""
Gets a connector specific installation folder.
In this folder we can put our connector installation and all python packages.
"""
return ensure_folder_exists(
ensure_folder_exists(
user_speckle_folder_path(), "connector_installations"
),
host_application,
)
def user_speckle_folder_path() -> "Path":
"""Get the folder where the user's Speckle data should be stored."""
return ensure_folder_exists(
user_application_data_path(), _application_name
)
@@ -27,12 +27,12 @@ def get_transport_path():
return str(credentials_path)
def get_transport_path_src():
credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "server.py")
credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "patch", "server.py")
return str(credentials_path)
def get_gis_feature_path_src():
credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "GisFeature.py")
credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "patch", "GisFeature.py")
return str(credentials_path)
@@ -0,0 +1,102 @@
from typing import Dict, List
def assign_props(obj: "Base", props: Dict):
"""Assign properties to the feature from Base object."""
from specklepy.objects.geometry import Base
from specklepy.objects.other import RevitParameter
all_prop_names = obj.get_member_names()
dynamic_prop_names = obj.get_dynamic_member_names()
typed_prop_names = obj.get_typed_member_names()
# check if GIS object
if "attributes" in all_prop_names and isinstance(obj["attributes"], Base):
all_prop_names = obj["attributes"].get_dynamic_member_names()
for prop_name in all_prop_names:
value = getattr(obj["attributes"], prop_name)
if (prop_name
in [
"geometry",
"Speckle_ID",
"id",
]
):
pass
else:
if (
isinstance(value, Base)
or isinstance(value, List)
or isinstance(value, Dict)
):
props[prop_name] = str(value)
else:
props[prop_name] = value
return
# if Rhino:
elif "userStrings" in dynamic_prop_names and isinstance(obj["userStrings"], Base):
all_prop_names = obj["userStrings"].get_dynamic_member_names()
for prop_name in all_prop_names:
if prop_name in ["id"]:
continue
value = getattr(obj["userStrings"], prop_name)
if not isinstance(value, str):
props[prop_name] = str(value)
else:
props[prop_name] = value
return
for prop_name in obj.get_dynamic_member_names():
if (
prop_name
in [
"displayValue",
"displayStyle",
"renderMaterial",
"revitLinkedModelPath",
"id",
]
):
pass
else:
value = getattr(obj, prop_name)
if (
isinstance(value, Base)
or isinstance(value, List)
or isinstance(value, Dict)
):
props[prop_name] = str(value)
else:
props[prop_name] = value
# if Revit:
if "parameters" in all_prop_names and isinstance(obj.parameters, Base):
for prop_name in obj.parameters.get_dynamic_member_names():
if prop_name in ["id","revitLinkedModelPath"]:
continue
param = getattr(obj.parameters, prop_name)
if isinstance(param, RevitParameter):
if not isinstance(param.value, str):
props[prop_name] = str(param.value)
else:
props[prop_name] = param.value
# add after dynamic parameters
def assign_missing_props(features: Dict, all_props: List[str]) -> None:
"""Assign NA values to missing properties."""
# assign all props to all features
for feat in features:
for prop in all_props:
if prop not in list(feat["properties"].keys()):
feat["properties"][prop] = "N/A"