Oracle Connection Pooling implementation of issues/1655 (#1688)
* Implemented Connection Pooling First part of collection pooling, improve by making pool size and other params configurable in YAML file of pygeoapi * updated session pooling with more robust creation of dsn * Further changes for releasing connections * further little tweaks * Added Thread Lock and building of DSN * Session Pooling now only happens when ENV VAR ORACLE_POOL_MIN and ORACLE_POOL_MAX are set. Else usual behaviour is restored and single connections are opened and closed. Added a Lock for avoiding race conditions and threat safety. * Flake8 changes * More Flake8 * Added integer type to env variables * Added Tests for Session Pooling * Flake 8 changes for tests * More Flake8 fun * Fixed typo for env vars * Fixed typo for env var and added noqa for variables * More Typos * More Flake8 * Added documentation to the oracle section for session pooling * Review and Feedback with @totycro * Update oracle.py Added Author * Added feedback from review --------- Co-authored-by: root <root@els01entw.ama.at> Co-authored-by: xlanger <moritz.langer@ama.gv.at>
This commit is contained in:
@@ -387,6 +387,21 @@ Extra properties
|
||||
Extra properties is a list of strings which are added as fields for data retrieval in the SELECT clauses. They
|
||||
can be used to return expressions computed by the database.
|
||||
|
||||
Session Pooling
|
||||
""""""""""""""""
|
||||
|
||||
Configured using environment variables.
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
export ORACLE_POOL_MIN=2
|
||||
export ORACLE_POOL_MAX=10
|
||||
|
||||
|
||||
The ``ORACLE_POOL_MIN`` and ``ORACLE_POOL_MAX`` environment variables are used to trigger session pool creation in the Oracle Provider and the ``DatabaseConnection`` class. See https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.create_pool for documentation of the ``create_pool`` function.
|
||||
|
||||
If none or only one of the environment variables is set, session pooling will not be activated and standalone connections are established at every request.
|
||||
|
||||
|
||||
Custom SQL Manipulator Plugin
|
||||
"""""""""""""""""""""""""""""
|
||||
|
||||
+163
-96
@@ -1,9 +1,10 @@
|
||||
# =================================================================
|
||||
#
|
||||
# Authors: Andreas Kosubek <andreas.kosubek@ama.gv.at>
|
||||
# Authors: Moritz Langer <moritz.b.langer@gmail.com>
|
||||
#
|
||||
# Copyright (c) 2023 Andreas Kosubek
|
||||
#
|
||||
# Copyright (c) 2024 Moritz Langer
|
||||
# Permission is hereby granted, free of charge, to any person
|
||||
# obtaining a copy of this software and associated documentation
|
||||
# files (the "Software"), to deal in the Software without
|
||||
@@ -30,9 +31,12 @@
|
||||
import importlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from typing import Optional
|
||||
|
||||
import oracledb
|
||||
import pyproj
|
||||
from typing import Optional
|
||||
|
||||
from pygeoapi.api import DEFAULT_STORAGE_CRS
|
||||
|
||||
@@ -54,6 +58,27 @@ class DatabaseConnection:
|
||||
"""Database connection class to be used as 'with' statement.
|
||||
The class returns a connection object.
|
||||
"""
|
||||
pool = None # Class-level connection pool
|
||||
lock = threading.Lock()
|
||||
|
||||
@classmethod
|
||||
def create_pool(cls, conn_dict, oracle_pool_min, oracle_pool_max):
|
||||
"""Initialize the connection pool for the class
|
||||
Lock is implemented before function call at __init__"""
|
||||
dsn = cls._make_dsn(conn_dict)
|
||||
# Create the pool
|
||||
|
||||
p = oracledb.create_pool(
|
||||
user=conn_dict["user"],
|
||||
password=conn_dict["password"],
|
||||
dsn=dsn,
|
||||
min=oracle_pool_min,
|
||||
max=oracle_pool_max,
|
||||
increment=1,
|
||||
)
|
||||
LOGGER.debug("Connection pool created successfully.")
|
||||
|
||||
return p
|
||||
|
||||
def __init__(self, conn_dic, table, properties=[], context="query"):
|
||||
"""
|
||||
@@ -88,106 +113,137 @@ class DatabaseConnection:
|
||||
)
|
||||
self.properties = [item.lower() for item in properties]
|
||||
self.fields = {} # Dict of columns. Key is col name, value is type
|
||||
self.conn = None
|
||||
oracle_pool_min = int(os.environ.get('ORACLE_POOL_MIN', 0))
|
||||
oracle_pool_max = int(os.environ.get('ORACLE_POOL_MAX', 0))
|
||||
# Initialize the connection pool if it hasn't been initialized
|
||||
if oracle_pool_min and oracle_pool_max:
|
||||
LOGGER.debug("Found environment variables for session pooling:")
|
||||
LOGGER.debug(f"ORACLE_POOL_MIN: {oracle_pool_min}")
|
||||
LOGGER.debug(f"ORACLE_POOL_MAX: {oracle_pool_max}")
|
||||
with DatabaseConnection.lock:
|
||||
if DatabaseConnection.pool is None:
|
||||
LOGGER.debug(f"self.conn_dict contains {self.conn_dict}")
|
||||
DatabaseConnection.pool = DatabaseConnection.create_pool(
|
||||
self.conn_dict, oracle_pool_min, oracle_pool_max
|
||||
)
|
||||
LOGGER.debug(
|
||||
"Initialized conneciton pool with "
|
||||
f"{DatabaseConnection.pool.max} connections"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _make_dsn(conn_dict):
|
||||
if conn_dict.get("init_oracle_client", False):
|
||||
oracledb.init_oracle_client()
|
||||
|
||||
# Connect with tnsnames.ora entry and Login with Oracle Wallet
|
||||
if conn_dict.get("external_auth") == "wallet":
|
||||
LOGGER.debug(
|
||||
"Oracle connect with tnsnames.ora entry \
|
||||
and login with Oracle Wallet"
|
||||
)
|
||||
if "tns_name" not in conn_dict:
|
||||
raise ProviderConnectionError(
|
||||
"tns_name must be set for external authentication!"
|
||||
)
|
||||
|
||||
dsn = conn_dict["tns_name"]
|
||||
|
||||
# Connect with SERVICE_NAME
|
||||
if "service_name" in conn_dict:
|
||||
LOGGER.debug(
|
||||
f"Oracle connect with service_name: \
|
||||
{conn_dict['service_name']}"
|
||||
)
|
||||
|
||||
if "host" not in conn_dict:
|
||||
raise ProviderConnectionError(
|
||||
"Host must be set for connection with service_name!"
|
||||
)
|
||||
|
||||
dsn = oracledb.makedsn(
|
||||
conn_dict["host"],
|
||||
conn_dict.get("port", 1521),
|
||||
service_name=conn_dict["service_name"],
|
||||
)
|
||||
|
||||
# Connect with SID
|
||||
elif "sid" in conn_dict:
|
||||
LOGGER.debug(
|
||||
f"Oracle connect with sid: {conn_dict['sid']}"
|
||||
)
|
||||
|
||||
if "host" not in conn_dict:
|
||||
raise ProviderConnectionError(
|
||||
"Host must be set for connection with sid!"
|
||||
)
|
||||
|
||||
dsn = oracledb.makedsn(
|
||||
conn_dict["host"],
|
||||
conn_dict.get("port", 1521),
|
||||
sid=conn_dict["sid"],
|
||||
)
|
||||
|
||||
# Connect with tnsnames.ora entry
|
||||
elif "tns_name" in conn_dict:
|
||||
LOGGER.debug(
|
||||
f"Oracle connect with tns_name: \
|
||||
{conn_dict['tns_name']}"
|
||||
)
|
||||
dsn = conn_dict["tns_name"]
|
||||
|
||||
else:
|
||||
raise ProviderConnectionError(
|
||||
"One of service_name, sid or tns_name must be specified!"
|
||||
)
|
||||
|
||||
LOGGER.debug(f"Oracle DSN string: {dsn}")
|
||||
|
||||
return dsn
|
||||
|
||||
def __enter__(self):
|
||||
"""Acquires a connection from the pool."""
|
||||
try:
|
||||
if self.conn_dict.get("init_oracle_client", False):
|
||||
oracledb.init_oracle_client()
|
||||
|
||||
# Connect with tnsnames.ora entry and Login with Oracle Wallet
|
||||
if self.conn_dict.get("external_auth") == "wallet":
|
||||
LOGGER.debug(
|
||||
"Oracle connect with tnsnames.ora entry \
|
||||
and login with Oracle Wallet"
|
||||
)
|
||||
|
||||
if "tns_name" not in self.conn_dict:
|
||||
raise ProviderConnectionError(
|
||||
"tns_name must be set for external authentication!"
|
||||
)
|
||||
|
||||
dsn = self.conn_dict["tns_name"]
|
||||
|
||||
# Connect with SERVICE_NAME
|
||||
if "service_name" in self.conn_dict:
|
||||
LOGGER.debug(
|
||||
f"Oracle connect with service_name: \
|
||||
{self.conn_dict['service_name']}"
|
||||
)
|
||||
|
||||
if "host" not in self.conn_dict:
|
||||
raise ProviderConnectionError(
|
||||
"Host must be set for connection with service_name!"
|
||||
)
|
||||
|
||||
dsn = oracledb.makedsn(
|
||||
self.conn_dict["host"],
|
||||
self.conn_dict.get("port", 1521),
|
||||
service_name=self.conn_dict["service_name"],
|
||||
)
|
||||
|
||||
# Connect with SID
|
||||
elif "sid" in self.conn_dict:
|
||||
LOGGER.debug(
|
||||
f"Oracle connect with sid: {self.conn_dict['sid']}"
|
||||
)
|
||||
|
||||
if "host" not in self.conn_dict:
|
||||
raise ProviderConnectionError(
|
||||
"Host must be set for connection with sid!"
|
||||
)
|
||||
|
||||
dsn = oracledb.makedsn(
|
||||
self.conn_dict["host"],
|
||||
self.conn_dict.get("port", 1521),
|
||||
sid=self.conn_dict["sid"],
|
||||
)
|
||||
|
||||
# Connect with tnsnames.ora entry
|
||||
elif "tns_name" in self.conn_dict:
|
||||
LOGGER.debug(
|
||||
f"Oracle connect with tns_name: \
|
||||
{self.conn_dict['tns_name']}"
|
||||
)
|
||||
dsn = self.conn_dict["tns_name"]
|
||||
|
||||
if DatabaseConnection.pool:
|
||||
self.conn = DatabaseConnection.pool.acquire()
|
||||
LOGGER.debug("Connection acquired from pool .")
|
||||
LOGGER.debug(f"Connection from pool is {self.conn}.")
|
||||
else:
|
||||
raise ProviderConnectionError(
|
||||
"One of service_name, sid or tns_name must be specified!"
|
||||
)
|
||||
dsn = self._make_dsn(self.conn_dict)
|
||||
LOGGER.debug(f"Created dsn for single connection with params: {dsn}") # noqa
|
||||
# Connect with tnsnames.ora entry and Login with Oracle Wallet # noqa
|
||||
if self.conn_dict.get("external_auth") == "wallet":
|
||||
self.conn = oracledb.connect(externalauth=True, dsn=dsn)
|
||||
|
||||
LOGGER.debug(f"Oracle DSN string: {dsn}")
|
||||
# Connect with tnsnames.ora entry,
|
||||
# TNS_ADMIN is set via configuration
|
||||
if "tns_admin" in self.conn_dict:
|
||||
self.conn = oracledb.connect(
|
||||
user=self.conn_dict["user"],
|
||||
password=self.conn_dict["password"],
|
||||
dsn=dsn,
|
||||
config_dir=self.conn_dict["tns_admin"],
|
||||
)
|
||||
|
||||
# Connect with tnsnames.ora entry and Login with Oracle Wallet
|
||||
if self.conn_dict.get("external_auth") == "wallet":
|
||||
self.conn = oracledb.connect(externalauth=True, dsn=dsn)
|
||||
|
||||
# Connect with tnsnames.ora entry,
|
||||
# TNS_ADMIN is set via configuration
|
||||
if "tns_admin" in self.conn_dict:
|
||||
self.conn = oracledb.connect(
|
||||
user=self.conn_dict["user"],
|
||||
password=self.conn_dict["password"],
|
||||
dsn=dsn,
|
||||
config_dir=self.conn_dict["tns_admin"],
|
||||
)
|
||||
|
||||
# Connect with user / password via dsn string
|
||||
# When dsn is a TNS name, the environment variable TNS_ADMIN must
|
||||
# be set (Path to tnsnames.ora file)
|
||||
else:
|
||||
self.conn = oracledb.connect(
|
||||
user=self.conn_dict["user"],
|
||||
password=self.conn_dict["password"],
|
||||
dsn=dsn,
|
||||
)
|
||||
# Connect with user / password via dsn string
|
||||
# When dsn is a TNS name, the environment variable TNS_ADMIN must # noqa
|
||||
# be set (Path to tnsnames.ora file)
|
||||
else:
|
||||
self.conn = oracledb.connect(
|
||||
user=self.conn_dict["user"],
|
||||
password=self.conn_dict["password"],
|
||||
dsn=dsn,
|
||||
)
|
||||
|
||||
except oracledb.DatabaseError as e:
|
||||
LOGGER.error(
|
||||
f"Couldn't connect to Oracle using:{str(self.conn_dict)}"
|
||||
)
|
||||
LOGGER.error(e)
|
||||
if DatabaseConnection.pool:
|
||||
LOGGER.error("Couldn't acquire a connection from the pool.")
|
||||
LOGGER.error(e)
|
||||
else:
|
||||
LOGGER.error(
|
||||
f"Couldn't connect to Oracle using:{str(self.conn_dict)}"
|
||||
)
|
||||
LOGGER.error(e)
|
||||
raise ProviderConnectionError(e)
|
||||
|
||||
# Check if table name has schema/owner inside
|
||||
@@ -225,8 +281,19 @@ class DatabaseConnection:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
# some logic to commit/rollback
|
||||
self.conn.close()
|
||||
"""
|
||||
Releases the connection back to the pool.
|
||||
"""
|
||||
try:
|
||||
if DatabaseConnection.pool:
|
||||
DatabaseConnection.pool.release(self.conn)
|
||||
LOGGER.debug("Connection released back to pool.")
|
||||
else:
|
||||
self.conn.close()
|
||||
LOGGER.debug("Single Connection closed")
|
||||
except oracledb.DatabaseError as e:
|
||||
LOGGER.error("Error closing the connection.")
|
||||
LOGGER.error(e)
|
||||
|
||||
def _get_table_columns(self, schema, table):
|
||||
"""
|
||||
|
||||
@@ -33,7 +33,7 @@
|
||||
import os
|
||||
import pytest
|
||||
from pygeoapi.provider.base import ProviderInvalidQueryError
|
||||
from pygeoapi.provider.oracle import OracleProvider
|
||||
from pygeoapi.provider.oracle import OracleProvider, DatabaseConnection
|
||||
|
||||
USERNAME = os.environ.get("PYGEOAPI_ORACLE_USER", "geo_test")
|
||||
PASSWORD = os.environ.get("PYGEOAPI_ORACLE_PASSWD", "geo_test")
|
||||
@@ -148,6 +148,20 @@ def config():
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def config_db_conn():
|
||||
return {
|
||||
"conn_dic": {
|
||||
"host": HOST,
|
||||
"port": PORT,
|
||||
"service_name": SERVICE_NAME,
|
||||
"user": USERNAME,
|
||||
"password": PASSWORD,
|
||||
},
|
||||
"table": "lakes",
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def config_public_synonym():
|
||||
return {
|
||||
@@ -616,3 +630,30 @@ def test_query_mandatory_properties_must_be_specified(config):
|
||||
p = OracleProvider(config)
|
||||
with pytest.raises(ProviderInvalidQueryError):
|
||||
p.query(properties=[("id", "123")])
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def database_connection_pool(config_db_conn):
|
||||
os.environ["ORACLE_POOL_MIN"] = "2" # noqa: F841
|
||||
os.environ["ORACLE_POOL_MAX"] = "10" # noqa: F841
|
||||
yield
|
||||
if 'ORACLE_POOL_MIN' in os.environ:
|
||||
del os.environ["ORACLE_POOL_MIN"]
|
||||
if 'ORACLE_POOL_MAX' in os.environ:
|
||||
del os.environ["ORACLE_POOL_MAX"]
|
||||
|
||||
|
||||
def test_oracle_pool(config_db_conn, database_connection_pool):
|
||||
"""
|
||||
Test whether an oracle session pool is created when there are
|
||||
the required env variables.
|
||||
"""
|
||||
db_conn = DatabaseConnection(**config_db_conn)
|
||||
assert db_conn.pool
|
||||
assert db_conn.pool.max == int(os.environ.get("ORACLE_POOL_MAX"))
|
||||
|
||||
|
||||
def test_query_pool(config, database_connection_pool):
|
||||
"""Test query using a DB Session Pool for a valid JSON object with geometry""" # noqa
|
||||
# Run query test again with session pool
|
||||
test_query(config)
|
||||
|
||||
Reference in New Issue
Block a user