Proposal to split api.py into different files (#1405)

* Move api to subdirectory

* Move processes api to own file

* Adapt processes view methods

* Move openapi definition to processes api

* Use processes api in flask

* Linter

* Fix import issues

* Allow calling refactored views from starlette

* Allow calling refactored views from django

* Linter

* Move edr api to own file

* Adapt edr api to new style

* Fix typo in django views

* Move maps api to own file

* Adapt maps api to new style

* Move edr openapi to edr api file

* Move maps openapi to maps api file

* Move stac views to own file

* Refactor stac views to new file

* Move stac openapi to stac api file

* Move tiles api to own file

* Adapt tiles api to new style

* Also move tilematrixset to tiles api

* Adapt tilesetmatrix views to new style

NOTE: I had to remove one tilematrixsets test because
it tested that an invalid format would produce an error.
This now happens by default for all views, but the actual
code is outside of the endpoint function.

* update features, records, coverages

* update release version

* switch back to dev

* backport of #1313

* backport of #1313 fix

* backport of #1585

* Flask: sanitize OGC schema pathing (#1593)

* update release version

* switch back to dev

* backport of #1596

* Port test_gzip_csv test

Note that apply_gzip is now called by the web framework adapters,
so to test it in general, we have to call it in the test manually

* Add empty conformance class list to stac api

* Fix queryables call in starlette

* fix ref

* Unify request validity checking

The default case is handled by the web framework adapters. If custom
format handling is required, the check in the adapter must be skipped.

* Fix imports in django views

* backport #1598

* Remove test about format handling in endpoint

This is now handled outside of the endpoint function

* add docstring to base process manager (#1603)

* backport of #1601

* Port api ogr tests to new style

* Move processes tests to own file

* Run api tests from new dir in CI

* Move edr tests to own file

* Move maps tests to own file

* Move tiles tests to own file

* Actually hide hidden layers in openapi

* 1600 allow providing default value in config (#1604)

* move coverages tests to own file

* move itemtypes to own file, move core into init test

* fix OpenAPI output

* update tests

* add missing descriptions to OpenAPI admin responses

* update tests

* fix tests autodiscovery

* remove unused logging in tests

* address PR comments

* test with xarray 2024.2.0

* remove unneeded file

* safeguard xarray error

* unpin xarray

* fix OpenAPI generation

* fix schema endpoint in Flask and Starlette

* Safely serialize configuration JSON (#1605)

* Safely serialize configuration JSON

Co-Authored-By: Tom Kralidis <tomkralidis@gmail.com>

* Revert "Safely serialize configuration JSON"

This reverts commit 36feb067ee6f87e61955852dc48994f075806370.

* Add test for datetime with Admin API

* Safely serialize configuration JSON

---------

Co-authored-by: Tom Kralidis <tomkralidis@gmail.com>

* backport #1611

* Also fix schema endpoint for django

Fix is analogous to e72d4ba3a5ba3b8621ca839e7814429beeeb8f01

* address additional PR comments

---------

Co-authored-by: Tom Kralidis <tomkralidis@gmail.com>
Co-authored-by: Angelos Tzotsos <gcpp.kalxas@gmail.com>
Co-authored-by: Ricardo Garcia Silva <ricardo.garcia.silva@gmail.com>
Co-authored-by: Benjamin Webb <40066515+webb-ben@users.noreply.github.com>
This commit is contained in:
Bernhard Mallinger
2024-04-05 12:06:25 +02:00
committed by GitHub
parent 36e75f8b72
commit 35bdcb6f02
31 changed files with 8918 additions and 7812 deletions
+740
View File
@@ -0,0 +1,740 @@
# =================================================================
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# Francesco Bartoli <xbartolone@gmail.com>
# Sander Schaminee <sander.schaminee@geocat.net>
# John A Stevenson <jostev@bgs.ac.uk>
# Colin Blackburn <colb@bgs.ac.uk>
# Ricardo Garcia Silva <ricardo.garcia.silva@geobeyond.it>
# Bernhard Mallinger <bernhard.mallinger@eox.at>
#
# Copyright (c) 2024 Tom Kralidis
# Copyright (c) 2022 Francesco Bartoli
# Copyright (c) 2022 John A Stevenson and Colin Blackburn
# Copyright (c) 2023 Ricardo Garcia Silva
# Copyright (c) 2024 Bernhard Mallinger
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
from copy import deepcopy
from datetime import datetime, timezone
from http import HTTPStatus
import json
import logging
from typing import Tuple
from pygeoapi import l10n
from pygeoapi.util import (
json_serial, render_j2_template, JobStatus, RequestedProcessExecutionMode,
to_json, DATETIME_FORMAT)
from pygeoapi.process.base import (
JobNotFoundError, JobResultNotFoundError, ProcessorExecuteError
)
from pygeoapi.process.manager.base import get_manager, Subscriber
from . import (
APIRequest, API, SYSTEM_LOCALE, F_JSON, FORMAT_TYPES, F_HTML, F_JSONLD,
)
LOGGER = logging.getLogger(__name__)
CONFORMANCE_CLASSES = [
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/ogc-process-description', # noqa
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/oas30',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/callback'
]
def describe_processes(api: API, request: APIRequest,
process=None) -> Tuple[dict, int, str]:
"""
Provide processes metadata
:param request: A request object
:param process: process identifier, defaults to None to obtain
information about all processes
:returns: tuple of headers, status code, content
"""
processes = []
headers = request.get_response_headers(**api.api_headers)
if process is not None:
if process not in api.manager.processes.keys():
msg = 'Identifier not found'
return api.get_exception(
HTTPStatus.NOT_FOUND, headers,
request.format, 'NoSuchProcess', msg)
if len(api.manager.processes) > 0:
if process is not None:
relevant_processes = [process]
else:
LOGGER.debug('Processing limit parameter')
try:
limit = int(request.params.get('limit'))
if limit <= 0:
msg = 'limit value should be strictly positive'
return api.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'InvalidParameterValue', msg)
relevant_processes = list(api.manager.processes)[:limit]
except TypeError:
LOGGER.debug('returning all processes')
relevant_processes = api.manager.processes.keys()
except ValueError:
msg = 'limit value should be an integer'
return api.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'InvalidParameterValue', msg)
for key in relevant_processes:
p = api.manager.get_processor(key)
p2 = l10n.translate_struct(deepcopy(p.metadata),
request.locale)
p2['id'] = key
if process is None:
p2.pop('inputs')
p2.pop('outputs')
p2.pop('example', None)
p2['jobControlOptions'] = ['sync-execute']
if api.manager.is_async:
p2['jobControlOptions'].append('async-execute')
p2['outputTransmission'] = ['value']
p2['links'] = p2.get('links', [])
jobs_url = f"{api.base_url}/jobs"
process_url = f"{api.base_url}/processes/{key}"
# TODO translation support
link = {
'type': FORMAT_TYPES[F_JSON],
'rel': request.get_linkrel(F_JSON),
'href': f'{process_url}?f={F_JSON}',
'title': 'Process description as JSON',
'hreflang': api.default_locale
}
p2['links'].append(link)
link = {
'type': FORMAT_TYPES[F_HTML],
'rel': request.get_linkrel(F_HTML),
'href': f'{process_url}?f={F_HTML}',
'title': 'Process description as HTML',
'hreflang': api.default_locale
}
p2['links'].append(link)
link = {
'type': FORMAT_TYPES[F_HTML],
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/job-list',
'href': f'{jobs_url}?f={F_HTML}',
'title': 'jobs for this process as HTML',
'hreflang': api.default_locale
}
p2['links'].append(link)
link = {
'type': FORMAT_TYPES[F_JSON],
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/job-list',
'href': f'{jobs_url}?f={F_JSON}',
'title': 'jobs for this process as JSON',
'hreflang': api.default_locale
}
p2['links'].append(link)
link = {
'type': FORMAT_TYPES[F_JSON],
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/execute',
'href': f'{process_url}/execution?f={F_JSON}',
'title': 'Execution for this process as JSON',
'hreflang': api.default_locale
}
p2['links'].append(link)
processes.append(p2)
if process is not None:
response = processes[0]
else:
process_url = f"{api.base_url}/processes"
response = {
'processes': processes,
'links': [{
'type': FORMAT_TYPES[F_JSON],
'rel': request.get_linkrel(F_JSON),
'title': 'This document as JSON',
'href': f'{process_url}?f={F_JSON}'
}, {
'type': FORMAT_TYPES[F_JSONLD],
'rel': request.get_linkrel(F_JSONLD),
'title': 'This document as RDF (JSON-LD)',
'href': f'{process_url}?f={F_JSONLD}'
}, {
'type': FORMAT_TYPES[F_HTML],
'rel': request.get_linkrel(F_HTML),
'title': 'This document as HTML',
'href': f'{process_url}?f={F_HTML}'
}]
}
if request.format == F_HTML: # render
if process is not None:
response = render_j2_template(api.tpl_config,
'processes/process.html',
response, request.locale)
else:
response = render_j2_template(api.tpl_config,
'processes/index.html', response,
request.locale)
return headers, HTTPStatus.OK, response
return headers, HTTPStatus.OK, to_json(response, api.pretty_print)
# TODO: get_jobs doesn't have tests
def get_jobs(api: API, request: APIRequest,
job_id=None) -> Tuple[dict, int, str]:
"""
Get process jobs
:param request: A request object
:param job_id: id of job
:returns: tuple of headers, status code, content
"""
headers = request.get_response_headers(SYSTEM_LOCALE,
**api.api_headers)
if job_id is None:
jobs = sorted(api.manager.get_jobs(),
key=lambda k: k['job_start_datetime'],
reverse=True)
else:
try:
jobs = [api.manager.get_job(job_id)]
except JobNotFoundError:
return api.get_exception(
HTTPStatus.NOT_FOUND, headers, request.format,
'InvalidParameterValue', job_id)
serialized_jobs = {
'jobs': [],
'links': [{
'href': f"{api.base_url}/jobs?f={F_HTML}",
'rel': request.get_linkrel(F_HTML),
'type': FORMAT_TYPES[F_HTML],
'title': 'Jobs list as HTML'
}, {
'href': f"{api.base_url}/jobs?f={F_JSON}",
'rel': request.get_linkrel(F_JSON),
'type': FORMAT_TYPES[F_JSON],
'title': 'Jobs list as JSON'
}]
}
for job_ in jobs:
job2 = {
'type': 'process',
'processID': job_['process_id'],
'jobID': job_['identifier'],
'status': job_['status'],
'message': job_['message'],
'progress': job_['progress'],
'parameters': job_.get('parameters'),
'job_start_datetime': job_['job_start_datetime'],
'job_end_datetime': job_['job_end_datetime']
}
# TODO: translate
if JobStatus[job_['status']] in (
JobStatus.successful, JobStatus.running, JobStatus.accepted):
job_result_url = f"{api.base_url}/jobs/{job_['identifier']}/results" # noqa
job2['links'] = [{
'href': f'{job_result_url}?f={F_HTML}',
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/results',
'type': FORMAT_TYPES[F_HTML],
'title': f'results of job {job_id} as HTML'
}, {
'href': f'{job_result_url}?f={F_JSON}',
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/results',
'type': FORMAT_TYPES[F_JSON],
'title': f'results of job {job_id} as JSON'
}]
if job_['mimetype'] not in (FORMAT_TYPES[F_JSON],
FORMAT_TYPES[F_HTML]):
job2['links'].append({
'href': job_result_url,
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/results', # noqa
'type': job_['mimetype'],
'title': f"results of job {job_id} as {job_['mimetype']}" # noqa
})
serialized_jobs['jobs'].append(job2)
if job_id is None:
j2_template = 'jobs/index.html'
else:
serialized_jobs = serialized_jobs['jobs'][0]
j2_template = 'jobs/job.html'
if request.format == F_HTML:
data = {
'jobs': serialized_jobs,
'now': datetime.now(timezone.utc).strftime(DATETIME_FORMAT)
}
response = render_j2_template(api.tpl_config, j2_template, data,
request.locale)
return headers, HTTPStatus.OK, response
return headers, HTTPStatus.OK, to_json(serialized_jobs,
api.pretty_print)
def execute_process(api: API, request: APIRequest,
process_id) -> Tuple[dict, int, str]:
"""
Execute process
:param request: A request object
:param process_id: id of process
:returns: tuple of headers, status code, content
"""
# Responses are always in US English only
headers = request.get_response_headers(SYSTEM_LOCALE,
**api.api_headers)
if process_id not in api.manager.processes:
msg = 'identifier not found'
return api.get_exception(
HTTPStatus.NOT_FOUND, headers,
request.format, 'NoSuchProcess', msg)
data = request.data
if not data:
# TODO not all processes require input, e.g. time-dependent or
# random value generators
msg = 'missing request data'
return api.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'MissingParameterValue', msg)
try:
# Parse bytes data, if applicable
data = data.decode()
LOGGER.debug(data)
except (UnicodeDecodeError, AttributeError):
pass
try:
data = json.loads(data)
except (json.decoder.JSONDecodeError, TypeError) as err:
# Input does not appear to be valid JSON
LOGGER.error(err)
msg = 'invalid request data'
return api.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'InvalidParameterValue', msg)
data_dict = data.get('inputs', {})
LOGGER.debug(data_dict)
subscriber = None
subscriber_dict = data.get('subscriber')
if subscriber_dict:
try:
success_uri = subscriber_dict['successUri']
except KeyError:
return api.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'MissingParameterValue', 'Missing successUri')
else:
subscriber = Subscriber(
# NOTE: successUri is mandatory according to the standard
success_uri=success_uri,
in_progress_uri=subscriber_dict.get('inProgressUri'),
failed_uri=subscriber_dict.get('failedUri'),
)
try:
execution_mode = RequestedProcessExecutionMode(
request.headers.get('Prefer', request.headers.get('prefer'))
)
except ValueError:
execution_mode = None
try:
LOGGER.debug('Executing process')
result = api.manager.execute_process(
process_id, data_dict, execution_mode=execution_mode,
subscriber=subscriber)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})
headers['Location'] = f'{api.base_url}/jobs/{job_id}'
except ProcessorExecuteError as err:
LOGGER.error(err)
return api.get_exception(
err.http_status_code, headers,
request.format, err.ogc_exception_code, err.message)
response = {}
if status == JobStatus.failed:
response = outputs
if data.get('response', 'raw') == 'raw':
headers['Content-Type'] = mime_type
response = outputs
elif status not in (JobStatus.failed, JobStatus.accepted):
response['outputs'] = [outputs]
if status == JobStatus.accepted:
http_status = HTTPStatus.CREATED
elif status == JobStatus.failed:
http_status = HTTPStatus.BAD_REQUEST
else:
http_status = HTTPStatus.OK
if mime_type == 'application/json':
response2 = to_json(response, api.pretty_print)
else:
response2 = response
return headers, http_status, response2
def get_job_result(api: API, request: APIRequest,
job_id) -> Tuple[dict, int, str]:
"""
Get result of job (instance of a process)
:param request: A request object
:param job_id: ID of job
:returns: tuple of headers, status code, content
"""
headers = request.get_response_headers(SYSTEM_LOCALE,
**api.api_headers)
try:
job = api.manager.get_job(job_id)
except JobNotFoundError:
return api.get_exception(
HTTPStatus.NOT_FOUND, headers,
request.format, 'NoSuchJob', job_id
)
status = JobStatus[job['status']]
if status == JobStatus.running:
msg = 'job still running'
return api.get_exception(
HTTPStatus.NOT_FOUND, headers,
request.format, 'ResultNotReady', msg)
elif status == JobStatus.accepted:
# NOTE: this case is not mentioned in the specification
msg = 'job accepted but not yet running'
return api.get_exception(
HTTPStatus.NOT_FOUND, headers,
request.format, 'ResultNotReady', msg)
elif status == JobStatus.failed:
msg = 'job failed'
return api.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'InvalidParameterValue', msg)
try:
mimetype, job_output = api.manager.get_job_result(job_id)
except JobResultNotFoundError:
return api.get_exception(
HTTPStatus.INTERNAL_SERVER_ERROR, headers,
request.format, 'JobResultNotFound', job_id
)
if mimetype not in (None, FORMAT_TYPES[F_JSON]):
headers['Content-Type'] = mimetype
content = job_output
else:
if request.format == F_JSON:
content = json.dumps(job_output, sort_keys=True, indent=4,
default=json_serial)
else:
# HTML
headers['Content-Type'] = "text/html"
data = {
'job': {'id': job_id},
'result': job_output
}
content = render_j2_template(
api.config, 'jobs/results/index.html',
data, request.locale)
return headers, HTTPStatus.OK, content
def delete_job(
api: API, request: APIRequest, job_id
) -> Tuple[dict, int, str]:
"""
Delete a process job
:param job_id: job identifier
:returns: tuple of headers, status code, content
"""
response_headers = request.get_response_headers(
SYSTEM_LOCALE, **api.api_headers)
try:
success = api.manager.delete_job(job_id)
except JobNotFoundError:
return api.get_exception(
HTTPStatus.NOT_FOUND, response_headers, request.format,
'NoSuchJob', job_id
)
else:
if success:
http_status = HTTPStatus.OK
jobs_url = f"{api.base_url}/jobs"
response = {
'jobID': job_id,
'status': JobStatus.dismissed.value,
'message': 'Job dismissed',
'progress': 100,
'links': [{
'href': jobs_url,
'rel': 'up',
'type': FORMAT_TYPES[F_JSON],
'title': 'The job list for the current process'
}]
}
else:
return api.get_exception(
HTTPStatus.INTERNAL_SERVER_ERROR, response_headers,
request.format, 'InternalError', job_id
)
LOGGER.info(response)
# TODO: this response does not have any headers
return {}, http_status, response
def get_oas_30(cfg: dict, locale: str) -> tuple[list[dict[str, str]], dict[str, dict]]: # noqa
"""
Get OpenAPI fragments
:param cfg: `dict` of configuration
:param locale: `str` of locale
:returns: `tuple` of `list` of tag objects, and `dict` of path objects
"""
from pygeoapi.openapi import OPENAPI_YAML
LOGGER.debug('setting up processes endpoints')
oas = {'tags': []}
paths = {}
process_manager = get_manager(cfg)
if len(process_manager.processes) > 0:
paths['/processes'] = {
'get': {
'summary': 'Processes',
'description': 'Processes',
'tags': ['server'],
'operationId': 'getProcesses',
'parameters': [
{'$ref': '#/components/parameters/f'}
],
'responses': {
'200': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/ProcessList.yaml"}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
}
}
LOGGER.debug('setting up processes')
for k, v in process_manager.processes.items():
if k.startswith('_'):
LOGGER.debug(f'Skipping hidden layer: {k}')
continue
name = l10n.translate(k, locale)
p = process_manager.get_processor(k)
md_desc = l10n.translate(p.metadata['description'], locale)
process_name_path = f'/processes/{name}'
tag = {
'name': name,
'description': md_desc,
'externalDocs': {}
}
for link in p.metadata.get('links', []):
if link['type'] == 'information':
translated_link = l10n.translate(link, locale)
tag['externalDocs']['description'] = translated_link[
'type']
tag['externalDocs']['url'] = translated_link['url']
break
if len(tag['externalDocs']) == 0:
del tag['externalDocs']
oas['tags'].append(tag)
paths[process_name_path] = {
'get': {
'summary': 'Get process metadata',
'description': md_desc,
'tags': [name],
'operationId': f'describe{name.capitalize()}Process',
'parameters': [
{'$ref': '#/components/parameters/f'}
],
'responses': {
'200': {'$ref': '#/components/responses/200'},
'default': {'$ref': '#/components/responses/default'}
}
}
}
paths[f'{process_name_path}/execution'] = {
'post': {
'summary': f"Process {l10n.translate(p.metadata['title'], locale)} execution", # noqa
'description': md_desc,
'tags': [name],
'operationId': f'execute{name.capitalize()}Job',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'201': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/ExecuteAsync.yaml"}, # noqa
'404': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/NotFound.yaml"}, # noqa
'500': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/ServerError.yaml"}, # noqa
'default': {'$ref': '#/components/responses/default'}
},
'requestBody': {
'description': 'Mandatory execute request JSON',
'required': True,
'content': {
'application/json': {
'schema': {
'$ref': f"{OPENAPI_YAML['oapip']}/schemas/execute.yaml" # noqa
}
}
}
}
}
}
if 'example' in p.metadata:
paths[f'{process_name_path}/execution']['post']['requestBody']['content']['application/json']['example'] = p.metadata['example'] # noqa
name_in_path = {
'name': 'jobId',
'in': 'path',
'description': 'job identifier',
'required': True,
'schema': {
'type': 'string'
}
}
paths['/jobs'] = {
'get': {
'summary': 'Retrieve jobs list',
'description': 'Retrieve a list of jobs',
'tags': ['jobs'],
'operationId': 'getJobs',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/NotFound.yaml"}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
}
}
paths['/jobs/{jobId}'] = {
'get': {
'summary': 'Retrieve job details',
'description': 'Retrieve job details',
'tags': ['jobs'],
'parameters': [
name_in_path,
{'$ref': '#/components/parameters/f'}
],
'operationId': 'getJob',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/NotFound.yaml"}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
},
'delete': {
'summary': 'Cancel / delete job',
'description': 'Cancel / delete job',
'tags': ['jobs'],
'parameters': [
name_in_path
],
'operationId': 'deleteJob',
'responses': {
'204': {'$ref': '#/components/responses/204'},
'404': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/NotFound.yaml"}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
},
}
paths['/jobs/{jobId}/results'] = {
'get': {
'summary': 'Retrieve job results',
'description': 'Retrive job resiults',
'tags': ['jobs'],
'parameters': [
name_in_path,
{'$ref': '#/components/parameters/f'}
],
'operationId': 'getJobResults',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': f"{OPENAPI_YAML['oapip']}/responses/NotFound.yaml"}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
}
}
return [{'name': 'proceses'}, {'name': 'jobs'}], {'paths': paths}