diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3805b9a..444075d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -62,6 +62,12 @@ jobs: host node port: 9300 node port: 9300 discovery type: 'single-node' + - name: Install and run OpenSearch 📦 + uses: esmarkowski/opensearch-github-action@v1.0.0 + with: + version: 2.12.0 + security-disabled: true + port: 9209 - name: Install and run MongoDB uses: supercharge/mongodb-github-action@1.5.0 with: @@ -99,6 +105,7 @@ jobs: - name: setup test data ⚙️ run: | python3 tests/load_es_data.py tests/data/ne_110m_populated_places_simple.geojson geonameid + python3 tests/load_opensearch_data.py tests/data/ne_110m_populated_places_simple.geojson geonameid python3 tests/load_mongo_data.py tests/data/ne_110m_populated_places_simple.geojson gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql @@ -117,6 +124,7 @@ jobs: pytest tests/test_csv__provider.py pytest tests/test_django.py pytest tests/test_elasticsearch__provider.py + pytest tests/test_opensearch__provider.py pytest tests/test_esri_provider.py pytest tests/test_filesystem_provider.py pytest tests/test_geojson_provider.py diff --git a/docs/source/data-publishing/ogcapi-features.rst b/docs/source/data-publishing/ogcapi-features.rst index 12478c1..2a1f87d 100644 --- a/docs/source/data-publishing/ogcapi-features.rst +++ b/docs/source/data-publishing/ogcapi-features.rst @@ -26,6 +26,7 @@ parameters. `GeoJSON`_,✅/✅,results/hits,❌,❌,❌,✅,❌,❌,✅ `MongoDB`_,✅/❌,results,✅,✅,✅,✅,❌,❌,✅ `OGR`_,✅/❌,results/hits,✅,❌,❌,✅,❌,❌,✅ + `OpenSearch`_,✅/✅,results/hits,✅,✅,✅,✅,✅,✅,✅ `Oracle`_,✅/✅,results/hits,✅,❌,✅,✅,❌,❌,✅ `Parquet`_,✅/✅,results/hits,✅,✅,❌,✅,❌,❌,✅ `PostgreSQL`_,✅/✅,results/hits,✅,✅,✅,✅,✅,❌,✅ @@ -322,6 +323,44 @@ The OGR provider requires a recent (3+) version of GDAL to be installed. The `crs` query parameter is used as follows: e.g. ``http://localhost:5000/collections/foo/items?crs=http%3A%2F%2Fwww.opengis.net%2Fdef%2Fcrs%2FEPSG%2F0%2F28992``. +.. _OpenSearch: + +OpenSearch +^^^^^^^^^^ + +.. note:: + Requires Python package opensearch-py + +To publish an OpenSearch index, the following are required in your index: + +* indexes must be documents of valid GeoJSON Features +* index mappings must define the GeoJSON ``geometry`` as a ``geo_shape`` + +.. code-block:: yaml + + providers: + - type: feature + name: OpenSearch + editable: true|false # optional, default is false + data: http://localhost:9200/ne_110m_populated_places_simple + id_field: geonameid + time_field: datetimefield + +.. note:: + + For OpenSearch indexes that are password protect, a RFC1738 URL can be used as follows: + + ``data: http://username:password@localhost:9200/ne_110m_populated_places_simple`` + + To further conceal authentication credentials, environment variables can be used: + + ``data: http://${MY_USERNAME}:${MY_PASSWORD}@localhost:9200/ne_110m_populated_places_simple`` + +The OpenSearch provider also has the support for the CQL queries as indicated in the table above. + +.. seealso:: + :ref:`cql` for more details on how to use Common Query Language (CQL) to filter the collection with specific queries. + .. _Oracle: Oracle diff --git a/pygeoapi/api/itemtypes.py b/pygeoapi/api/itemtypes.py index 008b28c..7699c86 100644 --- a/pygeoapi/api/itemtypes.py +++ b/pygeoapi/api/itemtypes.py @@ -889,7 +889,7 @@ def post_collection_items( HTTPStatus.BAD_REQUEST, headers, request.format, 'InvalidParameterValue', msg) else: - LOGGER.debug('processing Elasticsearch CQL_JSON data') + LOGGER.debug('processing CQL_JSON data') try: filter_ = CQLModel.parse_raw(data) except Exception: diff --git a/pygeoapi/plugin.py b/pygeoapi/plugin.py index ee46172..c0bf22e 100644 --- a/pygeoapi/plugin.py +++ b/pygeoapi/plugin.py @@ -51,10 +51,11 @@ PLUGINS = { 'MapScript': 'pygeoapi.provider.mapscript_.MapScriptProvider', 'MongoDB': 'pygeoapi.provider.mongo.MongoProvider', 'MVT-tippecanoe': 'pygeoapi.provider.mvt_tippecanoe.MVTTippecanoeProvider', # noqa: E501 - 'MVT-elastic': 'pygeoapi.provider.mvt_elastic.MVTElasticProvider', # noqa: E501 - 'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider', # noqa: E501 + 'MVT-elastic': 'pygeoapi.provider.mvt_elastic.MVTElasticProvider', + 'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider', 'OracleDB': 'pygeoapi.provider.oracle.OracleProvider', 'OGR': 'pygeoapi.provider.ogr.OGRProvider', + 'OpenSearch': 'pygeoapi.provider.opensearch_.OpenSearchProvider', 'Parquet': 'pygeoapi.provider.parquet.ParquetProvider', 'PostgreSQL': 'pygeoapi.provider.postgresql.PostgreSQLProvider', 'rasterio': 'pygeoapi.provider.rasterio_.RasterioProvider', diff --git a/pygeoapi/provider/opensearch_.py b/pygeoapi/provider/opensearch_.py new file mode 100644 index 0000000..b6b308e --- /dev/null +++ b/pygeoapi/provider/opensearch_.py @@ -0,0 +1,742 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# Francesco Bartoli +# +# Copyright (c) 2024 Tom Kralidis +# Copyright (c) 2024 Francesco Bartoli +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +from typing import Dict +from collections import OrderedDict +import json +import logging +import uuid + +from opensearchpy import OpenSearch, helpers +from opensearch_dsl import Search, Q + +from pygeoapi.provider.base import (BaseProvider, ProviderConnectionError, + ProviderQueryError, + ProviderItemNotFoundError) +from pygeoapi.models.cql import CQLModel, get_next_node +from pygeoapi.util import get_envelope, crs_transform + + +LOGGER = logging.getLogger(__name__) + + +class OpenSearchProvider(BaseProvider): + """OpenSearch Provider""" + + def __init__(self, provider_def): + """ + Initialize object + + :param provider_def: provider definition + + :returns: pygeoapi.provider.opensearch_.OpenSearchProvider + """ + + super().__init__(provider_def) + + self.select_properties = [] + + self.os_host, self.index_name = self.data.rsplit('/', 1) + + LOGGER.debug('Setting OpenSearch properties') + + LOGGER.debug(f'host: {self.os_host}') + LOGGER.debug(f'index: {self.index_name}') + + LOGGER.debug('Connecting to OpenSearch') + self.os_ = OpenSearch(self.os_host, verify_certs=0) + if not self.os_.ping(): + msg = f'Cannot connect to OpenSearch: {self.os_host}' + LOGGER.error(msg) + raise ProviderConnectionError(msg) + + LOGGER.debug('Determining OpenSearch version') + v = self.os_.info()['version']['number'][:3] + LOGGER.debug(f'OpenSearch version: {v}') + + LOGGER.debug('Grabbing field information') + try: + self.get_fields() + except Exception as err: + LOGGER.error(err) + raise ProviderQueryError(err) + + def get_fields(self): + """ + Get provider field information (names, types) + + :returns: dict of fields + """ + if not self._fields: + ii = self.os_.indices.get(index=self.index_name, + allow_no_indices=False) + + LOGGER.debug(f'Response: {ii}') + try: + if '*' not in self.index_name: + mappings = ii[self.index_name]['mappings'] + p = mappings['properties']['properties'] + else: + LOGGER.debug('Wildcard index; setting from first match') + index_name_ = list(ii.keys())[0] + p = ii[index_name_]['mappings']['properties']['properties'] + except KeyError: + LOGGER.warning('Trying for alias') + alias_name = next(iter(ii)) + p = ii[alias_name]['mappings']['properties']['properties'] + except IndexError: + LOGGER.warning('could not get fields; returning empty set') + return {} + + for k, v in p['properties'].items(): + if 'type' in v: + if v['type'] == 'text': + self._fields[k] = {'type': 'string'} + elif v['type'] == 'date': + self._fields[k] = {'type': 'string', 'format': 'date'} + elif v['type'] in ('float', 'long'): + self._fields[k] = {'type': 'number', + 'format': v['type']} + else: + self._fields[k] = {'type': v['type']} + + return self._fields + + @crs_transform + def query(self, offset=0, limit=10, resulttype='results', + bbox=[], datetime_=None, properties=[], sortby=[], + select_properties=[], skip_geometry=False, q=None, + filterq=None, **kwargs): + """ + query OpenSearch index + + :param offset: starting record to return (default 0) + :param limit: number of records to return (default 10) + :param resulttype: return results or hit limit (default results) + :param bbox: bounding box [minx,miny,maxx,maxy] + :param datetime_: temporal (datestamp or extent) + :param properties: list of tuples (name, value) + :param sortby: list of dicts (property, order) + :param select_properties: list of property names + :param skip_geometry: bool of whether to skip geometry (default False) + :param q: full-text search term(s) + :param filterq: filter object + + :returns: dict of 0..n GeoJSON features + """ + + self.select_properties = select_properties + + query = {'track_total_hits': True, 'query': {'bool': {'filter': []}}} + filter_ = [] + + feature_collection = { + 'type': 'FeatureCollection', + 'features': [] + } + + if resulttype == 'hits': + LOGGER.debug('hits only specified') + limit = 0 + + if bbox: + LOGGER.debug('processing bbox parameter') + minx, miny, maxx, maxy = bbox + bbox_filter = { + 'geo_shape': { + 'geometry': { + 'shape': { + 'type': 'envelope', + 'coordinates': [[minx, maxy], [maxx, miny]] + }, + 'relation': 'intersects' + } + } + } + + query['query']['bool']['filter'].append(bbox_filter) + + if datetime_ is not None: + LOGGER.debug('processing datetime parameter') + if self.time_field is None: + LOGGER.error('time_field not enabled for collection') + raise ProviderQueryError() + + time_field = self.mask_prop(self.time_field) + + if '/' in datetime_: # envelope + LOGGER.debug('detected time range') + time_begin, time_end = datetime_.split('/') + + range_ = { + 'range': { + time_field: { + 'gte': time_begin, + 'lte': time_end + } + } + } + if time_begin == '..': + range_['range'][time_field].pop('gte') + elif time_end == '..': + range_['range'][time_field].pop('lte') + + filter_.append(range_) + + else: # time instant + LOGGER.debug('detected time instant') + filter_.append({'match': {time_field: datetime_}}) + + LOGGER.debug(filter_) + query['query']['bool']['filter'].append(*filter_) + + if properties: + LOGGER.debug('processing properties') + for prop in properties: + prop_name = self.mask_prop(prop[0]) + pf = { + 'match': { + prop_name: { + 'query': prop[1] + } + } + } + query['query']['bool']['filter'].append(pf) + + if '|' not in prop[1]: + pf['match'][prop_name]['minimum_should_match'] = '100%' + + if sortby: + LOGGER.debug('processing sortby') + query['sort'] = [] + for sort in sortby: + LOGGER.debug(f'processing sort object: {sort}') + + sp = sort['property'] + + if (self.fields[sp]['type'] == 'string' + and self.fields[sp].get('format') != 'date'): + LOGGER.debug('setting OpenSearch .raw on property') + sort_property = f'{self.mask_prop(sp)}.raw' + else: + sort_property = self.mask_prop(sp) + + sort_order = 'asc' + if sort['order'] == '-': + sort_order = 'desc' + + sort_ = { + sort_property: { + 'order': sort_order + } + } + query['sort'].append(sort_) + + if q is not None: + LOGGER.debug('Adding free-text search') + query['query']['bool']['must'] = {'query_string': {'query': q}} + + query['_source'] = { + 'excludes': [ + 'properties._metadata-payload', + 'properties._metadata-schema', + 'properties._metadata-format' + ] + } + + if self.properties or self.select_properties: + LOGGER.debug('filtering properties') + + all_properties = self.get_properties() + + query['_source'] = { + 'includes': list(map(self.mask_prop, all_properties)) + } + + query['_source']['includes'].append('id') + query['_source']['includes'].append('type') + query['_source']['includes'].append('geometry') + + if skip_geometry: + LOGGER.debug('excluding geometry') + try: + query['_source']['excludes'] = ['geometry'] + except KeyError: + query['_source'] = {'excludes': ['geometry']} + try: + LOGGER.debug('querying OpenSearch') + if filterq: + LOGGER.debug(f'adding cql object: {filterq.json()}') + query = update_query(input_query=query, cql=filterq) + LOGGER.debug(json.dumps(query, indent=4)) + + LOGGER.debug('Testing for OpenSearch scrolling') + if offset + limit > 10000: + gen = helpers.scan(client=self.os_, query=query, + preserve_order=True, + index=self.index_name) + results = {'hits': {'total': limit, 'hits': []}} + for i in range(offset + limit): + try: + if i >= offset: + results['hits']['hits'].append(next(gen)) + else: + next(gen) + except StopIteration: + break + + matched = len(results['hits']['hits']) + offset + returned = len(results['hits']['hits']) + else: + es_results = self.os_.search(index=self.index_name, + from_=offset, size=limit, + body=query) + results = es_results + matched = es_results['hits']['total']['value'] + returned = len(es_results['hits']['hits']) + + except ValueError: + pass + + feature_collection['numberMatched'] = matched + + if resulttype == 'hits': + return feature_collection + + feature_collection['numberReturned'] = returned + + LOGGER.debug('serializing features') + for feature in results['hits']['hits']: + feature_ = self.osdoc2geojson(feature) + feature_collection['features'].append(feature_) + + return feature_collection + + @crs_transform + def get(self, identifier, **kwargs): + """ + Get OpenSearch document by id + + :param identifier: feature id + + :returns: dict of single GeoJSON feature + """ + + try: + LOGGER.debug(f'Fetching identifier {identifier}') + result = self.os_.get(index=self.index_name, id=identifier) + LOGGER.debug('Serializing feature') + feature_ = self.osdoc2geojson(result) + except Exception as err: + LOGGER.debug(f'Not found via OpenSearch id query: {err}') + LOGGER.debug('Trying via a real query') + + query = { + 'query': { + 'bool': { + 'filter': [{ + 'match_phrase': { + '_id': identifier + } + }] + } + } + } + + LOGGER.debug(f'Query: {query}') + try: + result = self.os_search(index=self.index_name, **query) + if len(result['hits']['hits']) == 0: + LOGGER.error(err) + raise ProviderItemNotFoundError(err) + LOGGER.debug('Serializing feature') + feature_ = self.osdoc2geojson(result['hits']['hits'][0]) + except Exception as err2: + LOGGER.error(err2) + raise ProviderItemNotFoundError(err2) + except Exception as err: + LOGGER.error(err) + return None + + return feature_ + + def create(self, item): + """ + Create a new item + + :param item: `dict` of new item + + :returns: identifier of created item + """ + + identifier, json_data = self._load_and_prepare_item( + item, accept_missing_identifier=True) + if identifier is None: + # If there is no incoming identifier, allocate a random one + identifier = str(uuid.uuid4()) + json_data["id"] = identifier + + LOGGER.debug(f'Inserting data with identifier {identifier}') + _ = self.os_.index(index=self.index_name, id=identifier, + body=json_data) + LOGGER.debug('Item added') + + return identifier + + def update(self, identifier, item): + """ + Updates an existing item + + :param identifier: feature id + :param item: `dict` of partial or full item + + :returns: `bool` of update result + """ + + LOGGER.debug(f'Updating item {identifier}') + identifier, json_data = self._load_and_prepare_item( + item, identifier, raise_if_exists=False) + + _ = self.os_index(index=self.index_name, id=identifier, body=json_data) + + return True + + def delete(self, identifier): + """ + Deletes an existing item + + :param identifier: item id + + :returns: `bool` of deletion result + """ + + LOGGER.debug(f'Deleting item {identifier}') + _ = self.os_delete(index=self.index_name, id=identifier) + + return True + + def osdoc2geojson(self, doc): + """ + generate GeoJSON `dict` from OpenSearch document + :param doc: `dict` of OpenSearch document + + :returns: GeoJSON `dict` + """ + + feature_ = {} + feature_thinned = {} + + LOGGER.debug('Fetching id and geometry from GeoJSON document') + feature_ = doc['_source'] + + try: + id_ = doc['_source']['properties'][self.id_field] + except KeyError as err: + LOGGER.debug(f'Missing field: {err}') + id_ = doc['_source'].get('id', doc['_id']) + + feature_['id'] = id_ + feature_['geometry'] = doc['_source'].get('geometry') + + if self.properties or self.select_properties: + LOGGER.debug('Filtering properties') + all_properties = self.get_properties() + + feature_thinned = { + 'id': id_, + 'type': feature_['type'], + 'geometry': feature_.get('geometry'), + 'properties': OrderedDict() + } + for p in all_properties: + try: + feature_thinned['properties'][p] = feature_['properties'][p] # noqa + except KeyError as err: + LOGGER.error(err) + raise ProviderQueryError() + + if feature_thinned: + return feature_thinned + else: + return feature_ + + def mask_prop(self, property_name): + """ + generate property name based on OpenSearch backend setup + + :param property_name: property name + + :returns: masked property name + """ + + return f'properties.{property_name}' + + def get_properties(self): + all_properties = [] + + LOGGER.debug(f'configured properties: {self.properties}') + LOGGER.debug(f'selected properties: {self.select_properties}') + + if not self.properties and not self.select_properties: + all_properties = self.get_fields() + if self.properties and self.select_properties: + all_properties = self.properties and self.select_properties + else: + all_properties = self.properties or self.select_properties + + LOGGER.debug(f'resulting properties: {all_properties}') + return all_properties + + def __repr__(self): + return f' {self.data}' + + +class OpenSearchCatalogueProvider(OpenSearchProvider): + """OpenSearch Provider""" + + def __init__(self, provider_def): + super().__init__(provider_def) + + def _excludes(self): + return [ + 'properties._metadata-anytext' + ] + + def get_fields(self): + fields = super().get_fields() + for i in self._excludes(): + if i in fields: + del fields[i] + + fields['q'] = {'type': 'string'} + + return fields + + def query(self, offset=0, limit=10, resulttype='results', + bbox=[], datetime_=None, properties=[], sortby=[], + select_properties=[], skip_geometry=False, q=None, + filterq=None, **kwargs): + + records = super().query( + offset=offset, limit=limit, + resulttype=resulttype, bbox=bbox, + datetime_=datetime_, properties=properties, + sortby=sortby, + select_properties=select_properties, + skip_geometry=skip_geometry, + q=q) + + return records + + def __repr__(self): + return f' {self.data}' + + +class OpenSearchQueryBuilder: + def __init__(self): + self._operation = None + self.must_value = {} + self.should_value = {} + self.mustnot_value = {} + self.filter_value = {} + + def must(self, must_value): + self.must_value = must_value + return self + + def should(self, should_value): + self.should_value = should_value + return self + + def must_not(self, mustnot_value): + self.mustnot_value = mustnot_value + return self + + def filter(self, filter_value): + self.filter_value = filter_value + return self + + @property + def operation(self): + return self._operation + + @operation.setter + def operation(self, value): + self._operation = value + + def build(self): + if self.must_value: + must_clause = self.must_value or {} + if self.should_value: + should_clause = self.should_value or {} + if self.mustnot_value: + mustnot_clause = self.mustnot_value or {} + if self.filter_value: + filter_clause = self.filter_value or {} + else: + filter_clause = {} + + # to figure out how to deal with logical operations + # return match_clause & range_clause + clauses = must_clause or should_clause or mustnot_clause + filters = filter_clause + if self.operation == 'and': + res = Q( + 'bool', + must=[clause for clause in clauses], + filter=[filter for filter in filters]) + elif self.operation == 'or': + res = Q( + 'bool', + should=[clause for clause in clauses], + filter=[filter for filter in filters]) + elif self.operation == 'not': + res = Q( + 'bool', + must_not=[clause for clause in clauses], + filter=[filter for filter in filters]) + else: + if filters: + res = Q( + 'bool', + must=[clauses], + filter=[filters]) + else: + res = Q( + 'bool', + must=[clauses]) + + return res + + +def _build_query(q, cql): + + # this would be handled by the AST with the traverse of CQL model + op, node = get_next_node(cql.__root__) + q.operation = op + if isinstance(node, list): + query_list = [] + for elem in node: + op, next_node = get_next_node(elem) + if not getattr(next_node, 'between', 0) == 0: + property = next_node.between.value.__root__.__root__.property + lower = next_node.between.lower.__root__.__root__ + upper = next_node.between.upper.__root__.__root__ + query_list.append(Q( + { + 'range': + { + f'{property}': { + 'gte': lower, 'lte': upper + } + } + } + )) + if not getattr(next_node, '__root__', 0) == 0: + scalars = tuple(next_node.__root__.eq.__root__) + property = scalars[0].__root__.property + value = scalars[1].__root__.__root__ + query_list.append(Q( + {'match': {f'{property}': f'{value}'}} + )) + q.must(query_list) + elif not getattr(node, 'between', 0) == 0: + property = node.between.value.__root__.__root__.property + lower = None + if not getattr(node.between.lower, + '__root__', 0) == 0: + lower = node.between.lower.__root__.__root__ + upper = None + if not getattr(node.between.upper, + '__root__', 0) == 0: + upper = node.between.upper.__root__.__root__ + query = Q( + { + 'range': + { + f'{property}': { + 'gte': lower, 'lte': upper + } + } + } + ) + q.must(query) + elif not getattr(node, '__root__', 0) == 0: + next_op, next_node = get_next_node(node) + if not getattr(next_node, 'eq', 0) == 0: + scalars = tuple(next_node.eq.__root__) + property = scalars[0].__root__.property + value = scalars[1].__root__.__root__ + query = Q( + {'match': {f'{property}': f'{value}'}} + ) + q.must(query) + elif not getattr(node, 'intersects', 0) == 0: + property = node.intersects.__root__[0].__root__.property + if property == 'geometry': + geom_type = node.intersects.__root__[ + 1].__root__.__root__.__root__.type + if geom_type == 'Polygon': + coordinates = node.intersects.__root__[ + 1].__root__.__root__.__root__.coordinates + coords_list = [ + poly_coords.__root__ for poly_coords in coordinates[0] + ] + filter_ = Q( + { + 'geo_shape': { + 'geometry': { + 'shape': { + 'type': 'envelope', + 'coordinates': get_envelope( + coords_list) + }, + 'relation': 'intersects' + } + } + } + ) + query_all = Q( + {'match_all': {}} + ) + q.must(query_all) + q.filter(filter_) + return q.build() + + +def update_query(input_query: Dict, cql: CQLModel): + s = Search.from_dict(input_query) + query = OpenSearchQueryBuilder() + output_query = _build_query(query, cql) + s = s.query(output_query) + + LOGGER.debug(f'Enhanced query: {json.dumps(s.to_dict())}') + return s.to_dict() diff --git a/requirements-provider.txt b/requirements-provider.txt index 74bc473..08c15ac 100644 --- a/requirements-provider.txt +++ b/requirements-provider.txt @@ -9,6 +9,8 @@ geoalchemy2 geopandas netCDF4 numpy==2.0.1 +opensearch-dsl +opensearch-py oracledb pandas psycopg2 diff --git a/tests/load_es_data.py b/tests/load_es_data.py index 90db8cb..3324a69 100644 --- a/tests/load_es_data.py +++ b/tests/load_es_data.py @@ -95,5 +95,5 @@ es.options(request_timeout=90).indices.create( with open(sys.argv[1], encoding='utf-8') as fh: d = json.load(fh) - # call generator function to yield features into ES build API + # call generator function to yield features into ES bulk API helpers.bulk(es, gendata(d)) diff --git a/tests/load_opensearch_data.py b/tests/load_opensearch_data.py new file mode 100644 index 0000000..42842d4 --- /dev/null +++ b/tests/load_opensearch_data.py @@ -0,0 +1,102 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2024 Tom Kralidis +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import json +from pathlib import Path +import sys + +from opensearchpy import OpenSearch, helpers + +os_ = OpenSearch(['localhost:9209']) + +if len(sys.argv) < 3: + print(f'Usage: {sys.argv[0]} ') + sys.exit(1) + +index_name = Path(sys.argv[1]).stem.lower() +id_field = sys.argv[2] + +if os_.indices.exists(index=index_name): + os_.indices.delete(index=index_name) + +# index settings +body = { + 'settings': { + 'index': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + } + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + }, + 'properties': { + 'properties': { + 'nameascii': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + } + } + } + } + } +} + + +def gendata(data): + """ + Generator function to yield features + """ + + for f in data['features']: + try: + f['properties'][id_field] = int(f['properties'][id_field]) + except ValueError: + f['properties'][id_field] = f['properties'][id_field] + yield { + "_index": index_name, + "_id": f['properties'][id_field], + "_source": f + } + + +# create index +os_.indices.create(index=index_name, body=body) + +with open(sys.argv[1], encoding='utf-8') as fh: + d = json.load(fh) + + # call generator function to yield features into OpenSearch bulk API + helpers.bulk(os_, gendata(d)) diff --git a/tests/test_opensearch__provider.py b/tests/test_opensearch__provider.py new file mode 100644 index 0000000..3ba1801 --- /dev/null +++ b/tests/test_opensearch__provider.py @@ -0,0 +1,318 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# +# Copyright (c) 2024 Tom Kralidis +# Copyright (c) 2024 Francesco Bartoli +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import pytest + +from pygeoapi.provider.base import ProviderItemNotFoundError +from pygeoapi.provider.opensearch_ import OpenSearchProvider +from pygeoapi.models.cql import CQLModel + + +@pytest.fixture() +def config(): + return { + 'name': 'OpenSearch', + 'type': 'feature', + 'data': 'http://localhost:9209/ne_110m_populated_places_simple', # noqa + 'id_field': 'geonameid' + } + + +@pytest.fixture() +def config_ordered_properties(): + return { + 'name': 'OpenSearch', + 'type': 'feature', + 'data': 'http://localhost:9209/ne_110m_populated_places_simple', # noqa + 'id_field': 'geonameid', + 'properties': [ + 'adm0name', + 'adm1name' + ] + } + + +@pytest.fixture() +def config_cql(): + return { + 'name': 'OpenSearch', + 'type': 'feature', + 'data': 'http://localhost:9209/nhsl_hazard_threat_all_indicators_s_bc', # noqa + 'id_field': 'Sauid' + } + + +@pytest.fixture() +def between(): + between_ = { + "between": { + "value": {"property": "properties.pop_max"}, + "lower": 10000, + "upper": 100000 + } + } + return CQLModel.parse_obj(between_) + + +@pytest.fixture() +def between_upper(): + between_ = { + "between": { + "value": {"property": "properties.pop_max"}, + "upper": 100000 + } + } + return CQLModel.parse_obj(between_) + + +@pytest.fixture() +def between_lower(): + between_ = { + "between": { + "value": {"property": "properties.pop_max"}, + "lower": 10000 + } + } + return CQLModel.parse_obj(between_) + + +@pytest.fixture() +def eq(): + eq_ = { + "eq": [ + {"property": "properties.featurecla"}, + "Admin-0 capital" + ] + } + return CQLModel.parse_obj(eq_) + + +@pytest.fixture() +def _and(eq, between): + and_ = { + "and": [ + { + "between": { + "value": { + "property": "properties.pop_max" + }, + "lower": 100000, + "upper": 1000000 + } + }, + { + "eq": [ + {"property": "properties.featurecla"}, + "Admin-0 capital" + ] + } + ] + } + return CQLModel.parse_obj(and_) + + +@pytest.fixture() +def intersects(): + intersects = {"intersects": [ + {"property": "geometry"}, + { + "type": "Polygon", + "coordinates": [ + [ + [10.497565, 41.520355], + [10.497565, 43.308645], + [15.111823, 43.308645], + [15.111823, 41.520355], + [10.497565, 41.520355] + ] + ] + } + ]} + return CQLModel.parse_obj(intersects) + + +def test_query(config): + p = OpenSearchProvider(config) + + fields = p.get_fields() + assert len(fields) == 37 + assert fields['scalerank']['type'] == 'number' + assert fields['scalerank']['format'] == 'long' + assert fields['changed']['type'] == 'number' + assert fields['changed']['format'] == 'float' + assert fields['ls_name']['type'] == 'string' + + results = p.query() + assert len(results['features']) == 10 + assert results['numberMatched'] == 242 + assert results['numberReturned'] == 10 + assert results['features'][0]['id'] == 6691831 + assert results['features'][0]['properties']['nameascii'] == 'Vatican City' + + results = p.query(properties=[('nameascii', 'Vatican City')]) + assert len(results['features']) == 1 + assert results['numberMatched'] == 1 + assert results['numberReturned'] == 1 + + results = p.query(limit=1) + assert len(results['features']) == 1 + assert results['features'][0]['id'] == 6691831 + + results = p.query(offset=2, limit=1) + assert len(results['features']) == 1 + assert results['features'][0]['id'] == 3042030 + + results = p.query(sortby=[{'property': 'nameascii', 'order': '+'}]) + assert results['features'][0]['properties']['nameascii'] == 'Abidjan' + + results = p.query(sortby=[{'property': 'nameascii', 'order': '-'}]) + assert results['features'][0]['properties']['nameascii'] == 'Zagreb' + + results = p.query(sortby=[{'property': 'scalerank', 'order': '+'}]) + assert results['features'][0]['properties']['scalerank'] == 0 + + results = p.query(sortby=[{'property': 'scalerank', 'order': '-'}]) + assert results['features'][0]['properties']['scalerank'] == 8 + + assert len(results['features'][0]['properties']) == 37 + + results = p.query(sortby=[{'property': 'nameascii', 'order': '-'}], + limit=10001) + assert results['features'][0]['properties']['nameascii'] == 'Zagreb' + assert len(results['features']) == 242 + assert results['numberMatched'] == 242 + assert results['numberReturned'] == 242 + + results = p.query(select_properties=['nameascii']) + assert len(results['features'][0]['properties']) == 1 + + results = p.query(select_properties=['nameascii', 'scalerank']) + assert len(results['features'][0]['properties']) == 2 + + results = p.query(skip_geometry=True) + assert results['features'][0]['geometry'] is None + + config['properties'] = ['nameascii'] + p = OpenSearchProvider(config) + results = p.query() + assert len(results['features'][0]['properties']) == 1 + + +def test_query_ordered_properties(config_ordered_properties): + p = OpenSearchProvider(config_ordered_properties) + + result = p.query() + feature_properties = list(result['features'][0]['properties'].keys()) + + assert feature_properties == ['adm0name', 'adm1name'] + + +def test_get(config): + p = OpenSearchProvider(config) + + result = p.get('3413829') + assert result['id'] == 3413829 + assert result['properties']['ls_name'] == 'Reykjavik' + + +def test_get_not_existing_item_raise_exception(config): + """Testing query for a not existing object""" + p = OpenSearchProvider(config) + with pytest.raises(ProviderItemNotFoundError): + p.get('404') + + +def test_post_cql_json_between_query(config, between): + """Testing cql json query for a between object""" + p = OpenSearchProvider(config) + + results = p.query(limit=100, filterq=between) + + assert len(results['features']) == 23 + assert results['numberMatched'] == 23 + assert results['numberReturned'] == 23 + + for item in results['features']: + assert 10000 <= item["properties"]["pop_max"] <= 100000 + + +def test_post_cql_json_between_lte_query(config, between_upper): + """Testing cql json query for a between object""" + p = OpenSearchProvider(config) + + results = p.query(limit=100, filterq=between_upper) + + assert len(results['features']) == 28 + assert results['numberMatched'] == 28 + assert results['numberReturned'] == 28 + + for item in results['features']: + assert item["properties"]["pop_max"] <= 100000 + + +def test_post_cql_json_between_gte_query(config, between_lower): + """Testing cql json query for a between object""" + p = OpenSearchProvider(config) + + results = p.query(limit=500, filterq=between_lower) + + assert len(results['features']) == 237 + assert results['numberMatched'] == 237 + assert results['numberReturned'] == 237 + + for item in results['features']: + assert 10000 <= item["properties"]["pop_max"] + + +def test_post_cql_json_eq_query(config, eq): + """Testing cql json query for an eq object""" + p = OpenSearchProvider(config) + + results = p.query(limit=500, filterq=eq) + + assert len(results['features']) == 235 + + +def test_post_cql_json_and_query(config, _and): + """Testing cql json query for an and object""" + p = OpenSearchProvider(config) + + results = p.query(limit=1000, filterq=_and) + + assert len(results['features']) == 77 + + +def test_post_cql_json_intersects_query(config, intersects): + """Testing cql json query for an intersects object""" + p = OpenSearchProvider(config) + + results = p.query(limit=100, filterq=intersects) + + assert len(results['features']) == 2