From 6a6ec9cb9315e12492f9fb6350d0a78e9fcc1a2f Mon Sep 17 00:00:00 2001 From: francescoingv <9592487+francescoingv@users.noreply.github.com> Date: Fri, 10 May 2024 14:41:47 +0200 Subject: [PATCH] Add support for parameter Outputs of OGC API Processes (#1602) * Solve issue #1420 Add support for OGC API Processes Outputs * Resolve Jan, 3 2024 totycro comments * Solve issue 1420 with full backward compatibility for Processors. * changed formattings * Some additional formatting changes * Update api.py missing line in api * Update base.py To resolve conflict with #1603 * Update base.py Added subscriber inline doc * After Ricardo Silva comments on 13 Apr. Included all the suggested changes on code format andparams name. * Changed line length * fixed trailing spaces. * Update formatting base.py * Update base.py --------- Co-authored-by: FrancescoIngv --- pygeoapi/api/processes.py | 6 ++++++ pygeoapi/process/base.py | 13 ++++++++++--- pygeoapi/process/hello_world.py | 18 +++++++++++------- pygeoapi/process/manager/base.py | 32 ++++++++++++++++++++++++++++++-- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/pygeoapi/api/processes.py b/pygeoapi/api/processes.py index 798bf60..edb913f 100644 --- a/pygeoapi/api/processes.py +++ b/pygeoapi/api/processes.py @@ -7,12 +7,14 @@ # Colin Blackburn # Ricardo Garcia Silva # Bernhard Mallinger +# Francesco Martinelli # # Copyright (c) 2024 Tom Kralidis # Copyright (c) 2022 Francesco Bartoli # Copyright (c) 2022 John A Stevenson and Colin Blackburn # Copyright (c) 2023 Ricardo Garcia Silva # Copyright (c) 2024 Bernhard Mallinger +# Copyright (c) 2024 Francesco Martinelli # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -374,6 +376,9 @@ def execute_process(api: API, request: APIRequest, data_dict = data.get('inputs', {}) LOGGER.debug(data_dict) + requested_outputs = data.get('outputs') + LOGGER.debug(f'outputs: {requested_outputs}') + subscriber = None subscriber_dict = data.get('subscriber') if subscriber_dict: @@ -401,6 +406,7 @@ def execute_process(api: API, request: APIRequest, LOGGER.debug('Executing process') result = api.manager.execute_process( process_id, data_dict, execution_mode=execution_mode, + requested_outputs=requested_outputs, subscriber=subscriber) job_id, mime_type, outputs, status, additional_headers = result headers.update(additional_headers or {}) diff --git a/pygeoapi/process/base.py b/pygeoapi/process/base.py index 3e37386..ec6f22d 100644 --- a/pygeoapi/process/base.py +++ b/pygeoapi/process/base.py @@ -1,8 +1,10 @@ # ================================================================= # # Authors: Tom Kralidis +# Francesco Martinelli # # Copyright (c) 2022 Tom Kralidis +# Copyright (c) 2024 Francesco Martinelli # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -28,7 +30,7 @@ # ================================================================= import logging -from typing import Any, Tuple +from typing import Any, Tuple, Optional from pygeoapi.error import GenericError @@ -49,14 +51,19 @@ class BaseProcessor: """ self.name = processor_def['name'] self.metadata = process_metadata + self.supports_outputs = False - def execute(self, data: dict) -> Tuple[str, Any]: + def execute(self, data: dict, outputs: Optional[dict] = None + ) -> Tuple[str, Any]: """ execute the process :param data: Dict with the input data that the process needs in order to execute - + :param outputs: `dict` optionally specify the subset of required + outputs - defaults to all outputs. + The value of any key may be an object and include the property + `transmissionMode` - defauts to `value`. :returns: tuple of MIME type and process response (string or bytes, or dict) """ diff --git a/pygeoapi/process/hello_world.py b/pygeoapi/process/hello_world.py index bb77f04..11566bc 100644 --- a/pygeoapi/process/hello_world.py +++ b/pygeoapi/process/hello_world.py @@ -1,8 +1,10 @@ # ================================================================= # # Authors: Tom Kralidis +# Francesco Martinelli # # Copyright (c) 2022 Tom Kralidis +# Copyright (c) 2024 Francesco Martinelli # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -115,9 +117,9 @@ class HelloWorldProcessor(BaseProcessor): """ super().__init__(processor_def, PROCESS_METADATA) + self.supports_outputs = True - def execute(self, data): - + def execute(self, data, outputs=None): mimetype = 'application/json' name = data.get('name') @@ -127,12 +129,14 @@ class HelloWorldProcessor(BaseProcessor): message = data.get('message', '') value = f'Hello {name}! {message}'.strip() - outputs = { - 'id': 'echo', - 'value': value - } + produced_outputs = {} + if outputs is None or 'echo' in outputs: + produced_outputs = { + 'id': 'echo', + 'value': value + } - return mimetype, outputs + return mimetype, produced_outputs def __repr__(self): return f' {self.name}' diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index 9091cec..59d14e5 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -2,9 +2,11 @@ # # Authors: Tom Kralidis # Ricardo Garcia Silva +# Francesco Martinelli # # Copyright (c) 2024 Tom Kralidis # (c) 2023 Ricardo Garcia Silva +# (c) 2024 Francesco Martinelli # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation @@ -183,6 +185,7 @@ class BaseManager: def _execute_handler_async(self, p: BaseProcessor, job_id: str, data_dict: dict, + requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, ) -> Tuple[str, None, JobStatus]: """ @@ -194,6 +197,11 @@ class BaseManager: :param p: `pygeoapi.process` object :param job_id: job identifier :param data_dict: `dict` of data parameters + :param requested_outputs: `dict` specify the subset of required + outputs - defaults to all outputs. + The value of any key may be an object and include the property + `transmissionMode` - defauts to `value`. + Note: 'optional' is for backward compatibility. :param subscriber: optional `Subscriber` specifying callback URLs :returns: tuple of None (i.e. initial response payload) @@ -201,13 +209,14 @@ class BaseManager: """ _process = dummy.Process( target=self._execute_handler_sync, - args=(p, job_id, data_dict, subscriber) + args=(p, job_id, data_dict, requested_outputs, subscriber) ) _process.start() return 'application/json', None, JobStatus.accepted def _execute_handler_sync(self, p: BaseProcessor, job_id: str, data_dict: dict, + requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None, ) -> Tuple[str, Any, JobStatus]: """ @@ -220,6 +229,12 @@ class BaseManager: :param p: `pygeoapi.process` object :param job_id: job identifier :param data_dict: `dict` of data parameters + :param requested_outputs: `dict` specify the subset of required + outputs - defaults to all outputs. + The value of any key may be an object and include the property + `transmissionMode` - defauts to `value`. + Note: 'optional' is for backward compatibility. + :param subscriber: optional `Subscriber` specifying callback URLs :returns: tuple of MIME type, response payload and status """ @@ -252,7 +267,13 @@ class BaseManager: job_filename = None current_status = JobStatus.running - jfmt, outputs = p.execute(data_dict) + jfmt, outputs = p.execute( + data_dict, + # only pass requested_outputs if supported, + # otherwise this breaks existing processes + **({'outputs': requested_outputs} + if p.supports_outputs else {}) + ) self.update_job(job_id, { 'status': current_status.value, @@ -327,6 +348,7 @@ class BaseManager: process_id: str, data_dict: dict, execution_mode: Optional[RequestedProcessExecutionMode] = None, + requested_outputs: Optional[dict] = None, subscriber: Optional[Subscriber] = None ) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]: """ @@ -336,6 +358,11 @@ class BaseManager: :param data_dict: `dict` of data parameters :param execution_mode: `str` optionally specifying sync or async processing. + :param requested_outputs: `dict` optionally specify the subset of + required outputs - defaults to all outputs. + The value of any key may be an object and include the property + `transmissionMode` - defauts to `value`. + Note: 'optional' is for backward compatibility. :param subscriber: `Subscriber` optionally specifying callback urls :raises UnknownProcessError: if the input process_id does not @@ -385,6 +412,7 @@ class BaseManager: processor, job_id, data_dict, + requested_outputs, # only pass subscriber if supported, otherwise this breaks existing # managers **({'subscriber': subscriber} if self.supports_subscribing else {})