OGC API - Processes CITE compliance (#1311)

* update OGC API - Processes CITE compliance
Tom Kralidis
2024-03-06 10:37:12 -05:00
committed by GitHub
parent 8d377072b9
commit 5accd7f190
16 changed files with 177 additions and 93 deletions
+1 -1
@@ -121,7 +121,7 @@ jobs:
- name: setup test data ⚙️
run: |
python3 tests/load_es_data.py tests/data/ne_110m_populated_places_simple.geojson geonameid
python3 tests/load_es_data.py tests/cite/ogcapi-features/canada-hydat-daily-mean-02HC003.geojson IDENTIFIER
python3 tests/load_es_data.py tests/cite/canada-hydat-daily-mean-02HC003.geojson IDENTIFIER
python3 tests/load_mongo_data.py tests/data/ne_110m_populated_places_simple.geojson
gunzip < tests/data/hotosm_bdi_waterways.sql.gz | psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test
psql postgresql://postgres:${{ secrets.DatabasePassword || 'postgres' }}@localhost:5432/test -f tests/data/dummy_data.sql
-1
@@ -24,4 +24,3 @@ Top level code documentation. Follow the links in each section for module/class
:members:
:private-members:
:special-members:
@@ -108,7 +108,6 @@ Processing examples
-H "Prefer: respond-async"
-d "{\"inputs\":{\"name\": \"hi there2\"}}"
.. todo:: add more examples once OAProc implementation is complete
.. _`OGC API - Processes`: https://ogcapi.ogc.org/processes
.. _`sample`: https://github.com/geopython/pygeoapi/blob/master/pygeoapi/process/hello_world.py
+73 -16
@@ -83,9 +83,9 @@ the following contents:
# module: myplugin.cli
import click
@click.command(name="super-command")
@click.command(name='super-command')
def my_cli_command():
print("Hello, this is my custom pygeoapi CLI command!")
print('Hello, this is my custom pygeoapi CLI command!')
Then, in your plugin's ``setup.py`` file, specify the entrypoints section:
@@ -104,7 +104,7 @@ Alternatively, if using a ``pyproject.toml`` file instead:
# file: pyproject.toml
# Note that this example uses poetry, other Python projects may differ in
# how they expect entry_points to be specified
[tool.poetry.plugins."pygeoapi"]
[tool.poetry.plugins.'pygeoapi']
my-plugin = 'myplugin.cli:my_cli_command'
@@ -180,7 +180,7 @@ The below template provides a minimal example (let's call the file ``mycoolvecto
# optionally specify the output filename pygeoapi can use as part
# of the response (HTTP Content-Disposition header)
self.filename = "my-cool-filename.dat"
self.filename = 'my-cool-filename.dat'
# open data file (self.data) and process, return
return {
@@ -256,7 +256,7 @@ The below template provides a minimal example (let's call the file ``mycoolraste
# optionally specify the output filename pygeoapi can use as part
# of the response (HTTP Content-Disposition header)
self.filename = "my-cool-filename.dat"
self.filename = 'my-cool-filename.dat'
if format_ == 'json':
# return a CoverageJSON representation
@@ -265,13 +265,78 @@ The below template provides a minimal example (let's call the file ``mycoolraste
# return default (likely binary) representation
return bytes(112)
For brevity, the above code will always JSON for metadata and binary or CoverageJSON for the data. In reality, the plugin
For brevity, the above code will always return JSON for metadata and binary or CoverageJSON for the data. In reality, the plugin
developer would connect to a data source with capabilities to run queries and return a relevant result set.
As long as the plugin implements the API contract of its base provider, all other functionality is left to the provider
implementation.
Each base class documents the functions, arguments and return types required for implementation.
Example: custom pygeoapi processing plugin
------------------------------------------
Let's consider a simple process plugin to calculate a square root from a number:
Python code
^^^^^^^^^^^
The below template provides a minimal example (let's call the file ``mycoolsqrtprocess.py``):
.. code-block:: python
import math
from pygeoapi.process.base import BaseProcessor, ProcessorExecuteError
PROCESS_METADATA = {
# reduced for brevity (see examples of PROCESS_METADATA in pygeoapi/process/hello_world.py)
}
class MyCoolSqrtProcessor(BaseProcessor):
"""My cool sqrt process plugin"""
def __init__(self, processor_def):
"""
Initialize object
:param processor_def: provider definition
:returns: pygeoapi.process.mycoolsqrtprocess.MyCoolSqrtProcessor
"""
super().__init__(processor_def, PROCESS_METADATA)
def execute(self, data):
mimetype = 'application/json'
number = data.get('number')
if number is None:
raise ProcessorExecuteError('Cannot process without a number')
try:
number = float(number)
# float() raises ValueError for non-numeric strings such as 'abc'
except (TypeError, ValueError):
raise ProcessorExecuteError('Number required')
value = math.sqrt(number)
outputs = {
'id': 'sqrt',
'value': value
}
return mimetype, outputs
def __repr__(self):
return f'<MyCoolSqrtProcessor> {self.name}'
The example above receives the client's JSON payload as a dictionary, calculates the square root of a float or integer, and returns the result in an output JSON payload. The plugin is responsible for defining the expected inputs and outputs in ``PROCESS_METADATA`` and for returning the output, in any format, along with the corresponding media type.
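As a rough sketch of what the elided ``PROCESS_METADATA`` could contain for this process, the dictionary below reuses the field names that appear in the echo process metadata elsewhere in this commit. The identifiers, titles, version and schema values are assumptions chosen for illustration, not taken from pygeoapi.

```python
# Hypothetical metadata for the square root example. The keys mirror the
# echo process metadata in this commit; all values are illustrative.
PROCESS_METADATA = {
    'id': 'sqrt',
    'title': 'Square root',
    'description': 'Calculates the square root of a number.',
    'version': '0.1.0',
    'jobControlOptions': ['sync-execute', 'async-execute'],
    'inputs': {
        'number': {
            'title': 'Number',
            'description': 'Non-negative number to take the root of.',
            'minOccurs': 1,
            'maxOccurs': 1,
            # JSON Schema type names are used here ('number', not 'float')
            'schema': {'type': 'number', 'minimum': 0}
        }
    },
    'outputs': {
        'sqrt': {
            'title': 'Square root',
            'schema': {'type': 'number'}
        }
    }
}
```

The input key ``number`` and the output id ``sqrt`` match the names that ``execute`` reads and returns in the example above, which keeps the advertised description and the actual behaviour in step.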
.. note::
Additional processing plugins can also be found in ``pygeoapi/process``.
Example: custom pygeoapi formatter
----------------------------------
@@ -298,22 +363,14 @@ The below template provides a minimal example (let's call the file ``mycooljsonf
def write(self, options={}, data=None):
"""custom writer"""
out_data {'rows': []}
out_data = {'rows': []}
for feature in data['features']:
out_data.append(feature['properties'])
out_data['rows'].append(feature['properties'])
return out_data
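To make the corrected loop easier to check in isolation, here is a minimal standalone sketch of the same logic. The function name ``write_rows`` and the sample feature collection are hypothetical; only the body mirrors the formatter's ``write`` method.

```python
def write_rows(data):
    """Collect the properties of each GeoJSON feature into a list of rows."""
    out_data = {'rows': []}
    for feature in data['features']:
        # append to the 'rows' list, not to the dict itself
        out_data['rows'].append(feature['properties'])
    return out_data


# Illustrative input: a tiny GeoJSON FeatureCollection
feature_collection = {
    'type': 'FeatureCollection',
    'features': [
        {'type': 'Feature', 'geometry': None, 'properties': {'name': 'a'}},
        {'type': 'Feature', 'geometry': None, 'properties': {'name': 'b'}}
    ]
}

print(write_rows(feature_collection))
```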
Processing plugins
------------------
Processing plugins are following the OGC API - Processes development. Given that the specification is
under development, the implementation in ``pygeoapi/process/hello_world.py`` provides a suitable example
for the time being.
Featured plugins
----------------
+2 -3
@@ -172,7 +172,7 @@ By hot-reloading we mean to be able to directly see changes reflected in the app
This is useful for development, as changes made by developers are reflected quickly and they can take advantage
of the hot-reloading capabilities offered by each of the available web servers.
For enabling hot-reloading, install the pygeoapi package using pip (instead of the setup.py script) with the following command:
To enable hot-reloading, install the pygeoapi package using pip (instead of ``setup.py``) with the following command:
.. code-block:: bash
@@ -181,8 +181,7 @@ For enabling hot-reloading, install the pygeoapi package using pip (instead of t
.. note::
This command must be run from the root directory of pygeoapi.
After the local package is built, you can use the ``pygeoapi serve``
again and the changes on the codebase will be directly reflected on the running instance.
After the local package is built, run ``pygeoapi serve`` again and the changes to the codebase will be reflected live on the running instance.
Running in production
+8 -3
@@ -3377,6 +3377,7 @@ class API:
}
for job_ in jobs:
job2 = {
'type': 'process',
'processID': job_['process_id'],
'jobID': job_['identifier'],
'status': job_['status'],
@@ -3395,21 +3396,22 @@ class API:
job2['links'] = [{
'href': f'{job_result_url}?f={F_HTML}',
'rel': 'about',
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/results',
'type': FORMAT_TYPES[F_HTML],
'title': f'results of job {job_id} as HTML'
}, {
'href': f'{job_result_url}?f={F_JSON}',
'rel': 'about',
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/results',
'type': FORMAT_TYPES[F_JSON],
'title': f'results of job {job_id} as JSON'
}]
if job_['mimetype'] not in (FORMAT_TYPES[F_JSON],
FORMAT_TYPES[F_HTML]):
job2['links'].append({
'href': job_result_url,
'rel': 'about',
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/results', # noqa
'type': job_['mimetype'],
'title': f"results of job {job_id} as {job_['mimetype']}" # noqa
})
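The link-building pattern in this hunk can be sketched as a small standalone helper. The constants `F_JSON`, `F_HTML` and `FORMAT_TYPES` below are local stand-ins with assumed values, and `build_result_links` and the example URL are hypothetical names used only for illustration.

```python
# Local stand-ins for pygeoapi constants; the values are assumptions.
F_JSON = 'json'
F_HTML = 'html'
FORMAT_TYPES = {F_HTML: 'text/html', F_JSON: 'application/json'}

# Link relation used for job results after this change
RESULTS_REL = 'http://www.opengis.net/def/rel/ogc/1.0/results'


def build_result_links(job_result_url, job_id, mimetype):
    """Build the result links for one job, as in the hunk above."""
    links = [{
        'href': f'{job_result_url}?f={fmt}',
        'rel': RESULTS_REL,
        'type': FORMAT_TYPES[fmt],
        'title': f'results of job {job_id} as {fmt.upper()}'
    } for fmt in (F_HTML, F_JSON)]

    # Add a third link only when the job output is neither JSON nor HTML
    if mimetype not in (FORMAT_TYPES[F_JSON], FORMAT_TYPES[F_HTML]):
        links.append({
            'href': job_result_url,
            'rel': RESULTS_REL,
            'type': mimetype,
            'title': f'results of job {job_id} as {mimetype}'
        })
    return links


links = build_result_links(
    'http://localhost:5000/jobs/abc/results', 'abc', 'image/png')
```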
@@ -3519,6 +3521,8 @@ class API:
if status == JobStatus.accepted:
http_status = HTTPStatus.CREATED
elif status == JobStatus.failed:
http_status = HTTPStatus.BAD_REQUEST
else:
http_status = HTTPStatus.OK
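The status mapping introduced here can be read as a small function. In this sketch `JobStatus` is a local stand-in enum whose members are assumed, and `http_status_for` is a hypothetical name; only the three branches come from the hunk above.

```python
from enum import Enum
from http import HTTPStatus


class JobStatus(Enum):
    """Local stand-in for pygeoapi's job status enum (members assumed)."""
    accepted = 'accepted'
    running = 'running'
    successful = 'successful'
    failed = 'failed'
    dismissed = 'dismissed'


def http_status_for(status):
    """Map a job status to the HTTP status returned by process execution."""
    if status == JobStatus.accepted:
        return HTTPStatus.CREATED
    elif status == JobStatus.failed:
        # new in this commit: failed jobs no longer report 200 OK
        return HTTPStatus.BAD_REQUEST
    else:
        return HTTPStatus.OK
```

This is also why the `test_execute_process` assertions for failing requests change from `HTTPStatus.OK` to `HTTPStatus.BAD_REQUEST` in the same commit.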
@@ -3984,6 +3988,7 @@ class API:
LOGGER.error(description)
exception = {
'code': code,
'type': code,
'description': description
}
+2 -1
@@ -70,7 +70,8 @@ PLUGINS = {
'CSV': 'pygeoapi.formatter.csv_.CSVFormatter'
},
'process': {
'HelloWorld': 'pygeoapi.process.hello_world.HelloWorldProcessor'
'HelloWorld': 'pygeoapi.process.hello_world.HelloWorldProcessor',
'Echo': 'pygeoapi.process.echo.EchoProcessor'
},
'process_manager': {
'Dummy': 'pygeoapi.process.manager.dummy.DummyManager',
+46 -41
@@ -1,8 +1,10 @@
# =================================================================
#
# Authors: Alexander Pilz <a.pilz@52north.org>
# Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2023 Alexander Pilz
# Copyright (c) 2023 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -26,6 +28,7 @@
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import logging
import time
@@ -35,40 +38,40 @@ LOGGER = logging.getLogger(__name__)
#: Process metadata and description
PROCESS_METADATA = {
"id": "echo",
"title": "Echo Process",
"description": "Testable Echo process.",
"version": "1.0.0",
"jobControlOptions": [
"async-execute",
"sync-execute"
'id': 'echo',
'title': 'Echo Process',
'description': 'Testable Echo process.',
'version': '1.0.0',
'jobControlOptions': [
'async-execute',
'sync-execute'
],
"outputTransmission": [
"value",
"reference"
'outputTransmission': [
'value',
'reference'
],
"inputs": {
"echoInput": {
"title": "Echo value",
"description": "Value to be echoed back.",
"minOccurs": 1,
"maxOccurs": 1,
"schema": {
"type": "string",
"enum": [
"Echo",
"Test",
"42"
'inputs': {
'echoInput': {
'title': 'Echo value',
'description': 'Value to be echoed back.',
'minOccurs': 1,
'maxOccurs': 1,
'schema': {
'type': 'string',
'enum': [
'Echo',
'Test',
'42'
]
}},
"pause": {
"title": "Pause value",
"description": "Value to control the processing time.",
"minOccurs": 1,
"maxOccurs": 1,
"schema": {
"type": "float",
"enum": [
'pause': {
'title': 'Pause value',
'description': 'Value to control the processing time.',
'minOccurs': 1,
'maxOccurs': 1,
'schema': {
'type': 'number',
'enum': [
5.5,
10.25,
42.0
@@ -76,14 +79,14 @@ PROCESS_METADATA = {
}
}
},
"outputs": {
"echoOutput": {
"schema": {
"type": "string"
'outputs': {
'echoOutput': {
'schema': {
'type': 'string'
}
}
},
"links": [{
'links': [{
'type': 'text/html',
'rel': 'about',
'title': 'information',
@@ -99,7 +102,7 @@ PROCESS_METADATA = {
}
class echoProcessor(BaseProcessor):
class EchoProcessor(BaseProcessor):
"""Echo Processor example"""
def __init__(self, processor_def):
"""
@@ -107,7 +110,7 @@ class echoProcessor(BaseProcessor):
:param processor_def: provider definition
:returns: pygeoapi.process.echo.echoProcessor
:returns: pygeoapi.process.echo.EchoProcessor
"""
super().__init__(processor_def, PROCESS_METADATA)
@@ -116,24 +119,26 @@ class echoProcessor(BaseProcessor):
mimetype = 'application/json'
echo = data.get('echoInput', None)
pause = data.get('pause', None)
echo = data.get('echoInput')
pause = data.get('pause')
if echo is None:
raise ProcessorExecuteError(
'Cannot run process without echo value')
if not isinstance(echo, str):
raise ProcessorExecuteError(
'Cannot run process with echo not of type String')
'Cannot run process with echo not of type string')
outputs = {
'id': 'echoOutput',
'value': echo
}
if pause is not None and isinstance(pause, float):
time.sleep(pause)
return mimetype, outputs
def __repr__(self):
return '<echoProcessor> {}'.format(self.name)
return f'<EchoProcessor> {self.name}'
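For reference, the post-change `execute` logic can be exercised without pygeoapi using the sketch below. `ProcessorExecuteError` is redefined locally as a stand-in and `execute_echo` is a hypothetical free function; the validation and output mirror the hunk above.

```python
import time


class ProcessorExecuteError(Exception):
    """Local stand-in for pygeoapi.process.base.ProcessorExecuteError."""


def execute_echo(data):
    """Validate the inputs and echo the value back, as in EchoProcessor."""
    mimetype = 'application/json'
    echo = data.get('echoInput')
    pause = data.get('pause')

    if echo is None:
        raise ProcessorExecuteError('Cannot run process without echo value')
    if not isinstance(echo, str):
        raise ProcessorExecuteError(
            'Cannot run process with echo not of type string')

    outputs = {'id': 'echoOutput', 'value': echo}

    # Only float values trigger a pause; an int such as 5 is ignored
    if pause is not None and isinstance(pause, float):
        time.sleep(pause)

    return mimetype, outputs
```

One consequence of the `isinstance(pause, float)` check is that a JSON value written as `5` is parsed by Python's `json` module as an `int` and therefore does not pause, while `5.5` does.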
-2
@@ -69,7 +69,6 @@ PROCESS_METADATA = {
},
'minOccurs': 1,
'maxOccurs': 1,
'metadata': None, # TODO how to use?
'keywords': ['full name', 'personal']
},
'message': {
@@ -80,7 +79,6 @@ PROCESS_METADATA = {
},
'minOccurs': 0,
'maxOccurs': 1,
'metadata': None,
'keywords': ['message']
}
},
+4 -2
@@ -219,6 +219,7 @@ class BaseManager:
current_status = JobStatus.accepted
job_metadata = {
'type': 'process',
'identifier': job_id,
'process_id': process_id,
'job_start_datetime': datetime.utcnow().strftime(
@@ -226,7 +227,7 @@ class BaseManager:
'job_end_datetime': None,
'status': current_status.value,
'location': None,
'mimetype': None,
'mimetype': 'application/octet-stream',
'message': 'Job accepted and ready for execution',
'progress': 5
}
@@ -289,6 +290,7 @@ class BaseManager:
current_status = JobStatus.failed
code = 'InvalidParameterValue'
outputs = {
'type': code,
'code': code,
'description': 'Error updating job'
}
@@ -298,7 +300,7 @@ class BaseManager:
DATETIME_FORMAT),
'status': current_status.value,
'location': None,
'mimetype': None,
'mimetype': 'application/octet-stream',
'message': f'{code}: {outputs["description"]}'
}
+29
@@ -0,0 +1,29 @@
# CITE testing for OGC API capabilities
- OGC API - Features
- OGC API - Processes
## Test data
### OGC API - Features
Test data used is a subset of the [Canadian National Water Data Archive](https://www.canada.ca/en/environment-climate-change/services/water-overview/quantity/monitoring/survey/data-products-services/national-archive-hydat.html) as extracted from the [MSC GeoMet OGC API](https://eccc-msc.github.io/open-data/msc-geomet/web-services_en/#ogc-api-features) service.
### OGC API - Processes
The `hello-world` test process that is provided with pygeoapi by default is used.
Process job management is configured in `server.manager` in support of asynchronous testing.
## Running
```bash
# install pygeoapi as per https://pygeoapi.io/#install-in-5-minutes
# the service needs to run with HTTP 1.1 support, so let's install gunicorn
# remove job manager
rm -f /tmp/pygeoapi-process-manager.db*
pip3 install gunicorn
cd tests/cite
. cite.env
python3 ../load_es_data.py ./canada-hydat-daily-mean-02hc003.geojson IDENTIFIER
pygeoapi openapi generate $PYGEOAPI_CONFIG --output-file $PYGEOAPI_OPENAPI
gunicorn pygeoapi.flask_app:APP -b 0.0.0.0:5001 --access-logfile '-'
```
@@ -13,6 +13,10 @@ server:
map:
url: https://tile.openstreetmap.org/{z}/{x}/{y}.png
attribution: '&copy; <a href="https://openstreetmap.org/copyright">OpenStreetMap contributors</a>'
manager:
name: TinyDB
connection: /tmp/pygeoapi-process-manager.db
output_dir: /tmp/
logging:
level: ERROR
@@ -99,3 +103,8 @@ resources:
data: http://localhost:9200/canada-hydat-daily-mean-02hc003
id_field: IDENTIFIER
time_field: DATE
hello-world:
type: process
processor:
name: HelloWorld
-19
@@ -1,19 +0,0 @@
# CITE testing for OGC API - Features
## Test data
Test data used is a subset of the [Canadian National Water Data Archive](https://www.canada.ca/en/environment-climate-change/services/water-overview/quantity/monitoring/survey/data-products-services/national-archive-hydat.html)
as extracted from the [MSC GeoMet OGC API](https://eccc-msc.github.io/open-data/msc-geomet/web-services_en/#ogc-api-features) service.
## Running
```bash
# install pygeoapi as per https://pygeoapi.io/#install-in-5-minutes
# the service needs to run with HTTP 1.1 support, so let's install gunicorn
pip install gunicorn
cd tests/cite/ogcapi-features
. cite.env
python ../../load_es_data.py ./canada-hydat-daily-mean-02hc003.geojson IDENTIFIER
pygeoapi openapi generate $PYGEOAPI_CONFIG --output-file $PYGEOAPI_OPENAPI
gunicorn pygeoapi.flask_app:APP -b 0.0.0.0:5001 --access-logfile '-'
```
+3 -3
@@ -1812,7 +1812,7 @@ def test_execute_process(config, api_):
rsp_headers, code, response = api_.execute_process(req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' in rsp_headers
assert data['code'] == 'InvalidParameterValue'
cleanup_jobs.add(tuple(['hello-world',
@@ -1821,7 +1821,7 @@ def test_execute_process(config, api_):
req = mock_request(data=req_body_5)
rsp_headers, code, response = api_.execute_process(req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' in rsp_headers
assert data['code'] == 'InvalidParameterValue'
assert data['description'] == 'Error updating job'
@@ -1833,7 +1833,7 @@ def test_execute_process(config, api_):
rsp_headers, code, response = api_.execute_process(req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' in rsp_headers
assert data['code'] == 'InvalidParameterValue'
assert data['description'] == 'Error updating job'