From 40e82424f3d8df5339ebbab0d7d7143dc73a5485 Mon Sep 17 00:00:00 2001 From: Ricardo Garcia Silva Date: Sun, 2 Jul 2023 09:51:34 +0100 Subject: [PATCH] Process manager instantiates processor (#1278) * Process manager now knows about existing processes * Move get_manager to base module * manager is now responsible for instantiating a processor * manager.execute_process now accepts process_id instead of processor instance * Add license to test file * fix email address typo --------- Co-authored-by: Ricardo Garcia Silva --- pygeoapi/api.py | 9 ++---- pygeoapi/openapi.py | 3 +- pygeoapi/process/manager/base.py | 36 ++++++++++++++++++++---- pygeoapi/process/manager/dummy.py | 8 +++--- tests/test_manager.py | 46 ++++++++++++++++++++++++++++++- 5 files changed, 82 insertions(+), 20 deletions(-) diff --git a/pygeoapi/api.py b/pygeoapi/api.py index bd30342..d9a164e 100644 --- a/pygeoapi/api.py +++ b/pygeoapi/api.py @@ -3285,9 +3285,7 @@ class API: 'InvalidParameterValue', msg) for key in relevant_processes: - p = load_plugin( - 'process', self.manager.processes[key]['processor']) - + p = self.manager.get_processor(key) p2 = l10n.translate_struct(deepcopy(p.metadata), request.locale) @@ -3514,9 +3512,6 @@ class API: HTTPStatus.NOT_FOUND, headers, request.format, 'NoSuchProcess', msg) - process = load_plugin('process', - self.manager.processes[process_id]['processor']) - data = request.data if not data: # TODO not all processes require input, e.g. time-dependent or @@ -3555,7 +3550,7 @@ class API: try: LOGGER.debug('Executing process') result = self.manager.execute_process( - process, data_dict, execution_mode=execution_mode) + process_id, data_dict, execution_mode=execution_mode) job_id, mime_type, outputs, status, additional_headers = result headers.update(additional_headers or {}) headers['Location'] = f'{self.base_url}/jobs/{job_id}' diff --git a/pygeoapi/openapi.py b/pygeoapi/openapi.py index afc716b..5cf0d2c 100644 --- a/pygeoapi/openapi.py +++ b/pygeoapi/openapi.py @@ -1132,8 +1132,7 @@ def get_oas_30(cfg): LOGGER.debug(f'Skipping hidden layer: {k}') continue name = l10n.translate(k, locale_) - p = load_plugin('process', v['processor']) - + p = process_manager.get_processor(k) md_desc = l10n.translate(p.metadata['description'], locale_) process_name_path = f'/processes/{name}' tag = { diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index c08326e..0c75f4b 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -80,6 +80,23 @@ class BaseManager: for id_, process_conf in manager_def.get('processes', {}).items(): self.processes[id_] = dict(process_conf) + def get_processor(self, process_id: str) -> Optional[BaseProcessor]: + """Instantiate a processor. + + :param process_id: Identifier of the process + + :raise UnknownProcessError: if the processor cannot be created + :returns: instance of the processor + """ + + try: + process_conf = self.processes[process_id] + except KeyError as err: + raise UnknownProcessError( + 'Invalid process identifier') from err + else: + return load_plugin('process', process_conf['processor']) + def get_jobs(self, status: JobStatus = None) -> list: """ Get process jobs, optionally filtered by status @@ -281,31 +298,34 @@ class BaseManager: def execute_process( self, - p: BaseProcessor, + process_id: str, data_dict: dict, execution_mode: Optional[RequestedProcessExecutionMode] = None ) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]: """ Default process execution handler - :param p: `pygeoapi.process` object + :param process_id: process identifier :param data_dict: `dict` of data parameters :param execution_mode: `str` optionally specifying sync or async processing. + :raises: UnknownProcessError if the input process_id does not + correspond to a known process :returns: tuple of job_id, MIME type, response payload, status and optionally additional HTTP headers to include in the final response """ job_id = str(uuid.uuid1()) + processor = self.get_processor(process_id) if execution_mode == RequestedProcessExecutionMode.respond_async: + job_control_options = processor.metadata.get( + 'jobControlOptions', []) # client wants async - do we support it? process_supports_async = ( - ProcessExecutionMode.async_execute.value in p.metadata.get( - 'jobControlOptions', [] + ProcessExecutionMode.async_execute.value in job_control_options ) - ) if self.is_async and process_supports_async: LOGGER.debug('Asynchronous execution') handler = self._execute_handler_async @@ -333,13 +353,17 @@ class BaseManager: response_headers = None # TODO: handler's response could also be allowed to include more HTTP # headers - mime_type, outputs, status = handler(p, job_id, data_dict) + mime_type, outputs, status = handler(processor, job_id, data_dict) return job_id, mime_type, outputs, status, response_headers def __repr__(self): return f' {self.name}' +class UnknownProcessError(Exception): + pass + + def get_manager(config: Dict) -> BaseManager: """Instantiate process manager from the supplied configuration. diff --git a/pygeoapi/process/manager/dummy.py b/pygeoapi/process/manager/dummy.py index 41a1c89..7372faf 100644 --- a/pygeoapi/process/manager/dummy.py +++ b/pygeoapi/process/manager/dummy.py @@ -31,7 +31,6 @@ import logging from typing import Any, Dict, Optional, Tuple import uuid -from pygeoapi.process.base import BaseProcessor from pygeoapi.process.manager.base import BaseManager from pygeoapi.util import ( RequestedProcessExecutionMode, @@ -69,14 +68,14 @@ class DummyManager(BaseManager): def execute_process( self, - p: BaseProcessor, + process_id: str, data_dict: dict, execution_mode: Optional[RequestedProcessExecutionMode] = None ) -> Tuple[str, str, Any, JobStatus, Optional[Dict[str, str]]]: """ Default process execution handler - :param p: `pygeoapi.process` object + :param process_id: process identifier :param data_dict: `dict` of data parameters :param execution_mode: requested execution mode @@ -95,8 +94,9 @@ class DummyManager(BaseManager): LOGGER.debug('Dummy manager does not support asynchronous') LOGGER.debug('Forcing synchronous execution') + processor = self.get_processor(process_id) try: - jfmt, outputs = p.execute(data_dict) + jfmt, outputs = processor.execute(data_dict) current_status = JobStatus.successful except Exception as err: outputs = { diff --git a/tests/test_manager.py b/tests/test_manager.py index d19cf93..7aa7287 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,8 +1,39 @@ +# ================================================================= +# +# Authors: Ricardo Garcia Silva +# +# Copyright (c) 2023 Ricardo Garcia Silva +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= from typing import Dict import pytest -from pygeoapi.process.manager.base import get_manager +from pygeoapi.process.manager.base import ( + get_manager, + UnknownProcessError, +) @pytest.fixture() @@ -29,3 +60,16 @@ def test_get_manager(config): manager = get_manager(config) assert manager.name == config['server']['manager']['name'] assert 'hello-world' in manager.processes + + +def test_get_processor(config): + manager = get_manager(config) + process_id = 'hello-world' + processor = manager.get_processor(process_id) + assert processor.metadata["id"] == process_id + + +def test_get_processor_raises_exception(config): + manager = get_manager(config) + with pytest.raises(expected_exception=UnknownProcessError): + manager.get_processor('foo')