Skip to content

Python SQLite3 Tutorial: Vollständiger Leitfaden zur SQLite-Datenbank in Python

Updated on

Du musst strukturierte Daten in deiner Python-Anwendung speichern. Vielleicht sind es Benutzereinstellungen, Sensormesswerte, Log-Einträge oder eine Aufgabenliste. Du greifst zu einer CSV-Datei, aber die Datenmenge wächst und Abfragen werden mühsam. Du ziehst PostgreSQL oder MySQL in Betracht, aber einen Datenbankserver für ein lokales Tool oder einen Prototypen aufzusetzen, wirkt wie Overkill. Du brauchst einfach eine zuverlässige Möglichkeit, strukturierte Daten ohne großen Aufwand zu speichern, abzufragen und zu aktualisieren.

Genau dieses Problem löst SQLite. SQLite ist eine in sich geschlossene, serverlose Datenbank-Engine, die alles in einer einzigen Datei speichert. Python bringt das sqlite3-Modul in der Standardbibliothek bereits mit, du musst also nichts installieren. Du erhältst vollständige SQL-Unterstützung, ACID-Transaktionen und die Fähigkeit, Datensätze bis zu 281 Terabyte zu verarbeiten – und das alles ohne einen Serverprozess zu starten oder Konfigurationsdateien zu verwalten.

Dieser Leitfaden führt dich durch alles, was du brauchst, um SQLite in Python effektiv zu nutzen: vom Erstellen deiner ersten Datenbank über fortgeschrittene Abfragen, pandas-Integration, Performance-Tuning bis hin zu einem vollständigen Praxisprojekt.

📚

Was ist SQLite und warum sollte man es verwenden?

SQLite ist eine eingebettete relationale Datenbank-Engine. Im Gegensatz zu PostgreSQL oder MySQL läuft sie nicht als separater Serverprozess. Die gesamte Datenbank liegt in einer einzigen Datei auf der Festplatte (oder im Speicher). Das sqlite3-Modul in Pythons Standardbibliothek stellt ein DB-API-2.0-konformes Interface zu SQLite bereit.

Wichtige Eigenschaften von SQLite:

  • Keine Konfiguration: Kein Server-Setup, keine Benutzer, keine Berechtigungen zu verwalten
  • Serverlos: Die Datenbank-Engine läuft im Prozess deiner Anwendung
  • Einzeldatei: Die gesamte Datenbank (Tabellen, Indizes, Daten) ist eine .db-Datei
  • Plattformübergreifend: Die Datenbankdatei funktioniert auf jedem OS ohne Konvertierung
  • ACID-konform: Vollständige Transaktionsunterstützung mit Commit und Rollback
  • Leichtgewichtig: Die Bibliothek ist ca. 750 KB groß – kleiner als die meisten Bilder

Wann man SQLite verwenden sollte

Use CaseSQLitePostgreSQL/MySQL
Desktop-/Mobile-AppsBeste WahlOverkill
PrototypingBeste WahlLangsamer einzurichten
Embedded-Geräte / IoTBeste WahlNicht praktikabel
Unit-TestingBeste WahlErfordert Testserver
Data-Analysis-SkripteGute WahlUnnötiger Overhead
Web-App mit < 100K täglichen BesuchenFunktioniert gutSkalierbarer
Web-App mit hoher ParallelitätNicht idealBeste Wahl
Mehrere schreibintensive ClientsNicht idealBeste Wahl

SQLite bewältigt die meisten Anwendungen, die keine mehreren gleichzeitigen Schreibverbindungen benötigen. Android und iOS verwenden SQLite als Standard für lokale Datenbanken. Firefox, Chrome und macOS nutzen es intern. Es ist die weltweit am häufigsten ausgerollte Datenbank-Engine.

Verbindung zu einer Datenbank herstellen

Die Funktion sqlite3.connect() erstellt eine Verbindung zu einer Datenbankdatei. Existiert die Datei nicht, erstellt SQLite sie automatisch.

import sqlite3
 
# Connect to a database file (creates it if it doesn't exist)
conn = sqlite3.connect("myapp.db")
 
# Create a cursor to execute SQL statements
cursor = conn.cursor()
 
# Always close the connection when done
conn.close()

In-Memory-Datenbanken

Verwende die spezielle Zeichenkette :memory:, um eine Datenbank zu erstellen, die nur im RAM existiert. Das ist ideal für Tests und temporäre Datenverarbeitung.

import sqlite3
 
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
 
cursor.execute("CREATE TABLE test (id INTEGER, value TEXT)")
cursor.execute("INSERT INTO test VALUES (1, 'hello')")
cursor.execute("SELECT * FROM test")
print(cursor.fetchone())  # (1, 'hello')
 
conn.close()  # Database is gone after this

Context Manager verwenden

Das empfohlene Muster nutzt einen Context Manager (with-Statement), damit die Verbindung auch bei Fehlern korrekt beendet wird. SQLite-Verbindungen führen innerhalb eines with-Blocks bei Erfolg automatisch ein Commit und bei Exceptions automatisch ein Rollback aus.

import sqlite3
 
