Adding SQLAlchemy

In the last step, we had seen that db_accessor.py used sqlite3, the SQLite database adapter, to interact with the database. In this section, we will introduce SQLAlchemy, whose core will act as an abstraction layer to connect with the SQLite database.

Before we start making any code changes, we need to install sqlalchemy from PyPI. In your requirements.txt file, add sqlalchemy:

requirements.txt
fastapi
uvicorn[standard]
sqlalchemy
ruff

Since we’ve added a new dependency, we need to install the new module using pip. In a terminal window where your venv is active, run the following:

python3.12 -m pip install -r requirements.txt

Now, we’re ready to start making code changes.

Engine

Let’s create db/base.py in the marketsvc folder to keep the database connection configuration in one place.

Here we create the Engine object, which is the entry point of any SQLAlchemy application. The Engine serves as the main source of DBAPI connections to a given database.

marketsvc/db/base.py
from sqlalchemy import create_engine

engine = create_engine("sqlite+pysqlite:///marketdb", echo=True)

We create the Engine with the create_engine() method. Here, we have passed in a string to create_engine(), which includes all the necessary information required to connect to a database:

  1. Type of database, in our case, sqlite. This instructs SQLAlchemy to use the SQLite dialect, which is a system used to communicate with various kinds of databases and their respective drivers.
  2. DBAPI, in our case, pysqlite. DBAPI (Python Database API Specification) is a driver that SQLAlchemy uses to connect to a database. It’s a “low level” API that lets Python talk to the database.
  3. The name of the database, in our case, marketdb

We’ve also enabled the echo flag, which will log the generated SQL queries by SQLAlchemy. We’ll display these logs to explain what happens behind the scenes.

Lazy Connecting

When create_engine() first returns an Engine object, it will not reach out to the database yet. It will wait for a task to be performed against the database, such as a SELECT query, and then connect to the database, following the lazy initialization design pattern.

Connection

Once our Engine object is ready, it will be used to connect to the database by providing a unit of connectivity called the Connection.

Connection is a proxy object for an actual DBAPI connection, and provides services to execute SQL statements and transaction control, and this is how we’ll interact with the database. The Connection is acquired by Engine.connect().

We don’t want to keep a Connection running indefinitely, and thus, the recommended way to use Connection is with context managers, which will frame the operations inside into a transaction, and will invoke Connection.close() at the end of the block.

Executing Queries

Now that we understand how to connect to the database, let’s look at how to execute queries with SQLAlchemy. For now, we’ll be using raw SQL. Let’s update execute_query() in db_accessor.py to use the Connection context manager:

marketsvc/db_accessor.py
from db.base import engine
from sqlalchemy import text

def execute_query(query, params=None):
    with engine.connect() as conn:
        return conn.execute(text(query), params)

The function get_customers(), for example, in db_accessor.py calls execute_query() as follows:

def get_customers():
    rows = execute_query("SELECT * FROM customer")
    return rows

The statement is executed with the Connection.execute() function, which returns a Result; an iterable object of resulting Rows.

In order to format the response correctly, let’s edit the customers() function in server.py to get the Row objects as dictionaries.

marketsvc/server.py
@app.get("/api/customers")
def customers():
    customers = get_customers()
    return (customer._asdict() for customer in customers)

At this point, the API to fetch customers is updated to use SQLAlchemy. Run the service:

./run.sh run

Now, in another shell, hit the /api/customers endpoint using curl. or use our convenience shell script:

./run.sh customers
SQLAlchemy logs
BEGIN (implicit)
SELECT * FROM customer
[generated in 0.00021s] ()
ROLLBACK

As you can see from the logs, a ROLLBACK was emitted at the end. This marked the of the transaction. An automatic rollback occurs when a connection is closed after use, to ensure that the connection is ‘clean’ for its next use.

What if we wanted to access a specific property from our results? let’s take a look at get_total_cost_of_an_order(order_id). Here we want to access the total value that we’ve computed in our SQL query. How do we do that?

As we learned above, execute_query() returns an iterable of Row objects. We expect a single Row in our Result iterable, so we access it as follows:

def get_total_cost_of_an_order(order_id):
    rows = execute_query(...)
    return rows.one().total
Test Your Understanding

Now that you understand how SELECT queries work in SQLAlchemy, are there any other @app.get endpoints in our service that require updates to their server.py or db_accessor.py handlers? If you are stuck, take a look at the branch step-2-sqlalchemy for a hint!

Parameter Binding

