From c607be7ed2a35fe2bc9b2718dc70c2b028a38ae8 Mon Sep 17 00:00:00 2001 From: Moritz Langer <31667222+Moritz-Langer@users.noreply.github.com> Date: Fri, 21 Jun 2024 13:15:34 +0200 Subject: [PATCH] Oracle Connection Pooling implementation of issues/1655 (#1688) * Implemented Connection Pooling First part of connection pooling, improve by making pool size and other params configurable in YAML file of pygeoapi * updated session pooling with more robust creation of dsn * Further changes for releasing connections * further little tweaks * Added Thread Lock and building of DSN * Session Pooling now only happens when ENV VAR ORACLE_POOL_MIN and ORACLE_POOL_MAX are set. Else usual behaviour is restored and single connections are opened and closed. Added a Lock for avoiding race conditions and thread safety. * Flake8 changes * More Flake8 * Added integer type to env variables * Added Tests for Session Pooling * Flake 8 changes for tests * More Flake8 fun * Fixed typo for env vars * Fixed typo for env var and added noqa for variables * More Typos * More Flake8 * Added documentation to the oracle section for session pooling * Review and Feedback with @totycro * Update oracle.py Added Author * Added feedback from review --------- Co-authored-by: root Co-authored-by: xlanger --- .../data-publishing/ogcapi-features.rst | 15 + pygeoapi/provider/oracle.py | 259 +++++++++++------- tests/test_oracle_provider.py | 43 ++- 3 files changed, 220 insertions(+), 97 deletions(-) diff --git a/docs/source/data-publishing/ogcapi-features.rst b/docs/source/data-publishing/ogcapi-features.rst index 7c98631..4d87ab1 100644 --- a/docs/source/data-publishing/ogcapi-features.rst +++ b/docs/source/data-publishing/ogcapi-features.rst @@ -387,6 +387,21 @@ Extra properties Extra properties is a list of strings which are added as fields for data retrieval in the SELECT clauses. They can be used to return expressions computed by the database. 
+Session Pooling +"""""""""""""""" + +Session pooling is configured using environment variables. + +.. code-block:: bash + + export ORACLE_POOL_MIN=2 + export ORACLE_POOL_MAX=10 + + +The ``ORACLE_POOL_MIN`` and ``ORACLE_POOL_MAX`` environment variables are used to trigger session pool creation in the Oracle Provider and the ``DatabaseConnection`` class. See https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.create_pool for documentation of the ``create_pool`` function. + +If neither or only one of the environment variables is set, session pooling will not be activated and standalone connections are established at every request. + Custom SQL Manipulator Plugin """"""""""""""""""""""""""""" diff --git a/pygeoapi/provider/oracle.py b/pygeoapi/provider/oracle.py index c775971..640a0a9 100644 --- a/pygeoapi/provider/oracle.py +++ b/pygeoapi/provider/oracle.py @@ -1,9 +1,10 @@ # ================================================================= # # Authors: Andreas Kosubek +# Authors: Moritz Langer # # Copyright (c) 2023 Andreas Kosubek -# +# Copyright (c) 2024 Moritz Langer # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation # files (the "Software"), to deal in the Software without @@ -30,9 +31,12 @@ import importlib import json import logging +import os +import threading +from typing import Optional + import oracledb import pyproj -from typing import Optional from pygeoapi.api import DEFAULT_STORAGE_CRS @@ -54,6 +58,27 @@ class DatabaseConnection: """Database connection class to be used as 'with' statement. The class returns a connection object. 
""" + pool = None # Class-level connection pool + lock = threading.Lock() + + @classmethod + def create_pool(cls, conn_dict, oracle_pool_min, oracle_pool_max): + """Initialize the connection pool for the class + Lock is implemented before function call at __init__""" + dsn = cls._make_dsn(conn_dict) + # Create the pool + + p = oracledb.create_pool( + user=conn_dict["user"], + password=conn_dict["password"], + dsn=dsn, + min=oracle_pool_min, + max=oracle_pool_max, + increment=1, + ) + LOGGER.debug("Connection pool created successfully.") + + return p def __init__(self, conn_dic, table, properties=[], context="query"): """ @@ -88,106 +113,137 @@ class DatabaseConnection: ) self.properties = [item.lower() for item in properties] self.fields = {} # Dict of columns. Key is col name, value is type - self.conn = None + oracle_pool_min = int(os.environ.get('ORACLE_POOL_MIN', 0)) + oracle_pool_max = int(os.environ.get('ORACLE_POOL_MAX', 0)) + # Initialize the connection pool if it hasn't been initialized + if oracle_pool_min and oracle_pool_max: + LOGGER.debug("Found environment variables for session pooling:") + LOGGER.debug(f"ORACLE_POOL_MIN: {oracle_pool_min}") + LOGGER.debug(f"ORACLE_POOL_MAX: {oracle_pool_max}") + with DatabaseConnection.lock: + if DatabaseConnection.pool is None: + LOGGER.debug(f"self.conn_dict contains {self.conn_dict}") + DatabaseConnection.pool = DatabaseConnection.create_pool( + self.conn_dict, oracle_pool_min, oracle_pool_max + ) + LOGGER.debug( + "Initialized conneciton pool with " + f"{DatabaseConnection.pool.max} connections" + ) + + @staticmethod + def _make_dsn(conn_dict): + if conn_dict.get("init_oracle_client", False): + oracledb.init_oracle_client() + + # Connect with tnsnames.ora entry and Login with Oracle Wallet + if conn_dict.get("external_auth") == "wallet": + LOGGER.debug( + "Oracle connect with tnsnames.ora entry \ + and login with Oracle Wallet" + ) + if "tns_name" not in conn_dict: + raise ProviderConnectionError( + "tns_name 
must be set for external authentication!" + ) + + dsn = conn_dict["tns_name"] + + # Connect with SERVICE_NAME + if "service_name" in conn_dict: + LOGGER.debug( + f"Oracle connect with service_name: \ + {conn_dict['service_name']}" + ) + + if "host" not in conn_dict: + raise ProviderConnectionError( + "Host must be set for connection with service_name!" + ) + + dsn = oracledb.makedsn( + conn_dict["host"], + conn_dict.get("port", 1521), + service_name=conn_dict["service_name"], + ) + + # Connect with SID + elif "sid" in conn_dict: + LOGGER.debug( + f"Oracle connect with sid: {conn_dict['sid']}" + ) + + if "host" not in conn_dict: + raise ProviderConnectionError( + "Host must be set for connection with sid!" + ) + + dsn = oracledb.makedsn( + conn_dict["host"], + conn_dict.get("port", 1521), + sid=conn_dict["sid"], + ) + + # Connect with tnsnames.ora entry + elif "tns_name" in conn_dict: + LOGGER.debug( + f"Oracle connect with tns_name: \ + {conn_dict['tns_name']}" + ) + dsn = conn_dict["tns_name"] + + else: + raise ProviderConnectionError( + "One of service_name, sid or tns_name must be specified!" + ) + + LOGGER.debug(f"Oracle DSN string: {dsn}") + + return dsn def __enter__(self): + """Acquires a connection from the pool.""" try: - if self.conn_dict.get("init_oracle_client", False): - oracledb.init_oracle_client() - - # Connect with tnsnames.ora entry and Login with Oracle Wallet - if self.conn_dict.get("external_auth") == "wallet": - LOGGER.debug( - "Oracle connect with tnsnames.ora entry \ - and login with Oracle Wallet" - ) - - if "tns_name" not in self.conn_dict: - raise ProviderConnectionError( - "tns_name must be set for external authentication!" 
- ) - - dsn = self.conn_dict["tns_name"] - - # Connect with SERVICE_NAME - if "service_name" in self.conn_dict: - LOGGER.debug( - f"Oracle connect with service_name: \ - {self.conn_dict['service_name']}" - ) - - if "host" not in self.conn_dict: - raise ProviderConnectionError( - "Host must be set for connection with service_name!" - ) - - dsn = oracledb.makedsn( - self.conn_dict["host"], - self.conn_dict.get("port", 1521), - service_name=self.conn_dict["service_name"], - ) - - # Connect with SID - elif "sid" in self.conn_dict: - LOGGER.debug( - f"Oracle connect with sid: {self.conn_dict['sid']}" - ) - - if "host" not in self.conn_dict: - raise ProviderConnectionError( - "Host must be set for connection with sid!" - ) - - dsn = oracledb.makedsn( - self.conn_dict["host"], - self.conn_dict.get("port", 1521), - sid=self.conn_dict["sid"], - ) - - # Connect with tnsnames.ora entry - elif "tns_name" in self.conn_dict: - LOGGER.debug( - f"Oracle connect with tns_name: \ - {self.conn_dict['tns_name']}" - ) - dsn = self.conn_dict["tns_name"] - + if DatabaseConnection.pool: + self.conn = DatabaseConnection.pool.acquire() + LOGGER.debug("Connection acquired from pool .") + LOGGER.debug(f"Connection from pool is {self.conn}.") else: - raise ProviderConnectionError( - "One of service_name, sid or tns_name must be specified!" 
- ) + dsn = self._make_dsn(self.conn_dict) + LOGGER.debug(f"Created dsn for single connection with params: {dsn}") # noqa + # Connect with tnsnames.ora entry and Login with Oracle Wallet # noqa + if self.conn_dict.get("external_auth") == "wallet": + self.conn = oracledb.connect(externalauth=True, dsn=dsn) - LOGGER.debug(f"Oracle DSN string: {dsn}") + # Connect with tnsnames.ora entry, + # TNS_ADMIN is set via configuration + if "tns_admin" in self.conn_dict: + self.conn = oracledb.connect( + user=self.conn_dict["user"], + password=self.conn_dict["password"], + dsn=dsn, + config_dir=self.conn_dict["tns_admin"], + ) - # Connect with tnsnames.ora entry and Login with Oracle Wallet - if self.conn_dict.get("external_auth") == "wallet": - self.conn = oracledb.connect(externalauth=True, dsn=dsn) - - # Connect with tnsnames.ora entry, - # TNS_ADMIN is set via configuration - if "tns_admin" in self.conn_dict: - self.conn = oracledb.connect( - user=self.conn_dict["user"], - password=self.conn_dict["password"], - dsn=dsn, - config_dir=self.conn_dict["tns_admin"], - ) - - # Connect with user / password via dsn string - # When dsn is a TNS name, the environment variable TNS_ADMIN must - # be set (Path to tnsnames.ora file) - else: - self.conn = oracledb.connect( - user=self.conn_dict["user"], - password=self.conn_dict["password"], - dsn=dsn, - ) + # Connect with user / password via dsn string + # When dsn is a TNS name, the environment variable TNS_ADMIN must # noqa + # be set (Path to tnsnames.ora file) + else: + self.conn = oracledb.connect( + user=self.conn_dict["user"], + password=self.conn_dict["password"], + dsn=dsn, + ) except oracledb.DatabaseError as e: - LOGGER.error( - f"Couldn't connect to Oracle using:{str(self.conn_dict)}" - ) - LOGGER.error(e) + if DatabaseConnection.pool: + LOGGER.error("Couldn't acquire a connection from the pool.") + LOGGER.error(e) + else: + LOGGER.error( + f"Couldn't connect to Oracle using:{str(self.conn_dict)}" + ) + LOGGER.error(e) raise 
ProviderConnectionError(e) # Check if table name has schema/owner inside @@ -225,8 +281,19 @@ class DatabaseConnection: return self def __exit__(self, exc_type, exc_val, exc_tb): - # some logic to commit/rollback - self.conn.close() + """ + Releases the connection back to the pool. + """ + try: + if DatabaseConnection.pool: + DatabaseConnection.pool.release(self.conn) + LOGGER.debug("Connection released back to pool.") + else: + self.conn.close() + LOGGER.debug("Single Connection closed") + except oracledb.DatabaseError as e: + LOGGER.error("Error closing the connection.") + LOGGER.error(e) def _get_table_columns(self, schema, table): """ diff --git a/tests/test_oracle_provider.py b/tests/test_oracle_provider.py index d807552..d84b298 100644 --- a/tests/test_oracle_provider.py +++ b/tests/test_oracle_provider.py @@ -33,7 +33,7 @@ import os import pytest from pygeoapi.provider.base import ProviderInvalidQueryError -from pygeoapi.provider.oracle import OracleProvider +from pygeoapi.provider.oracle import OracleProvider, DatabaseConnection USERNAME = os.environ.get("PYGEOAPI_ORACLE_USER", "geo_test") PASSWORD = os.environ.get("PYGEOAPI_ORACLE_PASSWD", "geo_test") @@ -148,6 +148,20 @@ def config(): } +@pytest.fixture() +def config_db_conn(): + return { + "conn_dic": { + "host": HOST, + "port": PORT, + "service_name": SERVICE_NAME, + "user": USERNAME, + "password": PASSWORD, + }, + "table": "lakes", + } + + @pytest.fixture() def config_public_synonym(): return { @@ -616,3 +630,30 @@ def test_query_mandatory_properties_must_be_specified(config): p = OracleProvider(config) with pytest.raises(ProviderInvalidQueryError): p.query(properties=[("id", "123")]) + + +@pytest.fixture() +def database_connection_pool(config_db_conn): + os.environ["ORACLE_POOL_MIN"] = "2" # noqa: F841 + os.environ["ORACLE_POOL_MAX"] = "10" # noqa: F841 + yield + if 'ORACLE_POOL_MIN' in os.environ: + del os.environ["ORACLE_POOL_MIN"] + if 'ORACLE_POOL_MAX' in os.environ: + del 
os.environ["ORACLE_POOL_MAX"] + + +def test_oracle_pool(config_db_conn, database_connection_pool): + """ + Test whether an oracle session pool is created when there are + the required env variables. + """ + db_conn = DatabaseConnection(**config_db_conn) + assert db_conn.pool + assert db_conn.pool.max == int(os.environ.get("ORACLE_POOL_MAX")) + + +def test_query_pool(config, database_connection_pool): + """Test query using a DB Session Pool for a valid JSON object with geometry""" # noqa + # Run query test again with session pool + test_query(config)