# Connection as context manager handles commit/rollback
with sqlite3.connect("myapp.db") as conn:
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
    cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    # Auto-commits if no exception
    # Auto-rolls back if an exception occurs
 
# Note: the connection is NOT closed by 'with' -- it just commits/rolls back
# For full cleanup, close explicitly or use a wrapper:
conn.close()

Eine saubere Helper-Funktion, die sowohl Commit/Rollback als auch das Schließen übernimmt:

import sqlite3
from contextlib import contextmanager
 
@contextmanager
def get_db(db_path="myapp.db"):
    """Context manager that commits on success, rolls back on error, and always closes."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
# Usage
with get_db() as conn:
    conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))

Tabellen erstellen

Verwende CREATE TABLE, um das Schema zu definieren. SQLite unterstützt ein vereinfachtes Typsystem mit fünf Storage Classes.

SQLite-Datentypen

Storage ClassBeschreibungPython-Äquivalent
NULLNull-WertNone
INTEGERSigned Integer (1, 2, 3, 4, 6 oder 8 Bytes)int
REALFließkommazahl (8-Byte IEEE float)float
TEXTUTF-8- oder UTF-16-Stringstr
BLOBBinärdaten, exakt wie eingegeben gespeichertbytes

SQLite verwendet dynamische Typisierung. Du kannst jeden Typ in jeder Spalte speichern – unabhängig vom deklarierten Typ. Der deklarierte Typ ist ein Hinweis, keine harte Einschränkung. Das unterscheidet sich von PostgreSQL und MySQL.

Tabelle mit Constraints erstellen

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS employees (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        department TEXT DEFAULT 'General',
        salary REAL CHECK(salary > 0),
        hire_date TEXT NOT NULL,
        is_active INTEGER DEFAULT 1
    )
""")
 
conn.commit()
conn.close()

Wichtige Constraints:

  • PRIMARY KEY: Eindeutiger Identifikator pro Zeile. INTEGER PRIMARY KEY ist ein Alias für die interne rowid.
  • AUTOINCREMENT: Stellt sicher, dass die rowid immer steigt (gelöschte IDs werden nicht wiederverwendet).
  • NOT NULL: Spalte darf keine NULL-Werte enthalten.
  • UNIQUE: Keine zwei Zeilen dürfen in dieser Spalte denselben Wert haben.
  • DEFAULT: Standardwert, falls beim INSERT keiner angegeben wird.
  • CHECK: Validiert Daten gegen eine Bedingung.

Mehrere verwandte Tabellen erstellen

import sqlite3
 
conn = sqlite3.connect("school.db")
cursor = conn.cursor()
 
# Enable foreign key enforcement (off by default in SQLite)
cursor.execute("PRAGMA foreign_keys = ON")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS students (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE
    )
""")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS courses (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        credits INTEGER DEFAULT 3
    )
""")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS enrollments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        student_id INTEGER NOT NULL,
        course_id INTEGER NOT NULL,
        grade TEXT,
        enrolled_date TEXT DEFAULT CURRENT_DATE,
        FOREIGN KEY (student_id) REFERENCES students(id) ON DELETE CASCADE,
        FOREIGN KEY (course_id) REFERENCES courses(id) ON DELETE CASCADE,
        UNIQUE(student_id, course_id)
    )
""")
 
conn.commit()
conn.close()

CRUD-Operationen

CRUD steht für Create, Read, Update, Delete – die vier Grundoperationen für persistente Speicherung.

INSERT: Daten hinzufügen

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Insert a single row
cursor.execute(
    "INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
    ("Alice Johnson", "alice@company.com", "Engineering", 95000, "2024-01-15")
)
 
# Insert multiple rows with executemany
employees = [
    ("Bob Smith", "bob@company.com", "Marketing", 72000, "2024-03-01"),
    ("Carol White", "carol@company.com", "Engineering", 98000, "2023-11-20"),
    ("David Brown", "david@company.com", "Sales", 68000, "2024-06-10"),
    ("Eve Davis", "eve@company.com", "Engineering", 105000, "2023-08-05"),
]
 
cursor.executemany(
    "INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
    employees
)
 
# Get the last inserted row ID
print(f"Last inserted ID: {cursor.lastrowid}")
 
# Get the number of rows affected
print(f"Rows inserted: {cursor.rowcount}")
 
conn.commit()
conn.close()

INSERT OR REPLACE (Upsert)

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Insert or update if email already exists
cursor.execute("""
    INSERT INTO employees (name, email, department, salary, hire_date)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(email) DO UPDATE SET
        name = excluded.name,
        salary = excluded.salary
""", ("Alice Johnson", "alice@company.com", "Engineering", 100000, "2024-01-15"))
 
conn.commit()
conn.close()

SELECT: Daten lesen

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Select all rows
cursor.execute("SELECT * FROM employees")
all_rows = cursor.fetchall()
for row in all_rows:
    print(row)  # Returns tuples by default
 
# Select specific columns with a condition
cursor.execute(
    "SELECT name, salary FROM employees WHERE department = ?",
    ("Engineering",)
)
engineers = cursor.fetchall()
for name, salary in engineers:
    print(f"{name}: ${salary:,.0f}")
 
# Select with ordering and limit
cursor.execute("""
    SELECT name, salary FROM employees
    ORDER BY salary DESC
    LIMIT 3
