Files
pygeoapi/tests/api/test_processes.py
Bernhard Mallinger deb043f928 Jobs pagination (#1779)
* Add pagination for job list

Adds limit and offset parameter to `get_jobs`.

Process manager `get_jobs` now also returns the number of matched jobs
additionally to the jobs themselves so we can calculate whether we need
a next link.

Note that this is a breaking change.

* Add pagination support to jobs UI

This works exactly the same way as for itemtypes

* Add note regarding job sorting

* Formatting fixes
2024-09-12 07:37:11 -04:00

493 lines
17 KiB
Python

# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# John A Stevenson <jostev@bgs.ac.uk>
# Colin Blackburn <colb@bgs.ac.uk>
# Bernhard Mallinger <bernhard.mallinger@eox.at>
#
# Copyright (c) 2024 Tom Kralidis
# Copyright (c) 2022 John A Stevenson and Colin Blackburn
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import json
from http import HTTPStatus
import time
from unittest import mock
from pygeoapi.api import FORMAT_TYPES, F_HTML, F_JSON
from pygeoapi.api.processes import (
describe_processes, execute_process, delete_job, get_job_result, get_jobs
)
from tests.util import mock_api_request
def test_describe_processes(config, api_):
req = mock_api_request({'limit': 1})
# Test for description of single processes
rsp_headers, code, response = describe_processes(api_, req)
data = json.loads(response)
assert code == HTTPStatus.OK
assert len(data['processes']) == 1
assert len(data['links']) == 3
req = mock_api_request()
# Test for undefined process
rsp_headers, code, response = describe_processes(api_, req, 'foo')
data = json.loads(response)
assert code == HTTPStatus.NOT_FOUND
assert data['code'] == 'NoSuchProcess'
# Test for description of all processes
rsp_headers, code, response = describe_processes(api_, req)
data = json.loads(response)
assert code == HTTPStatus.OK
assert len(data['processes']) == 2
assert len(data['links']) == 3
# Test for particular, defined process
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
process = json.loads(response)
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_JSON]
assert process['id'] == 'hello-world'
assert process['version'] == '0.2.0'
assert process['title'] == 'Hello World'
assert len(process['keywords']) == 3
assert len(process['links']) == 6
assert len(process['inputs']) == 2
assert len(process['outputs']) == 1
assert len(process['outputTransmission']) == 1
assert len(process['jobControlOptions']) == 2
assert 'sync-execute' in process['jobControlOptions']
assert 'async-execute' in process['jobControlOptions']
# Check HTML response when requested in headers
req = mock_api_request(HTTP_ACCEPT='text/html')
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_HTML]
# No language requested: return default from YAML
assert rsp_headers['Content-Language'] == 'en-US'
# Check JSON response when requested in headers
req = mock_api_request(HTTP_ACCEPT='application/json')
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_JSON]
assert rsp_headers['Content-Language'] == 'en-US'
# Check HTML response when requested with query parameter
req = mock_api_request({'f': 'html'})
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_HTML]
# No language requested: return default from YAML
assert rsp_headers['Content-Language'] == 'en-US'
# Check JSON response when requested with query parameter
req = mock_api_request({'f': 'json'})
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_JSON]
assert rsp_headers['Content-Language'] == 'en-US'
# Check JSON response when requested with French language parameter
req = mock_api_request({'lang': 'fr'})
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_JSON]
assert rsp_headers['Content-Language'] == 'fr-CA'
process = json.loads(response)
assert process['title'] == 'Bonjour le Monde'
# Check JSON response when language requested in headers
req = mock_api_request(HTTP_ACCEPT_LANGUAGE='fr')
rsp_headers, code, response = describe_processes(api_, req, 'hello-world')
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_JSON]
assert rsp_headers['Content-Language'] == 'fr-CA'
# Test for undefined process
req = mock_api_request()
rsp_headers, code, response = describe_processes(api_, req,
'goodbye-world')
data = json.loads(response)
assert code == HTTPStatus.NOT_FOUND
assert data['code'] == 'NoSuchProcess'
assert rsp_headers['Content-Type'] == FORMAT_TYPES[F_JSON]
# Test describe doesn't crash if example is missing
req = mock_api_request()
processor = api_.manager.get_processor("hello-world")
example = processor.metadata.pop("example")
rsp_headers, code, response = describe_processes(api_, req)
processor.metadata['example'] = example
data = json.loads(response)
assert code == HTTPStatus.OK
assert len(data['processes']) == 2
def test_execute_process(config, api_):
req_body_0 = {
'inputs': {
'name': 'Test'
}
}
req_body_1 = {
'inputs': {
'name': 'Test'
},
'response': 'document'
}
req_body_2 = {
'inputs': {
'name': 'Tést'
}
}
req_body_3 = {
'inputs': {
'name': 'Tést',
'message': 'This is a test.'
}
}
req_body_4 = {
'inputs': {
'foo': 'Tést'
}
}
req_body_5 = {
'inputs': {}
}
req_body_6 = {
'inputs': {
'name': None
}
}
req_body_7 = {
'inputs': {
'name': 'Test'
},
'subscriber': {
'successUri': 'https://example.com/success',
'inProgressUri': 'https://example.com/inProgress',
'failedUri': 'https://example.com/failed',
}
}
req_body_8 = {
'inputs': {
'name': 'Test document'
},
'response': 'document'
}
cleanup_jobs = set()
# Test posting empty payload to existing process
req = mock_api_request(data='')
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
assert rsp_headers['Content-Language'] == 'en-US'
data = json.loads(response)
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' not in rsp_headers
assert data['code'] == 'MissingParameterValue'
req = mock_api_request(data=req_body_0)
rsp_headers, code, response = execute_process(api_, req, 'foo')
data = json.loads(response)
assert code == HTTPStatus.NOT_FOUND
assert 'Location' not in rsp_headers
assert data['code'] == 'NoSuchProcess'
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert 'Location' in rsp_headers
assert len(data.keys()) == 2
assert data['id'] == 'echo'
assert data['value'] == 'Hello Test!'
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_1)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert 'Location' in rsp_headers
assert len(data.keys()) == 1
assert data['outputs'][0]['id'] == 'echo'
assert data['outputs'][0]['value'] == 'Hello Test!'
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_2)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert 'Location' in rsp_headers
assert data['value'] == 'Hello Tést!'
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_3)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert 'Location' in rsp_headers
assert data['value'] == 'Hello Tést! This is a test.'
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_4)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' in rsp_headers
assert data['code'] == 'InvalidParameterValue'
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_5)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' in rsp_headers
assert data['code'] == 'InvalidParameterValue'
assert data['description'].startswith('Error executing process: ')
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_6)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.BAD_REQUEST
assert 'Location' in rsp_headers
assert data['code'] == 'InvalidParameterValue'
assert data['description'].startswith('Error executing process: ')
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_0)
rsp_headers, code, response = execute_process(api_, req, 'goodbye-world')
response = json.loads(response)
assert code == HTTPStatus.NOT_FOUND
assert 'Location' not in rsp_headers
assert response['code'] == 'NoSuchProcess'
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
response = json.loads(response)
assert code == HTTPStatus.OK
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_1, HTTP_Prefer='respond-async')
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
assert 'Location' in rsp_headers
response = json.loads(response)
assert isinstance(response, dict)
assert code == HTTPStatus.CREATED
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_7)
with mock.patch(
'pygeoapi.process.manager.base.requests.post'
) as post_mocker:
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
assert code == HTTPStatus.OK
post_mocker.assert_any_call(
req_body_7['subscriber']['inProgressUri'], json={}
)
post_mocker.assert_any_call(
req_body_7['subscriber']['successUri'],
json={'id': 'echo', 'value': 'Hello Test!'}
)
assert post_mocker.call_count == 2
cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
req = mock_api_request(data=req_body_8)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
response = json.loads(response)
assert code == HTTPStatus.OK
assert 'outputs' in response
assert isinstance(response['outputs'], list)
# Cleanup
time.sleep(2) # Allow time for any outstanding async jobs
for _, job_id in cleanup_jobs:
rsp_headers, code, response = delete_job(api_, mock_api_request(),
job_id)
assert code == HTTPStatus.OK
def _execute_a_job(api_):
req_body_sync = {
'inputs': {
'name': 'Sync Test'
}
}
req = mock_api_request(data=req_body_sync)
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
data = json.loads(response)
assert code == HTTPStatus.OK
assert 'Location' in rsp_headers
assert data['value'] == 'Hello Sync Test!'
job_id = rsp_headers['Location'].split('/')[-1]
return job_id
def test_delete_job(api_):
rsp_headers, code, response = delete_job(api_, mock_api_request(),
'does-not-exist')
assert code == HTTPStatus.NOT_FOUND
req_body_async = {
'inputs': {
'name': 'Async Test Deletion'
}
}
job_id = _execute_a_job(api_)
rsp_headers, code, response = delete_job(api_, mock_api_request(), job_id)
data = json.loads(response)
assert code == HTTPStatus.OK
assert data['message'] == 'Job dismissed'
rsp_headers, code, response = delete_job(api_, mock_api_request(), job_id)
assert code == HTTPStatus.NOT_FOUND
req = mock_api_request(data=req_body_async, HTTP_Prefer='respond-async')
rsp_headers, code, response = execute_process(api_, req, 'hello-world')
assert code == HTTPStatus.CREATED
assert 'Location' in rsp_headers
time.sleep(2) # Allow time for async execution to complete
job_id = rsp_headers['Location'].split('/')[-1]
rsp_headers, code, response = delete_job(api_, mock_api_request(), job_id)
assert code == HTTPStatus.OK
rsp_headers, code, response = delete_job(api_, mock_api_request(), job_id)
assert code == HTTPStatus.NOT_FOUND
def test_get_job_result(api_):
rsp_headers, code, response = get_job_result(
api_, mock_api_request(), 'not-exist',
)
assert code == HTTPStatus.NOT_FOUND
job_id = _execute_a_job(api_)
rsp_headers, code, response = get_job_result(api_, mock_api_request(),
job_id)
# default response is html
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == 'text/html'
assert 'Hello Sync Test!' in response
rsp_headers, code, response = get_job_result(
api_, mock_api_request({'f': 'json'}), job_id,
)
assert code == HTTPStatus.OK
assert rsp_headers['Content-Type'] == 'application/json'
assert json.loads(response)['value'] == 'Hello Sync Test!'
def test_get_jobs_single(api_):
job_id = _execute_a_job(api_)
headers, code, response = get_jobs(api_, mock_api_request(), job_id=job_id)
assert code == HTTPStatus.OK
job = json.loads(response)
assert job['jobID'] == job_id
assert job['status'] == 'successful'
def test_get_jobs_pagination(api_):
# generate test jobs for querying
for _ in range(11):
_execute_a_job(api_)
# test default pagination limit
headers, code, response = get_jobs(api_, mock_api_request(), job_id=None)
job_response = json.loads(response)
assert len(job_response['jobs']) == 10
assert next(
link for link in job_response['links'] if link['rel'] == 'next'
)['href'].endswith('/jobs?offset=10')
headers, code, response = get_jobs(
api_,
mock_api_request({'limit': 10, 'offset': 9}),
job_id=None)
job_response_offset = json.loads(response)
# check to get 1 same job id with an offset of 9 and limit of 10
same_job_ids = {job['jobID'] for job in job_response['jobs']}.intersection(
{job['jobID'] for job in job_response_offset['jobs']}
)
assert len(same_job_ids) == 1
assert next(
link for link in job_response_offset['links'] if link['rel'] == 'prev'
)['href'].endswith('/jobs?offset=0&limit=10')
# test custom limit
headers, code, response = get_jobs(
api_,
mock_api_request({'limit': 20}),
job_id=None)
job_response = json.loads(response)
# might be more than 11 due to test interaction
assert len(job_response['jobs']) > 10