From d9c377ef62a2dde448306193ee9d108278c0ff8c Mon Sep 17 00:00:00 2001 From: Alexander Pilz <51150328+xcomagent95@users.noreply.github.com> Date: Thu, 30 Mar 2023 13:38:43 +0200 Subject: [PATCH] Feature/mongodb jobmanager (#1186) * MongoDB Jobmanager * Add MongoDB-Jobmanager Example configuration included * Requested Fixes Fixes Issues mentioned in latest Review * Flake8 fixes Code is fixed on the basis of a flake8 report * Update ogcapi-processes.rst * Update ogcapi-processes.rst * Flake8 fixes Fixes of flake8 errors --- .../data-publishing/ogcapi-processes.rst | 15 ++ pygeoapi-config.yml | 2 +- pygeoapi/plugin.py | 1 + pygeoapi/process/manager/mongodb_.py | 156 ++++++++++++++++++ requirements.txt | 2 +- 5 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 pygeoapi/process/manager/mongodb_.py diff --git a/docs/source/data-publishing/ogcapi-processes.rst b/docs/source/data-publishing/ogcapi-processes.rst index 52e5b82..99cb278 100644 --- a/docs/source/data-publishing/ogcapi-processes.rst +++ b/docs/source/data-publishing/ogcapi-processes.rst @@ -46,6 +46,21 @@ advanced job management capabilities (e.g. Kubernetes, databases, etc.). connection: /tmp/pygeoapi-process-manager.db output_dir: /tmp/ +MongoDB +-------------------- +As an alternative to the default a manager employing `MongoDB`_ can be used. +The connection to an installed `MongoDB`_ instance must be provided in the configuration. +`MongoDB`_ uses the localhost and port 27017 by default. Jobs are stored in a collection named +job_manager_pygeoapi. + +.. 
code-block:: yaml + + server: + manager: + name: MongoDB + connection: mongodb://host:port + output_dir: /tmp/ + Putting it all together ----------------------- diff --git a/pygeoapi-config.yml b/pygeoapi-config.yml index fd42141..97e3f5f 100644 --- a/pygeoapi-config.yml +++ b/pygeoapi-config.yml @@ -53,7 +53,7 @@ server: # connection: /tmp/pygeoapi-process-manager.db # output_dir: /tmp/ # ogc_schemas_location: /opt/schemas.opengis.net - + logging: level: ERROR #logfile: /tmp/pygeoapi.log diff --git a/pygeoapi/plugin.py b/pygeoapi/plugin.py index 4c610d2..dd33233 100644 --- a/pygeoapi/plugin.py +++ b/pygeoapi/plugin.py @@ -67,6 +67,7 @@ PLUGINS = { }, 'process_manager': { 'Dummy': 'pygeoapi.process.manager.dummy.DummyManager', + 'MongoDB': 'pygeoapi.process.manager.mongodb_.MongoDBManager', 'TinyDB': 'pygeoapi.process.manager.tinydb_.TinyDBManager' } } diff --git a/pygeoapi/process/manager/mongodb_.py b/pygeoapi/process/manager/mongodb_.py new file mode 100644 index 0000000..426e28a --- /dev/null +++ b/pygeoapi/process/manager/mongodb_.py @@ -0,0 +1,156 @@ +# ================================================================= +# +# Authors: Alexander Pilz +# +# Copyright (c) 2023 Alexander Pilz +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. 
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import json
import logging

from pymongo import MongoClient

from pygeoapi.process.manager.base import BaseManager

LOGGER = logging.getLogger(__name__)


class MongoDBManager(BaseManager):
    """Process job manager that stores job metadata in MongoDB.

    Jobs live in the ``jobs`` collection of the ``job_manager_pygeoapi``
    database on the server addressed by ``self.connection``.

    Every public method swallows exceptions, logs them with a traceback
    and returns ``False`` on failure; callers must check for that value.
    """

    def __init__(self, manager_def):
        """
        Initialize the manager.

        :param manager_def: manager definition, forwarded to `BaseManager`
        """
        super().__init__(manager_def)
        self.is_async = True
        # MongoClient instance, created lazily by _connect()
        self.db = None

    def _connect(self):
        """
        Make sure ``self.db`` holds a `MongoClient`.

        The client is created once and reused afterwards: MongoClient
        maintains its own connection pool, so building a new one for every
        operation only leaks sockets and monitoring threads.

        :returns: `True` if a client is available, `False` otherwise
        """
        if getattr(self, "db", None) is not None:
            return True
        try:
            self.db = MongoClient(self.connection)
            LOGGER.info("JOBMANAGER - MongoDB connected")
            return True
        except Exception:
            # Nothing was assigned, so there is no client to close.
            self.db = None
            LOGGER.exception("JOBMANAGER - connect error")
            return False

    def _jobs_collection(self):
        """Return the ``jobs`` collection, connecting first if needed."""
        self._connect()
        return self.db.job_manager_pygeoapi.jobs

    def destroy(self):
        """
        Close the MongoDB client, if one was opened.

        :returns: `True` if the client was closed (or none existed),
                  `False` if closing failed
        """
        client = getattr(self, "db", None)
        # Drop the reference first so a later _connect() builds a new client.
        self.db = None
        if client is None:
            return True
        try:
            client.close()
            LOGGER.info("JOBMANAGER - MongoDB disconnected")
            return True
        except Exception:
            # Must not call destroy() again here: that recursed forever.
            LOGGER.exception("JOBMANAGER - destroy error")
            return False

    def get_jobs(self, status=None):
        """
        Get stored jobs, optionally restricted to one status.

        :param status: job status to filter on (default `None`: all jobs)

        :returns: `list` of job documents, or `False` on error
        """
        try:
            # The status goes into the *filter*; as the second positional
            # argument of find() it would be treated as a projection.
            query = {} if status is None else {"status": status}
            jobs = list(self._jobs_collection().find(query))
            LOGGER.info("JOBMANAGER - MongoDB jobs queried")
            return jobs
        except Exception:
            LOGGER.exception("JOBMANAGER - get_jobs error")
            return False

    def add_job(self, job_metadata):
        """
        Insert a job document.

        :param job_metadata: `dict` of job metadata; pymongo adds an
                             ``_id`` key to it if it has none

        :returns: the result object of ``insert_one``, or `False` on error
        """
        try:
            doc_id = self._jobs_collection().insert_one(job_metadata)
            LOGGER.info("JOBMANAGER - MongoDB job added")
            return doc_id
        except Exception:
            LOGGER.exception("JOBMANAGER - add_job error")
            return False

    def update_job(self, job_id, update_dict):
        """
        Update fields of the job whose ``identifier`` equals `job_id`.

        :param job_id: job identifier
        :param update_dict: `dict` of fields to set

        :returns: `True` if a job was matched and updated, `False` if no
                  such job exists or on error
        """
        try:
            # Filter on the identifier only. Using the previously fetched
            # document as filter misses the job if it changed in between.
            result = self._jobs_collection().update_one(
                {"identifier": job_id}, {"$set": update_dict})
            if result.matched_count == 0:
                LOGGER.error("JOBMANAGER - MongoDB update_job error")
                return False
            LOGGER.info("JOBMANAGER - MongoDB job updated")
            return True
        except Exception:
            LOGGER.exception("JOBMANAGER - MongoDB update_job error")
            return False

    def delete_job(self, job_id):
        """
        Delete the job whose ``identifier`` equals `job_id`.

        :param job_id: job identifier

        :returns: `True` if the delete command ran (even when no job
                  matched), `False` on error
        """
        try:
            self._jobs_collection().delete_one({"identifier": job_id})
            LOGGER.info("JOBMANAGER - MongoDB job deleted")
            return True
        except Exception:
            LOGGER.exception("JOBMANAGER - MongoDB delete_job error")
            return False

    def get_job(self, job_id):
        """
        Get a single job document.

        :param job_id: job identifier

        :returns: the job document, `None` if no job matches, or `False`
                  on error
        """
        try:
            entry = self._jobs_collection().find_one({"identifier": job_id})
            LOGGER.info("JOBMANAGER - MongoDB job queried")
            return entry
        except Exception:
            LOGGER.exception("JOBMANAGER - MongoDB get_job error")
            return False

    def get_job_result(self, job_id):
        """
        Load the JSON result of a successfully finished job.

        :param job_id: job identifier

        :returns: `tuple` of (mimetype, parsed JSON data); ``(None,)`` if
                  the job status is not ``successful``; `False` on error
                  (including an unknown job or an unreadable result file)
        """
        try:
            entry = self._jobs_collection().find_one({"identifier": job_id})
            if entry["status"] != "successful":
                LOGGER.info("JOBMANAGER - job not finished or failed")
                return (None,)
            with open(entry["location"], "r") as file:
                data = json.load(file)
            LOGGER.info("JOBMANAGER - MongoDB job result queried")
            return entry["mimetype"], data
        except Exception:
            LOGGER.exception("JOBMANAGER - MongoDB get_job_result error")
            return False

    def __repr__(self):
        # NOTE(review): the leading space suggests a '<MongoDBManager>'
        # prefix was lost from this string - confirm against upstream.
        return f' {self.name}'
ab20e68..c467e50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,4 @@ requests shapely<2.0 SQLAlchemy<2.0.0 tinydb -unicodecsv +unicodecsv \ No newline at end of file