From e2c123529d1dbc9e2d7ccfd9b58a2db4e3af412d Mon Sep 17 00:00:00 2001 From: KatKatKateryna Date: Tue, 20 Aug 2024 23:25:26 +0100 Subject: [PATCH] refactor --- pygeoapi/provider/speckle.py | 644 ++---------------- .../provider/speckle_utils/converter_utils.py | 144 ++++ .../provider/speckle_utils/coords_utils.py | 102 +++ pygeoapi/provider/speckle_utils/crs_utils.py | 39 ++ .../provider/speckle_utils/display_utils.py | 167 +++++ .../provider/speckle_utils/install_utils.py | 79 +++ .../speckle_utils/{ => patch}/GisFeature.py | 0 .../provider/speckle_utils/patch/__init__.py | 0 .../{ => patch}/patch_specklepy.py | 4 +- .../speckle_utils/{ => patch}/server.py | 0 .../provider/speckle_utils/props_utils.py | 102 +++ 11 files changed, 673 insertions(+), 608 deletions(-) create mode 100644 pygeoapi/provider/speckle_utils/converter_utils.py create mode 100644 pygeoapi/provider/speckle_utils/coords_utils.py create mode 100644 pygeoapi/provider/speckle_utils/crs_utils.py create mode 100644 pygeoapi/provider/speckle_utils/display_utils.py create mode 100644 pygeoapi/provider/speckle_utils/install_utils.py rename pygeoapi/provider/speckle_utils/{ => patch}/GisFeature.py (100%) create mode 100644 pygeoapi/provider/speckle_utils/patch/__init__.py rename pygeoapi/provider/speckle_utils/{ => patch}/patch_specklepy.py (97%) rename pygeoapi/provider/speckle_utils/{ => patch}/server.py (100%) create mode 100644 pygeoapi/provider/speckle_utils/props_utils.py diff --git a/pygeoapi/provider/speckle.py b/pygeoapi/provider/speckle.py index 6608a5f..c5f2725 100644 --- a/pygeoapi/provider/speckle.py +++ b/pygeoapi/provider/speckle.py @@ -28,10 +28,9 @@ # # ================================================================= -import copy +from datetime import datetime import json import logging -import math import os import sys from typing import Any, Dict, List, Optional, Tuple, Union @@ -42,11 +41,7 @@ from pygeoapi.util import crs_transform LOGGER = logging.getLogger(__name__) -DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150 -_user_data_env_var = "SPECKLE_USERDATA_PATH" -_application_name = "Speckle" -_host_application = "pygeoapi" - +HOST_APP = "pygeoapi" class SpeckleProvider(BaseProvider): """Provider class for Speckle server data @@ -80,9 +75,7 @@ class SpeckleProvider(BaseProvider): # ) from subprocess import run - from pygeoapi.provider.speckle_utils.patch_specklepy import patch_specklepy - - # path = str(self.connector_installation_path(_host_application)) + from pygeoapi.provider.speckle_utils.patch.patch_specklepy import patch_specklepy try: import specklepy @@ -121,9 +114,6 @@ class SpeckleProvider(BaseProvider): patch_specklepy() # assign global values - global DEFAULT_COLOR - DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150 - self.url: str = self.data # to store the value and check if self.data has changed self.speckle_url = self.url.lower().split("speckleurl=")[-1].split("&")[0].split("@")[0].split("?")[0] @@ -169,6 +159,8 @@ class SpeckleProvider(BaseProvider): def _load(self, skip_geometry=None, properties=[], select_properties=[]): """Validate Speckle data""" + from pygeoapi.provider.speckle_utils.crs_utils import create_crs_from_authid + if self.data == "": return raise ValueError( @@ -208,7 +200,7 @@ class SpeckleProvider(BaseProvider): # if CRS assigned, create one: if len(crs_authid)>3: - self.create_crs_from_authid() + create_crs_from_authid(self) # check if it's a new request (self.data was updated and doesn't match self.url) new_request = False @@ -363,7 +355,7 @@ class SpeckleProvider(BaseProvider): from specklepy.logging.metrics import set_host_app from specklepy.transports.server import ServerTransport - set_host_app("pygeoapi", "0.0.99") + set_host_app(HOST_APP, "0.0.99") # get URL that will not trigget Client init url_proj: str = self.speckle_url.split("models")[0] @@ -422,7 +414,6 @@ class SpeckleProvider(BaseProvider): def traverse_data(self, commit_obj): - from specklepy.objects.geometry import Base from specklepy.objects.geometry import Point, Line, Polyline, Curve, Mesh, Brep from specklepy.objects.GIS.CRS import CRS from specklepy.objects.GIS.layers import VectorLayer @@ -432,6 +423,11 @@ class SpeckleProvider(BaseProvider): GraphTraversal, TraversalRule, ) + from pygeoapi.provider.speckle_utils.crs_utils import create_crs_from_wkt, create_crs_default, create_crs_dict + from pygeoapi.provider.speckle_utils.coords_utils import reproject_bulk + from pygeoapi.provider.speckle_utils.props_utils import assign_props, assign_missing_props + from pygeoapi.provider.speckle_utils.converter_utils import assign_geometry + from pygeoapi.provider.speckle_utils.display_utils import find_display_obj, set_default_color, assign_display_properties, get_display_units supported_classes = [GisFeature, GisPolygonElement, Mesh, Brep, Point, Line, Polyline, Curve] supported_types = [y().speckle_type for y in supported_classes] @@ -451,7 +447,7 @@ class SpeckleProvider(BaseProvider): "features": [], "model_crs": "-", } - self.assign_crs_to_geojson(data) + self.assign_coordinate_system_to_geojson(data) rule = TraversalRule( [lambda _: True], lambda x: [ @@ -463,7 +459,7 @@ class SpeckleProvider(BaseProvider): ) context_list = [x for x in GraphTraversal([rule]).traverse(commit_obj)] - # iterate Speckle objects to get "crs" property + # iterate Speckle objects to get CRS, DisplayUnits, offsets, rotation crs = None displayUnits = None offset_x = 0 @@ -480,7 +476,7 @@ class SpeckleProvider(BaseProvider): offset_x = crs["offset_x"] offset_y = crs["offset_y"] self.north_degrees = crs["rotation"] - self.create_crs_from_wkt(crs["wkt"]) + create_crs_from_wkt(self, crs["wkt"]) if self.crs.to_authority() is not None: data["model_crs"] = f"{self.crs.to_authority()}, {self.crs.name} " @@ -488,62 +484,35 @@ class SpeckleProvider(BaseProvider): data["model_crs"] = f"{self.crs.to_proj4()}" break - elif displayUnits is None and type(item) in supported_types: - displayUnits = item.units except AttributeError as ex: pass # old commit structure # if CRS not found, create default one and get model units for scaling if self.crs is None: - self.create_crs_default() - for item in context_list: - if hasattr(item.current, "displayValue"): - try: - displayVal = item.current["displayValue"] - except: - displayVal = item.current.displayValue - if isinstance(displayVal, list) and len(displayVal)>0: - displayUnits = displayVal[0].units - break - elif isinstance(displayVal, Base): - displayUnits = item.current.units - break - else: - if item.current.units is not None: - displayUnits = item.current.units - break + create_crs_default(self) + # if displayUnits not found, get from displayable object + if displayUnits is None: + displayUnits = get_display_units(context_list) - self.create_crs_dict(offset_x, offset_y, displayUnits) - - # iterate to get features - #list_len = len(context_list) - #load = 0 - #print(f"{load}% loaded") + create_crs_dict(self, offset_x, offset_y, displayUnits) # get coordinates in bulk all_coords = [] all_coord_counts = [] - print(f"Loading..") - all_props = [] + all_props = [] + set_default_color(context_list) + + print(f"Loading features..") + time1 = datetime.now() for item in context_list: - # for GIS-commits, use default blue color - if isinstance(item.current, VectorLayer): - global DEFAULT_COLOR - DEFAULT_COLOR = (255 << 24) + (10 << 16) + (132 << 8) + 255 - - #new_load = round(i / list_len * 10, 1) * 10 - #if new_load % 10 == 0 and new_load != load: - # load = round(i / list_len * 100) - # print(f"{load}% loaded") - f_base = item.current f_id = item.current.id f_fid = len(data["features"]) + 1 - # feature + # initialize feature feature: Dict = { "type": "Feature", # "bbox": [-180.0, -90.0, 180.0, 90.0], @@ -555,323 +524,35 @@ class SpeckleProvider(BaseProvider): }, } - # feature geometry - obj_display = self.find_display_obj(f_base) - coords, coord_counts = self.assign_geometry(feature, obj_display) - + # feature geometry, props and displayProps + obj_display, obj_get_color = find_display_obj(f_base) + coords, coord_counts = assign_geometry(feature, obj_display) if len(coords)!=0: all_coords.extend(coords) all_coord_counts.append(coord_counts) - self.assign_props(f_base, feature["properties"]) + assign_props(f_base, feature["properties"]) # update list of all properties for prop in feature["properties"]: if prop not in all_props: all_props.append(prop) - feature["displayProperties"] = {} - self.assign_color(obj_display, feature["displayProperties"]) - - # other properties for rendering - if isinstance(f_base, Mesh) or isinstance(f_base, Brep): - feature["displayProperties"]['lineWidth'] = 0.3 - elif "Line" in feature["geometry"]["type"]: - feature["displayProperties"]['lineWidth'] = 3 - else: - feature["displayProperties"]['lineWidth'] = 1 - - # if "Point" in feature["geometry"]["type"]: - try: - feature["displayProperties"]["radius"] = feature["properties"]["weight"] - except: - feature["displayProperties"]["radius"] = 10 - + assign_display_properties(feature, f_base, obj_get_color) data["features"].append(feature) - # assign all props to all features - for feat in data["features"]: - for prop in all_props: - if prop not in list(feat["properties"].keys()): - feat["properties"][prop] = "N/A" + assign_missing_props(data["features"], all_props) - if len(all_coords)==0: + if len(data["features"])==0: raise ValueError("No supported features found") + + time2 = datetime.now() + print(f"Loading features before reprojecting time: {(time2-time1).total_seconds()}") - self.reproject_bulk(all_coords, all_coord_counts, [f["geometry"] for f in data["features"]]) + reproject_bulk(self, all_coords, all_coord_counts, [f["geometry"] for f in data["features"]]) return data - def reproject_bulk(self, all_coords, all_coord_counts: List[List[None| List[int]]], geometries): - from datetime import datetime - # reproject all coords - time1 = datetime.now() - flat_coords = self.reproject_2d_coords_list( - all_coords - ) - time2 = datetime.now() - print((time2-time1).total_seconds()) - - # define type of features - feat_coord_group_is_multi = [True if None in x else False for x in all_coord_counts] - - feat_coord_group_counts = [[ y for y in x if y is not None] for x in all_coord_counts] - feat_coord_group_counts_per_part = [[ sum(y) for y in x if y is not None] for x in all_coord_counts] - - feat_coord_group_flat_counts: List[int] = [sum([ sum(y) for y in x if y is not None]) for x in all_coord_counts] - - feat_coord_groups = [flat_coords[sum(feat_coord_group_flat_counts[:i]):sum(feat_coord_group_flat_counts[:i])+x] for i, x in enumerate(feat_coord_group_flat_counts)] - - for i, geometry in enumerate(geometries): - geometry["coordinates"] = [] - if feat_coord_group_is_multi[i] is False: - - if geometry["type"] == "Point": - geometry["coordinates"].extend(feat_coord_groups[i][0]) - else: - geometry["coordinates"].extend(feat_coord_groups[i]) - else: - polygon_parts = [] - local_coords_count: List[List[int]] = feat_coord_group_counts[i] - local_coords_count_flat: List[int] = feat_coord_group_counts_per_part[i] - local_flat_coords: List[int] = feat_coord_groups[i] - - for c, poly_part_count_lists in enumerate(local_coords_count): - poly_part = [] - start_index = sum(local_coords_count_flat[:c]) if c!=0 else 0 # all used coords in all parts - - for part_count in poly_part_count_lists: - range_coords_indices = range(start_index, start_index + part_count) - - if geometry["type"] == "MultiPoint": - poly_part.extend([local_flat_coords[ind] for ind in range_coords_indices]) - else: - poly_part.append([local_flat_coords[ind] for ind in range_coords_indices]) - - start_index += part_count - - if geometry["type"] in ["MultiPoint","MultiLineString"] : - polygon_parts.extend(poly_part) - else: - polygon_parts.append(poly_part) - - geometry["coordinates"].extend(polygon_parts) - - time3 = datetime.now() - print((time3-time2).total_seconds()) - - def create_crs_from_wkt(self, wkt: str | None): - - from pyproj import CRS - self.crs = CRS.from_user_input(wkt) - - def create_crs_from_authid(self, authid: str | None): - - from pyproj import CRS - - crs_obj = CRS.from_string(authid) - self.crs = crs_obj - - def create_crs_default(self): - - from pyproj import CRS - - wkt = f'PROJCS["SpeckleCRS_latlon_{self.lat}_{self.lon}", GEOGCS["GCS_WGS_1984", DATUM["D_WGS_1984", SPHEROID["WGS_1984", 6378137.0, 298.257223563]], PRIMEM["Greenwich", 0.0], UNIT["Degree", 0.0174532925199433]], PROJECTION["Transverse_Mercator"], PARAMETER["False_Easting", 0.0], PARAMETER["False_Northing", 0.0], PARAMETER["Central_Meridian", {self.lon}], PARAMETER["Scale_Factor", 1.0], PARAMETER["Latitude_Of_Origin", {self.lat}], UNIT["Meter", 1.0]]' - crs_obj = CRS.from_user_input(wkt) - self.crs = crs_obj - - def create_crs_dict(self, offset_x, offset_y, displayUnits: str | None): - if self.crs is not None: - self.crs_dict = { - "wkt": self.crs.to_wkt(), - "offset_x": offset_x, - "offset_y": offset_y, - "rotation": self.north_degrees, - "units_native": displayUnits, - "obj": self.crs, - } - - def assign_geometry(self, feature: Dict, f_base): - - from specklepy.objects.geometry import Point, Line, Polyline, Curve, Mesh, Brep - from specklepy.objects.GIS.geometry import GisPolygonGeometry - from specklepy.objects.GIS.GisFeature import GisFeature - - geometry = feature["geometry"] - coords = [] - coord_counts = [] - - if isinstance(f_base, Point): - geometry["type"] = "MultiPoint" - coord_counts.append(None) - - coords.append([f_base.x, f_base.y]) - coord_counts.append([1]) - - elif isinstance(f_base, Mesh) or isinstance(f_base, Brep): - geometry["type"] = "MultiPolygon" - coord_counts.append(None) # as an indicator of a MultiPolygon - - faces = [] - vertices = [] - if isinstance(f_base, Mesh): - faces = f_base.faces - vertices = f_base.vertices - elif isinstance(f_base, Brep): - if f_base.displayValue is None or ( - isinstance(f_base.displayValue, list) - and len(f_base.displayValue) == 0 - ): - geometry = {} - return - elif isinstance(f_base.displayValue, list): - faces = f_base.displayValue[0].faces - vertices = f_base.displayValue[0].vertices - else: - faces = f_base.displayValue.faces - vertices = f_base.displayValue.vertices - - count: int = 0 - for i, pt_count in enumerate(faces): - if i != count: - continue - - # old encoding - if pt_count == 0: - pt_count = 3 - elif pt_count == 1: - pt_count = 4 - coord_counts.append([pt_count]) - - for vertex_index in faces[count + 1 : count + 1 + pt_count]: - x = vertices[vertex_index * 3] - y = vertices[vertex_index * 3 + 1] - coords.append([x, y]) - - count += pt_count + 1 - - elif f_base.speckle_type.endswith(".GisFeature") and len(f_base["geometry"]) > 0: # isinstance(f_base, GisFeature) and len(f_base.geometry) > 0: - # GisFeature doesn't deserialize properly, need to check for speckle_type - - if isinstance(f_base.geometry[0], Point): - geometry["type"] = "MultiPoint" - coord_counts.append(None) - - for geom in f_base.geometry: - coords.append([geom.x, geom.y]) - coord_counts.append([1]) - - elif isinstance(f_base.geometry[0], Polyline): - geometry["type"] = "MultiLineString" - coord_counts.append(None) - - for geom in f_base.geometry: - coord_counts.append([]) - local_poly_count = 0 - - for pt in geom.as_points(): - coords.append([pt.x, pt.y]) - local_poly_count += 1 - if len(coords)>2 and geom.closed is True and coords[0] != coords[-1]: - coords.append(coords[0]) - local_poly_count += 1 - - coord_counts[-1].append(local_poly_count) - - elif isinstance(f_base.geometry[0], GisPolygonGeometry): - geometry["type"] = "MultiPolygon" - coord_counts.append(None) - - for polygon in f_base.geometry: - coord_counts.append([]) - boundary_count = 0 - for pt in polygon.boundary.as_points(): - coords.append([pt.x, pt.y]) - boundary_count += 1 - - coord_counts[-1].append(boundary_count) - - for void in polygon.voids: - void_count = 0 - for pt_void in void.as_points(): - coords.append([pt_void.x, pt_void.y]) - void_count += 1 - - coord_counts[-1].append(void_count) - - elif isinstance(f_base, Line): - geometry["type"] = "LineString" - start = [f_base.start.x, f_base.start.y] - end = [f_base.end.x, f_base.end.y] - - coords.extend([start, end]) - coord_counts.append([2]) - - elif isinstance(f_base, Polyline): - geometry["type"] = "LineString" - for pt in f_base.as_points(): - coords.append([pt.x, pt.y]) - if len(coords)>2 and f_base.closed is True and coords[0] != coords[-1]: - coords.append(coords[0]) - - coord_counts.append([len(coords)]) - - elif isinstance(f_base, Curve): - geometry["type"] = "LineString" - #geometry["coordinates"] = [] - for pt in f_base.displayValue.as_points(): - #geometry["coordinates"].append([pt.x, pt.y]) - coords.append([pt.x, pt.y]) - if len(coords)>2 and f_base.displayValue.closed is True and coords[0] != coords[-1]: - coords.append(coords[0]) - - coord_counts.append([len(coords)]) - #geometry["coordinates"] = self.reproject_2d_coords_list( - # geometry["coordinates"] - #) - - else: - geometry = {} - # print(f"Unsupported geometry type: {f_base.speckle_type}") - - return coords, coord_counts - - def reproject_2d_coords_list(self, coords_in: List[list]): - - from pyproj import Transformer - from pyproj import CRS - - coords_offset = self.offset_rotate(copy.deepcopy(coords_in)) - - transformer = Transformer.from_crs( - self.crs, - CRS.from_user_input(4326), - always_xy=True, - ) - return [[pt[0], pt[1]] for pt in transformer.itransform(coords_offset)] - - def offset_rotate(self, coords_in: List[list]): - - from specklepy.objects.units import get_scale_factor_from_string - - scale_factor = 1 - if isinstance(self.crs_dict["units_native"], str): - scale_factor = get_scale_factor_from_string(self.crs_dict["units_native"], "m") - - final_coords = [] - for coord in coords_in: - a = self.crs_dict["rotation"] * math.pi / 180 - x2 = coord[0] * math.cos(a) - coord[1] * math.sin(a) - y2 = coord[0] * math.sin(a) + coord[1] * math.cos(a) - final_coords.append( - [ - scale_factor * (x2 + self.crs_dict["offset_x"]), - scale_factor * (y2 + self.crs_dict["offset_y"]), - ] - ) - - return final_coords - - def assign_crs_to_geojson(self, data: Dict): + def assign_coordinate_system_to_geojson(self, data: Dict): crs = { "crs": { @@ -882,183 +563,6 @@ class SpeckleProvider(BaseProvider): data["crs"] = crs - def assign_props(self, obj, props): - from specklepy.objects.geometry import Base - from specklepy.objects.other import RevitParameter - - all_prop_names = obj.get_member_names() - dynamic_prop_names = obj.get_dynamic_member_names() - typed_prop_names = obj.get_typed_member_names() - - # check if GIS object - if "attributes" in all_prop_names and isinstance(obj["attributes"], Base): - all_prop_names = obj["attributes"].get_dynamic_member_names() - for prop_name in all_prop_names: - - value = getattr(obj["attributes"], prop_name) - - if (prop_name - in [ - "geometry", - "Speckle_ID", - "id", - ] - ): - pass - else: - if ( - isinstance(value, Base) - or isinstance(value, List) - or isinstance(value, Dict) - ): - props[prop_name] = str(value) - else: - props[prop_name] = value - return - - # if Rhino: - elif "userStrings" in dynamic_prop_names and isinstance(obj["userStrings"], Base): - all_prop_names = obj["userStrings"].get_dynamic_member_names() - for prop_name in all_prop_names: - - if prop_name in ["id"]: - continue - - value = getattr(obj["userStrings"], prop_name) - if not isinstance(value, str): - props[prop_name] = str(value) - else: - props[prop_name] = value - return - - for prop_name in obj.get_dynamic_member_names(): - if ( - prop_name - in [ - "displayValue", - "displayStyle", - "renderMaterial", - "revitLinkedModelPath", - "id", - ] - ): - pass - else: - value = getattr(obj, prop_name) - if ( - isinstance(value, Base) - or isinstance(value, List) - or isinstance(value, Dict) - ): - props[prop_name] = str(value) - else: - props[prop_name] = value - - # if Revit: - if "parameters" in all_prop_names and isinstance(obj.parameters, Base): - for prop_name in obj.parameters.get_dynamic_member_names(): - if prop_name in ["id","revitLinkedModelPath"]: - continue - - param = getattr(obj.parameters, prop_name) - if isinstance(param, RevitParameter): - - if not isinstance(param.value, str): - props[prop_name] = str(param.value) - else: - props[prop_name] = param.value - # add after dynamic parameters - - def find_display_obj(self, obj): - - from specklepy.objects.geometry import Base, Mesh - displayVal = None - - if hasattr(obj, 'displayValue'): - displayVal = getattr(obj, 'displayValue') - elif hasattr(obj, '@displayValue'): - displayVal = getattr(obj, '@displayValue') - - if isinstance(displayVal, Base): - return displayVal - elif isinstance(displayVal, List): - faces = [] - verts = [] - colors = [] - for item in displayVal: - if isinstance(item, Mesh): - start_vert_count = int(len(verts)/3) - - # only add colors if existing and incoming colors are valid (same length as vertices) - if len(colors) == start_vert_count and isinstance(item.colors, List) and len(item.colors)== int(len(item.vertices)/3)>0: - colors.extend(item.colors) - else: - colors = [] - - verts.extend(item.vertices) - - count = 0 - for _ in item.faces: - try: - vert_num = item.faces[count] - faces.append(vert_num) - faces.extend([ x+start_vert_count for x in item.faces[count+1 : count+1+vert_num]]) - count += vert_num+1 - except IndexError: - break - else: - return item - - mesh = Mesh.create(faces= faces, vertices=verts, colors=colors) - for prop in displayVal[0].get_member_names(): - if prop not in ["colors", "vertices", "faces"]: - mesh[prop] = getattr(displayVal[0], prop) - return mesh - - return obj - - def assign_color(self, obj_display, props): - from specklepy.objects.geometry import Mesh - - # initialize Speckle Blue color - color = DEFAULT_COLOR - - try: - if hasattr(obj_display, 'renderMaterial'): - color = obj_display['renderMaterial']['diffuse'] - elif hasattr(obj_display, '@renderMaterial'): - color = obj_display['@renderMaterial']['diffuse'] - elif hasattr(obj_display, 'displayStyle'): - color = obj_display['displayStyle']['color'] - elif hasattr(obj_display, '@displayStyle'): - color = obj_display['@displayStyle']['color'] - elif isinstance(obj_display, Mesh) and isinstance(obj_display.colors, List): - sameColors = True - color1 = obj_display.colors[0] - for c in obj_display.colors: - if c != color1: - sameColors = False - break - if sameColors is True: - color = color1 - except Exception as e: - print(e) - - r, g, b = self.get_r_g_b(color) - hex_color = '#%02x%02x%02x' % (r, g, b) - props['color'] = hex_color - - def get_r_g_b(self, rgb: int) -> Tuple[int, int, int]: - r = g = b = 0 - try: - r = (rgb & 0xFF0000) >> 16 - g = (rgb & 0xFF00) >> 8 - b = rgb & 0xFF - except Exception as e: - r = g = b = 150 - return r, g, b - - def get_python_path(self): if sys.platform.startswith("linux"): return sys.executable @@ -1068,75 +572,3 @@ class SpeckleProvider(BaseProvider): else: pythonExec += "/bin/python3" return pythonExec - - def user_application_data_path(self) -> "Path": - """Get the platform specific user configuration folder path""" - from pathlib import Path - - path_override = self._path() - if path_override: - return path_override - - try: - if sys.platform.startswith("win"): - app_data_path = os.getenv("APPDATA") - if not app_data_path: - raise Exception("Cannot get appdata path from environment.") - return Path(app_data_path) - else: - # try getting the standard XDG_DATA_HOME value - # as that is used as an override - app_data_path = os.getenv("XDG_DATA_HOME") - if app_data_path: - return Path(app_data_path) - else: - return self.ensure_folder_exists(Path.home(), ".config") - except Exception as ex: - raise Exception("Failed to initialize user application data path.", ex) - - def ensure_folder_exists(self, base_path: "Path", folder_name: str) -> "Path": - from pathlib import Path - - path = base_path.joinpath(folder_name) - path.mkdir(exist_ok=True, parents=True) - return path - - def _path(self) -> Optional["Path"]: - from pathlib import Path - - """Read the user data path override setting.""" - path_override = os.environ.get(_user_data_env_var) - if path_override: - return Path(path_override) - return None - - def connector_installation_path(self, host_application: str) -> "Path": - connector_installation_path = self.user_speckle_connector_installation_path( - host_application - ) - connector_installation_path.mkdir(exist_ok=True, parents=True) - - # set user modules path at beginning of paths for earlier hit - if sys.path[0] != connector_installation_path: - sys.path.insert(0, str(connector_installation_path)) - - # print(f"Using connector installation path {connector_installation_path}") - return connector_installation_path - - def user_speckle_connector_installation_path(self, host_application: str) -> "Path": - """ - Gets a connector specific installation folder. - In this folder we can put our connector installation and all python packages. - """ - return self.ensure_folder_exists( - self.ensure_folder_exists( - self.user_speckle_folder_path(), "connector_installations" - ), - host_application, - ) - - def user_speckle_folder_path(self) -> "Path": - """Get the folder where the user's Speckle data should be stored.""" - return self.ensure_folder_exists( - self.user_application_data_path(), _application_name - ) diff --git a/pygeoapi/provider/speckle_utils/converter_utils.py b/pygeoapi/provider/speckle_utils/converter_utils.py new file mode 100644 index 0000000..622c48e --- /dev/null +++ b/pygeoapi/provider/speckle_utils/converter_utils.py @@ -0,0 +1,144 @@ + +from typing import Dict, List + + +def assign_geometry(feature: Dict, f_base) -> ( List[List[List[float]]], List[List[None| List[int]]] ): + """Assign geom type and convert object coords into flat lists of coordinates and schema.""" + + from specklepy.objects.geometry import Point, Line, Polyline, Curve, Mesh, Brep + from specklepy.objects.GIS.geometry import GisPolygonGeometry + from specklepy.objects.GIS.GisFeature import GisFeature + + geometry = feature["geometry"] + coords = [] + coord_counts = [] + + if isinstance(f_base, Point): + geometry["type"] = "MultiPoint" + coord_counts.append(None) + + coords.append([f_base.x, f_base.y]) + coord_counts.append([1]) + + elif isinstance(f_base, Mesh) or isinstance(f_base, Brep): + geometry["type"] = "MultiPolygon" + coord_counts.append(None) # as an indicator of a MultiPolygon + + faces = [] + vertices = [] + if isinstance(f_base, Mesh): + faces = f_base.faces + vertices = f_base.vertices + elif isinstance(f_base, Brep): + if f_base.displayValue is None or ( + isinstance(f_base.displayValue, list) + and len(f_base.displayValue) == 0 + ): + geometry = {} + return + elif isinstance(f_base.displayValue, list): + faces = f_base.displayValue[0].faces + vertices = f_base.displayValue[0].vertices + else: + faces = f_base.displayValue.faces + vertices = f_base.displayValue.vertices + + count: int = 0 + for i, pt_count in enumerate(faces): + if i != count: + continue + + # old encoding + if pt_count == 0: + pt_count = 3 + elif pt_count == 1: + pt_count = 4 + coord_counts.append([pt_count]) + + for vertex_index in faces[count + 1 : count + 1 + pt_count]: + x = vertices[vertex_index * 3] + y = vertices[vertex_index * 3 + 1] + coords.append([x, y]) + + count += pt_count + 1 + + elif f_base.speckle_type.endswith(".GisFeature") and len(f_base["geometry"]) > 0: # isinstance(f_base, GisFeature) and len(f_base.geometry) > 0: + # GisFeature doesn't deserialize properly, need to check for speckle_type + + if isinstance(f_base.geometry[0], Point): + geometry["type"] = "MultiPoint" + coord_counts.append(None) + + for geom in f_base.geometry: + coords.append([geom.x, geom.y]) + coord_counts.append([1]) + + elif isinstance(f_base.geometry[0], Polyline): + geometry["type"] = "MultiLineString" + coord_counts.append(None) + + for geom in f_base.geometry: + coord_counts.append([]) + local_poly_count = 0 + + for pt in geom.as_points(): + coords.append([pt.x, pt.y]) + local_poly_count += 1 + if len(coords)>2 and geom.closed is True and coords[0] != coords[-1]: + coords.append(coords[0]) + local_poly_count += 1 + + coord_counts[-1].append(local_poly_count) + + elif isinstance(f_base.geometry[0], GisPolygonGeometry): + geometry["type"] = "MultiPolygon" + coord_counts.append(None) + + for polygon in f_base.geometry: + coord_counts.append([]) + boundary_count = 0 + for pt in polygon.boundary.as_points(): + coords.append([pt.x, pt.y]) + boundary_count += 1 + + coord_counts[-1].append(boundary_count) + + for void in polygon.voids: + void_count = 0 + for pt_void in void.as_points(): + coords.append([pt_void.x, pt_void.y]) + void_count += 1 + + coord_counts[-1].append(void_count) + + elif isinstance(f_base, Line): + geometry["type"] = "LineString" + start = [f_base.start.x, f_base.start.y] + end = [f_base.end.x, f_base.end.y] + + coords.extend([start, end]) + coord_counts.append([2]) + + elif isinstance(f_base, Polyline): + geometry["type"] = "LineString" + for pt in f_base.as_points(): + coords.append([pt.x, pt.y]) + if len(coords)>2 and f_base.closed is True and coords[0] != coords[-1]: + coords.append(coords[0]) + + coord_counts.append([len(coords)]) + + elif isinstance(f_base, Curve): + geometry["type"] = "LineString" + for pt in f_base.displayValue.as_points(): + coords.append([pt.x, pt.y]) + if len(coords)>2 and f_base.displayValue.closed is True and coords[0] != coords[-1]: + coords.append(coords[0]) + + coord_counts.append([len(coords)]) + + else: + geometry = {} + # print(f"Unsupported geometry type: {f_base.speckle_type}") + + return coords, coord_counts diff --git a/pygeoapi/provider/speckle_utils/coords_utils.py b/pygeoapi/provider/speckle_utils/coords_utils.py new file mode 100644 index 0000000..7abefe4 --- /dev/null +++ b/pygeoapi/provider/speckle_utils/coords_utils.py @@ -0,0 +1,102 @@ + +import copy +import math +from typing import List + + +def reproject_bulk(self, all_coords: List[List[List[float]]], all_coord_counts: List[List[None| List[int]]], geometries) -> None: + """Reproject coordinates and assign to corresponding geometries.""" + + from datetime import datetime + + # reproject all coords + time1 = datetime.now() + flat_coords = reproject_2d_coords_list(self, all_coords) + time2 = datetime.now() + print(f"Reproject time: {(time2-time1).total_seconds()}") + + # define type of features + feat_coord_group_is_multi = [True if None in x else False for x in all_coord_counts] + + feat_coord_group_counts = [[ y for y in x if y is not None] for x in all_coord_counts] + feat_coord_group_counts_per_part = [[ sum(y) for y in x if y is not None] for x in all_coord_counts] + + feat_coord_group_flat_counts: List[int] = [sum([ sum(y) for y in x if y is not None]) for x in all_coord_counts] + + feat_coord_groups = [flat_coords[sum(feat_coord_group_flat_counts[:i]):sum(feat_coord_group_flat_counts[:i])+x] for i, x in enumerate(feat_coord_group_flat_counts)] + + for i, geometry in enumerate(geometries): + geometry["coordinates"] = [] + if feat_coord_group_is_multi[i] is False: + + if geometry["type"] == "Point": + geometry["coordinates"].extend(feat_coord_groups[i][0]) + else: + geometry["coordinates"].extend(feat_coord_groups[i]) + else: + polygon_parts = [] + local_coords_count: List[List[int]] = feat_coord_group_counts[i] + local_coords_count_flat: List[int] = feat_coord_group_counts_per_part[i] + local_flat_coords: List[int] = feat_coord_groups[i] + + for c, poly_part_count_lists in enumerate(local_coords_count): + poly_part = [] + start_index = sum(local_coords_count_flat[:c]) if c!=0 else 0 # all used coords in all parts + + for part_count in poly_part_count_lists: + range_coords_indices = range(start_index, start_index + part_count) + + if geometry["type"] == "MultiPoint": + poly_part.extend([local_flat_coords[ind] for ind in range_coords_indices]) + else: + poly_part.append([local_flat_coords[ind] for ind in range_coords_indices]) + + start_index += part_count + + if geometry["type"] in ["MultiPoint","MultiLineString"] : + polygon_parts.extend(poly_part) + else: + polygon_parts.append(poly_part) + + geometry["coordinates"].extend(polygon_parts) + + time3 = datetime.now() + print(f"Construct back geometry time: {(time3-time2).total_seconds()}") + +def reproject_2d_coords_list(self, coords_in: List[List[float]]) -> List[List[float]]: + """Return coordinates in a CRS of SpeckleProvider.""" + + from pyproj import Transformer + from pyproj import CRS + + coords_offset = offset_rotate(self, copy.deepcopy(coords_in)) + + transformer = Transformer.from_crs( + self.crs, + CRS.from_user_input(4326), + always_xy=True, + ) + return [[pt[0], pt[1]] for pt in transformer.itransform(coords_offset)] + +def offset_rotate(self, coords_in: List[list]) -> List[List[float]]: + """Apply offset and rotation to coordinates, according to SpeckleProvider CRS_dict.""" + + from specklepy.objects.units import get_scale_factor_from_string + + scale_factor = 1 + if isinstance(self.crs_dict["units_native"], str): + scale_factor = get_scale_factor_from_string(self.crs_dict["units_native"], "m") + + final_coords = [] + for coord in coords_in: + a = self.crs_dict["rotation"] * math.pi / 180 + x2 = coord[0] * math.cos(a) - coord[1] * math.sin(a) + y2 = coord[0] * math.sin(a) + coord[1] * math.cos(a) + final_coords.append( + [ + scale_factor * (x2 + self.crs_dict["offset_x"]), + scale_factor * (y2 + self.crs_dict["offset_y"]), + ] + ) + + return final_coords diff --git a/pygeoapi/provider/speckle_utils/crs_utils.py b/pygeoapi/provider/speckle_utils/crs_utils.py new file mode 100644 index 0000000..07e9336 --- /dev/null +++ b/pygeoapi/provider/speckle_utils/crs_utils.py @@ -0,0 +1,39 @@ + +def create_crs_from_wkt(self: "SpeckleProvider", wkt: str | None) -> None: + """Create and assign CRS object from WKT string.""" + + from pyproj import CRS + self.crs = CRS.from_user_input(wkt) + + +def create_crs_from_authid(self: "SpeckleProvider", authid: str | None) -> None: + """Create and assign CRS object from Authority ID.""" + + from pyproj import CRS + + crs_obj = CRS.from_string(authid) + self.crs = crs_obj + + +def create_crs_default(self: "SpeckleProvider") -> None: + """Create and assign custom CRS using SpeckleProvider Lat & Lon.""" + + from pyproj import CRS + + wkt = f'PROJCS["SpeckleCRS_latlon_{self.lat}_{self.lon}", GEOGCS["GCS_WGS_1984", DATUM["D_WGS_1984", SPHEROID["WGS_1984", 6378137.0, 298.257223563]], PRIMEM["Greenwich", 0.0], UNIT["Degree", 0.0174532925199433]], PROJECTION["Transverse_Mercator"], PARAMETER["False_Easting", 0.0], PARAMETER["False_Northing", 0.0], PARAMETER["Central_Meridian", {self.lon}], PARAMETER["Scale_Factor", 1.0], PARAMETER["Latitude_Of_Origin", {self.lat}], UNIT["Meter", 1.0]]' + crs_obj = CRS.from_user_input(wkt) + self.crs = crs_obj + +def create_crs_dict(self: "SpeckleProvider", offset_x, offset_y, displayUnits: str | None) -> None: + """Create and assign CRS object from WKT string.""" + + if self.crs is not None: + self.crs_dict = { + "wkt": self.crs.to_wkt(), + "offset_x": offset_x, + "offset_y": offset_y, + "rotation": self.north_degrees, + "units_native": displayUnits, + "obj": self.crs, + } + diff --git a/pygeoapi/provider/speckle_utils/display_utils.py b/pygeoapi/provider/speckle_utils/display_utils.py new file mode 100644 index 0000000..d9a1da9 --- /dev/null +++ b/pygeoapi/provider/speckle_utils/display_utils.py @@ -0,0 +1,167 @@ + +from typing import Dict, List, Tuple + +DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150 + +def find_display_obj(obj) -> Tuple["Base", "Base"]: + """Get displayable object.""" + + from specklepy.objects.geometry import Base, Mesh + + displayVal = obj + displayValForColor = obj + + if hasattr(obj, 'displayValue'): + displayValForColor = getattr(obj, 'displayValue') + elif hasattr(obj, '@displayValue'): + displayValForColor = getattr(obj, '@displayValue') + + if isinstance(displayValForColor, List): + faces = [] + verts = [] + colors = [] + for item in displayValForColor: + if isinstance(item, Mesh): + start_vert_count = int(len(verts)/3) + + # only add colors if existing and incoming colors are valid (same length as vertices) + if len(colors) == start_vert_count and isinstance(item.colors, List) and len(item.colors)== int(len(item.vertices)/3)>0: + colors.extend(item.colors) + else: + colors = [] + + verts.extend(item.vertices) + + count = 0 + for _ in item.faces: + try: + vert_num = item.faces[count] + faces.append(vert_num) + faces.extend([ x+start_vert_count for x in item.faces[count+1 : count+1+vert_num]]) + count += vert_num+1 + except IndexError: + break + elif item is not None: + displayValForColor = item + + mesh = Mesh.create(faces= faces, vertices=verts, colors=colors) + for prop in displayValForColor[0].get_member_names(): + if prop not in ["colors", "vertices", "faces"]: + mesh[prop] = getattr(displayValForColor[0], prop) + displayValForColor = mesh + + displayVal = displayValForColor + print(displayValForColor) + + # if not searching for colored object, return GisFeatures as is + if obj.speckle_type.endswith(".GisFeature"): + displayVal = obj + + return displayVal, displayValForColor + +def get_display_units(context_list: List["TraversalContext"]) -> None | str: + """Get units from either of displayable objects.""" + + from specklepy.objects.geometry import Base + + displayUnits = None + + for item in context_list: + if hasattr(item.current, "displayValue"): + try: + displayVal = item.current["displayValue"] + except: + displayVal = item.current.displayValue + if isinstance(displayVal, list) and len(displayVal)>0: + displayUnits = displayVal[0].units + break + elif isinstance(displayVal, Base): + displayUnits = item.current.units + break + else: + if item.current.units is not None: + displayUnits = item.current.units + break + + return displayUnits + +def set_default_color(context_list: List["TraversalContext"]) -> None: + """Get and set the default color.""" + + from specklepy.objects.GIS.layers import VectorLayer + + global DEFAULT_COLOR + DEFAULT_COLOR = (255 << 24) + (150 << 16) + (150 << 8) + 150 + + for item in context_list: + # for GIS-commits, use default blue color + if isinstance(item.current, VectorLayer): + DEFAULT_COLOR = (255 << 24) + (10 << 16) + (132 << 8) + 255 + break + +def assign_color(obj_display, props) -> None: + """Get and assign color to feature displayProperties.""" + + from specklepy.objects.geometry import Mesh + + # initialize Speckle Blue color + color = DEFAULT_COLOR + + try: + if hasattr(obj_display, 'renderMaterial'): + color = obj_display['renderMaterial']['diffuse'] + elif hasattr(obj_display, '@renderMaterial'): + color = obj_display['@renderMaterial']['diffuse'] + elif hasattr(obj_display, 'displayStyle'): + color = obj_display['displayStyle']['color'] + elif hasattr(obj_display, '@displayStyle'): + color = obj_display['@displayStyle']['color'] + elif isinstance(obj_display, Mesh) and isinstance(obj_display.colors, List): + sameColors = True + color1 = obj_display.colors[0] + for c in obj_display.colors: + if c != color1: + sameColors = False + break + if sameColors is True: + color = color1 + except Exception as e: + print(e) + + r, g, b = get_r_g_b(color) + hex_color = '#%02x%02x%02x' % (r, g, b) + props['color'] = hex_color + +def get_r_g_b(rgb: int) -> Tuple[int, int, int]: + """Get R, G, B values from int.""" + + r = g = b = 0 + try: + r = (rgb & 0xFF0000) >> 16 + g = (rgb & 0xFF00) >> 8 + b = rgb & 0xFF + except Exception as e: + r = g = b = 150 + return r, g, b + +def assign_display_properties(feature: Dict, f_base: "Base", obj_display: "Base") -> None: + """Assign displayProperties to the feature.""" + + from specklepy.objects.geometry import Mesh, Brep + + feature["displayProperties"] = {} + assign_color(obj_display, feature["displayProperties"]) + + # other properties for rendering + if isinstance(f_base, Mesh) or isinstance(f_base, Brep): + feature["displayProperties"]['lineWidth'] = 0.3 + elif "Line" in feature["geometry"]["type"]: + feature["displayProperties"]['lineWidth'] = 3 + else: + feature["displayProperties"]['lineWidth'] = 1 + + # if "Point" in feature["geometry"]["type"]: + try: + feature["displayProperties"]["radius"] = feature["properties"]["weight"] + except: + feature["displayProperties"]["radius"] = 10 diff --git a/pygeoapi/provider/speckle_utils/install_utils.py b/pygeoapi/provider/speckle_utils/install_utils.py new file mode 100644 index 0000000..fa91730 --- /dev/null +++ b/pygeoapi/provider/speckle_utils/install_utils.py @@ -0,0 +1,79 @@ + +import os +import sys +from typing import Optional + +_user_data_env_var = "SPECKLE_USERDATA_PATH" +_application_name = "Speckle" + +def user_application_data_path() -> "Path": + """Get the platform specific user configuration folder path""" + from pathlib import Path + + path_override = _path() + if path_override: + return path_override + + try: + if sys.platform.startswith("win"): + app_data_path = os.getenv("APPDATA") + if not app_data_path: + raise Exception("Cannot get appdata path from environment.") + return Path(app_data_path) + else: + # try getting the standard XDG_DATA_HOME value + # as that is used as an override + app_data_path = os.getenv("XDG_DATA_HOME") + if app_data_path: + return Path(app_data_path) + else: + return ensure_folder_exists(Path.home(), ".config") + except Exception as ex: + raise Exception("Failed to initialize user application data path.", ex) + +def ensure_folder_exists(base_path: "Path", folder_name: str) -> "Path": + from pathlib import Path + + path = base_path.joinpath(folder_name) + path.mkdir(exist_ok=True, parents=True) + return path + +def _path() -> Optional["Path"]: + from pathlib import Path + + """Read the user data path override setting.""" + path_override = os.environ.get(_user_data_env_var) + if path_override: + return Path(path_override) + return None + +def connector_installation_path(host_application: str) -> "Path": + connector_installation_path = user_speckle_connector_installation_path( + host_application + ) + connector_installation_path.mkdir(exist_ok=True, parents=True) + + # set user modules path at beginning of paths for earlier hit + if sys.path[0] != connector_installation_path: + sys.path.insert(0, str(connector_installation_path)) + + # print(f"Using connector installation path {connector_installation_path}") + return connector_installation_path + +def user_speckle_connector_installation_path(host_application: str) -> "Path": + """ + Gets a connector specific installation folder. + In this folder we can put our connector installation and all python packages. + """ + return ensure_folder_exists( + ensure_folder_exists( + user_speckle_folder_path(), "connector_installations" + ), + host_application, + ) + +def user_speckle_folder_path() -> "Path": + """Get the folder where the user's Speckle data should be stored.""" + return ensure_folder_exists( + user_application_data_path(), _application_name + ) diff --git a/pygeoapi/provider/speckle_utils/GisFeature.py b/pygeoapi/provider/speckle_utils/patch/GisFeature.py similarity index 100% rename from pygeoapi/provider/speckle_utils/GisFeature.py rename to pygeoapi/provider/speckle_utils/patch/GisFeature.py diff --git a/pygeoapi/provider/speckle_utils/patch/__init__.py b/pygeoapi/provider/speckle_utils/patch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pygeoapi/provider/speckle_utils/patch_specklepy.py b/pygeoapi/provider/speckle_utils/patch/patch_specklepy.py similarity index 97% rename from pygeoapi/provider/speckle_utils/patch_specklepy.py rename to pygeoapi/provider/speckle_utils/patch/patch_specklepy.py index 7c17492..acb49e6 100644 --- a/pygeoapi/provider/speckle_utils/patch_specklepy.py +++ b/pygeoapi/provider/speckle_utils/patch/patch_specklepy.py @@ -27,12 +27,12 @@ def get_transport_path(): return str(credentials_path) def get_transport_path_src(): - credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "server.py") + credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "patch", "server.py") return str(credentials_path) def get_gis_feature_path_src(): - credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "GisFeature.py") + credentials_path = Path(get_pygeoapi_path(), "provider", "speckle_utils", "patch", "GisFeature.py") return str(credentials_path) diff --git a/pygeoapi/provider/speckle_utils/server.py b/pygeoapi/provider/speckle_utils/patch/server.py similarity index 100% rename from pygeoapi/provider/speckle_utils/server.py rename to pygeoapi/provider/speckle_utils/patch/server.py diff --git a/pygeoapi/provider/speckle_utils/props_utils.py b/pygeoapi/provider/speckle_utils/props_utils.py new file mode 100644 index 0000000..c222548 --- /dev/null +++ b/pygeoapi/provider/speckle_utils/props_utils.py @@ -0,0 +1,102 @@ + +from typing import Dict, List + + +def assign_props(obj: "Base", props: Dict): + """Assign properties to the feature from Base object.""" + + from specklepy.objects.geometry import Base + from specklepy.objects.other import RevitParameter + + all_prop_names = obj.get_member_names() + dynamic_prop_names = obj.get_dynamic_member_names() + typed_prop_names = obj.get_typed_member_names() + + # check if GIS object + if "attributes" in all_prop_names and isinstance(obj["attributes"], Base): + all_prop_names = obj["attributes"].get_dynamic_member_names() + for prop_name in all_prop_names: + + value = getattr(obj["attributes"], prop_name) + + if (prop_name + in [ + "geometry", + "Speckle_ID", + "id", + ] + ): + pass + else: + if ( + isinstance(value, Base) + or isinstance(value, List) + or isinstance(value, Dict) + ): + props[prop_name] = str(value) + else: + props[prop_name] = value + return + + # if Rhino: + elif "userStrings" in dynamic_prop_names and isinstance(obj["userStrings"], Base): + all_prop_names = obj["userStrings"].get_dynamic_member_names() + for prop_name in all_prop_names: + + if prop_name in ["id"]: + continue + + value = getattr(obj["userStrings"], prop_name) + if not isinstance(value, str): + props[prop_name] = str(value) + else: + props[prop_name] = value + return + + for prop_name in obj.get_dynamic_member_names(): + if ( + prop_name + in [ + "displayValue", + "displayStyle", + "renderMaterial", + "revitLinkedModelPath", + "id", + ] + ): + pass + else: + value = getattr(obj, prop_name) + if ( + isinstance(value, Base) + or isinstance(value, List) + or isinstance(value, Dict) + ): + props[prop_name] = str(value) + else: + props[prop_name] = value + + # if Revit: + if "parameters" in all_prop_names and isinstance(obj.parameters, Base): + for prop_name in obj.parameters.get_dynamic_member_names(): + if prop_name in ["id","revitLinkedModelPath"]: + continue + + param = getattr(obj.parameters, prop_name) + if isinstance(param, RevitParameter): + + if not isinstance(param.value, str): + props[prop_name] = str(param.value) + else: + props[prop_name] = param.value + # add after dynamic parameters + + +def assign_missing_props(features: Dict, all_props: List[str]) -> None: + """Assign NA values to missing properties.""" + + # assign all props to all features + for feat in features: + for prop in all_props: + if prop not in list(feat["properties"].keys()): + feat["properties"][prop] = "N/A"