SensorThings API provider cleanup (#1807)

* Add support for ObservedProperties OAF and custom expand of entities

* Update sensorthings.py

* Respond to feedback

* Use `pygeoapi.get_config` for SensorThings Intralinking
This commit is contained in:
Benjamin Webb
2024-09-11 15:29:40 -04:00
committed by GitHub
parent 28618034b8
commit 0677c2e646
+143 -113
View File
@@ -30,14 +30,14 @@
# =================================================================
from json.decoder import JSONDecodeError
import os
import logging
from requests import Session
from pygeoapi.config import get_config
from pygeoapi.provider.base import (
BaseProvider, ProviderQueryError, ProviderConnectionError)
from pygeoapi.util import (
yaml_load, url_join, get_provider_default, crs_transform, get_base_url)
url_join, get_provider_default, crs_transform, get_base_url)
LOGGER = logging.getLogger(__name__)
@@ -51,10 +51,10 @@ ENTITY = {
_EXPAND = {
'Things': 'Locations,Datastreams',
'Observations': 'Datastream,FeatureOfInterest',
'ObservedProperties': 'Datastreams/Thing/Locations',
'Datastreams': """
Sensor
,ObservedProperty
,Thing
,Thing/Locations
,Observations(
$select=@iot.id;
@@ -71,6 +71,7 @@ EXPAND = {k: ''.join(v.split()).replace('_', ' ')
class SensorThingsProvider(BaseProvider):
"""SensorThings API (STA) Provider"""
expand = EXPAND
def __init__(self, provider_def):
"""
@@ -82,69 +83,12 @@ class SensorThingsProvider(BaseProvider):
:returns: pygeoapi.provider.sensorthings.SensorThingsProvider
"""
LOGGER.debug('Setting SensorThings API (STA) provider')
self.linked_entity = {}
super().__init__(provider_def)
self.data.rstrip('/')
try:
self.entity = provider_def['entity']
self._url = url_join(self.data, self.entity)
except KeyError:
LOGGER.debug('Attempting to parse Entity from provider data')
if not self._get_entity(self.data):
raise RuntimeError('Entity type required')
self.entity = self._get_entity(self.data)
self._url = self.data
self.data = self._url.rstrip(f'/{self.entity}')
self._generate_mappings(provider_def)
LOGGER.debug(f'STA endpoint: {self.data}, Entity: {self.entity}')
# Default id
if self.id_field:
LOGGER.debug(f'Using id field: {self.id_field}')
else:
LOGGER.debug('Using default @iot.id for id field')
self.id_field = '@iot.id'
# Create intra-links
self.links = {}
self.intralink = provider_def.get('intralink', False)
if self.intralink and provider_def.get('rel_link'):
# For pytest
self.rel_link = provider_def['rel_link']
elif self.intralink:
# Read from pygeoapi config
with open(os.getenv('PYGEOAPI_CONFIG'), encoding='utf8') as fh:
CONFIG = yaml_load(fh)
self.rel_link = get_base_url(CONFIG)
for (name, rs) in CONFIG['resources'].items():
pvs = rs.get('providers')
if pvs is None:
LOGGER.debug(f'Skipping collection: {name}')
continue
p = get_provider_default(pvs)
e = p.get('entity') or self._get_entity(p['data'])
if any([
not pvs, # No providers in resource
not p.get('intralink'), # No configuration for intralinks
not e, # No STA entity found
self.data not in p.get('data') # No common STA endpoint
]):
continue
if p.get('uri_field'):
LOGGER.debug(f'Linking {e} with field: {p["uri_field"]}')
else:
LOGGER.debug(f'Linking {e} with collection: {name}')
self.links[e] = {
'cnm': name, # OAPI collection name,
'cid': p.get('id_field', '@iot.id'), # OAPI id_field
'uri': p.get('uri_field') # STA uri_field
}
# Start session
self.http = Session()
self.get_fields()
@@ -277,17 +221,19 @@ class SensorThingsProvider(BaseProvider):
return fc
def _make_feature(self, entity, select_properties=[], skip_geometry=False):
def _make_feature(self, feature, select_properties=[], skip_geometry=False,
entity=None):
"""
Private function: Create feature from entity
:param entity: `dict` of STA entity
:param feature: `dict` of STA entity
:param select_properties: list of property names
:param skip_geometry: bool of whether to skip geometry (default False)
:param entity: SensorThings entity name
:returns: dict of GeoJSON Feature
"""
_ = entity.pop(self.id_field)
_ = feature.pop(self.id_field)
id = f"'{_}'" if isinstance(_, str) else str(_)
f = {
'type': 'Feature', 'id': id, 'properties': {}, 'geometry': None
@@ -295,28 +241,35 @@ class SensorThingsProvider(BaseProvider):
# Make geometry
if not skip_geometry:
f['geometry'] = self._geometry(entity)
f['geometry'] = self._geometry(feature, entity)
# Fill properties block
try:
f['properties'] = self._expand_properties(
entity, select_properties)
feature, select_properties, entity)
except KeyError as err:
LOGGER.error(err)
raise ProviderQueryError(err)
return f
def _get_response(self, url, params={}):
def _get_response(self, url, params={}, entity=None, expand=None):
"""
Private function: Get STA response
:param url: request url
:param params: query parameters
:param entity: SensorThings entity name
:param expand: SensorThings expand query
:returns: STA response
"""
params.update({'$expand': EXPAND[self.entity]})
if expand:
params.update({'$expand': expand})
else:
entity_ = entity or self.entity
params.update({'$expand': self.expand[entity_]})
r = self.http.get(url, params=params)
@@ -332,13 +285,15 @@ class SensorThingsProvider(BaseProvider):
return response
def _make_filter(self, properties, bbox=[], datetime_=None):
def _make_filter(self, properties, bbox=[], datetime_=None,
entity=None):
"""
Private function: Make STA filter from query properties
:param properties: list of tuples (name, value)
:param bbox: bounding box [minx,miny,maxx,maxy]
:param datetime_: temporal (datestamp or extent)
:param entity: SensorThings entity name
:returns: STA $filter string of properties
"""
@@ -350,16 +305,8 @@ class SensorThingsProvider(BaseProvider):
ret.append(f'{name} eq {value}')
if bbox:
minx, miny, maxx, maxy = bbox
bbox_ = f'POLYGON (({minx} {miny}, {maxx} {miny}, \
{maxx} {maxy}, {minx} {maxy}, {minx} {miny}))'
if self.entity == 'Things':
loc = 'Locations/location'
elif self.entity == 'Datastreams':
loc = 'Thing/Locations/location'
elif self.entity == 'Observations':
loc = 'FeatureOfInterest/feature'
ret.append(f"st_within({loc}, geography'{bbox_}')")
entity_ = entity or self.entity
ret.append(self._make_bbox(bbox, entity_))
if datetime_ is not None:
if self.time_field is None:
@@ -378,6 +325,20 @@ class SensorThingsProvider(BaseProvider):
return ' and '.join(ret)
@staticmethod
def _make_bbox(bbox, entity):
minx, miny, maxx, maxy = bbox
bbox_ = f'POLYGON(({minx} {miny},{maxx} {miny},{maxx} {maxy},{minx} {maxy},{minx} {miny}))' # noqa
if entity == 'Things':
loc = 'Locations/location'
elif entity == 'Datastreams':
loc = 'Thing/Locations/location'
elif entity == 'Observations':
loc = 'FeatureOfInterest/feature'
elif entity == 'ObservedProperties':
loc = 'Datastreams/observedArea'
return f"st_within({loc},geography'{bbox_}')"
def _make_orderby(self, sortby):
"""
Private function: Make STA filter from query properties
@@ -398,79 +359,85 @@ class SensorThingsProvider(BaseProvider):
return ','.join(ret)
def _geometry(self, entity):
def _geometry(self, feature, entity=None):
"""
Private function: Retrieve STA geometry
:param entity: SensorThings entity
:param feature: SensorThings entity
:param entity: SensorThings entity name
:returns: GeoJSON Geometry for feature
"""
entity_ = entity or self.entity
try:
if self.entity == 'Things':
return entity['Locations'][0]['location']
if entity_ == 'Things':
return feature['Locations'][0]['location']
elif self.entity == 'Observations':
return entity['FeatureOfInterest'].pop('feature')
elif entity_ == 'Observations':
return feature['FeatureOfInterest'].pop('feature')
elif self.entity == 'Datastreams':
elif entity_ == 'Datastreams':
try:
return entity['Observations'][0]['FeatureOfInterest'].pop('feature') # noqa
return feature['Observations'][0]['FeatureOfInterest'].pop('feature') # noqa
except (KeyError, IndexError):
return entity['Thing'].pop('Locations')[0]['location']
return feature['Thing'].pop('Locations')[0]['location']
elif entity_ == 'ObservedProperties':
return feature['Datastreams'][0]['Thing']['Locations'][0]['location'] # noqa
except (KeyError, IndexError):
LOGGER.warning('No geometry found')
return None
def _expand_properties(self, entity, keys=(), uri=''):
def _expand_properties(self, feature, keys=(), uri='',
entity=None):
"""
Private function: Parse STA entity into feature
:param entity: SensorThings entity
:param feature: `dict` of SensorThings entity
:param keys: keys used in properties block
:param uri: uri of STA entity
:param entity: SensorThings entity name
:returns: dict of SensorThings feature properties
"""
LOGGER.debug('Adding extra properties')
# Properties filter & display
keys = (() if not self.properties and not keys else
set(self.properties) | set(keys))
if self.entity == 'Things':
self._expand_location(entity)
elif 'Thing' in entity.keys():
self._expand_location(entity['Thing'])
entity = entity or self.entity
if entity == 'Things':
self._expand_location(feature)
elif 'Thing' in feature.keys():
self._expand_location(feature['Thing'])
# Retain URI if present
if entity.get('properties') and self.uri_field:
uri = entity['properties']
if feature.get('properties') and self.uri_field:
uri = feature['properties']
# Create intra links
LOGGER.debug('Creating intralinks')
for k, v in entity.items():
if k in self.links:
entity[k] = [self._get_uri(_v, **self.links[k]) for _v in v]
for k, v in feature.items():
if k in self.linked_entity:
feature[k] = [self._get_uri(_v, **self.linked_entity[k])
for _v in v]
LOGGER.debug(f'Created link for {k}')
elif f'{k}s' in self.links:
entity[k] = self._get_uri(v, **self.links[f'{k}s'])
elif f'{k}s' in self.linked_entity:
feature[k] = \
self._get_uri(v, **self.linked_entity[f'{k}s'])
LOGGER.debug(f'Created link for {k}')
# Make properties block
LOGGER.debug('Making properties block')
if entity.get('properties'):
entity.update(entity.pop('properties'))
if feature.get('properties'):
feature.update(feature.pop('properties'))
if keys:
ret = {k: entity.pop(k) for k in keys}
entity = ret
ret = {k: feature.pop(k) for k in keys}
feature = ret
if self.uri_field is not None and uri != '':
entity[self.uri_field] = uri
feature[self.uri_field] = uri
return entity
return feature
@staticmethod
def _expand_location(entity):
@@ -522,5 +489,68 @@ class SensorThingsProvider(BaseProvider):
else:
return ''
def _generate_mappings(self, provider_def: dict):
"""
Generate mappings for the STA entity and set up intra-links.
This function sets up the necessary mappings and configurations for
the STA entity based on the provided provider definition.
:param provider_def: `dict` of provider definition containing
configuration details for the STA entity.
"""
self.data.rstrip('/')
try:
self.entity = provider_def['entity']
self._url = url_join(self.data, self.entity)
except KeyError:
LOGGER.debug('Attempting to parse Entity from provider data')
if not self._get_entity(self.data):
raise RuntimeError('Entity type required')
self.entity = self._get_entity(self.data)
self._url = self.data
self.data = self._url.rstrip(f'/{self.entity}')
# Default id
if self.id_field:
LOGGER.debug(f'Using id field: {self.id_field}')
else:
LOGGER.debug('Using default @iot.id for id field')
self.id_field = '@iot.id'
# Create intra-links
self.intralink = provider_def.get('intralink', False)
if self.intralink and provider_def.get('rel_link'):
# For pytest
self.rel_link = provider_def['rel_link']
elif self.intralink:
# Read from pygeoapi config
CONFIG = get_config()
self.rel_link = get_base_url(CONFIG)
for name, rs in CONFIG['resources'].items():
pvs = rs.get('providers')
p = get_provider_default(pvs)
e = p.get('entity') or self._get_entity(p['data'])
if any([
not pvs, # No providers in resource
not p.get('intralink'), # No configuration for intralinks
not e, # No STA entity found
self.data not in p.get('data') # No common STA endpoint
]):
continue
if p.get('uri_field'):
LOGGER.debug(f'Linking {e} with field: {p["uri_field"]}')
else:
LOGGER.debug(f'Linking {e} with collection: {name}')
self.linked_entity[e] = {
'cnm': name, # OAPI collection name,
'cid': p.get('id_field', '@iot.id'), # OAPI id_field
'uri': p.get('uri_field') # STA uri_field
}
def __repr__(self):
return f'<SensorThingsProvider> {self.data}, {self.entity}'