Process manager instantiates processor (#1278)

* Process manager now knows about existing processes

* Move get_manager to base module

* manager is now responsible for instantiating a processor

* manager.execute_process now accepts process_id instead of processor instance

* Add license to test file

* fix email address typo

---------

Co-authored-by: Ricardo Garcia Silva <ricardo@kartoza.com>
This commit is contained in:
Ricardo Garcia Silva
2023-07-02 09:51:34 +01:00
committed by GitHub
parent 862e83da1f
commit 40e82424f3
5 changed files with 82 additions and 20 deletions
+2 -7
View File
@@ -3285,9 +3285,7 @@ class API:
'InvalidParameterValue', msg)
for key in relevant_processes:
p = load_plugin(
'process', self.manager.processes[key]['processor'])
p = self.manager.get_processor(key)
p2 = l10n.translate_struct(deepcopy(p.metadata),
request.locale)
@@ -3514,9 +3512,6 @@ class API:
HTTPStatus.NOT_FOUND, headers,
request.format, 'NoSuchProcess', msg)
process = load_plugin('process',
self.manager.processes[process_id]['processor'])
data = request.data
if not data:
# TODO not all processes require input, e.g. time-dependent or
@@ -3555,7 +3550,7 @@ class API:
try:
LOGGER.debug('Executing process')
result = self.manager.execute_process(
process, data_dict, execution_mode=execution_mode)
process_id, data_dict, execution_mode=execution_mode)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})
headers['Location'] = f'{self.base_url}/jobs/{job_id}'
+1 -2
View File
@@ -1132,8 +1132,7 @@ def get_oas_30(cfg):
LOGGER.debug(f'Skipping hidden layer: {k}')
continue
name = l10n.translate(k, locale_)
p = load_plugin('process', v['processor'])
p = process_manager.get_processor(k)
md_desc = l10n.translate(p.metadata['description'], locale_)
process_name_path = f'/processes/{name}'
tag = {
+30 -6
View File
@@ -80,6 +80,23 @@ class BaseManager:
for id_, process_conf in manager_def.get('processes', {}).items():
self.processes[id_] = dict(process_conf)
def get_processor(self, process_id: str) -> Optional[BaseProcessor]:
"""Instantiate a processor.
:param process_id: Identifier of the process
:raise UnknownProcessError: if the processor cannot be created
:returns: instance of the processor
"""
try:
process_conf = self.processes[process_id]
except KeyError as err:
raise UnknownProcessError(
'Invalid process identifier') from err
else:
return load_plugin('process', process_conf['processor'])
def get_jobs(self, status: JobStatus = None) -> list:
"""
Get process jobs, optionally filtered by status
@@ -281,31 +298,34 @@ class BaseManager:
def execute_process(
self,
p: BaseProcessor,
process_id: str,
data_dict: dict,
execution_mode: Optional[RequestedProcessExecutionMode] = None
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
:param p: `pygeoapi.process` object
:param process_id: process identifier
:param data_dict: `dict` of data parameters
:param execution_mode: `str` optionally specifying sync or async
processing.
:raises: UnknownProcessError if the input process_id does not
correspond to a known process
:returns: tuple of job_id, MIME type, response payload, status and
optionally additional HTTP headers to include in the final
response
"""
job_id = str(uuid.uuid1())
processor = self.get_processor(process_id)
if execution_mode == RequestedProcessExecutionMode.respond_async:
job_control_options = processor.metadata.get(
'jobControlOptions', [])
# client wants async - do we support it?
process_supports_async = (
ProcessExecutionMode.async_execute.value in p.metadata.get(
'jobControlOptions', []
ProcessExecutionMode.async_execute.value in job_control_options
)
)
if self.is_async and process_supports_async:
LOGGER.debug('Asynchronous execution')
handler = self._execute_handler_async
@@ -333,13 +353,17 @@ class BaseManager:
response_headers = None
# TODO: handler's response could also be allowed to include more HTTP
# headers
mime_type, outputs, status = handler(p, job_id, data_dict)
mime_type, outputs, status = handler(processor, job_id, data_dict)
return job_id, mime_type, outputs, status, response_headers
def __repr__(self):
return f'<BaseManager> {self.name}'
class UnknownProcessError(Exception):
pass
def get_manager(config: Dict) -> BaseManager:
"""Instantiate process manager from the supplied configuration.
+4 -4
View File
@@ -31,7 +31,6 @@ import logging
from typing import Any, Dict, Optional, Tuple
import uuid
from pygeoapi.process.base import BaseProcessor
from pygeoapi.process.manager.base import BaseManager
from pygeoapi.util import (
RequestedProcessExecutionMode,
@@ -69,14 +68,14 @@ class DummyManager(BaseManager):
def execute_process(
self,
p: BaseProcessor,
process_id: str,
data_dict: dict,
execution_mode: Optional[RequestedProcessExecutionMode] = None
) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
:param p: `pygeoapi.process` object
:param process_id: process identifier
:param data_dict: `dict` of data parameters
:param execution_mode: requested execution mode
@@ -95,8 +94,9 @@ class DummyManager(BaseManager):
LOGGER.debug('Dummy manager does not support asynchronous')
LOGGER.debug('Forcing synchronous execution')
processor = self.get_processor(process_id)
try:
jfmt, outputs = p.execute(data_dict)
jfmt, outputs = processor.execute(data_dict)
current_status = JobStatus.successful
except Exception as err:
outputs = {
+45 -1
View File
@@ -1,8 +1,39 @@
# =================================================================
#
# Authors: Ricardo Garcia Silva <ricardo.garcia.silva@geobeyond.it>
#
# Copyright (c) 2023 Ricardo Garcia Silva
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
from typing import Dict
import pytest
from pygeoapi.process.manager.base import get_manager
from pygeoapi.process.manager.base import (
get_manager,
UnknownProcessError,
)
@pytest.fixture()
@@ -29,3 +60,16 @@ def test_get_manager(config):
manager = get_manager(config)
assert manager.name == config['server']['manager']['name']
assert 'hello-world' in manager.processes
def test_get_processor(config):
manager = get_manager(config)
process_id = 'hello-world'
processor = manager.get_processor(process_id)
assert processor.metadata["id"] == process_id
def test_get_processor_raises_exception(config):
manager = get_manager(config)
with pytest.raises(expected_exception=UnknownProcessError):
manager.get_processor('foo')