We may want to select specific rows, or insert some data to the table. The Connection.execute() function can accept parameters called bound parameters. We indicate the presense of parameters in the text() construct by using colons, such as :customer_id. We can then send the actual value of these parameters as a dictionary in the second argument of Connection.execute(); for example:

conn.execute(
    text("SELECT * FROM customer WHERE id=:customer_id"),
    {"customer_id": 1},
)
WARNING

Never put variables directly in the query string. Doing so leaves your code vulnerable to SQL injection attacks. Always use parameter binding. Using parameters allows the dialect and DBAPI to correctly handle the input, and enables the driver to have the best performance.

If we want to send multiple sets of parameters, such as insert multiple records in the table, we can pass a list of dictionaries to Connection.execute() and send multiple parameter sets. The SQL statement will be executed once for each parameter set.

Committing Data

In this section, we’ll learn about two commit styles.

Commit As You Go

You might have noticed the absense of the COMMIT statement from the SQLAlchemy logs. If we want to commit some data, we need to explicitly call Connection.commit() inside the block.

Let’s update execute_insert_query() to invoke Connection.commit().

marketsvc/db_accessor.py
def execute_insert_query(query, params):
    with engine.connect() as conn:
        cursor = conn.execute(text(query), params)
        result = cursor.fetchone()
        conn.commit()
        return result

Here, we’ve used the resulting Cursor object from the execute statement to fetch a single result, as we expect our statement to result in exactly one Row.

Therefore, we can update add_new_order_for_customer() to fetch the id of the new order by using .id, as the Row object behaves like a namedtuple:

def add_new_order_for_customer(customer_id, items):
    try:
        new_order_id = execute_insert_query(
            """
            INSERT INTO orders
                (customer_id, order_time)
            VALUES
                (:customer_id, Date('now'))
            RETURNING id
            """,
            {"customer_id": customer_id},
        ).id

        # execute_insert_queries(...)

        return True

Temporarily comment the lines where we call the execute_insert_queries() function. If you were to run ./run.sh neworder to exercise add_new_order_for_customer() at this point, where you insert a row in the orders table, the logs generated by SQLAlchemy would look like as follows:

SQLAlchemy logs:
BEGIN (implicit)
INSERT INTO orders (customer_id, order_time) VALUES (?, Date('now')) RETURNING id
[generated in 0.00041s] (1,)
COMMIT

As you can see, conn.commit() committed the transaction, and the statement COMMIT is logged, as compared to ROLLBACK in the previous example. We can then call conn.commit() for committing additional statements. This style is called commit as you go.

However, if an exception occurs duing the transaction, the changes will be rolled back and a ROLLBACK will be displayed instead.

 

You can see that add_new_order_for_customer() has two INSERT statements. The first adds a new order to the orders table, and the second adds the items & their quantities of the new order into the order_items table, and associates them with the new_order_id we just created.

Next, let’s update execute_insert_queries() to use SQLAlchemy:

def execute_insert_queries(query, params_tuple):
    with engine.connect() as conn:
        conn.execute(text(query), params_tuple)
        conn.commit()
Test Your Understanding

Now, if you uncomment execute_insert_queries() in add_new_order_for_customer() and hit the API again, you should see the generated SQLAlchemy logs for both INSERT statements. Can you predict the output of the logs?

Note here that the difference in the implementation between execute_insert_query() and execute_insert_queries() is that in the former, we fetch results from the cursor.

Because we’ve used the syntax RETURNING id in the first INSERT statement, SQLAlchemy expects the results to be fetched before the connection.commit() is called. If you were to skip the call to result = cursor.fetchone() when you execute the first INSERT statement, SQLAlchemy would throw an exception:

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) cannot commit transaction - SQL statements in progress

Begin Once

Another way to commit data is to use the context manager Engine.begin() instead of Engine.connect(). It will declare the whole block to be one transcation block, and will enclose everything inside the transaction with one COMMIT at the end. This method is called begin once.

with engine.begin() as conn:
    result = conn.execute(text("INSERT INTO orders..."), params)
SQLAlchemy logs
BEGIN (implicit)
INSERT INTO orders ...
[generated in 0.00041s] (...)
COMMIT

Notice there is a COMMIT without explicitly writing conn.commit().

 

Now that we’ve added SQLAlchemy, let’s eliminate raw SQL text and introduce ORMs!

Kudos!

🙌 You have now reached the step-2-sqlalchemy part of the tutorial. If not, checkout that branch and continue from there:

git checkout step-2-sqlalchemy

 

results matching ""

    No results matching ""