Add support for parameter Outputs of OGC API Processes (#1602)
* Solve issue #1420 Add support for OGC API Processes Outputs * Resolve Jan, 3 2024 totycro comments * Solve issue 1420 with full backward compatibility for Processors. * changed formattings * Some additional formatting changes * Update api.py missing line in api * Update base.py To resolve conflict with #1603 * Update base.py Added subscriber inline doc * After Ricardo Silva comments on 13 Apr. Included all the suggested changes on code format andparams name. * Changed line length * fixed trailing spaces. * Update formatting base.py * Update base.py --------- Co-authored-by: FrancescoIngv <FrancescoIngv@users.noreply.github.com>
This commit is contained in:
@@ -7,12 +7,14 @@
|
||||
# Colin Blackburn <colb@bgs.ac.uk>
|
||||
# Ricardo Garcia Silva <ricardo.garcia.silva@geobeyond.it>
|
||||
# Bernhard Mallinger <bernhard.mallinger@eox.at>
|
||||
# Francesco Martinelli <francesco.martinelli@ingv.it>
|
||||
#
|
||||
# Copyright (c) 2024 Tom Kralidis
|
||||
# Copyright (c) 2022 Francesco Bartoli
|
||||
# Copyright (c) 2022 John A Stevenson and Colin Blackburn
|
||||
# Copyright (c) 2023 Ricardo Garcia Silva
|
||||
# Copyright (c) 2024 Bernhard Mallinger
|
||||
# Copyright (c) 2024 Francesco Martinelli
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
@@ -374,6 +376,9 @@ def execute_process(api: API, request: APIRequest,
|
||||
data_dict = data.get('inputs', {})
|
||||
LOGGER.debug(data_dict)
|
||||
|
||||
requested_outputs = data.get('outputs')
|
||||
LOGGER.debug(f'outputs: {requested_outputs}')
|
||||
|
||||
subscriber = None
|
||||
subscriber_dict = data.get('subscriber')
|
||||
if subscriber_dict:
|
||||
@@ -401,6 +406,7 @@ def execute_process(api: API, request: APIRequest,
|
||||
LOGGER.debug('Executing process')
|
||||
result = api.manager.execute_process(
|
||||
process_id, data_dict, execution_mode=execution_mode,
|
||||
requested_outputs=requested_outputs,
|
||||
subscriber=subscriber)
|
||||
job_id, mime_type, outputs, status, additional_headers = result
|
||||
headers.update(additional_headers or {})
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
# =================================================================
|
||||
#
|
||||
# Authors: Tom Kralidis <tomkralidis@gmail.com>
|
||||
# Francesco Martinelli <francesco.martinelli@ingv.it>
|
||||
#
|
||||
# Copyright (c) 2022 Tom Kralidis
|
||||
# Copyright (c) 2024 Francesco Martinelli
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
@@ -28,7 +30,7 @@
|
||||
# =================================================================
|
||||
|
||||
import logging
|
||||
from typing import Any, Tuple
|
||||
from typing import Any, Tuple, Optional
|
||||
|
||||
from pygeoapi.error import GenericError
|
||||
|
||||
@@ -49,14 +51,19 @@ class BaseProcessor:
|
||||
"""
|
||||
self.name = processor_def['name']
|
||||
self.metadata = process_metadata
|
||||
self.supports_outputs = False
|
||||
|
||||
def execute(self, data: dict) -> Tuple[str, Any]:
|
||||
def execute(self, data: dict, outputs: Optional[dict] = None
|
||||
) -> Tuple[str, Any]:
|
||||
"""
|
||||
execute the process
|
||||
|
||||
:param data: Dict with the input data that the process needs in order
|
||||
to execute
|
||||
|
||||
:param outputs: `dict` optionally specify the subset of required
|
||||
outputs - defaults to all outputs.
|
||||
The value of any key may be an object and include the property
|
||||
`transmissionMode` - defauts to `value`.
|
||||
:returns: tuple of MIME type and process response
|
||||
(string or bytes, or dict)
|
||||
"""
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
# =================================================================
|
||||
#
|
||||
# Authors: Tom Kralidis <tomkralidis@gmail.com>
|
||||
# Francesco Martinelli <francesco.martinelli@ingv.it>
|
||||
#
|
||||
# Copyright (c) 2022 Tom Kralidis
|
||||
# Copyright (c) 2024 Francesco Martinelli
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
@@ -115,9 +117,9 @@ class HelloWorldProcessor(BaseProcessor):
|
||||
"""
|
||||
|
||||
super().__init__(processor_def, PROCESS_METADATA)
|
||||
self.supports_outputs = True
|
||||
|
||||
def execute(self, data):
|
||||
|
||||
def execute(self, data, outputs=None):
|
||||
mimetype = 'application/json'
|
||||
name = data.get('name')
|
||||
|
||||
@@ -127,12 +129,14 @@ class HelloWorldProcessor(BaseProcessor):
|
||||
message = data.get('message', '')
|
||||
value = f'Hello {name}! {message}'.strip()
|
||||
|
||||
outputs = {
|
||||
'id': 'echo',
|
||||
'value': value
|
||||
}
|
||||
produced_outputs = {}
|
||||
if outputs is None or 'echo' in outputs:
|
||||
produced_outputs = {
|
||||
'id': 'echo',
|
||||
'value': value
|
||||
}
|
||||
|
||||
return mimetype, outputs
|
||||
return mimetype, produced_outputs
|
||||
|
||||
def __repr__(self):
|
||||
return f'<HelloWorldProcessor> {self.name}'
|
||||
|
||||
@@ -2,9 +2,11 @@
|
||||
#
|
||||
# Authors: Tom Kralidis <tomkralidis@gmail.com>
|
||||
# Ricardo Garcia Silva <ricardo.garcia.silva@geobeyond.it>
|
||||
# Francesco Martinelli <francesco.martinelli@ingv.it>
|
||||
#
|
||||
# Copyright (c) 2024 Tom Kralidis
|
||||
# (c) 2023 Ricardo Garcia Silva
|
||||
# (c) 2024 Francesco Martinelli
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
@@ -183,6 +185,7 @@ class BaseManager:
|
||||
|
||||
def _execute_handler_async(self, p: BaseProcessor, job_id: str,
|
||||
data_dict: dict,
|
||||
requested_outputs: Optional[dict] = None,
|
||||
subscriber: Optional[Subscriber] = None,
|
||||
) -> Tuple[str, None, JobStatus]:
|
||||
"""
|
||||
@@ -194,6 +197,11 @@ class BaseManager:
|
||||
:param p: `pygeoapi.process` object
|
||||
:param job_id: job identifier
|
||||
:param data_dict: `dict` of data parameters
|
||||
:param requested_outputs: `dict` specify the subset of required
|
||||
outputs - defaults to all outputs.
|
||||
The value of any key may be an object and include the property
|
||||
`transmissionMode` - defauts to `value`.
|
||||
Note: 'optional' is for backward compatibility.
|
||||
:param subscriber: optional `Subscriber` specifying callback URLs
|
||||
|
||||
:returns: tuple of None (i.e. initial response payload)
|
||||
@@ -201,13 +209,14 @@ class BaseManager:
|
||||
"""
|
||||
_process = dummy.Process(
|
||||
target=self._execute_handler_sync,
|
||||
args=(p, job_id, data_dict, subscriber)
|
||||
args=(p, job_id, data_dict, requested_outputs, subscriber)
|
||||
)
|
||||
_process.start()
|
||||
return 'application/json', None, JobStatus.accepted
|
||||
|
||||
def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
|
||||
data_dict: dict,
|
||||
requested_outputs: Optional[dict] = None,
|
||||
subscriber: Optional[Subscriber] = None,
|
||||
) -> Tuple[str, Any, JobStatus]:
|
||||
"""
|
||||
@@ -220,6 +229,12 @@ class BaseManager:
|
||||
:param p: `pygeoapi.process` object
|
||||
:param job_id: job identifier
|
||||
:param data_dict: `dict` of data parameters
|
||||
:param requested_outputs: `dict` specify the subset of required
|
||||
outputs - defaults to all outputs.
|
||||
The value of any key may be an object and include the property
|
||||
`transmissionMode` - defauts to `value`.
|
||||
Note: 'optional' is for backward compatibility.
|
||||
:param subscriber: optional `Subscriber` specifying callback URLs
|
||||
|
||||
:returns: tuple of MIME type, response payload and status
|
||||
"""
|
||||
@@ -252,7 +267,13 @@ class BaseManager:
|
||||
job_filename = None
|
||||
|
||||
current_status = JobStatus.running
|
||||
jfmt, outputs = p.execute(data_dict)
|
||||
jfmt, outputs = p.execute(
|
||||
data_dict,
|
||||
# only pass requested_outputs if supported,
|
||||
# otherwise this breaks existing processes
|
||||
**({'outputs': requested_outputs}
|
||||
if p.supports_outputs else {})
|
||||
)
|
||||
|
||||
self.update_job(job_id, {
|
||||
'status': current_status.value,
|
||||
@@ -327,6 +348,7 @@ class BaseManager:
|
||||
process_id: str,
|
||||
data_dict: dict,
|
||||
execution_mode: Optional[RequestedProcessExecutionMode] = None,
|
||||
requested_outputs: Optional[dict] = None,
|
||||
subscriber: Optional[Subscriber] = None
|
||||
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
|
||||
"""
|
||||
@@ -336,6 +358,11 @@ class BaseManager:
|
||||
:param data_dict: `dict` of data parameters
|
||||
:param execution_mode: `str` optionally specifying sync or async
|
||||
processing.
|
||||
:param requested_outputs: `dict` optionally specify the subset of
|
||||
required outputs - defaults to all outputs.
|
||||
The value of any key may be an object and include the property
|
||||
`transmissionMode` - defauts to `value`.
|
||||
Note: 'optional' is for backward compatibility.
|
||||
:param subscriber: `Subscriber` optionally specifying callback urls
|
||||
|
||||
:raises UnknownProcessError: if the input process_id does not
|
||||
@@ -385,6 +412,7 @@ class BaseManager:
|
||||
processor,
|
||||
job_id,
|
||||
data_dict,
|
||||
requested_outputs,
|
||||
# only pass subscriber if supported, otherwise this breaks existing
|
||||
# managers
|
||||
**({'subscriber': subscriber} if self.supports_subscribing else {})
|
||||
|
||||
Reference in New Issue
Block a user