from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.orm import sessionmaker
[docs]class SqlClient:
"""
This class is a convenience wrapper around SQLAlchemy that makes it easy
to get an ``engine`` object to run queries against any SQL backend.
The official SQLAlchemy documentation page on engines, connections, transactions,
sessions, etc. is very good:
https://docs.sqlalchemy.org/en/13/core/connections.html
Using this class, you have 2 options for running queries:
1. You can use the `run_query` method to execute queries like a typical client
2. You can get an engine or session and do ORM style queries
Usage:
(1)
SQLAlchemy requires underlying database drivers and python libraries to be installed.
For example, to connect to MySQL, you would need to install the `PyMySQL` package and
mysql driver binaries in the project that uses this class.
.. code:: text
# python dependencies
postgres : pip install psycopg2-binary
mysql : pip install pymysql
snowflake : pip install snowflake-connector-python snowflake-sqlalchemy
(2)
One of the big advantages of SQLAlchemy is that it allows you to use SQLite for writing
unit tests. Sadly, not all queries are testable in this way because of SQLite's limitations.
The following are some of the bigger limitations:
SQLite does not support
1. ``RETURNING`` clauses on ``INSERT`` statements
2. ``UPDATE`` statements with multiple-table criteria
"""
def __init__(
self,
sql_dialect=None,
driver_lib=None,
username=None,
password=None,
host=None,
port=None,
database=None,
engine_kwargs=dict(),
):
"""
Note that the default sqlite connection string "sqlite://" is treated as
"sqlite:///:memory:", so you can specify sql_dialect="sqlite" and leave
all other parameters as default to interact with an in memory database for testing.
Args:
sql_dialect (str): "sqlite", "postgresql", "mysql", "snowflake", etc. The type of
SQL database being accessed
driver_lib (str): Optional - The python library used by SQLAlchemy to handle SQL queries
For example "psycopg2" or "pymysql"
username (str): database username
password (str): database password
host (str): database DNS name or IP address
port (int): port to on which to connect to the database e.g.
database (str): Optional - specific database/schema to connect to
engine_kwargs (dict): Optional - additional configuration passed to the SQLAlchemy engine
This includes dialect specific settings
"""
# save init params
self.sql_dialect = sql_dialect
self.driver_lib = driver_lib
self.username = username
self.password = password
self.host = host
self.port = port
self.database = database
# select a default SQL library if driver_lib is unspecified
if not driver_lib:
self.driver_lib = {
"sqlite": None,
"postgresql": "psycopg2", # pip install psycopg2-binary
"mysql": "pymysql", # pip install pymysql
"snowflake": None, # pip install snowflake-connector-python snowflake-sqlalchemy
}.get(sql_dialect)
# this can be of the form: "postgresql+psycopg2" or just "snowflake"
drivername = f"{self.sql_dialect}+{self.driver_lib}" if self.driver_lib else self.sql_dialect
url_params = {
"drivername": drivername,
"username": self.username,
"password": self.password,
"host": self.host,
"port": self.port,
"database": self.database,
}
self.connection_string = str(URL(**url_params))
# SQLAlchemy best practice is to have 1 engine instance per application lifecycle
self.engine = create_engine(self.connection_string, **engine_kwargs)
# prepare a sessionmaker to make sessions available
self.Session = sessionmaker(bind=self.engine)
def get_session(self):
return self.Session()
[docs] def run_query(self, query, *query_args, commit=True, fetch_all=False, return_raw_result=False):
"""
Execute a SQL query.
Args:
query (str | sqlalchemy expression): a SQL expression expressed as a string or SQLAlchemy string object.
query_args (object): any values that are to be inserted in some way into the query
commit (bool): whether or not to commit changes to the database
fetch_all (bool): returns a ResultProxy (array of ResultRowProxy) objects if True which can be treated as an array of Row results. Otherwise returns a single ResultRowProxy or None if no results are found.
return_raw_results (bool): Set to True to skip fetching the query results. This is useful for non-select statements such as INSERT, UPDATE, CREATE, etc. which do not typically return anything.
Returns:
:py:class:`sqlalchemy.engine.result.ResultProxy`: This is a very useful object, with attributes like
- ``is_insert`` for insert statements
- ``rowcount`` for number of inserted/updated rows
- ``lastrowid`` for the primary key of the last inserted row. See this link for the full API: https://www.kite.com/python/docs/sqlalchemy.engine.ResultProxy
"""
to_return = None
try:
# the context manager automatically closes the connection and rolls back on exeptions
with self.engine.connect() as connection:
with connection.begin() as transaction:
# execute the query
result = connection.execute(query, query_args)
if return_raw_result:
# return the sqlalchemy.engine.result.ResultProxy object
to_return = result
else:
# default to fetching the results
to_return = result.fetchall() if fetch_all else result.fetchone()
# commit or rollback
if commit:
transaction.commit()
else:
transaction.rollback()
except Exception as query_run_error:
print(query_run_error)
to_return = {"Error": "Failed to run query due to - {}".format(query_run_error)}
return to_return
if __name__ == "__main__":
# example init
sql_client = SqlClient(
sql_dialect="mysql",
driver_lib="pymysql",
username="root",
password="password",
host="localhost",
port=3309,
# database="bulk_scoring"
)
print(sql_client.connection_string)