featured image

RESTful API Design with SQLAlchemy ORM

A deep dive into implementing robust create-or-update (upsert) logic in a Flask API using SQLAlchemy. This tutorial covers handling JSON array storage, atomic record replacement, and partial updates while ensuring proper session management to prevent connection leaks.

Published

Fri Jun 13 2025

Technologies Used

Python SQLite Flask
Intermediate 11 minutes

Purpose

The Problem

Most beginner API tutorials teach you to create records:

@app.route("/create", methods=["POST"])
def create():
    data = request.json
    new_record = Model(**data)
    db.session.add(new_record)
    db.session.commit()

But real systems need idempotent upsert logic: create if missing, update if exists. Naive implementations cause bugs:

  • Duplicate records when network retries trigger multiple creates
  • Lost data when updates overwrite entire records instead of appending
  • Race conditions when room turnover isn’t atomic

The Solution

We’re analyzing the /add_patient route from server.py, which implements production-grade create-or-update logic with array appending, atomic room turnover, and proper session management.

Why It Matters

This code handles three scenarios most tutorials ignore:

  1. Appending to JSON arrays stored in SQL text columns
  2. Atomic record replacement when a room’s patient changes
  3. Partial updates (name only, CPAP data only, or both)

Prerequisites & Tooling

Knowledge Base:

  • Flask basics (routes, request handling)
  • SQL fundamentals (PRIMARY KEY, transactions)
  • JSON serialization concepts

Environment:

pip install flask sqlalchemy
python --version  # 3.11+

Database Schema (PatientModel.py):

class Patient(Base):
    __tablename__ = 'patients'

    room_number = Column(Integer, primary_key=True)  # Unique constraint
    patient_mrn = Column(Integer)                     # Medical Record Number
    patient_name = Column(String)
    CPAP_pressure = Column(Text)     # JSON array: "[15, 16, 14]"
    breath_rate = Column(Text)       # JSON array: "[12.3, 14.1]"
    apnea_count = Column(Text)       # JSON array: "[0, 2, 1]"
    flow_image = Column(Text)        # JSON array: ["b64str1", "b64str2"]
    timestamp = Column(Text)         # JSON array: ["2024-01-01 10:00:00", ...]

High-Level Architecture

sequenceDiagram
    participant Client
    participant Flask Route
    participant Validation
    participant Database
    participant Update Logic

    Client->>Flask Route: POST /add_patient {data}
    Flask Route->>Validation: validate_input_data_generic()

    alt Validation Fails
        Validation-->>Flask Route: Error message
        Flask Route-->>Client: 400 Bad Request
    else Validation Passes
        Validation-->>Flask Route: True
        Flask Route->>Database: Query by room_number

        alt Patient Doesn't Exist
            Database-->>Flask Route: None
            Flask Route->>Database: INSERT new patient
            Database-->>Flask Route: Success
            Flask Route-->>Client: 200 "New patient created"
        else Patient Exists
            Database-->>Flask Route: Existing record
            Flask Route->>Update Logic: Check if MRN changed

            alt MRN Changed (Room Turnover)
                Update Logic->>Database: DELETE old record
                Update Logic->>Database: INSERT new record
                Database-->>Flask Route: Success
                Flask Route-->>Client: 200 "Patient updated"
            else Same MRN (Update Data)
                Update Logic->>Update Logic: Parse JSON arrays
                Update Logic->>Update Logic: Append new values
                Update Logic->>Database: UPDATE record
                Database-->>Flask Route: Success
                Flask Route-->>Client: 200 "Patient updated"
            end
        end
    end

Analogy: Think of this like a hotel room management system:

  • Room number = Primary key (never changes)
  • Guest MRN = Guest ID (changes when new guest checks in)
  • CPAP data = Room service orders (appended to history)

When a new guest checks into the same room, you clear the previous guest’s history and start fresh.

The Implementation

Step 1: Flask Route Handler (The Entry Point)

