I'm running multiple workers in a QThreadPool: some do inference on images, others copy files. Each task finishes quickly, so tracking progress within each worker doesn't make sense. But I need to know how long the whole batch took. How do I track the duration of workers across the thread pool and display it?
When you're running many fast tasks through a QThreadPool, a per-task progress bar isn't very useful. What you really want is to collect timing information from each worker after it finishes and combine those results in the main thread.
The approach is straightforward: each worker times itself using Python's time.time(), then emits the duration through a signal when it completes. The main window collects all those durations in a dictionary and updates the display on a timer.
Having Each Worker Time Itself
The simplest way to get timing data from a worker is to record the time at the start and end of its run() method, then emit the difference as a signal. Each worker gets a unique ID (using uuid) so you can tell the results apart.
import time
import uuid

from PyQt6.QtCore import QObject, QRunnable, pyqtSignal, pyqtSlot


class WorkerSignals(QObject):
    duration = pyqtSignal(str, float)
    finished = pyqtSignal(str)


class Worker(QRunnable):
    def __init__(self):
        super().__init__()
        self.job_id = uuid.uuid4().hex
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        start_time = time.time()

        # Your actual work goes here: inference, file copy, etc.
        time.sleep(0.5)  # Simulating some work

        end_time = time.time()
        self.signals.duration.emit(self.job_id, end_time - start_time)
        self.signals.finished.emit(self.job_id)
The duration signal carries two pieces of information: the worker's unique ID and the elapsed time in seconds. The finished signal lets you know the worker is done, which is useful for tracking how many workers have completed.
Collecting Durations in the Main Window
On the GUI side, you store each worker's duration in a dictionary keyed by its job ID. A QTimer periodically refreshes the display so you can see results updating in near real-time as workers finish.
def update_duration(self, job_id, duration):
    self.worker_duration[job_id] = duration
Each time a worker emits its duration signal, this slot adds the result to the dictionary. Because signals emitted from worker threads are delivered to slots in the main thread via Qt's event loop, this is thread-safe — you don't need to add any locking yourself.
Calculating the Total
To get the total time across all completed workers, sum the values in the dictionary:
def calculate_duration(self):
    if not self.worker_duration:
        return 0.0
    return sum(self.worker_duration.values())
This gives you the cumulative time spent working across all threads. If you're running four workers in parallel and each takes 0.5 seconds, the total will be approximately 2.0 seconds — even though the wall-clock time was only about 0.5 seconds. Both numbers can be useful depending on what you're measuring.
If you want the wall-clock time for the entire batch instead, you can record time.time() when you start the batch and compare it to when the last worker finishes.
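To make the difference between the two numbers concrete, here is a minimal sketch that runs four simulated tasks in parallel and reports both totals. It uses the standard library's ThreadPoolExecutor as a stand-in for QThreadPool so that it runs without a GUI, and time.perf_counter() rather than time.time() because it is monotonic. The names timed_task and run_batch are made up for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def timed_task(delay):
    """Simulate one worker: do some work and return how long it took."""
    start = time.perf_counter()
    time.sleep(delay)  # Stand-in for inference, a file copy, etc.
    return time.perf_counter() - start


def run_batch(n_tasks=4, delay=0.5):
    """Run n_tasks in parallel; return (cumulative, wall_clock) in seconds."""
    batch_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        durations = list(pool.map(timed_task, [delay] * n_tasks))
    wall_clock = time.perf_counter() - batch_start
    return sum(durations), wall_clock


if __name__ == "__main__":
    cumulative, wall_clock = run_batch()
    print(f"Cumulative worker time: {cumulative:.2f}s")
    print(f"Wall-clock time for batch: {wall_clock:.2f}s")
```

With the default arguments, the cumulative figure should land near 2 seconds and the wall-clock figure near 0.5 seconds, though the exact values depend on your machine.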
Complete Working Example
Here's a full application that starts a batch of workers, collects their durations, and displays the results. Press the button to kick off a batch — the label updates as each worker completes.
import random
import sys
import time
import uuid

from PyQt6.QtCore import QObject, QRunnable, QThreadPool, QTimer, pyqtSignal, pyqtSlot
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)


class WorkerSignals(QObject):
    """Signals emitted by a Worker."""

    duration = pyqtSignal(str, float)
    finished = pyqtSignal(str)


class Worker(QRunnable):
    """A worker that times its own execution and emits the duration."""

    def __init__(self):
        super().__init__()
        self.job_id = uuid.uuid4().hex
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        start_time = time.time()

        # Simulate work with a random delay.
        total_n = 1000
        delay = random.random() / 100
        for n in range(total_n):
            time.sleep(delay)

        end_time = time.time()
        self.signals.duration.emit(self.job_id, end_time - start_time)
        self.signals.finished.emit(self.job_id)


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        layout = QVBoxLayout()

        self.duration_label = QLabel("No batch running")
        self.batch_time_label = QLabel("")

        button = QPushButton("Start Batch")
        button.pressed.connect(self.execute)

        layout.addWidget(self.duration_label)
        layout.addWidget(self.batch_time_label)
        layout.addWidget(button)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        # Dictionary to hold durations keyed by job ID.
        self.worker_duration = {}
        self.batch_size = 0
        self.batch_start_time = None

        self.threadpool = QThreadPool()
        print(
            f"Multithreading with maximum {self.threadpool.maxThreadCount()} threads"
        )

        # Timer to refresh the display periodically.
        self.timer = QTimer()
        self.timer.setInterval(100)
        self.timer.timeout.connect(self.refresh_duration)
        self.timer.start()

    def execute(self):
        self.cleanup()
        self.batch_size = 4
        self.batch_start_time = time.time()

        for n in range(self.batch_size):
            worker = Worker()
            worker.signals.duration.connect(self.update_duration)
            worker.signals.finished.connect(self.check_batch_complete)
            self.threadpool.start(worker)

    def cleanup(self):
        self.worker_duration = {}
        self.batch_start_time = None
        self.duration_label.setText("Batch started...")
        self.batch_time_label.setText("")

    def update_duration(self, job_id, duration):
        self.worker_duration[job_id] = duration

    def check_batch_complete(self, job_id):
        if len(self.worker_duration) == self.batch_size:
            wall_clock = time.time() - self.batch_start_time
            self.batch_time_label.setText(
                f"Wall-clock time for batch: {wall_clock:.2f}s"
            )

    def calculate_duration(self):
        if not self.worker_duration:
            return 0.0
        return sum(self.worker_duration.values())

    def refresh_duration(self):
        total = self.calculate_duration()
        completed = len(self.worker_duration)
        self.duration_label.setText(
            f"Cumulative worker time: {total:.2f}s ({completed} complete)"
        )


app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()
When you run this and click "Start Batch", four workers begin executing in parallel. As each one finishes, it reports its duration back to the main window. The label updates every 100ms, showing you the cumulative time and how many workers have completed. Once all four are done, the wall-clock time for the entire batch is displayed as well.
Adapting This for Your Own Workers
To use this pattern with your own tasks — inference, file copying, or anything else — replace the simulated work inside Worker.run() with your actual logic. The timing and signal emission stay the same:
@pyqtSlot()
def run(self):
    start_time = time.time()

    # Do your real work here.
    result = run_inference(self.image_path)
    copy_file(self.image_path, result.target_folder)

    end_time = time.time()
    self.signals.duration.emit(self.job_id, end_time - start_time)
    self.signals.finished.emit(self.job_id)
If you have two distinct types of workers (one for inference, one for file copying), you can create separate QRunnable subclasses for each. Both can use the same WorkerSignals class and emit duration the same way. The main window doesn't need to care what kind of work was done — it just collects the timings.
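One way to share the timing logic between worker types is a small base class that wraps the work in try/finally, so a duration is still reported if the task raises an exception. The sketch below is plain Python so it can run on its own. TimedTask, work(), and the report callback are hypothetical names; in the Qt version, report would be replaced by self.signals.duration.emit and the base class would derive from QRunnable.

```python
import time
import uuid


class TimedTask:
    """Base class that times work() and reports (job_id, duration)."""

    def __init__(self, report):
        self.job_id = uuid.uuid4().hex
        self.report = report  # Stand-in for signals.duration.emit

    def work(self):
        raise NotImplementedError

    def run(self):
        start = time.perf_counter()
        try:
            self.work()
        finally:
            # Runs even if work() raises, so every task reports a duration.
            self.report(self.job_id, time.perf_counter() - start)


class InferenceTask(TimedTask):
    def work(self):
        time.sleep(0.02)  # Placeholder for real inference


class CopyTask(TimedTask):
    def work(self):
        time.sleep(0.01)  # Placeholder for a real file copy


# Collect durations in a dict keyed by job ID, as the main window does.
durations = {}
for task_class in (InferenceTask, CopyTask):
    task_class(durations.__setitem__).run()

print(f"{len(durations)} tasks timed, total {sum(durations.values()):.3f}s")
```

Reporting from the finally block matters for the batch check in the main window: if a failed worker never emitted its duration, the completed count would never reach the batch size.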
Tracking Multiple Batches
If you're running several batches concurrently, pass a batch ID into each worker so you can group results:
class Worker(QRunnable):
    def __init__(self, batch_id):
        super().__init__()
        self.batch_id = batch_id
        self.job_id = uuid.uuid4().hex
        self.signals = WorkerSignals()
Then in your signals, include the batch ID alongside the duration, and use a nested dictionary in the main window to keep each batch's results separate.
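The nested dictionary could look something like the sketch below. It assumes the duration signal is extended to carry the batch ID, for example pyqtSignal(str, str, float). The BatchDurations class and its method names are illustrative and not part of Qt.

```python
from collections import defaultdict


class BatchDurations:
    """Store worker durations grouped by batch ID."""

    def __init__(self):
        # Layout: {batch_id: {job_id: duration_in_seconds}}
        self._durations = defaultdict(dict)

    def record(self, batch_id, job_id, duration):
        """Slot-style method: a (str, str, float) signal could connect here."""
        self._durations[batch_id][job_id] = duration

    def completed(self, batch_id):
        """Number of workers that have reported for this batch."""
        return len(self._durations.get(batch_id, {}))

    def total(self, batch_id):
        """Cumulative worker time for this batch, in seconds."""
        return sum(self._durations.get(batch_id, {}).values())


store = BatchDurations()
store.record("batch-a", "job-1", 0.5)
store.record("batch-a", "job-2", 0.25)
store.record("batch-b", "job-3", 1.0)

print(store.completed("batch-a"), store.total("batch-a"))  # 2 0.75
print(store.completed("batch-b"), store.total("batch-b"))  # 1 1.0
```

Keeping the per-batch lookups behind small methods means the refresh timer can ask for one batch's total without caring how the results are stored.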
For more background on multithreading with QThreadPool and QRunnable, see our guide to multithreading in PyQt6. For connecting workers to your GUI using signals and slots, our signals and slots tutorial covers the fundamentals. And if you do want to add a progress bar for tracking batch completion (e.g., showing 3 of 4 workers done), our QProgressBar tutorial walks through how to set that up.