pygeoapi/tests/test_postgresql_provider.py
John A Stevenson 644281359a Add CQL to PostgreSQL provider (via pygeofilter and sqlalchemy) (#964)
* Add pygeofilter, unpin psycopg2

* Add pygeofilter SQLAlchemy query demo

* Add note about primary key

* Add todos to script

* Make script generic

* Add (commented out) PostgreSQL provider hotosm_bdi_waterways to config

* Update to use local test database

* Return result as dictionary

* Pass GEOM_FIELD as variable

* Add offset and limit, fix two names

* Implement order by

* Use function for cql query

* Add cql query block to query and test

* Link up to db engine

* Convert WKB geometry to GeoJSON

* Add shapely as explicit dependency

* Add tests for CQL queries

* Add test using CROSSES with LINESTRING

* Uncomment Docker PostGIS test layer

* First pass at wiring up API

The CQL is read from the request but the parser fails to convert it to
an AST.

* Turn logging to DEBUG

Example query
http://localhost:5000/collections/hot_osm_waterways/items?cql=%27osm_id%20BETWEEN%2080800000%20AND%2080900000%20AND%20name%20IS%20NULL%27
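
The encoded query string above can be reproduced with the standard
library. This is a minimal sketch: the base URL and filter text are
copied from the example, and the `cql` parameter name is the one in use
at this point in the history (a later commit renames it to `filter`).

```python
from urllib.parse import quote

# Base URL copied from the example query above.
BASE_URL = "http://localhost:5000/collections/hot_osm_waterways/items"

# The filter text, including the single quotes that wrap it in the example.
cql_text = "'osm_id BETWEEN 80800000 AND 80900000 AND name IS NULL'"

# quote() percent-encodes spaces as %20 and single quotes as %27.
url = f"{BASE_URL}?cql={quote(cql_text)}"
print(url)
```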

* Create dedicated pygeoapi-config.yml for testing PostgreSQL

* Add test for PostgreSQL CQL

* Return 400 for bad CQL

* Tidy up old files

* Bring API error type tests together

* Only reflect the table of interest

* Add pygeofilter to requirements

* Remove local setup_env.sh from .gitignore

* Remove lark exception handling

* Remove lark codes from tests, docstring change

* Split cql test into good and error

* Rename cql parameter to filter

* Add optional filter-lang parameter with test

* Initial changes

* Rename cql text parser

* Use CQL-JSON POST for PostgreSQL query

* Refactor to get data early and handle errors

* Add error tests for CQL-JSON PostgreSQL

* Add tests for prev/next and provider instantiation

* As filter-lang is set the block is not needed

* Clarify test fixtures

* Use filterq only in all CQL calls

* Make engine and table_model attributes of provider

* Handle different instantiation failures

* Use SQLAlchemy for get_fields()

* Minor doc edits

* Use SQLAlchemy for get()

* Simplify _sqlalchemy_to_feature

* Store database parameters in dedicated function

* Strip out non-SQLAlchemy code

* Add draft query() based on CQL filters

* Add property filtering to query()

* Implement select_properties and skip_geometry

* Implement bbox filter

* Implement properties subset defined in config

* Clean up materialized view test

We only need to test the fields that are present to be sure that the
view has been accessed, so other checks have been removed.

* Flake8 fixes and tidy up

* Add combined test for CQL with bbox and properties

* Create sessions directly where required

* Add Engine store to allow connection reuse

* Replace cql_ast with filterq

* Tidy up tests by using fixture for api

* Add authors

* Use Session() as context manager

A session requires a connection to the database, as provided by the
SQLAlchemy engine.  When the session goes out of scope, e.g. on exit
from the function where it was used, it should be garbage collected and
the connection freed.  However, load testing showed that this wasn't
happening.  This commit uses a context manager to start the session,
which ensures that the session is closed at the end of the `with`
block.
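
The pattern described above might look like the following. This is a
sketch rather than the provider's actual code: it assumes SQLAlchemy 1.4
or later, uses an in-memory SQLite engine as a stand-in for PostgreSQL,
and `fetch_one` is a hypothetical helper name.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Assumption: an in-memory SQLite database stands in for PostgreSQL.
engine = create_engine("sqlite://")


def fetch_one():
    """Run a trivial query inside a session that is always closed."""
    # Leaving the `with` block closes the session, even if the query
    # raises, so its connection goes back to the engine's pool.
    with Session(engine) as session:
        return session.execute(text("SELECT 1")).scalar()


result = fetch_one()
```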

* Return fields type as {'type': ...}

* Cache the table_model with the engine

Generating the table_model by "reflecting" the database is expensive but
the resulting model doesn't change.  If we cache the table_model with
the engine we save on a round-trip to the database and speed up response
time.

* Use separate stores for Engine and table_models

Using a combined store keyed on both engine and table_model meant that
a pygeoapi instance serving multiple tables on the same database would
maintain a connection pool for each table.  This could eventually
saturate the number of available connections.  This commit separates
the store into two parts.

* Update documentation with example

* Manage or post items based on POST content type

* ghcr build action

* only ghcr job

* run criteria edit

* Update README.md

* job = main

* remove success requirement

* add tag

* tags: bgs-cql

* Update main.yml

* Update README.md

* remove binary

* update

* Update actions to latest versions

* Dockerfile refactor

* Dockerfile refactor

* restore dockerhub

* fix pygeoif version

* # 3.6 test & dockerhub ref

* Pin pygeoif version to less than 1

* Replace psycopg-binary with psycopg

* Split Docker build into sections

* Pin click version >7,<=8

* Add further cql examples

* Update workflows to match GeoPython repo

* update flake python version

* Unquote PostgreSQL column names

* Update container workflow uses versions

* flake8 use python version 3.7

* Use pre ping to get db connection

* Test that properties are set in provider

* Amendments as requested by @tomkralidis

* Correction

* Fix reviewed changes in api code

Co-authored-by: Colin Blackburn <colb@bgs.ac.uk>
Co-authored-by: KoalaGeo <eddlewis85@gmail.com>
Co-authored-by: Edd <edlew@bgs.ac.uk>
Co-authored-by: Colin Blackburn <ximenesuk@users.noreply.github.com>
2022-10-29 02:15:31 -04:00


# =================================================================
#
# Authors: Just van den Broecke <justb4@gmail.com>
#          Tom Kralidis <tomkralidis@gmail.com>
#          John A Stevenson <jostev@bgs.ac.uk>
#          Colin Blackburn <colb@bgs.ac.uk>
#
# Copyright (c) 2019 Just van den Broecke
# Copyright (c) 2019 Tom Kralidis
# Copyright (c) 2022 John A Stevenson and Colin Blackburn
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
# Needs to be run like: python3 -m pytest
# See pygeoapi/provider/postgresql.py for instructions on setting up
# test database in Docker

import pytest
from pygeofilter.parsers.ecql import parse

from pygeoapi.provider.base import (
    ProviderConnectionError,
    ProviderItemNotFoundError,
    ProviderQueryError
)
from pygeoapi.provider.postgresql import PostgreSQLProvider
import pygeoapi.provider.postgresql as postgresql_provider_module
import os

PASSWORD = os.environ.get('POSTGRESQL_PASSWORD', 'postgres')


@pytest.fixture()
def config():
    return {
        'name': 'PostgreSQL',
        'type': 'feature',
        'data': {'host': '127.0.0.1',
                 'dbname': 'test',
                 'user': 'postgres',
                 'password': PASSWORD,
                 'search_path': ['osm', 'public']
                 },
        'id_field': 'osm_id',
        'table': 'hotosm_bdi_waterways',
        'geom_field': 'foo_geom'
    }


def test_query(config):
    """Testing query for a valid JSON object with geometry"""
    p = PostgreSQLProvider(config)
    feature_collection = p.query()
    assert feature_collection.get('type', None) == 'FeatureCollection'
    features = feature_collection.get('features', None)
    assert features is not None
    feature = features[0]
    properties = feature.get('properties', None)
    assert properties is not None
    geometry = feature.get('geometry', None)
    assert geometry is not None


def test_query_materialised_view(config):
    """Testing query using a materialised view"""
    config_materialised_view = config.copy()
    config_materialised_view['table'] = 'hotosm_bdi_drains'
    provider = PostgreSQLProvider(config_materialised_view)

    # Only ID, width and depth properties should be available
    assert set(provider.get_fields().keys()) == {"osm_id", "width", "depth"}


def test_query_with_property_filter(config):
    """Test query valid features when filtering by property"""
    p = PostgreSQLProvider(config)
    feature_collection = p.query(properties=[("waterway", "stream")])
    features = feature_collection.get('features', None)
    stream_features = list(
        filter(lambda feature: feature['properties']['waterway'] == 'stream',
               features))
    assert (len(features) == len(stream_features))

    feature_collection = p.query(limit=50)
    features = feature_collection.get('features', None)
    stream_features = list(
        filter(lambda feature: feature['properties']['waterway'] == 'stream',
               features))
    other_features = list(
        filter(lambda feature: feature['properties']['waterway'] != 'stream',
               features))
    assert (len(features) != len(stream_features))
    assert (len(other_features) != 0)


def test_query_with_config_properties(config):
    """
    Test that query is restricted by properties in the config.
    No properties should be returned that are not requested.
    Note that not all requested properties have to exist in the query result.
    """
    properties_subset = ['name', 'waterway', 'width', 'does_not_exist']
    config.update({'properties': properties_subset})
    provider = PostgreSQLProvider(config)
    assert provider.properties == properties_subset
    result = provider.query()
    feature = result.get('features')[0]
    properties = feature.get('properties', None)
    for property_name in properties.keys():
        assert property_name in config["properties"]


@pytest.mark.parametrize("property_filter, expected", [
    ([], 14776),
    ([("waterway", "stream")], 13930),
    ([("waterway", "this does not exist")], 0),
])
def test_query_hits_with_property_filter(config, property_filter, expected):
    """Test query resulttype=hits"""
    provider = PostgreSQLProvider(config)
    results = provider.query(properties=property_filter, resulttype="hits")
    assert results["numberMatched"] == expected


def test_query_bbox(config):
    """Test query with a specified bounding box"""
    psp = PostgreSQLProvider(config)
    boxed_feature_collection = psp.query(
        bbox=[29.3373, -3.4099, 29.3761, -3.3924]
    )
    assert len(boxed_feature_collection['features']) == 5


def test_query_sortby(config):
    """Test query with sorting"""
    psp = PostgreSQLProvider(config)
    up = psp.query(sortby=[{'property': 'osm_id', 'order': '+'}])
    assert up['features'][0]['id'] == 13990765
    down = psp.query(sortby=[{'property': 'osm_id', 'order': '-'}])
    assert down['features'][0]['id'] == 620735702

    name = psp.query(sortby=[{'property': 'name', 'order': '+'}])
    assert name['features'][0]['properties']['name'] == 'Agasasa'


def test_query_skip_geometry(config):
    """Test query without geometry"""
    provider = PostgreSQLProvider(config)
    result = provider.query(skip_geometry=True)
    feature = result['features'][0]
    assert feature['geometry'] is None


@pytest.mark.parametrize('properties', [
    ['name'],
    ['name', 'waterway'],
    ['name', 'waterway', 'this does not exist']
])
def test_query_select_properties(config, properties):
    """Test query with selected properties"""
    provider = PostgreSQLProvider(config)
    result = provider.query(select_properties=properties)
    feature = result['features'][0]

    expected = set(provider.get_fields().keys()).intersection(properties)
    assert set(feature['properties'].keys()) == expected


@pytest.mark.parametrize('id_, prev, next_', [
    (29701937, 29698243, 29704504),
    (13990765, 13990765, 25469515),  # First item, prev should be id_
    (620735702, 620420337, 620735702),  # Last item, next should be id_
])
def test_get_simple(config, id_, prev, next_):
    """Testing query for a specific object and identifying prev/next"""
    p = PostgreSQLProvider(config)
    result = p.get(id_)
    assert result['id'] == id_
    assert 'geometry' in result
    assert 'properties' in result
    assert result['type'] == 'Feature'
    assert 'foo_geom' not in result['properties']  # geometry is separate

    assert result['prev'] == prev
    assert result['next'] == next_


def test_get_not_existing_item_raise_exception(config):
    """Testing query for a not existing object"""
    p = PostgreSQLProvider(config)
    with pytest.raises(ProviderItemNotFoundError):
        p.get(-1)


@pytest.mark.parametrize('cql, expected_ids', [
    ("osm_id BETWEEN 80800000 AND 80900000",
     [80827787, 80827793, 80835468, 80835470, 80835472, 80835474,
      80835475, 80835478, 80835483, 80835486]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND waterway = 'stream'",
     [80835470]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND waterway ILIKE 'sTrEam'",
     [80835470]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND waterway LIKE 's%'",
     [80835470]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND name IN ('Muhira', 'Mpanda')",
     [80835468, 80835472, 80835475, 80835478]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND name IS NULL",
     [80835474, 80835483]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND BBOX(foo_geom, 29, -2.8, 29.2, -2.9)",  # noqa
     [80827793, 80835470, 80835472, 80835483, 80835489]),
    ("osm_id BETWEEN 80800000 AND 80900000 AND "
     "CROSSES(foo_geom,  LINESTRING(29.091 -2.731, 29.253 -2.845))",
     [80835470, 80835472, 80835489])
])
def test_query_cql(config, cql, expected_ids):
    """Test a variety of CQL queries"""
    ast = parse(cql)
    provider = PostgreSQLProvider(config)

    feature_collection = provider.query(filterq=ast)
    assert feature_collection.get('type', None) == 'FeatureCollection'

    features = feature_collection.get('features', None)
    ids = [feature["id"] for feature in features]
    assert ids == expected_ids


def test_query_cql_properties_bbox_filters(config):
    """Test query with CQL, properties and bbox filters"""
    # Arrange
    properties = [('waterway', 'stream')]
    bbox = [29, -2.8, 29.2, -2.9]
    filterq = parse("osm_id BETWEEN 80800000 AND 80900000")
    expected_ids = [80835470]

    # Act
    provider = PostgreSQLProvider(config)
    feature_collection = provider.query(filterq=filterq,
                                        properties=properties,
                                        bbox=bbox)

    # Assert
    ids = [feature["id"] for feature in feature_collection.get('features')]
    assert ids == expected_ids


def test_get_fields(config):
    # Arrange
    expected_fields = {
        'blockage': {'type': 'VARCHAR(80)'},
        'covered': {'type': 'VARCHAR(80)'},
        'depth': {'type': 'VARCHAR(80)'},
        'layer': {'type': 'VARCHAR(80)'},
        'name': {'type': 'VARCHAR(80)'},
        'natural': {'type': 'VARCHAR(80)'},
        'osm_id': {'type': 'INTEGER'},
        'tunnel': {'type': 'VARCHAR(80)'},
        'water': {'type': 'VARCHAR(80)'},
        'waterway': {'type': 'VARCHAR(80)'},
        'width': {'type': 'VARCHAR(80)'},
        'z_index': {'type': 'VARCHAR(80)'}
    }

    # Act
    provider = PostgreSQLProvider(config)

    # Assert
    assert provider.get_fields() == expected_fields
    assert provider.fields == expected_fields  # API uses .fields attribute


def test_instantiation(config):
    """Test attributes are correctly set during instantiation."""
    # Act
    provider = PostgreSQLProvider(config)

    # Assert
    assert provider.name == "PostgreSQL"
    assert provider.table == "hotosm_bdi_waterways"
    assert provider.id_field == "osm_id"


@pytest.mark.parametrize('bad_data, exception, match', [
    ({'table': 'bad_table'}, ProviderQueryError,
     'Table.*not found in schema.*'),
    ({'data': {'bad': 'data'}}, ProviderConnectionError,
     r'Could not connect to .*None:\*\*\*@'),
    ({'id_field': 'bad_id'}, ProviderQueryError,
     r'No such id_field column \(bad_id\) on osm.hotosm_bdi_waterways.'),
])
def test_instantiation_with_bad_config(config, bad_data, exception, match):
    # Arrange
    config.update(bad_data)
    # Make sure we don't use a cached connection or model in the tests
    postgresql_provider_module._ENGINE_STORE = {}
    postgresql_provider_module._TABLE_MODEL_STORE = {}

    # Act and assert
    with pytest.raises(exception, match=match):
        PostgreSQLProvider(config)


def test_instantiation_with_bad_credentials(config):
    # Arrange
    config['data'].update({'user': 'bad_user'})
    match = r'Could not connect to .*bad_user:\*\*\*@'
    # Make sure we don't use a cached connection in the tests
    postgresql_provider_module._ENGINE_STORE = {}

    # Act and assert
    with pytest.raises(ProviderConnectionError, match=match):
        PostgreSQLProvider(config)


def test_engine_and_table_model_stores(config):
    provider0 = PostgreSQLProvider(config)

    # Same config should return same engine and table_model
    provider1 = PostgreSQLProvider(config)
    assert repr(provider1._engine) == repr(provider0._engine)
    assert provider1._engine is provider0._engine
    assert provider1.table_model is provider0.table_model

    # Same database connection details, but different table
    different_table = config.copy()
    different_table.update(table="hotosm_bdi_drains")
    provider2 = PostgreSQLProvider(different_table)
    assert repr(provider2._engine) == repr(provider0._engine)
    assert provider2._engine is provider0._engine
    assert provider2.table_model is not provider0.table_model

    # Although localhost is 127.0.0.1, this should get different engine
    # and also a different table_model, as two databases may have different
    # tables with the same name
    different_host = config.copy()
    different_host["data"]["host"] = "localhost"
    provider3 = PostgreSQLProvider(different_host)
    assert provider3._engine is not provider0._engine
    assert provider3.table_model is not provider0.table_model