
Orchestrating Chaos: Writing a Thread-Safe Singleton to Coordinate N Concurrent GPU Jobs

Learn how to implement a thread-safe singleton for managing concurrent GPU jobs in ComfyUI.

Published

Sun Aug 17 2025

Technologies Used

Python ComfyUI
Advanced 54 minutes

ComfyUI’s execution model is clean and simple: one prompt, one run, done. Press queue, nodes execute, images appear. That works perfectly until you need to run 90 variations of a workflow.

You could queue 90 prompts manually. But then you have no logical grouping — you can’t know when “all 90 of this batch” are finished. You can’t build a comparison grid because you don’t know which 90 images belong together. You can’t show a progress bar. You can’t cancel the batch.

This is a distributed coordination problem. You’re orchestrating N independent processes (ComfyUI prompt executions) that must be treated as a single logical unit, sharing a lifecycle they’re individually unaware of.

core/iteration_state.py implements an IterationStateManager that owns the full batch lifecycle: interception, cloning, state tracking, result aggregation, timeout cleanup, and HTTP-based status reporting. It’s the architectural centerpiece of the extension.

What You Must Understand Before This Makes Sense

You need:

  • Python’s threading.Lock and when to use it
  • the singleton concept (one instance shared across the entire application)
  • ComfyUI’s prompt format (a JSON dict mapping node IDs to node configurations)
  • copy.deepcopy, and why a shallow copy fails on nested dicts
  • async/await basics for the HTTP route handlers

Environment:

Python >= 3.9
threading, copy, uuid, time  (standard library)
aiohttp                      (provided by ComfyUI's embedded server)
ComfyUI's PromptServer and prompt_queue internal APIs

One important caveat: this module calls PromptServer.instance and server.prompt_queue.put(), internal ComfyUI APIs that aren’t officially documented and could change between releases. In the full module, every such call is wrapped in try/except so that it fails gracefully; the excerpts below leave that wrapping out.

Air Traffic Control: One Tower, Many Independent Flights

An air traffic control tower doesn’t fly any planes. It doesn’t build them. Its job is to know where every plane is, coordinate their sequencing, and declare when all planes in a flight group have landed. IterationStateManager is that tower. The individual ComfyUI execution workers are the planes — they take off independently, fly independently, but all report to the tower on landing.

The batch lifecycle has five states:

  • Pending: onprompt fires, batch is created
  • Active: combinations generated, prompts queued
  • Collecting: results arriving via register_result()
  • Complete: completed_count == total, grid build triggered
  • Cancelled or Expired: user cancelled or batch aged past 3600 seconds
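
The lifecycle above can be read as a small state machine. What follows is a minimal sketch, not code from the extension: `BatchPhase`, `ALLOWED`, and `can_transition` are hypothetical names, the last bullet is split into two terminal members, and the transition table is an assumption read off the list (in particular, that cancellation or expiry can happen from any non-terminal phase).

```python
from enum import Enum


class BatchPhase(Enum):
    PENDING = "pending"
    ACTIVE = "active"
    COLLECTING = "collecting"
    COMPLETE = "complete"
    CANCELLED = "cancelled"
    EXPIRED = "expired"


# Assumed transitions: forward along the happy path, with cancellation or
# expiry reachable from every non-terminal phase.
_EARLY_EXITS = {BatchPhase.CANCELLED, BatchPhase.EXPIRED}
ALLOWED = {
    BatchPhase.PENDING: {BatchPhase.ACTIVE} | _EARLY_EXITS,
    BatchPhase.ACTIVE: {BatchPhase.COLLECTING} | _EARLY_EXITS,
    BatchPhase.COLLECTING: {BatchPhase.COMPLETE} | _EARLY_EXITS,
    BatchPhase.COMPLETE: set(),
    BatchPhase.CANCELLED: set(),
    BatchPhase.EXPIRED: set(),
}


def can_transition(current, target):
    """Return True if moving from `current` to `target` is allowed."""
    return target in ALLOWED[current]
```

The manager shown later does not store an explicit phase; it derives the same information from completed_count, cancelled, and created_at.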

The Double-Checked Locking Singleton

The singleton pattern ensures exactly one IterationStateManager exists for the entire ComfyUI process lifetime. This matters because the batch registry (_active_batches) must be shared: the onprompt handler adds to it, and node execution callbacks read from it — from different threads.

import threading

_instance = None
_instance_lock = threading.Lock()


class IterationStateManager:

    @classmethod
    def instance(cls):
        global _instance

        # First check (no lock): fast path for the common case.
        # Once the instance exists, every call returns from here.
        if _instance is None:

            # Second check (with lock): only one thread creates the instance.
            # Without the lock, two threads could both pass the first check
            # and both call cls() — creating two instances.
            with _instance_lock:
                if _instance is None:
                    _instance = cls()

        return _instance

The outer if _instance is None check skips the lock entirely once the instance exists, so steady-state calls cost a single comparison. The inner check, made while holding the lock, closes the race in which two threads both see None and both try to create the instance.

You might wonder whether Python’s GIL makes this unnecessary. It doesn’t. The GIL prevents two threads from executing Python bytecode at the same instant, but the check if _instance is None and the assignment _instance = cls() compile to several separate bytecode instructions, with a full constructor call in between. The interpreter can switch threads between any two of them, and it is especially likely to do so if the constructor performs I/O, sleeps, or calls into a C extension that releases the GIL. The race is real.
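
To see the pattern under contention, here is a small self-contained sketch. `Manager` is a hypothetical stand-in for IterationStateManager, and `collect_instances` is an illustrative helper that releases many threads at once with a barrier. A passing run shows the pattern behaving as intended; it is not a proof that a race can never occur.

```python
import threading

_instance = None
_instance_lock = threading.Lock()


class Manager:
    """Stand-in for IterationStateManager (hypothetical, for illustration)."""

    @classmethod
    def instance(cls):
        global _instance
        if _instance is None:              # fast path, no lock
            with _instance_lock:
                if _instance is None:      # re-check while holding the lock
                    _instance = cls()
        return _instance


def collect_instances(n_threads=32):
    """Call Manager.instance() from many threads released at the same moment."""
    barrier = threading.Barrier(n_threads)
    seen = []
    seen_lock = threading.Lock()

    def worker():
        barrier.wait()                     # line all threads up first
        obj = Manager.instance()
        with seen_lock:
            seen.append(obj)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


instances = collect_instances()
distinct = {id(obj) for obj in instances}
```

Every thread should receive the same object, so `distinct` ends up with a single entry.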

The BatchState Data Class

Every batch gets its own BatchState object — a named container for correlated values that travel together.

import time


class BatchState:

    def __init__(self, batch_id, total, combinations, batch_name, mode):
        self.batch_id        = batch_id
        self.total           = total
        self.combinations    = combinations
        self.batch_name      = batch_name
        self.mode            = mode  # "matrix" or "linear"

        self.results         = {}    # index → {images, metadata}
        self.completed_count = 0

        self.cancelled       = False
        self.created_at      = time.time()

        self.prompt_ids      = []    # For cancellation

Notice what’s not stored: image tensors. The results dict stores a path or lightweight reference. Holding full GPU tensors here would cause memory leaks for large batches. The WISaveImage and WIGridCompositor nodes manage the actual image data.
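
For comparison, the same container can be written with the standard dataclasses module. This is an alternative sketch, not the extension’s code: `BatchStateSketch` and the example file path are hypothetical.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BatchStateSketch:
    batch_id: str
    total: int
    combinations: list
    batch_name: str
    mode: str  # "matrix" or "linear"
    # default_factory gives every instance its own dict or list; dataclasses
    # reject a bare `= {}` default precisely to prevent accidental sharing.
    results: dict = field(default_factory=dict)
    completed_count: int = 0
    cancelled: bool = False
    created_at: float = field(default_factory=time.time)
    prompt_ids: list = field(default_factory=list)


a = BatchStateSketch("demo_1", 2, [{"cfg": 7}, {"cfg": 9}], "demo", "linear")
b = BatchStateSketch("demo_2", 1, [{"cfg": 5}], "demo", "linear")

# Store a lightweight reference (here a hypothetical file path), not a tensor.
a.results[0] = {"images": ["output/demo_0001.png"], "metadata": {"cfg": 7}}
```

Writing to `a.results` leaves `b.results` empty, which is the per-batch isolation the manager relies on.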

The Prompt Interceptor

handle_prompt() is called synchronously by ComfyUI’s onprompt hook before any execution begins. It must be fast and must not raise exceptions.

def handle_prompt(self, json_data):
    self._cleanup_expired()

    prompt = json_data.get("prompt", {})
    if not prompt:
        return json_data

    iterator_nodes = self._find_iterator_nodes(prompt)
    if not iterator_nodes:
        return json_data

    for node_id, node_data in iterator_nodes.items():
        inputs = node_data.get("inputs", {})
        if not inputs.get("enabled", True):
            continue

        mode       = inputs.get("mode", "matrix")
        batch_name = inputs.get("batch_name", "batch")

        param_defs = self._collect_param_definitions(prompt, node_id)
        if not param_defs:
            continue

        parsed_params = []
        for pdef in param_defs:
            try:
                values = parameter_parser.parse(pdef["values_raw"], pdef["type"])
                parsed_params.append({**pdef, "parsed_values": values})
            except (ValueError, FileNotFoundError) as e:
                logger.error(f"Error parsing parameter '{pdef['name']}': {e}")
                continue

        if not parsed_params:
            continue

The error handling philosophy here matters: a single bad parameter definition is logged and skipped, but it doesn’t abort the entire batch. The remaining parameters still run.
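
The log-and-skip behaviour can be tried in isolation. In this sketch, `parse_values` is a hypothetical stand-in for parameter_parser.parse that assumes comma-separated values; the real parser’s input format is not shown in this article.

```python
import logging

logger = logging.getLogger("param_demo")


def parse_values(raw, kind):
    """Hypothetical stand-in for parameter_parser.parse (comma-separated input)."""
    caster = {"int": int, "float": float, "str": str}[kind]
    return [caster(item.strip()) for item in raw.split(",")]


def parse_all(param_defs):
    """Parse every definition, logging and skipping the ones that fail."""
    parsed = []
    for pdef in param_defs:
        try:
            values = parse_values(pdef["values_raw"], pdef["type"])
        except ValueError as exc:
            logger.error("Error parsing parameter %r: %s", pdef["name"], exc)
            continue
        parsed.append({**pdef, "parsed_values": values})
    return parsed


defs = [
    {"name": "steps", "type": "int", "values_raw": "20, 30, 40"},
    {"name": "cfg", "type": "float", "values_raw": "7.0, oops"},  # malformed
    {"name": "sampler", "type": "str", "values_raw": "euler, dpmpp_2m"},
]
good = parse_all(defs)
```

The malformed cfg definition is dropped, while steps and sampler survive and would still produce a batch.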

Deep-Copying the Prompt Graph

The first combination modifies the original prompt in-place. Every subsequent combination is a deep-copied clone, independently queued.

        if mode == "matrix":
            combinations = combination_engine.cartesian(parsed_params)
        else:
            combinations = combination_engine.linear_zip(parsed_params)

        total    = len(combinations)
        batch_id = f"{batch_name}_{uuid.uuid4().hex[:8]}"
        batch_state = BatchState(batch_id, total, combinations, batch_name, mode)

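        # Register the batch before any clone is queued, so that a result
        # arriving early can always find its batch.
        with self._lock:
            self._active_batches[batch_id] = batch_state

        # `workflow` comes from a part of handle_prompt that this excerpt omits.
        self._apply_combination(
            json_data, prompt, workflow,
            combinations[0], 0, total, batch_id, batch_name, mode
        )

        self._queue_remaining(
            json_data, workflow, combinations,
            batch_id, batch_name, mode, total, batch_state
        )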
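
The two modes differ only in how the value lists are combined. The sketch below assumes that matrix mode is a Cartesian product and that linear mode pairs values by position, stopping at the shortest list. The functions here are hypothetical reimplementations and may differ from the real combination_engine.

```python
from itertools import product


def cartesian(parsed_params):
    """Every value of every parameter combined with every other (matrix mode)."""
    names = [p["name"] for p in parsed_params]
    value_lists = [p["parsed_values"] for p in parsed_params]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


def linear_zip(parsed_params):
    """Values paired by position; zip stops at the shortest list (assumed)."""
    names = [p["name"] for p in parsed_params]
    value_lists = [p["parsed_values"] for p in parsed_params]
    return [dict(zip(names, combo)) for combo in zip(*value_lists)]


params = [
    {"name": "steps", "parsed_values": [20, 30]},
    {"name": "cfg", "parsed_values": [5.0, 7.0, 9.0]},
]
matrix = cartesian(params)   # 2 x 3 = 6 combinations
linear = linear_zip(params)  # 2 combinations
```

With two values for steps and three for cfg, matrix mode yields six prompts and linear mode yields two.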

Inside _queue_remaining, copy.deepcopy creates a completely independent copy of the entire prompt graph. Shallow copy would share nested dicts, so _apply_combination on the clone would mutate the original.
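
The difference is easy to reproduce on a toy prompt. The node ID, class name, and inputs below are made up for illustration.

```python
import copy

original = {
    "prompt": {
        "3": {"class_type": "KSampler", "inputs": {"steps": 20, "cfg": 7.0}},
    },
    "extra_data": {},
}

# A shallow copy duplicates only the top-level dict; nested dicts are shared.
shallow = copy.copy(original)
shallow["prompt"]["3"]["inputs"]["steps"] = 50
steps_after_shallow = original["prompt"]["3"]["inputs"]["steps"]

# Reset, then repeat with a deep copy: the nested dicts are now independent.
original["prompt"]["3"]["inputs"]["steps"] = 20
deep = copy.deepcopy(original)
deep["prompt"]["3"]["inputs"]["steps"] = 50
steps_after_deep = original["prompt"]["3"]["inputs"]["steps"]
```

After the shallow-copy edit the original’s steps value has changed too; after the deep-copy edit it has not.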

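def _queue_remaining(self, json_data, workflow, combinations, batch_id,
                     batch_name, mode, total, batch_state):
    # Imported lazily: `server` is ComfyUI's own module and only exists
    # inside a running ComfyUI process.
    from server import PromptServer
    server = PromptServer.instance

    # Combination 0 was applied to the original prompt, so clones start at 1.
    for i in range(1, total):
        # Fully independent copy: nested node dicts must not be shared.
        cloned        = copy.deepcopy(json_data)
        cloned_prompt = cloned.get("prompt", {})

        self._apply_combination(
            cloned, cloned_prompt, workflow,
            combinations[i], i, total, batch_id, batch_name, mode
        )

        new_prompt_id = str(uuid.uuid4())

        # Internal API: the argument shape prompt_queue.put() expects depends
        # on the ComfyUI version; check it against your install.
        valid = server.prompt_queue.put(
            server.number,
            new_prompt_id,
            cloned_prompt,
            cloned.get("extra_data", {}),
            self._get_output_nodes(cloned_prompt),
        )

        if valid:
            server.number += 1

        # Remember the prompt ID so the batch can be cancelled later.
        batch_state.prompt_ids.append(new_prompt_id)

        # Reverse lookup from prompt ID to (batch, combination index).
        with self._lock:
            self._prompt_to_batch[new_prompt_id] = (batch_id, i)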

copy.deepcopy is O(n) in the size of the prompt graph. A ComfyUI prompt JSON is typically 5–50 KB, so the clones for a 90-combination batch copy roughly 450 KB–4.5 MB of serialized data (the in-memory dicts are somewhat larger). In total that costs on the order of 1–50 ms. For GPU workloads that run for seconds to minutes, this is irrelevant. I could optimize by only cloning the nodes that change, but the implementation complexity is substantial, the fragility is high, and the benefit is measured in milliseconds. deepcopy is the right call here.
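
If you want to check the cost on your own machine, a rough measurement is enough. The sketch below builds a synthetic prompt-like dict. `make_prompt` and its node shape are hypothetical and only meant to approximate the size of a real graph, and the timing it prints will vary by machine.

```python
import copy
import json
import time


def make_prompt(n_nodes):
    """Build a synthetic prompt-like dict (hypothetical shape, for timing only)."""
    return {
        "prompt": {
            str(i): {
                "class_type": "ExampleNode",
                "inputs": {"seed": i, "text": "x" * 64, "link": [str(i - 1), 0]},
            }
            for i in range(n_nodes)
        },
        "extra_data": {},
    }


def time_clones(json_data, n_clones):
    """Deep-copy `json_data` n_clones times and return (clones, seconds)."""
    start = time.perf_counter()
    clones = [copy.deepcopy(json_data) for _ in range(n_clones)]
    elapsed = time.perf_counter() - start
    return clones, elapsed


data = make_prompt(100)
size_kb = len(json.dumps(data)) / 1024
# 90 combinations: the first is applied in place, the other 89 are clones.
clones, elapsed = time_clones(data, 89)
print(f"{size_kb:.1f} KB serialized, 89 clones in {elapsed * 1000:.1f} ms")
```

Run it with a node count that matches your own workflows before deciding whether any optimization is worth the effort.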

Result Aggregation That Fires Exactly Once

register_result is what node executions call when they complete. Provided each iteration reports exactly once, it returns True exactly once: when the final iteration lands.

def register_result(self, batch_id, index, images, metadata):
    with self._lock:
        batch = self._active_batches.get(batch_id)
        if batch:
            batch.results[index] = {"images": images, "metadata": metadata}

            # The increment and the comparison must happen under the same lock.
            # Without it, two threads could both read completed_count=8 and
            # both write 9 (a lost update, so the batch never completes), or
            # both could increment before either compares, both see 10, and
            # both return True, triggering grid assembly twice.
            batch.completed_count += 1

            logger.info(
                f"Batch '{batch.batch_name}': "
                f"{batch.completed_count}/{batch.total} complete"
            )

            return batch.completed_count >= batch.total

    return False

The critical point: both the increment and the >= comparison happen inside the same lock acquisition. Without the lock, this is a classic read-modify-write race with two failure modes. In a 10-item batch, two threads could both read completed_count=8 and both write 9, losing an update so that the count never reaches 10 and the grid is never built. Alternatively, both increments could land before either comparison runs, so both threads see 10 and both return True, triggering the grid compositor twice.
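
The exactly-once property can be exercised with a reduced model. `MiniManager` is a hypothetical stand-in that keeps only the lock, the batch registry, and the counter. It assumes, as the real manager does, that each index reports a single time.

```python
import threading


class MiniManager:
    """Reduced stand-in for IterationStateManager (hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active_batches = {}

    def add_batch(self, batch_id, total):
        with self._lock:
            self._active_batches[batch_id] = {"total": total, "done": 0, "results": {}}

    def register_result(self, batch_id, index, payload):
        with self._lock:
            batch = self._active_batches.get(batch_id)
            if batch is None:
                return False  # unknown or already cleaned-up batch
            batch["results"][index] = payload
            batch["done"] += 1                       # increment ...
            return batch["done"] >= batch["total"]   # ... and compare, same lock


def run_batch(total=50):
    """Report `total` results from `total` threads released simultaneously."""
    manager = MiniManager()
    manager.add_batch("demo", total)
    barrier = threading.Barrier(total)
    outcomes = [None] * total

    def worker(i):
        barrier.wait()
        outcomes[i] = manager.register_result("demo", i, f"img_{i}.png")

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(total)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return manager, outcomes


manager, outcomes = run_batch()
```

Exactly one of the 50 calls returns True, whichever thread happens to report last, and a result for an unknown batch ID returns False.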

The _lock is a single instance-level lock. Everything that touches _active_batches or _prompt_to_batch acquires it. This is coarse-grained locking: simple, but potentially a bottleneck if hundreds of threads compete. In practice, the lock is held only briefly (dictionary reads and writes, plus one log call in register_result), and the overwhelmingly common case is uncontended acquisition, which is cheap. Finer-grained locking would be premature optimization.

Timeout Orphans, Late Results, and the Cancellation Gap

Late-arriving results after cancellation. If a batch is cancelled and cleanup_batch removes it from _active_batches, a result that arrives a moment later via register_result finds no batch: the lookup returns None and the method silently returns False. The orphaned result is discarded. This is intentional: the user cancelled the batch.

Timeout orphans. If a job crashes or is interrupted mid-batch and never reports its result, the batch never completes, and its state would sit in _active_batches indefinitely: a memory leak for as long as the process runs. _cleanup_expired runs at the start of every handle_prompt call and removes batches older than 3,600 seconds. This is lazy cleanup: it happens when the system is already active, not on a background timer. For a tool running locally, this is acceptable.
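
A lazy sweep of this kind fits in a few lines. The sketch below is an assumption about the shape of _cleanup_expired, not the extension’s code: `ExpiringRegistry` is a hypothetical class, the clock is injectable so the example needs no real waiting, and it assumes that expiring a batch also prunes its entries from the prompt map.

```python
import threading
import time

BATCH_TTL_SECONDS = 3600


class ExpiringRegistry:
    """Hypothetical reduction of the manager's batch and prompt maps."""

    def __init__(self, clock=time.time):
        self._lock = threading.Lock()
        self._clock = clock
        self._active_batches = {}   # batch_id -> created_at
        self._prompt_to_batch = {}  # prompt_id -> (batch_id, index)

    def add(self, batch_id, prompt_ids):
        with self._lock:
            self._active_batches[batch_id] = self._clock()
            for index, pid in enumerate(prompt_ids):
                self._prompt_to_batch[pid] = (batch_id, index)

    def cleanup_expired(self):
        """Drop batches older than the TTL, plus their prompt-map entries."""
        now = self._clock()
        with self._lock:
            expired = {
                bid for bid, created in self._active_batches.items()
                if now - created > BATCH_TTL_SECONDS
            }
            for bid in expired:
                del self._active_batches[bid]
            self._prompt_to_batch = {
                pid: ref for pid, ref in self._prompt_to_batch.items()
                if ref[0] not in expired
            }
        return expired


fake_now = [0.0]
registry = ExpiringRegistry(clock=lambda: fake_now[0])
registry.add("old", ["p1", "p2"])
fake_now[0] = 3000.0
registry.add("fresh", ["p3"])
fake_now[0] = 3601.0
removed = registry.cleanup_expired()
```

At the simulated time of 3,601 seconds only the batch created at time 0 has aged past the limit, so it is the only one removed.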

The cancellation gap. The /wi/cancel_batch/{batch_id} route calls server.prompt_queue.delete_queue_item(pid) for each queued prompt. This is a ComfyUI internal API. If it changes in a future release, cancellation will silently fail (the try/except catches it). The batch gets marked cancelled in the state manager, but prompts may still execute. This is a known fragility point — every call to ComfyUI internals is wrapped in try/except precisely because of situations like this.

The _prompt_to_batch map leak. Every queued prompt ID is added to _prompt_to_batch. This map is only cleaned up by cleanup_batch. If cleanup is never called because the workflow has no compositor node, the map grows until the 1-hour timeout fires. Not ideal, but bounded.

The real skill here isn’t the Python — it’s recognizing that “are we done?” is not a question you can answer safely without a lock, and that “clean up after crashed jobs” is a problem you can solve lazily without a background thread.
