handle ES backends also generated by GDAL/OGR (#385)

This commit is contained in:
Tom Kralidis
2020-03-25 08:23:47 -04:00
committed by GitHub
parent 473e7f1691
commit 477e07cea7
+95 -32
View File
@@ -58,6 +58,8 @@ class ElasticsearchProvider(BaseProvider):
LOGGER.debug('Setting Elasticsearch properties')
self.es_host = url_tokens[2]
self.index_name = url_tokens[-1]
self.is_gdal = False
LOGGER.debug('host: {}'.format(self.es_host))
LOGGER.debug('index: {}'.format(self.index_name))
@@ -68,6 +70,13 @@ class ElasticsearchProvider(BaseProvider):
LOGGER.error(msg)
raise ProviderConnectionError(msg)
LOGGER.debug('Determining ES version')
v = self.es.info()['version']['number'][:3]
if float(v) < 7:
msg = 'only ES 7+ supported'
LOGGER.error(msg)
raise ProviderConnectionError(msg)
LOGGER.debug('Grabbing field information')
try:
self.fields = self.get_fields()
@@ -85,7 +94,13 @@ class ElasticsearchProvider(BaseProvider):
fields_ = {}
ic = IndicesClient(self.es)
ii = ic.get(self.index_name)
p = ii[self.index_name]['mappings']['properties']['properties'] # noqa
try:
p = ii[self.index_name]['mappings']['properties']['properties'] # noqa
except KeyError:
LOGGER.debug('ES index looks generated by GDAL')
self.is_gdal = True
p = ii[self.index_name]['mappings']
for k, v in p['properties'].items():
if 'type' in v:
@@ -148,7 +163,7 @@ class ElasticsearchProvider(BaseProvider):
LOGGER.error('time_field not enabled for collection')
raise ProviderQueryError()
time_field = 'properties.{}'.format(self.time_field)
time_field = self.mask_prop(self.time_field)
if '/' in datetime: # envelope
LOGGER.debug('detected time range')
@@ -181,7 +196,7 @@ class ElasticsearchProvider(BaseProvider):
for prop in properties:
pf = {
'match': {
'properties.{}'.format(prop[0]): prop[1]
self.mask_prop(prop[0]): prop[1]
}
}
query['query']['bool']['filter'].append(pf)
@@ -196,9 +211,9 @@ class ElasticsearchProvider(BaseProvider):
if self.fields[sp]['type'] == 'string':
LOGGER.debug('setting ES .raw on property')
sort_property = 'properties.{}.raw'.format(sp)
sort_property = '{}.raw'.format(self.mask_prop(sp))
else:
sort_property = 'properties.{}'.format(sp)
sort_property = self.mask_prop(sp)
sort_order = 'asc'
if sort['order'] == 'D':
@@ -215,10 +230,9 @@ class ElasticsearchProvider(BaseProvider):
LOGGER.debug('including specified fields: {}'.format(
self.properties))
query['_source'] = {
'includes': list(map('properties.{}'.format, self.properties))
'includes': list(map(self.mask_prop, self.properties))
}
query['_source']['includes'].append('properties.{}'.format(
self.id_field))
query['_source']['includes'].append(self.mask_prop(self.id_field))
query['_source']['includes'].append('type')
query['_source']['includes'].append('geometry')
try:
@@ -270,27 +284,8 @@ class ElasticsearchProvider(BaseProvider):
LOGGER.debug('serializing features')
for feature in results['hits']['hits']:
id_ = feature['_source']['properties'][self.id_field]
LOGGER.debug('serializing id {}'.format(id_))
feature['_source']['id'] = id_
if self.properties:
feature_thinned = {
'id': feature['_source']['properties'][self.id_field],
'type': feature['_source']['type'],
'geometry': feature['_source']['geometry'],
'properties': OrderedDict()
}
for p in self.properties:
try:
feature_thinned['properties'][p] = \
feature['_source']['properties'][p]
except KeyError as err:
LOGGER.error(err)
raise ProviderQueryError()
feature_collection['features'].append(feature_thinned)
else:
feature_collection['features'].append(feature['_source'])
feature_ = self.esdoc2geojson(feature)
feature_collection['features'].append(feature_)
return feature_collection
@@ -307,13 +302,81 @@ class ElasticsearchProvider(BaseProvider):
LOGGER.debug('Fetching identifier {}'.format(identifier))
result = self.es.get(self.index_name, id=identifier)
LOGGER.debug('Serializing feature')
id_ = result['_source']['properties'][self.id_field]
result['_source']['id'] = id_
feature_ = self.esdoc2geojson(result)
except exceptions.NotFoundError as err:
LOGGER.debug('Not found via ES id query: {}'.format(err))
LOGGER.debug('Trying via a real query')
query = {
'query': {
'bool': {
'filter': [{
'match': {
self.id_field: identifier
}
}]
}
}
}
result = self.es.search(index=self.index_name, body=query)
if len(result['hits']['hits']) == 0:
LOGGER.error(err)
return None
LOGGER.debug('Serializing feature')
feature_ = self.esdoc2geojson(result['hits']['hits'][0])
except Exception as err:
LOGGER.error(err)
return None
return result['_source']
return feature_
def esdoc2geojson(self, doc):
    """
    Generate a GeoJSON feature `dict` from an ES document

    Two document layouts are handled, told apart by the presence of a
    `properties` key in the document's `_source`:

    - flat documents (logged as GDAL ES 7 documents): every key of
      `_source` other than `geometry` becomes a feature property
    - GeoJSON documents: `_source` is used as the feature itself

    :param doc: `dict` of ES document, read through its `_source` key

    :returns: GeoJSON feature `dict`, reduced to the properties listed in
              `self.properties` when that attribute is non-empty
    :raises ProviderQueryError: if a property listed in `self.properties`
                                is missing from the feature
    """

    source = doc['_source']

    if 'properties' not in source:
        LOGGER.debug('Looks like a GDAL ES 7 document')
        id_ = source[self.id_field]
        # A top-level `type` key in a flat document is an ordinary
        # attribute, not the GeoJSON object type, so `id` and `type` are
        # always set here; skipping them left the feature incomplete and
        # made the thinning step below fail with a KeyError.
        feature_ = {
            'id': id_,
            'type': 'Feature',
            'geometry': source['geometry'],
            'properties': {
                key: value for key, value in source.items()
                if key != 'geometry'
            }
        }
    else:
        LOGGER.debug('Looks like true GeoJSON document')
        # the stored document is reused (and updated in place) as the
        # feature; only the identifier is added
        feature_ = source
        id_ = source['properties'][self.id_field]
        feature_['id'] = id_

    if not self.properties:
        return feature_

    # keep only the requested properties, in the requested order
    feature_thinned = {
        'id': id_,
        'type': feature_['type'],
        'geometry': feature_['geometry'],
        'properties': OrderedDict()
    }
    for p in self.properties:
        try:
            feature_thinned['properties'][p] = feature_['properties'][p]
        except KeyError as err:
            LOGGER.error(err)
            raise ProviderQueryError() from err

    return feature_thinned
def mask_prop(self, property_name):
    """
    Build the ES field path for a property name

    :param property_name: name of the property

    :returns: `property_name` unchanged when `self.is_gdal` is set,
              otherwise the name prefixed with `properties.`
    """

    if not self.is_gdal:
        return 'properties.{}'.format(property_name)
    return property_name
def __repr__(self):
    """
    Return a short textual representation of the provider

    :returns: `str` made of the provider tag followed by `self.data`
    """

    template = '<ElasticsearchProvider> {}'
    return template.format(self.data)