diff --git a/pygeoapi/api.py b/pygeoapi/api.py index bbb12e7..bd30342 100644 --- a/pygeoapi/api.py +++ b/pygeoapi/api.py @@ -5,10 +5,12 @@ # Sander Schaminee # John A Stevenson # Colin Blackburn +# Ricardo Garcia Silva # # Copyright (c) 2023 Tom Kralidis # Copyright (c) 2022 Francesco Bartoli # Copyright (c) 2022 John A Stevenson and Colin Blackburn +# Copyright (c) 2023 Ricardo Garcia Silva # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -63,6 +65,7 @@ from pygeoapi.linked_data import (geojson2jsonld, jsonldify, jsonldify_collection) from pygeoapi.log import setup_logger from pygeoapi.process.base import ProcessorExecuteError +from pygeoapi.process.manager.base import get_manager from pygeoapi.plugin import load_plugin, PLUGINS from pygeoapi.provider.base import ( ProviderGenericError, ProviderConnectionError, ProviderNotFoundError, @@ -671,19 +674,7 @@ class API: self.tpl_config = deepcopy(self.config) self.tpl_config['server']['url'] = self.base_url - # TODO: add as decorator - if 'manager' in self.config['server']: - manager_def = self.config['server']['manager'] - else: - LOGGER.info('No process manager defined; starting dummy manager') - manager_def = { - 'name': 'Dummy', - 'connection': None, - 'output_dir': None - } - - LOGGER.debug(f"Loading process manager {manager_def['name']}") - self.manager = load_plugin('process_manager', manager_def) + self.manager = get_manager(self.config) LOGGER.info('Process manager plugin loaded') @gzip @@ -3261,17 +3252,15 @@ class API: if not request.is_valid(): return self.get_format_exception(request) headers = request.get_response_headers(**self.api_headers) - processes_config = filter_dict_by_key_value(self.config['resources'], - 'type', 'process') if process is not None: - if process not in processes_config.keys() or not processes_config: + if process not in self.manager.processes.keys(): msg = 'Identifier not found' return 
self.get_exception( HTTPStatus.NOT_FOUND, headers, request.format, 'NoSuchProcess', msg) - if processes_config: + if len(self.manager.processes) > 0: if process is not None: relevant_processes = [process] else: @@ -3285,10 +3274,10 @@ class API: HTTPStatus.BAD_REQUEST, headers, request.format, 'InvalidParameterValue', msg) - relevant_processes = [*processes_config][:limit] + relevant_processes = list(self.manager.processes)[:limit] except TypeError: LOGGER.debug('returning all processes') - relevant_processes = processes_config.keys() + relevant_processes = self.manager.processes.keys() except ValueError: msg = 'limit value should be an integer' return self.get_exception( @@ -3296,8 +3285,8 @@ class API: 'InvalidParameterValue', msg) for key in relevant_processes: - p = load_plugin('process', - processes_config[key]['processor']) + p = load_plugin( + 'process', self.manager.processes[key]['processor']) p2 = l10n.translate_struct(deepcopy(p.metadata), request.locale) @@ -3420,16 +3409,12 @@ class API: return self.get_format_exception(request) headers = request.get_response_headers(SYSTEM_LOCALE, **self.api_headers) - if self.manager: - if job_id is None: - jobs = sorted(self.manager.get_jobs(), - key=lambda k: k['job_start_datetime'], - reverse=True) - else: - jobs = [self.manager.get_job(job_id)] + if job_id is None: + jobs = sorted(self.manager.get_jobs(), + key=lambda k: k['job_start_datetime'], + reverse=True) else: - LOGGER.debug('Process management not configured') - jobs = [] + jobs = [self.manager.get_job(job_id)] serialized_jobs = { 'jobs': [], @@ -3523,17 +3508,14 @@ class API: # Responses are always in US English only headers = request.get_response_headers(SYSTEM_LOCALE, **self.api_headers) - processes_config = filter_dict_by_key_value( - self.config['resources'], 'type', 'process' - ) - if process_id not in processes_config: + if process_id not in self.manager.processes: msg = 'identifier not found' return self.get_exception( HTTPStatus.NOT_FOUND, 
headers, request.format, 'NoSuchProcess', msg) process = load_plugin('process', - processes_config[process_id]['processor']) + self.manager.processes[process_id]['processor']) data = request.data if not data: diff --git a/pygeoapi/openapi.py b/pygeoapi/openapi.py index 44745e2..afc716b 100644 --- a/pygeoapi/openapi.py +++ b/pygeoapi/openapi.py @@ -2,9 +2,11 @@ # # Authors: Tom Kralidis # Authors: Francesco Bartoli +# Authors: Ricardo Garcia Silva # # Copyright (c) 2022 Tom Kralidis # Copyright (c) 2022 Francesco Bartoli +# Copyright (c) 2023 Ricardo Garcia Silva # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -42,8 +44,9 @@ from jsonschema import validate as jsonschema_validate import yaml from pygeoapi import l10n -from pygeoapi.plugin import load_plugin from pygeoapi.models.openapi import OAPIFormat +from pygeoapi.plugin import load_plugin +from pygeoapi.process.manager.base import get_manager from pygeoapi.provider.base import ProviderTypeError, SchemaType from pygeoapi.util import (filter_dict_by_key_value, get_provider_by_type, filter_providers_by_type, to_json, yaml_load, @@ -1104,9 +1107,9 @@ def get_oas_30(cfg): } } - processes = filter_dict_by_key_value(cfg['resources'], 'type', 'process') + process_manager = get_manager(cfg) - if processes: + if len(process_manager.processes) > 0: paths['/processes'] = { 'get': { 'summary': 'Processes', @@ -1124,7 +1127,7 @@ def get_oas_30(cfg): } LOGGER.debug('setting up processes') - for k, v in processes.items(): + for k, v in process_manager.processes.items(): if k.startswith('_'): LOGGER.debug(f'Skipping hidden layer: {k}') continue diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index 072ff7c..c08326e 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -1,8 +1,10 @@ # ================================================================= # # Authors: Tom Kralidis +# 
def get_manager(config: Dict) -> BaseManager:
    """Instantiate process manager from the supplied configuration.

    The manager definition is read from ``config['server']['manager']``.
    When that key is missing (or set to ``None``) a ``Dummy`` manager
    definition is used instead. Every entry of ``config['resources']``
    whose ``type`` is ``process`` is passed to the manager under the
    ``processes`` key of its definition.

    The supplied configuration is left untouched: the manager definition
    is shallow-copied before the ``processes`` key is added to it.

    :param config: pygeoapi configuration

    :returns: The pygeoapi process manager object
    """
    configured = config.get('server', {}).get('manager')
    if configured is None:
        manager_conf = {
            'name': 'Dummy',
            'connection': None,
            'output_dir': None
        }
    else:
        # Copy so that adding 'processes' below does not leak into the
        # caller's config['server']['manager'] dict
        manager_conf = dict(configured)

    manager_conf['processes'] = {
        id_: resource_conf
        for id_, resource_conf in config.get('resources', {}).items()
        if resource_conf.get('type') == 'process'
    }

    if manager_conf.get('name') == 'Dummy':
        LOGGER.info('Starting dummy manager')
    return load_plugin('process_manager', manager_conf)