""")
top_earners = cursor.fetchall()
print("Top 3 earners:", top_earners)
 
conn.close()

Ergebnisse als Dictionaries erhalten

Standardmäßig liefert fetchall() Tupel. Für Dictionaries (Spaltenname als Key) nutze sqlite3.Row:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
conn.row_factory = sqlite3.Row  # Set row factory
 
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees WHERE department = ?", ("Engineering",))
 
for row in cursor.fetchall():
    print(dict(row))  # Convert Row to dict
    print(row["name"], row["salary"])  # Access by column name
 
conn.close()

UPDATE: Daten ändern

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Update a single field
cursor.execute(
    "UPDATE employees SET salary = ? WHERE email = ?",
    (110000, "alice@company.com")
)
print(f"Rows updated: {cursor.rowcount}")
 
# Update with a calculation
cursor.execute("""
    UPDATE employees
    SET salary = salary * 1.10
    WHERE department = 'Engineering' AND salary < 100000
""")
print(f"Engineers with raises: {cursor.rowcount}")
 
# Conditional update
cursor.execute("""
    UPDATE employees
    SET is_active = 0
    WHERE hire_date < '2023-01-01'
""")
 
conn.commit()
conn.close()

DELETE: Daten entfernen

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Delete specific rows
cursor.execute("DELETE FROM employees WHERE is_active = 0")
print(f"Rows deleted: {cursor.rowcount}")
 
# Delete with a subquery
cursor.execute("""
    DELETE FROM enrollments
    WHERE student_id NOT IN (SELECT id FROM students)
""")
 
# Delete all rows (truncate equivalent)
cursor.execute("DELETE FROM employees")
 
conn.commit()
conn.close()

Parametrisierte Abfragen und Schutz vor SQL Injection

Baue SQL-Abfragen niemals per String-Formatierung oder f-strings zusammen. Verwende immer parametrisierte Abfragen, um SQL-Injection-Angriffe zu verhindern.

Das Problem: SQL Injection

import sqlite3
 
# DANGEROUS: Never do this
user_input = "'; DROP TABLE employees; --"
query = f"SELECT * FROM employees WHERE name = '{user_input}'"
# This would execute: SELECT * FROM employees WHERE name = ''; DROP TABLE employees; --'

Die Lösung: Parametrisierte Abfragen

SQLite3 unterstützt zwei Placeholder-Stile:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Style 1: Question mark placeholders (positional)
cursor.execute(
    "SELECT * FROM employees WHERE department = ? AND salary > ?",
    ("Engineering", 90000)
)
 
# Style 2: Named placeholders
cursor.execute(
    "SELECT * FROM employees WHERE department = :dept AND salary > :min_salary",
    {"dept": "Engineering", "min_salary": 90000}
)
 
# Named placeholders are clearer with many parameters
cursor.execute("""
    INSERT INTO employees (name, email, department, salary, hire_date)
    VALUES (:name, :email, :dept, :salary, :date)
