I'm building an app that connects to several REST APIs. Some responses come back quickly, but one request can take over 5 minutes. I need the results displayed in different parts of the UI (like tabs in a workflow). How can I use concurrent.futures to handle this without freezing the GUI? And how do I show progress while waiting?
When your PyQt6 application needs to fetch data from the network — or do any work that takes more than a fraction of a second — you need to move that work off the main thread. If you don't, your entire interface will freeze until the work completes. For an API call that takes five minutes, that means five minutes of an unresponsive, stuck window. Not great.
Python's concurrent.futures module provides a clean, high-level way to run tasks in background threads (or processes). In this tutorial, we'll combine it with PyQt6's signal/slot system to build a tabbed workflow application that:
- Fetches data from APIs in the background
- Populates checkbox pick lists from the results
- Shows a spinning progress indicator while waiting
- Lets the user abort a long-running request
- Passes selections between tabs in a workflow
The challenge: threads and the GUI
PyQt6 (like most GUI frameworks) requires that all UI updates happen on the main thread. You can't modify a widget from a background thread — doing so leads to crashes or unpredictable behavior.
The pattern we'll use is:
- Submit work to a ThreadPoolExecutor from concurrent.futures
- Use a PyQt6 QTimer to periodically check whether the work is done
- When it's done, retrieve the result and update the UI on the main thread
This keeps things safe, and it's straightforward once you see it in action. If you're more familiar with Qt's own threading approach, see the tutorial on multithreading PyQt6 applications with QThreadPool for a comparison.
A minimal example first
Before building the full application, let's start with the smallest possible example of running a background task with concurrent.futures in PyQt6.
import sys
import time
from concurrent.futures import ThreadPoolExecutor

from PyQt6.QtCore import QTimer
from PyQt6.QtWidgets import QApplication, QLabel, QMainWindow, QPushButton, QVBoxLayout, QWidget


def slow_task():
    """Simulate a slow API call."""
    time.sleep(3)
    return "Data received!"


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("concurrent.futures + PyQt6")

        self.label = QLabel("Press the button to fetch data.")
        self.button = QPushButton("Fetch Data")
        self.button.clicked.connect(self.start_task)

        layout = QVBoxLayout()
        layout.addWidget(self.label)
        layout.addWidget(self.button)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        self.executor = ThreadPoolExecutor(max_workers=2)
        self.future = None

        # A timer that checks on our background task.
        self.check_timer = QTimer()
        self.check_timer.setInterval(100)  # Check every 100ms.
        self.check_timer.timeout.connect(self.check_future)

    def start_task(self):
        self.label.setText("Working...")
        self.button.setEnabled(False)
        self.future = self.executor.submit(slow_task)
        self.check_timer.start()

    def check_future(self):
        if self.future and self.future.done():
            self.check_timer.stop()
            result = self.future.result()
            self.label.setText(result)
            self.button.setEnabled(True)
            self.future = None


app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()
Run this, and you'll see that the UI stays responsive during the 3-second wait. The button disables itself, the label shows "Working...", and when the result arrives, everything updates cleanly.
How this works
ThreadPoolExecutor.submit() runs a callable in a background thread and immediately returns a Future object. This Future is a handle to the work — you can check whether it's finished, get its result, or cancel it.
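To make the Future handle concrete, here is a small Qt-free sketch. The helpers slow_square and failing_task are hypothetical stand-ins, not part of the tutorial's application. It shows done(), result(), and how an exception raised in the worker resurfaces when result() is called.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Hypothetical stand-in for a slow call."""
    time.sleep(0.2)
    return x * x


def failing_task():
    """Hypothetical stand-in for a request that fails."""
    raise ValueError("server said no")


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_square, 7)
    started_done = future.done()   # Almost certainly False: the task sleeps first.
    value = future.result()        # Blocks until the task has finished.
    finished_done = future.done()  # True once a result is available.

    bad = executor.submit(failing_task)
    try:
        bad.result()               # Re-raises the worker's exception here.
        error = None
    except ValueError as exc:
        error = str(exc)
```

Note that result() blocks until the task finishes, which is why GUI code should only call it once done() has returned True.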
We use a QTimer to poll the future every 100 milliseconds. When future.done() returns True, we know it's safe to call future.result() and update the UI. All of this polling and updating happens on the main thread, so we never touch widgets from a background thread.
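The standard library also offers Future.add_done_callback(), which might look like a way to avoid polling. The sketch below (all names are illustrative) suggests why polling from the main thread is the safer choice here: a callback attached while the task is still running executes on the worker thread, not on the thread that submitted the work, so it would not be a safe place to touch widgets.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()
callback_threads = []


def blocked_task():
    """Wait until the submitting thread lets the task finish."""
    release.wait(timeout=5)
    return "done"


def on_done(future):
    # Record which thread actually runs the callback.
    callback_threads.append(threading.current_thread())


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(blocked_task)
    future.add_done_callback(on_done)  # Attached while the task is still running.
    release.set()                      # Now let the task complete.
    task_result = future.result()

# Leaving the with block joins the worker, so the callback has run by now.
ran_on_submitting_thread = callback_threads[0] is threading.current_thread()
```

Here ran_on_submitting_thread ends up False: the callback ran on the pool's worker thread.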
Using a worker wrapper with signals
Polling with a QTimer works well for simple cases, but when you have multiple concurrent tasks — each updating different parts of the UI — a signal-based approach is cleaner. We can create a small helper that watches a future and emits a signal when it completes.
from PyQt6.QtCore import QObject, QTimer, pyqtSignal


class FutureWatcher(QObject):
    """Watches a concurrent.futures.Future and emits a signal when done."""

    finished = pyqtSignal(object)  # Emits the result (or exception).

    def __init__(self, future, parent=None):
        super().__init__(parent)
        self.future = future
        self._timer = QTimer(self)
        self._timer.setInterval(100)
        self._timer.timeout.connect(self._check)
        self._timer.start()

    def _check(self):
        if self.future.done():
            self._timer.stop()
            try:
                result = self.future.result()
                self.finished.emit(result)
            except Exception as e:
                self.finished.emit(e)

    def cancel(self):
        """Attempt to cancel the future."""
        self.future.cancel()
        self._timer.stop()
With FutureWatcher, you can submit a task and connect its finished signal to any slot — including one that updates a specific widget. This is the approach we'll use in the full example.
Adding a progress spinner
For long-running tasks, you'll want to show the user that something is happening. PyQt6 doesn't have a built-in spinning animation widget, but we can create a simple one by painting a rotating arc.
from PyQt6.QtCore import QTimer, Qt
from PyQt6.QtGui import QPainter, QPen, QColor
from PyQt6.QtWidgets import QWidget


class SpinnerWidget(QWidget):
    """A simple spinning progress indicator."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.angle = 0
        self.setFixedSize(40, 40)
        self._timer = QTimer(self)
        self._timer.setInterval(50)
        self._timer.timeout.connect(self._rotate)
        self.hide()  # Hidden by default.

    def start(self):
        self.angle = 0
        self.show()
        self._timer.start()

    def stop(self):
        self._timer.stop()
        self.hide()

    def _rotate(self):
        self.angle = (self.angle + 10) % 360
        self.update()

    def paintEvent(self, event):
        painter = QPainter(self)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)
        pen = QPen(QColor("#3498db"), 3)
        pen.setCapStyle(Qt.PenCapStyle.RoundCap)
        painter.setPen(pen)
        # Draw an arc that rotates.
        rect = self.rect().adjusted(5, 5, -5, -5)
        painter.drawArc(rect, self.angle * 16, 270 * 16)
This gives us a blue spinning arc. Call spinner.start() when work begins and spinner.stop() when it finishes.
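The numbers in the spinner are easier to tune once the arithmetic is spelled out. QPainter.drawArc() takes its angles in sixteenths of a degree, which is where the `* 16` factors come from. The short sketch below only reproduces that arithmetic in plain Python; the constant names and the draw_arc_args helper are illustrative, not part of the widget.

```python
TICK_MS = 50        # Timer interval used by SpinnerWidget.
STEP_DEGREES = 10   # Angle added on every tick.
SPAN_DEGREES = 270  # Length of the arc that is drawn.

# How long one full rotation takes with these settings.
ticks_per_turn = 360 // STEP_DEGREES
seconds_per_turn = ticks_per_turn * TICK_MS / 1000


def draw_arc_args(angle_degrees):
    """Return the (start, span) integers as passed to drawArc()."""
    return angle_degrees * 16, SPAN_DEGREES * 16


start_angle, span_angle = draw_arc_args(90)
```

With the values above, the arc completes a turn in 36 ticks, or 1.8 seconds. Lowering TICK_MS or raising STEP_DEGREES would make it spin faster.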
Building the full tabbed workflow
Now let's put everything together into a real application. The scenario is a three-step workflow:
- Step 1: Fetch a list of items from API "A", display them as checkboxes, let the user pick one or more.
- Step 2: Fetch a shorter list from API "B", same idea.
- Step 3: Send the selections from Steps 1 and 2 to API "C", wait for the (possibly very slow) response, and display the result.
Since we don't have real APIs for this tutorial, we'll simulate them with time.sleep() and fake data. You can replace these functions with real requests.get() calls.
Here's the complete application:
import json
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from PyQt6.QtCore import QObject, QTimer, Qt, pyqtSignal
from PyQt6.QtGui import QColor, QPainter, QPen
from PyQt6.QtWidgets import (
    QApplication,
    QCheckBox,
    QHBoxLayout,
    QLabel,
    QMainWindow,
    QPlainTextEdit,
    QPushButton,
    QScrollArea,
    QTabWidget,
    QVBoxLayout,
    QWidget,
)
# ---------------------------------------------------------------------------
# Simulated API calls (replace these with real requests)
# ---------------------------------------------------------------------------
def fetch_api_a():
    """Simulate fetching a long list of items from API A."""
    time.sleep(1)  # Simulates network delay.
    return [f"Item {i}" for i in range(1, 121)]


def fetch_api_b():
    """Simulate fetching a shorter list from API B."""
    time.sleep(0.5)
    return [f"Option {chr(65 + i)}" for i in range(26)]


def fetch_api_c(selected_a, selected_b):
    """Simulate a long-running report request to API C."""
    # In real code, you'd send selected_a and selected_b as parameters.
    total = len(selected_a) + len(selected_b)
    # Simulate variable duration: more selections = longer wait.
    delay = min(2 + total * 0.3, 10)  # Cap at 10s for the demo.
    time.sleep(delay)
    return {
        "status": "success",
        "selections_a": selected_a,
        "selections_b": selected_b,
        "records": total * 42,
        "summary": f"Processed {total} selections, found {total * 42} records.",
    }
# ---------------------------------------------------------------------------
# FutureWatcher — bridges concurrent.futures with Qt signals
# ---------------------------------------------------------------------------
class FutureWatcher(QObject):
    """Watches a Future and emits finished(result) on the main thread."""

    finished = pyqtSignal(object)

    def __init__(self, future, parent=None):
        super().__init__(parent)
        self.future = future
        self._timer = QTimer(self)
        self._timer.setInterval(100)
        self._timer.timeout.connect(self._check)
        self._timer.start()

    def _check(self):
        if self.future.done():
            self._timer.stop()
            try:
                result = self.future.result()
            except Exception as e:
                result = e
            self.finished.emit(result)

    def cancel(self):
        self.future.cancel()
        self._timer.stop()
# ---------------------------------------------------------------------------
# SpinnerWidget — rotating arc progress indicator
# ---------------------------------------------------------------------------
class SpinnerWidget(QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)
        self.angle = 0
        self.setFixedSize(40, 40)
        self._timer = QTimer(self)
        self._timer.setInterval(50)
        self._timer.timeout.connect(self._rotate)
        self.hide()

    def start(self):
        self.angle = 0
        self.show()
        self._timer.start()

    def stop(self):
        self._timer.stop()
        self.hide()

    def _rotate(self):
        self.angle = (self.angle + 10) % 360
        self.update()

    def paintEvent(self, event):
        painter = QPainter(self)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)
        pen = QPen(QColor("#3498db"), 3)
        pen.setCapStyle(Qt.PenCapStyle.RoundCap)
        painter.setPen(pen)
        rect = self.rect().adjusted(5, 5, -5, -5)
        painter.drawArc(rect, self.angle * 16, 270 * 16)
# ---------------------------------------------------------------------------
# ChecklistTab — a tab that loads a list and shows checkboxes
# ---------------------------------------------------------------------------
class ChecklistTab(QWidget):
    """A tab that fetches a list from an API and displays checkboxes."""

    def __init__(self, title, fetch_func, executor, parent=None):
        super().__init__(parent)
        self.fetch_func = fetch_func
        self.executor = executor
        self.checkboxes = []
        self.watcher = None

        layout = QVBoxLayout(self)

        # Top row: title, spinner, and buttons.
        top_row = QHBoxLayout()
        top_row.addWidget(QLabel(f"<b>{title}</b>"))
        self.spinner = SpinnerWidget()
        top_row.addWidget(self.spinner)
        top_row.addStretch()

        self.load_button = QPushButton("Load")
        self.load_button.clicked.connect(self.load_data)
        top_row.addWidget(self.load_button)

        self.select_all_button = QPushButton("Select All")
        self.select_all_button.clicked.connect(self.select_all)
        self.select_all_button.setEnabled(False)
        top_row.addWidget(self.select_all_button)

        self.clear_button = QPushButton("Clear All")
        self.clear_button.clicked.connect(self.clear_all)
        self.clear_button.setEnabled(False)
        top_row.addWidget(self.clear_button)

        layout.addLayout(top_row)

        # Scrollable area for checkboxes.
        self.scroll_area = QScrollArea()
        self.scroll_area.setWidgetResizable(True)
        self.scroll_content = QWidget()
        self.scroll_layout = QVBoxLayout(self.scroll_content)
        self.scroll_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
        self.scroll_area.setWidget(self.scroll_content)
        layout.addWidget(self.scroll_area)

        self.status_label = QLabel("Press Load to fetch data.")
        layout.addWidget(self.status_label)

    def load_data(self):
        """Submit the fetch function to the thread pool."""
        self.load_button.setEnabled(False)
        self.spinner.start()
        self.status_label.setText("Fetching data...")
        future = self.executor.submit(self.fetch_func)
        self.watcher = FutureWatcher(future, parent=self)
        self.watcher.finished.connect(self.on_data_loaded)

    def on_data_loaded(self, result):
        """Called on the main thread when the fetch completes."""
        self.spinner.stop()
        self.load_button.setEnabled(True)

        if isinstance(result, Exception):
            self.status_label.setText(f"Error: {result}")
            return

        # Clear old checkboxes.
        for cb in self.checkboxes:
            cb.deleteLater()
        self.checkboxes.clear()

        # Create new checkboxes.
        for item in result:
            cb = QCheckBox(item)
            self.scroll_layout.addWidget(cb)
            self.checkboxes.append(cb)

        self.select_all_button.setEnabled(True)
        self.clear_button.setEnabled(True)
        self.status_label.setText(f"Loaded {len(result)} items.")

    def select_all(self):
        for cb in self.checkboxes:
            cb.setChecked(True)

    def clear_all(self):
        for cb in self.checkboxes:
            cb.setChecked(False)

    def get_selected(self):
        """Return a list of the checked item texts."""
        return [cb.text() for cb in self.checkboxes if cb.isChecked()]
# ---------------------------------------------------------------------------
# ReportTab — sends selections to API C and shows the result
# ---------------------------------------------------------------------------
class ReportTab(QWidget):
    """A tab that sends selections to an API and displays the JSON result."""

    def __init__(self, tab_a, tab_b, executor, parent=None):
        super().__init__(parent)
        self.tab_a = tab_a
        self.tab_b = tab_b
        self.executor = executor
        self.watcher = None

        layout = QVBoxLayout(self)

        # Top row.
        top_row = QHBoxLayout()
        top_row.addWidget(QLabel("<b>Step 3: Generate Report</b>"))
        self.spinner = SpinnerWidget()
        top_row.addWidget(self.spinner)
        top_row.addStretch()

        self.run_button = QPushButton("Run Report")
        self.run_button.clicked.connect(self.run_report)
        top_row.addWidget(self.run_button)

        self.abort_button = QPushButton("Abort")
        self.abort_button.clicked.connect(self.abort_report)
        self.abort_button.setEnabled(False)
        top_row.addWidget(self.abort_button)

        layout.addLayout(top_row)

        # Selection summary.
        self.summary_label = QLabel("Select items in Steps 1 and 2, then run the report.")
        layout.addWidget(self.summary_label)

        # Result display.
        self.result_view = QPlainTextEdit()
        self.result_view.setReadOnly(True)
        layout.addWidget(self.result_view)

        self.status_label = QLabel("")
        layout.addWidget(self.status_label)

    def run_report(self):
        selected_a = self.tab_a.get_selected()
        selected_b = self.tab_b.get_selected()

        if not selected_a and not selected_b:
            self.status_label.setText("Please select at least one item from Step 1 or Step 2.")
            return

        self.summary_label.setText(
            f"Sending {len(selected_a)} items from Step 1 "
            f"and {len(selected_b)} items from Step 2..."
        )
        self.result_view.clear()
        self.run_button.setEnabled(False)
        self.abort_button.setEnabled(True)
        self.spinner.start()
        self.status_label.setText("Working... this may take a while.")

        future = self.executor.submit(fetch_api_c, selected_a, selected_b)
        self.watcher = FutureWatcher(future, parent=self)
        self.watcher.finished.connect(self.on_report_done)

    def on_report_done(self, result):
        self.spinner.stop()
        self.run_button.setEnabled(True)
        self.abort_button.setEnabled(False)

        if isinstance(result, Exception):
            self.status_label.setText(f"Error: {result}")
            return

        formatted = json.dumps(result, indent=2)
        self.result_view.setPlainText(formatted)
        self.status_label.setText("Report complete.")
        self.summary_label.setText(f"Result: {result.get('summary', '')}")

    def abort_report(self):
        if self.watcher:
            self.watcher.cancel()
        self.spinner.stop()
        self.run_button.setEnabled(True)
        self.abort_button.setEnabled(False)
        self.status_label.setText("Aborted.")
# ---------------------------------------------------------------------------
# MainWindow
# ---------------------------------------------------------------------------
class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("API Workflow - concurrent.futures + PyQt6")
        self.resize(700, 500)

        # Shared thread pool for all background work.
        self.executor = ThreadPoolExecutor(max_workers=4)

        # Create the tabs.
        self.tabs = QTabWidget()
        self.tab_a = ChecklistTab("Step 1: Select Items", fetch_api_a, self.executor)
        self.tab_b = ChecklistTab("Step 2: Select Options", fetch_api_b, self.executor)
        self.tab_c = ReportTab(self.tab_a, self.tab_b, self.executor)

        self.tabs.addTab(self.tab_a, "Step 1")
        self.tabs.addTab(self.tab_b, "Step 2")
        self.tabs.addTab(self.tab_c, "Step 3: Report")
        self.setCentralWidget(self.tabs)

    def closeEvent(self, event):
        # cancel_futures requires Python 3.9+. It drops queued tasks only;
        # tasks that are already running still finish in the background.
        self.executor.shutdown(wait=False, cancel_futures=True)
        event.accept()
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    app.exec()
Walking through the design
Let's look at how the pieces fit together.
The thread pool
We create a single ThreadPoolExecutor in MainWindow and share it across all tabs. This pool manages a small number of worker threads (4 in this case). When you call executor.submit(some_function), it queues the function to run on the next available thread and immediately returns a Future.
self.executor = ThreadPoolExecutor(max_workers=4)
Because the shared pool has several worker threads, Steps 1 and 2 can fetch data at the same time if the user starts both loads in quick succession.
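A quick way to convince yourself that tasks overlap is to time two of them outside the GUI. This is a minimal sketch with a hypothetical fake_fetch function standing in for the API calls; the exact timing will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(name, delay):
    """Hypothetical stand-in for an API call that takes `delay` seconds."""
    time.sleep(delay)
    return name


with ThreadPoolExecutor(max_workers=4) as executor:
    start = time.perf_counter()
    future_a = executor.submit(fake_fetch, "A", 0.4)
    future_b = executor.submit(fake_fetch, "B", 0.4)
    results = [future_a.result(), future_b.result()]
    elapsed = time.perf_counter() - start
```

Both tasks sleep for 0.4 seconds, yet elapsed should come out close to 0.4 rather than 0.8, because each task runs on its own worker thread. With max_workers=1 the second task would have to wait for the first.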
The FutureWatcher
Each time we submit work, we create a FutureWatcher that polls the Future and emits a finished signal when the result is ready. The signal carries the result as a Python object, so it can be a list, a dict, or an exception — whatever the task produced.
future = self.executor.submit(self.fetch_func)
self.watcher = FutureWatcher(future, parent=self)
self.watcher.finished.connect(self.on_data_loaded)
The connected slot (on_data_loaded) runs on the main thread, so it's safe to update widgets there.
The ChecklistTab
This widget handles Steps 1 and 2. When the user presses "Load", it submits the fetch function to the pool. When data arrives, it creates QCheckBox widgets inside a QScrollArea. The "Select All" and "Clear All" buttons make it easy to manage large lists.
The get_selected() method returns the text of all checked boxes. The report tab calls this method to gather the user's selections — no queues or shared state needed, just a direct method call from one tab to another.
The ReportTab
Step 3 reads the current selections from the other two tabs and submits them to fetch_api_c. Since this task can be slow, the tab shows a spinner and enables an "Abort" button.
Aborting a task
The abort button calls watcher.cancel(), which in turn calls future.cancel() on the underlying Future. There's an important caveat here: concurrent.futures can only cancel a future that hasn't started running yet. If the task is already executing in a thread, cancel() won't interrupt it — the thread will run to completion, and we simply ignore the result.
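The difference between a queued and a running future can be seen in a few lines. The sketch below uses a single-worker pool so that the second task is forced to wait in the queue; the blocker function and the two events are illustrative helpers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker():
    """Signal that the task is running, then wait to be released."""
    started.set()
    release.wait(timeout=5)
    return "finished anyway"


with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocker)
    started.wait(timeout=5)                # Make sure the first task is executing.
    queued = executor.submit(blocker)      # Only one worker, so this one waits.

    cancelled_running = running.cancel()   # False: the task is already executing.
    cancelled_queued = queued.cancel()     # True: the task never started.

    release.set()                          # Let the running task finish.
    running_result = running.result()
```

cancel() returns a bool, so the abort handler could use it to tell the user whether the request was really dropped or is merely being ignored.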
For truly cancellable long-running operations, you'd need to pass a flag (like a threading.Event) into your worker function and have it check periodically whether it should stop. Here's what that would look like:
import threading


def fetch_api_c_cancellable(selected_a, selected_b, cancel_event):
    """A version that checks for cancellation."""
    total = len(selected_a) + len(selected_b)
    steps = int(min(2 + total * 0.3, 10) * 10)
    for i in range(steps):
        if cancel_event.is_set():
            return None  # Cancelled.
        time.sleep(0.1)
    return {
        "status": "success",
        "records": total * 42,
    }
You'd store the cancel_event on the tab and call cancel_event.set() from the abort button.
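As a rough illustration of that wiring without any Qt code, the sketch below submits a cancellable task, then sets the event from the submitting thread, which is what the abort button's slot would do. The cancellable_work function is a shortened, hypothetical stand-in for fetch_api_c_cancellable.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def cancellable_work(cancel_event, steps=50):
    """Hypothetical stand-in: checks the flag between short sleeps."""
    for _ in range(steps):
        if cancel_event.is_set():
            return None  # Cancelled.
        time.sleep(0.05)
    return {"status": "success"}


cancel_event = threading.Event()  # Create a fresh event for each run.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(cancellable_work, cancel_event)
    time.sleep(0.1)       # The task is now partway through its loop.
    cancel_event.set()    # What the abort button's slot would do.
    outcome = future.result()
```

An Event stays set once set, so each new run needs a new Event (or a call to clear()) before the task is submitted. The handler that receives the result also has to treat None as "cancelled" rather than as a report.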
Why not queues?
You might have seen advice to use queue.Queue for communication between threads and the GUI. Queues work, but they add complexity — you need to put items into the queue from the worker, then poll the queue from the main thread with a timer. The FutureWatcher approach accomplishes the same thing with less code and a clearer connection between "this task" and "this handler."
If you had a stream of incoming results (like log messages or progress updates), a queue would make more sense. For request-response patterns like API calls, futures are a natural fit.
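For the streaming case, a queue-based sketch might look like the following. The worker reports percentages through a queue.Queue, and the consumer drains whatever has arrived without blocking. In a GUI, drain would typically be called from a QTimer slot on the main thread; here it is called once after the task finishes to keep the example Qt-free. The function names are illustrative.

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor


def worker_with_progress(progress_queue, steps=5):
    """Do the work in steps and report a percentage after each one."""
    for i in range(1, steps + 1):
        time.sleep(0.01)  # Stand-in for one unit of real work.
        progress_queue.put(int(100 * i / steps))
    return "report ready"


def drain(progress_queue):
    """Collect everything currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(progress_queue.get_nowait())
        except queue.Empty:
            return items


progress_queue = queue.Queue()
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(worker_with_progress, progress_queue)
    result = future.result()

updates = drain(progress_queue)
```

The future still carries the final result, while the queue carries the intermediate updates, so the two approaches can be combined when a task needs both.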
Replacing the simulated APIs with real requests
To use real HTTP endpoints, install the requests library and replace the simulated functions:
import requests
def fetch_api_a():
    response = requests.get("https://api.example.com/items", timeout=30)
    response.raise_for_status()
    return response.json()  # Return a list of items.
Everything else stays the same. The FutureWatcher doesn't care what the function does — it just waits for it to finish and delivers the result.
Summary
The pattern for using concurrent.futures with PyQt6 is:
- Create a ThreadPoolExecutor (once, usually in your main window).
- Submit slow work with executor.submit(function, *args).
- Use a FutureWatcher (or a QTimer) to detect completion on the main thread.
- Update your widgets in the connected slot, safely on the main thread.
This approach scales naturally. Each submitted task gets its own Future and its own watcher, so multiple requests can be in flight at the same time without interfering with each other. Your UI stays responsive, your users see progress feedback, and you don't need to manage raw threads or shared queues.
Once your application is complete, you can package it for distribution on Windows or create a macOS .dmg installer using PyInstaller.