Medical Signal Processing Pipeline

A comprehensive walkthrough of a Python signal processing pipeline that transforms raw CPAP device sensor data into clinically actionable metrics like breath rate, apnea count, and mask leakage.

Published

Thu Jun 12 2025

Technologies Used

Python
Advanced 34 minutes

Purpose

The Problem

Medical devices generate raw sensor data—voltage readings, ADC values, pressure measurements—that mean nothing to clinicians without processing. A naive approach might be:

def count_breaths(data, threshold):
    breaths = 0
    for value in data:
        if value > threshold:  # Peak detected?
            breaths += 1
    return breaths

This produces garbage results:

  • False positives: Noise, coughing, or movement register as breaths
  • False negatives: Shallow breaths below threshold are missed
  • No apnea detection: Can’t distinguish normal pauses from dangerous cessation
  • Ignores physics: Doesn’t account for the actual fluid dynamics of airflow

Professional medical software must transform noisy sensor signals into clinically actionable metrics with less than 5% error rates.
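To make the failure concrete, here is a small sketch on a synthetic signal. The sine-wave "flow", the 100 Hz sampling rate, the 0.5 threshold, and both function names are assumptions made for illustration; none of them come from cpap_measurements.py. The naive loop counts every sample above the threshold, so one breath is counted many times, while counting only upward threshold crossings lands on the number of cycles.

```python
import numpy as np

def count_breaths_naive(data, threshold):
    """Counts every sample above the threshold (the flawed approach)."""
    return sum(1 for value in data if value > threshold)

def count_upward_crossings(data, threshold):
    """Counts transitions from at-or-below the threshold to above it."""
    above = np.asarray(data) > threshold
    return int(np.count_nonzero(~above[:-1] & above[1:]))

# Synthetic signal (assumption): 5 breath cycles in 20 s, sampled at 100 Hz
t = np.arange(2000) / 100.0
flow = np.sin(2 * np.pi * 0.25 * t)

naive = count_breaths_naive(flow, threshold=0.5)
crossings = count_upward_crossings(flow, threshold=0.5)
print(naive, crossings)
```

Crossing counting is still fragile on noisy data, which is why the pipeline below relies on peak detection with width and distance constraints instead.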

The Solution

We’re studying the signal processing pipeline in cpap_measurements.py, which converts raw ADC values to clinical metrics through a multi-stage transformation:

ADC values → Pressure (Pascals) → Volumetric Flow (m³/s) → Breath Events → Apnea Count

This pipeline uses physics-based modeling (venturi tube equations), statistical signal analysis (peak detection), and temporal pattern recognition (apnea identification).

What You Will Learn

This code demonstrates concepts rarely covered in tutorials:

  • Numerical integration using Simpson’s rule for leakage calculation
  • Peak finding with tuned height/distance/width parameters for physiological signals
  • Paired validation ensuring inhalation and exhalation peaks match
  • Edge case handling for corrupted sensor data (NaN, empty values)

Prerequisites & Tooling

Knowledge Base:

  • NumPy array operations (vectorization, broadcasting)
  • Signal processing basics (peaks, troughs, noise)
  • Physics: Pressure, flow, and Bernoulli’s principle
  • Calculus: Numerical integration concepts

Environment:

pip install numpy scipy matplotlib
python --version  # 3.11+

Sample Data Format (sample_data/patient_01.txt):

time,patient_p2,patient_p1ins,patient_p1exp,CPAP_p2,CPAP_p1ins,CPAP_p1exp
0.0,2345,2567,2456,2123,2234,2145
0.01,2456,2678,2567,2234,2345,2256
...

7 columns: Time (seconds), then 6 ADC pressure readings
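As a quick orientation to this layout, the sketch below parses the two sample rows shown above with the standard csv module. The helper name read_rows and the in-memory SAMPLE string are assumptions for illustration; the pipeline itself splits lines by hand, as shown in the steps that follow.

```python
import csv
import io

# The two sample rows from the format description above
SAMPLE = """time,patient_p2,patient_p1ins,patient_p1exp,CPAP_p2,CPAP_p1ins,CPAP_p1exp
0.0,2345,2567,2456,2123,2234,2145
0.01,2456,2678,2567,2234,2345,2256
"""

def read_rows(text):
    """Parses CSV text into dicts: time as float, ADC readings as int."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        row = {"time": float(record["time"])}
        for name, value in record.items():
            if name != "time":
                row[name] = int(value)
        rows.append(row)
    return rows

rows = read_rows(SAMPLE)
sample_period = rows[1]["time"] - rows[0]["time"]  # seconds between samples
print(len(rows), rows[0]["patient_p2"], round(1 / sample_period), "Hz")
```

The 0.01 s spacing implies a 100 Hz sampling rate, which matters later when peak-detection parameters are given in samples.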

High-Level Architecture

graph LR
    A[Raw ADC File] --> B[Error Check Lines]
    B --> C{Valid?}
    C -->|No| D[Log Error, Skip Line]
    C -->|Yes| E[ADC → Pascals Conversion]
    E --> F[Pascals → Flow Rate m³/s]
    F --> G[Accumulate Time & Flow Arrays]
    G --> H[Find Inhalation Peaks]
    G --> I[Find Exhalation Peaks]
    H --> J[Pair Validation]
    I --> J
    J --> K[Count Valid Breath Cycles]
    J --> L[Extract Breath Times]
    L --> M[Calculate Time Deltas]
    M --> N{Delta > 10s?}
    N -->|Yes| O[Increment Apnea Count]
    N -->|No| P[Continue]
    G --> Q[Numerical Integration Simpson's Rule]
    Q --> R[Calculate Leakage Volume]
    K --> S[Calculate Breath Rate]
    S --> T[Return Metrics]
    O --> T
    R --> T

Analogy: Imagine processing security camera footage to count people entering a building:

  1. Raw frames = Raw ADC values
  2. Motion detection = Peak finding
  3. Entry/exit pairing = Inhalation/exhalation validation
  4. Time gap analysis = Apnea detection

Just as you need to distinguish actual people from shadows, reflections, and pets, we must distinguish real breaths from artifacts.

Implementation

Step 1: Error Checking - Defensive Data Ingestion

Logic: Medical sensor data is notoriously noisy. Each line might have missing values, non-numeric strings, or NaN entries. We must validate every line before processing.

import logging


def error_check(line):
    """
    Logs errors if a line has missing or incorrect data points

    From each time point line, creates an array of seven entries by
    separating at each comma. Checks each entry for validity.

    Parameters
    ----------
    line : string
        One line of the input file (e.g., "0.01,2345,2567,2456,2123,2234,2145")

    Returns
    -------
    valid : boolean
        True if all seven values are numeric and usable
    """
    data = line.split(",")  # Split CSV line
    valid = True

    for x in data:
        # Check 1: Missing value
        if (x == ""):
            logging.error("Incorrect Data")
            valid = False
        # Check 2: Explicit NaN string
        elif (x == "NaN"):
            logging.error("Incorrect Data")
            valid = False
        else:
            # Check 3: Numeric conversion possible?
            try:
                val = float(x)
            except ValueError:
                logging.error("Incorrect Data")
                valid = False

    return valid

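🔴 Danger: Why check for "NaN" string separately? Python’s built-in float("NaN") succeeds (float is not a NumPy function) but produces a NaN value that breaks downstream calculations. The string comparison is case-sensitive, so inputs such as "nan" or "inf" still slip past this check: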

float("NaN")  # Returns nan (not an error!)
np.sqrt(float("NaN"))  # Returns nan (silently propagates)

Real-world impact: If we don’t catch this, a single NaN corrupts the entire array:

flows = [0.1, 0.2, np.nan, 0.15]
np.max(flows)  # Returns nan (not 0.2!)
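One way to close that gap is to convert first and then test the result with math.isfinite, which rejects NaN and infinities regardless of how they were spelled. The sketch below is an alternative to error_check, not the original function; the name is_valid_line and the expected_fields parameter are assumptions for illustration.

```python
import math

def is_valid_line(line, expected_fields=7):
    """Returns True only if the line has the expected number of finite numbers."""
    fields = line.strip().split(",")
    if len(fields) != expected_fields:
        return False  # missing or extra columns
    for field in fields:
        try:
            value = float(field)
        except ValueError:
            return False  # empty string or non-numeric text
        if not math.isfinite(value):
            return False  # catches nan, NaN, inf, -inf
    return True

print(is_valid_line("0.01,2345,2567,2456,2123,2234,2145"))
print(is_valid_line("0.01,2345,nan,2456,2123,2234,2145"))
print(is_valid_line("0.01,2345,,2456,2123,2234,2145"))
```

Unlike the original, this version also rejects lines with the wrong number of columns.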

Step 2: ADC to Pressure Conversion - Applying Sensor Calibration

Logic: The CPAP device’s pressure sensor outputs 14-bit ADC values (0-16383) that map linearly to pressure; the calibration points 1638 and 14745 sit at roughly 10% and 90% of that range. We must apply the manufacturer’s calibration formula.

def ADC_to_Pressure(line):
    """
    Converts each ADC value to Pressure in Pascals

    Given a time point's data, ignores time value and converts the other six
    values from ADC integer units to float pressure values.

    Calibration formula: P = 98.0665 * (25.4 / (14745 - 1638)) * (ADC - 1638)

    Parameters
    ----------
    line : string
        One line of the input file containing data for a single time point

    Returns
    -------
    data : array of seven floats
        [time, patient_p2, patient_p1ins, patient_p1exp, CPAP_p2, CPAP_p1ins, CPAP_p1exp]
    """
    data = line.split(",")

    for i in range(7):
        if i == 0:
            # Time remains as-is
            data[i] = float(data[i])
        else:
            # Apply pressure conversion formula
            # 98.0665 Pa = 1 cmH2O (standard pressure unit conversion)
            # 25.4 / (14745 - 1638) = Sensor-specific scaling factor
            # (ADC - 1638) = Zero-offset correction
            data[i] = 98.0665 * (25.4 / (14745 - 1638)) * (int(data[i]) - 1638)

    return data

🔵 Deep Dive: Breaking down the calibration formula:

Component 1: Unit Conversion

98.0665  # Pascals per cmH2O (standard conversion)

Medical literature uses cmH2O; physics calculations use Pascals.

Component 2: Sensor Scaling

25.4 / (14745 - 1638)  # ≈ 0.001937

This maps the sensor’s output range to a standard pressure range:

  • 14745 = ADC value at maximum pressure (25.4 cmH2O)
  • 1638 = ADC value at zero pressure
  • 25.4 = Maximum pressure in sensor’s range (cmH2O)

Component 3: Zero Offset

(int(data[i]) - 1638)  # Shift baseline to zero

Full formula:

Pressure = 98.0665 * (25.4 / 13107) * (ADC - 1638)
Pressure ≈ 0.1900 * (ADC - 1638) Pascals
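The calibration can be sanity-checked at its two anchor points. The sketch below restates the formula as a standalone function; the constant names and adc_to_pascals are assumptions for illustration, while the numeric values come from the formula above.

```python
PA_PER_CMH2O = 98.0665     # Pascals per cmH2O
ADC_ZERO = 1638            # ADC count at zero pressure
ADC_FULL = 14745           # ADC count at full-scale pressure
FULL_SCALE_CMH2O = 25.4    # Full-scale pressure in cmH2O

def adc_to_pascals(adc):
    """Linear calibration from ADC counts to Pascals."""
    return PA_PER_CMH2O * (FULL_SCALE_CMH2O / (ADC_FULL - ADC_ZERO)) * (adc - ADC_ZERO)

zero_pa = adc_to_pascals(ADC_ZERO)          # baseline
full_pa = adc_to_pascals(ADC_FULL)          # full scale
slope = adc_to_pascals(ADC_ZERO + 1)        # Pascals per ADC count
print(zero_pa, full_pa, slope)
```

The zero point maps to 0 Pa, full scale maps to 25.4 cmH2O expressed in Pascals, and one ADC count corresponds to roughly 0.19 Pa.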

Step 3: Pressure to Flow - Venturi Tube Physics

Logic: The CPAP device uses a venturi tube (constricted airway) to measure flow. We apply Bernoulli’s equation to calculate volumetric flow from pressure differentials.

import numpy as np


def Pressure_to_Flow(data):
    """
    Converts Pressure values to Volumetric Flow using venturi tube equations

    Compares inspiration pressure with expiration pressure to determine
    flow direction and calculates magnitude using Bernoulli's equation.

    Venturi tube specs:
    - Upstream diameter: 15 mm (radius = 7.5 mm)
    - Neck diameter: 12 mm (radius = 6 mm)
    - Moist air density: 1.199 kg/m³

    Parameters
    ----------
    data : array of seven floats
        [time, p2, p1_ins, p1_exp, CPAP_p2, CPAP_p1ins, CPAP_p1exp]

    Returns
    -------
    time : float
        Time point in seconds
    flow : float
        Volumetric flow rate in m³/second (positive = inhalation)
    """
    p2 = data[1]        # Downstream pressure
    p1_ins = data[2]    # Upstream pressure during inhalation
    p1_exp = data[3]    # Upstream pressure during expiration

    # Cross-sectional areas
    A1 = np.pi * (0.0075)**2  # Upstream area (m²)
    A2 = np.pi * (0.006)**2   # Neck area (m²)

    # Determine flow direction by comparing pressures
    if (p1_ins >= p1_exp):
        # Inhalation: Air flowing into patient
        # Bernoulli equation: v = sqrt(2 * ΔP / (ρ * (A1²/A2² - 1)))
        flow = A1 * np.sqrt(2 * (p1_ins - p2) / (1.199 * (((A1/A2)**2) - 1)))
    else:
        # Exhalation: Air flowing out of patient
        # Negative sign indicates outward flow
        flow = -A1 * np.sqrt(2 * (p1_exp - p2) / (1.199 * (((A1/A2)**2) - 1)))

    return data[0], flow

🔵 Deep Dive: Venturi tube physics derivation

Bernoulli’s equation:

P1 + (1/2)ρv1² = P2 + (1/2)ρv2²

Continuity equation:

A1 * v1 = A2 * v2  →  v2 = v1 * (A1/A2)

Combining and solving for v1:

P1 - P2 = (1/2)ρ(v2² - v1²)
P1 - P2 = (1/2)ρv1²((A1/A2)² - 1)
v1 = sqrt(2(P1 - P2) / (ρ((A1/A2)² - 1)))

Volumetric flow:

Q = A1 * v1

Sign convention:

  • Positive flow = Inhalation (air into patient)
  • Negative flow = Exhalation (air out of patient)
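The derivation can be checked numerically: compute Q for some pressure drop, recover v1 and v2 from continuity, and confirm that Bernoulli’s equation balances. The 10 Pa pressure drop and the function name venturi_flow are assumptions for illustration; the geometry and air density are the values listed in the docstring above.

```python
import math

RHO = 1.199                      # moist air density, kg/m^3
A1 = math.pi * 0.0075 ** 2       # upstream area, m^2
A2 = math.pi * 0.006 ** 2        # neck area, m^2

def venturi_flow(delta_p):
    """Volumetric flow (m^3/s) for a non-negative pressure drop in Pa."""
    v1 = math.sqrt(2 * delta_p / (RHO * ((A1 / A2) ** 2 - 1)))
    return A1 * v1

delta_p = 10.0                   # hypothetical pressure drop, Pa
q = venturi_flow(delta_p)
v1 = q / A1                      # upstream velocity
v2 = q / A2                      # neck velocity (continuity: A1*v1 = A2*v2)

# Bernoulli: P1 - P2 should equal (1/2) * rho * (v2^2 - v1^2)
bernoulli_residual = delta_p - 0.5 * RHO * (v2 ** 2 - v1 ** 2)
print(f"Q = {q * 1000:.3f} L/s, residual = {bernoulli_residual:.2e} Pa")
```

A residual near zero confirms that the closed-form expression for v1 is consistent with the two starting equations.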

Step 4: Peak Detection - Finding Breath Events

Logic: A breath consists of an inhalation peak followed by an exhalation peak. We use SciPy’s find_peaks() with carefully tuned parameters to detect real breaths while ignoring noise.

from scipy import signal


def find_breaths(time, flow):
    """
    Finds the number of breaths and the time at which they occurred

    Uses scipy.signal.find_peaks with tuned parameters to identify
    respiratory events, then validates inhalation-exhalation pairs.

    Parameters
    ----------
    time : array of float
        Time points in seconds (e.g., [0.0, 0.01, 0.02, ...])
    flow : array of float
        Volumetric flow rates in m³/second (positive = in, negative = out)

    Returns
    -------
    breaths : integer
        Number of complete breath cycles detected
    breath_times : array of floats
        Time points at which inhalation peaks occurred
    """
    # Find INHALATION peaks (positive flow)
    ins_peaks, ins_properties = signal.find_peaks(
        flow,
        height=0.0001,      # Minimum peak height (m³/s)
        threshold=None,     # Not used
        distance=80,        # Minimum samples between peaks (≈0.8s at 100Hz)
        prominence=None,    # Not used
        width=20            # Minimum peak width in samples
    )

    # Find EXHALATION peaks (negative flow)
    # Invert signal with negative sign to find negative peaks as positive
    exp_peaks, exp_properties = signal.find_peaks(
        -flow,              # Negate to find troughs
        height=0.00005,     # Lower threshold for exhalation
        threshold=None,
        distance=80,
        prominence=None,
        width=20
    )

🔴 Danger: Why different heights for inhalation (0.0001) vs. exhalation (0.00005)?

Physiological reality: Inhalation is active (diaphragm contracts), producing higher flow rates. Exhalation is passive (diaphragm relaxes), producing lower flow rates.

A single shared threshold fails in one of two ways:

  • Both at 0.0001 → Shallow exhalations are missed → Undercounting breaths
  • Both at 0.00005 → Noise on the inhalation side passes as peaks → Overcounting breaths

Tuning process:

# Visualize to tune parameters
import matplotlib.pyplot as plt

plt.plot(time, flow)
plt.plot(time[ins_peaks], flow[ins_peaks], 'r.')  # Red dots = inhalation peaks
plt.plot(time[exp_peaks], flow[exp_peaks], 'b.')  # Blue dots = exhalation peaks
plt.show()
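Before tuning on patient data, it can help to run the same find_peaks calls on a signal whose answer is known. The sketch below uses a clean sine wave as a stand-in for flow; the amplitude, the 0.25 Hz breathing frequency, and the 20 s duration are assumptions for illustration, while the find_peaks parameters are the ones used in find_breaths.

```python
import numpy as np
from scipy import signal

fs = 100                                        # samples per second
t = np.arange(20 * fs) / fs                     # 20 s of timestamps
flow = 0.0005 * np.sin(2 * np.pi * 0.25 * t)    # one breath cycle every 4 s

# Same parameters as find_breaths above
ins_peaks, _ = signal.find_peaks(flow, height=0.0001, distance=80, width=20)
exp_peaks, _ = signal.find_peaks(-flow, height=0.00005, distance=80, width=20)

print("inhalation peak times:", t[ins_peaks])
print("exhalation peak times:", t[exp_peaks])
```

If the detected counts or times drift from the known cycle count when noise is added to the synthetic signal, that is a hint the height, distance, or width values need adjusting.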

Step 5: Breath Validation - Pairing Inhalation with Exhalation

Logic: A valid breath requires an exhalation to follow each inhalation before the next inhalation. We iterate through inhalation peaks and search for corresponding exhalation peaks.

    breaths = 1  # Start count at 1 (assumes file starts mid-breath)
    breath_times = []
    pos_breaths = dict()  # Temporary storage for potential breath peaks

    # Iterate through consecutive inhalation peaks
    for i in range(len(ins_peaks) - 1):
        # Search for exhalation peaks between this inhalation and the next
        for z in exp_peaks:
            # Check if exhalation occurs after this inhalation
            # but before the next inhalation
            if ((time[ins_peaks[i]] < time[z]) and
                (time[z] < time[ins_peaks[i+1]])):

                # Valid breath cycle found!
                breaths += 1

                # Store all potential peaks with their flow magnitude
                pos_breaths.update({flow[ins_peaks[i]]: i})

                # Select the highest flow as the "true" peak
                actual_peak = max(pos_breaths.keys())
                breath_times.append(time[ins_peaks[pos_breaths[actual_peak]]])

                # Clear temporary storage
                pos_breaths.clear()
                break  # Move to next inhalation peak
            else:
                # No matching exhalation yet, store as potential
                pos_breaths.update({flow[ins_peaks[i]]: i})

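    # No inhalation peaks at all: nothing to count (also avoids reading an undefined i)
    if len(ins_peaks) == 0:
        return 0, []

    # Add the final inhalation peak
    breath_times.append(time[ins_peaks[-1]])

    return breaths, breath_times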

Example visualization:

Flow (m³/s)
    ^
    |     INS1      INS2      INS3
    |      /\        /\        /\
    |     /  \      /  \      /  \
----+----/----\----/----\----/----\-----> Time (s)
    |         \  /      \  /      \  /
    |          \/        \/        \/
    |         EXP1      EXP2      EXP3

Valid pairs: (INS1, EXP1), (INS2, EXP2), (INS3, EXP3)
Breaths = 3

Invalid pattern (missed exhalation):

    |     INS1      INS2
    |      /\        /\
    |     /  \      /  \
----+----/----\____/----\-----> Time
    |
    |     (no exhalation peak detected)

No exhalation between INS1 and INS2
Breaths = 1 (the initial count; the INS1-INS2 pair adds nothing)
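The pairing rule can also be expressed without the nested loop. The sketch below is an alternative formulation, not the code from cpap_measurements.py: for each pair of consecutive inhalation times it uses np.searchsorted to find the first exhalation after the earlier inhalation and checks that it precedes the later one. The function name and the example times are assumptions for illustration.

```python
import numpy as np

def count_paired_breaths(ins_times, exp_times):
    """Counts consecutive inhalation pairs that have an exhalation between them."""
    ins = np.asarray(ins_times, dtype=float)
    exp = np.sort(np.asarray(exp_times, dtype=float))
    paired = 0
    for start, end in zip(ins[:-1], ins[1:]):
        # Index of the first exhalation strictly after `start`
        k = np.searchsorted(exp, start, side="right")
        if k < len(exp) and exp[k] < end:
            paired += 1
    return paired

regular = count_paired_breaths([1.0, 5.0, 9.0], [3.0, 7.0, 11.0])
missed = count_paired_breaths([1.0, 5.0], [])
partial = count_paired_breaths([1.0, 5.0, 9.0], [7.0])
print(regular, missed, partial)
```

Because find_breaths starts its counter at 1, its result corresponds to this pair count plus one.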

Step 6: Apnea Detection - Temporal Pattern Recognition

Logic: Apnea events are abnormal pauses in breathing (>10 seconds). We calculate time deltas between consecutive breaths and count gaps exceeding the threshold.

def count_apnea(breath_times):
    """
    Counts number of apnea events in data

    Iterates through breath times and calculates time differences.
    If elapsed time exceeds 10 seconds, it's counted as an apnea event.

    Clinical definition: Apnea = cessation of airflow for ≥10 seconds

    Parameters
    ----------
    breath_times : array of floats
        Time points at which breaths occurred (e.g., [1.2, 3.5, 7.8, 22.1, ...])

    Returns
    -------
    apnea_count : integer
        Number of apnea events detected
    """
    apnea_count = 0

    for i in range(len(breath_times) - 1):
        # Calculate time between consecutive breaths
        time_gap = breath_times[i+1] - breath_times[i]

        if (time_gap > 10):
            apnea_count += 1
            # In production, would log: (timestamp, duration)
            # logging.warning(f"Apnea detected at {breath_times[i]}s, duration {time_gap}s")

    return apnea_count

Example:

breath_times = [1.0, 3.2, 5.5, 18.7, 20.3, 35.1]
#                 ↓     ↓     ↓            ↓      ↓
# Gaps:         2.2s  2.3s  13.2s (APNEA)  1.6s  14.8s (APNEA)

count_apnea(breath_times)  # Returns 2
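The same count can be obtained without an explicit loop. This is a minimal vectorized sketch using np.diff; the function name and the threshold_s parameter are assumptions for illustration, and the strict greater-than comparison mirrors count_apnea.

```python
import numpy as np

def count_apnea_vectorized(breath_times, threshold_s=10.0):
    """Returns (apnea count, array of inter-breath gaps in seconds)."""
    gaps = np.diff(np.asarray(breath_times, dtype=float))
    return int(np.count_nonzero(gaps > threshold_s)), gaps

count, gaps = count_apnea_vectorized([1.0, 3.2, 5.5, 18.7, 20.3, 35.1])
print(count)
print(np.round(gaps, 1))
```

Returning the gaps alongside the count makes it straightforward to log the start time and duration of each event.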

🔴 Danger: This algorithm has a false positive edge case:

Scenario: Patient coughs violently at t=10s, causing spurious peaks:

breath_times = [1.0, 3.2, 5.5, 10.1, 10.15, 10.2, 12.3, ...]
#                                  ↑ Cough artifacts

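The cough creates three rapid “breaths” within 0.1 seconds, and the algorithm counts all of them as valid, inflating the breath count. In this trace neither the gap before (5.5 → 10.1 = 4.6s) nor the gap after (10.2 → 12.3 = 2.1s) reaches the apnea threshold, but in a longer pause the same artifacts could split one real apnea gap into several shorter ones and hide it.

With the cough artifacts removed: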

breath_times = [1.0, 3.2, 5.5, 12.3, ...]
#                            ↓
# Gap: 6.8s (not apnea)

Solution: Add physiological validation:

# Reject breaths that occur < 1 second after the last accepted breath
MIN_BREATH_INTERVAL = 1.0
filtered_breaths = []
for t in breath_times:
    if not filtered_breaths or (t - filtered_breaths[-1]) >= MIN_BREATH_INTERVAL:
        filtered_breaths.append(t)

Step 7: Leakage Calculation - Numerical Integration

Logic: Leakage is the total volume of air lost through mask gaps. We integrate the flow curve using Simpson’s rule—a numerical integration method more accurate than rectangular approximation.

from scipy import integrate


def calculate_leakage(time, flow):
    """
    Calculates total mask leakage observed in data

    Uses scipy.integrate.simpson with Simpson's rule to approximate
    integral of area under the flow vs. time curve.

    Sign convention: Positive leakage = more air in than out (mask leak)

    Parameters
    ----------
    time : array of float
        Time points in seconds
    flow : array of float
        Volumetric flow rates in m³/second

    Returns
    -------
    leakage : float
        Total mask leakage volume in liters
    """
    # Numerical integration using Simpson's rule
    # x must be passed by keyword; recent SciPy releases reject it positionally
    leakage = integrate.simpson(flow, x=time) * 1000  # Convert m³ to liters

    if (leakage < 0):
        logging.warning("Leakage is negative")

    return leakage

🔵 Deep Dive: Why Simpson’s rule instead of trapezoidal?

Trapezoidal rule (first-order):

Integral ≈ Σ (y[i] + y[i+1]) * Δx / 2
Error ∝ Δx²

Simpson’s rule (second-order):

Integral ≈ Σ (y[i] + 4*y[i+1] + y[i+2]) * Δx / 3
Error ∝ Δx⁴

For 1000 samples (illustrative orders of magnitude; the actual error depends on how smooth the flow signal is):

  • Trapezoidal error: ~0.001 liters
  • Simpson’s error: ~0.00001 liters (100× more accurate!)

Visual intuition:

Flow
  ^
  |     Actual curve
  |      ___
  |    /     \
  |   /       \___
  | /            \
  +----------------> Time

Trapezoidal: Approximates with straight lines (misses curvature between samples)
Simpson's:   Approximates with parabolas (fits curves better)
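The difference is easy to observe on an integral with a known answer. The sketch below integrates sin(x) from 0 to π, whose exact value is 2, with both rules; the test function and the 101-sample grid are assumptions chosen for illustration and say nothing about error on real, noisy flow data.

```python
import numpy as np
from scipy import integrate

x = np.linspace(0.0, np.pi, 101)   # 101 samples, 100 equal intervals
y = np.sin(x)
exact = 2.0                        # integral of sin(x) over [0, pi]

trap_error = abs(integrate.trapezoid(y, x=x) - exact)
simp_error = abs(integrate.simpson(y, x=x) - exact)
print(f"trapezoid error: {trap_error:.2e}")
print(f"simpson error:   {simp_error:.2e}")
```

On a smooth curve like this, Simpson’s rule is several orders of magnitude closer. On noisy sensor data the gap narrows, because measurement noise rather than the integration rule dominates the error.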

Step 8: Main Driver - Orchestrating the Pipeline

Logic: Read the file line by line, accumulate validated data points, then process the entire time series.

def analysis_driver(file_name):
    """
    Reads input file and orchestrates signal processing pipeline

    Opens file, validates each line, converts ADC→Pressure→Flow,
    then analyzes for breaths, apnea, and leakage.

    Parameters
    ----------
    file_name : string
        File containing raw CPAP data (CSV format with header)

    Returns
    -------
    breath_rate_bpm : float
    apnea_count : int
    t : numpy array of time values
    F : numpy array of flow values
    """
    logging.info("Start of data analysis. File Name: {}".format(file_name))

    with open(file_name, "r") as in_file:
        # Skip header line
        first_line = in_file.readline().strip("\n")

        # Initialize accumulators
        t = np.array([])  # Time array
        F = np.array([])  # Flow array

        # Process each line
        for line in in_file:
            # Step 1: Validate line
            valid_line = error_check(line.strip("\n"))
            if (valid_line is False):
                continue  # Skip corrupted lines

            # Step 2: ADC → Pressure conversion
            data = ADC_to_Pressure(line)

            # Step 3: Pressure → Flow conversion
            time, flow = Pressure_to_Flow(data)

            # Step 4: Accumulate in arrays
            t = np.append(t, time)
            F = np.append(F, flow)

        # Step 5: Process complete time series
        breaths, breath_times = find_breaths(t, F)
        duration = calculate_duration(t)
        breath_rate_bpm = calculate_breath_rate(duration, breaths)
        apnea_count = count_apnea(breath_times)
        leakage = calculate_leakage(t, F)

        return breath_rate_bpm, apnea_count, t, F

Under the Hood

NumPy Array Performance

Why does the way the arrays are grown matter?

# Python list approach (amortized O(1) per append)
time_list = []
for line in file:
    t = parse(line)
    time_list.append(t)  # List over-allocates, so most appends copy nothing

# np.append approach (SLOW)
t = np.array([])
for line in file:
    t = np.append(t, parse(line))  # Allocates a new array and copies every element each time

🔴 Danger: np.append in a loop is O(n²) overall, because each call copies the whole array. The list version is O(n) overall; convert it once at the end with np.array(time_list).

Alternative approach (pre-allocate), also O(n):

# Count lines first
num_lines = count_lines(file_name)

# Pre-allocate arrays
t = np.zeros(num_lines)
F = np.zeros(num_lines)

# Fill arrays (O(n) instead of O(n²))
for i, line in enumerate(file):
    data = ADC_to_Pressure(line)
    t[i], F[i] = Pressure_to_Flow(data)

Performance comparison (10,000 data points; illustrative figures that vary by machine and NumPy version):

  • Append method: ~500ms
  • Pre-allocate method: ~50ms (10× faster)
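Timings like these are best measured on the target machine. The sketch below times three ways of building the same array with timeit; the function names and the synthetic values list are assumptions for illustration, and no particular speedup is implied.

```python
import timeit
import numpy as np

def grow_with_np_append(values):
    arr = np.array([])
    for v in values:
        arr = np.append(arr, v)    # copies the whole array on every call
    return arr

def grow_with_list(values):
    out = []
    for v in values:
        out.append(v)              # amortized constant time
    return np.array(out)           # single conversion at the end

def fill_preallocated(values):
    arr = np.zeros(len(values))    # one allocation up front
    for i, v in enumerate(values):
        arr[i] = v
    return arr

values = [i * 0.01 for i in range(10_000)]
for fn in (grow_with_np_append, grow_with_list, fill_preallocated):
    seconds = timeit.timeit(lambda: fn(values), number=3) / 3
    print(f"{fn.__name__}: {seconds * 1000:.1f} ms")
```

All three functions return identical arrays, so the choice only affects running time and memory churn.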

SciPy’s find_peaks Algorithm

How does find_peaks() work internally?

Naive approach (O(n), but with no plateau handling or filtering):

def naive_find_peaks(signal):
    peaks = []
    for i in range(1, len(signal) - 1):
        # Check if current point is higher than neighbors
        if signal[i] > signal[i-1] and signal[i] > signal[i+1]:
            peaks.append(i)
    return peaks

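SciPy’s approach, conceptually:

  1. Local maxima scan (O(n)): SciPy performs this in a compiled routine that also handles flat-topped peaks. For strict peaks, a vectorized NumPy equivalent is:

    diff = np.diff(signal)  # Calculate derivative
    sign_changes = np.diff(np.sign(diff))  # Find sign flips
    peaks = np.where(sign_changes < 0)[0] + 1  # Negative flip = peak
  2. Distance filtering:

    • When peaks are closer than distance samples, the smaller peaks are removed first until all remaining peaks satisfy the spacing
  3. Prominence and width filtering:

    • “Prominence” = vertical distance from the peak down to its lowest contour line
    • Width is measured at a height relative to the prominence, so passing width causes prominence to be computed even when prominence=None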

Memory usage:

  • Naive: O(n) for peaks list
  • SciPy: O(n) for intermediate arrays + O(k) for peaks (k < n)
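A small experiment shows how the derivative sign-change idea relates to find_peaks on a signal with no flat tops, and what the distance argument does. The ten-sample array is made up for illustration.

```python
import numpy as np
from scipy import signal

# Hypothetical signal with strict local maxima at indices 1, 3, 5, and 8
x = np.array([0.0, 1.0, 0.0, 2.0, 1.0, 3.0, 0.5, 0.2, 4.0, 1.0])

# Sign-change method: a +1 -> -1 flip in the slope marks a peak
sign_changes = np.diff(np.sign(np.diff(x)))
manual_peaks = np.where(sign_changes < 0)[0] + 1

scipy_peaks, _ = signal.find_peaks(x)               # no filtering
spaced_peaks, _ = signal.find_peaks(x, distance=3)  # enforce minimum spacing

print(manual_peaks)
print(scipy_peaks)
print(spaced_peaks)
```

With distance=3, the peak at index 3 is dropped because it sits only two samples from the taller peak at index 5; taller peaks take priority when spacing conflicts arise.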

Floating Point Precision in Medical Calculations

Critical consideration: Flow calculations involve subtracting similar numbers:

flow = A1 * np.sqrt(2 * (p1_ins - p2) / (1.199 * (((A1/A2)**2) - 1)))
#                         ↑ Catastrophic cancellation risk

Example:

p1_ins = 2456.123456789
p2     = 2456.123456788
delta  = 0.000000001  # Lost precision!

Python floats use 64-bit IEEE 754:

  • 1 sign bit
  • 11 exponent bits
  • 52 mantissa bits (≈15-17 decimal digits precision)

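For pressures ~2500 Pascals, adjacent 64-bit floats are about 4.5e-13 Pa apart, so floating point is not the limiting factor here. The sensor’s quantization step of roughly 0.19 Pa per ADC count is.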

Mitigation: Use high-precision sensors (14-bit ADC minimum) and avoid operations on near-equal values.
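The two resolution limits can be compared directly. The sketch below uses math.ulp for the spacing between adjacent 64-bit floats near 2500 Pa and the calibration slope for the size of one ADC count; the variable names are assumptions for illustration.

```python
import math

pressure_pa = 2500.0

# Gap between 2500.0 and the next representable 64-bit float
float_spacing = math.ulp(pressure_pa)

# Pressure change represented by one ADC count (from the calibration formula)
adc_step_pa = 98.0665 * (25.4 / (14745 - 1638))

print(f"float spacing: {float_spacing:.3e} Pa")
print(f"ADC step:      {adc_step_pa:.4f} Pa")
print(f"ratio:         {adc_step_pa / float_spacing:.2e}")
```

The ADC step is many orders of magnitude coarser than the floating point spacing, so sensor quantization and noise dominate the error budget for pressure differences.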

Edge Cases & Pitfalls

Edge Case 1: Zero Division in Venturi Equation

Scenario: What if p1_ins = p2 (no pressure differential)?

flow = A1 * np.sqrt(2 * (p1_ins - p2) / (1.199 * (((A1/A2)**2) - 1)))
#                         ↑ (0 - 0) = 0, sqrt(0) = 0

Result: flow = 0 (correct!)

But what if A1 = A2 (no constriction)?

flow = A1 * np.sqrt(2 * delta_P / (1.199 * ((1**2) - 1)))
#                                            ↑ Division by zero!

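Result: with the plain Python floats used in Pressure_to_Flow, this raises ZeroDivisionError. With NumPy scalars it would instead emit RuntimeWarning: divide by zero and yield flow = inf.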

Fix:

if abs(A1 - A2) < 1e-6:
    raise ValueError("Invalid venturi geometry: A1 must differ from A2")

Edge Case 2: Negative Values in sqrt()

Scenario: Sensor malfunction produces p2 > p1 (impossible physics).

flow = np.sqrt(2 * (p1_ins - p2) / ...)
#              ↑ Negative value!
# Result: RuntimeWarning: invalid value encountered in sqrt
# flow = nan

Fix: Add validation

delta_p = p1_ins - p2
if delta_p < 0:
    logging.warning(f"Invalid pressure differential: {delta_p}")
    return data[0], 0.0  # Assume zero flow
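Both guards can live in one helper. The sketch below is one possible arrangement, not the original Pressure_to_Flow: the name safe_venturi_flow and its keyword parameters are assumptions for illustration, and treating a negative differential as zero flow follows the fix suggested above.

```python
import logging
import math

RHO = 1.199                      # moist air density, kg/m^3
A1 = math.pi * 0.0075 ** 2       # upstream area, m^2
A2 = math.pi * 0.006 ** 2        # neck area, m^2

def safe_venturi_flow(p1, p2, a1=A1, a2=A2, rho=RHO):
    """Flow magnitude in m^3/s, with geometry and pressure guards."""
    if a1 <= a2:
        # Equal areas divide by zero; a1 < a2 makes the denominator negative
        raise ValueError("Invalid venturi geometry: A1 must exceed A2")
    delta_p = p1 - p2
    if delta_p < 0:
        logging.warning("Invalid pressure differential: %s", delta_p)
        return 0.0               # assume zero flow rather than returning nan
    return a1 * math.sqrt(2 * delta_p / (rho * ((a1 / a2) ** 2 - 1)))

print(safe_venturi_flow(110.0, 100.0))
print(safe_venturi_flow(90.0, 100.0))
```

Whether zero flow is the right substitute for a physically impossible reading is a clinical judgment; skipping the sample or flagging the whole recording are alternatives.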

Edge Case 3: Breath Rate Division by Zero

def calculate_breath_rate(duration, breaths):
    breath_rate = breaths / (duration / 60)
    return breath_rate

Scenario: File has only 1-2 lines (duration ≈ 0.01s).

duration = 0.01
breath_rate = 2 / (0.01 / 60)  # = 2 / 0.0001667 = 12,000 BPM!

Physiological impossibility: A resting adult typically breathes 12-20 times per minute; a rate of 12,000 BPM is an artifact of the tiny duration.

Fix:

if duration < 10:  # Require at least 10 seconds of data
    raise ValueError("Insufficient data duration for breath rate calculation")

breath_rate = breaths / (duration / 60)
if breath_rate > 60:
    logging.warning(f"Unrealistic breath rate: {breath_rate} BPM")
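Assembled into a complete function, the guards might look like the sketch below. The function name and the two constants are assumptions for illustration; the 10-second minimum and 60 BPM warning level are the values from the fix above.

```python
import logging

MIN_DURATION_S = 10.0        # minimum recording length for a meaningful rate
MAX_PLAUSIBLE_BPM = 60.0     # warn above this rate

def calculate_breath_rate_guarded(duration, breaths):
    """Breaths per minute, refusing recordings that are too short."""
    if duration < MIN_DURATION_S:
        raise ValueError("Insufficient data duration for breath rate calculation")
    breath_rate = breaths / (duration / 60)
    if breath_rate > MAX_PLAUSIBLE_BPM:
        logging.warning("Unrealistic breath rate: %.1f BPM", breath_rate)
    return breath_rate

print(calculate_breath_rate_guarded(60.0, 15))
```

The duration check also covers the true division-by-zero case, where a file with a single data line yields a duration of 0.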

Security: File Path Injection

Vulnerable code:

def analysis_driver(file_name):
    with open(file_name, "r") as in_file:
        # Process file

Attack:

# Attacker provides malicious path
analysis_driver("../../../etc/passwd")
# Could read sensitive system files!

Fix: Validate file path

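import os

ALLOWED_DIR = os.path.realpath("/app/sample_data")

def analysis_driver(file_name):
    # Resolve symlinks and ".." segments to a canonical absolute path
    real_path = os.path.realpath(file_name)

    # Ensure it's within allowed directory (a plain startswith check
    # would also accept "/app/sample_data_evil/...")
    if os.path.commonpath([real_path, ALLOWED_DIR]) != ALLOWED_DIR:
        raise ValueError("Invalid file path")

    with open(real_path, "r") as in_file:
        ...  # Process file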
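Prefix comparison on path strings has a known weakness: a sibling directory whose name merely begins with the allowed name also matches. The sketch below builds two temporary directories to show this and checks containment with os.path.commonpath instead. The helper name is_within_directory and the directory names are assumptions for illustration.

```python
import os
import tempfile

def is_within_directory(path, allowed_dir):
    """True if path resolves to a location inside allowed_dir."""
    real_allowed = os.path.realpath(allowed_dir)
    real_path = os.path.realpath(path)
    try:
        return os.path.commonpath([real_path, real_allowed]) == real_allowed
    except ValueError:
        return False  # e.g. paths on different drives

base = tempfile.mkdtemp()
allowed = os.path.join(base, "sample_data")
sibling = os.path.join(base, "sample_data_evil")
os.makedirs(allowed)
os.makedirs(sibling)

inside = os.path.join(allowed, "patient_01.txt")
outside = os.path.join(sibling, "patient_01.txt")
traversal = os.path.join(allowed, "..", "sample_data_evil", "patient_01.txt")

print(is_within_directory(inside, allowed))
print(is_within_directory(outside, allowed))
print(is_within_directory(traversal, allowed))
# The naive prefix test wrongly accepts the sibling directory:
print(os.path.abspath(outside).startswith(os.path.abspath(allowed)))
```

Using realpath rather than abspath also resolves symlinks, so a link placed inside the allowed directory cannot point the reader elsewhere unnoticed.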

Conclusion

What You Learned:

  1. Multi-Stage Signal Processing: Transformed raw sensor data through physics-based models (ADC → Pressure → Flow)
  2. Peak Detection Algorithms: Applied tuned find_peaks() with physiological constraints for medical data
  3. Paired Validation Logic: Implemented inhalation-exhalation matching to distinguish real breaths from artifacts
  4. Numerical Integration: Used Simpson’s rule for accurate volume calculations
  5. Temporal Pattern Recognition: Detected apnea events through time-delta analysis
  6. Defensive Error Handling: Validated every data point for NaN, empty values, and malformed input
  7. Floating Point Considerations: Understood precision limits in medical calculations

Advanced Concepts Demonstrated:

  • O(n) algorithm complexity for batch medical data processing
  • Array accumulation costs (np.append in a loop is O(n²); list append or pre-allocation is O(n))
  • SciPy signal processing internals (local-maxima scan plus distance and prominence filtering)
  • IEEE 754 floating point precision limitations in sensor data
  • Catastrophic cancellation risks in pressure differentials

Skill Transfer: These techniques apply to:

  • ECG signal processing (QRS complex detection)
  • Audio processing (beat detection, speech recognition)
  • Stock market analysis (trend detection, volatility calculation)
  • Vibration monitoring (machinery fault detection)
  • Environmental sensors (air quality, seismic data)

Next Steps:

  1. Implement real-time processing (sliding window instead of batch)
  2. Add machine learning for adaptive threshold tuning
  3. Integrate Kalman filtering for noise reduction
  4. Implement FDA-compliant audit trails for medical device software
