OAProc updates (#840)

* update OAProc jobs as a top-level resource (#838)

* update config

* update copyright year
This commit is contained in:
Tom Kralidis
2022-01-04 08:43:39 -05:00
committed by GitHub
parent 6271e41bc7
commit 5d072f4efa
15 changed files with 191 additions and 246 deletions
@@ -62,8 +62,8 @@ Processing examples
* http://localhost:5000/processes
* describe the ``hello-world`` process
* http://localhost:5000/processes/hello-world
* show all jobs for the ``hello-world`` process
* http://localhost:5000/processes/hello-world/jobs
* show all jobs
* http://localhost:5000/jobs
* execute a job for the ``hello-world`` process
* ``curl -X POST "http://localhost:5000/processes/hello-world/execution" -H "Content-Type: application/json" -d "{\"inputs\":{\"name\": \"hi there2\"}}"``
* execute a job for the ``hello-world`` process with a raw response (default)
+1 -1
View File
@@ -77,7 +77,7 @@ metadata:
- api
keywords_type: theme
terms_of_service: https://creativecommons.org/licenses/by/4.0/
url: http://example.org
url: https://example.org
license:
name: CC-BY 4.0 license
url: https://creativecommons.org/licenses/by/4.0/
+48 -65
View File
@@ -4,7 +4,7 @@
# Francesco Bartoli <xbartolone@gmail.com>
# Sander Schaminee <sander.schaminee@geocat.net>
#
# Copyright (c) 2021 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
# Copyright (c) 2020 Francesco Bartoli
#
# Permission is hereby granted, free of charge, to any person
@@ -675,6 +675,16 @@ class API:
'type': FORMAT_TYPES[F_JSON],
'title': 'Collections',
'href': '{}/collections'.format(self.config['server']['url'])
}, {
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/processes',
'type': FORMAT_TYPES[F_JSON],
'title': 'Processes',
'href': '{}/processes'.format(self.config['server']['url'])
}, {
'rel': 'http://www.opengis.net/def/rel/ogc/1.0/job-list',
'type': FORMAT_TYPES[F_JSON],
'title': 'Jobs',
'href': '{}/jobs'.format(self.config['server']['url'])
}]
headers = request.get_response_headers()
@@ -2506,8 +2516,7 @@ class API:
p2['outputTransmission'] = ['value']
p2['links'] = p2.get('links', [])
jobs_url = '{}/processes/{}/jobs'.format(
self.config['server']['url'], key)
jobs_url = '{}/jobs'.format(self.config['server']['url'])
# TODO translation support
link = {
@@ -2553,13 +2562,12 @@ class API:
@gzip
@pre_process
def get_process_jobs(self, request: Union[APIRequest, Any],
process_id, job_id=None) -> Tuple[dict, int, str]:
def get_jobs(self, request: Union[APIRequest, Any],
job_id=None) -> Tuple[dict, int, str]:
"""
Get process jobs
:param request: A request object
:param process_id: id of process
:param job_id: id of job
:returns: tuple of headers, status code, content
@@ -2569,30 +2577,35 @@ class API:
return self.get_format_exception(request)
headers = request.get_response_headers(SYSTEM_LOCALE)
processes = filter_dict_by_key_value(
self.config['resources'], 'type', 'process')
if process_id not in processes:
msg = 'identifier not found'
return self.get_exception(
404, headers, request.format, 'NoSuchProcess', msg)
p = load_plugin('process', processes[process_id]['processor'])
if self.manager:
if job_id is None:
jobs = sorted(self.manager.get_jobs(process_id),
print(self.manager.get_jobs())
jobs = sorted(self.manager.get_jobs(),
key=lambda k: k['job_start_datetime'],
reverse=True)
else:
jobs = [self.manager.get_job(process_id, job_id)]
jobs = [self.manager.get_job(job_id)]
else:
LOGGER.debug('Process management not configured')
jobs = []
serialized_jobs = []
serialized_jobs = {
'jobs': [],
'links': [{
'href': '{}/jobs?f={}'.format(self.config['server']['url'], F_HTML), # noqa
'rel': request.get_linkrel(F_HTML),
'type': FORMAT_TYPES[F_HTML],
'title': 'Jobs list as HTML'
}, {
'href': '{}/jobs?f={}'.format(self.config['server']['url'], F_JSON), # noqa
'rel': request.get_linkrel(F_JSON),
'type': FORMAT_TYPES[F_JSON],
'title': 'Jobs list as JSON'
}]
}
for job_ in jobs:
job2 = {
'processID': job_['process_id'],
'jobID': job_['identifier'],
'status': job_['status'],
'message': job_['message'],
@@ -2606,9 +2619,8 @@ class API:
if JobStatus[job_['status']] in (
JobStatus.successful, JobStatus.running, JobStatus.accepted):
job_result_url = '{}/processes/{}/jobs/{}/results'.format(
self.config['server']['url'],
process_id, job_['identifier'])
job_result_url = '{}/jobs/{}/results'.format(
self.config['server']['url'], job_['identifier'])
job2['links'] = [{
'href': '{}?f={}'.format(job_result_url, F_HTML),
@@ -2632,21 +2644,16 @@ class API:
job_id, job_['mimetype'])
})
serialized_jobs.append(job2)
serialized_jobs['jobs'].append(job2)
if job_id is None:
j2_template = 'processes/jobs/index.html'
j2_template = 'jobs/index.html'
else:
serialized_jobs = serialized_jobs[0]
j2_template = 'processes/jobs/job.html'
serialized_jobs = serialized_jobs['jobs'][0]
j2_template = 'jobs/job.html'
if request.format == F_HTML:
data = {
'process': {
'id': process_id,
'title': l10n.translate(p.metadata['title'],
SYSTEM_LOCALE)
},
'jobs': serialized_jobs,
'now': datetime.now(timezone.utc).strftime(DATETIME_FORMAT)
}
@@ -2719,8 +2726,8 @@ class API:
LOGGER.debug(data_dict)
job_id = data.get("job_id", str(uuid.uuid1()))
url = '{}/processes/{}/jobs/{}'.format(
self.config['server']['url'], process_id, job_id)
url = '{}/jobs/{}'.format(
self.config['server']['url'], job_id)
headers['Location'] = url
@@ -2767,13 +2774,12 @@ class API:
@gzip
@pre_process
def get_process_job_result(self, request: Union[APIRequest, Any],
process_id, job_id) -> Tuple[dict, int, str]:
def get_job_result(self, request: Union[APIRequest, Any],
job_id) -> Tuple[dict, int, str]:
"""
Get result of job (instance of a process)
:param request: A request object
:param process_id: name of process
:param job_id: ID of job
:returns: tuple of headers, status code, content
@@ -2783,23 +2789,7 @@ class API:
return self.get_format_exception(request)
headers = request.get_response_headers(SYSTEM_LOCALE)
processes_config = filter_dict_by_key_value(self.config['resources'],
'type', 'process')
if process_id not in processes_config:
msg = 'identifier not found'
return self.get_exception(
404, headers, request.format, 'NoSuchProcess', msg)
process = load_plugin('process',
processes_config[process_id]['processor'])
if not process:
msg = 'identifier not found'
return self.get_exception(
404, headers, request.format, 'NoSuchProcess', msg)
job = self.manager.get_job(process_id, job_id)
job = self.manager.get_job(job_id)
if not job:
msg = 'job not found'
@@ -2824,7 +2814,7 @@ class API:
return self.get_exception(
400, headers, request.format, 'InvalidParameterValue', msg)
mimetype, job_output = self.manager.get_job_result(process_id, job_id)
mimetype, job_output = self.manager.get_job_result(job_id)
if mimetype not in (None, FORMAT_TYPES[F_JSON]):
headers['Content-Type'] = mimetype
@@ -2836,31 +2826,25 @@ class API:
else:
# HTML
data = {
'process': {
'id': process_id,
'title': l10n.translate(process.metadata['title'],
SYSTEM_LOCALE)
},
'job': {'id': job_id},
'result': job_output
}
content = render_j2_template(
self.config, 'processes/jobs/results/index.html',
self.config, 'jobs/results/index.html',
data, SYSTEM_LOCALE)
return headers, 200, content
def delete_process_job(self, process_id, job_id) -> Tuple[dict, int, str]:
def delete_job(self, job_id) -> Tuple[dict, int, str]:
"""
Delete a process job
:param process_id: process identifier
:param job_id: job identifier
:returns: tuple of headers, status code, content
"""
success = self.manager.delete_job(process_id, job_id)
success = self.manager.delete_job(job_id)
if not success:
http_status = 404
@@ -2870,8 +2854,7 @@ class API:
}
else:
http_status = 200
jobs_url = '{}/processes/{}/jobs'.format(
self.config['server']['url'], process_id)
jobs_url = '{}/jobs'.format(self.config['server']['url'])
response = {
'jobID': job_id,
+14 -19
View File
@@ -3,7 +3,7 @@
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# Norman Barker <norman.barker@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -296,27 +296,25 @@ def get_processes(process_id=None):
return get_response(api_.describe_processes(request, process_id))
@BLUEPRINT.route('/processes/<process_id>/jobs')
@BLUEPRINT.route('/processes/<process_id>/jobs/<job_id>',
@BLUEPRINT.route('/jobs')
@BLUEPRINT.route('/jobs/<job_id>',
methods=['GET', 'DELETE'])
def get_process_jobs(process_id=None, job_id=None):
def get_jobs(job_id=None):
"""
OGC API - Processes jobs endpoint
:param process_id: process identifier
:param job_id: job identifier
:returns: HTTP response
"""
if job_id is None:
return get_response(api_.get_process_jobs(request, process_id))
return get_response(api_.get_jobs(request))
else:
if request.method == 'DELETE': # dismiss job
return get_response(api_.delete_process_job(process_id, job_id))
return get_response(api_.delete_job(job_id))
else: # Return status of a specific job
return get_response(api_.get_process_jobs(
request, process_id, job_id))
return get_response(api_.get_jobs(request, job_id))
@BLUEPRINT.route('/processes/<process_id>/execution', methods=['POST'])
@@ -332,35 +330,32 @@ def execute_process_jobs(process_id):
return get_response(api_.execute_process(request, process_id))
@BLUEPRINT.route('/processes/<process_id>/jobs/<job_id>/results',
@BLUEPRINT.route('/jobs/<job_id>/results',
methods=['GET'])
def get_process_job_result(process_id=None, job_id=None):
def get_job_result(job_id=None):
"""
OGC API - Processes job result endpoint
:param process_id: process identifier
:param job_id: job identifier
:returns: HTTP response
"""
return get_response(api_.get_process_job_result(
request, process_id, job_id))
return get_response(api_.get_job_result(request, job_id))
@BLUEPRINT.route('/processes/<process_id>/jobs/<job_id>/results/<resource>',
@BLUEPRINT.route('/jobs/<job_id>/results/<resource>',
methods=['GET'])
def get_process_job_result_resource(process_id, job_id, resource):
def get_job_result_resource(job_id, resource):
"""
OGC API - Processes job result resource endpoint
:param process_id: process identifier
:param job_id: job identifier
:param resource: job resource
:returns: HTTP response
"""
return get_response(api_.get_process_job_result_resource(
request, process_id, job_id, resource))
return get_response(api_.get_job_result_resource(
request, job_id, resource))
@BLUEPRINT.route('/collections/<collection_id>/position')
+63 -63
View File
@@ -2,7 +2,7 @@
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2021 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -47,7 +47,7 @@ LOGGER = logging.getLogger(__name__)
OPENAPI_YAML = {
'oapif': 'http://schemas.opengis.net/ogcapi/features/part1/1.0/openapi/ogcapi-features-1.yaml', # noqa
'oapip': 'https://raw.githubusercontent.com/opengeospatial/ogcapi-processes/master/core/openapi', # noqa
'oapip': 'http://schemas.opengis.net/ogcapi/processes/part1/1.0/openapi',
'oacov': 'https://raw.githubusercontent.com/tomkralidis/ogcapi-coverages-1/fix-cis/yaml-unresolved', # noqa
'oapit': 'https://raw.githubusercontent.com/opengeospatial/ogcapi-tiles/master/openapi/swaggerhub/tiles.yaml', # noqa
'oapimt': 'https://raw.githubusercontent.com/opengeospatial/ogcapi-tiles/master/openapi/swaggerhub/map-tiles.yaml', # noqa
@@ -908,19 +908,6 @@ def get_oas_30(cfg):
}
}
}
paths['{}/jobs'.format(process_name_path)] = {
'get': {
'summary': 'Retrieve job list for process',
'description': md_desc,
'tags': [name],
'operationId': 'get{}Jobs'.format(name.capitalize()),
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
}
}
paths['{}/execution'.format(process_name_path)] = {
'post': {
@@ -962,57 +949,70 @@ def get_oas_30(cfg):
}
}
if has_manager:
# TODO: define jobId as parameter in dict
paths[f'{process_name_path}/jobs/{{jobId}}'] = {
'get': {
'summary': 'Retrieve job details',
'description': '',
'tags': [name],
'parameters': [
name_in_path,
{'$ref': '#/components/parameters/f'}
],
'operationId': f'get{name.capitalize()}Job',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'} # noqa
}
},
'delete': {
'summary': 'Cancel / delete job',
'description': '',
'tags': [name],
'parameters': [
name_in_path
],
'operationId': f'delete{name.capitalize()}Job',
'responses': {
'204': {'$ref': '#/components/responses/204'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'} # noqa
}
},
if has_manager:
paths['/jobs'] = {
'get': {
'summary': 'Retrieve jobs list',
'description': 'Retrieve a list of jobs',
'tags': ['server'],
'operationId': 'getJobs',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'}
}
}
}
paths[f'{process_name_path}/jobs/{{jobId}}/results'] = {
'get': {
'summary': 'Retrieve job results',
'description': '',
'tags': [name],
'parameters': [
name_in_path,
{'$ref': '#/components/parameters/f'}
],
'operationId': f'get{name.capitalize()}JobResults',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'} # noqa
}
},
paths['/jobs/{jobId}'] = {
'get': {
'summary': 'Retrieve job details',
'description': 'Retrieve job details',
'tags': ['server'],
'parameters': [
name_in_path,
{'$ref': '#/components/parameters/f'}
],
'operationId': 'getJob',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'} # noqa
}
},
'delete': {
'summary': 'Cancel / delete job',
'description': 'Cancel / delete job',
'tags': ['server'],
'parameters': [
name_in_path
],
'operationId': 'deleteJob',
'responses': {
'204': {'$ref': '#/components/responses/204'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'} # noqa
}
},
}
paths['/jobs/{jobId}/results'] = {
'get': {
'summary': 'Retrieve job results',
'description': 'Retrive job resiults',
'tags': ['server'],
'parameters': [
name_in_path,
{'$ref': '#/components/parameters/f'}
],
'operationId': 'getJobResults',
'responses': {
'200': {'$ref': '#/components/responses/200'},
'404': {'$ref': '{}/responses/NotFound.yaml'.format(OPENAPI_YAML['oapip'])}, # noqa
'default': {'$ref': '#/components/responses/default'} # noqa
}
}
}
oas['paths'] = paths
+9 -14
View File
@@ -2,7 +2,7 @@
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -56,11 +56,10 @@ class BaseManager:
self.connection = manager_def.get('connection', None)
self.output_dir = manager_def.get('output_dir', None)
def get_jobs(self, process_id=None, status=None):
def get_jobs(self, status=None):
"""
Get process jobs, optionally filtered by status
:param process_id: process identifier
:param status: job status (accepted, running, successful,
failed, results) (default is all)
@@ -80,11 +79,10 @@ class BaseManager:
raise NotImplementedError()
def update_job(self, process_id, job_id, update_dict):
def update_job(self, job_id, update_dict):
"""
Updates a job
:param process_id: process identifier
:param job_id: job identifier
:param update_dict: `dict` of property updates
@@ -93,11 +91,10 @@ class BaseManager:
raise NotImplementedError()
def get_job(self, process_id, job_id):
def get_job(self, job_id):
"""
Get a job (!)
:param process_id: process identifier
:param job_id: job identifier
:returns: `dict` of job result
@@ -105,11 +102,10 @@ class BaseManager:
raise NotImplementedError()
def get_job_result(self, process_id, job_id):
def get_job_result(self, job_id):
"""
Returns the actual output from a completed process
:param process_id: process identifier
:param job_id: job identifier
:returns: `tuple` of mimetype and raw output
@@ -117,11 +113,10 @@ class BaseManager:
raise NotImplementedError()
def delete_job(self, process_id, job_id):
def delete_job(self, job_id):
"""
Deletes a job and associated results/outputs
:param process_id: process identifier
:param job_id: job identifier
:returns: `bool` of status result
@@ -193,7 +188,7 @@ class BaseManager:
current_status = JobStatus.running
jfmt, outputs = p.execute(data_dict)
self.update_job(process_id, job_id, {
self.update_job(job_id, {
'status': current_status.value,
'message': 'Writing job output',
'progress': 95
@@ -224,7 +219,7 @@ class BaseManager:
'progress': 100
}
self.update_job(process_id, job_id, job_update_metadata)
self.update_job(job_id, job_update_metadata)
except Exception as err:
# TODO assess correct exception type and description to help users
@@ -253,7 +248,7 @@ class BaseManager:
jfmt = 'application/json'
self.update_job(process_id, job_id, job_metadata)
self.update_job(job_id, job_metadata)
return jfmt, outputs, current_status
+2 -3
View File
@@ -2,7 +2,7 @@
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -49,11 +49,10 @@ class DummyManager(BaseManager):
super().__init__(manager_def)
def get_jobs(self, process_id=None, status=None):
def get_jobs(self, status=None):
"""
Get process jobs, optionally filtered by status
:param process_id: process identifier
:param status: job status (accepted, running, successful,
failed, results) (default is all)
+12 -23
View File
@@ -2,7 +2,7 @@
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -77,11 +77,10 @@ class TinyDBManager(BaseManager):
self.db.close()
return True
def get_jobs(self, process_id=None, status=None):
def get_jobs(self, status=None):
"""
Get jobs
:param process_id: process identifier
:param status: job status (accepted, running, successful,
failed, results) (default is all)
@@ -89,12 +88,7 @@ class TinyDBManager(BaseManager):
"""
self._connect()
if process_id is None:
jobs_list = [doc.doc_id for doc in self.db.all()]
else:
query = tinydb.Query()
jobs_list = self.db.search(query.process_id == process_id)
jobs_list = self.db.all()
self.db.close()
return jobs_list
@@ -114,11 +108,10 @@ class TinyDBManager(BaseManager):
return doc_id
def update_job(self, process_id, job_id, update_dict):
def update_job(self, job_id, update_dict):
"""
Updates a job
:param process_id: process identifier
:param job_id: job identifier
:param update_dict: `dict` of property updates
@@ -131,17 +124,16 @@ class TinyDBManager(BaseManager):
return True
def delete_job(self, process_id, job_id):
def delete_job(self, job_id):
"""
Deletes a job
:param process_id: process identifier
:param job_id: job identifier
:return `bool` of status result
"""
# delete result file if present
job_result = self.get_job(process_id, job_id)
job_result = self.get_job(job_id)
if job_result:
location = job_result.get('location', None)
if location and self.output_dir is not None:
@@ -153,38 +145,35 @@ class TinyDBManager(BaseManager):
return removed
def get_job(self, process_id, job_id):
def get_job(self, job_id):
"""
Get a single job
:param process_id: process identifier
:param jobid: job identifier
:param job_id: job identifier
:returns: `dict` # `pygeoapi.process.manager.Job`
"""
self._connect()
query = tinydb.Query()
result = self.db.search((
query.process_id == process_id) & (query.identifier == job_id))
result = self.db.search(query.identifier == job_id)
result = result[0] if result else None
self.db.close()
return result
def get_job_result(self, process_id, job_id):
def get_job_result(self, job_id):
"""
Get a job's status, and actual output of executing the process
:param process_id: process identifier
:param jobid: job identifier
:returns: `tuple` of mimetype and raw output
"""
job_result = self.get_job(process_id, job_id)
job_result = self.get_job(job_id)
if not job_result:
# processs/job does not exist
# job does not exist
return None
location = job_result.get('location', None)
+18 -30
View File
@@ -4,7 +4,7 @@
# Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Francesco Bartoli
# Copyright (c) 2020 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -321,33 +321,29 @@ async def get_processes(request: Request, process_id=None):
return get_response(api_.describe_processes(request, process_id))
@app.route('/processes/{process_id}/jobs')
@app.route('/processes/{process_id}/jobs/{job_id}', methods=['GET', 'DELETE'])
@app.route('/processes/{process_id}/jobs/{job_id}/', methods=['GET', 'DELETE'])
async def get_process_jobs(request: Request, process_id=None, job_id=None):
@app.route('/jobs')
@app.route('/jobs/{job_id}', methods=['GET', 'DELETE'])
@app.route('/jobs/{job_id}/', methods=['GET', 'DELETE'])
async def get_jobs(request: Request, job_id=None):
"""
OGC API - Processes jobs endpoint
:param request: Starlette Request instance
:param process_id: process identifier
:param job_id: job identifier
:returns: Starlette HTTP Response
"""
if 'process_id' in request.path_params:
process_id = request.path_params['process_id']
if 'job_id' in request.path_params:
job_id = request.path_params['job_id']
if job_id is None: # list of submit job
return get_response(api_.get_process_jobs(request, process_id))
return get_response(api_.get_jobs(request))
else: # get or delete job
if request.method == 'DELETE':
return get_response(api_.delete_process_job(process_id, job_id))
return get_response(api_.delete_job(job_id))
else: # Return status of a specific job
return get_response(api_.get_process_jobs(
request, process_id, job_id))
return get_response(api_.get_jobs(request, job_id))
@app.route('/processes/{process_id}/execution', methods=['POST'])
@@ -368,55 +364,47 @@ async def execute_process_jobs(request: Request, process_id):
return get_response(api_.execute_process(request, process_id))
@app.route('/processes/{process_id}/jobs/{job_id}/results', methods=['GET'])
@app.route('/processes/{process_id}/jobs/{job_id}/results/', methods=['GET'])
async def get_process_job_result(request: Request, process_id=None,
job_id=None):
@app.route('/jobs/{job_id}/results', methods=['GET'])
@app.route('/jobs/{job_id}/results/', methods=['GET'])
async def get_job_result(request: Request, job_id=None):
"""
OGC API - Processes job result endpoint
:param request: Starlette Request instance
:param process_id: process identifier
:param job_id: job identifier
:returns: HTTP response
"""
if 'process_id' in request.path_params:
process_id = request.path_params['process_id']
if 'job_id' in request.path_params:
job_id = request.path_params['job_id']
return get_response(api_.get_process_job_result(
request, process_id, job_id))
return get_response(api_.get_job_result(request, job_id))
@app.route('/processes/{process_id}/jobs/{job_id}/results/{resource}',
@app.route('/jobs/{job_id}/results/{resource}',
methods=['GET'])
@app.route('/processes/{process_id}/jobs/{job_id}/results/{resource}/',
@app.route('/jobs/{job_id}/results/{resource}/',
methods=['GET'])
async def get_process_job_result_resource(request: Request, process_id=None,
job_id=None, resource=None):
async def get_job_result_resource(request: Request,
job_id=None, resource=None):
"""
OGC API - Processes job result resource endpoint
:param request: Starlette Request instance
:param process_id: process identifier
:param job_id: job identifier
:param resource: job resource
:returns: HTTP response
"""
if 'process_id' in request.path_params:
process_id = request.path_params['process_id']
if 'job_id' in request.path_params:
job_id = request.path_params['job_id']
if 'resource' in request.path_params:
resource = request.path_params['resource']
return get_response(api_.get_process_job_result_resource(
request, process_id, job_id, resource))
return get_response(api_.get_job_result_resource(
request, job_id, resource))
@app.route('/collections/{collection_id}/position')
@@ -1,8 +1,6 @@
{% extends "_base.html" %}
{% block title %}{{ super() }} {% trans %}Jobs{% endtrans %} {% endblock %}
{% block crumbs %}{{ super() }}
/ <a href="../../processes">{% trans %}Processes{% endtrans %}</a>
/ <a href="../{{ data.process.id }}">{{ data.process.title }}</a>
/ <a href="./jobs">{% trans %}Jobs{% endtrans %}</a>
{% endblock %}
{% block body %}
@@ -13,7 +11,8 @@
<caption>{% trans %}Jobs{% endtrans %}</caption>
<thead>
<tr>
<th>{% trans %}ID{% endtrans %}</th>
<th>{% trans %}Job ID{% endtrans %}</th>
<th>{% trans %}Process ID{% endtrans %}</th>
<th>{% trans %}Start{% endtrans %}</th>
<th>{% trans %}Duration{% endtrans %}</th>
<th>{% trans %}Progress{% endtrans %}</th>
@@ -22,9 +21,10 @@
</tr>
</thead>
<tbody>
{% for job in data.jobs %}
{% for job in data.jobs.jobs %}
<tr>
<td class="small"><a href="{{ config['server']['url'] }}/processes/{{data.process.id}}/jobs/{{ job.jobID}}">{{ job.jobID }}</a></td>
<td class="small"><a href="{{ config['server']['url'] }}/jobs/{{ job.jobID}}">{{ job.jobID }}</a></td>
<td class="small"><a href="{{ config['server']['url'] }}/processes/{{ job.processID}}">{{ job.processID }}</a></td>
<td><abbr title="{{ job.job_start_datetime|format_datetime }}">{{ job.job_start_datetime|format_datetime }}</abbr></td>
<td>
{% if job.status == 'running' %}
@@ -1,8 +1,6 @@
{% extends "_base.html" %}
{% block title %}{{ super() }} {% trans %}Job status{% endtrans %} {% endblock %}
{% block crumbs %}{{ super() }}
/ <a href="../../">{% trans %}Processes{% endtrans %}</a>
/ <a href="../">{{ data['process']['title'] }}</a>
/ <a href="../jobs">{% trans %}Jobs{% endtrans %}</a>
/ <a href="./{{ data['jobs']['jobID'] }}">{{ data['jobs']['jobID'] }}</a>
{% endblock %}
@@ -1,8 +1,6 @@
{% extends "_base.html" %}
{% block title %}{{ super() }} {% trans %}Job result{% endtrans %} {% endblock %}
{% block crumbs %}{{ super() }}
/ <a href="../../../../processes">{% trans %}Processes{% endtrans %}</a>
/ <a href="../../../{{ data.process.id }}">{{ data.process.title }}</a>
/ <a href="../../jobs">{% trans %}Jobs{% endtrans %}</a>
/ <a href="../{{ data.job.id }}">{{ data.job.id }}</a>
/ <a href="./results">{% trans %}Results{% endtrans %}</a>
+6
View File
@@ -64,6 +64,12 @@
<a href="{{ config['server']['url'] }}/processes?f=html">{% trans %}View the processes in this service{% endtrans %}</a>
</p>
</section>
<section id="jobs">
<h2>{% trans %}Jobs{% endtrans %}</h2>
<p>
<a title="Browse jobs" href="{{config.server.url}}/jobs">{% trans %}Browse jobs{% endtrans %}</a>
</p>
</section>
{% endif %}
<section id="openapi">
<h2>{% trans %}API Definition{% endtrans %}</h2>
+1 -1
View File
@@ -74,7 +74,7 @@
{% if 'async-execute' in data.jobControlOptions %}<li>{% trans %}Asynchronous{% endtrans %}</li>{% endif %}
</ul>
<h2>{% trans %}Jobs{% endtrans %}</h2>
<a title="Browse jobs" href="{{config.server.url}}/processes/{{data.id}}/jobs">{% trans %}Browse jobs{% endtrans %}</a>
<a title="Browse jobs" href="{{config.server.url}}/jobs">{% trans %}Browse jobs{% endtrans %}</a>
<h2>{% trans %}Links{% endtrans %}</h2>
<ul>
{% for link in data['links'] %}
+10 -16
View File
@@ -2,7 +2,7 @@
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2021 Tom Kralidis
# Copyright (c) 2022 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
@@ -363,7 +363,7 @@ def test_root(config, api_):
for link in root['links'])
assert any(link['href'].endswith('f=html') and link['rel'] == 'alternate'
for link in root['links'])
assert len(root['links']) == 7
assert len(root['links']) == 9
assert 'title' in root
assert root['title'] == 'pygeoapi default instance'
assert 'description' in root
@@ -1348,15 +1348,13 @@ def test_execute_process(config, api_):
# Cleanup
time.sleep(2) # Allow time for any outstanding async jobs
for process_id, job_id in cleanup_jobs:
rsp_headers, code, response = api_.delete_process_job(
process_id, job_id)
for _, job_id in cleanup_jobs:
rsp_headers, code, response = api_.delete_job(job_id)
assert code == 200
def test_delete_process_job(api_):
rsp_headers, code, response = api_.delete_process_job(
'does-not-exist', 'does-not-exist')
def test_delete_job(api_):
rsp_headers, code, response = api_.delete_job('does-not-exist')
assert code == 404
@@ -1383,13 +1381,11 @@ def test_delete_process_job(api_):
assert data['value'] == 'Hello Sync Test Deletion!'
job_id = rsp_headers['Location'].split('/')[-1]
rsp_headers, code, response = api_.delete_process_job(
'hello-world', job_id)
rsp_headers, code, response = api_.delete_job(job_id)
assert code == 200
rsp_headers, code, response = api_.delete_process_job(
'hello-world', job_id)
rsp_headers, code, response = api_.delete_job(job_id)
assert code == 404
req = mock_request(data=req_body_async)
@@ -1401,12 +1397,10 @@ def test_delete_process_job(api_):
time.sleep(2) # Allow time for async execution to complete
job_id = rsp_headers['Location'].split('/')[-1]
rsp_headers, code, response = api_.delete_process_job(
'hello-world', job_id)
rsp_headers, code, response = api_.delete_job(job_id)
assert code == 200
rsp_headers, code, response = api_.delete_process_job(
'hello-world', job_id)
rsp_headers, code, response = api_.delete_job(job_id)
assert code == 404