""", {
    "name": "Frank Green",
    "email": "frank@company.com",
    "dept": "Engineering",
    "salary": 92000,
    "date": "2025-09-01"
})
 
conn.commit()
conn.close()

Parametrisierte Abfragen übernehmen Escaping und Quoting automatisch. Die Datenbank-Engine behandelt Parameterwerte als Daten – niemals als SQL-Code. Damit ist SQL Injection unabhängig vom Input ausgeschlossen.

Dynamische WHERE-Klauseln

Manchmal musst du Abfragen dynamisch bauen, z. B. wenn Filter optional sind. Hier ist ein sicheres Muster:

import sqlite3
 
def search_employees(department=None, min_salary=None, is_active=None):
    conn = sqlite3.connect("myapp.db")
    cursor = conn.cursor()
 
    conditions = []
    params = []
 
    if department is not None:
        conditions.append("department = ?")
        params.append(department)
 
    if min_salary is not None:
        conditions.append("salary >= ?")
        params.append(min_salary)
 
    if is_active is not None:
        conditions.append("is_active = ?")
        params.append(is_active)
 
    query = "SELECT * FROM employees"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
 
    cursor.execute(query, params)
    results = cursor.fetchall()
    conn.close()
    return results
 
# Usage
engineers = search_employees(department="Engineering", min_salary=90000)
active_staff = search_employees(is_active=1)
everyone = search_employees()  # No filters

Mit Transaktionen arbeiten

Eine Transaktion fasst mehrere SQL-Operationen zu einer atomaren Einheit zusammen. Entweder alle Operationen gelingen (commit) oder keine wird wirksam (rollback).

Manuelle Transaktionskontrolle

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
try:
    # Transfer money between accounts
    cursor.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
 
    # Verify no negative balance
    cursor.execute("SELECT balance FROM accounts WHERE id = 1")
    balance = cursor.fetchone()[0]
    if balance < 0:
        raise ValueError("Insufficient funds")
 
    conn.commit()  # All good, save changes
    print("Transfer complete")
 
except Exception as e:
    conn.rollback()  # Something went wrong, undo everything
    print(f"Transfer failed: {e}")
 
finally:
    conn.close()

Autocommit und Isolation Levels

Standardmäßig arbeitet sqlite3 im Modus „deferred transaction“. Du kannst dieses Verhalten ändern:

import sqlite3
 
# Default behavior: transactions are deferred
conn = sqlite3.connect("myapp.db")
 
# Autocommit mode (each statement is its own transaction)
conn = sqlite3.connect("myapp.db", isolation_level=None)
 
# Immediate locking (prevents concurrent write conflicts)
conn = sqlite3.connect("myapp.db")
conn.execute("BEGIN IMMEDIATE")
# ... your operations ...
conn.commit()

Savepoints für partielle Rollbacks

Savepoints erlauben es, einen Teil einer Transaktion zurückzurollen, ohne alles rückgängig zu machen:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
try:
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (1, 100)")
 
    # Start a savepoint
    cursor.execute("SAVEPOINT order_items")
 
    try:
        cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Widget', 5)")
        cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Gadget', -1)")
        # This might fail due to CHECK constraint
        cursor.execute("RELEASE SAVEPOINT order_items")
    except sqlite3.IntegrityError:
        cursor.execute("ROLLBACK TO SAVEPOINT order_items")
        print("Some items failed, but order was still created")
 
    conn.commit()
except Exception:
    conn.rollback()
finally:
    conn.close()

Daten abrufen: fetchone, fetchall, fetchmany

Das Cursor-Objekt stellt drei Methoden bereit, um Query-Ergebnisse abzurufen.

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("SELECT * FROM employees ORDER BY salary DESC")
 
# fetchone: Get the next single row (or None if no more rows)
top_earner = cursor.fetchone()
print(f"Top earner: {top_earner}")
 
# fetchmany: Get the next N rows
next_three = cursor.fetchmany(3)
print(f"Next 3: {next_three}")
 
# fetchall: Get all remaining rows
remaining = cursor.fetchall()
print(f"Remaining: {len(remaining)} rows")
 
conn.close()

Ergebnisse effizient iterieren

Bei großen Result Sets iteriere direkt über den Cursor, statt alles in den Speicher zu laden:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("SELECT name, salary FROM employees")
 
# Memory-efficient: processes one row at a time
for name, salary in cursor:
    print(f"{name}: ${salary:,.0f}")
 
conn.close()

fetchmany mit Batch-Verarbeitung

Wenn du Millionen Zeilen verarbeitest, gibt dir fetchmany() in einer Schleife Kontrolle über den Speicherverbrauch:

import sqlite3
 
conn = sqlite3.connect("large_data.db")
cursor = conn.cursor()
 
cursor.execute("SELECT * FROM sensor_readings")
 
batch_size = 1000
while True:
    rows = cursor.fetchmany(batch_size)
    if not rows:
        break
    # Process batch
    for row in rows:
        pass  # your processing logic
 
conn.close()

Fortgeschrittene Abfragen

JOINs

import sqlite3
 
conn = sqlite3.connect("school.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
 
# INNER JOIN: Students with their enrolled courses
cursor.execute("""
    SELECT s.name AS student, c.title AS course, e.grade
    FROM enrollments e
    INNER JOIN students s ON e.student_id = s.id
    INNER JOIN courses c ON e.course_id = c.id
    ORDER BY s.name, c.title
""")
 
for row in cursor.fetchall():
    print(f"{row['student']} - {row['course']}: {row['grade'] or 'In Progress'}")
 
# LEFT JOIN: All students, even those not enrolled
cursor.execute("""
    SELECT s.name, COUNT(e.id) AS course_count
    FROM students s
    LEFT JOIN enrollments e ON s.id = e.student_id
    GROUP BY s.id
    ORDER BY course_count DESC
""")
 
for row in cursor.fetchall():
    print(f"{row['name']}: {row['course_count']} courses")
 
conn.close()

Aggregation

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Department statistics
cursor.execute("""
    SELECT
        department,
        COUNT(*) AS headcount,
        ROUND(AVG(salary), 2) AS avg_salary,
        MIN(salary) AS min_salary,
        MAX(salary) AS max_salary,
        SUM(salary) AS total_payroll
    FROM employees
    WHERE is_active = 1
    GROUP BY department
    HAVING COUNT(*) >= 2
    ORDER BY avg_salary DESC
""")
 
print(f"{'Department':<15} {'Count':<8} {'Avg Salary':<12} {'Min':<10} {'Max':<10}")
print("-" * 55)
for row in cursor.fetchall():
    dept, count, avg_sal, min_sal, max_sal, total = row
    print(f"{dept:<15} {count:<8} ${avg_sal:<11,.0f} ${min_sal:<9,.0f} ${max_sal:<9,.0f}")
 
conn.close()

Subqueries

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Employees earning above their department average
cursor.execute("""
    SELECT name, department, salary
    FROM employees e
    WHERE salary > (
        SELECT AVG(salary) FROM employees
        WHERE department = e.department
    )
    ORDER BY department, salary DESC
