diff --git a/docs/source/data-publishing/ogcapi-features.rst b/docs/source/data-publishing/ogcapi-features.rst index 30ccf07..00d48a0 100644 --- a/docs/source/data-publishing/ogcapi-features.rst +++ b/docs/source/data-publishing/ogcapi-features.rst @@ -27,6 +27,7 @@ parameters. `MongoDB`_,✅/❌,results,✅,✅,✅,✅,❌,❌,✅ `OGR`_,✅/❌,results/hits,✅,❌,❌,✅,❌,❌,✅ `Oracle`_,✅/✅,results/hits,✅,❌,✅,✅,❌,❌,✅ + `Parquet`_,✅/✅,results/hits,✅,✅,❌,✅,❌,❌,✅ `PostgreSQL`_,✅/✅,results/hits,✅,✅,✅,✅,✅,❌,✅ `SQLiteGPKG`_,✅/❌,results/hits,✅,❌,❌,✅,❌,❌,✅ `SensorThings API`_,✅/✅,results/hits,✅,✅,✅,✅,❌,❌,✅ @@ -432,6 +433,40 @@ useful e.g. for authorization at row level or manipulation of the explain plan w An example an more information about that feature you can find in the test class in tests/test_oracle_provider.py. +.. _Parquet: + +Parquet +^^^^^^^ + +.. note:: + Requires Python package pyarrow + +To publish a GeoParquet file (with a geometry column) the geopandas package is also required. + +.. note:: + Reading data directly from a public s3 bucket is also supported. + +.. code-block:: yaml + + providers: + - type: feature + name: Parquet + data: + source: ./tests/data/parquet/random.parquet + id_field: id + time_field: time + x_field: + - minlon + - maxlon + y_field: + - minlat + - maxlat + +For GeoParquet data, the `x_field` and `y_field` must be specified in the provider definition, +and they must be arrays of two column names that contain the x and y coordinates of the +bounding box of each geometry. If the geometries in the data are all points, the `x_field` and `y_field` +can be strings instead of arrays and refer to a single column each. + .. _PostgreSQL: PostgreSQL diff --git a/pygeoapi/plugin.py b/pygeoapi/plugin.py index 74ef732..ee46172 100644 --- a/pygeoapi/plugin.py +++ b/pygeoapi/plugin.py @@ -55,6 +55,7 @@ PLUGINS = { 'MVT-proxy': 'pygeoapi.provider.mvt_proxy.MVTProxyProvider', # noqa: E501 'OracleDB': 'pygeoapi.provider.oracle.OracleProvider', 'OGR': 'pygeoapi.provider.ogr.OGRProvider', + 'Parquet': 'pygeoapi.provider.parquet.ParquetProvider', 'PostgreSQL': 'pygeoapi.provider.postgresql.PostgreSQLProvider', 'rasterio': 'pygeoapi.provider.rasterio_.RasterioProvider', 'SensorThings': 'pygeoapi.provider.sensorthings.SensorThingsProvider', diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py new file mode 100644 index 0000000..70ec81e --- /dev/null +++ b/pygeoapi/provider/parquet.py @@ -0,0 +1,458 @@ +# ================================================================= +# +# Authors: Leo Ghignone +# +# Copyright (c) 2024 Leo Ghignone +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +from itertools import chain +import json +import logging + +from dateutil.parser import isoparse +import geopandas as gpd +import pyarrow +import pyarrow.compute as pc +import pyarrow.dataset +import s3fs + +from pygeoapi.provider.base import ( + BaseProvider, + ProviderConnectionError, + ProviderGenericError, + ProviderItemNotFoundError, + ProviderQueryError, +) +from pygeoapi.util import crs_transform + +LOGGER = logging.getLogger(__name__) + + +def arrow_to_pandas_type(arrow_type): + pd_type = arrow_type.to_pandas_dtype() + try: + # Needed for specific types such as dtype(' pc.scalar(minx)) + & (pc.field(self.miny) > pc.scalar(miny)) + & (pc.field(self.maxx) < pc.scalar(maxx)) + & (pc.field(self.maxy) < pc.scalar(maxy)) + ) + + if datetime_ is not None: + if self.time_field is None: + msg = ( + 'Dataset does not have a time field, ' + 'querying by datetime is not supported.' + ) + raise ProviderQueryError(msg) + timefield = pc.field(self.time_field) + if '/' in datetime_: + begin, end = datetime_.split('/') + if begin != '..': + begin = isoparse(begin) + filter = filter & (timefield >= begin) + if end != '..': + end = isoparse(end) + filter = filter & (timefield <= end) + else: + target_time = isoparse(datetime_) + filter = filter & (timefield == target_time) + + if properties: + LOGGER.debug('processing properties') + for name, value in properties: + field = self.ds.schema.field(name) + pd_type = arrow_to_pandas_type(field.type) + expr = pc.field(name) == pc.scalar(pd_type(value)) + + filter = filter & expr + + if len(select_properties) == 0: + select_properties = self.ds.schema.names + else: # Load id and geometry together with any specified columns + if self.has_geometry and 'geometry' not in select_properties: + select_properties.append('geometry') + if self.id_field not in select_properties: + select_properties.insert(0, self.id_field) + + if skip_geometry: + select_properties.remove('geometry') + + # Make response based on resulttype specified + if resulttype == 'hits': + LOGGER.debug('hits only specified') + result = self._response_feature_hits(filter) + elif resulttype == 'results': + LOGGER.debug('results specified') + result = self._response_feature_collection( + filter, offset, limit, columns=select_properties + ) + else: + LOGGER.error(f'Invalid resulttype: {resulttype}') + + except RuntimeError as err: + LOGGER.error(err) + raise ProviderQueryError(err) + except ProviderConnectionError as err: + LOGGER.error(err) + raise ProviderConnectionError(err) + except Exception as err: + LOGGER.error(err) + raise ProviderGenericError(err) + + return result + + @crs_transform + def get(self, identifier, **kwargs): + """ + Get Feature by id + + :param identifier: feature id + + :returns: a single feature + """ + result = None + try: + LOGGER.debug(f'Fetching identifier {identifier}') + id_type = arrow_to_pandas_type( + self.ds.schema.field(self.id_field).type) + batches = self._read_parquet( + filter=( + pc.field(self.id_field) == pc.scalar(id_type(identifier)) + ) + ) + + for batch in batches: + if batch.num_rows > 0: + assert ( + batch.num_rows == 1 + ), f'Multiple items found with ID {identifier}' + row = batch.to_pandas() + break + else: + raise ProviderItemNotFoundError(f'ID {identifier} not found') + + if self.has_geometry: + geom = gpd.GeoSeries.from_wkb(row['geometry'], crs=self.crs) + else: + geom = [None] + gdf = gpd.GeoDataFrame(row, geometry=geom) + LOGGER.debug('results computed') + + # Grab the collection from geopandas geo_interface + result = gdf.__geo_interface__['features'][0] + + except RuntimeError as err: + LOGGER.error(err) + raise ProviderQueryError(err) + except ProviderConnectionError as err: + LOGGER.error(err) + raise ProviderConnectionError(err) + except ProviderItemNotFoundError as err: + LOGGER.error(err) + raise ProviderItemNotFoundError(err) + except Exception as err: + LOGGER.error(err) + raise ProviderGenericError(err) + + return result + + def __repr__(self): + return f' {self.data}' + + def _response_feature_collection(self, filter, offset, limit, + columns=None): + """ + Assembles output from query as + GeoJSON FeatureCollection structure. + + :returns: GeoJSON FeatureCollection + """ + + LOGGER.debug(f'offset:{offset}, limit:{limit}') + + try: + batches, scanner = self._read_parquet( + filter=filter, columns=columns, return_scanner=True + ) + + # Discard batches until offset is reached + counted = 0 + for batch in batches: + if counted + batch.num_rows > offset: + # Slice current batch to start from the requested row + batch = batch.slice(offset=offset - counted) + # Build a new generator yielding the current batch + # and all following ones + + batches = chain([batch], batches) + break + else: + counted += batch.num_rows + + # batches is a generator, it will now be either fully spent + # or set to the new generator starting from offset + + # Get the next `limit+1` rows + # The extra row is used to check if a "next" link is needed + # (when numberMatched > offset + limit) + batches_list = [] + read = 0 + + for batch in batches: + read += batch.num_rows + if read > limit: + batches_list.append(batch.slice(0, limit + 1)) + break + else: + batches_list.append(batch) + + # Passing schema from scanner in case no rows are returned + table = pyarrow.Table.from_batches( + batches_list, schema=scanner.projected_schema + ) + + rp = table.to_pandas() + + number_matched = offset + len(rp) + + # Remove the extra row + if len(rp) > limit: + rp = rp.iloc[:-1] + + if 'geometry' not in rp.columns: + # We need a null geometry column to create a GeoDataFrame + rp['geometry'] = None + geom = gpd.GeoSeries.from_wkb(rp['geometry']) + else: + geom = gpd.GeoSeries.from_wkb(rp['geometry'], crs=self.crs) + + gdf = gpd.GeoDataFrame(rp, geometry=geom) + LOGGER.debug('results computed') + result = gdf.__geo_interface__ + + # Add numberMatched to generate "next" link + result['numberMatched'] = number_matched + + return result + + except RuntimeError as error: + LOGGER.error(error) + raise error + + def _response_feature_hits(self, filter): + """ + Assembles GeoJSON hits from row count + + :returns: GeoJSON FeaturesCollection + """ + + try: + scanner = pyarrow.dataset.Scanner.from_dataset(self.ds, + filter=filter) + return { + 'type': 'FeatureCollection', + 'numberMatched': scanner.count_rows(), + 'features': [], + } + except Exception as error: + LOGGER.error(error) + raise error diff --git a/requirements-provider.txt b/requirements-provider.txt index b42210f..3f5bdb7 100644 --- a/requirements-provider.txt +++ b/requirements-provider.txt @@ -6,11 +6,13 @@ elasticsearch-dsl fiona GDAL<=3.8.4 geoalchemy2 +geopandas netCDF4 numpy oracledb pandas psycopg2 +pyarrow pygeofilter[backend-sqlalchemy] pygeoif pygeometa diff --git a/tests/data/random.parquet b/tests/data/random.parquet new file mode 100644 index 0000000..f8168f4 Binary files /dev/null and b/tests/data/random.parquet differ diff --git a/tests/data/random_nogeom.parquet b/tests/data/random_nogeom.parquet new file mode 100644 index 0000000..e29695d Binary files /dev/null and b/tests/data/random_nogeom.parquet differ diff --git a/tests/test_parquet_provider.py b/tests/test_parquet_provider.py new file mode 100644 index 0000000..eaa28a8 --- /dev/null +++ b/tests/test_parquet_provider.py @@ -0,0 +1,211 @@ +# ================================================================= +# +# Authors: Leo Ghignone +# +# Copyright (c) 2024 Leo Ghignone +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import pytest + +from pygeoapi.provider.base import ProviderItemNotFoundError +from pygeoapi.provider.parquet import ParquetProvider + +from .util import get_test_file_path + +path = get_test_file_path( + 'data/random.parquet') + +path_nogeom = get_test_file_path( + 'data/random_nogeom.parquet') + + +@pytest.fixture() +def config_parquet(): + return { + 'name': 'Parquet', + 'type': 'feature', + 'data': { + 'source_type': 'Parquet', + 'source': path, + }, + 'id_field': 'id', + 'time_field': 'time', + 'x_field': 'lon', + 'y_field': 'lat', + } + + +@pytest.fixture() +def config_parquet_nogeom_notime(): + return { + 'name': 'ParquetNoGeomNoTime', + 'type': 'feature', + 'data': { + 'source_type': 'Parquet', + 'source': path_nogeom, + }, + 'id_field': 'id' + } + + +def test_get_fields(config_parquet): + """Testing field types""" + + p = ParquetProvider(config_parquet) + results = p.get_fields() + assert results['lat']['type'] == 'number' + assert results['lon']['format'] == 'double' + assert results['time']['format'] == 'date-time' + + +def test_get(config_parquet): + """Testing query for a specific object""" + + p = ParquetProvider(config_parquet) + result = p.get('42') + assert result['id'] == '42' + assert result['properties']['lon'] == 4.947447 + + +def test_get_not_existing_feature_raise_exception( + config_parquet +): + """Testing query for a not existing object""" + p = ParquetProvider(config_parquet) + with pytest.raises(ProviderItemNotFoundError): + p.get(-1) + + +def test_query_hits(config_parquet): + """Testing query on entire collection for hits""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query(resulttype='hits') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 0 + hits = feature_collection.get('numberMatched') + assert hits is not None + assert hits == 100 + + +def test_query_bbox_hits(config_parquet): + """Testing query for a valid JSON object with geometry""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query( + bbox=[100, -50, 150, 0], + resulttype='hits') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 0 + hits = feature_collection.get('numberMatched') + assert hits is not None + assert hits == 6 + + +def test_query_with_limit(config_parquet): + """Testing query for a valid JSON object with geometry""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query(limit=2, resulttype='results') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 2 + hits = feature_collection.get('numberMatched') + assert hits > 2 + feature = features[0] + properties = feature.get('properties') + assert properties is not None + geometry = feature.get('geometry') + assert geometry is not None + + +def test_query_with_offset(config_parquet): + """Testing query for a valid JSON object with geometry""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query(offset=20, limit=10, resulttype='results') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 10 + hits = feature_collection.get('numberMatched') + assert hits > 30 + feature = features[0] + properties = feature.get('properties') + assert properties is not None + assert feature['id'] == '21' + assert properties['lat'] == 66.264988 + geometry = feature.get('geometry') + assert geometry is not None + + +def test_query_with_property(config_parquet): + """Testing query for a valid JSON object with property filter""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query( + resulttype='results', + properties=[('lon', -12.855022)]) + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 1 + for feature in features: + assert feature['properties']['lon'] == -12.855022 + + +def test_query_with_skip_geometry(config_parquet): + """Testing query for a valid JSON object with property filter""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query(skip_geometry=True) + for feature in feature_collection['features']: + assert feature.get('geometry') is None + + +def test_query_with_datetime(config_parquet): + """Testing query for a valid JSON object with time""" + + p = ParquetProvider(config_parquet) + feature_collection = p.query( + datetime_='2022-05-01T00:00:00Z/2022-05-31T23:59:59Z') + assert feature_collection.get('type') == 'FeatureCollection' + features = feature_collection.get('features') + assert len(features) == 7 + for feature in feature_collection['features']: + time = feature['properties'][config_parquet['time_field']] + assert time.year == 2022 + assert time.month == 5 + + +def test_query_nogeom(config_parquet_nogeom_notime): + """Testing query for a valid JSON object without geometry""" + + p = ParquetProvider(config_parquet_nogeom_notime) + feature_collection = p.query(resulttype='results') + assert feature_collection.get('type') == 'FeatureCollection' + assert len(feature_collection.get('features')) > 0 + for feature in feature_collection['features']: + assert feature.get('geometry') is None