SQLAlchemy Performance Considerations
Query
The example table has around 7 million observations and 50+ columns, with 160+ distinct account names, and suppose we want to get the list of distinct account names. The data model we have defined is ConsumerLoanStatic, with table name consumer_loan_static.
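For reference, here is a minimal sketch of what such a model might look like. The column list is illustrative only (the real table has 50+ columns), and the types and surrogate key are assumptions:
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class ConsumerLoanStatic(Base):
    __tablename__ = 'consumer_loan_static'
    # assumed surrogate primary key for illustration
    id = Column(Integer, primary_key=True, autoincrement=True)
    account_name = Column(String(100))
    # ... the remaining 50+ columns are omitted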
Use text() and write the raw SQL:
from sqlalchemy import text
# Execute raw SQL using SQLAlchemy
query = text("SELECT DISTINCT account_name FROM consumer_loan_static")
result = session.execute(query).fetchall()
A more Pythonic way is to use the ORM query:
session.query(ConsumerLoanStatic.account_name).distinct().all()
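Note that both the raw-SQL and the ORM query return a list of single-element Row objects rather than plain strings; a small sketch to unpack them into a list of names:
# each result row is a one-element, tuple-like Row, so unpack it
account_names = [name for (name,) in session.query(ConsumerLoanStatic.account_name).distinct().all()]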
Use the query in pandas:
pd.read_sql_query("SELECT DISTINCT account_name FROM consumer_loan_static", engine)['account_name'].tolist()
What you should NOT do is the following: this queries the whole table and then removes duplicates according to account_name. The returned values are a list of records (with all columns in each record), which is unnecessary if we just want a list of names.
session.query(ConsumerLoanStatic).distinct(ConsumerLoanStatic.account_name).all()
Turn a SQLAlchemy query into a statement consumed by pd.read_sql().
query = session.query(ConsumerLoanStatic)
# the actual SQL statement
str(query.statement)
# turn the results into a dataframe
# session.bind is actually the engine object
df = pd.read_sql(query.statement, session.bind)
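The same pattern also lets us push filtering into the database instead of pandas, which matters at this table size; a sketch with a hypothetical account name:
# filter in SQL first, then materialize only the matching rows
query = session.query(ConsumerLoanStatic).filter(ConsumerLoanStatic.account_name == 'ACME_BANK')
df = pd.read_sql(query.statement, session.bind)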
Insert
Here, we illustrate a use case where we have large dataframes that need to be inserted into a table. Instead of validating the data before insertion, we would like to rely on the table constraints to reject invalid rows.
Dialects
For Postgres, there is an ON CONFLICT clause, and the on_conflict_do_update() function can alternatively update the existing record when the given combination of unique columns conflicts. Please check on_conflict_do_nothing() as well.
from tqdm import tqdm
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
class DataFrameChunkProcessor:
def __init__(self, engine, model, chunk_size):
self.engine = engine
self.model = model
self.table = model.__table__
self.chunk_size = chunk_size
self.session = Session(engine)
def upsert_whole_dataframe(self, df, unique_cols):
total_rows = df.shape[0]
for start in tqdm(range(0, total_rows, self.chunk_size)):
end = min(start + self.chunk_size, total_rows)
# NOTE: we can work on a view of the dataframe; no need to create a new dataframe
chunk_data_dict = df.iloc[start:end].to_dict(orient='records')
self.upsert_chunk(chunk_data_dict, unique_cols)
def upsert_chunk(self, data_dict, unique_cols):
insert_stmt = insert(self.table).values(data_dict)
update_dict = {col.name: col for col in insert_stmt.excluded if col.name not in unique_cols}
on_conflict_stmt = insert_stmt.on_conflict_do_update(index_elements=unique_cols, set_=update_dict)
# Alternatively, use ON CONFLICT DO NOTHING to skip conflicting rows:
# on_conflict_stmt = insert_stmt.on_conflict_do_nothing(index_elements=unique_cols)
self.session.execute(on_conflict_stmt)
self.session.commit()
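A usage sketch for this class, assuming an existing engine, the ConsumerLoanStatic model from above, and a dataframe df; the composite key ('account_name', 'loan_id') is hypothetical and should be replaced by the table's actual unique columns:
# chunk size and unique columns below are illustrative assumptions
processor = DataFrameChunkProcessor(engine, ConsumerLoanStatic, chunk_size=10_000)
processor.upsert_whole_dataframe(df, unique_cols=['account_name', 'loan_id'])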
Generic
For other cases, the following version may work better (I did not find a better solution). It can sometimes be faster, because we first try to bulk insert as many records as possible and rely on error handling: if a bulk is rejected, we fall back to committing the insertions individually.
One thing to notice here is to use a new session for each bulk. This increases stability, because when a transaction hits an error we need to rollback() before the next commit.
"""
Description: This script contains the class DataFrameChunkProcessor, which is used to insert a dataframe into a database table in chunks.
Date: 2024-07-28
"""
import pandas as pd
from tqdm import tqdm
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
class DataFrameChunkProcessor:
def __init__(self, engine, model, chunk_size):
self.engine = engine
# model is the class that represents the table
self.model = model
# table is the sqlalchemy table object
self.table = model.__table__
self.chunk_size = chunk_size
self.Session = scoped_session(sessionmaker(bind=engine))
def insert_whole_dataframe(self, df):
"""
Insert a whole dataframe into the database.
Parameters
----------
df: pd.DataFrame
The dataframe to be inserted into the database
"""
total_rows = df.shape[0]
for start in tqdm(range(0, total_rows, self.chunk_size)):
end = min(start + self.chunk_size, total_rows)
# NOTE: we can work on a view of the dataframe; no need to copy a new dataframe
chunk_data_dict = df.iloc[start:end].to_dict(orient='records')
self.insert_chunk(chunk_data_dict)
def insert_chunk(self, data_dict):
"""
Insert a chunk of data into the database.
Note: We leverage the bulk_insert_mappings method, and when a conflict occurs, we fall back to individual inserts.
Args:
data_dict: list of dictionaries, each dictionary represents a row to be inserted
"""
with self.Session() as session:
try:
session.bulk_insert_mappings(self.model, data_dict)
session.commit()
except Exception as e:
session.rollback()
self.insert_individual(session, data_dict)
def insert_individual(self, session, data_dict):
"""
Insert rows individually, committing each one so that rejected rows do not block the rest.
"""
for data in data_dict:
try:
insert_stmt = self.table.insert().values(data)
session.execute(insert_stmt)
session.commit()
except Exception as e:
session.rollback()
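A usage sketch for this generic version, under the same assumptions as before (existing engine, model, and dataframe):
# rows violating the table constraints are rejected individually,
# while the rest of the dataframe is still inserted
processor = DataFrameChunkProcessor(engine, ConsumerLoanStatic, chunk_size=10_000)
processor.insert_whole_dataframe(df)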
Table Enhancements
Creating an index on a table can also improve query performance. The trade-off is that insertions will be slower.
A unique constraint can be used to replace the primary key.
from sqlalchemy import Column, String, Integer, UniqueConstraint, Index, Date
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Student(Base):
__tablename__ = 'students'
id = Column(Integer, primary_key=True, autoincrement=True)
first_name = Column(String(50))
last_name = Column(String(50))
date_of_birth = Column(Date)
address = Column(String(100))
score = Column(Integer)
__table_args__ = (
UniqueConstraint('first_name', 'last_name', 'date_of_birth', name='student_unique_constraint'),
Index('student_name_index', 'first_name', 'last_name')
)
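To create the table together with its unique constraint and index, and to see the kind of query the index speeds up, a minimal sketch (assuming an existing engine and session):
# create the students table, unique constraint, and index
Base.metadata.create_all(engine)
# lookups that filter on the indexed columns benefit from student_name_index
session.query(Student).filter_by(first_name='Ada', last_name='Lovelace').all()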