""")
 
for name, dept, salary in cursor.fetchall():
    print(f"{name} ({dept}): ${salary:,.0f}")
 
# Most recent hire in each department
cursor.execute("""
    SELECT name, department, hire_date
    FROM employees
    WHERE (department, hire_date) IN (
        SELECT department, MAX(hire_date)
        FROM employees
        GROUP BY department
    )
""")
 
for row in cursor.fetchall():
    print(row)
 
conn.close()

Window Functions (SQLite 3.25+)

SQLite unterstützt Window Functions für laufende Summen, Rankings und weitere analytische Abfragen:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Rank employees by salary within each department
cursor.execute("""
    SELECT
        name,
        department,
        salary,
        RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
        RANK() OVER (ORDER BY salary DESC) AS overall_rank
    FROM employees
    WHERE is_active = 1
""")
 
for name, dept, salary, dept_rank, overall_rank in cursor.fetchall():
    print(f"#{overall_rank} overall, #{dept_rank} in {dept}: {name} (${salary:,.0f})")
 
# Running total of payroll by hire date
cursor.execute("""
    SELECT
        name,
        hire_date,
        salary,
        SUM(salary) OVER (ORDER BY hire_date) AS running_total
    FROM employees
    ORDER BY hire_date
""")
 
for name, date, salary, running in cursor.fetchall():
    print(f"{date} | {name}: ${salary:,.0f} | Running total: ${running:,.0f}")
 
conn.close()

SQLite mit pandas

pandas hat integrierte SQLite-Unterstützung über read_sql() und to_sql(). So kannst du Daten einfach zwischen DataFrames und SQLite-Datenbanken bewegen.

SQL-Ergebnisse in einen DataFrame einlesen

import sqlite3
import pandas as pd
 
conn = sqlite3.connect("myapp.db")
 
# Read a SQL query into a DataFrame
df = pd.read_sql("SELECT * FROM employees WHERE is_active = 1", conn)
print(df.head())
print(df.describe())
 
# Parameterized query with pandas
df = pd.read_sql(
    "SELECT * FROM employees WHERE department = ? AND salary > ?",
    conn,
    params=("Engineering", 90000)
)
 
# Read an entire table
df = pd.read_sql("SELECT * FROM employees", conn, index_col="id")
 
conn.close()

Einen DataFrame nach SQLite schreiben

import sqlite3
import pandas as pd
 
# Create a sample DataFrame
df = pd.DataFrame({
    "product": ["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"],
    "price": [999.99, 699.99, 449.99, 349.99, 79.99],
    "stock": [50, 200, 75, 30, 500],
    "category": ["Electronics", "Electronics", "Electronics", "Peripherals", "Peripherals"]
})
 
conn = sqlite3.connect("store.db")
 
# Write DataFrame to a new table
df.to_sql("products", conn, if_exists="replace", index=False)
 
# Append to an existing table
new_products = pd.DataFrame({
    "product": ["Mouse", "Webcam"],
    "price": [49.99, 89.99],
    "stock": [300, 100],
    "category": ["Peripherals", "Peripherals"]
})
new_products.to_sql("products", conn, if_exists="append", index=False)
 
# Verify
result = pd.read_sql("SELECT * FROM products", conn)
print(result)
 
conn.close()

Round-Trip-Workflow: CSV → SQLite → Analyse

import sqlite3
import pandas as pd
 
# Step 1: Load CSV into SQLite
df = pd.read_csv("sales_data.csv")
conn = sqlite3.connect("analytics.db")
df.to_sql("sales", conn, if_exists="replace", index=False)
 
# Step 2: Run SQL analytics
summary = pd.read_sql("""
    SELECT
        region,
        strftime('%Y-%m', sale_date) AS month,
        COUNT(*) AS transactions,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_transaction
    FROM sales
    GROUP BY region, month
    ORDER BY month, revenue DESC