Logic: The route handler is a thin wrapper that delegates to a driver function. This separation allows testing without running Flask.

@app.route("/add_patient", methods=["POST"])
def add_patient_handler():
    """
    Flask Handler for /add_patient route

    Receives JSON dictionary in format:
    {
        "patient_name": <string>,
        "patient_mrn": <int>,
        "room_number": <int>,
        "CPAP_pressure": <int or "">,
        "breath_rate": <float or "">,
        "apnea_count": <int or "">,
        "flow_image": <base64_string or "">
    }

    Returns (message, status_code) tuple
    """
    # Extract JSON from POST request body
    in_data = request.get_json()

    # Delegate to driver function
    answer, status_code = add_patient_driver(in_data)

    # Convert to JSON response
    return jsonify(answer), status_code

🔵 Deep Dive: Why separate the handler from the driver?

Testing benefits:

# Can test without Flask app context
def test_add_patient_with_missing_mrn():
    result, code = add_patient_driver({"room_number": 101})
    assert code == 400
    assert "patient_mrn" in result

# vs. testing the route directly (requires mock HTTP requests)

Step 2: Driver Function - Validation Phase

Logic: Define the expected schema and validate before touching the database.

def add_patient_driver(in_data):
    """
    Implements the 'add_patient' route logic

    Returns
    -------
    str: Success/error message
    int: HTTP status code (200 or 400)
    """
    # Define expected schema
    expected_keys = [
        "patient_mrn",
        "room_number",
        "patient_name",
        "CPAP_pressure",
        "breath_rate",
        "apnea_count",
        "flow_image"
    ]

    expected_types = [int, int, str, int, float, int, str]

    # Use the generic validator from Tutorial 1
    validation = validate_input_data_generic(in_data, expected_keys, expected_types)

    if validation is not True:
        # Validation returned an error message
        return validation, 400

Step 3: Database Query - Does Patient Exist?

Logic: Query by the primary key (room_number) to determine create vs. update path.

    # Check if room already has a patient
    does_id_exist = does_patient_exist_in_db(int(in_data["room_number"]))

    # Capture timestamp for this operation
    date = current_time()  # Returns "YYYY-MM-DD HH:MM:SS"

Helper function:

def does_patient_exist_in_db(room_number):
    """
    Queries database for patient by room number (primary key)

    Returns
    -------
    bool: True if patient exists, False otherwise
    """
    session = Session()
    try:
        patient = session.query(Patient).filter_by(room_number=room_number).first()
        return patient is not None
    finally:
        # CRITICAL: Always close sessions to prevent connection leaks
        session.close()

🔴 Danger: Forgetting session.close() causes connection pool exhaustion:

# After 10 requests without closing:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached

Step 4: Create Path - New Patient

Logic: If the room doesn’t exist in the database, create a new record. Handle partial data gracefully.

    if does_id_exist is False:
        # Create new patient record
        new_patient_to_db(in_data, date)
        return "New patient created", 200

Implementation of new_patient_to_db():

