On this page
- What You Need
- Stage 1: Defensive Data Ingestion
- Stage 2: ADC to Pressure Conversion
- Stage 3: Pressure to Flow — Bernoulli’s Equation
- Stage 4: Peak Detection — Finding Breath Events
- Stage 5: Breath Validation — Pairing Inhalation with Exhalation
- Stage 6: Apnea Detection — Timing Gaps Between Breaths
- Stage 7: Leakage Calculation via Simpson’s Rule
- The Main Driver
Raw CPAP sensor data is useless on its own. The device outputs voltage readings and ADC values that mean nothing to a clinician without a pipeline that converts them into breath rate, apnea count, and mask leakage. The obvious approach — scan for values above a threshold and call those breaths — produces garbage: false positives from noise, missed shallow breaths, no way to distinguish a pause from apnea.
This tutorial walks through cpap_measurements.py’s signal processing pipeline, which converts raw ADC values to clinical metrics through a multi-stage transformation:
ADC values → Pressure (Pascals) → Volumetric Flow (m³/s) → Breath Events → Apnea Count
The pipeline uses physics-based modeling (venturi tube equations), statistical signal analysis (peak detection with physiological constraints), and temporal pattern recognition (apnea identification by timing gaps between valid breath cycles).
What You Need
pip install numpy scipy matplotlib
python --version # 3.11+
Input data format — seven columns per line: time, patient_p2, patient_p1ins, patient_p1exp, CPAP_p2, CPAP_p1ins, CPAP_p1exp
Stage 1: Defensive Data Ingestion
Medical sensor data is noisy. Lines can have missing values, non-numeric strings, or NaN entries. Every line gets validated before processing:
def error_check(line):
data = line.split(",")
valid = True
for x in data:
if (x == ""):
logging.error("Incorrect Data")
valid = False
elif (x == "NaN"):
logging.error("Incorrect Data")
valid = False
else:
try:
val = float(x)
except ValueError:
logging.error("Incorrect Data")
valid = False
return valid
The "NaN" string check is deliberate. float("NaN") succeeds in Python — it returns a NaN float without raising an exception. That NaN then propagates silently through every downstream calculation: np.max([0.1, 0.2, np.nan, 0.15]) returns nan, not 0.2. Catch the string before it becomes a float.
Stage 2: ADC to Pressure Conversion
The CPAP pressure sensor outputs 10-bit ADC values that map linearly to pressure via the manufacturer’s calibration formula:
def ADC_to_Pressure(line):
data = line.split(",")
for i in range(7):
if i == 0:
data[i] = float(data[i])
else:
# 98.0665 Pa = 1 cmH2O (unit conversion)
# 25.4 / (14745 - 1638) = sensor-specific scaling
# (ADC - 1638) = zero-offset correction
data[i] = 98.0665 * (25.4 / (14745 - 1638)) * (int(data[i]) - 1638)
return data
Breaking the formula apart: 14745 is the ADC value at maximum pressure (25 cmH2O); 1638 is the ADC value at zero pressure; 25.4 is the sensor’s measurement range in cmH2O; 98.0665 converts cmH2O to Pascals. The full formula reduces to Pressure = 0.1902 × (ADC - 1638) Pascals.
Stage 3: Pressure to Flow — Bernoulli’s Equation
The CPAP device uses a venturi tube to measure airflow. Bernoulli’s equation relates the pressure differential to flow velocity; multiplying by the cross-sectional area gives volumetric flow:
def Pressure_to_Flow(data):
p2 = data[1] # Downstream pressure
p1_ins = data[2] # Upstream pressure during inhalation
p1_exp = data[3] # Upstream pressure during expiration
A1 = np.pi * (0.0075)**2 # Upstream area (15mm diameter)
A2 = np.pi * (0.006)**2 # Neck area (12mm diameter)
if (p1_ins >= p1_exp):
# Inhalation
flow = A1 * np.sqrt(2 * (p1_ins - p2) / (1.199 * (((A1/A2)**2) - 1)))
else:
# Exhalation — negative sign indicates outward flow
flow = -A1 * np.sqrt(2 * (p1_exp - p2) / (1.199 * (((A1/A2)**2) - 1)))
return data[0], flow
The physics derivation: combine Bernoulli’s equation with the continuity equation (A1v1 = A2v2) and solve for v1. The result is v1 = sqrt(2ΔP / (ρ((A1/A2)² - 1))). Multiply by A1 to get volumetric flow. Sign convention is positive for inhalation, negative for exhalation.
If A1 equals A2 (no constriction), the denominator becomes zero and the function crashes. In practice this can’t happen with the physical sensor, but defensive code would check abs(A1 - A2) < 1e-6 and raise an error.
Stage 4: Peak Detection — Finding Breath Events
A breath consists of an inhalation peak followed by an exhalation peak. SciPy’s find_peaks() with physiologically-tuned parameters handles the detection:
def find_breaths(time, flow):
# Find inhalation peaks (positive flow)
ins_peaks, _ = signal.find_peaks(
flow,
height=0.0001, # Minimum peak height (m³/s)
distance=80, # Minimum 80 samples between peaks (~0.8s at 100Hz)
width=20 # Minimum peak width in samples
)
# Find exhalation peaks (negative flow) — invert to find troughs
exp_peaks, _ = signal.find_peaks(
-flow,
height=0.00005, # Lower threshold: exhalation is passive, produces less flow
distance=80,
width=20
)
Inhalation is active (diaphragm contracts), producing higher flow rates than exhalation (passive, diaphragm relaxes). Setting equal thresholds would either miss shallow exhalations or miscount. The 80-sample minimum distance enforces that peaks can’t be closer than 0.8 seconds at 100Hz — physiologically, normal breathing is 12-20 breaths per minute, so no two peaks should be less than ~2 seconds apart. The width=20 parameter rejects sharp noise spikes that don’t have the temporal profile of an actual breath.
Stage 5: Breath Validation — Pairing Inhalation with Exhalation
Not every peak is a valid breath. A valid breath requires an exhalation between each inhalation and the next:
breaths = 1
breath_times = []
pos_breaths = dict()
for i in range(len(ins_peaks) - 1):
for z in exp_peaks:
if ((time[ins_peaks[i]] < time[z]) and
(time[z] < time[ins_peaks[i+1]])):
breaths += 1
pos_breaths.update({flow[ins_peaks[i]]: i})
actual_peak = max(pos_breaths.keys())
breath_times.append(time[ins_peaks[pos_breaths[actual_peak]]])
pos_breaths.clear()
break
else:
pos_breaths.update({flow[ins_peaks[i]]: i})
breath_times.append(time[ins_peaks[i+1]])
return breaths, breath_times
If there’s no exhalation between two inhalation peaks, the inhalation doesn’t count as a complete breath. This eliminates double-counted peaks from sighs, coughs, or sensor artifacts.
Stage 6: Apnea Detection — Timing Gaps Between Breaths
Apnea is defined clinically as cessation of airflow for 10 or more seconds. After collecting validated breath times, the logic is straightforward:
def count_apnea(breath_times):
apnea_count = 0
for i in range(len(breath_times) - 1):
time_gap = breath_times[i+1] - breath_times[i]
if (time_gap > 10):
apnea_count += 1
return apnea_count
One edge case worth knowing: coughing can produce several rapid spike peaks in quick succession — the paired validation usually filters these out, but if cough artifacts slip through as “breaths,” the timing gaps before and after them won’t trigger apnea detection. A minimum inter-breath interval filter (if breath_times[i] - breath_times[i-1] < 1.0: skip) would reject the artifactual rapid bursts.
Stage 7: Leakage Calculation via Simpson’s Rule
Leakage is the total air volume that escaped through mask gaps. It’s the integral of the flow curve:
def calculate_leakage(time, flow):
leakage = integrate.simpson(flow, time) * 1000 # Convert m³ to liters
if (leakage < 0):
logging.warning("Leakage is negative")
return leakage
Simpson’s rule (integrate.simpson) approximates the integral with parabolas rather than trapezoids. The error term for trapezoidal integration is proportional to Δx²; for Simpson’s it’s proportional to Δx⁴. For 1000 samples of respiratory data, the difference in accuracy is roughly 100x. For a clinical measurement, that matters.
The Main Driver
def analysis_driver(file_name):
with open(file_name, "r") as in_file:
first_line = in_file.readline().strip("\n") # Skip header
t = np.array([])
F = np.array([])
for line in in_file:
valid_line = error_check(line.strip("\n"))
if (valid_line is False):
continue
data = ADC_to_Pressure(line)
time, flow = Pressure_to_Flow(data)
t = np.append(t, time)
F = np.append(F, flow)
breaths, breath_times = find_breaths(t, F)
duration = calculate_duration(t)
breath_rate_bpm = calculate_breath_rate(duration, breaths)
apnea_count = count_apnea(breath_times)
leakage = calculate_leakage(t, F)
return breath_rate_bpm, apnea_count, t, F
One performance note: np.append(t, time) creates a new array on every iteration — this is O(n²). For production with large files, pre-allocate: count the lines first, then create np.zeros(num_lines) and fill by index. The difference is roughly 10x for 10,000 data points.
Also validate duration before calculating breath rate. A file with only a few data points will produce an absurdly high BPM (12,000 BPM for 2 breaths detected in 0.01 seconds). Check that duration >= 10 seconds before the division.