I want to launch a long-running process that sometimes takes a lot of time. Since I can't kill a running QRunnable directly, I'd like to use a "launcher" QRunnable that manages and starts no more than 6 other QRunnables in parallel. But when I try this, I get a
TypeError: cannot be converted to PyQt5.QtCore.QObject in this context. How do I set this up properly?
This is a common stumbling block when working with QRunnable and QThreadPool. The root cause of the error is that QRunnable is not a QObject, which means it cannot have signals of its own. You can declare a pyqtSignal on a QRunnable subclass, but as soon as you connect or emit it, PyQt tries to treat the runnable as a QObject, fails, and raises that TypeError.
The solution is to use a separate helper class that is a QObject to hold your signals, and then attach an instance of that helper to your QRunnable. This pattern is sometimes called a "worker signals" object.
Let's walk through this step by step, fixing the original approach and building a clean, working example.
Why QRunnable Can't Have Signals
In Qt, signals and slots belong to QObject. The QRunnable class deliberately does not inherit from QObject — it's designed to be a lightweight task that you hand off to a QThreadPool. This keeps things efficient, but it means you need a small workaround to communicate results back to your main thread.
The workaround is simple: create a QObject subclass that holds the signals, then create an instance of it inside your QRunnable.
from PyQt6.QtCore import QObject, pyqtSignal


class WorkerSignals(QObject):
    """Signals available from a running worker."""

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    progress = pyqtSignal()
Then, inside your QRunnable.__init__, you create an instance:
self.signals = WorkerSignals()
And when you want to emit a signal, you use self.signals.finished.emit() instead of self.finished.emit().
Fixing the Trainer QRunnable
Here's the corrected Trainer class using this pattern:
import sys
import traceback

from PyQt6.QtCore import QRunnable, pyqtSlot


class Trainer(QRunnable):
    def __init__(self, fn, m):
        super().__init__()
        self.fn = fn
        self.m = m
        self.signals = WorkerSignals()
        self.terminated = False

    @pyqtSlot()
    def run(self):
        try:
            self.fn(self.m)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.progress.emit()
        finally:
            self.terminated = True
            self.signals.finished.emit()
Notice that all signal emissions go through self.signals. Everything else stays the same.
Fixing the Launcher QRunnable
The LaunchWorkers class has the same issue — it needs its own WorkerSignals (or a dedicated signals class). It also has a subtle bug: the stop method has the same name as the stop attribute, which would overwrite the method with a boolean on the first call. Let's rename the attribute to _stopped and the method to request_stop to keep things clear.
There's another issue with the original logic: the while loop that waits for threads to finish uses sleep(10), which means the launcher only checks every 10 seconds. We can reduce that for better responsiveness. We also need to fix the condition — the original code waits while finished_threads <= max_parallel_processes, but it should launch a new task whenever fewer than max_parallel_processes tasks are currently running.
Let's also create a separate signals class for the launcher, since it needs a finished signal too:
class LauncherSignals(QObject):
finished = pyqtSignal()
error = pyqtSignal(tuple)
Here's the corrected launcher:
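import sys
import traceback
from time import sleep


class LaunchWorkers(QRunnable):
    def __init__(self, trainers, task_ids):
        super().__init__()
        self.trainers = trainers
        self.task_ids = task_ids
        self.signals = LauncherSignals()
        self._stopped = False

    def request_stop(self):
        self._stopped = True

    @pyqtSlot()
    def run(self):
        launched = []  # Trainers already handed to the thread pool.
        try:
            for m in self.task_ids:
                # Wait until we have a free slot. Count only trainers that
                # have been started: counting every trainer in the dict
                # would include those still waiting to be launched, and
                # the loop would never find a free slot.
                while not self._stopped:
                    running = sum(1 for t in launched if not t.terminated)
                    if running < MAX_PARALLEL_PROCESSES:
                        break
                    sleep(0.5)

                if self._stopped:
                    break

                trainer = self.trainers[m]
                launched.append(trainer)
                # threadpool is the shared QThreadPool defined at module level.
                threadpool.start(trainer)

            # Let the started trainers finish, so that `finished` really
            # means all launched work is done.
            while any(not t.terminated for t in launched):
                sleep(0.5)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        finally:
            self.signals.finished.emit()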
The launcher now polls every half second to see if there's room to start another task. When the stop is requested, it breaks out of both the waiting loop and the main loop.
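The slot-counting idea can be tried without Qt at all. The following is a rough sketch, not the article's code: FakeTask, launch_all, and the stats dictionary are hypothetical names, and plain threading.Thread objects stand in for the QThreadPool. It counts only tasks that have been started and have not yet finished, and records the peak number of tasks active at once so the limit can be checked.

```python
import threading
from time import sleep

MAX_PARALLEL = 3  # small limit so the demo finishes quickly


class FakeTask:
    """Stand-in for a Trainer: does some 'work' and sets terminated."""

    def __init__(self, stats):
        self.terminated = False
        self.stats = stats

    def run(self):
        with self.stats["lock"]:
            self.stats["active"] += 1
            self.stats["peak"] = max(self.stats["peak"], self.stats["active"])
        sleep(0.05)  # pretend to work
        with self.stats["lock"]:
            self.stats["active"] -= 1
        self.terminated = True


def launch_all(tasks, limit=MAX_PARALLEL, poll=0.01):
    """Start tasks in order, never letting more than `limit` run at once."""
    launched = []
    threads = []
    for task in tasks:
        # Poll until fewer than `limit` started tasks are still unfinished.
        while sum(1 for t in launched if not t.terminated) >= limit:
            sleep(poll)
        launched.append(task)
        thread = threading.Thread(target=task.run)
        threads.append(thread)
        thread.start()
    # Wait for everything that was started.
    for thread in threads:
        thread.join()


stats = {"lock": threading.Lock(), "active": 0, "peak": 0}
tasks = [FakeTask(stats) for _ in range(8)]
launch_all(tasks)
print("peak concurrency:", stats["peak"])
```

Because a task is counted from the moment it is appended to launched until it sets terminated, the number of tasks actually executing can never exceed the limit.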
Connecting Signals in the Main Window
With signals living on the .signals attribute, you need to update your connections:
trainer.signals.progress.connect(self.update_progress_bar)
trainer.signals.finished.connect(self.finished_process)
And for the launcher:
launcher.signals.finished.connect(self.all_processes_finished)
For the stop button, connect it to the launcher's request_stop method. Since we need a reference to the launcher, store it on the window:
self.stopButton.clicked.connect(self.stop)

def stop(self):
    if self.launcher:
        self.launcher.request_stop()
Setting the Thread Pool Size
Since the launcher itself runs on the thread pool and starts tasks on the same pool, you need enough threads to accommodate both the launcher and the parallel workers. Set the pool's max thread count accordingly:
threadpool = QThreadPool()
threadpool.setMaxThreadCount(MAX_PARALLEL_PROCESSES + 1)
The +1 accounts for the launcher runnable itself, which occupies one thread while it waits and dispatches work.
Complete Working Example
Here's everything put together as a complete, runnable application:
import sys
import traceback
from time import sleep

from PyQt6.QtCore import QObject, QRunnable, QThreadPool, pyqtSignal, pyqtSlot
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QProgressBar,
    QPushButton,
)

MAX_PARALLEL_PROCESSES = 6


class WorkerSignals(QObject):
    """Signals for individual Trainer runnables."""

    finished = pyqtSignal(int)
    error = pyqtSignal(tuple)
    progress = pyqtSignal()


class LauncherSignals(QObject):
    """Signals for the LaunchWorkers runnable."""

    finished = pyqtSignal()
    error = pyqtSignal(tuple)


class Trainer(QRunnable):
    """A single long-running task."""

    def __init__(self, fn, m):
        super().__init__()
        self.fn = fn
        self.m = m
        self.signals = WorkerSignals()
        self.terminated = False

    @pyqtSlot()
    def run(self):
        try:
            self.fn(self.m)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.progress.emit()
        finally:
            self.terminated = True
            self.signals.finished.emit(self.m)


class LaunchWorkers(QRunnable):
    """Manages launching Trainer runnables with a concurrency limit."""

    def __init__(self, trainers, task_ids, threadpool):
        super().__init__()
        self.trainers = trainers
        self.task_ids = task_ids
        self.threadpool = threadpool
        self.signals = LauncherSignals()
        self._stopped = False

    def request_stop(self):
        self._stopped = True

    @pyqtSlot()
    def run(self):
        launched = []  # Trainers already handed to the thread pool.
        try:
            for m in self.task_ids:
                # Wait until a slot is available. Count only trainers that
                # have been started: counting every trainer in the dict
                # would include those still waiting to be launched, and
                # the loop would never find a free slot.
                while not self._stopped:
                    running = sum(1 for t in launched if not t.terminated)
                    if running < MAX_PARALLEL_PROCESSES:
                        break
                    sleep(0.5)

                if self._stopped:
                    break

                trainer = self.trainers[m]
                launched.append(trainer)
                self.threadpool.start(trainer)

            # Let the started trainers finish, so that `finished` really
            # means all launched work is done.
            while any(not t.terminated for t in launched):
                sleep(0.5)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        finally:
            self.signals.finished.emit()


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setGeometry(300, 300, 350, 200)

        self.threadpool = QThreadPool()
        self.threadpool.setMaxThreadCount(MAX_PARALLEL_PROCESSES + 1)

        self.trainers = {}
        self.n_trainers = 10
        self.launcher = None
        self.count = 0

        self.progress_bar = QProgressBar(self)
        self.progress_bar.setGeometry(50, 20, 250, 20)
        self.progress_bar.setValue(0)

        self.start_button = QPushButton("Start", self)
        self.start_button.setGeometry(50, 60, 100, 30)
        self.start_button.clicked.connect(self.create_trainers)

        self.stop_button = QPushButton("Stop", self)
        self.stop_button.setGeometry(200, 60, 100, 30)
        self.stop_button.clicked.connect(self.stop)

    def the_long_process(self, m):
        """Simulate a long-running computation."""
        print(f"Starting process {m}")
        sleep(3)
        print(f"Finished process {m}")

    def create_trainers(self):
        self.count = 0
        self.progress_bar.setValue(0)
        self.trainers = {}

        for m in range(self.n_trainers):
            trainer = Trainer(self.the_long_process, m)
            trainer.signals.progress.connect(self.update_progress_bar)
            trainer.signals.finished.connect(self.finished_process)
            self.trainers[m] = trainer

        self.launcher = LaunchWorkers(
            self.trainers, range(self.n_trainers), self.threadpool
        )
        self.launcher.signals.finished.connect(self.all_processes_finished)
        self.threadpool.start(self.launcher)

    def update_progress_bar(self):
        self.count += 1
        percent = int(self.count / self.n_trainers * 100)
        self.progress_bar.setValue(percent)

    def finished_process(self, m):
        print(f"Main window notified: process {m} done")

    def all_processes_finished(self):
        print("All processes finished.")

    def stop(self):
        if self.launcher:
            self.launcher.request_stop()
            print("Stop requested: no new tasks will be started.")


def main():
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()
When you run this, clicking Start will begin launching up to 6 tasks at a time from a total of 10. The progress bar updates as each task completes. Clicking Stop prevents any new tasks from being launched, though tasks already running will finish on their own.
Don't Forget
Stopping running tasks. The stop mechanism here prevents new tasks from starting. It does not interrupt tasks that are already running. If you need to cancel in-progress work, you'll need to add a check inside the long-running function itself (e.g., periodically check a flag and return early). For more on this topic, see our guide on how to start, stop, or pause running threads.
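As a minimal sketch of that flag check, the long-running function can be written to work in small steps and look at a shared threading.Event between them. The function signature, the steps parameter, and the returned strings are assumptions made for illustration; the article's the_long_process takes only m.

```python
import threading
from time import sleep


def long_process(m, cancel_event, steps=50):
    """Work in small steps, returning early once cancel_event is set."""
    for step in range(steps):
        if cancel_event.is_set():
            return f"task {m} cancelled at step {step}"
        sleep(0.01)  # one small unit of work
    return f"task {m} completed"


cancel_event = threading.Event()

first = long_process(1, cancel_event, steps=3)
print(first)   # task 1 completed

cancel_event.set()  # e.g. triggered by the Stop button
second = long_process(2, cancel_event)
print(second)  # task 2 cancelled at step 0
```

How quickly a task reacts depends on how often it checks the flag, so the check belongs inside the innermost loop that runs for a noticeable amount of time.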
Thread safety. The terminated attribute on each Trainer is written in the worker thread and read in the launcher thread. For a simple boolean flag like this, Python's GIL makes this safe in practice. For more complex shared state, consider using QMutex or Python's threading.Lock. If you're new to multithreading in PyQt6, our complete guide to multithreading with QThreadPool covers the fundamentals.
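For shared state that is more than a single flag, a lock around each read-modify-write is one option. The sketch below uses Python's threading.Lock; CompletionCounter is a hypothetical helper, not something from the example application.

```python
import threading


class CompletionCounter:
    """Hypothetical shared counter; the lock guards each update and read."""

    def __init__(self):
        self._lock = threading.Lock()
        self._done = 0

    def mark_done(self):
        with self._lock:
            self._done += 1

    @property
    def done(self):
        with self._lock:
            return self._done


def worker(counter, n):
    # Each worker reports n completed units of work.
    for _ in range(n):
        counter.mark_done()


counter = CompletionCounter()
threads = [threading.Thread(target=worker, args=(counter, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.done)  # 4000
```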
Pool size. Setting setMaxThreadCount to MAX_PARALLEL_PROCESSES + 1 leaves room for the launcher alongside the workers. Without the extra thread, the launcher would occupy one of the pool's slots for the whole run, so at most MAX_PARALLEL_PROCESSES - 1 tasks could execute at once while the rest wait in the pool's queue.
Transmitting data from workers. In this example we emit simple signals, but you can also pass extra data with your Qt signals to send results back from each worker to the main thread.
This pattern gives you a clean way to manage batches of parallel work with a concurrency limit, all while keeping your UI responsive and providing a way to stop the process gracefully.