Demo

Contributors: Alicia Wang, Conner Swenberg

This demo is not up to date with Fall 2020 as we won't be covering DAO. We recommend you watch the demo video instead.

This demo will introduce you to a new Python package, SQLAlchemy, which will help us interact with the database via an object-relational mapping (ORM). We will also demonstrate how to set up a dao.py file to interface between our application and database code.

File Setup

Write the following code at the top of our db.py file:

db.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()

We first need to import our flask_sqlalchemy package and instantiate the SQLAlchemy class. As the name implies, db will represent our database object that we will use to read and write to our database.

Moving over to app.py, let’s first import our necessary models from db.py and initialize our application.

app.py
import json
from flask import Flask, request
import dao
# Task and Subtask are the models we define in db.py over the course of this demo
from db import db, Task, Subtask

# define db filename
db_filename = "todo.db"
app = Flask(__name__)

# setup config
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_filename}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config["SQLALCHEMY_ECHO"] = True

# initialize app
db.init_app(app)
with app.app_context():
    db.create_all()

# generalized response formats
def success_response(data, code=200):
    return json.dumps({"success": True, "data": data}), code

def failure_response(message, code=404):
    return json.dumps({"success": False, "error": message}), code

Recall that our db variable is an instance of the SQLAlchemy class, our database engine. To initialize our application, we will use the init_app() method that takes in a Flask application as an argument. Upon creating the application we will run db.create_all() to create all of our tables as defined by our models.

Tasks

Model Definition

We define tables in our database just like a typical Python class:

db.py
class Task(db.Model):    
    __tablename__ = "task"    
    id = db.Column(db.Integer, primary_key=True)    
    description = db.Column(db.String, nullable=False)
    done = db.Column(db.Boolean, nullable=False)

Class definition

For our first class, we will define the structure for our tasks. We subclass SQLAlchemy’s Model class to inherit all of the class properties needed to interact with this table. We also define the name of the table SQLAlchemy will create for us with the __tablename__ property (note the double underscores). SQLAlchemy will autogenerate a table name for us if this property is not defined, but explicitly defining the table name can be helpful later on if we have to debug raw SQL commands and need to be sure which tables correspond to which models.

Column definition

We define columns in our table by creating a new field in the class with db.Column. db.Column takes a mandatory argument that indicates the type of the column (all available types are listed in the SQLAlchemy documentation) and a series of optional arguments that define further characteristics of the column.

The first column we wish to define is our unique identifier, id. We will be using an integer like before, and SQLAlchemy provides a handy primary_key=True flag to enforce the conditions we want for a primary key, namely uniqueness and non-nullability. We define description and done with string and boolean types respectively, and also enforce that these fields are not nullable.

Thinking ahead to how we will use the ORM to handle requests within our routes, we will need to convert an instance of the Task class into basic types. As mentioned previously, we need to send a standardized data format across HTTP (our choice is JSON), so let us define a method that converts an object into a Python dictionary.

def serialize(self):    
    return {        
        "id": self.id,        
        "description": self.description,        
        "done": self.done,    
    }

This process of converting custom object types into basic types is known as serialization. Our serialize method simply constructs a Python dictionary from our object’s field values.
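To see why serialization is needed, the sketch below uses a plain Python stand-in for Task (the class name PlainTask is hypothetical and involves no SQLAlchemy). json.dumps cannot encode an arbitrary object, but it can encode the dictionary that serialize returns.

```python
import json


class PlainTask:
    """Hypothetical stand-in for the Task model, with no database behind it."""

    def __init__(self, id, description, done):
        self.id = id
        self.description = description
        self.done = done

    def serialize(self):
        return {
            "id": self.id,
            "description": self.description,
            "done": self.done,
        }


task = PlainTask(1, "do laundry", False)

# A custom object is not JSON-encodable on its own.
try:
    json.dumps(task)
    object_is_encodable = True
except TypeError:
    object_is_encodable = False

# The serialized dictionary contains only basic types, so it encodes cleanly.
encoded = json.dumps(task.serialize())
```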

1. Get all tasks

Our first method is quite basic but introduces us to the first use of our ORM.

app.py
@app.route("/tasks/")
def get_tasks():
    tasks = [t.serialize() for t in Task.query.all()]
    return success_response(tasks)

Querying the database

Our first change is how we query all of our tasks. Before we had a method that used SQLite3 to write direct SQL queries on our database (in this case SELECT * FROM task). Now the ORM provides us a structure to query without directly writing any SQL!

The Task class imported at the top gives us direct access to the task table. SQLAlchemy's .query attribute lets us write queries on our table and is available on every model class we define. The .all() method functions like SELECT * FROM task: it imposes no condition on which tasks are returned.
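For comparison, here is roughly what Task.query.all() saves us from writing. This sketch uses only the standard library's sqlite3 module with an in-memory database and made-up rows; it illustrates the equivalent hand-written SQL and is not code from the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task ("
    "id INTEGER PRIMARY KEY, description TEXT NOT NULL, done BOOLEAN NOT NULL)"
)
conn.executemany(
    "INSERT INTO task (description, done) VALUES (?, ?)",
    [("do laundry", False), ("write essay", True)],
)
conn.commit()

# Hand-written counterpart of Task.query.all(): no WHERE clause, every row returned.
rows = conn.execute("SELECT * FROM task").fetchall()

# Without an ORM we must also map each row tuple to a dictionary ourselves.
tasks = [{"id": r[0], "description": r[1], "done": bool(r[2])} for r in rows]
conn.close()
```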

Constructing a return

Because our task query returns Task objects, we need to serialize them into Python dictionaries so that they can be converted to JSON. We want the data to be as ready to be sent back in a response as possible, which is why we serialize it here.

This one-line style of looping through an existing list is known as a list comprehension and is highly recommended for simple loops like this.

2. Create a task

@app.route("/tasks/", methods=["POST"])
def create_task():
    body = json.loads(request.data)
    new_task = Task(
        description=body.get('description'),
        done=body.get('done', False)
    )
    db.session.add(new_task)
    db.session.commit()
    return success_response(new_task.serialize(), 201)

Construct Task object

First, we want to construct a Python object to represent our task. Just like before, we need to parse our incoming request's body for the description data. We use the dictionary's built-in .get() method to safely read from the body dictionary. If there is no value for the provided key, the method will return None by default. We can indicate a default value of False for the done variable using a second argument to body.get(). Next we need to use the ORM to convert this object into a real entry in the database.
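A quick standalone illustration of how .get() behaves on a parsed body. The JSON string and the due_date key are made up for the example.

```python
import json

# A request body that omits the "done" field.
body = json.loads('{"description": "do laundry"}')

description = body.get("description")  # key present: returns its value
missing = body.get("due_date")         # key absent, no default: returns None
done = body.get("done", False)         # key absent, default given: returns False

# Indexing with square brackets raises KeyError for a missing key instead.
try:
    body["done"]
    raised_key_error = False
except KeyError:
    raised_key_error = True
```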

Saving to the database

Note that we now have a db object which isolates the interaction with our database. Our db object has a field called session to keep track of our current connection to the database (remember how with sqlite3 we used self.conn). With our sqlite3 implementation we had separate commands for executing the insertion of a new task and for the actual saving, or "committing", of that task; the same applies with our ORM. By keeping the addition of new entries separate from the actual commit, we free ourselves to add multiple entries to different tables before accepting the cost of a database save.
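The same add-then-commit split can be observed with the standard library's sqlite3 module, which the earlier implementation used. In this sketch (in-memory database, made-up rows, hypothetical helper count_tasks), inserts stay pending until commit() and can be discarded with rollback().

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, description TEXT NOT NULL)")
conn.commit()


def count_tasks():
    return conn.execute("SELECT COUNT(*) FROM task").fetchone()[0]


# An insert that is never committed can be thrown away.
conn.execute("INSERT INTO task (description) VALUES (?)", ("abandoned",))
conn.rollback()
after_rollback = count_tasks()

# Several inserts can share a single commit.
conn.execute("INSERT INTO task (description) VALUES (?)", ("do laundry",))
conn.execute("INSERT INTO task (description) VALUES (?)", ("write essay",))
conn.commit()
after_commit = count_tasks()
conn.close()
```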

Now we can simply return a serialized task back to the client with a 201 response code to indicate the successful creation of a new entry.

3. Retrieve a task

@app.route("/tasks/<int:task_id>/")
def get_task(task_id):
    task = Task.query.filter_by(id=task_id).first()
    if task is None:
        return failure_response("Task not found!")
    return success_response(task.serialize())

Just like before, we design this route to accept an id within the route path. Now we can make use of SQLAlchemy's filter_by method to search for a task whose id field matches our provided task_id. On its own, this query matches every task that meets the criterion, so it behaves like a list of tasks. We only want a single task, so we call .first() at the end of the query to return the first match. .first() returns None if no task has the given task_id. We return a failure response if the task does not exist in our table and otherwise return a success response with the serialized task.
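In raw SQL terms, filter_by(id=task_id).first() behaves much like a WHERE clause followed by fetching a single row. The sketch below shows that idea with sqlite3 and made-up data; find_task is a hypothetical helper name, and the SQL is an approximation rather than the exact statement SQLAlchemy emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, description TEXT NOT NULL)")
conn.execute("INSERT INTO task (id, description) VALUES (1, 'do laundry')")
conn.commit()


def find_task(task_id):
    """Return the matching row as a dict, or None when no row matches."""
    row = conn.execute(
        "SELECT id, description FROM task WHERE id = ? LIMIT 1", (task_id,)
    ).fetchone()
    if row is None:
        return None
    return {"id": row[0], "description": row[1]}


found = find_task(1)
not_found = find_task(99)
conn.close()
```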

4. Update a task

@app.route("/tasks/<int:task_id>/", methods=["POST"])
def update_task(task_id):
    task = Task.query.filter_by(id=task_id).first()
    if task is None:
        return failure_response("Task not found!")
    
    body = json.loads(request.data)
    task.description = body.get('description', task.description)
    task.done = body.get('done', task.done)

    db.session.commit()
    return success_response(task.serialize())

First, we query for our task just like before and return a failure response if no such task exists. After parsing the request's body, we can update our task object's fields like those of any ordinary Python object, with direct assignment to task.description and task.done. We get a bit fancy with our use of .get() by passing the current description or done value as the default if no new value is provided in the request. Therefore, fields absent from the body will not change our database. After assigning new values to the task, we need to commit these changes and return our serialized object.
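The effect of using the current value as the default can be checked without a database. Below, apply_update is a hypothetical helper operating on a plain dictionary that stands in for a Task.

```python
def apply_update(task, body):
    """Overwrite only the fields that the request body actually provides."""
    task["description"] = body.get("description", task["description"])
    task["done"] = body.get("done", task["done"])
    return task


task = {"id": 1, "description": "do laundry", "done": False}

# Only "done" is sent, so "description" keeps its current value.
apply_update(task, {"done": True})
after_partial = dict(task)

# An empty body changes nothing.
apply_update(task, {})
after_empty = dict(task)
```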

5. Delete a task

@app.route("/tasks/<int:task_id>/", methods=["DELETE"])
def delete_task(task_id):
    task = Task.query.filter_by(id=task_id).first()
    if task is None:
        return failure_response("Task not found!")
    db.session.delete(task)
    db.session.commit()
    return success_response(task.serialize())

First, we query for our task just like before and return a failure response if no such task exists. We can use SQLAlchemy's provided delete() method to remove our task from the database. After committing our changes, we return the original task to the client.

Subtasks (One-to-Many Relationship)

Model Definition

Suppose we want to add smaller “subtasks” within each task to break it down into more manageable components. We can begin defining a Subtask model with a One-to-Many relationship with Task as follows:

class Subtask(db.Model):    
    __tablename__ = "subtask"    
    id = db.Column(db.Integer, primary_key=True)    
    description = db.Column(db.String, nullable=False)    
    done = db.Column(db.Boolean, nullable=False)    
    task_id = db.Column(db.Integer, db.ForeignKey("task.id"), nullable=False)

We follow our same guidelines of defining a __tablename__ field and all of our columns. But to create the relationship between a Task and Subtask, we do something special. We define task_id as a column in the table and give it the type of an integer, but we also add the argument db.ForeignKey("task.id"). This part indicates the declaration of a foreign key that references the id field in the task table. To generalize this structure, we define foreign keys with: db.ForeignKey("<tablename>.<field>").
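At the SQL level, a foreign key is a constraint requiring that the referenced row exists. The sketch below demonstrates this with sqlite3 directly rather than through SQLAlchemy, using made-up rows. Note that SQLite only enforces foreign keys when PRAGMA foreign_keys is switched on for the connection.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves enforcement off by default
conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, description TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE subtask ("
    "id INTEGER PRIMARY KEY, "
    "description TEXT NOT NULL, "
    "task_id INTEGER NOT NULL REFERENCES task(id))"
)
conn.execute("INSERT INTO task (id, description) VALUES (1, 'clean apartment')")

# Task 1 exists, so this subtask is accepted.
conn.execute("INSERT INTO subtask (description, task_id) VALUES ('vacuum', 1)")

# Task 42 does not exist, so the constraint rejects the row.
try:
    conn.execute("INSERT INTO subtask (description, task_id) VALUES ('orphan', 42)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

subtask_count = conn.execute("SELECT COUNT(*) FROM subtask").fetchone()[0]
conn.close()
```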

Serialization

Our next step is to define how we wish to serialize a Subtask and how this serialization will interact with our existing serialization for Task. We can first write a straightforward serialization of a Subtask:

def serialize(self):    
    return {        
        "id": self.id,        
        "description": self.description,        
        "done": self.done    
    }

Update Task Model

The second part to defining a one-to-many relationship involves adding some indicator code to the model on the other side of the relationship. Within our Task model we now write:

class Task(db.Model):    
    __tablename__ = "task"    
    id = db.Column(db.Integer, primary_key=True)    
    description = db.Column(db.String, nullable=False)    
    done = db.Column(db.Boolean, nullable=False)    
    # defining the reverse side of the relationship    
    subtasks = db.relationship("Subtask", cascade="delete")

We create a subtasks field that allows us to query from the reverse side of the relationship by defining a db.relationship. The first argument is the name of the model class we are referencing (notice how this is Subtask instead of the subtask table name). Our second and optional argument, cascade='delete', defines how to handle the deletion of a Task object from the database. Consider what happens when we remove a Task with many Subtasks: all of these subtasks now have no parent Task to reference. By default, SQLAlchemy would try to set their task_id field to None (which our nullable=False constraint does not allow), but in the context of our app, we only want Subtasks to exist nested beneath a main Task. The cascade option tells the ORM to delete all related Subtasks upon deleting a Task, which preserves this property of the application.
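SQLAlchemy's cascade option works at the ORM level: the session itself takes care of deleting the related Subtask objects. Databases offer a similar but separate mechanism, ON DELETE CASCADE, declared on the foreign key. The sketch below shows that database-level variant with sqlite3 and made-up rows, purely to illustrate the end result; it is not what cascade="delete" configures.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # required for SQLite to honor the cascade
conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, description TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE subtask ("
    "id INTEGER PRIMARY KEY, "
    "description TEXT NOT NULL, "
    "task_id INTEGER NOT NULL REFERENCES task(id) ON DELETE CASCADE)"
)
conn.execute("INSERT INTO task (id, description) VALUES (1, 'clean apartment')")
conn.executemany(
    "INSERT INTO subtask (description, task_id) VALUES (?, 1)",
    [("vacuum",), ("dust shelves",)],
)
conn.commit()

before = conn.execute("SELECT COUNT(*) FROM subtask").fetchone()[0]

# Deleting the parent task removes its subtasks as well.
conn.execute("DELETE FROM task WHERE id = 1")
conn.commit()
after = conn.execute("SELECT COUNT(*) FROM subtask").fetchone()[0]
conn.close()
```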

Update Task Serializer

Now let's consider how we want to access these subtasks from the client’s perspective. We have two options: add a new route for the client to request the subtasks of a particular task, or find a way to return subtask information in the original requests that retrieve Tasks. We will go with the latter due to its simplicity for us and the client. Let us update our Task serializer to include a new field:

def serialize(self):        
    return {            
        "id": self.id,            
        "description": self.description,            
        "done": self.done,            
        # serialize each subtask related to this task            
        "subtasks": [s.serialize() for s in self.subtasks],        
    }

We can use the list comprehension approach as before to serialize all of our subtasks. Our original list, self.subtasks, utilizes our defined relationship to actually make a query and gather our related subtasks. We can then serialize each of these objects in our Task serializer method.

6. Create a subtask

app.py
@app.route("/tasks/<int:task_id>/subtasks/", methods=["POST"])
def create_subtask(task_id):
    task = Task.query.filter_by(id=task_id).first()
    if task is None:
        return failure_response("Task not found!")

    body = json.loads(request.data)
    new_subtask = Subtask(
        description=body.get('description'),
        # default to False so a missing field does not violate nullable=False
        done=body.get('done', False),
        task_id=task_id
    )

    db.session.add(new_subtask)
    db.session.commit()
    return success_response(new_subtask.serialize(), 201)

We create a new subtask by instantiating the Subtask class, just like we did for regular tasks. To complete the addition, we add the newly created subtask to our database session, commit the changes, and finally return the serialized subtask.

With our models' structure updated, the route above is what lets clients add subtasks to a parent task:

Route Definition: When creating new subtasks, we want to provide clients with a means to indicate which task to attach them to. Of the many ways to do this, we simply extend our usual tasks/<task_id>/ route structure by adding subtasks/ to the route path. Because we expect data to come in with the request, we accept the POST request method.

Subtask Addition: First, we need to retrieve the task we intend to add this new subtask under and return a failure response if we cannot find it. Then, we can move forward with parsing the request's body, creating the subtask, and returning a success response.

Categories (Many-to-Many Relationship)

Model Definition

We will demonstrate a many-to-many relationship with a categories feature. Any given task can belong to many different category groups and a category can contain many tasks associated with it. We can define a basic implementation as follows:

db.py
class Category(db.Model):
    __tablename__ = "category"
    id = db.Column(db.Integer, primary_key=True)
    description = db.Column(db.String, nullable=False)
    color = db.Column(db.String, nullable=False)

Serialization

Our next step is to define the serialization of a Category with an analogous implementation to our previous models:

def serialize(self):
    return {
        "id": self.id,
        "description": self.description,
        "color": self.color
    }

Create Association Table

As of right now, our Category model is not related to anything. Recall from our Relational Databases lecture that we implement a many-to-many relationship via a "join" or "association" table that connects the foreign key of one table to the foreign key of another table. At the top of our db.py file, we can create a new variable to construct a table using db.Table:

association_table = db.Table("association", db.Model.metadata,
    db.Column("task_id", db.Integer, db.ForeignKey("task.id")),
    db.Column("category_id", db.Integer, db.ForeignKey("category.id"))
)

The first argument provided is the table name, which we will call "association". Next we need to provide the metadata of the Model class. All following arguments are the columns of our table. The first column stores the ids of tasks and the second column stores the ids of categories; both are integer columns declared with db.ForeignKey.
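To make the role of the join table concrete, this sqlite3 sketch builds the same three tables by hand with made-up rows, then walks the association table with a JOIN in both directions. The helper names categories_for and tasks_for are hypothetical, and the SQL is only an approximation of what an ORM would issue.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task (id INTEGER PRIMARY KEY, description TEXT NOT NULL);
    CREATE TABLE category (id INTEGER PRIMARY KEY, description TEXT NOT NULL);
    CREATE TABLE association (
        task_id INTEGER REFERENCES task(id),
        category_id INTEGER REFERENCES category(id)
    );
    INSERT INTO task VALUES (1, 'write essay'), (2, 'do laundry');
    INSERT INTO category VALUES (1, 'school'), (2, 'urgent');
    -- task 1 is in both categories; task 2 is only urgent
    INSERT INTO association VALUES (1, 1), (1, 2), (2, 2);
    """
)


def categories_for(task_id):
    """Descriptions of every category linked to the given task."""
    rows = conn.execute(
        "SELECT category.description FROM category "
        "JOIN association ON association.category_id = category.id "
        "WHERE association.task_id = ? ORDER BY category.id",
        (task_id,),
    ).fetchall()
    return [r[0] for r in rows]


def tasks_for(category_id):
    """Descriptions of every task linked to the given category."""
    rows = conn.execute(
        "SELECT task.description FROM task "
        "JOIN association ON association.task_id = task.id "
        "WHERE association.category_id = ? ORDER BY task.id",
        (category_id,),
    ).fetchall()
    return [r[0] for r in rows]


essay_categories = categories_for(1)
urgent_tasks = tasks_for(2)
conn.close()
```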

Add Relationship Fields

Now to implement this relationship, we connect our Category model to our association table with a db.relationship field:

class Category(db.Model):
    __tablename__ = "category"
    id = db.Column(db.Integer, primary_key=True)
    description = db.Column(db.String, nullable=False)
    color = db.Column(db.String, nullable=False)
    # define many-to-many relationship by connecting to association table
    tasks = db.relationship("Task", secondary=association_table, back_populates='categories')

The first argument indicates the class name of the model we are relating to, not the table name, which is why "Task" is capitalized. Our second argument, secondary, indicates the join table to use. Finally, back_populates tells SQLAlchemy the field name on the reverse side of the relationship that will be referencing this relationship. We can see this in our addition to the Task model:

class Task(db.Model):    
    __tablename__ = "task"
    id = db.Column(db.Integer, primary_key=True)    
    description = db.Column(db.String, nullable=False)    
    done = db.Column(db.Boolean, nullable=False)    
    # defining the reverse side of the many-to-one relationship    
    subtasks = db.relationship("Subtask", cascade="delete")
    # defining the reverse side of the many-to-many relationship, connecting to the same join table
    categories = db.relationship("Category", secondary=association_table, back_populates="tasks")

We define the new field categories to match the input of back_populates from the relationship definition in the Category model. We also set the back_populates on this side to the field name we previously defined: "tasks". Our first argument, the class name, will refer to the Category model, and we will refer to the same association_table. It is crucial that the back_populates tags match exactly with the field names across the two models.

Update Task Serializer

Just like with subtasks, we want to serialize categories within a task. We can do this analogously with another new line in our Task serializer:

def serialize(self):        
    return {            
        "id": self.id,            
        "description": self.description,            
        "done": self.done,            
        # serialize each subtask related to this task            
        "subtasks": [s.serialize() for s in self.subtasks], 
        # serialize each category related to this task
        "categories": [c.serialize() for c in self.categories]       
    }

7. Assign category to task

def assign_category(task_id, description, color):
    task = get_task_by_id(task_id)
    if task is None:
        return None

    category = _get_or_create_category(description, color)

    task.categories.append(category)
    db.session.commit()
    return task.serialize()

First, we need to retrieve the task we intend to link to our category and return None if we cannot find it, so our route knows to return a failure response. We can then call our custom method to get or create a category from the provided description. To relate this category to our task, we append it to the task.categories field. Relationship fields in SQLAlchemy can be manipulated as lists, which is why we use the .append() method to add to our relationship. Doing so will automatically insert a new entry into the previously defined association table between the two models. Now we can commit changes and return the serialized task.

The _get_or_create_category helper receives the description and color taken from the request body. It first tries to find a category that matches the description (we do not allow two categories with the same description to exist). If no category with that description exists, it creates a new Category object with the provided description and color. This method is private and only intended to be used within other DAO methods (for us, the assign_category method above). Private methods in Python are indicated by convention with a prepended underscore _.

Just like with subtasks, we want to add a new route for assigning a category to a task:

@app.route("/tasks/<int:task_id>/category/", methods=["POST"])
def assign_category(task_id):
    body = json.loads(request.data)
    task = dao.assign_category(task_id, body.get("description"), body.get("color"))
    if task is None:
        return failure_response("Task not found!")
    return success_response(task)

Following previous design patterns, we will extend our usual path for a specific task with the category/ string and make this route accept the POST request method. Because we off-loaded the important logic to our DAO, our route method remains relatively simple and clean to read.