def new_patient_to_db(in_data, date):
    """
    Adds a new patient dictionary to the database

    Handles three scenarios:
    1. Name only (no CPAP data)
    2. CPAP data only (no name)
    3. Both name and CPAP data
    """
    session = Session()
    try:
        # Check if patient name was provided
        if (in_data["patient_name"] != ""):
            # Check if CPAP data was also provided
            if (in_data["CPAP_pressure"] != "" and in_data["breath_rate"] != ""):
                # Create with both name and CPAP data
                new_patient = Patient(
                    patient_mrn=in_data["patient_mrn"],
                    room_number=in_data["room_number"],
                    patient_name=in_data["patient_name"],
                    # Convert single values to JSON arrays
                    CPAP_pressure=json.dumps([int(in_data["CPAP_pressure"])]),
                    breath_rate=json.dumps([float(in_data["breath_rate"])]),
                    apnea_count=json.dumps([int(in_data["apnea_count"])]),
                    flow_image=json.dumps([str(in_data["flow_image"])]),
                    timestamp=json.dumps([date])
                )
            else:
                # Create with name only (no CPAP data)
                new_patient = Patient(
                    patient_mrn=in_data["patient_mrn"],
                    room_number=in_data["room_number"],
                    patient_name=in_data["patient_name"]
                )
        else:
            # Create with CPAP data only (no name)
            if (in_data["CPAP_pressure"] != "" and in_data["breath_rate"] != ""):
                new_patient = Patient(
                    patient_mrn=in_data["patient_mrn"],
                    room_number=in_data["room_number"],
                    CPAP_pressure=json.dumps([int(in_data["CPAP_pressure"])]),
                    breath_rate=json.dumps([float(in_data["breath_rate"])]),
                    apnea_count=json.dumps([int(in_data["apnea_count"])]),
                    flow_image=json.dumps([str(in_data["flow_image"])]),
                    timestamp=json.dumps([date])
                )
            else:
                # Create with MRN and room only
                new_patient = Patient(
                    patient_mrn=in_data["patient_mrn"],
                    room_number=in_data["room_number"]
                )

        session.add(new_patient)
        session.commit()
        return new_patient
    finally:
        session.close()

🔵 Deep Dive: Why store arrays as JSON strings?

SQLite doesn’t have native array types. Three options:

ApproachProsCons
JSON in TEXT columnSimple, portable, SQLite-compatibleInefficient queries (can’t use WHERE on array elements)
Separate tableNormalized, queryableRequires JOINs, more complex schema
PostgreSQL ARRAY typeBest performanceRequires PostgreSQL

For this use case (appending values, retrieving full arrays), JSON is optimal.

Step 5: Update Path - Room Turnover Detection

Logic: If a patient exists, check if the MRN changed (indicating a new patient in the same room).

    else:
        # Patient exists, update or replace
        update_patient(int(in_data["room_number"]), in_data, date)
        return "Patient successfully updated", 200

Critical section of update_patient():

def update_patient(room_number, in_data, date):
    """
    Updates existing patient OR replaces with new patient if MRN changed
    """
    session = Session()
    try:
        # Fetch existing patient record
        x = session.query(Patient).filter_by(room_number=room_number).first()

        # CRITICAL: Check if this is a room turnover
        if (x.patient_mrn != in_data["patient_mrn"]):
            # New patient in same room - ATOMIC REPLACEMENT
            session.delete(x)       # Delete old patient
            session.commit()        # Commit deletion
            # Create new patient record
            patient = new_patient_to_db(in_data, date)
            return patient

🔴 Danger: This is not fully atomic! Between delete and new_patient_to_db(), the room has no patient. If the server crashes, data is lost.

Better approach:

# Use a single transaction
session.delete(x)
new_patient = Patient(**in_data)  # Create in same session
session.add(new_patient)
session.commit()  # Atomic commit of both operations

Step 6: Update Path - Appending to Arrays

Logic: If the MRN matches, append new CPAP data to existing arrays.

        # Same patient - append new data
        if (in_data["patient_name"] != ""):
            x.patient_name = in_data["patient_name"]  # Update name

        if (in_data["CPAP_pressure"] != "" and in_data["breath_rate"] != ""):
            # Deserialize JSON array or create empty list
            cpap_list = json.loads(x.CPAP_pressure) if x.CPAP_pressure else []
            cpap_list.append(int(in_data["CPAP_pressure"]))
            # Serialize back to JSON
            x.CPAP_pressure = json.dumps(cpap_list)

            breath_list = json.loads(x.breath_rate) if x.breath_rate else []
            breath_list.append(float(in_data["breath_rate"]))
            x.breath_rate = json.dumps(breath_list)

            apnea_list = json.loads(x.apnea_count) if x.apnea_count else []
            apnea_list.append(int(in_data["apnea_count"]))
            x.apnea_count = json.dumps(apnea_list)

            flow_list = json.loads(x.flow_image) if x.flow_image else []
            flow_list.append(str(in_data["flow_image"]))
            x.flow_image = json.dumps(flow_list)

            time_list = json.loads(x.timestamp) if x.timestamp else []
            time_list.append(date)
            x.timestamp = json.dumps(time_list)

        session.commit()
        return x
    finally:
        session.close()

