add support for OpenSearch provider (#1844)

This commit is contained in:
Tom Kralidis
2024-11-20 18:49:04 -05:00
committed by GitHub
parent 065ef3a495
commit e1fec87d6f
9 changed files with 1216 additions and 4 deletions
+8
View File
@@ -62,6 +62,12 @@ jobs:
host node port: 9300
node port: 9300
discovery type: 'single-node'
- name: Install and run OpenSearch 📦
uses: esmarkowski/opensearch-github-action@v1.0.0
with:
version: 2.12.0
security-disabled: true
port: 9209
- name: Install and run MongoDB
uses: supercharge/mongodb-github-action@1.5.0
with:
@@ -99,6 +105,7 @@ jobs:
- name: setup test data ⚙️
run: |
python3 tests/load_es_data.py tests/data/ne_110m_populated_places_simple.geojson geonameid
python3 tests/load_opensearch_data.py tests/data/ne_110m_populated_places_simple.geojson geonameid
python3 tests/load_mongo_data.py tests/data/ne_110m_populated_places_simple.geojson
gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql
@@ -117,6 +124,7 @@ jobs:
pytest tests/test_csv__provider.py
pytest tests/test_django.py
pytest tests/test_elasticsearch__provider.py
pytest tests/test_opensearch__provider.py
pytest tests/test_esri_provider.py
pytest tests/test_filesystem_provider.py
pytest tests/test_geojson_provider.py
@@ -26,6 +26,7 @@ parameters.
`GeoJSON`_,✅/✅,results/hits,❌,❌,❌,✅,❌,❌,✅
`MongoDB`_,✅/❌,results,✅,✅,✅,✅,❌,❌,✅
`OGR`_,✅/❌,results/hits,✅,❌,❌,✅,❌,❌,✅
`OpenSearch`_,✅/✅,results/hits,✅,✅,✅,✅,✅,✅,✅
`Oracle`_,✅/✅,results/hits,✅,❌,✅,✅,❌,❌,✅
`Parquet`_,✅/✅,results/hits,✅,✅,❌,✅,❌,❌,✅
`PostgreSQL`_,✅/✅,results/hits,✅,✅,✅,✅,✅,❌,✅
@@ -322,6 +323,44 @@ The OGR provider requires a recent (3+) version of GDAL to be installed.
The `crs` query parameter is used as follows:
e.g. ``http://localhost:5000/collections/foo/items?crs=http%3A%2F%2Fwww.opengis.net%2Fdef%2Fcrs%2FEPSG%2F0%2F28992``.
.. _OpenSearch:
OpenSearch
^^^^^^^^^^
.. note::
Requires Python package opensearch-py
To publish an OpenSearch index, the following are required in your index:
* documents in the index must be valid GeoJSON Features
* index mappings must define the GeoJSON ``geometry`` as a ``geo_shape``
.. code-block:: yaml
providers:
- type: feature
name: OpenSearch
editable: true|false # optional, default is false
data: http://localhost:9200/ne_110m_populated_places_simple
id_field: geonameid
time_field: datetimefield
.. note::
For OpenSearch indexes that are password protected, an RFC 1738 URL can be used as follows:
``data: http://username:password@localhost:9200/ne_110m_populated_places_simple``
To further conceal authentication credentials, environment variables can be used:
``data: http://${MY_USERNAME}:${MY_PASSWORD}@localhost:9200/ne_110m_populated_places_simple``
The OpenSearch provider also supports CQL queries, as indicated in the table above.
.. seealso::
:ref:`cql` for more details on how to use Common Query Language (CQL) to filter the collection with specific queries.
.. _Oracle:
Oracle
+1 -1
View File
@@ -889,7 +889,7 @@ def post_collection_items(
HTTPStatus.BAD_REQUEST, headers, request.format,
'InvalidParameterValue', msg)
else:
LOGGER.debug('processing Elasticsearch CQL_JSON data')
LOGGER.debug('processing CQL_JSON data')
try:
filter_ = CQLModel.parse_raw(data)
except Exception:
+3 -2
View File
@@ -51,10 +51,11 @@ PLUGINS = {
'MapScript': 'pygeoapi.provider.mapscript_.MapScriptProvider',
'MongoDB': 'pygeoapi.provider.mongo.MongoProvider',
'MVT-tippecanoe': 'pygeoapi.provider.mvt_tippecanoe.MVTTippecanoeProvider', # noqa: E501
'MVT-elastic': 'pygeoapi.provider.mvt_elastic.MVTElasticProvider', # noqa: E501
'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider', # noqa: E501
'MVT-elastic': 'pygeoapi.provider.mvt_elastic.MVTElasticProvider',
'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider',
'OracleDB': 'pygeoapi.provider.oracle.OracleProvider',
'OGR': 'pygeoapi.provider.ogr.OGRProvider',
'OpenSearch': 'pygeoapi.provider.opensearch_.OpenSearchProvider',
'Parquet': 'pygeoapi.provider.parquet.ParquetProvider',
'PostgreSQL': 'pygeoapi.provider.postgresql.PostgreSQLProvider',
'rasterio': 'pygeoapi.provider.rasterio_.RasterioProvider',
+742
View File
@@ -0,0 +1,742 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# Francesco Bartoli <xbartolone@gmail.com>
#
# Copyright (c) 2024 Tom Kralidis
# Copyright (c) 2024 Francesco Bartoli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
from typing import Dict
from collections import OrderedDict
import json
import logging
import uuid
from opensearchpy import OpenSearch, helpers
from opensearch_dsl import Search, Q
from pygeoapi.provider.base import (BaseProvider, ProviderConnectionError,
ProviderQueryError,
ProviderItemNotFoundError)
from pygeoapi.models.cql import CQLModel, get_next_node
from pygeoapi.util import get_envelope, crs_transform
LOGGER = logging.getLogger(__name__)
class OpenSearchProvider(BaseProvider):
"""OpenSearch Provider"""
def __init__(self, provider_def):
    """
    Initialize object

    Splits the configured ``data`` URL on its last ``/`` into the
    OpenSearch host and the index name, connects to the host and loads
    the index field information.

    :param provider_def: provider definition

    :raises ProviderConnectionError: if the OpenSearch host does not
                                     answer a ping
    :raises ProviderQueryError: if the field information cannot be
                                retrieved

    :returns: pygeoapi.provider.opensearch_.OpenSearchProvider
    """

    super().__init__(provider_def)

    self.select_properties = []

    self.os_host, self.index_name = self.data.rsplit('/', 1)

    LOGGER.debug('Setting OpenSearch properties')

    LOGGER.debug(f'host: {self.os_host}')
    LOGGER.debug(f'index: {self.index_name}')

    LOGGER.debug('Connecting to OpenSearch')
    self.os_ = OpenSearch(self.os_host, verify_certs=False)
    if not self.os_.ping():
        msg = f'Cannot connect to OpenSearch: {self.os_host}'
        LOGGER.error(msg)
        raise ProviderConnectionError(msg)

    LOGGER.debug('Determining OpenSearch version')
    # log the complete version string; slicing it to three characters
    # would report e.g. 2.12.0 as "2.1"
    v = self.os_.info()['version']['number']
    LOGGER.debug(f'OpenSearch version: {v}')

    LOGGER.debug('Grabbing field information')
    try:
        self.get_fields()
    except Exception as err:
        LOGGER.error(err)
        # keep the original traceback attached to the provider error
        raise ProviderQueryError(err) from err
def get_fields(self):
    """
    Get provider field information (names, types)

    The fields are derived from the index mapping the first time they
    are needed and served from ``self._fields`` afterwards.

    :returns: dict of fields
    """

    if self._fields:
        return self._fields

    index_info = self.os_.indices.get(index=self.index_name,
                                      allow_no_indices=False)
    LOGGER.debug(f'Response: {index_info}')

    try:
        if '*' in self.index_name:
            LOGGER.debug('Wildcard index; setting from first match')
            key = list(index_info.keys())[0]
        else:
            key = self.index_name
        props = index_info[key]['mappings']['properties']['properties']
    except KeyError:
        # the response is not keyed by the configured name
        LOGGER.warning('Trying for alias')
        alias_name = next(iter(index_info))
        props = index_info[alias_name]['mappings']['properties']['properties']  # noqa
    except IndexError:
        LOGGER.warning('could not get fields; returning empty set')
        return {}

    for name, definition in props['properties'].items():
        if 'type' not in definition:
            continue

        os_type = definition['type']
        if os_type == 'text':
            schema = {'type': 'string'}
        elif os_type == 'date':
            schema = {'type': 'string', 'format': 'date'}
        elif os_type in ('float', 'long'):
            schema = {'type': 'number', 'format': os_type}
        else:
            schema = {'type': os_type}

        self._fields[name] = schema

    return self._fields
@crs_transform
def query(self, offset=0, limit=10, resulttype='results',
          bbox=[], datetime_=None, properties=[], sortby=[],
          select_properties=[], skip_geometry=False, q=None,
          filterq=None, **kwargs):
    """
    query OpenSearch index

    :param offset: starting record to return (default 0)
    :param limit: number of records to return (default 10)
    :param resulttype: return results or hit limit (default results)
    :param bbox: bounding box [minx,miny,maxx,maxy]
    :param datetime_: temporal (datestamp or extent)
    :param properties: list of tuples (name, value)
    :param sortby: list of dicts (property, order)
    :param select_properties: list of property names
    :param skip_geometry: bool of whether to skip geometry (default False)
    :param q: full-text search term(s)
    :param filterq: filter object

    :raises ProviderQueryError: if ``datetime_`` is given but no
                                ``time_field`` is configured, or if
                                building/running the query raises a
                                ``ValueError``

    :returns: dict of 0..n GeoJSON features
    """

    self.select_properties = select_properties

    query = {'track_total_hits': True, 'query': {'bool': {'filter': []}}}
    filters = query['query']['bool']['filter']

    feature_collection = {
        'type': 'FeatureCollection',
        'features': []
    }

    if resulttype == 'hits':
        LOGGER.debug('hits only specified')
        limit = 0

    if bbox:
        LOGGER.debug('processing bbox parameter')
        minx, miny, maxx, maxy = bbox
        filters.append({
            'geo_shape': {
                'geometry': {
                    'shape': {
                        'type': 'envelope',
                        # envelope is [upper-left, lower-right]
                        'coordinates': [[minx, maxy], [maxx, miny]]
                    },
                    'relation': 'intersects'
                }
            }
        })

    if datetime_ is not None:
        LOGGER.debug('processing datetime parameter')
        if self.time_field is None:
            LOGGER.error('time_field not enabled for collection')
            raise ProviderQueryError()

        time_field = self.mask_prop(self.time_field)

        if '/' in datetime_:  # envelope
            LOGGER.debug('detected time range')
            time_begin, time_end = datetime_.split('/')

            # '..' marks an open end; test both ends independently so
            # that '../..' does not leave a literal '..' bound behind
            bounds = {}
            if time_begin != '..':
                bounds['gte'] = time_begin
            if time_end != '..':
                bounds['lte'] = time_end

            time_filter = {'range': {time_field: bounds}}
        else:  # time instant
            LOGGER.debug('detected time instant')
            time_filter = {'match': {time_field: datetime_}}

        LOGGER.debug(time_filter)
        filters.append(time_filter)

    if properties:
        LOGGER.debug('processing properties')
        for prop in properties:
            prop_name = self.mask_prop(prop[0])
            pf = {
                'match': {
                    prop_name: {
                        'query': prop[1]
                    }
                }
            }
            # values without a '|' must match in full
            if '|' not in prop[1]:
                pf['match'][prop_name]['minimum_should_match'] = '100%'

            filters.append(pf)

    if sortby:
        LOGGER.debug('processing sortby')
        query['sort'] = []
        for sort in sortby:
            LOGGER.debug(f'processing sort object: {sort}')

            sp = sort['property']

            if (self.fields[sp]['type'] == 'string'
                    and self.fields[sp].get('format') != 'date'):
                LOGGER.debug('setting OpenSearch .raw on property')
                sort_property = f'{self.mask_prop(sp)}.raw'
            else:
                sort_property = self.mask_prop(sp)

            sort_order = 'desc' if sort['order'] == '-' else 'asc'

            query['sort'].append({sort_property: {'order': sort_order}})

    if q is not None:
        LOGGER.debug('Adding free-text search')
        query['query']['bool']['must'] = {'query_string': {'query': q}}

    query['_source'] = {
        'excludes': [
            'properties._metadata-payload',
            'properties._metadata-schema',
            'properties._metadata-format'
        ]
    }

    if self.properties or self.select_properties:
        LOGGER.debug('filtering properties')

        all_properties = self.get_properties()

        query['_source'] = {
            'includes': list(map(self.mask_prop, all_properties))
        }
        query['_source']['includes'].extend(['id', 'type', 'geometry'])

    if skip_geometry:
        LOGGER.debug('excluding geometry')
        # add to (rather than replace) any excludes already configured
        query['_source'].setdefault('excludes', []).append('geometry')

    try:
        LOGGER.debug('querying OpenSearch')
        if filterq:
            LOGGER.debug(f'adding cql object: {filterq.json()}')
            query = update_query(input_query=query, cql=filterq)
        LOGGER.debug(json.dumps(query, indent=4))

        LOGGER.debug('Testing for OpenSearch scrolling')
        if offset + limit > 10000:
            # page past the 10000 record window with the scan helper
            gen = helpers.scan(client=self.os_, query=query,
                               preserve_order=True,
                               index=self.index_name)
            results = {'hits': {'total': limit, 'hits': []}}
            for i in range(offset + limit):
                try:
                    hit = next(gen)
                except StopIteration:
                    break
                # records before the offset are consumed and dropped
                if i >= offset:
                    results['hits']['hits'].append(hit)

            returned = len(results['hits']['hits'])
            matched = returned + offset
        else:
            results = self.os_.search(index=self.index_name,
                                      from_=offset, size=limit,
                                      body=query)
            matched = results['hits']['total']['value']
            returned = len(results['hits']['hits'])
    except ValueError as err:
        # swallowing this would only defer the failure to an unbound
        # 'matched' below; report it as a query error instead
        LOGGER.error(err)
        raise ProviderQueryError(err) from err

    feature_collection['numberMatched'] = matched

    if resulttype == 'hits':
        return feature_collection

    feature_collection['numberReturned'] = returned

    LOGGER.debug('serializing features')
    for feature in results['hits']['hits']:
        feature_collection['features'].append(self.osdoc2geojson(feature))

    return feature_collection
@crs_transform
def get(self, identifier, **kwargs):
    """
    Get OpenSearch document by id

    The document is first fetched by id; if that fails for any reason a
    ``match_phrase`` search on ``_id`` is tried before giving up.

    :param identifier: feature id

    :raises ProviderItemNotFoundError: if neither lookup yields a
                                       document

    :returns: dict of single GeoJSON feature
    """

    try:
        LOGGER.debug(f'Fetching identifier {identifier}')
        result = self.os_.get(index=self.index_name, id=identifier)
        LOGGER.debug('Serializing feature')
        feature_ = self.osdoc2geojson(result)
    except Exception as err:
        LOGGER.debug(f'Not found via OpenSearch id query: {err}')
        LOGGER.debug('Trying via a real query')

        query = {
            'query': {
                'bool': {
                    'filter': [{
                        'match_phrase': {
                            '_id': identifier
                        }
                    }]
                }
            }
        }

        LOGGER.debug(f'Query: {query}')
        try:
            # the client takes the query DSL through ``body``
            result = self.os_.search(index=self.index_name, body=query)
            if len(result['hits']['hits']) == 0:
                LOGGER.error(err)
                raise ProviderItemNotFoundError(err)
            LOGGER.debug('Serializing feature')
            feature_ = self.osdoc2geojson(result['hits']['hits'][0])
        except ProviderItemNotFoundError:
            raise
        except Exception as err2:
            LOGGER.error(err2)
            raise ProviderItemNotFoundError(err2) from err2

    return feature_
def create(self, item):
    """
    Create a new item

    :param item: `dict` of new item

    :returns: identifier of created item
    """

    identifier, document = self._load_and_prepare_item(
        item, accept_missing_identifier=True)

    # items arriving without an identifier get a random UUID
    if identifier is None:
        identifier = str(uuid.uuid4())
        document["id"] = identifier

    LOGGER.debug(f'Inserting data with identifier {identifier}')
    self.os_.index(index=self.index_name, id=identifier, body=document)
    LOGGER.debug('Item added')

    return identifier
def update(self, identifier, item):
    """
    Updates an existing item

    :param identifier: feature id
    :param item: `dict` of partial or full item

    :returns: `bool` of update result
    """

    LOGGER.debug(f'Updating item {identifier}')
    identifier, json_data = self._load_and_prepare_item(
        item, identifier, raise_if_exists=False)

    # index through the client created in __init__ (``self.os_``)
    _ = self.os_.index(index=self.index_name, id=identifier,
                       body=json_data)

    return True
def delete(self, identifier):
    """
    Deletes an existing item

    :param identifier: item id

    :returns: `bool` of deletion result
    """

    LOGGER.debug(f'Deleting item {identifier}')
    # delete through the client created in __init__ (``self.os_``)
    _ = self.os_.delete(index=self.index_name, id=identifier)

    return True
def osdoc2geojson(self, doc):
    """
    generate GeoJSON `dict` from OpenSearch document

    The identifier is taken from the configured ``id_field`` property,
    falling back to the document ``id`` and then to ``_id``. When
    properties are configured or selected only those are returned.

    :param doc: `dict` of OpenSearch document

    :raises ProviderQueryError: if a requested property is missing from
                                the document

    :returns: GeoJSON `dict`
    """

    LOGGER.debug('Fetching id and geometry from GeoJSON document')
    source = doc['_source']

    try:
        id_ = source['properties'][self.id_field]
    except KeyError as err:
        LOGGER.debug(f'Missing field: {err}')
        id_ = source.get('id', doc['_id'])

    source['id'] = id_
    source['geometry'] = source.get('geometry')

    if not (self.properties or self.select_properties):
        return source

    LOGGER.debug('Filtering properties')
    wanted = self.get_properties()

    thinned = {
        'id': id_,
        'type': source['type'],
        'geometry': source.get('geometry'),
        'properties': OrderedDict()
    }

    for name in wanted:
        try:
            thinned['properties'][name] = source['properties'][name]
        except KeyError as err:
            LOGGER.error(err)
            raise ProviderQueryError()

    return thinned
def mask_prop(self, property_name):
    """
    generate property name based on OpenSearch backend setup

    Prefixes the name with ``properties.``.

    :param property_name: property name

    :returns: masked property name
    """

    return 'properties.{}'.format(property_name)
def get_properties(self):
    """
    Determine the property names to return for a feature

    Precedence: the properties selected for the current query, then the
    properties configured on the provider, then every field of the
    index.

    :returns: list of property names
    """

    LOGGER.debug(f'configured properties: {self.properties}')
    LOGGER.debug(f'selected properties: {self.select_properties}')

    if self.properties and self.select_properties:
        # NOTE(review): the selection replaces the configured list here
        # rather than being intersected with it - confirm this is wanted
        all_properties = self.select_properties
    elif self.properties or self.select_properties:
        all_properties = self.properties or self.select_properties
    else:
        # neither is set: fall back to the index fields (a separate
        # ``if`` followed by ``if``/``else`` would overwrite this)
        all_properties = list(self.get_fields())

    LOGGER.debug(f'resulting properties: {all_properties}')
    return all_properties
def __repr__(self):
    """Return the provider name followed by its configured data URL"""

    return '<OpenSearchProvider> {}'.format(self.data)
class OpenSearchCatalogueProvider(OpenSearchProvider):
    """OpenSearch Catalogue Provider"""

    def __init__(self, provider_def):
        """
        Initialize object

        :param provider_def: provider definition

        :returns: pygeoapi.provider.opensearch_.OpenSearchCatalogueProvider
        """

        super().__init__(provider_def)

    def _excludes(self):
        """
        List the field names removed from the advertised fields

        :returns: list of field names
        """

        return [
            'properties._metadata-anytext'
        ]

    def get_fields(self):
        """
        Get provider field information (names, types)

        Removes the excluded fields from the parent result and adds the
        ``q`` full-text field.

        :returns: dict of fields
        """

        fields = super().get_fields()
        for i in self._excludes():
            if i in fields:
                del fields[i]

        fields['q'] = {'type': 'string'}

        return fields

    def query(self, offset=0, limit=10, resulttype='results',
              bbox=[], datetime_=None, properties=[], sortby=[],
              select_properties=[], skip_geometry=False, q=None,
              filterq=None, **kwargs):
        """
        query OpenSearch index

        Delegates to the parent query. ``filterq`` and any extra keyword
        arguments are forwarded so they are not silently ignored.

        :param offset: starting record to return (default 0)
        :param limit: number of records to return (default 10)
        :param resulttype: return results or hit limit (default results)
        :param bbox: bounding box [minx,miny,maxx,maxy]
        :param datetime_: temporal (datestamp or extent)
        :param properties: list of tuples (name, value)
        :param sortby: list of dicts (property, order)
        :param select_properties: list of property names
        :param skip_geometry: bool of whether to skip geometry
        :param q: full-text search term(s)
        :param filterq: filter object

        :returns: dict of 0..n GeoJSON features
        """

        records = super().query(
            offset=offset, limit=limit,
            resulttype=resulttype, bbox=bbox,
            datetime_=datetime_, properties=properties,
            sortby=sortby,
            select_properties=select_properties,
            skip_geometry=skip_geometry,
            q=q, filterq=filterq, **kwargs)

        return records

    def __repr__(self):
        return f'<OpenSearchCatalogueProvider> {self.data}'
class OpenSearchQueryBuilder:
    """
    Fluent builder for an OpenSearch ``bool`` query

    Clauses and filters are collected with the ``must``, ``should``,
    ``must_not`` and ``filter`` setters; ``operation`` selects how
    ``build`` combines them.
    """

    def __init__(self):
        self._operation = None
        self.must_value = {}
        self.should_value = {}
        self.mustnot_value = {}
        self.filter_value = {}

    def must(self, must_value):
        """Set the ``must`` clause(s); returns the builder"""
        self.must_value = must_value
        return self

    def should(self, should_value):
        """Set the ``should`` clause(s); returns the builder"""
        self.should_value = should_value
        return self

    def must_not(self, mustnot_value):
        """Set the ``must_not`` clause(s); returns the builder"""
        self.mustnot_value = mustnot_value
        return self

    def filter(self, filter_value):
        """Set the filter; returns the builder"""
        self.filter_value = filter_value
        return self

    @property
    def operation(self):
        """Logical operation (``and``, ``or``, ``not`` or ``None``)"""
        return self._operation

    @operation.setter
    def operation(self, value):
        self._operation = value

    def build(self):
        """
        Build the ``bool`` query from the collected clauses

        The first non-empty of the must / should / must_not values is
        used as the clause set. For ``and`` / ``or`` / ``not`` it is
        iterated into ``must`` / ``should`` / ``must_not``; otherwise it
        is wrapped as a single ``must`` clause.

        :raises ValueError: if neither a clause nor a filter was set

        :returns: `opensearch_dsl` query object
        """

        # read the attributes directly: binding locals only for truthy
        # values leaves them unbound whenever ``must`` was never set
        clauses = (self.must_value or self.should_value
                   or self.mustnot_value)
        filters = self.filter_value or {}

        if not clauses and not filters:
            raise ValueError('no query clauses or filters to build')

        if self.operation == 'and':
            res = Q('bool', must=list(clauses), filter=list(filters))
        elif self.operation == 'or':
            res = Q('bool', should=list(clauses), filter=list(filters))
        elif self.operation == 'not':
            res = Q('bool', must_not=list(clauses), filter=list(filters))
        else:
            must = [clauses] if clauses else []
            if filters:
                res = Q('bool', must=must, filter=[filters])
            else:
                res = Q('bool', must=must)

        return res
def _between_to_range(between):
    """Build a ``range`` query from a CQL ``between`` predicate"""

    prop = between.value.__root__.__root__.property

    # a bound that was not supplied has no ``__root__``; leave it None
    lower = None
    if not getattr(between.lower, '__root__', 0) == 0:
        lower = between.lower.__root__.__root__
    upper = None
    if not getattr(between.upper, '__root__', 0) == 0:
        upper = between.upper.__root__.__root__

    return Q({'range': {f'{prop}': {'gte': lower, 'lte': upper}}})


def _eq_to_match(eq):
    """Build a ``match`` query from a CQL ``eq`` predicate"""

    scalars = tuple(eq.__root__)
    prop = scalars[0].__root__.property
    value = scalars[1].__root__.__root__

    return Q({'match': {f'{prop}': f'{value}'}})


def _build_query(q, cql):
    """
    Translate a CQL model into an OpenSearch query

    Handles a list of ``between`` / ``eq`` predicates under a logical
    operator, a single ``between``, a single ``eq`` and an
    ``intersects`` of the ``geometry`` property with a Polygon.

    :param q: `OpenSearchQueryBuilder` collecting the clauses
    :param cql: `CQLModel` to translate

    :raises ProviderQueryError: for an ``intersects`` on anything other
                                than ``geometry`` with a Polygon

    :returns: query object produced by ``q.build()``
    """

    # this would be handled by the AST with the traverse of CQL model
    op, node = get_next_node(cql.__root__)
    q.operation = op

    if isinstance(node, list):
        query_list = []
        for elem in node:
            _, next_node = get_next_node(elem)
            if not getattr(next_node, 'between', 0) == 0:
                # same optional-bound handling as a lone ``between``
                query_list.append(_between_to_range(next_node.between))
            if not getattr(next_node, '__root__', 0) == 0:
                query_list.append(_eq_to_match(next_node.__root__.eq))
        q.must(query_list)
    elif not getattr(node, 'between', 0) == 0:
        q.must(_between_to_range(node.between))
    elif not getattr(node, '__root__', 0) == 0:
        _, next_node = get_next_node(node)
        if not getattr(next_node, 'eq', 0) == 0:
            q.must(_eq_to_match(next_node.eq))
    elif not getattr(node, 'intersects', 0) == 0:
        operands = node.intersects.__root__
        prop = operands[0].__root__.property
        if prop != 'geometry':
            msg = 'CQL intersects is only supported on geometry'
            LOGGER.error(msg)
            raise ProviderQueryError(msg)

        geometry = operands[1].__root__.__root__.__root__
        if geometry.type != 'Polygon':
            msg = 'CQL intersects only supports Polygon geometries'
            LOGGER.error(msg)
            raise ProviderQueryError(msg)

        # only the exterior ring is considered
        coords_list = [
            poly_coords.__root__ for poly_coords in geometry.coordinates[0]
        ]
        filter_ = Q(
            {
                'geo_shape': {
                    'geometry': {
                        'shape': {
                            'type': 'envelope',
                            'coordinates': get_envelope(coords_list)
                        },
                        'relation': 'intersects'
                    }
                }
            }
        )
        q.must(Q({'match_all': {}}))
        q.filter(filter_)

    return q.build()
def update_query(input_query: Dict, cql: CQLModel):
    """
    Combine an existing query with a CQL filter

    :param input_query: `dict` of the query built so far
    :param cql: `CQLModel` filter to add

    :returns: `dict` of the combined query
    """

    search = Search.from_dict(input_query)
    builder = OpenSearchQueryBuilder()
    cql_query = _build_query(builder, cql)
    search = search.query(cql_query)

    enhanced = search.to_dict()
    LOGGER.debug(f'Enhanced query: {json.dumps(enhanced)}')
    return search.to_dict()
+2
View File
@@ -9,6 +9,8 @@ geoalchemy2
geopandas
netCDF4
numpy==2.0.1
opensearch-dsl
opensearch-py
oracledb
pandas
psycopg2
+1 -1
View File
@@ -95,5 +95,5 @@ es.options(request_timeout=90).indices.create(
with open(sys.argv[1], encoding='utf-8') as fh:
d = json.load(fh)
# call generator function to yield features into ES build API
# call generator function to yield features into ES bulk API
helpers.bulk(es, gendata(d))
+102
View File
@@ -0,0 +1,102 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2024 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import json
from pathlib import Path
import sys
from opensearchpy import OpenSearch, helpers
# client for the OpenSearch instance used by the tests
os_ = OpenSearch(['localhost:9209'])

# both the data file and the id field are required
if not len(sys.argv) >= 3:
    print(f'Usage: {sys.argv[0]} <path/to/data.geojson> <id-field>')
    sys.exit(1)

# the index is named after the data file (lower-cased, no extension)
index_name = Path(sys.argv[1]).stem.lower()
id_field = sys.argv[2]

# start from a clean index on every run
if os_.indices.exists(index=index_name):
    os_.indices.delete(index=index_name)

# index settings
body = {
    'settings': {
        'index': {'number_of_shards': 1, 'number_of_replicas': 0}
    },
    'mappings': {
        'properties': {
            'geometry': {'type': 'geo_shape'},
            'properties': {
                'properties': {
                    # text field with a keyword sub-field named ``raw``
                    'nameascii': {
                        'type': 'text',
                        'fields': {'raw': {'type': 'keyword'}}
                    }
                }
            }
        }
    }
}
def gendata(data):
    """
    Generator function to yield features

    Each feature's id property is cast to ``int`` when possible and used
    as the document ``_id``.

    :param data: `dict` of GeoJSON FeatureCollection

    :returns: generator of bulk actions
    """

    for feature in data['features']:
        props = feature['properties']
        try:
            props[id_field] = int(props[id_field])
        except ValueError:
            # not an integer: keep the value as it is
            pass

        yield {
            "_index": index_name,
            "_id": props[id_field],
            "_source": feature
        }
# create index
os_.indices.create(index=index_name, body=body)

# read the whole GeoJSON document
with open(sys.argv[1], encoding='utf-8') as fh:
    d = json.loads(fh.read())

# call generator function to yield features into OpenSearch bulk API
helpers.bulk(os_, gendata(d))
+318
View File
@@ -0,0 +1,318 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2024 Tom Kralidis
# Copyright (c) 2024 Francesco Bartoli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import pytest
from pygeoapi.provider.base import ProviderItemNotFoundError
from pygeoapi.provider.opensearch_ import OpenSearchProvider
from pygeoapi.models.cql import CQLModel
@pytest.fixture()
def config():
    """Provider definition for the populated places index"""

    return dict(
        name='OpenSearch',
        type='feature',
        data='http://localhost:9209/ne_110m_populated_places_simple',
        id_field='geonameid'
    )
@pytest.fixture()
def config_ordered_properties():
    """Provider definition restricted to an ordered list of properties"""

    return dict(
        name='OpenSearch',
        type='feature',
        data='http://localhost:9209/ne_110m_populated_places_simple',
        id_field='geonameid',
        properties=['adm0name', 'adm1name']
    )
@pytest.fixture()
def config_cql():
    """Provider definition for the hazard threat indicators index"""

    return dict(
        name='OpenSearch',
        type='feature',
        data='http://localhost:9209/nhsl_hazard_threat_all_indicators_s_bc',  # noqa
        id_field='Sauid'
    )
@pytest.fixture()
def between():
    """CQL ``between`` filter on pop_max with both bounds"""

    predicate = {
        "value": {"property": "properties.pop_max"},
        "lower": 10000,
        "upper": 100000
    }
    return CQLModel.parse_obj({"between": predicate})
@pytest.fixture()
def between_upper():
    """CQL ``between`` filter on pop_max with only an upper bound"""

    predicate = {
        "value": {"property": "properties.pop_max"},
        "upper": 100000
    }
    return CQLModel.parse_obj({"between": predicate})
@pytest.fixture()
def between_lower():
    """CQL ``between`` filter on pop_max with only a lower bound"""

    predicate = {
        "value": {"property": "properties.pop_max"},
        "lower": 10000
    }
    return CQLModel.parse_obj({"between": predicate})
@pytest.fixture()
def eq():
    """CQL ``eq`` filter on featurecla"""

    operands = [
        {"property": "properties.featurecla"},
        "Admin-0 capital"
    ]
    return CQLModel.parse_obj({"eq": operands})
@pytest.fixture()
def _and(eq, between):
    """CQL ``and`` of a ``between`` on pop_max and an ``eq`` on featurecla"""

    between_clause = {
        "between": {
            "value": {"property": "properties.pop_max"},
            "lower": 100000,
            "upper": 1000000
        }
    }
    eq_clause = {
        "eq": [
            {"property": "properties.featurecla"},
            "Admin-0 capital"
        ]
    }
    return CQLModel.parse_obj({"and": [between_clause, eq_clause]})
@pytest.fixture()
def intersects():
    """CQL ``intersects`` of the geometry with a rectangular Polygon"""

    # closed exterior ring: first and last positions are identical
    ring = [
        [10.497565, 41.520355],
        [10.497565, 43.308645],
        [15.111823, 43.308645],
        [15.111823, 41.520355],
        [10.497565, 41.520355]
    ]
    polygon = {"type": "Polygon", "coordinates": [ring]}
    return CQLModel.parse_obj(
        {"intersects": [{"property": "geometry"}, polygon]})
def test_query(config):
    """Testing fields, paging, sorting and property selection"""

    provider = OpenSearchProvider(config)

    # field information
    fields = provider.get_fields()
    assert len(fields) == 37
    assert fields['scalerank']['type'] == 'number'
    assert fields['scalerank']['format'] == 'long'
    assert fields['changed']['type'] == 'number'
    assert fields['changed']['format'] == 'float'
    assert fields['ls_name']['type'] == 'string'

    # default query
    results = provider.query()
    first = results['features'][0]
    assert len(results['features']) == 10
    assert results['numberMatched'] == 242
    assert results['numberReturned'] == 10
    assert first['id'] == 6691831
    assert first['properties']['nameascii'] == 'Vatican City'

    # property filter
    results = provider.query(properties=[('nameascii', 'Vatican City')])
    assert len(results['features']) == 1
    assert results['numberMatched'] == 1
    assert results['numberReturned'] == 1

    # paging
    results = provider.query(limit=1)
    assert len(results['features']) == 1
    assert results['features'][0]['id'] == 6691831

    results = provider.query(offset=2, limit=1)
    assert len(results['features']) == 1
    assert results['features'][0]['id'] == 3042030

    # sorting
    results = provider.query(
        sortby=[{'property': 'nameascii', 'order': '+'}])
    assert results['features'][0]['properties']['nameascii'] == 'Abidjan'

    results = provider.query(
        sortby=[{'property': 'nameascii', 'order': '-'}])
    assert results['features'][0]['properties']['nameascii'] == 'Zagreb'

    results = provider.query(
        sortby=[{'property': 'scalerank', 'order': '+'}])
    assert results['features'][0]['properties']['scalerank'] == 0

    results = provider.query(
        sortby=[{'property': 'scalerank', 'order': '-'}])
    assert results['features'][0]['properties']['scalerank'] == 8

    assert len(results['features'][0]['properties']) == 37

    # limit beyond 10000
    results = provider.query(
        sortby=[{'property': 'nameascii', 'order': '-'}], limit=10001)
    assert results['features'][0]['properties']['nameascii'] == 'Zagreb'
    assert len(results['features']) == 242
    assert results['numberMatched'] == 242
    assert results['numberReturned'] == 242

    # property selection
    results = provider.query(select_properties=['nameascii'])
    assert len(results['features'][0]['properties']) == 1

    results = provider.query(select_properties=['nameascii', 'scalerank'])
    assert len(results['features'][0]['properties']) == 2

    results = provider.query(skip_geometry=True)
    assert results['features'][0]['geometry'] is None

    # configured properties
    config['properties'] = ['nameascii']
    provider = OpenSearchProvider(config)
    results = provider.query()
    assert len(results['features'][0]['properties']) == 1
def test_query_ordered_properties(config_ordered_properties):
    """Testing that properties keep the configured order"""

    provider = OpenSearchProvider(config_ordered_properties)
    first_feature = provider.query()['features'][0]
    names = list(first_feature['properties'].keys())
    assert names == ['adm0name', 'adm1name']
def test_get(config):
    """Testing retrieval of a single feature by identifier"""

    provider = OpenSearchProvider(config)
    feature = provider.get('3413829')
    assert feature['id'] == 3413829
    assert feature['properties']['ls_name'] == 'Reykjavik'
def test_get_not_existing_item_raise_exception(config):
    """Testing query for a not existing object"""

    provider = OpenSearchProvider(config)
    missing_id = '404'
    with pytest.raises(ProviderItemNotFoundError):
        provider.get(missing_id)
def test_post_cql_json_between_query(config, between):
    """Testing cql json query for a between object"""

    provider = OpenSearchProvider(config)
    results = provider.query(limit=100, filterq=between)
    features = results['features']

    assert len(features) == 23
    assert results['numberMatched'] == 23
    assert results['numberReturned'] == 23

    for feature in features:
        assert 10000 <= feature["properties"]["pop_max"] <= 100000
def test_post_cql_json_between_lte_query(config, between_upper):
    """Testing cql json query for a between object with an upper bound"""

    provider = OpenSearchProvider(config)
    results = provider.query(limit=100, filterq=between_upper)
    features = results['features']

    assert len(features) == 28
    assert results['numberMatched'] == 28
    assert results['numberReturned'] == 28

    for feature in features:
        assert feature["properties"]["pop_max"] <= 100000
def test_post_cql_json_between_gte_query(config, between_lower):
    """Testing cql json query for a between object with a lower bound"""

    provider = OpenSearchProvider(config)
    results = provider.query(limit=500, filterq=between_lower)
    features = results['features']

    assert len(features) == 237
    assert results['numberMatched'] == 237
    assert results['numberReturned'] == 237

    for feature in features:
        assert 10000 <= feature["properties"]["pop_max"]
def test_post_cql_json_eq_query(config, eq):
    """Testing cql json query for an eq object"""

    provider = OpenSearchProvider(config)
    features = provider.query(limit=500, filterq=eq)['features']
    assert len(features) == 235
def test_post_cql_json_and_query(config, _and):
    """Testing cql json query for an and object"""

    provider = OpenSearchProvider(config)
    features = provider.query(limit=1000, filterq=_and)['features']
    assert len(features) == 77
def test_post_cql_json_intersects_query(config, intersects):
    """Testing cql json query for an intersects object"""

    provider = OpenSearchProvider(config)
    features = provider.query(limit=100, filterq=intersects)['features']
    assert len(features) == 2