From 0ae95cb5547cffa562b72d0c866432fdb7a26da0 Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Mon, 22 Nov 2021 07:04:58 -0500 Subject: [PATCH] fix OACov subset parsing (#818) --- pygeoapi/api.py | 78 ++++++++++++++++++++++++++++++++++------------- tests/test_api.py | 19 +++++++++++- 2 files changed, 75 insertions(+), 22 deletions(-) diff --git a/pygeoapi/api.py b/pygeoapi/api.py index 60ffe52..25eb2a3 100644 --- a/pygeoapi/api.py +++ b/pygeoapi/api.py @@ -2020,30 +2020,21 @@ class API: 400, headers, format_, 'InvalidParameterValue', msg) if 'subset' in request.params: - subsets = {} LOGGER.debug('Processing subset parameter') - for s in (request.params['subset'] or '').split(','): - try: - if '"' not in s: - m = re.search(r'(.*)\((.*):(.*)\)', s) - else: - m = re.search(r'(.*)\(\"(\S+)\":\"(\S+.*)\"\)', s) - - subset_name = m.group(1) - - if subset_name not in p.axes: - msg = 'Invalid axis name' - return self.get_exception( - 400, headers, format_, - 'InvalidParameterValue', msg) - - subsets[subset_name] = list(map( - get_typed_value, m.group(2, 3))) - except AttributeError: - msg = 'subset should be like "axis(min:max)"' - return self.get_exception( + try: + subsets = validate_subset(request.params['subset'] or '') + except (AttributeError, ValueError) as err: + msg = 'Invalid subset: {}'.format(err) + LOGGER.error(msg) + return self.get_exception( 400, headers, format_, 'InvalidParameterValue', msg) + if not set(subsets.keys()).issubset(p.axes): + msg = 'Invalid axis name' + LOGGER.error(msg) + return self.get_exception( + 400, headers, format_, 'InvalidParameterValue', msg) + query_args['subsets'] = subsets LOGGER.debug('Subsets: {}'.format(query_args['subsets'])) @@ -3331,3 +3322,48 @@ def validate_datetime(resource_def, datetime_=None) -> str: raise ValueError(msg) return datetime_ + + +def validate_subset(value: str) -> dict: + """ + Helper function to validate subset parameter + + :param value: `subset` parameter + + :returns: dict of axis/values + """ + + subsets = {} + + for s in value.split(','): + LOGGER.debug('Processing subset {}'.format(s)) + m = re.search(r'(.*)\((.*)\)', s) + subset_name, values = m.group(1, 2) + + if '"' in values: + LOGGER.debug('Values are strings') + if values.count('"') % 2 != 0: + msg = 'Invalid format: subset should be like axis("min"[:"max"])' # noqa + LOGGER.error(msg) + raise ValueError(msg) + try: + LOGGER.debug('Value is an interval') + m = re.search(r'"(\S+)":"(\S+)"', values) + values = list(m.group(1, 2)) + except AttributeError: + LOGGER.debug('Value is point') + m = re.search(r'"(.*)"', values) + values = [m.group(1)] + else: + LOGGER.debug('Values are numbers') + try: + LOGGER.debug('Value is an interval') + m = re.search(r'(\S+):(\S+)', values) + values = list(m.group(1, 2)) + except AttributeError: + LOGGER.debug('Value is point') + values = [values] + + subsets[subset_name] = list(map(get_typed_value, values)) + + return subsets diff --git a/tests/test_api.py b/tests/test_api.py index 18e9267..692aeec 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -36,7 +36,7 @@ from pyld import jsonld import pytest from pygeoapi.api import ( API, APIRequest, FORMAT_TYPES, validate_bbox, validate_datetime, - F_HTML, F_JSON, F_JSONLD, F_GZIP + validate_subset, F_HTML, F_JSON, F_JSONLD, F_GZIP ) from pygeoapi.util import yaml_load @@ -1555,6 +1555,23 @@ def test_validate_datetime(): _ = validate_datetime(config, '../1999') +@pytest.mark.parametrize("value, expected", [ + ('time(2000-11-11)', {'time': ['2000-11-11']}), + ('time("2000-11-11")', {'time': ['2000-11-11']}), + ('time("2000-11-11T00:11:11")', {'time': ['2000-11-11T00:11:11']}), + ('time("2000-11-11T11:12:13":"2021-12-22T:13:33:33")', {'time': ['2000-11-11T11:12:13', '2021-12-22T:13:33:33']}), # noqa + ('lat(40)', {'lat': [40]}), + ('lat(0:40)', {'lat': [0, 40]}), + ('foo("bar")', {'foo': ['bar']}), + ('foo("bar":"baz")', {'foo': ['bar', 'baz']}) +]) +def test_validate_subset(value, expected): + assert validate_subset(value) == expected + + with pytest.raises(ValueError): + validate_subset('foo("bar)') + + def test_get_exception(config, api_): d = api_.get_exception(500, {}, 'json', 'NoApplicableCode', 'oops') assert d[0] == {}