Example state transition:

# Initial state
{"CPAP_pressure": "[15, 16]"}

# After update with pressure=14
{"CPAP_pressure": "[15, 16, 14]"}

Under the Hood

SQLAlchemy Session Lifecycle

session = Session()  # Creates a connection from the pool
try:
    # Queries/modifications here
    session.commit()  # Writes changes to database
finally:
    session.close()  # Returns connection to pool

What happens without try/finally?

def bad_function():
    session = Session()
    x = session.query(Patient).first()
    x.name = "New Name"
    session.commit()
    # Forgot to close!
    # Connection remains open indefinitely

After 5-10 calls:

sqlalchemy.exc.TimeoutError: QueuePool limit exceeded

JSON Serialization Performance

Naive approach:

# Deserialize -> Modify -> Serialize on every request
cpap_list = json.loads(x.CPAP_pressure)  # O(n)
cpap_list.append(new_value)              # O(1) amortized
x.CPAP_pressure = json.dumps(cpap_list)  # O(n)
session.commit()                          # Disk I/O

Total time complexity: O(n) where n = array length

For 1000 data points: ~2-5ms per operation

Alternative (if arrays grow very large):

  • Use PostgreSQL JSONB with append operators
  • Store recent data in JSON, archive old data to separate table
  • Consider time-series database (InfluxDB) for medical telemetry

Edge Cases & Pitfalls

Race Condition: Concurrent Room Turnovers

Scenario:

# Thread 1: Updates room 101 with MRN=12345
# Thread 2: Updates room 101 with MRN=67890 (simultaneously)

# Both threads read the same old patient record
# Both delete it
# Both create new records
# Result: One patient is lost!

Fix: Use database-level locking

x = session.query(Patient).filter_by(room_number=room_number).with_for_update().first()
# Acquires row-level lock - other transactions wait

Memory Leak: Unbounded Array Growth

# After 1 year of hourly measurements:
CPAP_pressure = "[15, 16, 14, ...]"  # 8,760 values!

Problems:

  • Deserializing large JSON strings is slow
  • Single HTTP responses become multi-megabyte
  • Database queries slow down

Fix: Implement data archival

if len(cpap_list) > 1000:
    # Archive old data to separate table
    archive_old_data(cpap_list[:500])
    cpap_list = cpap_list[500:]

Security: SQL Injection?

Is this vulnerable?

session.query(Patient).filter_by(room_number=room_number).first()

No! SQLAlchemy uses parameterized queries:

-- Actual SQL executed:
SELECT * FROM patients WHERE room_number = ? LIMIT 1
-- Parameters: [101]

The ? placeholder prevents injection. Never use:

# DANGEROUS:
session.execute(f"SELECT * FROM patients WHERE room_number = {room_number}")

Conclusion

What You Learned:

  1. Upsert Pattern: Implemented create-or-update logic without database-specific UPSERT syntax
  2. Session Management: Used try/finally blocks to prevent connection leaks
  3. JSON Array Storage: Stored time-series data in relational databases efficiently
  4. Atomic Operations: Handled room turnover with transactional consistency
  5. API Design: Separated route handlers from business logic for testability

Skill Transfer: These patterns apply to:

  • E-commerce inventory systems (stock updates)
  • Social media likes/comments (append-only data)
  • IoT telemetry ingestion (sensor readings)
  • Audit log systems (immutable event streams)

We respect your privacy.

← View All Tutorials

Related Projects

    Ask me anything!