I want my application to use an SQL database offline and also have an online database — something like Google Contacts, where you save a contact and it automatically updates when you're connected to the internet. How do I create a local offline database that migrates changes to an online database when connected?
This is a common pattern in application development known as offline-first design. The idea is that your app always works — even without an internet connection — by reading and writing to a local database. Then, when a connection becomes available, your app syncs those changes to a remote server.
In this article, we'll walk through the architecture and practical steps for building this kind of system in Python, using SQLite for the local database and a remote database (like PostgreSQL or MySQL) on a server.
Understanding the Architecture
Before writing any code, it helps to have a clear picture of how the pieces fit together.
Your application will have two databases:
- A local SQLite database — this is the one your app reads from and writes to during normal use. SQLite is a great choice here because it requires no separate server process, stores everything in a single file, and is included with Python's standard library.
- A remote database on a server — this could be PostgreSQL, MySQL, or even another SQLite database exposed through an API. This is your "source of truth" that all clients eventually sync to.
The flow looks like this:
- The app starts and tries to connect to the remote server.
- If connected, it pulls down the latest state from the server.
- The user works with the app normally — all reads and writes go to the local SQLite database.
- Changes are logged locally.
- When a connection is available, logged changes are sent to the remote server.
Do Both Databases Need to Be the Same?
They don't have to be, but it makes your life easier if they are similar. If both sides use standard SQL, you can often replay the same SQL statements on either database without translation.
That said, using SQLite locally and PostgreSQL (or MySQL) remotely is a very common and practical combination. SQLite is lightweight and portable — perfect for a local client. PostgreSQL or MySQL are designed for multi-user, server-based access. As long as you stick to standard SQL types and operations, this combination works well.
Where you might run into trouble is with more advanced or database-specific features — custom data types, certain date/time handling quirks, or non-standard SQL syntax. For straightforward data (text, numbers, dates stored as ISO strings), you'll be fine.
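To make the date/time point concrete, here's a small sketch (not part of the contacts app itself) showing the approach used throughout this article: store timestamps as ISO 8601 UTC strings in a TEXT column. Because every value is UTC and in the same format, the strings sort lexicographically in chronological order, so ordering and range queries behave the same way on SQLite and on a server database.

```python
import sqlite3
from datetime import datetime, timezone

# Store timestamps as ISO 8601 UTC strings in a plain TEXT column, which
# both SQLite and server databases like PostgreSQL can store and compare.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, happened_at TEXT NOT NULL)"
)

stamp = datetime.now(timezone.utc).isoformat()
conn.execute("INSERT INTO events (happened_at) VALUES (?)", (stamp,))

# The string round-trips back to a timezone-aware datetime when you need one.
(row,) = conn.execute("SELECT happened_at FROM events").fetchall()
parsed = datetime.fromisoformat(row[0])
```

The same INSERT and SELECT statements would run unchanged against a PostgreSQL TEXT (or TIMESTAMPTZ) column, which is exactly the portability this section is describing.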
Setting Up the Local SQLite Database
Let's start with a simple example. Suppose you're building a contacts app. Here's how you might set up the local database:
import sqlite3
from datetime import datetime, timezone


def create_local_db(db_path="local_contacts.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Create the main contacts table.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT,
            phone TEXT,
            updated_at TEXT NOT NULL
        )
    """)

    # Create a change log table to track what needs syncing.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS change_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            table_name TEXT NOT NULL,
            record_id INTEGER NOT NULL,
            operation TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            synced INTEGER DEFAULT 0
        )
    """)

    conn.commit()
    return conn


conn = create_local_db()
print("Local database created successfully.")
The contacts table holds your actual data. The change_log table is where we record every insert, update, or delete that happens locally. This log is what drives the sync process.
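The helper functions below log each change explicitly in Python. As an alternative sketch (not part of this article's code), you could let SQLite triggers populate change_log automatically, so every write to contacts is captured even if some code path forgets to log it. The trigger names here are hypothetical; the schema matches the tables above.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")

# Same schema as above, plus triggers that write the change log for us.
conn.executescript("""
    CREATE TABLE contacts (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT,
        phone TEXT,
        updated_at TEXT NOT NULL
    );
    CREATE TABLE change_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        table_name TEXT NOT NULL,
        record_id INTEGER NOT NULL,
        operation TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        synced INTEGER DEFAULT 0
    );
    CREATE TRIGGER contacts_insert AFTER INSERT ON contacts BEGIN
        INSERT INTO change_log (table_name, record_id, operation, timestamp)
        VALUES ('contacts', NEW.id, 'INSERT', NEW.updated_at);
    END;
    CREATE TRIGGER contacts_update AFTER UPDATE ON contacts BEGIN
        INSERT INTO change_log (table_name, record_id, operation, timestamp)
        VALUES ('contacts', NEW.id, 'UPDATE', NEW.updated_at);
    END;
    CREATE TRIGGER contacts_delete AFTER DELETE ON contacts BEGIN
        INSERT INTO change_log (table_name, record_id, operation, timestamp)
        VALUES ('contacts', OLD.id, 'DELETE', datetime('now'));
    END;
""")

# Any plain INSERT now gets logged without extra Python code.
conn.execute(
    "INSERT INTO contacts (name, email, phone, updated_at) VALUES (?, ?, ?, ?)",
    ("Alice", "alice@example.com", "555-1234",
     datetime.now(timezone.utc).isoformat()),
)
rows = conn.execute(
    "SELECT table_name, record_id, operation FROM change_log"
).fetchall()
```

The explicit-logging approach used in the rest of this article keeps all the logic visible in Python, which is easier to follow; triggers are worth knowing about if many code paths write to the same table.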
Logging Changes Locally
Every time you modify the contacts table, you also write an entry to the change log. Here's a set of helper functions that do this:
import sqlite3
from datetime import datetime, timezone


def create_local_db(db_path="local_contacts.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT,
            phone TEXT,
            updated_at TEXT NOT NULL
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS change_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            table_name TEXT NOT NULL,
            record_id INTEGER NOT NULL,
            operation TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            synced INTEGER DEFAULT 0
        )
    """)
    conn.commit()
    return conn


def now_iso():
    return datetime.now(timezone.utc).isoformat()


def add_contact(conn, name, email, phone):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute(
        "INSERT INTO contacts (name, email, phone, updated_at) VALUES (?, ?, ?, ?)",
        (name, email, phone, timestamp),
    )
    record_id = cursor.lastrowid
    # Log the change.
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", record_id, "INSERT", timestamp),
    )
    conn.commit()
    return record_id


def update_contact(conn, contact_id, name, email, phone):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute(
        "UPDATE contacts SET name=?, email=?, phone=?, updated_at=? WHERE id=?",
        (name, email, phone, timestamp, contact_id),
    )
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", contact_id, "UPDATE", timestamp),
    )
    conn.commit()


def delete_contact(conn, contact_id):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute("DELETE FROM contacts WHERE id=?", (contact_id,))
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", contact_id, "DELETE", timestamp),
    )
    conn.commit()


# Try it out.
conn = create_local_db()
contact_id = add_contact(conn, "Alice Smith", "alice@example.com", "555-1234")
print(f"Added contact with ID: {contact_id}")

update_contact(conn, contact_id, "Alice J. Smith", "alice@example.com", "555-1234")
print(f"Updated contact {contact_id}")
Every modification is now tracked. When it's time to sync, you simply query the change log for entries where synced = 0.
Syncing Changes to the Remote Server
The sync process reads unsynced entries from the change log, sends them to the remote server, and marks them as synced once the server confirms receipt.
How you communicate with the remote server depends on your setup. A common approach is to have a simple REST API on the server that accepts change operations. Here's what the client-side sync logic might look like:
import sqlite3
import requests
from datetime import datetime, timezone


def create_local_db(db_path="local_contacts.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT,
            phone TEXT,
            updated_at TEXT NOT NULL
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS change_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            table_name TEXT NOT NULL,
            record_id INTEGER NOT NULL,
            operation TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            synced INTEGER DEFAULT 0
        )
    """)
    conn.commit()
    return conn


def now_iso():
    return datetime.now(timezone.utc).isoformat()


def add_contact(conn, name, email, phone):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute(
        "INSERT INTO contacts (name, email, phone, updated_at) VALUES (?, ?, ?, ?)",
        (name, email, phone, timestamp),
    )
    record_id = cursor.lastrowid
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", record_id, "INSERT", timestamp),
    )
    conn.commit()
    return record_id


def get_contact_by_id(conn, contact_id):
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM contacts WHERE id=?", (contact_id,))
    return cursor.fetchone()


def get_unsynced_changes(conn):
    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, table_name, record_id, operation, timestamp "
        "FROM change_log WHERE synced = 0 ORDER BY timestamp"
    )
    return cursor.fetchall()


def mark_change_synced(conn, change_id):
    cursor = conn.cursor()
    cursor.execute("UPDATE change_log SET synced = 1 WHERE id = ?", (change_id,))
    conn.commit()


def sync_to_server(conn, server_url):
    changes = get_unsynced_changes(conn)
    if not changes:
        print("Nothing to sync.")
        return

    for change in changes:
        change_id, table_name, record_id, operation, timestamp = change

        # Build the payload to send to the server.
        payload = {
            "table": table_name,
            "record_id": record_id,
            "operation": operation,
            "timestamp": timestamp,
        }

        # For INSERT and UPDATE, include the current record data.
        if operation in ("INSERT", "UPDATE"):
            record = get_contact_by_id(conn, record_id)
            if record:
                payload["data"] = {
                    "id": record[0],
                    "name": record[1],
                    "email": record[2],
                    "phone": record[3],
                    "updated_at": record[4],
                }

        try:
            response = requests.post(
                f"{server_url}/sync",
                json=payload,
                timeout=10,
            )
            if response.status_code == 200:
                mark_change_synced(conn, change_id)
                print(f"Synced change {change_id}: {operation} on record {record_id}")
            else:
                print(
                    f"Server returned {response.status_code} for change {change_id}. "
                    "Will retry later."
                )
                break  # Stop syncing on first failure to preserve order.
        except requests.RequestException:
            # Covers connection errors and timeouts alike.
            print("Cannot reach server. Will retry when connected.")
            break


# Example usage (the server URL would point to your actual API).
conn = create_local_db()
add_contact(conn, "Bob Jones", "bob@example.com", "555-5678")
sync_to_server(conn, "https://yourserver.example.com")
If the server is unreachable, the changes remain in the log with synced = 0 and will be picked up on the next sync attempt. The order of changes is preserved by sorting on the timestamp.
Pulling Changes from the Server
If your app only runs on a single device, you may only need one-way sync (local → remote). But if you want to support multiple devices, you'll also need to pull changes from the server.
A straightforward approach is to track the last time you synced, then ask the server for all changes since that timestamp:
import json
import requests


def pull_from_server(conn, server_url, config_path="sync_config.json"):
    # Load the last sync timestamp.
    try:
        with open(config_path) as f:
            config = json.load(f)
        last_sync = config.get("last_sync", "1970-01-01T00:00:00+00:00")
    except FileNotFoundError:
        last_sync = "1970-01-01T00:00:00+00:00"

    try:
        response = requests.get(
            f"{server_url}/changes",
            params={"since": last_sync},
            timeout=10,
        )
        if response.status_code != 200:
            print(f"Server returned {response.status_code} during pull.")
            return

        remote_changes = response.json()
        cursor = conn.cursor()
        for change in remote_changes:
            if change["operation"] == "DELETE":
                # Use a fixed table name rather than interpolating
                # change["table"] into the SQL string, which would be
                # an injection risk.
                cursor.execute(
                    "DELETE FROM contacts WHERE id=?",
                    (change["record_id"],),
                )
            elif change["operation"] == "INSERT":
                data = change["data"]
                cursor.execute(
                    "INSERT OR REPLACE INTO contacts "
                    "(id, name, email, phone, updated_at) VALUES (?, ?, ?, ?, ?)",
                    (data["id"], data["name"], data["email"],
                     data["phone"], data["updated_at"]),
                )
            elif change["operation"] == "UPDATE":
                data = change["data"]
                cursor.execute(
                    "UPDATE contacts SET name=?, email=?, phone=?, updated_at=? "
                    "WHERE id=?",
                    (data["name"], data["email"], data["phone"],
                     data["updated_at"], data["id"]),
                )
        conn.commit()

        # Save the new sync timestamp.
        new_last_sync = remote_changes[-1]["timestamp"] if remote_changes else last_sync
        with open(config_path, "w") as f:
            json.dump({"last_sync": new_last_sync}, f)
        print(f"Pulled {len(remote_changes)} changes from server.")
    except requests.RequestException:
        # Covers connection errors and timeouts alike.
        print("Cannot reach server for pull. Will retry later.")
If you're building a PyQt6 or PySide6 GUI application, you might want to store the last_sync timestamp using QSettings instead of a JSON file. QSettings provides a convenient, cross-platform way to persist small pieces of application configuration.
Handling Conflicts
Conflicts arise when two clients edit the same record while offline, then both try to sync. For example, Client A changes a contact's phone number while Client B changes the same contact's email. When both sync, the server sees two different versions of the same record.
There are several strategies for handling this:
Last write wins — The simplest approach. Whichever change has the later timestamp overwrites the other. This is easy to implement but can silently discard changes.
Field-level merging — Instead of replacing entire records, you compare which fields changed on each side. If Client A changed the phone and Client B changed the email, you can merge both changes without conflict. You only have a real conflict if both changed the same field.
Ask the user — When a conflict is detected, present both versions to the user and let them choose. This is the safest approach for important data.
For many applications — especially simpler ones or those with a single user across devices — "last write wins" is perfectly adequate. You can always add more sophisticated conflict handling later as your needs grow.
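The first two strategies can be sketched in a few lines. This is an illustration on plain dicts, not part of the contacts app: the record shapes and helper names are assumptions. Field-level merging needs a common ancestor ("base") version of the record to tell which side changed which field.

```python
def last_write_wins(local, remote):
    """Keep whichever version has the later updated_at timestamp."""
    return local if local["updated_at"] >= remote["updated_at"] else remote


def field_level_merge(base, local, remote):
    """Merge field by field against a common ancestor version.

    A field is only a real conflict if both sides changed it to
    different values.
    """
    merged, conflicts = {}, []
    for field in base:
        if field == "updated_at":
            continue  # bookkeeping field, resolved below
        local_changed = local[field] != base[field]
        remote_changed = remote[field] != base[field]
        if local_changed and remote_changed and local[field] != remote[field]:
            conflicts.append(field)       # both edited the same field
            merged[field] = local[field]  # placeholder; ask the user here
        elif remote_changed:
            merged[field] = remote[field]
        else:
            merged[field] = local[field]
    merged["updated_at"] = max(local["updated_at"], remote["updated_at"])
    return merged, conflicts


# Client A changed the phone; Client B changed the email.
base = {"name": "Alice", "email": "a@x.com", "phone": "555-1234", "updated_at": "t0"}
local = dict(base, phone="555-9999", updated_at="t1")
remote = dict(base, email="alice@x.com", updated_at="t2")
merged, conflicts = field_level_merge(base, local, remote)
```

Here the merge keeps both edits with no conflicts, whereas `last_write_wins(local, remote)` would silently discard Client A's phone change because Client B's timestamp is later.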
Running Sync in the Background
If your application has a GUI, you don't want the sync process to freeze the interface while it waits for network responses. Network calls can take several seconds (or time out entirely), and during that time your UI would be unresponsive.
The solution is to run your sync operations in a background thread. In PyQt6 or PySide6, you can use QThreadPool and QRunnable to handle this cleanly. Our multithreading with QThreadPool tutorial covers this pattern in detail.
Here's a sketch of how you might set up a periodic sync worker:
import sqlite3

from PyQt6.QtCore import QObject, QRunnable, pyqtSignal, pyqtSlot


class SyncSignals(QObject):
    finished = pyqtSignal()
    error = pyqtSignal(str)
    progress = pyqtSignal(str)


class SyncWorker(QRunnable):
    def __init__(self, db_path, server_url):
        super().__init__()
        self.db_path = db_path
        self.server_url = server_url
        self.signals = SyncSignals()

    @pyqtSlot()
    def run(self):
        try:
            # Open the connection inside run() so it is created on the
            # worker thread -- SQLite connections should not be shared
            # across threads.
            conn = sqlite3.connect(self.db_path)
            self.signals.progress.emit("Syncing...")
            # Pull from server first, then push local changes.
            # (Use the pull and sync functions from earlier.)
            # pull_from_server(conn, self.server_url)
            # sync_to_server(conn, self.server_url)
            conn.close()
            self.signals.progress.emit("Sync complete.")
            self.signals.finished.emit()
        except Exception as e:
            self.signals.error.emit(str(e))
You would then submit this worker to a QThreadPool from your main window, perhaps triggered by a timer or a "Sync Now" button.
Putting It All Together
Here's a summary of the complete workflow:
- App starts → Create or open the local SQLite database.
- Try to connect → If online, pull the latest changes from the server and push any unsynced local changes.
- User works normally → All reads and writes go to the local database. Every write also creates an entry in the change log.
- Periodic sync → At regular intervals (or on demand), check for connectivity and sync in both directions.
- Handle conflicts → When a conflict is detected, resolve it using your chosen strategy.
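Step 4 assumes some way to check for connectivity before attempting a sync. A cheap approach is to try opening a TCP connection to your own server, rather than pinging a third-party host. This is a minimal sketch; the default host and port are placeholders for your real API endpoint.

```python
import socket


def is_online(host="yourserver.example.com", port=443, timeout=3):
    """Cheap connectivity probe: can we open a TCP connection to the server?

    The default host and port are placeholders -- point this at your
    actual sync API. A failed probe just means "skip this sync cycle".
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

You might call `is_online()` at the top of your periodic sync routine and return early when it fails; the change log keeps everything queued for the next attempt either way.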
The complete local database layer — with change logging and sync support — looks like this:
import sqlite3
import json
import requests
from datetime import datetime, timezone


# --- Database setup ---

def create_local_db(db_path="local_contacts.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS contacts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT,
            phone TEXT,
            updated_at TEXT NOT NULL
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS change_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            table_name TEXT NOT NULL,
            record_id INTEGER NOT NULL,
            operation TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            synced INTEGER DEFAULT 0
        )
    """)
    conn.commit()
    return conn


# --- Helpers ---

def now_iso():
    return datetime.now(timezone.utc).isoformat()


# --- CRUD operations with change logging ---

def add_contact(conn, name, email, phone):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute(
        "INSERT INTO contacts (name, email, phone, updated_at) VALUES (?, ?, ?, ?)",
        (name, email, phone, timestamp),
    )
    record_id = cursor.lastrowid
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", record_id, "INSERT", timestamp),
    )
    conn.commit()
    return record_id


def update_contact(conn, contact_id, name, email, phone):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute(
        "UPDATE contacts SET name=?, email=?, phone=?, updated_at=? WHERE id=?",
        (name, email, phone, timestamp, contact_id),
    )
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", contact_id, "UPDATE", timestamp),
    )
    conn.commit()


def delete_contact(conn, contact_id):
    cursor = conn.cursor()
    timestamp = now_iso()
    cursor.execute("DELETE FROM contacts WHERE id=?", (contact_id,))
    cursor.execute(
        "INSERT INTO change_log (table_name, record_id, operation, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("contacts", contact_id, "DELETE", timestamp),
    )
    conn.commit()


def get_contact_by_id(conn, contact_id):
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM contacts WHERE id=?", (contact_id,))
    return cursor.fetchone()


def get_all_contacts(conn):
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM contacts ORDER BY name")
    return cursor.fetchall()


# --- Sync operations ---

def get_unsynced_changes(conn):
    cursor = conn.cursor()
    cursor.execute(
        "SELECT id, table_name, record_id, operation, timestamp "
        "FROM change_log WHERE synced = 0 ORDER BY timestamp"
    )
    return cursor.fetchall()


def mark_change_synced(conn, change_id):
    cursor = conn.cursor()
    cursor.execute("UPDATE change_log SET synced = 1 WHERE id = ?", (change_id,))
    conn.commit()


def sync_to_server(conn, server_url):
    """Push unsynced local changes to the remote server."""
    changes = get_unsynced_changes(conn)
    if not changes:
        print("Nothing to push.")
        return True

    for change in changes:
        change_id, table_name, record_id, operation, timestamp = change
        payload = {
            "table": table_name,
            "record_id": record_id,
            "operation": operation,
            "timestamp": timestamp,
        }
        if operation in ("INSERT", "UPDATE"):
            record = get_contact_by_id(conn, record_id)
            if record:
                payload["data"] = {
                    "id": record[0],
                    "name": record[1],
                    "email": record[2],
                    "phone": record[3],
                    "updated_at": record[4],
                }
        try:
            response = requests.post(
                f"{server_url}/sync",
                json=payload,
                timeout=10,
            )
            if response.status_code == 200:
                mark_change_synced(conn, change_id)
                print(f"Synced: {operation} on record {record_id}")
            else:
                print(f"Server error {response.status_code}. Will retry later.")
                return False
        except requests.RequestException:
            # Covers connection errors and timeouts alike.
            print("Offline. Will retry when connected.")
            return False
    return True


def pull_from_server(conn, server_url, config_path="sync_config.json"):
    """Pull remote changes into the local database."""
    try:
        with open(config_path) as f:
            config = json.load(f)
        last_sync = config.get("last_sync", "1970-01-01T00:00:00+00:00")
    except FileNotFoundError:
        last_sync = "1970-01-01T00:00:00+00:00"

    try:
        response = requests.get(
            f"{server_url}/changes",
            params={"since": last_sync},
            timeout=10,
        )
        if response.status_code != 200:
            print(f"Server returned {response.status_code} during pull.")
            return False

        remote_changes = response.json()
        cursor = conn.cursor()
        for change in remote_changes:
            if change["operation"] == "DELETE":
                cursor.execute(
                    "DELETE FROM contacts WHERE id=?",
                    (change["record_id"],),
                )
            elif change["operation"] in ("INSERT", "UPDATE"):
                data = change["data"]
                cursor.execute(
                    "INSERT OR REPLACE INTO contacts "
                    "(id, name, email, phone, updated_at) VALUES (?, ?, ?, ?, ?)",
                    (data["id"], data["name"], data["email"],
                     data["phone"], data["updated_at"]),
                )
        conn.commit()

        if remote_changes:
            new_last_sync = remote_changes[-1]["timestamp"]
            with open(config_path, "w") as f:
                json.dump({"last_sync": new_last_sync}, f)
        print(f"Pulled {len(remote_changes)} changes from server.")
        return True
    except requests.RequestException:
        # Covers connection errors and timeouts alike.
        print("Offline. Cannot pull from server.")
        return False


def full_sync(conn, server_url):
    """Perform a complete sync: pull first, then push."""
    print("--- Starting sync ---")
    pull_from_server(conn, server_url)
    sync_to_server(conn, server_url)
    print("--- Sync complete ---")


# --- Demo (local only, no server needed) ---

if __name__ == "__main__":
    conn = create_local_db()

    # Add some contacts.
    id1 = add_contact(conn, "Alice Smith", "alice@example.com", "555-1234")
    id2 = add_contact(conn, "Bob Jones", "bob@example.com", "555-5678")
    print(f"Added contacts: {id1}, {id2}")

    # Update one.
    update_contact(conn, id1, "Alice J. Smith", "alice@example.com", "555-1234")
    print(f"Updated contact {id1}")

    # Show all contacts.
    print("\nAll contacts:")
    for contact in get_all_contacts(conn):
        print(f"  {contact}")

    # Show pending changes.
    print("\nUnsynced changes:")
    for change in get_unsynced_changes(conn):
        print(f"  {change}")

    # Attempt sync (will fail without a real server, which is expected).
    print()
    full_sync(conn, "https://yourserver.example.com")

    conn.close()
Running this will create the local database, add and modify contacts, and show you the pending change log. The sync attempt will gracefully handle the missing server and leave your changes queued for the next attempt.
From here, you'd build out the server side (a Flask or FastAPI application that receives and serves change operations) and wire up the sync to run periodically in your application. If you're building a GUI with PyQt6 or PySide6, run the sync in a background thread using QThreadPool so your interface stays responsive.
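To give a feel for the receiving end, here's a rough sketch of what the server would do with each POST /sync payload. Everything here is an assumption, not part of the article's code: the function names are hypothetical, and the server-side store is illustrated with SQLite for brevity, where a real deployment would likely use PostgreSQL or MySQL behind a Flask or FastAPI route handler.

```python
import sqlite3


def create_server_db():
    # Illustrative server-side store; a real server would use PostgreSQL
    # or MySQL with an equivalent schema.
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE contacts (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT,
            phone TEXT,
            updated_at TEXT NOT NULL
        )
    """)
    return conn


def apply_change(conn, payload):
    """Apply one change payload (as sent by sync_to_server) to the server DB.

    Returns True if applied, False if rejected as stale -- a simple
    last-write-wins guard based on updated_at.
    """
    op = payload["operation"]
    if op == "DELETE":
        conn.execute("DELETE FROM contacts WHERE id=?", (payload["record_id"],))
    elif op in ("INSERT", "UPDATE"):
        data = payload["data"]
        row = conn.execute(
            "SELECT updated_at FROM contacts WHERE id=?", (data["id"],)
        ).fetchone()
        if row and row[0] > data["updated_at"]:
            return False  # stale change; the server copy is newer
        conn.execute(
            "INSERT OR REPLACE INTO contacts (id, name, email, phone, updated_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (data["id"], data["name"], data["email"],
             data["phone"], data["updated_at"]),
        )
    conn.commit()
    return True


server = create_server_db()
applied = apply_change(server, {
    "table": "contacts", "record_id": 1, "operation": "INSERT",
    "timestamp": "2024-01-01T00:00:00+00:00",
    "data": {"id": 1, "name": "Alice", "email": "alice@example.com",
             "phone": "555-1234", "updated_at": "2024-01-01T00:00:00+00:00"},
})
```

In a web framework, the route handler would parse the request body into `payload`, call `apply_change()`, and return HTTP 200 on success so the client marks the change as synced. The server would also append each applied change to its own log so that GET /changes can serve them to other clients.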