""", conn)
 
print(summary)
 
# Step 3: Export results
summary.to_csv("monthly_summary.csv", index=False)
 
conn.close()

Sobald du deine Query-Ergebnisse in einem DataFrame hast, kannst du sie direkt mit PyGWalker (opens in a new tab) visualisieren. PyGWalker verwandelt jeden pandas DataFrame in eine interaktive Visualisierungsoberfläche ähnlich wie Tableau, sodass du per Drag-and-drop Charts erstellen kannst, ohne Plotting-Code zu schreiben:

import pygwalker as pyg
 
# Turn your SQL query results into an interactive visualization
walker = pyg.walk(summary)

Für einen noch reibungsloseren Workflow bietet RunCell (opens in a new tab) eine KI-gestützte Jupyter-Umgebung, in der du SQL-Abfragen schreiben, KI-unterstütztes Debugging bekommen und Ergebnisse sofort visualisieren kannst – alles in einem Notebook.

Fehlerbehandlung

Das sqlite3-Modul definiert mehrere Exception-Typen. Spezifische Exceptions zu fangen führt zu besseren Fehlermeldungen und besseren Recovery-Strategien.

import sqlite3
 
def safe_insert(db_path, name, email, salary):
    """Insert an employee with proper error handling."""
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO employees (name, email, salary, hire_date) VALUES (?, ?, ?, date('now'))",
            (name, email, salary)
        )
        conn.commit()
        return cursor.lastrowid
 
    except sqlite3.IntegrityError as e:
        # Unique constraint violated (duplicate email, NOT NULL violated, etc.)
        print(f"Data integrity error: {e}")
        return None
 
    except sqlite3.OperationalError as e:
        # Table doesn't exist, SQL syntax error, database locked, etc.
        print(f"Operational error: {e}")
        return None
 
    except sqlite3.DatabaseError as e:
        # Corrupted database, disk full, etc.
        print(f"Database error: {e}")
        return None
 
    except sqlite3.ProgrammingError as e:
        # Using a closed cursor, wrong number of parameters, etc.
        print(f"Programming error: {e}")
        return None
 
    finally:
        if conn:
            conn.close()

SQLite3-Exception-Hierarchie

ExceptionParentHäufige Ursachen
sqlite3.WarningExceptionNicht-fatale Probleme
sqlite3.ErrorExceptionBasisklasse für alle sqlite3-Fehler
sqlite3.InterfaceErrorErrorFalsche Nutzung des DB-API-Interfaces
sqlite3.DatabaseErrorErrorDatenbankbezogene Fehler
sqlite3.DataErrorDatabaseErrorProbleme bei der Datenverarbeitung
sqlite3.OperationalErrorDatabaseErrorDB gesperrt, Tabelle fehlt, SQL-Fehler
sqlite3.IntegrityErrorDatabaseErrorUNIQUE-, NOT NULL-, FOREIGN KEY-Verletzungen
sqlite3.InternalErrorDatabaseErrorInterner sqlite3-Modulfehler
sqlite3.ProgrammingErrorDatabaseErrorFalsche API-Nutzung
sqlite3.NotSupportedErrorDatabaseErrorNicht unterstütztes Feature

Performance-Tipps

WAL-Modus aktivieren

Write-Ahead Logging (WAL) erlaubt gleichzeitige Reads während Writes und verbessert die Schreibperformance:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
conn.execute("PRAGMA journal_mode=WAL")
# Returns 'wal' if successful

WAL ist besonders nützlich, wenn ein Reader (z. B. ein Webserver) und ein Writer (z. B. ein Background-Job) auf dieselbe Datenbank zugreifen.

executemany für Bulk Inserts nutzen

executemany ist deutlich schneller als execute in einer Schleife aufzurufen, weil es die Round-Trips zwischen Python und SQLite reduziert:

import sqlite3
import time
 
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE numbers (value INTEGER)")
 
data = [(i,) for i in range(100_000)]
 
# Slow: individual inserts
start = time.perf_counter()
for d in data:
    conn.execute("INSERT INTO numbers VALUES (?)", d)
conn.commit()
slow_time = time.perf_counter() - start
 
conn.execute("DELETE FROM numbers")
 
# Fast: executemany
start = time.perf_counter()
conn.executemany("INSERT INTO numbers VALUES (?)", data)
conn.commit()
fast_time = time.perf_counter() - start
 
print(f"Individual inserts: {slow_time:.2f}s")
print(f"executemany:        {fast_time:.2f}s")
print(f"Speedup:            {slow_time / fast_time:.1f}x")
 
conn.close()

Indizes für häufig abgefragte Spalten erstellen

Indizes beschleunigen SELECT-Abfragen stark – auf Kosten etwas langsamerer Writes und mehr Speicherplatz:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Single column index
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_department ON employees(department)")
 
# Composite index (for queries that filter on both columns)
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_dept_salary ON employees(department, salary)")
 
# Unique index (also enforces uniqueness)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_employees_email ON employees(email)")
 
# Check existing indexes
cursor = conn.execute("SELECT name, sql FROM sqlite_master WHERE type='index'")
for name, sql in cursor.fetchall():
    print(f"{name}: {sql}")
 
conn.close()

EXPLAIN QUERY PLAN zur Performanceprüfung nutzen

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Check how SQLite executes a query
result = conn.execute("""
    EXPLAIN QUERY PLAN
    SELECT * FROM employees WHERE department = 'Engineering' AND salary > 90000
""").fetchall()
 
for row in result:
    print(row)
# Output shows whether an index is used or a full table scan occurs
 
conn.close()

Zusätzliche PRAGMAs für Performance

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Increase cache size (default is 2000 pages = ~8MB)
conn.execute("PRAGMA cache_size = -64000")  # 64MB (negative = KB)
 
# Reduce disk syncs for speed (less durable but much faster)
conn.execute("PRAGMA synchronous = NORMAL")  # Options: OFF, NORMAL, FULL (default)
 
# Store temp tables in memory
conn.execute("PRAGMA temp_store = MEMORY")
 
# Check database integrity
result = conn.execute("PRAGMA integrity_check").fetchone()
print(f"Integrity: {result[0]}")  # Should print 'ok'
 
conn.close()

Praxisbeispiel: Task-Manager-CLI

Hier ist eine vollständige Task-Manager-Anwendung, die alle in diesem Guide behandelten Konzepte demonstriert. Sie nutzt SQLite für persistente Speicherung und unterstützt Prioritäten, Tags und Fälligkeitsdaten.

import sqlite3
import sys
from datetime import datetime, date
from contextlib import contextmanager
 
DB_PATH = "tasks.db"
 
@contextmanager
def get_db():
    """Get a database connection with Row factory and foreign keys enabled."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
