On this page
- The API Endpoint That Has to Handle Three Different Situations
- Prerequisites and Schema
- Separating the Flask Handler From the Business Logic
- The Driver: Validation, Then Branching on Existence
- Creating a New Patient: Handling Partial Data Gracefully
- The Update Path: Room Turnover vs. Appending Measurements
- SQLAlchemy’s Parameterized Queries and Why You Don’t Need to Worry About Injection
The API Endpoint That Has to Handle Three Different Situations
Most API tutorials cover creating records. Real systems need more: create if the record doesn’t exist, append new data if it does, and handle the case where the same room gets a different patient (which means the old patient’s data should be wiped and replaced). Get any of these wrong and you end up with duplicate records, lost data, or race conditions.
The /add_patient route in server.py handles a hospital room management scenario where:
- Room number is the primary key (never changes)
- Each room has exactly one patient at a time
- CPAP measurements are stored as growing JSON arrays (one entry per recording session)
- When a new patient checks into a room, the old patient’s data must be atomically replaced
Prerequisites and Schema
Knowledge Base:
- Flask basics (routes, request handling)
- SQL fundamentals (PRIMARY KEY, transactions)
- JSON serialization concepts
Environment:
pip install flask sqlalchemy
python --version # 3.11+
Database Schema (PatientModel.py):
class Patient(Base):
__tablename__ = 'patients'
room_number = Column(Integer, primary_key=True)
patient_mrn = Column(Integer)
patient_name = Column(String)
CPAP_pressure = Column(Text) # JSON array: "[15, 16, 14]"
breath_rate = Column(Text) # JSON array: "[12.3, 14.1]"
apnea_count = Column(Text) # JSON array: "[0, 2, 1]"
flow_image = Column(Text) # JSON array of base64 strings
timestamp = Column(Text) # JSON array of timestamps
Storing arrays as JSON strings in a TEXT column is a pragmatic choice for SQLite, which has no native array type. The tradeoff: you can’t filter on individual array elements with SQL. For this use case — appending values and retrieving the full history — it works fine. If the arrays ever need to be queryable, a separate table or a PostgreSQL ARRAY column would be better.
Separating the Flask Handler From the Business Logic
I always split route handlers from the actual logic. The handler is a thin wrapper; the driver function is testable without an HTTP context:
@app.route("/add_patient", methods=["POST"])
def add_patient_handler():
in_data = request.get_json()
answer, status_code = add_patient_driver(in_data)
return jsonify(answer), status_code
The benefit becomes obvious when writing tests:
# Test the driver directly — no mock HTTP requests needed
def test_add_patient_with_missing_mrn():
result, code = add_patient_driver({"room_number": 101})
assert code == 400
assert "patient_mrn" in result
The Driver: Validation, Then Branching on Existence
def add_patient_driver(in_data):
expected_keys = [
"patient_mrn", "room_number", "patient_name",
"CPAP_pressure", "breath_rate", "apnea_count", "flow_image"
]
expected_types = [int, int, str, int, float, int, str]
validation = validate_input_data_generic(in_data, expected_keys, expected_types)
if validation is not True:
return validation, 400
does_id_exist = does_patient_exist_in_db(int(in_data["room_number"]))
date = current_time()
if does_id_exist is False:
new_patient_to_db(in_data, date)
return "New patient created", 200
else:
update_patient(int(in_data["room_number"]), in_data, date)
return "Patient successfully updated", 200
The helper that checks existence closes its session in a finally block. This is non-negotiable. Without it:
def does_patient_exist_in_db(room_number):
session = Session()
try:
patient = session.query(Patient).filter_by(room_number=room_number).first()
return patient is not None
finally:
session.close()
After about 10 requests without closing sessions, SQLAlchemy raises TimeoutError: QueuePool limit of size 5 overflow 10 reached. The connection pool is exhausted and the server stops responding. Every database function in this codebase follows the same try/finally pattern.
Creating a New Patient: Handling Partial Data Gracefully
When a room has no patient, the request might contain just a name (before any CPAP recordings), just CPAP data (rare but handled), or both. The create function branches on which fields are populated:
def new_patient_to_db(in_data, date):
session = Session()
try:
if in_data["patient_name"] != "":
if in_data["CPAP_pressure"] != "" and in_data["breath_rate"] != "":
# Both name and CPAP data
new_patient = Patient(
patient_mrn=in_data["patient_mrn"],
room_number=in_data["room_number"],
patient_name=in_data["patient_name"],
CPAP_pressure=json.dumps([int(in_data["CPAP_pressure"])]),
breath_rate=json.dumps([float(in_data["breath_rate"])]),
apnea_count=json.dumps([int(in_data["apnea_count"])]),
flow_image=json.dumps([str(in_data["flow_image"])]),
timestamp=json.dumps([date])
)
else:
# Name only, no measurements yet
new_patient = Patient(
patient_mrn=in_data["patient_mrn"],
room_number=in_data["room_number"],
patient_name=in_data["patient_name"]
)
else:
# CPAP data only, no name
new_patient = Patient(
patient_mrn=in_data["patient_mrn"],
room_number=in_data["room_number"],
CPAP_pressure=json.dumps([int(in_data["CPAP_pressure"])]),
breath_rate=json.dumps([float(in_data["breath_rate"])]),
apnea_count=json.dumps([int(in_data["apnea_count"])]),
flow_image=json.dumps([str(in_data["flow_image"])]),
timestamp=json.dumps([date])
)
session.add(new_patient)
session.commit()
return new_patient
finally:
session.close()
Single values are stored as single-element JSON arrays — [15] instead of 15. Every subsequent recording appends to that array, so the data structure is consistent from the first entry.
The Update Path: Room Turnover vs. Appending Measurements
When a patient already exists in a room, the logic branches on whether the MRN (medical record number) changed. A different MRN means a new patient checked in — the old data should be deleted and replaced. The same MRN means the existing patient has new measurements to append.
def update_patient(room_number, in_data, date):
session = Session()
try:
x = session.query(Patient).filter_by(room_number=room_number).first()
if x.patient_mrn != in_data["patient_mrn"]:
# New patient in same room — replace entirely
session.delete(x)
session.commit()
return new_patient_to_db(in_data, date)
# Same patient — append new measurements
if in_data["patient_name"] != "":
x.patient_name = in_data["patient_name"]
if in_data["CPAP_pressure"] != "" and in_data["breath_rate"] != "":
cpap_list = json.loads(x.CPAP_pressure) if x.CPAP_pressure else []
cpap_list.append(int(in_data["CPAP_pressure"]))
x.CPAP_pressure = json.dumps(cpap_list)
breath_list = json.loads(x.breath_rate) if x.breath_rate else []
breath_list.append(float(in_data["breath_rate"]))
x.breath_rate = json.dumps(breath_list)
apnea_list = json.loads(x.apnea_count) if x.apnea_count else []
apnea_list.append(int(in_data["apnea_count"]))
x.apnea_count = json.dumps(apnea_list)
flow_list = json.loads(x.flow_image) if x.flow_image else []
flow_list.append(str(in_data["flow_image"]))
x.flow_image = json.dumps(flow_list)
time_list = json.loads(x.timestamp) if x.timestamp else []
time_list.append(date)
x.timestamp = json.dumps(time_list)
session.commit()
return x
finally:
session.close()
The append pattern — deserialize JSON, append the new value, serialize back — is O(n) in the number of existing measurements. For a patient with 1,000 data points, that’s about 2-5ms per update. If arrays ever grow into the tens of thousands, archiving old data to a separate table would be worth considering.
The room turnover path has a subtle atomicity problem: session.delete(x) commits the deletion, and then new_patient_to_db() opens a new session for the insert. If the server crashes between those two operations, the room has no patient. A cleaner implementation would keep both operations in the same transaction:
session.delete(x)
new_patient = Patient(**in_data)
session.add(new_patient)
session.commit() # Atomic — either both succeed or neither does
The current implementation is a known tradeoff in the codebase.
SQLAlchemy’s Parameterized Queries and Why You Don’t Need to Worry About Injection
The filter query:
session.query(Patient).filter_by(room_number=room_number).first()
generates:
SELECT * FROM patients WHERE room_number = ? LIMIT 1
-- Parameters: [101]
The ? placeholder is a parameterized binding. SQLAlchemy never concatenates user input directly into the query string. This is why raw queries like session.execute(f"SELECT * FROM patients WHERE room_number = {room_number}") are dangerous — they bypass this protection entirely. Always use the ORM methods or parameterized text() constructs for raw queries.
For concurrent access, the does_patient_exist_in_db + update_patient combination has a race condition: two simultaneous requests for the same room could both see “patient doesn’t exist” and both try to create. The fix is a database-level row lock:
x = session.query(Patient).filter_by(room_number=room_number).with_for_update().first()
with_for_update() acquires a row-level lock, making other transactions wait until the current one commits.