add processing framework (#84)

This commit is contained in:
Tom Kralidis
2019-03-05 06:36:43 -05:00
committed by GitHub
parent 949b7277a3
commit 2518d4b0a8
8 changed files with 404 additions and 0 deletions
+5
View File
@@ -225,3 +225,8 @@ datasets:
schema: public
id_field: osm_id
table: hotosm_bdi_waterways
processes:
hello-world:
processor:
name: HelloWorld
+87
View File
@@ -539,6 +539,93 @@ class API(object):
return headers_, 200, json.dumps(content)
def describe_processes(self, headers, args, process=None):
"""
Provide processes metadata
:param headers: dict of HTTP headers
:param args: dict of HTTP request parameters
:param process: name of process
:returns: tuple of headers, status code, content
"""
headers_ = HEADERS.copy()
processes_config = self.config['processes']
if process is not None:
if process not in processes_config.keys():
exception = {
'code': 'NotFound',
'description': 'identifier not found'
}
LOGGER.error(exception)
return headers_, 404, json.dumps(exception)
p = load_plugin('process', processes_config[process]['processor'])
p.metadata['jobControlOptions'] = ['sync-execute']
p.metadata['outputTransmission'] = ['value']
response = p.metadata
else:
processes = []
for k, v in processes_config.items():
p = load_plugin('process', processes_config[k]['processor'])
p.metadata['jobControlOptions'] = ['sync-execute']
p.metadata['outputTransmission'] = ['value']
processes.append(p.metadata)
response = {
'processes': processes
}
return headers_, 200, json.dumps(response)
def execute_process(self, headers, args, data, process):
"""
Execute process
:param headers: dict of HTTP headers
:param args: dict of HTTP request parameters
:param data: process data
:param process: name of process
:returns: tuple of headers, status code, content
"""
headers_ = HEADERS.copy()
data_dict = {}
response = {}
if not data:
exception = {
'code': 'MissingParameterValue',
'description': 'missing request data'
}
LOGGER.error(exception)
return headers_, 400, json.dumps(exception)
if process not in self.config['processes'].keys():
exception = {
'code': 'NotFound',
'description': 'identifier not found'
}
LOGGER.error(exception)
return headers_, 404, json.dumps(exception)
p = load_plugin('process',
self.config['processes'][process]['processor'])
data_ = json.loads(data)
for input_ in data_['inputs']:
data_dict[input_['id']] = input_['value']
outputs = p.execute(data_dict)
response['outputs'] = outputs
return headers_, 201, json.dumps(response)
def to_json(dict_):
"""
+30
View File
@@ -120,6 +120,36 @@ def dataset(feature_collection, feature=None):
return response
@APP.route('/processes')
@APP.route('/processes/<name>')
def describe_processes(name=None):
headers, status_code, content = api_.describe_processes(
request.headers, request.args, name)
response = make_response(content, status_code)
if headers:
response.headers = headers
return response
@APP.route('/processes/<name>/jobs', methods=['GET', 'POST'])
def execute_process(name=None):
if request.method == 'GET':
headers, status_code, content = ({}, 200, "[]")
elif request.method == 'POST':
headers, status_code, content = api_.execute_process(
request.headers, request.args, request.data, name)
response = make_response(content, status_code)
if headers:
response.headers = headers
return response
@click.command()
@click.pass_context
@click.option('--debug', '-d', default=False, is_flag=True, help='debug')
+93
View File
@@ -249,6 +249,99 @@ def get_oas_30(cfg):
}
}
}
paths['/processes'] = {
'get': {
'summary': 'Processes',
'description': 'Processes',
'tags': ['server'],
'responses': {
200: {
'description': 'successful operation'
}
}
}
}
LOGGER.debug('setting up processes')
for k, v in cfg['processes'].items():
p = load_plugin('process', v['processor'])
process_name_path = '/processes/{}'.format(k)
tag = {
'name': k,
'description': p.metadata['description'],
'externalDocs': {}
}
for link in p.metadata['links']:
if link['type'] == 'information':
tag['externalDocs']['description'] = link['type']
tag['externalDocs']['url'] = link['url']
break
if len(tag['externalDocs']) == 0:
del tag['externalDocs']
oas['tags'].append(tag)
paths[process_name_path] = {
'get': {
'summary': 'Get process metadata'.format(p.metadata['title']),
'description': p.metadata['description'],
'tags': [k],
'responses': {
200: {
'description': 'successful operation'
},
400: {
'description': 'Invalid ID supplied'
},
404: {
'description': 'not found'
}
}
}
}
paths['{}/jobs'.format(process_name_path)] = {
'get': {
'summary': 'Retrieve job list for process',
'description': p.metadata['description'],
'tags': [k],
'responses': {
200: {
'description': 'successful operation'
}
}
},
'post': {
'summary': 'Process {} execution'.format(p.metadata['title']),
'description': p.metadata['description'],
'tags': [k],
'parameters': [],
'responses': {
200: {
'description': 'successful operation'
},
400: {
'description': 'Invalid ID supplied'
},
404: {
'description': 'not found'
},
},
'requestBody': {
'description': 'Mandatory execute request JSON',
'required': True,
'content': {
'application/json': {
'schema': {
'$ref': 'execute.yaml'
}
}
}
}
}
}
oas['paths'] = paths
oas['components'] = {
+3
View File
@@ -43,6 +43,9 @@ PLUGINS = {
},
'formatter': {
'CSV': 'pygeoapi.formatter.csv_.CSVFormatter'
},
'process': {
'HelloWorld': 'pygeoapi.process.hello_world.HelloWorldProcessor'
}
}
+28
View File
@@ -0,0 +1,28 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2019 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
+62
View File
@@ -0,0 +1,62 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2019 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import logging
LOGGER = logging.getLogger(__name__)
class BaseProcessor(object):
"""generic Processor ABC"""
def __init__(self, processor_def, process_metadata):
"""
Initialize object
:param processor_def: processor definition
:returns: pygeoapi.processors.base.BaseProvider
"""
self.name = processor_def['name']
self.metadata = process_metadata
def execute(self):
"""
execute the process
:returns: dict of process response
"""
raise NotImplementedError()
def __repr__(self):
return '<BaseProcessor> {}'.format(self.name)
class ProcessorExecuteError(Exception):
"""query / backend error"""
pass
+96
View File
@@ -0,0 +1,96 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2019 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import logging
from pygeoapi.process.base import BaseProcessor
LOGGER = logging.getLogger(__name__)
PROCESS_METADATA = {
'version': '0.1.0',
'id': 'hello-world',
'title': 'Hello World process',
'description': 'Hello World process',
'keywords': ['hello world'],
'links': [{
'type': 'text/html',
'rel': 'canonical',
'title': 'information',
'href': 'https://example.org/process',
'hreflang': 'en-US'
}],
'inputs': [{
'id': 'name',
'title': 'name',
'input': {
'literalDataDomain': {
'dataType': 'string',
'valueDefinition': {
'anyValue': True
}
}
},
'minOccurs': 1,
'maxOccurs': 1
}],
'outputs': [{
'id': 'hello-world-response',
'title': 'output hello world',
'input': {
'formats': [{
'mimeType': 'application/json'
}]
}
}]
}
class HelloWorldProcessor(BaseProcessor):
"""Hello World Processor"""
def __init__(self, provider_def):
"""
Initialize object
:param provider_def: provider definition
:returns: pygeoapi.process.hello_world.HelloWorldProcessor
"""
BaseProcessor.__init__(self, provider_def, PROCESS_METADATA)
def execute(self, data):
outputs = [{
'id': 'name',
'value': data['name']
}]
return outputs
def __repr__(self):
return '<HelloWorldProcessor> {}'.format(self.name)