From 6915efbcada0fb2b9652d46ca34edd7ef2000500 Mon Sep 17 00:00:00 2001 From: Tom Kralidis Date: Wed, 30 Dec 2020 09:30:12 -0500 Subject: [PATCH] add links, refactor job handling, HTML updates (#599) * add links, refactor job handling, HTML updates * fix test * misc fixes --- pygeoapi/api.py | 228 ++++++++++++---------------- pygeoapi/flask_app.py | 2 +- pygeoapi/process/manager/base.py | 29 ++-- pygeoapi/process/manager/tinydb_.py | 12 +- pygeoapi/templates/job.html | 41 +++-- pygeoapi/templates/jobs.html | 13 +- tests/test_api.py | 2 +- 7 files changed, 143 insertions(+), 184 deletions(-) diff --git a/pygeoapi/api.py b/pygeoapi/api.py index ab728e7..2b40d28 100644 --- a/pygeoapi/api.py +++ b/pygeoapi/api.py @@ -1903,13 +1903,21 @@ tiles/{{{}}}/{{{}}}/{{{}}}/{{{}}}?f=mvt' link = { 'type': 'text/html', 'rel': 'collection', - 'href': jobs_url, - 'title': 'Collection of jobs for the {} process'.format( - key), + 'href': '{}?f=html'.format(jobs_url), + 'title': 'jobs for this process as HTML', 'hreflang': self.config['server'].get('language', None) } - p2['links'].append(link) + + link = { + 'type': 'application/json', + 'rel': 'collection', + 'href': '{}?f=json'.format(jobs_url), + 'title': 'jobs for this process as JSON', + 'hreflang': self.config['server'].get('language', None) + } + p2['links'].append(link) + processes.append(p2) if process is not None: @@ -1932,13 +1940,14 @@ tiles/{{{}}}/{{{}}}/{{{}}}/{{{}}}?f=mvt' return headers_, 200, to_json(response, self.pretty_print) - def get_process_jobs(self, headers, args, process_id): + def get_process_jobs(self, headers, args, process_id, job_id=None): """ Get process jobs :param headers: dict of HTTP headers :param args: dict of HTTP request parameters :param process_id: id of process + :param job_id: id of job :returns: tuple of headers, status code, content """ @@ -1971,26 +1980,77 @@ tiles/{{{}}}/{{{}}}/{{{}}}/{{{}}}?f=mvt' p = load_plugin('process', processes[process_id]['processor']) if self.manager: - jobs = sorted(self.manager.get_jobs(process_id), - key=lambda k: k['process_start_datetime'], - reverse=True) + if job_id is None: + jobs = sorted(self.manager.get_jobs(process_id), + key=lambda k: k['job_start_datetime'], + reverse=True) + else: + jobs = [self.manager.get_job(process_id, job_id)] else: LOGGER.debug('Process management not configured') jobs = [] + serialized_jobs = [] + for job_ in jobs: + job2 = { + 'jobID': job_['identifier'], + 'status': job_['status'], + 'message': job_['message'], + 'progress': job_['progress'], + 'job_start_datetime': job_['job_start_datetime'], + 'job_end_datetime': job_['job_end_datetime'] + } + + if JobStatus[job_['status']] in [ + JobStatus.successful, JobStatus.running, JobStatus.accepted]: + + job_result_url = '{}/processes/{}/jobs/{}/results'.format( + self.config['server']['url'], + process_id, job_['identifier']) + + job2['links'] = [{ + 'href': '{}?f=html'.format(job_result_url), + 'rel': 'about', + 'type': 'text/html', + 'title': 'results of job {} as HTML'.format(job_id) + }, { + 'href': '{}?f=json'.format(job_result_url), + 'rel': 'about', + 'type': 'application/json', + 'title': 'results of job {} as JSON'.format(job_id) + }] + + if job_['mimetype'] not in ['application/json', 'text/html']: + job2['links'].append({ + 'href': job_result_url, + 'rel': 'about', + 'type': job_['mimetype'], + 'title': 'results of job {} as {}'.format( + job_id, job_['mimetype']) + }) + + serialized_jobs.append(job2) + + if job_id is None: + j2_template = 'jobs.html' + else: + serialized_jobs = serialized_jobs[0] + j2_template = 'job.html' + if format_ == 'html': headers_['Content-Type'] = 'text/html' data = { - 'process': {'id': process_id, 'title': p.metadata['title']}, - 'jobs': jobs, + 'process': { + 'id': process_id, + 'title': p.metadata['title'] + }, + 'jobs': serialized_jobs, 'now': datetime.now(timezone.utc).strftime(DATETIME_FORMAT) } - response = render_j2_template(self.config, 'jobs.html', data) + response = render_j2_template(self.config, j2_template, data) return headers_, 200, response - response = [job['identifier'] for job in jobs] - - return headers_, 200, to_json(response, self.pretty_print) + return headers_, 200, to_json(serialized_jobs, self.pretty_print) def execute_process(self, headers, args, data, process_id): """ @@ -2138,111 +2198,6 @@ tiles/{{{}}}/{{{}}}/{{{}}}/{{{}}}?f=mvt' return headers_, http_status, to_json(response, self.pretty_print) - def get_process_job(self, headers, args, process_id, job_id): - """ - Get status of job (instance of a process) - - :param headers: dict of HTTP headers - :param args: dict of HTTP request parameters - :param process_id: process identifier - :param job_id: job identifier - - :returns: tuple of headers, status code, content - """ - headers_ = HEADERS.copy() - - processes = filter_dict_by_key_value(self.config['resources'], - 'type', 'process') - if process_id not in processes: - exception = { - 'code': 'NoSuchProcess', - 'description': 'identifier not found' - } - LOGGER.info(exception) - return headers_, 404, to_json(exception, self.pretty_print) - - job = self.manager.get_job(process_id, job_id) - - if not job: - exception = { - 'code': 'NoSuchJob', - 'description': 'job not found' - } - LOGGER.info(exception) - return headers_, 404, to_json(exception, self.pretty_print) - - format_ = check_format(args, headers) - if format_ is not None and format_ not in FORMATS: - exception = { - 'code': 'InvalidParameterValue', - 'description': 'Invalid format' - } - LOGGER.error(exception) - return headers_, 400, to_json(exception, self.pretty_print) - - status = JobStatus[job['status']] - response = { - 'jobID': job_id, - 'status': status.value, - 'message': job.get('message', None), - 'links': [{ - 'href': '{}/processes/{}/jobs/{}'.format( - self.config['server']['url'], process_id, job_id - ), - 'rel': 'self', - 'type': 'application/json', - 'title': 'Status of {} job {}'.format(process_id, job_id) - }] - } - - if status in (JobStatus.successful, JobStatus.running, - JobStatus.accepted): - # TODO link also if accepted/running? - response['links'].append({ - 'href': '{}/processes/{}/jobs/{}/results'.format( - self.config['server']['url'], process_id, job_id - ), - 'rel': 'about', - 'type': 'application/json', - 'title': 'Results of {} job {}'.format(process_id, job_id) - }) - elif status == JobStatus.failed: - # TODO link to exception report? - pass - - if format_ != 'html': - return headers_, 200, json.dumps(response, default=json_serial) - else: - headers_['Content-Type'] = 'text/html' - process = load_plugin('process', - processes[process_id]['processor']) - - process_info = { - 'id': process_id, - 'title': process.metadata['title'] - } - - psd = job.get('process_start_datetime', None) - ped = job.get('process_end_datetime', None) - - if status == JobStatus.successful: - progress = 100 - else: - progress = job.get('progress', 0) - - job_info = { - 'process_start_datetime': psd, - 'process_end_datetime': ped, - 'progress': progress, - **response - } - - return headers_, 200, render_j2_template(self.config, 'job.html', { - 'process': process_info, - 'job': job_info, - 'now': datetime.now(timezone.utc).strftime(DATETIME_FORMAT), - }) - def get_process_job_result(self, headers, args, process_id, job_id): """ Get result of job (instance of a process) @@ -2313,25 +2268,28 @@ tiles/{{{}}}/{{{}}}/{{{}}}/{{{}}}?f=mvt' LOGGER.info(exception) return headers_, 400, json.dumps(exception) - job_output = self.manager.get_job_result(process_id, job_id) + mimetype, job_output = self.manager.get_job_result(process_id, job_id) format_ = check_format(args, headers) - if format_ == 'html': - headers_['Content-Type'] = 'text/html' - data = { - 'process': { - 'id': process_id, 'title': process.metadata['title'] - }, - 'job': {'id': job_id}, - 'result': job_output - } - response = render_j2_template(self.config, 'job_result.html', - data) - return headers_, 200, response - - content = json.dumps(job_output, sort_keys=True, indent=4, - default=json_serial) + if mimetype not in [None, 'application/json']: + headers_['Content-Type'] = mimetype + content = job_output + else: + if format_ == 'json': + content = json.dumps(job_output, sort_keys=True, indent=4, + default=json_serial) + else: + headers_['Content-Type'] = 'text/html' + data = { + 'process': { + 'id': process_id, 'title': process.metadata['title'] + }, + 'job': {'id': job_id}, + 'result': job_output + } + content = render_j2_template(self.config, 'job_result.html', + data) return headers_, 200, content diff --git a/pygeoapi/flask_app.py b/pygeoapi/flask_app.py index 4badfa4..27bcb48 100644 --- a/pygeoapi/flask_app.py +++ b/pygeoapi/flask_app.py @@ -404,7 +404,7 @@ def get_process_jobs(process_id=None, job_id=None): headers, status_code, content = api_.delete_process_job( process_id, job_id) else: # Return status of a specific job - headers, status_code, content = api_.get_process_job( + headers, status_code, content = api_.get_process_jobs( request.headers, request.args, process_id, job_id) response = make_response(content, status_code) diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index a3c9881..9a28f45 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -112,7 +112,7 @@ class BaseManager: :param process_id: process identifier :param job_id: job identifier - :returns: `str` of raw output or None + :returns: `tuple` of mimetype and raw output """ raise NotImplementedError() @@ -165,23 +165,18 @@ class BaseManager: :returns: tuple of response payload and status """ - if self.output_dir is not None: - filename = '{}-{}'.format(p.metadata['id'], job_id) - job_filename = os.path.join(self.output_dir, filename) - else: - job_filename = 'stdout' - process_id = p.metadata['id'] current_status = JobStatus.accepted job_metadata = { 'identifier': job_id, 'process_id': process_id, - 'process_start_datetime': datetime.utcnow().strftime( + 'job_start_datetime': datetime.utcnow().strftime( DATETIME_FORMAT), - 'process_end_datetime': None, + 'job_end_datetime': None, 'status': current_status.value, 'location': None, + 'mimetype': None, 'message': 'Job accepted and ready for execution', 'progress': 5 } @@ -189,8 +184,17 @@ class BaseManager: self.add_job(job_metadata) try: + if self.output_dir is not None: + filename = '{}-{}'.format(p.metadata['id'], job_id) + job_filename = os.path.join(self.output_dir, filename) + else: + job_filename = None + + jfmt = p.metadata['outputs'][0]['output']['formats'][0]['mimeType'] + current_status = JobStatus.running outputs = p.execute(data_dict) + self.update_job(process_id, job_id, { 'status': current_status.value, 'message': 'Writing job output', @@ -203,11 +207,13 @@ class BaseManager: fh.write(json.dumps(outputs, sort_keys=True, indent=4)) current_status = JobStatus.successful + job_update_metadata = { - 'process_end_datetime': datetime.utcnow().strftime( + 'job_end_datetime': datetime.utcnow().strftime( DATETIME_FORMAT), 'status': current_status.value, 'location': job_filename, + 'mimetype': jfmt, 'message': 'Job complete', 'progress': 100 } @@ -231,10 +237,11 @@ class BaseManager: } LOGGER.error(err) job_metadata = { - 'process_end_datetime': datetime.utcnow().strftime( + 'job_end_datetime': datetime.utcnow().strftime( DATETIME_FORMAT), 'status': current_status.value, 'location': None, + 'mimetype': None, 'message': f'{code}: {outputs["description"]}' } diff --git a/pygeoapi/process/manager/tinydb_.py b/pygeoapi/process/manager/tinydb_.py index 8a65131..9a4f773 100644 --- a/pygeoapi/process/manager/tinydb_.py +++ b/pygeoapi/process/manager/tinydb_.py @@ -179,26 +179,30 @@ class TinyDBManager(BaseManager): :param process_id: process identifier :param jobid: job identifier - :returns: The process output as a `dict` + :returns: `tuple` of mimetype and raw output """ job_result = self.get_job(process_id, job_id) if not job_result: # processs/job does not exist return None + location = job_result.get('location', None) + mimetype = job_result.get('mimetype', None) job_status = JobStatus[job_result['status']] + if not job_status == JobStatus.successful: # Job is incomplete - return None + return (None,) if not location: # Job data was not written for some reason # TODO log/raise exception? - return {} + return (None,) + with io.open(location, 'r', encoding='utf-8') as filehandler: result = json.load(filehandler) - return result + return mimetype, result def __repr__(self): return ' {}'.format(self.name) diff --git a/pygeoapi/templates/job.html b/pygeoapi/templates/job.html index 56c3c36..4f6a1d3 100644 --- a/pygeoapi/templates/job.html +++ b/pygeoapi/templates/job.html @@ -2,9 +2,9 @@ {% block title %}{{ super() }} Job status {% endblock %} {% block crumbs %}{{ super() }} / Processes -/ {{ data.process.title }} +/ {{ data['process']['title'] }} / Jobs -/ {{ data.job.jobID }} +/ {{ data['jobs']['jobID'] }} {% endblock %} {% block body %}
@@ -13,43 +13,38 @@
-
-
-

Status: {{ data['job']['status'] }}

-

Progress: {{ data.job.progress }}%

+
+
+

Status: {{ data['jobs']['status'] }}

+

Progress: {{ data['jobs']['progress'] }}%

Message

-

{{ data.job.message }}

+

{{ data['jobs']['message'] }}

- +

- {% if data.job.status == 'running' %} - {{ data.job.process_start_datetime|format_duration(data.now) }} + {% if data['jobs']['status'] == 'running' %} + {{ data['jobs']['job_start_datetime']|format_duration(data.now) }} {% else %} - {{ data.job.process_start_datetime|format_duration(data.job.process_end_datetime) }} + {{ data['jobs']['job_start_datetime']|format_duration(data['jobs']['job_end_datetime']) }} {% endif %}

-

{{ data.job.process_start_datetime|format_datetime }}

+

{{ data['jobs']['job_start_datetime']|format_datetime }}

-

{{ data.job.process_end_datetime|format_datetime }}

-
-
diff --git a/pygeoapi/templates/jobs.html b/pygeoapi/templates/jobs.html index 1d4fed0..e4ae4b4 100644 --- a/pygeoapi/templates/jobs.html +++ b/pygeoapi/templates/jobs.html @@ -5,9 +5,6 @@ / {{ data.process.title }} / Jobs {% endblock %} -{% block form_js %} - -{% endblock %} {% block body %}
@@ -27,15 +24,13 @@ {% for job in data.jobs %} - {{ job.identifier }} - - {{ job.process_start_datetime|format_datetime }} - + {{ job.jobID }} + {{ job.job_start_datetime|format_datetime }} {% if job.status == 'running' %} - {{ job.process_start_datetime|format_duration(data.now) }} + {{ job.job_start_datetime|format_duration(data.now) }} {% else %} - {{ job.process_start_datetime|format_duration(job.process_end_datetime) }} + {{ job.job_start_datetime|format_duration(job.job_end_datetime) }} {% endif %} diff --git a/tests/test_api.py b/tests/test_api.py index 276c54e..1f69572 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -758,7 +758,7 @@ def test_describe_processes(config, api_): assert process['version'] == '0.2.0' assert process['title'] == 'Hello World' assert len(process['keywords']) == 3 - assert len(process['links']) == 2 + assert len(process['links']) == 3 assert len(process['inputs']) == 2 assert len(process['outputs']) == 1 assert len(process['outputTransmission']) == 1