
Python SQLite3 教程:Python 中 SQLite 数据库完整指南


你需要在 Python 应用中存储结构化数据:可能是用户设置、传感器读数、日志记录,或一个任务列表。你先用 CSV 文件应付,但随着数据增长,查询变得痛苦。你考虑 PostgreSQL 或 MySQL,但对一个本地工具或原型来说,搭建数据库服务器又显得太“重”。你真正需要的是:一种可靠的方式来存储、查询、更新结构化数据,同时不引入额外的运维负担。

这正是 SQLite 要解决的问题。SQLite 是一个自包含、无服务器(serverless)的数据库引擎,所有内容都存储在单个文件中。Python 标准库自带 sqlite3 模块,无需安装任何东西。你可以获得完整的 SQL 支持、ACID 事务能力,并且能处理最高达 281TB 的数据集——全程不需要运行数据库服务器进程,也不用管理配置文件。

本指南将带你系统掌握如何在 Python 中高效使用 SQLite:从创建第一个数据库到高级查询、与 pandas 集成、性能调优,以及一个完整的真实项目示例。


SQLite 是什么?为什么要用它?

SQLite 是一种嵌入式关系型数据库引擎。与 PostgreSQL 或 MySQL 不同,它不会作为独立的服务器进程运行。整个数据库存在于磁盘上的单个文件中(或驻留在内存中)。Python 标准库中的 sqlite3 模块提供了一个符合 DB-API 2.0 规范的 SQLite 接口。

SQLite 的关键特性:

  • 零配置:无需服务器部署,无需管理用户与权限
  • 无服务器(Serverless):数据库引擎运行在你的应用进程内
  • 单文件:整个数据库(表、索引、数据)都在一个 .db 文件中
  • 跨平台:数据库文件可在任意 OS 上直接使用,无需转换
  • 符合 ACID:完整事务支持,支持 commit 与 rollback
  • 轻量:库体积约 750 KB——比大多数图片还小

何时使用 SQLite

| 使用场景 | SQLite | PostgreSQL/MySQL |
| --- | --- | --- |
| 桌面/移动应用 | 最佳选择 | 大材小用 |
| 原型开发 | 最佳选择 | 搭建更慢 |
| 嵌入式设备 / IoT | 最佳选择 | 往往不可行 |
| 单元测试 | 最佳选择 | 需要测试服务器 |
| 数据分析脚本 | 适合 | 没必要的开销 |
| < 10 万日访问的 Web 应用 | 可用且效果好 | 更可扩展 |
| 高并发 Web 应用 | 不理想 | 最佳选择 |
| 多客户端写入量大 | 不理想 | 最佳选择 |

SQLite 能覆盖大多数“不需要多个同时写入连接”的应用。Android 和 iOS 默认本地数据库就是 SQLite。Firefox、Chrome、macOS 内部也大量使用它。它是世界上部署最广泛的数据库引擎。

连接到数据库

sqlite3.connect() 会创建到某个数据库文件的连接。如果文件不存在,SQLite 会自动创建它。

import sqlite3
 
# Connect to a database file (creates it if it doesn't exist)
conn = sqlite3.connect("myapp.db")
 
# Create a cursor to execute SQL statements
cursor = conn.cursor()
 
# Always close the connection when done
conn.close()

内存数据库(In-Memory Databases)

使用特殊字符串 :memory: 创建只存在于 RAM 中的数据库。这非常适合测试或临时数据处理。

import sqlite3
 
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
 
cursor.execute("CREATE TABLE test (id INTEGER, value TEXT)")
cursor.execute("INSERT INTO test VALUES (1, 'hello')")
cursor.execute("SELECT * FROM test")
print(cursor.fetchone())  # (1, 'hello')
 
conn.close()  # Database is gone after this

使用上下文管理器(Context Managers)

推荐模式是使用上下文管理器(with 语句)来确保连接能被正确处理,即便发生错误也不至于遗漏清理。在 with 块中,SQLite 连接在成功时会自动提交(auto-commit),在异常时自动回滚(auto-rollback)。

import sqlite3
 
# Connection as context manager handles commit/rollback
with sqlite3.connect("myapp.db") as conn:
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
    cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    # Auto-commits if no exception
    # Auto-rolls back if an exception occurs
 
# Note: the connection is NOT closed by 'with' -- it just commits/rolls back
# For full cleanup, close explicitly or use a wrapper:
conn.close()

一个更干净的辅助函数:同时负责 commit/rollback 和关闭连接:

import sqlite3
from contextlib import contextmanager
 
@contextmanager
def get_db(db_path="myapp.db"):
    """Context manager that commits on success, rolls back on error, and always closes."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
# Usage
with get_db() as conn:
    conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))

创建表(Creating Tables)

使用 CREATE TABLE 定义 schema。SQLite 的类型系统相对简化,只有五种存储类型(storage classes)。

SQLite 数据类型(Storage Classes)

| Storage Class | 描述 | Python 对应类型 |
| --- | --- | --- |
| NULL | 空值 | None |
| INTEGER | 有符号整数(1、2、3、4、6 或 8 字节) | int |
| REAL | 浮点数(8 字节 IEEE float) | float |
| TEXT | UTF-8 或 UTF-16 字符串 | str |
| BLOB | 二进制数据,按输入原样存储 | bytes |

SQLite 使用动态类型(dynamic typing)。列的声明类型只是类型亲和性(type affinity)提示,而非强约束:除 INTEGER PRIMARY KEY 外,任意列都可以存储任意类型的值。这一点不同于 PostgreSQL 和 MySQL。(SQLite 3.37+ 提供 STRICT 表,可按需启用严格类型检查。)
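下面的小例子用内存数据库演示这一行为:向声明为 INTEGER 的列写入无法转换为数字的文本,SQLite 会原样按 TEXT 存储,不会报错:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (n INTEGER)")  # Declared type is only a hint
conn.execute("INSERT INTO demo VALUES (?)", ("not a number",))  # Accepted anyway
conn.execute("INSERT INTO demo VALUES (?)", (42,))

rows = list(conn.execute("SELECT n FROM demo"))
for (value,) in rows:
    print(type(value).__name__, repr(value))  # str 'not a number', then int 42
conn.close()
```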

创建带约束的表

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS employees (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        department TEXT DEFAULT 'General',
        salary REAL CHECK(salary > 0),
        hire_date TEXT NOT NULL,
        is_active INTEGER DEFAULT 1
    )
""")
 
conn.commit()
conn.close()

常见约束说明:

  • PRIMARY KEY:每行唯一标识。INTEGER PRIMARY KEY 是内部 rowid 的别名。
  • AUTOINCREMENT:保证 rowid 只增不减(不会复用已删除的 ID)。
  • NOT NULL:列不可为 NULL。
  • UNIQUE:该列值不可重复。
  • DEFAULT:INSERT 时未提供则使用默认值。
  • CHECK:用条件校验数据合法性。
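违反这些约束时,sqlite3 会抛出 IntegrityError。下面用内存数据库简单演示 UNIQUE 与 CHECK 约束触发时的表现:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE employees (
        id INTEGER PRIMARY KEY,
        email TEXT UNIQUE NOT NULL,
        salary REAL CHECK(salary > 0)
    )
""")
conn.execute("INSERT INTO employees (email, salary) VALUES (?, ?)", ("a@x.com", 50000))

errors = []
try:
    # Duplicate email violates the UNIQUE constraint
    conn.execute("INSERT INTO employees (email, salary) VALUES (?, ?)", ("a@x.com", 60000))
except sqlite3.IntegrityError as e:
    errors.append(str(e))

try:
    # Negative salary violates the CHECK constraint
    conn.execute("INSERT INTO employees (email, salary) VALUES (?, ?)", ("b@x.com", -1))
except sqlite3.IntegrityError as e:
    errors.append(str(e))

print(errors)  # Two constraint-failure messages
conn.close()
```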

创建多个关联表

import sqlite3
 
conn = sqlite3.connect("school.db")
cursor = conn.cursor()
 
# Enable foreign key enforcement (off by default in SQLite)
cursor.execute("PRAGMA foreign_keys = ON")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS students (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE
    )
""")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS courses (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        credits INTEGER DEFAULT 3
    )
""")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS enrollments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        student_id INTEGER NOT NULL,
        course_id INTEGER NOT NULL,
        grade TEXT,
        enrolled_date TEXT DEFAULT CURRENT_DATE,
        FOREIGN KEY (student_id) REFERENCES students(id) ON DELETE CASCADE,
        FOREIGN KEY (course_id) REFERENCES courses(id) ON DELETE CASCADE,
        UNIQUE(student_id, course_id)
    )
""")
 
conn.commit()
conn.close()

CRUD 操作

CRUD 指 Create、Read、Update、Delete:持久化存储的四个基本操作。

INSERT:添加数据

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Insert a single row
cursor.execute(
    "INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
    ("Alice Johnson", "alice@company.com", "Engineering", 95000, "2024-01-15")
)
 
# Insert multiple rows with executemany
employees = [
    ("Bob Smith", "bob@company.com", "Marketing", 72000, "2024-03-01"),
    ("Carol White", "carol@company.com", "Engineering", 98000, "2023-11-20"),
    ("David Brown", "david@company.com", "Sales", 68000, "2024-06-10"),
    ("Eve Davis", "eve@company.com", "Engineering", 105000, "2023-08-05"),
]
 
cursor.executemany(
    "INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
    employees
)
 
# Get the last inserted row ID
print(f"Last inserted ID: {cursor.lastrowid}")
 
# Get the number of rows affected
print(f"Rows inserted: {cursor.rowcount}")
 
conn.commit()
conn.close()

Upsert:INSERT ... ON CONFLICT DO UPDATE(SQLite 3.24+)

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Insert or update if email already exists
cursor.execute("""
    INSERT INTO employees (name, email, department, salary, hire_date)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(email) DO UPDATE SET
        name = excluded.name,
        salary = excluded.salary
""", ("Alice Johnson", "alice@company.com", "Engineering", 100000, "2024-01-15"))
 
conn.commit()
conn.close()

SELECT:读取数据

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Select all rows
cursor.execute("SELECT * FROM employees")
all_rows = cursor.fetchall()
for row in all_rows:
    print(row)  # Returns tuples by default
 
# Select specific columns with a condition
cursor.execute(
    "SELECT name, salary FROM employees WHERE department = ?",
    ("Engineering",)
)
engineers = cursor.fetchall()
for name, salary in engineers:
    print(f"{name}: ${salary:,.0f}")
 
# Select with ordering and limit
cursor.execute("""
    SELECT name, salary FROM employees
    ORDER BY salary DESC
    LIMIT 3
""")
top_earners = cursor.fetchall()
print("Top 3 earners:", top_earners)
 
conn.close()

将结果作为字典返回

默认情况下,fetchall() 返回 tuple。想按列名访问字段(类似字典),把连接的 row_factory 设为 sqlite3.Row:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
conn.row_factory = sqlite3.Row  # Set row factory
 
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees WHERE department = ?", ("Engineering",))
 
for row in cursor.fetchall():
    print(dict(row))  # Convert Row to dict
    print(row["name"], row["salary"])  # Access by column name
 
conn.close()

UPDATE:修改数据

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Update a single field
cursor.execute(
    "UPDATE employees SET salary = ? WHERE email = ?",
    (110000, "alice@company.com")
)
print(f"Rows updated: {cursor.rowcount}")
 
# Update with a calculation
cursor.execute("""
    UPDATE employees
    SET salary = salary * 1.10
    WHERE department = 'Engineering' AND salary < 100000
""")
print(f"Engineers with raises: {cursor.rowcount}")
 
# Conditional update
cursor.execute("""
    UPDATE employees
    SET is_active = 0
    WHERE hire_date < '2023-01-01'
""")
 
conn.commit()
conn.close()

DELETE:删除数据

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Delete specific rows
cursor.execute("DELETE FROM employees WHERE is_active = 0")
print(f"Rows deleted: {cursor.rowcount}")
 
# Delete with a subquery
cursor.execute("""
    DELETE FROM enrollments
    WHERE student_id NOT IN (SELECT id FROM students)
""")
 
# Delete all rows (truncate equivalent)
cursor.execute("DELETE FROM employees")
 
conn.commit()
conn.close()

参数化查询与防止 SQL 注入

永远不要用字符串拼接、字符串格式化或 f-string 构造 SQL。务必使用参数化查询来防止 SQL 注入。

问题:SQL 注入

import sqlite3
 
# DANGEROUS: Never do this
user_input = "'; DROP TABLE employees; --"
query = f"SELECT * FROM employees WHERE name = '{user_input}'"
# This would execute: SELECT * FROM employees WHERE name = ''; DROP TABLE employees; --'

解决方案:参数化查询

SQLite3 支持两种占位符风格:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Style 1: Question mark placeholders (positional)
cursor.execute(
    "SELECT * FROM employees WHERE department = ? AND salary > ?",
    ("Engineering", 90000)
)
 
# Style 2: Named placeholders
cursor.execute(
    "SELECT * FROM employees WHERE department = :dept AND salary > :min_salary",
    {"dept": "Engineering", "min_salary": 90000}
)
 
# Named placeholders are clearer with many parameters
cursor.execute("""
    INSERT INTO employees (name, email, department, salary, hire_date)
    VALUES (:name, :email, :dept, :salary, :date)
""", {
    "name": "Frank Green",
    "email": "frank@company.com",
    "dept": "Engineering",
    "salary": 92000,
    "date": "2025-09-01"
})
 
conn.commit()
conn.close()

参数化查询会自动处理转义与引号。数据库引擎会把参数值当作数据而不是 SQL 代码,无论输入内容多“恶意”,都无法被解释为 SQL,从根源上消除 SQL 注入风险。
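可以直接验证这一点:把前面那个恶意字符串作为参数传入,它只会被当作普通文本去匹配,表不会被删除:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (name TEXT)")
conn.execute("INSERT INTO employees VALUES (?)", ("Alice",))

malicious = "'; DROP TABLE employees; --"
# The placeholder treats the malicious string as plain data, not SQL
rows = conn.execute(
    "SELECT * FROM employees WHERE name = ?", (malicious,)
).fetchall()
print(rows)  # [] -- no employee has that name

# The table survived and still contains its one row
count = conn.execute("SELECT COUNT(*) FROM employees").fetchone()[0]
print(count)  # 1
conn.close()
```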

动态 WHERE 子句

有时需要按条件动态拼接查询(例如筛选条件是可选的)。下面是安全写法:

import sqlite3
 
def search_employees(department=None, min_salary=None, is_active=None):
    conn = sqlite3.connect("myapp.db")
    cursor = conn.cursor()
 
    conditions = []
    params = []
 
    if department is not None:
        conditions.append("department = ?")
        params.append(department)
 
    if min_salary is not None:
        conditions.append("salary >= ?")
        params.append(min_salary)
 
    if is_active is not None:
        conditions.append("is_active = ?")
        params.append(is_active)
 
    query = "SELECT * FROM employees"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
 
    cursor.execute(query, params)
    results = cursor.fetchall()
    conn.close()
    return results
 
# Usage
engineers = search_employees(department="Engineering", min_salary=90000)
active_staff = search_employees(is_active=1)
everyone = search_employees()  # No filters

使用事务(Transactions)

事务将多条 SQL 操作组合为一个原子单元:要么全部成功并提交(commit),要么全部撤销(rollback)。

手动控制事务

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
try:
    # Transfer money between accounts
    cursor.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
 
    # Verify no negative balance
    cursor.execute("SELECT balance FROM accounts WHERE id = 1")
    balance = cursor.fetchone()[0]
    if balance < 0:
        raise ValueError("Insufficient funds")
 
    conn.commit()  # All good, save changes
    print("Transfer complete")
 
except Exception as e:
    conn.rollback()  # Something went wrong, undo everything
    print(f"Transfer failed: {e}")
 
finally:
    conn.close()

Autocommit 与隔离级别(Isolation Levels)

默认情况下,sqlite3 运行在“延迟事务(deferred transaction)”模式。你可以改变行为:

import sqlite3
 
# Default behavior: transactions are deferred
conn = sqlite3.connect("myapp.db")
 
# Autocommit mode (each statement is its own transaction)
conn = sqlite3.connect("myapp.db", isolation_level=None)
 
# Immediate locking (prevents concurrent write conflicts)
conn = sqlite3.connect("myapp.db")
conn.execute("BEGIN IMMEDIATE")
# ... your operations ...
conn.commit()

使用 Savepoint 进行部分回滚

Savepoint 允许你只回滚事务中的一部分,而不必全部撤销:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
try:
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (1, 100)")
 
    # Start a savepoint
    cursor.execute("SAVEPOINT order_items")
 
    try:
        cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Widget', 5)")
        cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Gadget', -1)")
        # This might fail due to CHECK constraint
        cursor.execute("RELEASE SAVEPOINT order_items")
    except sqlite3.IntegrityError:
        cursor.execute("ROLLBACK TO SAVEPOINT order_items")
        print("Some items failed, but order was still created")
 
    conn.commit()
except Exception:
    conn.rollback()
finally:
    conn.close()

获取数据:fetchone、fetchall、fetchmany

cursor 对象提供三种方法来读取查询结果。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("SELECT * FROM employees ORDER BY salary DESC")
 
# fetchone: Get the next single row (or None if no more rows)
top_earner = cursor.fetchone()
print(f"Top earner: {top_earner}")
 
# fetchmany: Get the next N rows
next_three = cursor.fetchmany(3)
print(f"Next 3: {next_three}")
 
# fetchall: Get all remaining rows
remaining = cursor.fetchall()
print(f"Remaining: {len(remaining)} rows")
 
conn.close()

更高效地遍历结果

对大结果集,直接遍历 cursor 比一次性加载到内存更省内存:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("SELECT name, salary FROM employees")
 
# Memory-efficient: processes one row at a time
for name, salary in cursor:
    print(f"{name}: ${salary:,.0f}")
 
conn.close()

使用 fetchmany 进行批处理

当需要处理数百万行数据时,循环 fetchmany() 能更好地控制内存占用:

import sqlite3
 
conn = sqlite3.connect("large_data.db")
cursor = conn.cursor()
 
cursor.execute("SELECT * FROM sensor_readings")
 
batch_size = 1000
while True:
    rows = cursor.fetchmany(batch_size)
    if not rows:
        break
    # Process batch
    for row in rows:
        pass  # your processing logic
 
conn.close()

高级查询(Advanced Queries)

JOIN

import sqlite3
 
conn = sqlite3.connect("school.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
 
# INNER JOIN: Students with their enrolled courses
cursor.execute("""
    SELECT s.name AS student, c.title AS course, e.grade
    FROM enrollments e
    INNER JOIN students s ON e.student_id = s.id
    INNER JOIN courses c ON e.course_id = c.id
    ORDER BY s.name, c.title
""")
 
for row in cursor.fetchall():
    print(f"{row['student']} - {row['course']}: {row['grade'] or 'In Progress'}")
 
# LEFT JOIN: All students, even those not enrolled
cursor.execute("""
    SELECT s.name, COUNT(e.id) AS course_count
    FROM students s
    LEFT JOIN enrollments e ON s.id = e.student_id
    GROUP BY s.id
    ORDER BY course_count DESC
""")
 
for row in cursor.fetchall():
    print(f"{row['name']}: {row['course_count']} courses")
 
conn.close()

聚合(Aggregation)

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Department statistics
cursor.execute("""
    SELECT
        department,
        COUNT(*) AS headcount,
        ROUND(AVG(salary), 2) AS avg_salary,
        MIN(salary) AS min_salary,
        MAX(salary) AS max_salary,
        SUM(salary) AS total_payroll
    FROM employees
    WHERE is_active = 1
    GROUP BY department
    HAVING COUNT(*) >= 2
    ORDER BY avg_salary DESC
""")
 
print(f"{'Department':<15} {'Count':<8} {'Avg Salary':<12} {'Min':<10} {'Max':<10}")
print("-" * 55)
for row in cursor.fetchall():
    dept, count, avg_sal, min_sal, max_sal, total = row
    print(f"{dept:<15} {count:<8} ${avg_sal:<11,.0f} ${min_sal:<9,.0f} ${max_sal:<9,.0f}")
 
conn.close()

子查询(Subqueries)

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Employees earning above their department average
cursor.execute("""
    SELECT name, department, salary
    FROM employees e
    WHERE salary > (
        SELECT AVG(salary) FROM employees
        WHERE department = e.department
    )
    ORDER BY department, salary DESC
""")
 
for name, dept, salary in cursor.fetchall():
    print(f"{name} ({dept}): ${salary:,.0f}")
 
# Most recent hire in each department
cursor.execute("""
    SELECT name, department, hire_date
    FROM employees
    WHERE (department, hire_date) IN (
        SELECT department, MAX(hire_date)
        FROM employees
        GROUP BY department
    )
""")
 
for row in cursor.fetchall():
    print(row)
 
conn.close()

窗口函数(SQLite 3.25+)

SQLite 支持窗口函数,用于运行累计值、排名等分析查询:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Rank employees by salary within each department
cursor.execute("""
    SELECT
        name,
        department,
        salary,
        RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
        RANK() OVER (ORDER BY salary DESC) AS overall_rank
    FROM employees
    WHERE is_active = 1
""")
 
for name, dept, salary, dept_rank, overall_rank in cursor.fetchall():
    print(f"#{overall_rank} overall, #{dept_rank} in {dept}: {name} (${salary:,.0f})")
 
# Running total of payroll by hire date
cursor.execute("""
    SELECT
        name,
        hire_date,
        salary,
        SUM(salary) OVER (ORDER BY hire_date) AS running_total
    FROM employees
    ORDER BY hire_date
""")
 
for name, date, salary, running in cursor.fetchall():
    print(f"{date} | {name}: ${salary:,.0f} | Running total: ${running:,.0f}")
 
conn.close()

SQLite 与 pandas

pandas 通过 read_sql() 和 to_sql() 内置支持 SQLite,这让 DataFrame 与 SQLite 数据库之间的数据流转非常方便。

将 SQL 查询结果读取到 DataFrame

import sqlite3
import pandas as pd
 
conn = sqlite3.connect("myapp.db")
 
# Read a SQL query into a DataFrame
df = pd.read_sql("SELECT * FROM employees WHERE is_active = 1", conn)
print(df.head())
print(df.describe())
 
# Parameterized query with pandas
df = pd.read_sql(
    "SELECT * FROM employees WHERE department = ? AND salary > ?",
    conn,
    params=("Engineering", 90000)
)
 
# Read an entire table
df = pd.read_sql("SELECT * FROM employees", conn, index_col="id")
 
conn.close()

将 DataFrame 写入 SQLite

import sqlite3
import pandas as pd
 
# Create a sample DataFrame
df = pd.DataFrame({
    "product": ["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"],
    "price": [999.99, 699.99, 449.99, 349.99, 79.99],
    "stock": [50, 200, 75, 30, 500],
    "category": ["Electronics", "Electronics", "Electronics", "Peripherals", "Peripherals"]
})
 
conn = sqlite3.connect("store.db")
 
# Write DataFrame to a new table
df.to_sql("products", conn, if_exists="replace", index=False)
 
# Append to an existing table
new_products = pd.DataFrame({
    "product": ["Mouse", "Webcam"],
    "price": [49.99, 89.99],
    "stock": [300, 100],
    "category": ["Peripherals", "Peripherals"]
})
new_products.to_sql("products", conn, if_exists="append", index=False)
 
# Verify
result = pd.read_sql("SELECT * FROM products", conn)
print(result)
 
conn.close()

往返工作流:CSV → SQLite → 分析

import sqlite3
import pandas as pd
 
# Step 1: Load CSV into SQLite
df = pd.read_csv("sales_data.csv")
conn = sqlite3.connect("analytics.db")
df.to_sql("sales", conn, if_exists="replace", index=False)
 
# Step 2: Run SQL analytics
summary = pd.read_sql("""
    SELECT
        region,
        strftime('%Y-%m', sale_date) AS month,
        COUNT(*) AS transactions,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_transaction
    FROM sales
    GROUP BY region, month
    ORDER BY month, revenue DESC
""", conn)
 
print(summary)
 
# Step 3: Export results
summary.to_csv("monthly_summary.csv", index=False)
 
conn.close()

当你把查询结果放入 DataFrame 后,可以直接用 PyGWalker 做可视化。PyGWalker 能把任意 pandas DataFrame 变成类似 Tableau 的交互式可视化界面,通过拖拽字段生成图表,无需编写绘图代码:

import pygwalker as pyg
 
# Turn your SQL query results into an interactive visualization
walker = pyg.walk(summary)

如果你希望工作流更顺滑,RunCell 提供了 AI 驱动的 Jupyter 环境:可以写 SQL、获得 AI 辅助调试,并在同一个 notebook 中即时可视化结果。

错误处理(Error Handling)

sqlite3 模块定义了多种异常类型。捕获更具体的异常有助于提供更清晰的错误信息与恢复策略。

import sqlite3
 
def safe_insert(db_path, name, email, salary):
    """Insert an employee with proper error handling."""
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO employees (name, email, salary, hire_date) VALUES (?, ?, ?, date('now'))",
            (name, email, salary)
        )
        conn.commit()
        return cursor.lastrowid
 
    except sqlite3.IntegrityError as e:
        # Unique constraint violated (duplicate email, NOT NULL violated, etc.)
        print(f"Data integrity error: {e}")
        return None
 
    except sqlite3.OperationalError as e:
        # Table doesn't exist, SQL syntax error, database locked, etc.
        print(f"Operational error: {e}")
        return None
 
    except sqlite3.DatabaseError as e:
        # Corrupted database, disk full, etc.
        print(f"Database error: {e}")
        return None
 
    except sqlite3.ProgrammingError as e:
        # Using a closed cursor, wrong number of parameters, etc.
        print(f"Programming error: {e}")
        return None
 
    finally:
        if conn:
            conn.close()

SQLite3 异常层级(Exception Hierarchy)

| Exception | Parent | 常见原因 |
| --- | --- | --- |
| sqlite3.Warning | Exception | 非致命问题 |
| sqlite3.Error | Exception | sqlite3 全部错误的基类 |
| sqlite3.InterfaceError | Error | DB-API 接口误用 |
| sqlite3.DatabaseError | Error | 数据库相关错误 |
| sqlite3.DataError | DatabaseError | 数据处理问题 |
| sqlite3.OperationalError | DatabaseError | 数据库被锁、表缺失、SQL 错误 |
| sqlite3.IntegrityError | DatabaseError | UNIQUE、NOT NULL、FOREIGN KEY 约束违规 |
| sqlite3.InternalError | DatabaseError | SQLite 内部错误 |
| sqlite3.ProgrammingError | DatabaseError | API 使用不正确 |
| sqlite3.NotSupportedError | DatabaseError | 不支持的特性 |
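这个层级的实际意义是:捕获 sqlite3.Error 能兜住所有 sqlite3 错误,捕获 sqlite3.DatabaseError 能覆盖 IntegrityError、OperationalError 等常见子类。一个最小的验证示例:

```python
import sqlite3

# IntegrityError and OperationalError both subclass DatabaseError,
# which in turn subclasses Error
print(issubclass(sqlite3.IntegrityError, sqlite3.DatabaseError))   # True
print(issubclass(sqlite3.OperationalError, sqlite3.DatabaseError))  # True
print(issubclass(sqlite3.DatabaseError, sqlite3.Error))             # True

# So a single `except sqlite3.Error` catches any of them
conn = sqlite3.connect(":memory:")
try:
    conn.execute("SELECT * FROM no_such_table")
except sqlite3.Error as e:
    caught = type(e).__name__
print(caught)  # OperationalError
conn.close()
```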

性能建议(Performance Tips)

启用 WAL 模式

WAL(Write-Ahead Logging)模式允许在写入时并发读取,并提升写性能:

import sqlite3
 
conn = sqlite3.connect("myapp.db")
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)  # Prints 'wal' if the switch succeeded

当你有一个 reader(例如 Web 服务)和一个 writer(例如后台任务)同时访问同一个数据库时,WAL 模式尤其有用。

批量插入使用 executemany

与在循环里反复调用 execute 相比,executemany 会显著更快,因为它减少了 Python 与 SQLite 之间的往返开销:

import sqlite3
import time
 
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE numbers (value INTEGER)")
 
data = [(i,) for i in range(100_000)]
 
# Slow: individual inserts
start = time.perf_counter()
for d in data:
    conn.execute("INSERT INTO numbers VALUES (?)", d)
conn.commit()
slow_time = time.perf_counter() - start
 
conn.execute("DELETE FROM numbers")
 
# Fast: executemany
start = time.perf_counter()
conn.executemany("INSERT INTO numbers VALUES (?)", data)
conn.commit()
fast_time = time.perf_counter() - start
 
print(f"Individual inserts: {slow_time:.2f}s")
print(f"executemany:        {fast_time:.2f}s")
print(f"Speedup:            {slow_time / fast_time:.1f}x")
 
conn.close()

为高频查询列创建索引

索引能显著加速 SELECT 查询,代价是写入略慢、占用更多磁盘空间。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Single column index
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_department ON employees(department)")
 
# Composite index (for queries that filter on both columns)
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_dept_salary ON employees(department, salary)")
 
# Unique index (also enforces uniqueness)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_employees_email ON employees(email)")
 
# Check existing indexes
cursor = conn.execute("SELECT name, sql FROM sqlite_master WHERE type='index'")
for name, sql in cursor.fetchall():
    print(f"{name}: {sql}")
 
conn.close()

使用 EXPLAIN QUERY PLAN 检查性能

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Check how SQLite executes a query
result = conn.execute("""
    EXPLAIN QUERY PLAN
    SELECT * FROM employees WHERE department = 'Engineering' AND salary > 90000
""").fetchall()
 
for row in result:
    print(row)
# Output shows whether an index is used or a full table scan occurs
 
conn.close()

其他有助于性能的 PRAGMA

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Increase cache size (the default is about 2 MB)
conn.execute("PRAGMA cache_size = -64000")  # 64MB (negative value = size in KB)
 
# Reduce disk syncs for speed (less durable but much faster)
conn.execute("PRAGMA synchronous = NORMAL")  # Options: OFF, NORMAL, FULL (default)
 
# Store temp tables in memory
conn.execute("PRAGMA temp_store = MEMORY")
 
# Check database integrity
result = conn.execute("PRAGMA integrity_check").fetchone()
print(f"Integrity: {result[0]}")  # Should print 'ok'
 
conn.close()

真实案例:任务管理 CLI

下面是一个完整的任务管理应用,展示了本指南涉及的核心概念。它使用 SQLite 做持久化存储,支持优先级、标签和截止日期。

import sqlite3
import sys
from datetime import datetime, date
from contextlib import contextmanager
 
DB_PATH = "tasks.db"
 
@contextmanager
def get_db():
    """Get a database connection with Row factory and foreign keys enabled."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
def init_db():
    """Create tables if they don't exist."""
    with get_db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                priority INTEGER DEFAULT 2 CHECK(priority BETWEEN 1 AND 4),
                status TEXT DEFAULT 'todo' CHECK(status IN ('todo', 'in_progress', 'done')),
                due_date TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                completed_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS task_tags (
                task_id INTEGER NOT NULL,
                tag_id INTEGER NOT NULL,
                PRIMARY KEY (task_id, tag_id),
                FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
                FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_date)")
 
def add_task(title, description="", priority=2, due_date=None, tags=None):
    """Add a new task with optional tags."""
    with get_db() as conn:
        cursor = conn.execute(
            "INSERT INTO tasks (title, description, priority, due_date) VALUES (?, ?, ?, ?)",
            (title, description, priority, due_date)
        )
        task_id = cursor.lastrowid
 
        if tags:
            for tag_name in tags:
                conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
                tag_id = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()["id"]
                conn.execute("INSERT INTO task_tags (task_id, tag_id) VALUES (?, ?)", (task_id, tag_id))
 
        print(f"Task #{task_id} created: {title}")
        return task_id
 
def list_tasks(status=None, priority=None, tag=None):
    """List tasks with optional filters."""
    with get_db() as conn:
        conditions = []
        params = []
 
        if status:
            conditions.append("t.status = ?")
            params.append(status)
        if priority:
            conditions.append("t.priority = ?")
            params.append(priority)
        if tag:
            conditions.append("t.id IN (SELECT task_id FROM task_tags tt JOIN tags tg ON tt.tag_id = tg.id WHERE tg.name = ?)")
            params.append(tag)
 
        query = """
            SELECT t.id, t.title, t.priority, t.status, t.due_date,
                   GROUP_CONCAT(tg.name, ', ') AS tags
            FROM tasks t
            LEFT JOIN task_tags tt ON t.id = tt.task_id
            LEFT JOIN tags tg ON tt.tag_id = tg.id
        """
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " GROUP BY t.id ORDER BY t.priority ASC, t.due_date ASC"
 
        rows = conn.execute(query, params).fetchall()
 
        priority_labels = {1: "URGENT", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
        print(f"\n{'ID':<5} {'Priority':<10} {'Status':<13} {'Due':<12} {'Title':<30} {'Tags'}")
        print("-" * 85)
        for row in rows:
            due = row["due_date"] or "—"
            tags_str = row["tags"] or "—"
            p_label = priority_labels.get(row["priority"], "?")
            print(f"{row['id']:<5} {p_label:<10} {row['status']:<13} {due:<12} {row['title']:<30} {tags_str}")
 
        print(f"\nTotal: {len(rows)} tasks")
 
def complete_task(task_id):
    """Mark a task as done."""
    with get_db() as conn:
        result = conn.execute(
            "UPDATE tasks SET status = 'done', completed_at = ? WHERE id = ?",
            (datetime.now().isoformat(), task_id)
        )
        if result.rowcount:
            print(f"Task #{task_id} marked as done.")
        else:
            print(f"Task #{task_id} not found.")
 
def stats():
    """Show task statistics."""
    with get_db() as conn:
        row = conn.execute("""
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN status = 'todo' THEN 1 ELSE 0 END) AS todo,
                SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) AS in_progress,
                SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) AS done,
                SUM(CASE WHEN due_date < date('now') AND status != 'done' THEN 1 ELSE 0 END) AS overdue
            FROM tasks
        """).fetchone()
 
        print(f"\nTask Statistics:")
        print(f"  Total:       {row['total']}")
        print(f"  To Do:       {row['todo']}")
        print(f"  In Progress: {row['in_progress']}")
        print(f"  Done:        {row['done']}")
        print(f"  Overdue:     {row['overdue']}")
 
# Initialize and demo
if __name__ == "__main__":
    init_db()
 
    # Add sample tasks
    add_task("Design database schema", priority=1, due_date="2026-02-20", tags=["backend", "database"])
    add_task("Write API endpoints", priority=2, due_date="2026-02-25", tags=["backend", "api"])
    add_task("Create unit tests", priority=3, tags=["testing"])
    add_task("Update documentation", priority=4, due_date="2026-03-01", tags=["docs"])
 
    # List and stats
    list_tasks()
    stats()
    complete_task(1)
    list_tasks(status="done")

SQLite vs PostgreSQL vs MySQL vs MongoDB

| Feature | SQLite | PostgreSQL | MySQL | MongoDB |
| --- | --- | --- | --- | --- |
| Type | Embedded | Client-server | Client-server | Document store |
| Setup | Zero config | Install + configure | Install + configure | Install + configure |
| Storage | Single file | Server directory | Server directory | Server directory |
| Max DB size | 281 TB | Unlimited | 256 TB | Unlimited |
| Concurrent writes | Limited (file lock) | Excellent (MVCC) | Good (row locks) | Good (document locks) |
| SQL support | Most of SQL92 | Full SQL + extensions | Full SQL | MQL (not SQL) |
| Data types | 5 storage classes | 40+ types | 30+ types | BSON types |
| Full-text search | FTS5 extension | Built-in tsvector | Built-in FULLTEXT | Built-in text index |
| JSON support | JSON1 extension | Native JSONB | Native JSON | Native (it is JSON) |
| Replication | Not built-in | Streaming + logical | Primary-replica | Replica sets |
| Best for | Embedded, local, prototypes | Web apps, analytics | Web apps, CMS | Flexible schema, documents |
| Python library | sqlite3 (built-in) | psycopg2 | mysql-connector | pymongo |
| Hosting required | No | Yes | Yes | Yes |
| Cost to start | Free, zero overhead | Free, but need server | Free, but need server | Free, but need server |

结论很直接:当你的应用是唯一写入数据库的进程时,用 SQLite。需要并发多用户写入、复制(replication)或网络访问时,用 PostgreSQL 或 MySQL。当你的数据以文档为中心、并不适合关系型 schema 时,用 MongoDB。

常见问题(Frequently Asked Questions)

SQLite 适合用于生产环境吗?

适合。SQLite 被 Android、iOS、所有主流浏览器、Airbus 飞行软件以及大量桌面应用用于生产环境。它可以处理高达 281TB 的数据和数百万行记录。它不适合“高并发、写入密集型”的 Web 应用(大量用户同时写入),但对大多数其他场景都表现良好。

如何查看 Python 使用的 SQLite 版本?

sqlite3.sqlite_version 查看 SQLite 库版本,用 sqlite3.version 查看 Python 模块版本。例如:import sqlite3; print(sqlite3.sqlite_version) 可能输出 3.45.1

多个进程可以同时读取同一个 SQLite 数据库吗?

可以。多个进程可以同时读取同一个 SQLite 数据库。写操作会对数据库文件加独占锁。启用 WAL 模式后,读不会阻塞写、写也不会阻塞读。但同一时刻仍只能有一个 writer。
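在单个进程内也能观察到这种多连接读取:下面的示例(使用临时目录中的示例文件 shared.db)对同一个数据库文件打开两个独立的读连接并分别查询:

```python
import os
import sqlite3
import tempfile

# A throwaway database file in a temp directory (illustrative path)
path = os.path.join(tempfile.mkdtemp(), "shared.db")

writer = sqlite3.connect(path)
writer.execute("CREATE TABLE items (name TEXT)")
writer.execute("INSERT INTO items VALUES ('widget')")
writer.commit()

# Two independent connections reading the same file
reader1 = sqlite3.connect(path)
reader2 = sqlite3.connect(path)
r1 = reader1.execute("SELECT name FROM items").fetchall()
r2 = reader2.execute("SELECT name FROM items").fetchall()
print(r1, r2)  # Both see [('widget',)]

for c in (writer, reader1, reader2):
    c.close()
```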

如何安全地备份 SQLite 数据库?

使用 sqlite3 的 backup API:source.backup(dest)。若能确认没有写入正在进行,也可以直接复制数据库文件;切勿在写事务进行中拷贝文件,否则备份可能损坏。

import sqlite3
 
source = sqlite3.connect("myapp.db")
backup = sqlite3.connect("myapp_backup.db")
source.backup(backup)
backup.close()
source.close()

如何在 SQLite 中存储日期与时间?

SQLite 没有原生的 DATE 或 DATETIME 类型。建议把日期按 ISO 8601 格式存为 TEXT(例如 '2026-02-18''2026-02-18T14:30:00')。SQLite 内置日期函数(date()time()datetime()strftime())可直接处理该格式。另一种方式是将 Unix 时间戳存为 INTEGER。
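一个小例子:把日期按 ISO 8601 文本存储后,可直接用 SQLite 的日期函数筛选和格式化(ISO 文本的字典序即时间序):

```python
import sqlite3
from datetime import date

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (name TEXT, event_date TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("launch", "2026-02-18"), ("review", "2026-03-05")],
)

# Filter and reformat ISO 8601 text with SQLite's date functions
rows = conn.execute(
    "SELECT name, strftime('%Y-%m', event_date) FROM events "
    "WHERE event_date >= date('2026-03-01')"
).fetchall()
print(rows)  # [('review', '2026-03')]

# Store today's date from Python via isoformat()
conn.execute("INSERT INTO events VALUES (?, ?)", ("today", date.today().isoformat()))
conn.close()
```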

SQLite 支持加密吗?

标准开源 SQLite 不内置加密。SQLite Encryption Extension (SEE) 是付费商业产品。免费的替代方案包括 SQLCipher(开源)以及用于 Python 集成的 pysqlcipher3。

如何在 Python sqlite3 中防止 SQL 注入?

始终使用带 ?:name 占位符的参数化查询。不要用 f-string、.format()% 格式化把用户输入拼接进 SQL 字符串。sqlite3 模块会在你单独传入参数时自动处理转义。

结语(Conclusion)

Python 的 sqlite3 模块为你提供了一个“零安装、零配置”的全功能关系型数据库。对于本地应用、原型、数据分析脚本和测试场景,SQLite 很难被击败。需要牢记的关键模式包括:始终使用参数化查询来防止 SQL 注入;用上下文管理器实现干净的连接管理;启用 WAL 模式以获得更好的并发体验;批量操作用 executemany 提升性能。

在数据分析工作流中,SQLite 与 pandas 的组合非常强大:用 to_sql() 导入数据,用 read_sql() 查询分析,并用 PyGWalker 可视化结果。当项目增长到超出 SQLite 并发能力时,迁移到 PostgreSQL 通常只需极少代码改动,因为 DB-API 2.0 接口在不同数据库驱动之间保持一致。