def init_db():
    """Create tables if they don't exist."""
    with get_db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                priority INTEGER DEFAULT 2 CHECK(priority BETWEEN 1 AND 4),
                status TEXT DEFAULT 'todo' CHECK(status IN ('todo', 'in_progress', 'done')),
                due_date TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                completed_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS task_tags (
                task_id INTEGER NOT NULL,
                tag_id INTEGER NOT NULL,
                PRIMARY KEY (task_id, tag_id),
                FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
                FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_date)")
 
def add_task(title, description="", priority=2, due_date=None, tags=None):
    """Add a new task with optional tags."""
    with get_db() as conn:
        cursor = conn.execute(
            "INSERT INTO tasks (title, description, priority, due_date) VALUES (?, ?, ?, ?)",
            (title, description, priority, due_date)
        )
        task_id = cursor.lastrowid
 
        if tags:
            for tag_name in tags:
                conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
                tag_id = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()["id"]
                conn.execute("INSERT INTO task_tags (task_id, tag_id) VALUES (?, ?)", (task_id, tag_id))
 
        print(f"Task #{task_id} created: {title}")
        return task_id
 
def list_tasks(status=None, priority=None, tag=None):
    """List tasks with optional filters."""
    with get_db() as conn:
        conditions = []
        params = []
 
        if status:
            conditions.append("t.status = ?")
            params.append(status)
        if priority:
            conditions.append("t.priority = ?")
            params.append(priority)
        if tag:
            conditions.append("t.id IN (SELECT task_id FROM task_tags tt JOIN tags tg ON tt.tag_id = tg.id WHERE tg.name = ?)")
            params.append(tag)
 
        query = """
            SELECT t.id, t.title, t.priority, t.status, t.due_date,
                   GROUP_CONCAT(tg.name, ', ') AS tags
            FROM tasks t
            LEFT JOIN task_tags tt ON t.id = tt.task_id
            LEFT JOIN tags tg ON tt.tag_id = tg.id
        """
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " GROUP BY t.id ORDER BY t.priority ASC, t.due_date ASC"
 
        rows = conn.execute(query, params).fetchall()
 
        priority_labels = {1: "URGENT", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
        print(f"\n{'ID':<5} {'Priority':<10} {'Status':<13} {'Due':<12} {'Title':<30} {'Tags'}")
        print("-" * 85)
        for row in rows:
            due = row["due_date"] or "—"
            tags_str = row["tags"] or "—"
            p_label = priority_labels.get(row["priority"], "?")
            print(f"{row['id']:<5} {p_label:<10} {row['status']:<13} {due:<12} {row['title']:<30} {tags_str}")
 
        print(f"\nTotal: {len(rows)} tasks")
 
def complete_task(task_id):
    """Mark a task as done."""
    with get_db() as conn:
        result = conn.execute(
            "UPDATE tasks SET status = 'done', completed_at = ? WHERE id = ?",
            (datetime.now().isoformat(), task_id)
        )
        if result.rowcount:
            print(f"Task #{task_id} marked as done.")
        else:
            print(f"Task #{task_id} not found.")
 
def stats():
    """Show task statistics."""
    with get_db() as conn:
        row = conn.execute("""
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN status = 'todo' THEN 1 ELSE 0 END) AS todo,
                SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) AS in_progress,
                SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) AS done,
                SUM(CASE WHEN due_date < date('now') AND status != 'done' THEN 1 ELSE 0 END) AS overdue
            FROM tasks
        """).fetchone()
 
        print(f"\nTask Statistics:")
        print(f"  Total:       {row['total']}")
        print(f"  To Do:       {row['todo']}")
        print(f"  In Progress: {row['in_progress']}")
        print(f"  Done:        {row['done']}")
        print(f"  Overdue:     {row['overdue']}")
 
# Initialize and demo
if __name__ == "__main__":
    init_db()
 
    # Add sample tasks
    add_task("Design database schema", priority=1, due_date="2026-02-20", tags=["backend", "database"])
    add_task("Write API endpoints", priority=2, due_date="2026-02-25", tags=["backend", "api"])
    add_task("Create unit tests", priority=3, tags=["testing"])
    add_task("Update documentation", priority=4, due_date="2026-03-01", tags=["docs"])
 
    # List and stats
    list_tasks()
    stats()
    complete_task(1)
    list_tasks(status="done")

SQLite vs PostgreSQL vs MySQL vs MongoDB

FeatureSQLitePostgreSQLMySQLMongoDB
TypeEmbeddedClient-serverClient-serverDocument store
SetupZero configInstall + configureInstall + configureInstall + configure
StorageSingle fileServer directoryServer directoryServer directory
Max DB size281 TBUnlimited256 TBUnlimited
Concurrent writesLimited (file lock)Excellent (MVCC)Good (row locks)Good (document locks)
SQL supportMost of SQL92Full SQL + extensionsFull SQLMQL (not SQL)
Data types5 storage classes40+ types30+ typesBSON types
Full-text searchFTS5 extensionBuilt-in tsvectorBuilt-in FULLTEXTBuilt-in text index
JSON supportJSON1 extensionNative JSONBNative JSONNative (it is JSON)
ReplicationNot built-inStreaming + logicalPrimary-replicaReplica sets
Best forEmbedded, local, prototypesWeb apps, analyticsWeb apps, CMSFlexible schema, documents
Python librarysqlite3 (built-in)psycopg2mysql-connectorpymongo
Hosting requiredNoYesYesYes
Cost to startFree, zero overheadFree, but need serverFree, but need serverFree, but need server

Die Entscheidung ist einfach: Nutze SQLite, wenn deine Anwendung der einzige Prozess ist, der in die Datenbank schreibt. Nutze PostgreSQL oder MySQL, wenn du gleichzeitige Schreibzugriffe durch mehrere Nutzer, Replikation oder Netzwerkzugriff brauchst. Nutze MongoDB, wenn deine Daten dokumentenorientiert sind und nicht gut in ein relationales Schema passen.

Häufig gestellte Fragen

Ist SQLite für den Produktionseinsatz geeignet?

Ja. SQLite wird in Produktion von Android, iOS, allen großen Webbrowsern, Airbus-Flugsoftware und unzähligen Desktop-Anwendungen eingesetzt. Es verarbeitet bis zu 281 Terabyte und Millionen Zeilen. Für schreibintensive Webanwendungen mit hoher Parallelität und vielen gleichzeitigen Nutzern ist es nicht geeignet, aber für die meisten anderen Szenarien funktioniert es sehr gut.

Wie prüfe ich, welche SQLite-Version Python verwendet?

Nutze sqlite3.sqlite_version, um die Version der SQLite-Bibliothek zu prüfen, und sqlite3.version für die Version des Python-Moduls. Beispiel: import sqlite3; print(sqlite3.sqlite_version) könnte 3.45.1 ausgeben.

Können mehrere Prozesse dieselbe SQLite-Datenbank lesen?

Ja. Mehrere Prozesse können gleichzeitig aus derselben SQLite-Datenbank lesen. Schreiboperationen erzeugen ein exklusives Lock auf der Datenbankdatei. Mit aktiviertem WAL-Modus blockieren Reader keine Writer und Writer blockieren keine Reader. Allerdings kann immer nur ein Writer gleichzeitig arbeiten.

Wie sichere ich eine SQLite-Datenbank sicher?

Nutze die sqlite3-Backup-API: source.backup(dest). Du kannst die Datenbankdatei auch sicher kopieren, solange keine Schreibvorgänge laufen. Kopiere die Datei niemals während einer aktiven Schreibtransaktion, da das Backup sonst beschädigt werden kann.

import sqlite3
 
source = sqlite3.connect("myapp.db")
backup = sqlite3.connect("myapp_backup.db")
source.backup(backup)
backup.close()
source.close()

Wie speichere ich Datum und Uhrzeit in SQLite?

SQLite hat keinen nativen DATE- oder DATETIME-Typ. Speichere Datumswerte als TEXT im ISO-8601-Format ('2026-02-18' oder '2026-02-18T14:30:00'). Die eingebauten Datumsfunktionen von SQLite (date(), time(), datetime(), strftime()) funktionieren mit diesem Format. Alternativ kannst du Unix-Timestamps als INTEGER speichern.

Unterstützt SQLite Verschlüsselung?

Das standardmäßige Open-Source-SQLite enthält keine Verschlüsselung. Die SQLite Encryption Extension (SEE) ist ein kostenpflichtiges kommerzielles Produkt. Kostenlose Alternativen sind SQLCipher (Open Source) sowie pysqlcipher3 für die Python-Integration.

Wie verhindere ich SQL Injection in Python sqlite3?

Verwende immer parametrisierte Abfragen mit ?- oder :name-Placeholders. Konstruiere SQL-Strings niemals per f-strings, .format() oder %-Stringformatierung mit User-Input. Das sqlite3-Modul übernimmt das Escaping automatisch, wenn du Parameter separat übergibst.

Fazit

Pythons sqlite3-Modul bietet dir eine vollwertige relationale Datenbank ohne Setup-Aufwand. Für lokale Anwendungen, Prototypen, Data-Analysis-Skripte und Tests ist SQLite kaum zu schlagen. Die wichtigsten Muster, die du dir merken solltest: Verwende immer parametrisierte Abfragen, um SQL Injection zu verhindern, nutze Context Manager für sauberes Connection-Handling, aktiviere den WAL-Modus für bessere Nebenläufigkeit und verwende executemany für Bulk-Operationen.

Für Data-Analysis-Workflows ist die Kombination aus SQLite und pandas besonders stark: Daten mit to_sql() laden, mit read_sql() abfragen und Ergebnisse mit PyGWalker (opens in a new tab) visualisieren. Wenn dein Projekt die Concurrency-Grenzen von SQLite überschreitet, erfordert der Wechsel zu PostgreSQL in der Regel nur minimale Codeänderungen, da das DB-API-2.0-Interface über Datenbanktreiber hinweg konsistent ist.

📚