This commit is contained in:
+31
-10
@@ -5,7 +5,7 @@
|
||||
# Francesco Bartoli <xbartolone@gmail.com>
|
||||
#
|
||||
# Copyright (c) 2018 Jorge Samuel Mendes de Jesus
|
||||
# Copyright (c) 2022 Tom Kralidis
|
||||
# Copyright (c) 2023 Tom Kralidis
|
||||
# Copyright (c) 2020 Francesco Bartoli
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
@@ -91,7 +91,15 @@ class SQLiteGPKGProvider(BaseProvider):
|
||||
results = self.cursor.execute(
|
||||
f'PRAGMA table_info({self.table})').fetchall()
|
||||
for item in results:
|
||||
self.fields[item['name']] = {'type': item['type']}
|
||||
json_type = None
|
||||
|
||||
if item['type'] in ['INTEGER', 'REAL']:
|
||||
json_type = 'number'
|
||||
elif item['type'].startswith('TEXT') or item['type'] == 'BLOB':
|
||||
json_type = 'string'
|
||||
|
||||
if json_type is not None:
|
||||
self.fields[item['name']] = {'type': json_type}
|
||||
|
||||
return self.fields
|
||||
|
||||
@@ -212,6 +220,9 @@ class SQLiteGPKGProvider(BaseProvider):
|
||||
self.application_id = 0
|
||||
|
||||
if self.application_id:
|
||||
geometry_columns_table = 'gpkg_geometry_columns'
|
||||
geometry_columns_table_name = 'table_name'
|
||||
geometry_columns_column_name = 'column_name'
|
||||
cursor.execute("SELECT AutoGPKGStart()")
|
||||
result = cursor.fetchall()
|
||||
if result[0][0] >= 1:
|
||||
@@ -219,27 +230,37 @@ class SQLiteGPKGProvider(BaseProvider):
|
||||
else:
|
||||
LOGGER.info("SELECT AutoGPKGStart() returned 0." +
|
||||
"Detected GPKG but couldn't load support")
|
||||
raise InvalidPluginError
|
||||
|
||||
if self.application_id:
|
||||
self.geom_col = "geom"
|
||||
raise InvalidPluginError()
|
||||
else:
|
||||
self.geom_col = "geometry"
|
||||
geometry_columns_table = 'geometry_columns'
|
||||
geometry_columns_column_name = 'f_geometry_column'
|
||||
geometry_columns_table_name = 'f_table_name'
|
||||
|
||||
try:
|
||||
cursor.execute(f'PRAGMA table_info({self.table})')
|
||||
result = cursor.fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
LOGGER.error(f'Couldnt find table: {self.table}')
|
||||
LOGGER.error(f'Could not find table: {self.table}')
|
||||
raise ProviderConnectionError()
|
||||
|
||||
LOGGER.debug('Determining name of geometry column')
|
||||
cursor.execute(f"SELECT {geometry_columns_column_name} FROM {geometry_columns_table} WHERE {geometry_columns_table_name} = '{self.table}'") # noqa
|
||||
geometry_column = cursor.fetchall()
|
||||
|
||||
if geometry_column:
|
||||
LOGGER.debug("Found geometry column")
|
||||
self.geom_col = geometry_column[0][0]
|
||||
else:
|
||||
msg = 'No geometry column found'
|
||||
LOGGER.error(msg)
|
||||
raise ProviderConnectionError(msg)
|
||||
|
||||
try:
|
||||
assert len(result), 'Table not found'
|
||||
assert len([item for item in result
|
||||
if self.id_field in item]), 'id_field not present'
|
||||
|
||||
except AssertionError:
|
||||
raise InvalidPluginError
|
||||
raise InvalidPluginError()
|
||||
|
||||
self.columns = [item[1] for item in result if item[1]
|
||||
not in [self.geom_col, self.geom_col.upper()]]
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# Tom Kralidis <tomkralidis@gmail.com>
|
||||
#
|
||||
# Copyright (c) 2019 Just van den Broecke
|
||||
# Copyright (c) 2022 Tom Kralidis
|
||||
# Copyright (c) 2023 Tom Kralidis
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
@@ -63,6 +63,18 @@ def config_geopackage():
|
||||
}
|
||||
|
||||
|
||||
def test_get_fields_sqlite(config_sqlite):
|
||||
"""Testing field definitions for sqlite3"""
|
||||
|
||||
fields_expected = {
|
||||
'ogc_fid': {'type': 'number'},
|
||||
'scalerank': {'type': 'number'}
|
||||
}
|
||||
|
||||
p = SQLiteGPKGProvider(config_sqlite)
|
||||
assert p.get_fields() == fields_expected
|
||||
|
||||
|
||||
def test_query_sqlite(config_sqlite):
|
||||
"""Testing query for a valid JSON object with geometry for sqlite3"""
|
||||
|
||||
@@ -78,6 +90,21 @@ def test_query_sqlite(config_sqlite):
|
||||
assert geometry is not None
|
||||
|
||||
|
||||
def test_get_fields_geopackage(config_geopackage):
|
||||
"""Testing field definitions for geopackage"""
|
||||
|
||||
fields_expected = {
|
||||
'fclass': {'type': 'string'},
|
||||
'fid': {'type': 'number'},
|
||||
'gid': {'type': 'number'},
|
||||
'name': {'type': 'string'},
|
||||
'osm_id': {'type': 'number'}
|
||||
}
|
||||
|
||||
p = SQLiteGPKGProvider(config_geopackage)
|
||||
assert p.get_fields() == fields_expected
|
||||
|
||||
|
||||
def test_query_geopackage(config_geopackage):
|
||||
"""Testing query for a valid JSON object with geometry for geopackage"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user