Feature/mongodb jobmanager (#1186)

* MongoDB Jobmanager

* Add MongoDB-Jobmanager

Example configuration included

* Requested Fixes

Fixes Issues mentioned in latest Review

* Flake8 fixes

Code is fixed on the basis of a flake8 report

* Update ogcapi-processes.rst

* Update ogcapi-processes.rst

* Flake8 fixes

Fixes of flake8 errors
This commit is contained in:
Alexander Pilz
2023-03-30 13:38:43 +02:00
committed by GitHub
parent 466a1df723
commit d9c377ef62
5 changed files with 174 additions and 2 deletions
@@ -46,6 +46,21 @@ advanced job management capabilities (e.g. Kubernetes, databases, etc.).
connection: /tmp/pygeoapi-process-manager.db
output_dir: /tmp/
MongoDB
--------------------
As an alternative to the default a manager employing `MongoDB`_ can be used.
The connection to an installed `MongoDB`_ instance must be provided in the configuration.
`MongoDB`_ uses the localhost and port 27017 by default. Jobs are stored in a collection named
job_manager_pygeoapi.
.. code-block:: yaml
server:
manager:
name: MongoDB
connection: mongodb://host:port
output_dir: /tmp/
Putting it all together
-----------------------
+1 -1
View File
@@ -53,7 +53,7 @@ server:
# connection: /tmp/pygeoapi-process-manager.db
# output_dir: /tmp/
# ogc_schemas_location: /opt/schemas.opengis.net
logging:
level: ERROR
#logfile: /tmp/pygeoapi.log
+1
View File
@@ -67,6 +67,7 @@ PLUGINS = {
},
'process_manager': {
'Dummy': 'pygeoapi.process.manager.dummy.DummyManager',
'MongoDB': 'pygeoapi.process.manager.mongodb_.MongoDBManager',
'TinyDB': 'pygeoapi.process.manager.tinydb_.TinyDBManager'
}
}
+156
View File
@@ -0,0 +1,156 @@
# =================================================================
#
# Authors: Alexander Pilz <a.pilz@52north.org>
#
# Copyright (c) 2023 Alexander Pilz
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================
import json
import logging
import traceback
from pymongo import MongoClient
from pygeoapi.process.manager.base import BaseManager
LOGGER = logging.getLogger(__name__)
class MongoDBManager(BaseManager):
def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True
def _connect(self):
try:
client = MongoClient(self.connection)
self.db = client
LOGGER.info("JOBMANAGER - MongoDB connected")
return True
except Exception:
self.destroy()
LOGGER.error("JOBMANAGER - connect error",
exc_info=(traceback))
return False
def destroy(self):
try:
self.db.close()
LOGGER.info("JOBMANAGER - MongoDB disconnected")
return True
except Exception:
self.destroy()
LOGGER.error("JOBMANAGER - destroy error",
exc_info=(traceback))
return False
def get_jobs(self, status=None):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
if status is not None:
jobs = list(collection.find({}, {"status": status}))
else:
jobs = list(collection.find({}))
LOGGER.info("JOBMANAGER - MongoDB jobs queried")
return jobs
except Exception:
LOGGER.error("JOBMANAGER - get_jobs error",
exc_info=(traceback))
return False
def add_job(self, job_metadata):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
doc_id = collection.insert_one(job_metadata)
LOGGER.info("JOBMANAGER - MongoDB job added")
return doc_id
except Exception:
LOGGER.error("JOBMANAGER - add_job error",
exc_info=(traceback))
return False
def update_job(self, job_id, update_dict):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
entry = collection.find_one({"identifier": job_id})
collection.update_one(entry, {"$set": update_dict})
LOGGER.info("JOBMANAGER - MongoDB job updated")
return True
except Exception:
LOGGER.error("JOBMANAGER - MongoDB update_job error",
exc_info=(traceback))
return False
def delete_job(self, job_id):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
collection.delete_one({"identifier": job_id})
LOGGER.info("JOBMANAGER - MongoDB job deleted")
return True
except Exception:
LOGGER.error("JOBMANAGER - MongoDB delete_job error",
exc_info=(traceback))
return False
def get_job(self, job_id):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
entry = collection.find_one({"identifier": job_id})
LOGGER.info("JOBMANAGER - MongoDB job queried")
return entry
except Exception:
LOGGER.error("JOBMANAGER - MongoDB get_job error",
exc_info=(traceback))
return False
def get_job_result(self, job_id):
try:
self._connect()
database = self.db.job_manager_pygeoapi
collection = database.jobs
entry = collection.find_one({"identifier": job_id})
if entry["status"] != "successful":
LOGGER.info("JOBMANAGER - job not finished or failed")
return (None,)
with open(entry["location"], "r") as file:
data = json.load(file)
LOGGER.info("JOBMANAGER - MongoDB job result queried")
return entry["mimetype"], data
except Exception:
LOGGER.error("JOBMANAGER - MongoDB get_job_result error",
exc_info=(traceback))
return False
def __repr__(self):
return f'<MongoDBManager> {self.name}'
+1 -1
View File
@@ -15,4 +15,4 @@ requests
shapely<2.0
SQLAlchemy<2.0.0
tinydb
unicodecsv
unicodecsv