Jobs pagination (#1779)
* Add pagination for job list. Adds `limit` and `offset` parameters to `get_jobs`. The process manager's `get_jobs` now also returns the number of matched jobs in addition to the jobs themselves, so that we can calculate whether a next link is needed. Note that this is a breaking change. * Add pagination support to jobs UI. This works exactly the same way as for itemtypes. * Add note regarding job sorting. * Formatting fixes.
This commit is contained in:
committed by
GitHub
parent
0677c2e646
commit
deb043f928
@@ -46,6 +46,7 @@ from http import HTTPStatus
|
||||
import json
|
||||
import logging
|
||||
from typing import Tuple
|
||||
import urllib.parse
|
||||
|
||||
from pygeoapi import l10n
|
||||
from pygeoapi.util import (
|
||||
@@ -240,10 +241,51 @@ def get_jobs(api: API, request: APIRequest,
|
||||
|
||||
headers = request.get_response_headers(SYSTEM_LOCALE,
|
||||
**api.api_headers)
|
||||
LOGGER.debug('Processing limit parameter')
|
||||
try:
|
||||
limit = int(request.params.get('limit'))
|
||||
|
||||
if limit <= 0:
|
||||
msg = 'limit value should be strictly positive'
|
||||
return api.get_exception(
|
||||
HTTPStatus.BAD_REQUEST, headers, request.format,
|
||||
'InvalidParameterValue', msg)
|
||||
except TypeError:
|
||||
limit = int(api.config['server']['limit'])
|
||||
LOGGER.debug('returning all jobs')
|
||||
except ValueError:
|
||||
msg = 'limit value should be an integer'
|
||||
return api.get_exception(
|
||||
HTTPStatus.BAD_REQUEST, headers, request.format,
|
||||
'InvalidParameterValue', msg)
|
||||
|
||||
LOGGER.debug('Processing offset parameter')
|
||||
try:
|
||||
offset = int(request.params.get('offset'))
|
||||
if offset < 0:
|
||||
msg = 'offset value should be positive or zero'
|
||||
return api.get_exception(
|
||||
HTTPStatus.BAD_REQUEST, headers, request.format,
|
||||
'InvalidParameterValue', msg)
|
||||
except TypeError as err:
|
||||
LOGGER.warning(err)
|
||||
offset = 0
|
||||
except ValueError:
|
||||
msg = 'offset value should be an integer'
|
||||
return api.get_exception(
|
||||
HTTPStatus.BAD_REQUEST, headers, request.format,
|
||||
'InvalidParameterValue', msg)
|
||||
|
||||
if job_id is None:
|
||||
jobs = sorted(api.manager.get_jobs(),
|
||||
jobs_data = api.manager.get_jobs(limit=limit, offset=offset)
|
||||
# TODO: For pagination to work, the provider has to do the sorting.
|
||||
# Here we do sort again in case the provider doesn't support
|
||||
# pagination yet and always returns all jobs.
|
||||
jobs = sorted(jobs_data['jobs'],
|
||||
key=lambda k: k['job_start_datetime'],
|
||||
reverse=True)
|
||||
numberMatched = jobs_data['numberMatched']
|
||||
|
||||
else:
|
||||
try:
|
||||
jobs = [api.manager.get_job(job_id)]
|
||||
@@ -251,6 +293,7 @@ def get_jobs(api: API, request: APIRequest,
|
||||
return api.get_exception(
|
||||
HTTPStatus.NOT_FOUND, headers, request.format,
|
||||
'InvalidParameterValue', job_id)
|
||||
numberMatched = 1
|
||||
|
||||
serialized_jobs = {
|
||||
'jobs': [],
|
||||
@@ -309,6 +352,44 @@ def get_jobs(api: API, request: APIRequest,
|
||||
|
||||
serialized_jobs['jobs'].append(job2)
|
||||
|
||||
serialized_query_params = ''
|
||||
for k, v in request.params.items():
|
||||
if k not in ('f', 'offset'):
|
||||
serialized_query_params += '&'
|
||||
serialized_query_params += urllib.parse.quote(k, safe='')
|
||||
serialized_query_params += '='
|
||||
serialized_query_params += urllib.parse.quote(str(v), safe=',')
|
||||
|
||||
uri = f'{api.base_url}/jobs'
|
||||
|
||||
if offset > 0:
|
||||
prev = max(0, offset - limit)
|
||||
serialized_jobs['links'].append(
|
||||
{
|
||||
'href': f'{uri}?offset={prev}{serialized_query_params}',
|
||||
'type': FORMAT_TYPES[F_JSON],
|
||||
'rel': 'prev',
|
||||
'title': l10n.translate('Items (prev)', request.locale),
|
||||
})
|
||||
|
||||
next_link = False
|
||||
|
||||
if numberMatched > (limit + offset):
|
||||
next_link = True
|
||||
elif len(jobs) == limit:
|
||||
next_link = True
|
||||
|
||||
if next_link:
|
||||
next_ = offset + limit
|
||||
next_href = f'{uri}?offset={next_}{serialized_query_params}'
|
||||
serialized_jobs['links'].append(
|
||||
{
|
||||
'href': next_href,
|
||||
'rel': 'next',
|
||||
'type': FORMAT_TYPES[F_JSON],
|
||||
'title': l10n.translate('Items (next)', request.locale),
|
||||
})
|
||||
|
||||
if job_id is None:
|
||||
j2_template = 'jobs/index.html'
|
||||
else:
|
||||
@@ -318,6 +399,7 @@ def get_jobs(api: API, request: APIRequest,
|
||||
if request.format == F_HTML:
|
||||
data = {
|
||||
'jobs': serialized_jobs,
|
||||
'offset': offset,
|
||||
'now': datetime.now(timezone.utc).strftime(DATETIME_FORMAT)
|
||||
}
|
||||
response = render_j2_template(api.tpl_config, j2_template, data,
|
||||
|
||||
@@ -108,14 +108,21 @@ class BaseManager:
|
||||
else:
|
||||
return load_plugin('process', process_conf['processor'])
|
||||
|
||||
def get_jobs(self,
             status: JobStatus = None,
             limit: Optional[int] = None,
             offset: Optional[int] = None
             ) -> dict:
    """
    Get process jobs, optionally filtered by status

    This base implementation only defines the contract and always raises;
    concrete managers are expected to override it.

    :param status: job status (accepted, running, successful,
                   failed, results) (default is all)
    :param limit: number of jobs to return
    :param offset: pagination offset

    :returns: `dict` with two keys: `jobs`, a list of jobs (identifier,
              status, process identifier), and `numberMatched`, the
              number of matched jobs
    :raises NotImplementedError: always, in this base implementation
    """

    raise NotImplementedError()
|
||||
|
||||
@@ -56,17 +56,21 @@ class DummyManager(BaseManager):
|
||||
|
||||
super().__init__(manager_def)
|
||||
|
||||
def get_jobs(self, status: JobStatus = None, limit=None, offset=None
             ) -> dict:
    """
    Get process jobs, optionally filtered by status

    This manager holds no jobs: every argument is accepted for interface
    compatibility and ignored, and the result is always empty.

    :param status: job status (accepted, running, successful,
                   failed, results) (default is all)
    :param limit: number of jobs to return
    :param offset: pagination offset

    :returns: `dict` with an empty `jobs` list and a `numberMatched`
              of 0
    """

    return {'jobs': [], 'numberMatched': 0}
|
||||
|
||||
def execute_process(
|
||||
self,
|
||||
|
||||
@@ -70,7 +70,7 @@ class MongoDBManager(BaseManager):
|
||||
exc_info=(traceback))
|
||||
return False
|
||||
|
||||
def get_jobs(self, status=None):
|
||||
def get_jobs(self, status=None, limit=None, offset=None):
|
||||
try:
|
||||
self._connect()
|
||||
database = self.db.job_manager_pygeoapi
|
||||
@@ -80,7 +80,10 @@ class MongoDBManager(BaseManager):
|
||||
else:
|
||||
jobs = list(collection.find({}))
|
||||
LOGGER.info("JOBMANAGER - MongoDB jobs queried")
|
||||
return jobs
|
||||
return {
|
||||
'jobs': jobs,
|
||||
'numberMatched': len(jobs)
|
||||
}
|
||||
except Exception:
|
||||
LOGGER.error("JOBMANAGER - get_jobs error",
|
||||
exc_info=(traceback))
|
||||
|
||||
@@ -116,16 +116,18 @@ class PostgreSQLManager(BaseManager):
|
||||
LOGGER.error(f'{msg}: {err}')
|
||||
raise ProcessorGenericError(msg)
|
||||
|
||||
def get_jobs(self, status: JobStatus = None) -> list:
|
||||
def get_jobs(self, status: JobStatus = None, limit=None, offset=None
|
||||
) -> dict:
|
||||
"""
|
||||
Get jobs
|
||||
|
||||
:param status: job status (accepted, running, successful,
|
||||
failed, results) (default is all)
|
||||
:param limit: number of jobs to return
|
||||
:param offset: pagination offset
|
||||
|
||||
:returns: 'list` of jobs (type (default='process'), identifier,
|
||||
status, process_id, job_start_datetime, job_end_datetime, location,
|
||||
mimetype, message, progress)
|
||||
:returns: dict of list of jobs (identifier, status, process identifier)
|
||||
and numberMatched
|
||||
"""
|
||||
|
||||
LOGGER.debug('Querying for jobs')
|
||||
@@ -135,7 +137,11 @@ class PostgreSQLManager(BaseManager):
|
||||
column = getattr(self.table_model, 'status')
|
||||
results = results.filter(column == status.value)
|
||||
|
||||
return [r.__dict__ for r in results.all()]
|
||||
jobs = [r.__dict__ for r in results.all()]
|
||||
return {
|
||||
'jobs': jobs,
|
||||
'numberMatched': len(jobs)
|
||||
}
|
||||
|
||||
def add_job(self, job_metadata: dict) -> str:
|
||||
"""
|
||||
|
||||
@@ -82,20 +82,35 @@ class TinyDBManager(BaseManager):
|
||||
|
||||
return True
|
||||
|
||||
def get_jobs(self, status: JobStatus = None, limit=None, offset=None
             ) -> dict:
    """
    Get jobs

    Jobs are ordered newest-first by `job_start_datetime` before the
    offset/limit window is applied, so that consecutive pages are
    consecutive slices of one consistent ordering.

    :param status: job status (accepted, running, successful,
                   failed, results) (default is all).
                   NOTE: accepted for interface compatibility but not
                   applied by this implementation; all jobs are matched
    :param limit: number of jobs to return (a falsy value means no limit)
    :param offset: pagination offset (a falsy value means start at 0)

    :returns: `dict` with two keys: `jobs`, the requested page of jobs,
              and `numberMatched`, the total number of jobs before
              pagination
    """

    with self._db() as db:
        jobs_list = db.all()

    # Count before slicing: callers use the total to decide whether a
    # "next" page exists.
    number_matched = len(jobs_list)

    # Sort before paginating; slicing in storage order would put the
    # oldest jobs on the first page. Jobs without a start time sort last
    # (assumes start times are mutually comparable strings -- TODO confirm).
    jobs_list = sorted(
        jobs_list,
        key=lambda job: job.get('job_start_datetime') or '',
        reverse=True)

    if offset:
        jobs_list = jobs_list[offset:]

    if limit:
        jobs_list = jobs_list[:limit]

    return {
        'jobs': jobs_list,
        'numberMatched': number_matched
    }
|
||||
|
||||
def add_job(self, job_metadata: dict) -> str:
|
||||
"""
|
||||
|
||||
@@ -48,5 +48,38 @@
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-sm-12">
|
||||
{% trans %}Limit{% endtrans %}:
|
||||
<select id="limits">
|
||||
<option value="{{ config['server']['limit'] }}">{{ config['server']['limit'] }} ({% trans %}default{% endtrans %})</option>
|
||||
<option value="100">100</option>
|
||||
<option value="1000">1,000</option>
|
||||
<option value="2000">2,000</option>
|
||||
</select>
|
||||
<script>
|
||||
var select = document.getElementById('limits');
|
||||
var defaultValue = select.getElementsByTagName('option')[0].value;
|
||||
let params = (new URL(document.location)).searchParams;
|
||||
select.value = params.get('limit') || defaultValue;
|
||||
select.addEventListener('change', ev => {
|
||||
var limit = ev.target.value;
|
||||
document.location.search = `limit=${limit}`;
|
||||
});
|
||||
</script>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-sm-12">
|
||||
{% for link in data['jobs']['links'] %}
|
||||
{% if link['rel'] == 'prev' and data['offset'] > 0 %}
|
||||
<a role="button" href="{{ link['href'] }}">{% trans %}Prev{% endtrans %}</a>
|
||||
{% elif link['rel'] == 'next' and data['jobs']['jobs'] %}
|
||||
<a role="button" href="{{ link['href'] }}">{% trans %}Next{% endtrans %}</a>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
</section>
|
||||
{% endblock %}
|
||||
|
||||
@@ -39,7 +39,7 @@ from unittest import mock
|
||||
|
||||
from pygeoapi.api import FORMAT_TYPES, F_HTML, F_JSON
|
||||
from pygeoapi.api.processes import (
|
||||
describe_processes, execute_process, delete_job, get_job_result,
|
||||
describe_processes, execute_process, delete_job, get_job_result, get_jobs
|
||||
)
|
||||
|
||||
from tests.util import mock_api_request
|
||||
@@ -442,4 +442,51 @@ def test_get_job_result(api_):
|
||||
)
|
||||
assert code == HTTPStatus.OK
|
||||
assert rsp_headers['Content-Type'] == 'application/json'
|
||||
assert json.loads(response)['value'] == "Hello Sync Test!"
|
||||
assert json.loads(response)['value'] == 'Hello Sync Test!'
|
||||
|
||||
|
||||
def test_get_jobs_single(api_):
    """Requesting one job by id returns that job with a successful status."""
    executed_id = _execute_a_job(api_)

    _, code, response = get_jobs(
        api_, mock_api_request(), job_id=executed_id)
    assert code == HTTPStatus.OK

    payload = json.loads(response)
    assert (payload['jobID'], payload['status']) == (executed_id, 'successful')
|
||||
|
||||
|
||||
def test_get_jobs_pagination(api_):
    """
    Check the job list's default limit, offset handling, prev/next links
    and a custom limit.

    Each response's status code is asserted before its body is inspected,
    so an error response fails with a clear message instead of a KeyError.
    """
    # generate test jobs for querying
    for _ in range(11):
        _execute_a_job(api_)

    # test default pagination limit
    headers, code, response = get_jobs(api_, mock_api_request(), job_id=None)
    assert code == HTTPStatus.OK
    job_response = json.loads(response)
    assert len(job_response['jobs']) == 10
    assert next(
        link for link in job_response['links'] if link['rel'] == 'next'
    )['href'].endswith('/jobs?offset=10')

    headers, code, response = get_jobs(
        api_,
        mock_api_request({'limit': 10, 'offset': 9}),
        job_id=None)
    assert code == HTTPStatus.OK
    job_response_offset = json.loads(response)
    # check to get 1 same job id with an offset of 9 and limit of 10
    same_job_ids = {job['jobID'] for job in job_response['jobs']}.intersection(
        {job['jobID'] for job in job_response_offset['jobs']}
    )
    assert len(same_job_ids) == 1
    assert next(
        link for link in job_response_offset['links'] if link['rel'] == 'prev'
    )['href'].endswith('/jobs?offset=0&limit=10')

    # test custom limit
    headers, code, response = get_jobs(
        api_,
        mock_api_request({'limit': 20}),
        job_id=None)
    assert code == HTTPStatus.OK
    job_response = json.loads(response)
    # might be more than 11 due to test interaction
    assert len(job_response['jobs']) > 10
|
||||
|
||||
Reference in New Issue
Block a user