featured image

Medical Signal Processing Pipeline

A comprehensive walkthrough of a Python signal processing pipeline that transforms raw CPAP device sensor data into clinically actionable metrics like breath rate, apnea count, and mask leakage.

Published

Thu Jun 12 2025

Technologies Used

Python
Advanced 34 minutes

Raw CPAP sensor data is useless on its own. The device outputs voltage readings and ADC values that mean nothing to a clinician without a pipeline that converts them into breath rate, apnea count, and mask leakage. The obvious approach — scan for values above a threshold and call those breaths — produces garbage: false positives from noise, missed shallow breaths, no way to distinguish a pause from apnea.

This tutorial walks through cpap_measurements.py’s signal processing pipeline, which converts raw ADC values to clinical metrics through a multi-stage transformation:

ADC values → Pressure (Pascals) → Volumetric Flow (m³/s) → Breath Events → Apnea Count

The pipeline uses physics-based modeling (venturi tube equations), statistical signal analysis (peak detection with physiological constraints), and temporal pattern recognition (apnea identification by timing gaps between valid breath cycles).

What You Need

pip install numpy scipy matplotlib
python --version  # 3.11+

Input data format — seven columns per line: time, patient_p2, patient_p1ins, patient_p1exp, CPAP_p2, CPAP_p1ins, CPAP_p1exp

Stage 1: Defensive Data Ingestion

Medical sensor data is noisy. Lines can have missing values, non-numeric strings, or NaN entries. Every line gets validated before processing:

def error_check(line):
    data = line.split(",")
    valid = True

    for x in data:
        if (x == ""):
            logging.error("Incorrect Data")
            valid = False
        elif (x == "NaN"):
            logging.error("Incorrect Data")
            valid = False
        else:
            try:
                val = float(x)
            except ValueError:
                logging.error("Incorrect Data")
                valid = False

    return valid

The "NaN" string check is deliberate. float("NaN") succeeds in Python — it returns a NaN float without raising an exception. That NaN then propagates silently through every downstream calculation: np.max([0.1, 0.2, np.nan, 0.15]) returns nan, not 0.2. Catch the string before it becomes a float.

Stage 2: ADC to Pressure Conversion

The CPAP pressure sensor outputs 10-bit ADC values that map linearly to pressure via the manufacturer’s calibration formula:

def ADC_to_Pressure(line):
    data = line.split(",")

    for i in range(7):
        if i == 0:
            data[i] = float(data[i])
        else:
            # 98.0665 Pa = 1 cmH2O (unit conversion)
            # 25.4 / (14745 - 1638) = sensor-specific scaling
            # (ADC - 1638) = zero-offset correction
            data[i] = 98.0665 * (25.4 / (14745 - 1638)) * (int(data[i]) - 1638)

    return data

Breaking the formula apart: 14745 is the ADC value at maximum pressure (25 cmH2O); 1638 is the ADC value at zero pressure; 25.4 is the sensor’s measurement range in cmH2O; 98.0665 converts cmH2O to Pascals. The full formula reduces to Pressure = 0.1902 × (ADC - 1638) Pascals.

Stage 3: Pressure to Flow — Bernoulli’s Equation

The CPAP device uses a venturi tube to measure airflow. Bernoulli’s equation relates the pressure differential to flow velocity; multiplying by the cross-sectional area gives volumetric flow:

def Pressure_to_Flow(data):
    p2     = data[1]      # Downstream pressure
    p1_ins = data[2]      # Upstream pressure during inhalation
    p1_exp = data[3]      # Upstream pressure during expiration

    A1 = np.pi * (0.0075)**2   # Upstream area (15mm diameter)
    A2 = np.pi * (0.006)**2    # Neck area (12mm diameter)

    if (p1_ins >= p1_exp):
        # Inhalation
        flow = A1 * np.sqrt(2 * (p1_ins - p2) / (1.199 * (((A1/A2)**2) - 1)))
    else:
        # Exhalation — negative sign indicates outward flow
        flow = -A1 * np.sqrt(2 * (p1_exp - p2) / (1.199 * (((A1/A2)**2) - 1)))

    return data[0], flow

The physics derivation: combine Bernoulli’s equation with the continuity equation (A1v1 = A2v2) and solve for v1. The result is v1 = sqrt(2ΔP / (ρ((A1/A2)² - 1))). Multiply by A1 to get volumetric flow. Sign convention is positive for inhalation, negative for exhalation.

If A1 equals A2 (no constriction), the denominator becomes zero and the function crashes. In practice this can’t happen with the physical sensor, but defensive code would check abs(A1 - A2) < 1e-6 and raise an error.

Stage 4: Peak Detection — Finding Breath Events

A breath consists of an inhalation peak followed by an exhalation peak. SciPy’s find_peaks() with physiologically-tuned parameters handles the detection:

