Files
pygeoapi/pygeoapi/util.py
T
Sander Schaminee 023f24d26b Multilingual support (alternative) (#664)
* Created localization (l10n) module + tests. Added l10n support to API and plugins (wip).

* Big refactor:

* All routed API methods are now decorated by @pre_process (consistency) and no longer have a headers+format argument but a request argument (**kwargs also removed)
* The pre_process decorator turns an incoming Flask/Starlette request into a generic APIRequest instance
* The new APIRequest class extracts all relevant info (params, data, locale, etc.) from the request and exposes them as properties
* Removed a lot of boilerplate (i.e. format checking) and wrapped that into methods
* Updated server-specific API calls in each route method (pass entire request object, not headers and query params)

* Several improvements and fixes:

* Updated OpenAPI page with "l" query param
* Added example translations (metadata)
* Changed plugin signature: added explicit locale attribute (instead of **kwargs)
* Moved locale processing to get_plugin_locale() function in l10n module
* API should pass raw requested locale to plugins, locale should always be set
* Fixed API tests and added APIRequest tests
* Prepared utils.py for Jinja2 i18n extension
* Rebased on commit b40297a8 and fixed compatibility with #661 and #662

* Updated documentation for language support

* Rebased and fixed compatibility with PR #658:

* Fixed EDR provider signature (added locale)
* Fixed EDR API routes and query function (and improved parameter-name handling)
* Fixed EDR tests

* Translate entire config in render_j2_template for requested locale:

* Added new translate_dict function to l10n module (+ tests)
* Updated all render_j2_template calls with locale parameter
* Updated pygeoapi-test-config.yml with some language structs

* Minor improvements

* support both 'language' and 'languages' property in server config and provider definitions
* renamed and modified translate_dict() to more generic translate_struct() function (l10n module)
* remove Content-Language header from provider responses if provider has no language support and format is json(ld)
* updated tests

* Leave provider locale handling to API

* Moved code to determine locale from providers to API class (and remove for formatters and processes)
* Removed locale parameter from plugin __init__ signatures
* Removed locale parameter from load_plugin()
* Added **kwargs to provider implementations for get, query, get_metadata, get_coverage_domainset and get_coverage_rangetype method signatures
* Added language=<locale> to all API calls to provider get, query, get_metadata, get_coverage_domainset and get_coverage_rangetype methods

* Use 'lang' instead of 'l' as language query parameter

* Updated Open API
* Updated documentation
* Fixed tests

* Implemented requested PR changes:

* Added usage examples to the APIRequest docstring
* Removed language support from coverage functions
* Updated plugins.rst and language.rst to match new behavior
* Removed language struct from resource links in pygeoapi-config.yml
* Rebased on latest master (fixed test_api.py)

* Rebased and applied fixes:

* Data property in APIRequest now is an awaitable attribute (fixed for Starlette compatibility)
* Renamed references to the 'l' parameter to 'lang'

* Final changes/improvements:

* Make sure that Content-Language is always set;
* Added more tests to ensure that the default language returned is the first configured language (if no language was requested by the user);
* Updated docs;
* Replaced recurring strings with constants in api.py;
* Fixed Flake8 checks.

* add missing async to starlette routes (#704)

Co-authored-by: Tom Kralidis <tomkralidis@gmail.com>
2021-06-08 18:46:35 -04:00

514 lines
14 KiB
Python

# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2020 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
"""Generic util functions used in the code"""
import base64
from datetime import date, datetime, time
from decimal import Decimal
from enum import Enum
import io
import json
import logging
import mimetypes
import os
import re
from urllib.request import urlopen
from urllib.parse import urlparse
import dateutil.parser
# from babel.support import Translations
from jinja2 import Environment, FileSystemLoader
from jinja2.exceptions import TemplateNotFound
import yaml
from pygeoapi import __version__
from pygeoapi import l10n
from pygeoapi.provider.base import ProviderTypeError
# Module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
# Default strftime pattern used by format_datetime()
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
# Path of the "templates" directory located next to this file
TEMPLATES = '{}{}templates'.format(os.path.dirname(
os.path.realpath(__file__)), os.sep)
# Register YAML file extensions with mimetypes as plain text
mimetypes.add_type('text/plain', '.yaml')
mimetypes.add_type('text/plain', '.yml')
def dategetter(date_property, collection):
    """
    Look up a date in a collection and return it as an ISO 8601 string.

    :param date_property: key under which the date is stored
    :param collection: dictionary to read the date from

    :returns: `str` (ISO8601) of the date, or `None` when the key is
              missing or holds `None` (allowing for an open interval)
    """

    found = collection.get(date_property)
    return found.isoformat() if found is not None else None
def get_typed_value(value):
    """
    Convert a string to the native Python type it appears to represent.

    Strings containing a dot are tried as `float`; multi-character strings
    starting with '0' are returned as is; everything else is tried as
    `int`. A value that fails conversion is handed back unchanged.

    :param value: `str` to be interpreted

    :returns: `float`, `int` or the original `str`
    """

    if '.' in value:  # float?
        caster = float
    elif len(value) > 1 and value.startswith('0'):
        # leading zero: keep verbatim
        return value
    else:  # int?
        caster = int

    try:
        return caster(value)
    except ValueError:  # string (default)
        return value
def yaml_load(fh):
    """
    Deserialize a YAML file into a Python object, expanding environment
    variable references of the form ``${NAME}`` found in scalar values

    :param fh: file handle

    :returns: `dict` representation of YAML
    :raises EnvironmentError: if a referenced environment variable is
                              not defined
    """

    # support environment variables in config
    # https://stackoverflow.com/a/55301129
    path_matcher = re.compile(r'.*\$\{([^}^{]+)\}.*')
    # path_matcher captures a single reference only (the last one, as the
    # leading .* is greedy); this pattern is used to collect all of them
    env_var_finder = re.compile(r'\$\{([^}^{]+)\}')

    def path_constructor(loader, node):
        # check every referenced variable: os.path.expandvars leaves
        # references to undefined variables unchanged instead of failing
        for env_var in env_var_finder.findall(node.value):
            if env_var not in os.environ:
                raise EnvironmentError(
                    'Undefined environment variable in config')
        return get_typed_value(os.path.expandvars(node.value))

    class EnvVarLoader(yaml.SafeLoader):
        pass

    EnvVarLoader.add_implicit_resolver('!path', path_matcher, None)
    EnvVarLoader.add_constructor('!path', path_constructor)

    return yaml.load(fh, Loader=EnvVarLoader)
def str2bool(value):
    """
    Interpret a value as a Python boolean
    (source: https://stackoverflow.com/a/715468)

    :param value: `bool` (returned unchanged) or `str` to be evaluated

    :returns: `bool`; a string is `True` only if, ignoring case, it is
              one of 'yes', 'true', 't', '1' or 'on'
    """

    if isinstance(value, bool):
        return value

    truthy = ('yes', 'true', 't', '1', 'on')
    return value.lower() in truthy
def to_json(dict_, pretty=False):
    """
    Dump a dict as a JSON string, using json_serial for values that the
    json module cannot encode by itself

    :param dict_: `dict` of JSON representation
    :param pretty: `bool` of whether to indent the output by 4 spaces
                   (default is `False`, i.e. no indentation)

    :returns: JSON string representation
    """

    return json.dumps(dict_, default=json_serial,
                      indent=4 if pretty else None)
def format_datetime(value, format_=DATETIME_FORMAT):
    """
    Re-format an ISO 8601 datetime string for display in HTML

    :param value: `str` of ISO datetime
    :param format_: `str` of datetime format for strftime

    :returns: formatted string; empty string if `value` is not a string
              or is blank
    """

    if isinstance(value, str) and value.strip():
        parsed = dateutil.parser.isoparse(value)
        return parsed.strftime(format_)

    return ''
def file_modified_iso8601(filepath):
    """
    Provide a file's ctime in ISO8601, expressed in UTC

    :param filepath: path to file

    :returns: string of ISO8601 (``YYYY-MM-DDTHH:MM:SSZ``)
    """

    # imported locally: the module only imports date, datetime and time
    from datetime import timezone

    # the output carries the 'Z' (UTC) designator, so the timestamp must be
    # converted to UTC rather than to the server's local time
    ctime = os.path.getctime(filepath)
    return datetime.fromtimestamp(
        ctime, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
def human_size(nbytes):
    """
    Provides human readable file size

    source: https://stackoverflow.com/a/14996816

    :param nbytes: int of file size (bytes)

    :returns: string of human readable filesize (e.g. '10K', '1.5M');
              sizes below 1024 bytes are returned as the number passed
              in, without a suffix
    """

    suffixes = ['B', 'K', 'M', 'G', 'T', 'P']

    i = 0
    while nbytes >= 1024 and i < len(suffixes) - 1:
        nbytes /= 1024.
        i += 1

    suffix = suffixes[i]

    if suffix == 'B':
        # kept as is for backward compatibility: no formatting for bytes
        return nbytes

    if suffix == 'K':
        # whole kilobytes only; do NOT strip trailing zeros from the
        # integer text, otherwise 10K/100K/1000K would all become '1K'
        f = str(int(nbytes))
    else:
        # one decimal place, dropping a redundant '.0'
        f = '{:.1f}'.format(nbytes).rstrip('0').rstrip('.')

    return '{}{}'.format(f, suffix)
def format_duration(start, end=None):
    """
    Compute the time span between two ISO 8601 datetime strings and
    return it as a string.

    :param start: `str` of ISO datetime
    :param end: `str` of ISO datetime, defaults to `start` for a 0 duration

    :returns: string of the duration; empty string if `start` is not a
              string or is blank
    """

    if isinstance(start, str) and start.strip():
        finish = dateutil.parser.isoparse(end or start)
        begin = dateutil.parser.isoparse(start)
        return str(finish - begin)

    return ''
def get_path_basename(urlpath):
    """
    Return the final component of a URL path

    :param urlpath: URL path

    :returns: string of basename of URL path
    """

    basename = os.path.basename(urlpath)
    return basename
def json_serial(obj):
    """
    helper function to convert to JSON non-default
    types (source: https://stackoverflow.com/a/22238613)

    :param obj: `object` to be evaluated

    :returns: JSON-encodable value: ISO 8601 `str` for dates/times,
              `str` for bytes (UTF-8 decoded, or base64 text when the
              bytes are not valid UTF-8), `float` for `Decimal` and
              `str` for locales
    :raises TypeError: if the type of `obj` is not supported
    """

    if isinstance(obj, (datetime, date, time)):
        return obj.isoformat()
    elif isinstance(obj, bytes):
        try:
            LOGGER.debug('Returning as UTF-8 decoded bytes')
            return obj.decode('utf-8')
        except UnicodeDecodeError:
            LOGGER.debug('Returning as base64 encoded JSON object')
            # b64encode() returns bytes; decode so that a str is returned
            # like in every other branch
            return base64.b64encode(obj).decode('ascii')
    elif isinstance(obj, Decimal):
        return float(obj)
    elif isinstance(obj, l10n.Locale):
        return l10n.locale2str(obj)

    msg = '{} type {} not serializable'.format(obj, type(obj))
    LOGGER.error(msg)
    raise TypeError(msg)
def is_url(urlstring):
    """
    Check whether a candidate string looks like a URL, i.e. whether it
    parses into both a scheme and a network location. No remote resource
    is obtained; this does not check the existence of any remote resource.

    :param urlstring: `str` to be evaluated as candidate URL.

    :returns: `bool` of whether the URL looks like a URL.
    """

    try:
        parsed = urlparse(urlstring)
    except ValueError:
        # the string could not be parsed at all
        return False

    if not parsed.scheme:
        return False

    return bool(parsed.netloc)
def _build_j2_environment(searchpath):
    """
    Create a Jinja2 environment with the filters and globals registered
    that render_j2_template makes available to templates

    :param searchpath: directory to load templates from

    :returns: configured `jinja2.Environment`
    """

    env = Environment(loader=FileSystemLoader(searchpath),
                      extensions=['jinja2.ext.i18n'])

    env.filters['to_json'] = to_json
    env.filters['format_datetime'] = format_datetime
    env.filters['format_duration'] = format_duration
    env.filters['human_size'] = human_size
    env.globals.update(to_json=to_json)

    env.filters['get_path_basename'] = get_path_basename
    env.globals.update(get_path_basename=get_path_basename)

    env.filters['get_breadcrumbs'] = get_breadcrumbs
    env.globals.update(get_breadcrumbs=get_breadcrumbs)

    env.filters['filter_dict_by_key_value'] = filter_dict_by_key_value
    env.globals.update(filter_dict_by_key_value=filter_dict_by_key_value)

    # TODO: insert Babel Translation stuff here

    return env


def render_j2_template(config, template, data, locale_=None):
    """
    render Jinja2 template

    Templates are looked up in the custom templates path from the
    configuration (server.templates.path) when set; if the template is not
    found there, or no custom path is configured, the default templates
    are used.

    :param config: dict of configuration
    :param template: template (relative path)
    :param data: dict of data
    :param locale_: the requested output Locale

    :returns: string of rendered template
    :raises jinja2.exceptions.TemplateNotFound: if the template does not
                                                exist in the default
                                                templates either
    """

    custom_templates = False
    try:
        templates_path = config['server']['templates']['path']
        env = _build_j2_environment(templates_path)
        custom_templates = True
        LOGGER.debug('using custom templates: {}'.format(templates_path))
    except (KeyError, TypeError):
        env = _build_j2_environment(TEMPLATES)
        LOGGER.debug('using default templates: {}'.format(TEMPLATES))

    try:
        j2_template = env.get_template(template)
    except TemplateNotFound as err:
        if not custom_templates:
            raise
        LOGGER.debug(err)
        LOGGER.debug('Custom template not found; using default')
        # the fallback environment needs the same filters/globals as the
        # primary one, otherwise default templates that use them fail
        env = _build_j2_environment(TEMPLATES)
        j2_template = env.get_template(template)

    return j2_template.render(
        config=l10n.translate_struct(config, locale_, True),
        data=data, version=__version__)
def get_mimetype(filename):
    """
    Guess the MIME type of a file from its name

    :param filename: filename (with extension)

    :returns: MIME type of given filename (`None` if it cannot be guessed)
    """

    mime_type, _encoding = mimetypes.guess_type(filename)
    return mime_type
def get_breadcrumbs(urlpath):
    """
    Build breadcrumbs from a URL path: one entry per '/'-separated token,
    each linking to the path accumulated so far

    :param urlpath: URL path

    :returns: `list` of `dict` objects of labels and links
    """

    crumbs = []
    href = ''

    for token in urlpath.split('/'):
        # start a new path while nothing has been accumulated yet
        href = '/'.join((href, token)) if href else token
        crumbs.append({'href': href, 'title': token})

    return crumbs
def filter_dict_by_key_value(dict_, key, value):
    """
    Keep only the entries of a dict of dicts whose inner dict holds the
    given value under the given key

    :param dict_: ``dict``
    :param key: dict key
    :param value: dict key value

    :returns: filtered ``dict``
    """

    selected = {}
    for name, entry in dict_.items():
        if entry[key] == value:
            selected[name] = entry

    return selected
def filter_providers_by_type(providers, type):
    """
    Pick the provider of a given type from a list of providers; when
    several providers share the type, the last one in the list is returned

    :param providers: ``list``
    :param type: str

    :returns: filtered ``dict`` provider (`None` if no provider matches)
    """

    by_type = {}
    for provider in providers:
        by_type[provider['type']] = provider

    return by_type.get(type)
def get_provider_by_type(providers, provider_type):
    """
    Return the first provider in a list that has the requested type

    :param providers: ``list`` of providers
    :param provider_type: type of provider (feature)

    :returns: provider based on type
    :raises ProviderTypeError: if no provider has the requested type
    """

    LOGGER.debug('Searching for provider type {}'.format(provider_type))

    try:
        candidates = (provider for provider in providers
                      if provider['type'] == provider_type)
        match = next(candidates)
    except (RuntimeError, StopIteration):
        raise ProviderTypeError('Invalid provider type requested')

    return match
def get_provider_default(providers):
    """
    Return a resource's default provider: the first provider whose
    'default' entry is `True`, or else the first provider in the list

    :param providers: ``list`` of providers

    :returns: filtered ``dict``
    """

    for candidate in providers:
        if 'default' in candidate and candidate['default'] is True:
            default = candidate
            LOGGER.debug('found default provider type')
            break
    else:
        # no provider is flagged as default
        LOGGER.debug('no default provider type. Returning first provider')
        default = providers[0]

    LOGGER.debug('Default provider: {}'.format(default['type']))

    return default
class JobStatus(Enum):
    """
    Job status options, as specified in the WPS 2.0 specification.

    Each member's value is the same lowercase string as its name.
    """

    # From the specification
    accepted = 'accepted'
    running = 'running'
    successful = 'successful'
    failed = 'failed'
    dismissed = 'dismissed'
def read_data(path):
    """
    helper function to read data (file or network)

    :param path: path of a local file, or URL (e.g. ``file://``,
                 ``http://``) of the data to read

    :returns: `bytes` of the content
    """

    LOGGER.debug('Attempting to read {}'.format(path))
    scheme = urlparse(path).scheme

    if scheme == '':
        LOGGER.debug('local file on disk')
        with io.open(path, 'rb') as fh:
            return fh.read()

    if scheme == 'file':
        # a file:// URL cannot be passed to open() as is; urlopen()
        # resolves it to the local file
        LOGGER.debug('local file on disk')
    else:
        LOGGER.debug('network file')

    with urlopen(path) as r:
        return r.read()
def url_join(*parts):
    """
    Join a number of parts/fragments into a URL: each part is stripped of
    surrounding whitespace and slashes, then the parts are joined with a
    single '/'.
    Implemented because urllib.parse.urljoin strips subpaths from
    host urls if they are specified

    Per https://github.com/geopython/pygeoapi/issues/695

    :param parts: list of parts to join

    :returns: str of resulting URL
    """

    cleaned = (fragment.strip().strip('/') for fragment in parts)
    return '/'.join(cleaned)