Managing Multiple QRunnables with a Launcher QRunnable in PyQt6

How to coordinate parallel tasks and add stop functionality using QThreadPool

I want to launch a long-running process that sometimes takes a lot of time. Since I can't kill a running QRunnable directly, I'd like to use a "launcher" QRunnable that manages and starts no more than 6 other QRunnables in parallel. But when I try this, I get a TypeError: cannot be converted to PyQt5.QtCore.QObject in this context. How do I set this up properly?

This is a common stumbling block when working with QRunnable and QThreadPool. The root cause of the error is that QRunnable is not a QObject, which means it cannot have signals of its own. You can write a pyqtSignal into a QRunnable subclass, but the signal has no QObject to bind to, so the TypeError appears as soon as you try to connect or emit it.

The solution is to use a separate helper class that is a QObject to hold your signals, and then attach an instance of that helper to your QRunnable. This pattern is sometimes called a "worker signals" object.

Let's walk through this step by step, fixing the original approach and building a clean, working example.

Why QRunnable Can't Have Signals

In Qt, signals and slots belong to QObject. The QRunnable class deliberately does not inherit from QObject — it's designed to be a lightweight task that you hand off to a QThreadPool. This keeps things efficient, but it means you need a small workaround to communicate results back to your main thread.

The workaround is simple: create a QObject subclass that holds the signals, then create an instance of it inside your QRunnable.

python
from PyQt6.QtCore import QObject, pyqtSignal


class WorkerSignals(QObject):
    """Signals available from a running worker."""
    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    progress = pyqtSignal()

Then, inside your QRunnable.__init__, you create an instance:

python
self.signals = WorkerSignals()

And when you want to emit a signal, you use self.signals.finished.emit() instead of self.finished.emit().

Fixing the Trainer QRunnable

Here's the corrected Trainer class using this pattern:

python
import sys
import traceback

from PyQt6.QtCore import QRunnable, pyqtSlot


class Trainer(QRunnable):
    def __init__(self, fn, m):
        super().__init__()
        self.fn = fn  # The long-running function to call.
        self.m = m  # The task id passed to fn.
        self.signals = WorkerSignals()
        self.terminated = False  # Set to True once run() has ended.

    @pyqtSlot()
    def run(self):
        try:
            self.fn(self.m)
        except Exception:
            # Report the failure through a signal instead of raising.
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            # Only reached when fn completed without an exception.
            self.signals.progress.emit()
        finally:
            self.terminated = True
            self.signals.finished.emit()

Notice that all signal emissions go through self.signals. Everything else stays the same.
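
To make the shape of the error payload concrete, here is a minimal sketch in plain Python, without Qt. The helper names capture_error and describe_error are hypothetical; they only illustrate how the tuple passed to signals.error is built and how a slot connected to that signal might unpack it.

```python
import sys
import traceback


def capture_error(fn, *args):
    """Call fn and return the same 3-tuple the Trainer emits on failure."""
    try:
        fn(*args)
    except Exception:
        exctype, value = sys.exc_info()[:2]
        return (exctype, value, traceback.format_exc())
    return None  # No error occurred.


def describe_error(error):
    """Hypothetical slot body: unpack the tuple into a short message."""
    exctype, value, traceback_text = error
    return f"{exctype.__name__}: {value}"


# int() raises ValueError for text that is not a number.
error = capture_error(int, "not a number")
message = describe_error(error)
print(message)
```

The third element holds the formatted traceback as a string, which is convenient for logging from the main thread.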

Fixing the Launcher QRunnable

The LaunchWorkers class has the same issue — it needs its own WorkerSignals (or a dedicated signals class). It also has a subtle bug: the stop method has the same name as the stop attribute, which would overwrite the method with a boolean on the first call. Let's rename the attribute to _stopped and the method to request_stop to keep things clear.
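
The name clash is easy to reproduce outside Qt. The sketch below uses two hypothetical classes, BrokenLauncher and FixedLauncher, and assumes the flag is assigned inside the method, as the description above suggests.

```python
class BrokenLauncher:
    """Hypothetical class where the flag and the method share a name."""

    def stop(self):
        # This assignment stores a bool on the instance under the name
        # "stop", hiding the method for every later lookup.
        self.stop = True


class FixedLauncher:
    """Same idea with distinct names for the flag and the method."""

    def __init__(self):
        self._stopped = False

    def request_stop(self):
        self._stopped = True


broken = BrokenLauncher()
broken.stop()  # The first call works and replaces the method.

second_call_failed = False
try:
    broken.stop()  # Now this is True(), which is not callable.
except TypeError:
    second_call_failed = True

fixed = FixedLauncher()
fixed.request_stop()
fixed.request_stop()  # Safe to call any number of times.
```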


There's another issue with the original logic: the while loop that waits for threads to finish uses sleep(10), which means the launcher only checks every 10 seconds. We can reduce that for better responsiveness. We also need to fix the condition — the original code waits while finished_threads <= max_parallel_processes, but it should launch a new task whenever fewer than max_parallel_processes tasks are currently running.

Let's also create a separate signals class for the launcher, since it needs a finished signal too:

python
class LauncherSignals(QObject):
    finished = pyqtSignal()
    error = pyqtSignal(tuple)

Here's the corrected launcher:

python
import sys
import traceback
from time import sleep

MAX_PARALLEL_PROCESSES = 6


class LaunchWorkers(QRunnable):
    def __init__(self, trainers, task_ids, threadpool):
        super().__init__()
        self.trainers = trainers
        self.task_ids = task_ids
        self.threadpool = threadpool
        self.signals = LauncherSignals()
        self._stopped = False

    def request_stop(self):
        self._stopped = True

    @pyqtSlot()
    def run(self):
        started = []  # Trainers handed to the pool so far.
        try:
            for m in self.task_ids:
                # Wait until we have a free slot. Count only trainers that
                # have been started; the rest are not running yet.
                while not self._stopped:
                    running = sum(
                        1 for t in started
                        if not t.terminated
                    )
                    if running < MAX_PARALLEL_PROCESSES:
                        break
                    sleep(0.5)

                if self._stopped:
                    break

                started.append(self.trainers[m])
                self.threadpool.start(self.trainers[m])

            # Report "finished" only once the started tasks are done.
            while any(not t.terminated for t in started):
                sleep(0.5)

        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        finally:
            self.signals.finished.emit()

The launcher now polls every half second to see if there's room to start another task. When the stop is requested, it breaks out of both the waiting loop and the main loop.
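
The slot-limiting idea can be tried without Qt. The following is a rough sketch using plain threading, not the tutorial's classes: Task is a hypothetical stand-in for a Trainer, MAX_PARALLEL is an arbitrary limit, and the launch function counts only tasks it has already started when deciding whether a slot is free.

```python
import threading
from time import sleep

MAX_PARALLEL = 3  # Hypothetical limit for this sketch.


class Task:
    """Stand-in for a Trainer: works briefly, then marks itself terminated."""

    active = 0  # Tasks currently inside run().
    peak = 0  # Highest value `active` ever reached.
    lock = threading.Lock()

    def __init__(self):
        self.terminated = False

    def run(self):
        with Task.lock:
            Task.active += 1
            Task.peak = max(Task.peak, Task.active)
        sleep(0.05)  # Simulated work.
        with Task.lock:
            Task.active -= 1
        self.terminated = True


def launch(tasks, poll=0.01):
    """Start tasks one by one, never exceeding MAX_PARALLEL at a time."""
    started = []
    threads = []
    for task in tasks:
        # Poll until fewer than MAX_PARALLEL started tasks are still running.
        while sum(1 for t in started if not t.terminated) >= MAX_PARALLEL:
            sleep(poll)
        started.append(task)
        thread = threading.Thread(target=task.run)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()


tasks = [Task() for _ in range(8)]
launch(tasks)
print(f"peak concurrency: {Task.peak}")
```

Counting from the list of started tasks matters: a task that has not been started yet is also "not terminated", so counting every task would make the launcher think all slots are taken before anything runs.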

Connecting Signals in the Main Window

With signals living on the .signals attribute, you need to update your connections:

python
trainer.signals.progress.connect(self.update_progress_bar)
trainer.signals.finished.connect(self.finished_process)

And for the launcher:

python
launcher.signals.finished.connect(self.all_processes_finished)

For the stop button, connect it to the launcher's request_stop method. Since we need a reference to the launcher, store it on the window:

python
self.stopButton.clicked.connect(self.stop)

The stop method then forwards the request to the stored launcher:

python
def stop(self):
    if self.launcher:
        self.launcher.request_stop()

Setting the Thread Pool Size

Since the launcher itself runs on the thread pool and starts tasks on the same pool, you need enough threads to accommodate both the launcher and the parallel workers. Set the pool's max thread count accordingly:

python
threadpool = QThreadPool()
threadpool.setMaxThreadCount(MAX_PARALLEL_PROCESSES + 1)

The +1 accounts for the launcher runnable itself, which occupies one thread while it waits and dispatches work.
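
The effect of a pool that is too small can be shown with Python's own concurrent.futures.ThreadPoolExecutor. This is only an analogy, since ThreadPoolExecutor is not QThreadPool, but both queue a task until a thread is free. The function names below are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def child():
    return "done"


def launcher(pool):
    """Runs inside the pool and starts another task on the same pool."""
    future = pool.submit(child)
    try:
        return future.result(timeout=0.5)
    except TimeoutError:
        # The child never got a thread while the launcher was waiting.
        return "starved"


def run_with_pool_size(size):
    with ThreadPoolExecutor(max_workers=size) as pool:
        return pool.submit(launcher, pool).result()


one_thread = run_with_pool_size(1)   # Launcher holds the only thread.
two_threads = run_with_pool_size(2)  # One thread for the launcher, one spare.
print(one_thread, two_threads)
```

Without the timeout, the single-thread case would wait forever, which is why the launcher needs a thread of its own in addition to the worker slots.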

Complete Working Example

Here's everything put together as a complete, runnable application:

python
import sys
import traceback
from time import sleep

from PyQt6.QtCore import QObject, QRunnable, QThreadPool, pyqtSignal, pyqtSlot
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QProgressBar,
    QPushButton,
)

MAX_PARALLEL_PROCESSES = 6


class WorkerSignals(QObject):
    """Signals for individual Trainer runnables."""
    finished = pyqtSignal(int)
    error = pyqtSignal(tuple)
    progress = pyqtSignal()


class LauncherSignals(QObject):
    """Signals for the LaunchWorkers runnable."""
    finished = pyqtSignal()
    error = pyqtSignal(tuple)


class Trainer(QRunnable):
    """A single long-running task."""

    def __init__(self, fn, m):
        super().__init__()
        self.fn = fn
        self.m = m
        self.signals = WorkerSignals()
        self.terminated = False

    @pyqtSlot()
    def run(self):
        try:
            self.fn(self.m)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.progress.emit()
        finally:
            self.terminated = True
            self.signals.finished.emit(self.m)


class LaunchWorkers(QRunnable):
    """Manages launching Trainer runnables with a concurrency limit."""

    def __init__(self, trainers, task_ids, threadpool):
        super().__init__()
        self.trainers = trainers
        self.task_ids = task_ids
        self.threadpool = threadpool
        self.signals = LauncherSignals()
        self._stopped = False

    def request_stop(self):
        self._stopped = True

    @pyqtSlot()
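    def run(self):
        started = []  # Trainers handed to the pool so far.
        try:
            for m in self.task_ids:
                # Wait until a slot is available. Count only trainers that
                # have been started; the rest are not running yet.
                while not self._stopped:
                    running = sum(
                        1 for t in started
                        if not t.terminated
                    )
                    if running < MAX_PARALLEL_PROCESSES:
                        break
                    sleep(0.5)

                if self._stopped:
                    break

                started.append(self.trainers[m])
                self.threadpool.start(self.trainers[m])

            # Report "finished" only once the started tasks are done.
            while any(not t.terminated for t in started):
                sleep(0.5)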

        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        finally:
            self.signals.finished.emit()


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setGeometry(300, 300, 350, 200)

        self.threadpool = QThreadPool()
        self.threadpool.setMaxThreadCount(MAX_PARALLEL_PROCESSES + 1)

        self.trainers = {}
        self.n_trainers = 10
        self.launcher = None
        self.count = 0

        self.progress_bar = QProgressBar(self)
        self.progress_bar.setGeometry(50, 20, 250, 20)
        self.progress_bar.setValue(0)

        self.start_button = QPushButton("Start", self)
        self.start_button.setGeometry(50, 60, 100, 30)
        self.start_button.clicked.connect(self.create_trainers)

        self.stop_button = QPushButton("Stop", self)
        self.stop_button.setGeometry(200, 60, 100, 30)
        self.stop_button.clicked.connect(self.stop)

    def the_long_process(self, m):
        """Simulate a long-running computation."""
        print(f"Starting process {m}")
        sleep(3)
        print(f"Finished process {m}")

    def create_trainers(self):
        self.count = 0
        self.progress_bar.setValue(0)
        self.trainers = {}

        for m in range(self.n_trainers):
            trainer = Trainer(self.the_long_process, m)
            trainer.signals.progress.connect(self.update_progress_bar)
            trainer.signals.finished.connect(self.finished_process)
            self.trainers[m] = trainer

        self.launcher = LaunchWorkers(
            self.trainers, range(self.n_trainers), self.threadpool
        )
        self.launcher.signals.finished.connect(self.all_processes_finished)
        self.threadpool.start(self.launcher)

    def update_progress_bar(self):
        self.count += 1
        percent = int(self.count / self.n_trainers * 100)
        self.progress_bar.setValue(percent)

    def finished_process(self, m):
        print(f"Main window notified: process {m} done")

    def all_processes_finished(self):
        print("All processes finished.")

    def stop(self):
        if self.launcher:
            self.launcher.request_stop()
            print("Stop requested — no new tasks will be started.")


def main():
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()

When you run this, clicking Start will begin launching up to 6 tasks at a time from a total of 10. The progress bar updates as each task completes. Clicking Stop prevents any new tasks from being launched, though tasks already running will finish on their own.

Don't Forget

Stopping running tasks. The stop mechanism here prevents new tasks from starting. It does not interrupt tasks that are already running. If you need to cancel in-progress work, you'll need to add a check inside the long-running function itself (e.g., periodically check a flag and return early). For more on this topic, see our guide on how to start, stop, or pause running threads.
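
As a minimal sketch of that flag check, assuming the work can be split into small steps: the function long_process below is hypothetical, and threading.Event is used as the flag because it can be set from one thread and read from another.

```python
import threading


def long_process(stop_event, steps):
    """Hypothetical task that does `steps` units of work unless stopped."""
    completed = 0
    for _ in range(steps):
        if stop_event.is_set():
            break  # Return early: a stop was requested.
        completed += 1  # Stand-in for one chunk of real work.
    return completed


stop_event = threading.Event()
ran_to_end = long_process(stop_event, steps=5)

stop_event.set()  # What a Stop button handler would do.
stopped_early = long_process(stop_event, steps=5)
print(ran_to_end, stopped_early)
```

The task can only stop between steps, so the shorter each step is, the faster it reacts to a stop request.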

Thread safety. The terminated attribute on each Trainer is written in the worker thread and read in the launcher thread. For a simple boolean flag like this, Python's GIL makes this safe in practice. For more complex shared state, consider using QMutex or Python's threading.Lock. If you're new to multithreading in PyQt6, our complete guide to multithreading with QThreadPool covers the fundamentals.
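
For shared state that is more than a single flag, a lock keeps updates consistent. The sketch below uses Python's threading.Lock with a hypothetical SharedCounter class; the thread and iteration counts are arbitrary.

```python
import threading


class SharedCounter:
    """Counter whose updates are serialized by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def increment(self):
        # Read-modify-write is not atomic, so guard it with the lock.
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value


counter = SharedCounter()


def work():
    for _ in range(1000):
        counter.increment()


threads = [threading.Thread(target=work) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter.value)
```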

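Pool size. Setting setMaxThreadCount to MAX_PARALLEL_PROCESSES + 1 gives the launcher its own thread on top of the worker slots. Without the extra slot, the launcher would take one of the threads meant for workers, so fewer tasks than intended would run at once. On a pool limited to a single thread it would be worse: the launcher would hold the only thread while waiting for tasks that can never start, which is a deadlock.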

Transmitting data from workers. In this example we emit simple signals, but you can also pass extra data with your Qt signals to send results back from each worker to the main thread.

This pattern gives you a clean way to manage batches of parallel work with a concurrency limit, all while keeping your UI responsive and providing a way to stop the process gracefully.

Martin Fitzpatrick

Managing Multiple QRunnables with a Launcher QRunnable in PyQt6 was written by Martin Fitzpatrick.

Martin Fitzpatrick has been developing Python/Qt apps for 8 years. Building desktop applications to make data-analysis tools more user-friendly, Python was the obvious choice. Starting with Tk, later moving to wxWidgets and finally adopting PyQt. Martin founded PythonGUIs to provide easy to follow GUI programming tutorials to the Python community. He has written a number of popular Python books on the subject.