def find_breaths(time, flow):
    # Find inhalation peaks (positive flow)
    ins_peaks, _ = signal.find_peaks(
        flow,
        height=0.0001,    # Minimum peak height (m³/s)
        distance=80,      # Minimum 80 samples between peaks (~0.8s at 100Hz)
        width=20          # Minimum peak width in samples
    )

    # Find exhalation peaks (negative flow) — invert to find troughs
    exp_peaks, _ = signal.find_peaks(
        -flow,
        height=0.00005,   # Lower threshold: exhalation is passive, produces less flow
        distance=80,
        width=20
    )

Inhalation is active (diaphragm contracts), producing higher flow rates than exhalation (passive, diaphragm relaxes). Setting equal thresholds would either miss shallow exhalations or miscount. The 80-sample minimum distance enforces that peaks can’t be closer than 0.8 seconds at 100Hz — physiologically, normal breathing is 12-20 breaths per minute, so no two peaks should be less than ~2 seconds apart. The width=20 parameter rejects sharp noise spikes that don’t have the temporal profile of an actual breath.

Stage 5: Breath Validation — Pairing Inhalation with Exhalation

Not every peak is a valid breath. A valid breath requires an exhalation between each inhalation and the next:

    breaths = 1
    breath_times = []
    pos_breaths = dict()

    for i in range(len(ins_peaks) - 1):
        for z in exp_peaks:
            if ((time[ins_peaks[i]] < time[z]) and
                (time[z] < time[ins_peaks[i+1]])):

                breaths += 1
                pos_breaths.update({flow[ins_peaks[i]]: i})
                actual_peak = max(pos_breaths.keys())
                breath_times.append(time[ins_peaks[pos_breaths[actual_peak]]])
                pos_breaths.clear()
                break
            else:
                pos_breaths.update({flow[ins_peaks[i]]: i})

    breath_times.append(time[ins_peaks[i+1]])
    return breaths, breath_times

If there’s no exhalation between two inhalation peaks, the inhalation doesn’t count as a complete breath. This eliminates double-counted peaks from sighs, coughs, or sensor artifacts.

Stage 6: Apnea Detection — Timing Gaps Between Breaths

Apnea is defined clinically as cessation of airflow for 10 or more seconds. After collecting validated breath times, the logic is straightforward:

def count_apnea(breath_times):
    apnea_count = 0

    for i in range(len(breath_times) - 1):
        time_gap = breath_times[i+1] - breath_times[i]
        if (time_gap > 10):
            apnea_count += 1

    return apnea_count

One edge case worth knowing: coughing can produce several rapid spike peaks in quick succession — the paired validation usually filters these out, but if cough artifacts slip through as “breaths,” the timing gaps before and after them won’t trigger apnea detection. A minimum inter-breath interval filter (if breath_times[i] - breath_times[i-1] < 1.0: skip) would reject the artifactual rapid bursts.

Stage 7: Leakage Calculation via Simpson’s Rule

Leakage is the total air volume that escaped through mask gaps. It’s the integral of the flow curve:

def calculate_leakage(time, flow):
    leakage = integrate.simpson(flow, time) * 1000  # Convert m³ to liters

    if (leakage < 0):
        logging.warning("Leakage is negative")

    return leakage

Simpson’s rule (integrate.simpson) approximates the integral with parabolas rather than trapezoids. The error term for trapezoidal integration is proportional to Δx²; for Simpson’s it’s proportional to Δx⁴. For 1000 samples of respiratory data, the difference in accuracy is roughly 100x. For a clinical measurement, that matters.

The Main Driver

def analysis_driver(file_name):
    with open(file_name, "r") as in_file:
        first_line = in_file.readline().strip("\n")  # Skip header

        t = np.array([])
        F = np.array([])

        for line in in_file:
            valid_line = error_check(line.strip("\n"))
            if (valid_line is False):
                continue

            data       = ADC_to_Pressure(line)
            time, flow = Pressure_to_Flow(data)

            t = np.append(t, time)
            F = np.append(F, flow)

        breaths, breath_times = find_breaths(t, F)
        duration              = calculate_duration(t)
        breath_rate_bpm       = calculate_breath_rate(duration, breaths)
        apnea_count           = count_apnea(breath_times)
        leakage               = calculate_leakage(t, F)

        return breath_rate_bpm, apnea_count, t, F

One performance note: np.append(t, time) creates a new array on every iteration — this is O(n²). For production with large files, pre-allocate: count the lines first, then create np.zeros(num_lines) and fill by index. The difference is roughly 10x for 10,000 data points.

Also validate duration before calculating breath rate. A file with only a few data points will produce an absurdly high BPM (12,000 BPM for 2 breaths detected in 0.01 seconds). Check that duration >= 10 seconds before the division.

We respect your privacy.

← View All Tutorials

Related Projects

    Ask me anything!