diff --git a/pygeoapi/api/processes.py b/pygeoapi/api/processes.py index e95d90a..b2a5777 100644 --- a/pygeoapi/api/processes.py +++ b/pygeoapi/api/processes.py @@ -46,6 +46,7 @@ from http import HTTPStatus import json import logging from typing import Tuple +import urllib.parse from pygeoapi import l10n from pygeoapi.util import ( @@ -240,10 +241,51 @@ def get_jobs(api: API, request: APIRequest, headers = request.get_response_headers(SYSTEM_LOCALE, **api.api_headers) + LOGGER.debug('Processing limit parameter') + try: + limit = int(request.params.get('limit')) + + if limit <= 0: + msg = 'limit value should be strictly positive' + return api.get_exception( + HTTPStatus.BAD_REQUEST, headers, request.format, + 'InvalidParameterValue', msg) + except TypeError: + limit = int(api.config['server']['limit']) + LOGGER.debug('returning all jobs') + except ValueError: + msg = 'limit value should be an integer' + return api.get_exception( + HTTPStatus.BAD_REQUEST, headers, request.format, + 'InvalidParameterValue', msg) + + LOGGER.debug('Processing offset parameter') + try: + offset = int(request.params.get('offset')) + if offset < 0: + msg = 'offset value should be positive or zero' + return api.get_exception( + HTTPStatus.BAD_REQUEST, headers, request.format, + 'InvalidParameterValue', msg) + except TypeError as err: + LOGGER.warning(err) + offset = 0 + except ValueError: + msg = 'offset value should be an integer' + return api.get_exception( + HTTPStatus.BAD_REQUEST, headers, request.format, + 'InvalidParameterValue', msg) + if job_id is None: - jobs = sorted(api.manager.get_jobs(), + jobs_data = api.manager.get_jobs(limit=limit, offset=offset) + # TODO: For pagination to work, the provider has to do the sorting. + # Here we do sort again in case the provider doesn't support + # pagination yet and always returns all jobs. + jobs = sorted(jobs_data['jobs'], key=lambda k: k['job_start_datetime'], reverse=True) + numberMatched = jobs_data['numberMatched'] + else: try: jobs = [api.manager.get_job(job_id)] @@ -251,6 +293,7 @@ def get_jobs(api: API, request: APIRequest, return api.get_exception( HTTPStatus.NOT_FOUND, headers, request.format, 'InvalidParameterValue', job_id) + numberMatched = 1 serialized_jobs = { 'jobs': [], @@ -309,6 +352,44 @@ def get_jobs(api: API, request: APIRequest, serialized_jobs['jobs'].append(job2) + serialized_query_params = '' + for k, v in request.params.items(): + if k not in ('f', 'offset'): + serialized_query_params += '&' + serialized_query_params += urllib.parse.quote(k, safe='') + serialized_query_params += '=' + serialized_query_params += urllib.parse.quote(str(v), safe=',') + + uri = f'{api.base_url}/jobs' + + if offset > 0: + prev = max(0, offset - limit) + serialized_jobs['links'].append( + { + 'href': f'{uri}?offset={prev}{serialized_query_params}', + 'type': FORMAT_TYPES[F_JSON], + 'rel': 'prev', + 'title': l10n.translate('Items (prev)', request.locale), + }) + + next_link = False + + if numberMatched > (limit + offset): + next_link = True + elif len(jobs) == limit: + next_link = True + + if next_link: + next_ = offset + limit + next_href = f'{uri}?offset={next_}{serialized_query_params}' + serialized_jobs['links'].append( + { + 'href': next_href, + 'rel': 'next', + 'type': FORMAT_TYPES[F_JSON], + 'title': l10n.translate('Items (next)', request.locale), + }) + if job_id is None: j2_template = 'jobs/index.html' else: @@ -318,6 +399,7 @@ def get_jobs(api: API, request: APIRequest, if request.format == F_HTML: data = { 'jobs': serialized_jobs, + 'offset': offset, 'now': datetime.now(timezone.utc).strftime(DATETIME_FORMAT) } response = render_j2_template(api.tpl_config, j2_template, data, diff --git a/pygeoapi/process/manager/base.py b/pygeoapi/process/manager/base.py index f0d3148..df5c41a 100644 --- a/pygeoapi/process/manager/base.py +++ b/pygeoapi/process/manager/base.py @@ -108,14 +108,21 @@ class BaseManager: else: return load_plugin('process', process_conf['processor']) - def get_jobs(self, status: JobStatus = None) -> list: + def get_jobs(self, + status: JobStatus = None, + limit: Optional[int] = None, + offset: Optional[int] = None + ) -> dict: """ Get process jobs, optionally filtered by status :param status: job status (accepted, running, successful, failed, results) (default is all) + :param limit: number of jobs to return + :param offset: pagination offset - :returns: `list` of jobs (identifier, status, process identifier) + :returns: dict of list of jobs (identifier, status, process identifier) + and numberMatched """ raise NotImplementedError() diff --git a/pygeoapi/process/manager/dummy.py b/pygeoapi/process/manager/dummy.py index 4a9866b..7c3a703 100644 --- a/pygeoapi/process/manager/dummy.py +++ b/pygeoapi/process/manager/dummy.py @@ -56,17 +56,21 @@ class DummyManager(BaseManager): super().__init__(manager_def) - def get_jobs(self, status: JobStatus = None) -> list: + def get_jobs(self, status: JobStatus = None, limit=None, offset=None + ) -> dict: """ Get process jobs, optionally filtered by status :param status: job status (accepted, running, successful, failed, results) (default is all) + :param limit: number of jobs to return + :param offset: pagination offset - :returns: `list` of jobs (identifier, status, process identifier) + :returns: dict of list of jobs (identifier, status, process identifier) + and numberMatched """ - return [] + return {'jobs': [], 'numberMatched': 0} def execute_process( self, diff --git a/pygeoapi/process/manager/mongodb_.py b/pygeoapi/process/manager/mongodb_.py index 66886f9..2b64806 100644 --- a/pygeoapi/process/manager/mongodb_.py +++ b/pygeoapi/process/manager/mongodb_.py @@ -70,7 +70,7 @@ class MongoDBManager(BaseManager): exc_info=(traceback)) return False - def get_jobs(self, status=None): + def get_jobs(self, status=None, limit=None, offset=None): try: self._connect() database = self.db.job_manager_pygeoapi @@ -80,7 +80,10 @@ class MongoDBManager(BaseManager): else: jobs = list(collection.find({})) LOGGER.info("JOBMANAGER - MongoDB jobs queried") - return jobs + return { + 'jobs': jobs, + 'numberMatched': len(jobs) + } except Exception: LOGGER.error("JOBMANAGER - get_jobs error", exc_info=(traceback)) diff --git a/pygeoapi/process/manager/postgresql.py b/pygeoapi/process/manager/postgresql.py index 30b7b8d..72f3b75 100644 --- a/pygeoapi/process/manager/postgresql.py +++ b/pygeoapi/process/manager/postgresql.py @@ -116,16 +116,18 @@ class PostgreSQLManager(BaseManager): LOGGER.error(f'{msg}: {err}') raise ProcessorGenericError(msg) - def get_jobs(self, status: JobStatus = None) -> list: + def get_jobs(self, status: JobStatus = None, limit=None, offset=None + ) -> dict: """ Get jobs :param status: job status (accepted, running, successful, failed, results) (default is all) + :param limit: number of jobs to return + :param offset: pagination offset - :returns: 'list` of jobs (type (default='process'), identifier, - status, process_id, job_start_datetime, job_end_datetime, location, - mimetype, message, progress) + :returns: dict of list of jobs (identifier, status, process identifier) + and numberMatched """ LOGGER.debug('Querying for jobs') @@ -135,7 +137,11 @@ class PostgreSQLManager(BaseManager): column = getattr(self.table_model, 'status') results = results.filter(column == status.value) - return [r.__dict__ for r in results.all()] + jobs = [r.__dict__ for r in results.all()] + return { + 'jobs': jobs, + 'numberMatched': len(jobs) + } def add_job(self, job_metadata: dict) -> str: """ diff --git a/pygeoapi/process/manager/tinydb_.py b/pygeoapi/process/manager/tinydb_.py index 3966e9d..2f022a3 100644 --- a/pygeoapi/process/manager/tinydb_.py +++ b/pygeoapi/process/manager/tinydb_.py @@ -82,20 +82,35 @@ class TinyDBManager(BaseManager): return True - def get_jobs(self, status: JobStatus = None) -> list: + def get_jobs(self, status: JobStatus = None, limit=None, offset=None + ) -> dict: """ Get jobs :param status: job status (accepted, running, successful, failed, results) (default is all) + :param limit: number of jobs to return + :param offset: pagination offset - :returns: 'list` of jobs (identifier, status, process identifier) + :returns: dict of list of jobs (identifier, status, process identifier) + and numberMatched """ with self._db() as db: jobs_list = db.all() - return jobs_list + number_matched = len(jobs_list) + + if offset: + jobs_list = jobs_list[offset:] + + if limit: + jobs_list = jobs_list[:limit] + + return { + 'jobs': jobs_list, + 'numberMatched': number_matched + } def add_job(self, job_metadata: dict) -> str: """ diff --git a/pygeoapi/templates/jobs/index.html b/pygeoapi/templates/jobs/index.html index 05c12d9..1c19035 100644 --- a/pygeoapi/templates/jobs/index.html +++ b/pygeoapi/templates/jobs/index.html @@ -48,5 +48,38 @@ +
+
+ {% trans %}Limit{% endtrans %}: + + +
+
+
+
+ {% for link in data['jobs']['links'] %} + {% if link['rel'] == 'prev' and data['offset'] > 0 %} + {% trans %}Prev{% endtrans %} + {% elif link['rel'] == 'next' and data['jobs']['jobs'] %} + {% trans %}Next{% endtrans %} + {% endif %} + {% endfor %} +
+
+ {% endblock %} diff --git a/tests/api/test_processes.py b/tests/api/test_processes.py index ee569c4..62256e4 100644 --- a/tests/api/test_processes.py +++ b/tests/api/test_processes.py @@ -39,7 +39,7 @@ from unittest import mock from pygeoapi.api import FORMAT_TYPES, F_HTML, F_JSON from pygeoapi.api.processes import ( - describe_processes, execute_process, delete_job, get_job_result, + describe_processes, execute_process, delete_job, get_job_result, get_jobs ) from tests.util import mock_api_request @@ -442,4 +442,51 @@ def test_get_job_result(api_): ) assert code == HTTPStatus.OK assert rsp_headers['Content-Type'] == 'application/json' - assert json.loads(response)['value'] == "Hello Sync Test!" + assert json.loads(response)['value'] == 'Hello Sync Test!' + + +def test_get_jobs_single(api_): + job_id = _execute_a_job(api_) + headers, code, response = get_jobs(api_, mock_api_request(), job_id=job_id) + assert code == HTTPStatus.OK + + job = json.loads(response) + assert job['jobID'] == job_id + assert job['status'] == 'successful' + + +def test_get_jobs_pagination(api_): + # generate test jobs for querying + for _ in range(11): + _execute_a_job(api_) + + # test default pagination limit + headers, code, response = get_jobs(api_, mock_api_request(), job_id=None) + job_response = json.loads(response) + assert len(job_response['jobs']) == 10 + assert next( + link for link in job_response['links'] if link['rel'] == 'next' + )['href'].endswith('/jobs?offset=10') + + headers, code, response = get_jobs( + api_, + mock_api_request({'limit': 10, 'offset': 9}), + job_id=None) + job_response_offset = json.loads(response) + # check to get 1 same job id with an offset of 9 and limit of 10 + same_job_ids = {job['jobID'] for job in job_response['jobs']}.intersection( + {job['jobID'] for job in job_response_offset['jobs']} + ) + assert len(same_job_ids) == 1 + assert next( + link for link in job_response_offset['links'] if link['rel'] == 'prev' + )['href'].endswith('/jobs?offset=0&limit=10') + + # test custom limit + headers, code, response = get_jobs( + api_, + mock_api_request({'limit': 20}), + job_id=None) + job_response = json.loads(response) + # might be more than 11 due to test interaction + assert len(job_response['jobs']) > 10