Python SQLite3 教程:Python 中 SQLite 数据库完整指南
你需要在 Python 应用中存储结构化数据:可能是用户设置、传感器读数、日志记录,或一个任务列表。你先用 CSV 文件应付,但随着数据增长,查询变得痛苦。你考虑 PostgreSQL 或 MySQL,但对一个本地工具或原型来说,搭建数据库服务器又显得太“重”。你真正需要的是:一种可靠的方式来存储、查询、更新结构化数据,同时不引入额外的运维负担。
这正是 SQLite 要解决的问题。SQLite 是一个自包含、无服务器(serverless)的数据库引擎,所有内容都存储在单个文件中。Python 标准库自带 sqlite3 模块,无需安装任何东西。你可以获得完整的 SQL 支持、ACID 事务能力,并且能处理最高达 281TB 的数据集——全程不需要运行数据库服务器进程,也不用管理配置文件。
本指南将带你系统掌握如何在 Python 中高效使用 SQLite:从创建第一个数据库到高级查询、与 pandas 集成、性能调优,以及一个完整的真实项目示例。
SQLite 是什么?为什么要用它?
SQLite 是一种嵌入式关系型数据库引擎。与 PostgreSQL 或 MySQL 不同,它不会作为独立的服务器进程运行。整个数据库存在于磁盘上的单个文件中(或驻留在内存中)。Python 标准库中的 sqlite3 模块提供了一个符合 DB-API 2.0 规范的 SQLite 接口。
SQLite 的关键特性:
- 零配置:无需服务器部署,无需管理用户与权限
- 无服务器(Serverless):数据库引擎运行在你的应用进程内
- 单文件:整个数据库(表、索引、数据)都在一个 .db 文件中
- 跨平台:数据库文件可在任意 OS 上直接使用,无需转换
- 符合 ACID:完整事务支持,支持 commit 与 rollback
- 轻量:库体积约 750 KB——比大多数图片还小
何时使用 SQLite
| 使用场景 | SQLite | PostgreSQL/MySQL |
|---|---|---|
| 桌面/移动应用 | 最佳选择 | 大材小用 |
| 原型开发 | 最佳选择 | 搭建更慢 |
| 嵌入式设备 / IoT | 最佳选择 | 往往不可行 |
| 单元测试 | 最佳选择 | 需要测试服务器 |
| 数据分析脚本 | 适合 | 没必要的开销 |
| < 10 万日访问的 Web 应用 | 可用且效果好 | 更可扩展 |
| 高并发 Web 应用 | 不理想 | 最佳选择 |
| 多客户端写入量大 | 不理想 | 最佳选择 |
SQLite 能覆盖大多数“不需要多个同时写入连接”的应用。Android 和 iOS 默认本地数据库就是 SQLite。Firefox、Chrome、macOS 内部也大量使用它。它是世界上部署最广泛的数据库引擎。
连接到数据库
sqlite3.connect() 会创建到某个数据库文件的连接。如果文件不存在,SQLite 会自动创建它。
import sqlite3
# Connect to a database file (creates it if it doesn't exist)
conn = sqlite3.connect("myapp.db")
# Create a cursor to execute SQL statements
cursor = conn.cursor()
# Always close the connection when done
conn.close()
内存数据库(In-Memory Databases)
使用特殊字符串 :memory: 创建只存在于 RAM 中的数据库。这非常适合测试或临时数据处理。
import sqlite3
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE test (id INTEGER, value TEXT)")
cursor.execute("INSERT INTO test VALUES (1, 'hello')")
cursor.execute("SELECT * FROM test")
print(cursor.fetchone()) # (1, 'hello')
conn.close() # Database is gone after this
使用上下文管理器(Context Managers)
推荐模式是使用上下文管理器(with 语句)来确保连接能被正确处理,即便发生错误也不至于遗漏清理。在 with 块中,SQLite 连接在成功时会自动提交(auto-commit),在异常时自动回滚(auto-rollback)。
import sqlite3
# Connection as context manager handles commit/rollback
with sqlite3.connect("myapp.db") as conn:
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
# Auto-commits if no exception
# Auto-rolls back if an exception occurs
# Note: the connection is NOT closed by 'with' -- it just commits/rolls back
# For full cleanup, close explicitly or use a wrapper:
conn.close()
一个更干净的辅助函数:同时负责 commit/rollback 和关闭连接:
import sqlite3
from contextlib import contextmanager
@contextmanager
def get_db(db_path="myapp.db"):
"""Context manager that commits on success, rolls back on error, and always closes."""
conn = sqlite3.connect(db_path)
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# Usage
with get_db() as conn:
conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
创建表(Creating Tables)
使用 CREATE TABLE 定义 schema。SQLite 的类型系统相对简化,只有五种存储类型(storage classes)。
SQLite 数据类型(Storage Classes)
| Storage Class | 描述 | Python 对应类型 |
|---|---|---|
| NULL | 空值 | None |
| INTEGER | 有符号整数(1、2、3、4、6 或 8 字节) | int |
| REAL | 浮点数(8 字节 IEEE float) | float |
| TEXT | UTF-8 或 UTF-16 字符串 | str |
| BLOB | 二进制数据,按输入原样存储 | bytes |
SQLite 使用动态类型(dynamic typing)。无论列声明为何种类型,你都可以在任意列中存储任意类型的数据。声明类型只是"类型亲和性"(type affinity)提示,而不是强约束。这一点不同于 PostgreSQL 和 MySQL。(从 SQLite 3.37 起,可以在建表时加 STRICT 关键字来强制类型检查。)
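下面用一个内存数据库演示类型亲和性(示意代码):能转换成整数的字符串会被 INTEGER 列转换为整数,无法转换的字符串则按原样存为 TEXT:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (n INTEGER)")

# '123' can be converted, so INTEGER affinity stores it as the integer 123
conn.execute("INSERT INTO demo VALUES ('123')")
# 'abc' cannot be converted, so it is stored as TEXT despite the declared type
conn.execute("INSERT INTO demo VALUES ('abc')")

values = [v for (v,) in conn.execute("SELECT n FROM demo ORDER BY rowid")]
print(values)  # [123, 'abc']
conn.close()
```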
创建带约束的表
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
department TEXT DEFAULT 'General',
salary REAL CHECK(salary > 0),
hire_date TEXT NOT NULL,
is_active INTEGER DEFAULT 1
)
""")
conn.commit()
conn.close()
常见约束说明:
- PRIMARY KEY:每行唯一标识。INTEGER PRIMARY KEY 是内部 rowid 的别名。
- AUTOINCREMENT:保证 rowid 单调递增(不会复用已删除行的 ID)。
- NOT NULL:列值不可为 NULL。
- UNIQUE:该列值不可重复。
- DEFAULT:INSERT 时未提供该列则使用默认值。
- CHECK:用条件表达式校验数据合法性。
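下面用一个内存数据库演示约束被违反时的行为(示意代码,表结构为简化假设):违反 UNIQUE 或 CHECK 都会抛出 sqlite3.IntegrityError:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE employees (
    id INTEGER PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    salary REAL CHECK(salary > 0)
)
""")
conn.execute("INSERT INTO employees (email, salary) VALUES ('a@x.com', 50000)")

errors = []
try:
    # Duplicate email violates the UNIQUE constraint
    conn.execute("INSERT INTO employees (email, salary) VALUES ('a@x.com', 60000)")
except sqlite3.IntegrityError as e:
    errors.append(str(e))
try:
    # Negative salary violates the CHECK constraint
    conn.execute("INSERT INTO employees (email, salary) VALUES ('b@x.com', -1)")
except sqlite3.IntegrityError as e:
    errors.append(str(e))

print(errors)  # one UNIQUE failure and one CHECK failure
conn.close()
```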
创建多个关联表
import sqlite3
conn = sqlite3.connect("school.db")
cursor = conn.cursor()
# Enable foreign key enforcement (off by default in SQLite)
cursor.execute("PRAGMA foreign_keys = ON")
cursor.execute("""
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS courses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
credits INTEGER DEFAULT 3
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS enrollments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id INTEGER NOT NULL,
course_id INTEGER NOT NULL,
grade TEXT,
enrolled_date TEXT DEFAULT CURRENT_DATE,
FOREIGN KEY (student_id) REFERENCES students(id) ON DELETE CASCADE,
FOREIGN KEY (course_id) REFERENCES courses(id) ON DELETE CASCADE,
UNIQUE(student_id, course_id)
)
""")
conn.commit()
conn.close()
CRUD 操作
CRUD 指 Create、Read、Update、Delete:持久化存储的四个基本操作。
INSERT:添加数据
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Insert a single row
cursor.execute(
"INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
("Alice Johnson", "alice@company.com", "Engineering", 95000, "2024-01-15")
)
# Insert multiple rows with executemany
employees = [
("Bob Smith", "bob@company.com", "Marketing", 72000, "2024-03-01"),
("Carol White", "carol@company.com", "Engineering", 98000, "2023-11-20"),
("David Brown", "david@company.com", "Sales", 68000, "2024-06-10"),
("Eve Davis", "eve@company.com", "Engineering", 105000, "2023-08-05"),
]
cursor.executemany(
"INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
employees
)
# Get the last inserted row ID
print(f"Last inserted ID: {cursor.lastrowid}")
# Get the number of rows affected
print(f"Rows inserted: {cursor.rowcount}")
conn.commit()
conn.close()
INSERT OR REPLACE(Upsert)
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Insert or update if email already exists
cursor.execute("""
INSERT INTO employees (name, email, department, salary, hire_date)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(email) DO UPDATE SET
name = excluded.name,
salary = excluded.salary
""", ("Alice Johnson", "alice@company.com", "Engineering", 100000, "2024-01-15"))
conn.commit()
conn.close()
SELECT:读取数据
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Select all rows
cursor.execute("SELECT * FROM employees")
all_rows = cursor.fetchall()
for row in all_rows:
print(row) # Returns tuples by default
# Select specific columns with a condition
cursor.execute(
"SELECT name, salary FROM employees WHERE department = ?",
("Engineering",)
)
engineers = cursor.fetchall()
for name, salary in engineers:
print(f"{name}: ${salary:,.0f}")
# Select with ordering and limit
cursor.execute("""
SELECT name, salary FROM employees
ORDER BY salary DESC
LIMIT 3
""")
top_earners = cursor.fetchall()
print("Top 3 earners:", top_earners)
conn.close()
将结果作为字典返回
默认情况下,fetchall() 返回 tuple。想要字典(列名为 key),使用 sqlite3.Row:
import sqlite3
conn = sqlite3.connect("myapp.db")
conn.row_factory = sqlite3.Row # Set row factory
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees WHERE department = ?", ("Engineering",))
for row in cursor.fetchall():
print(dict(row)) # Convert Row to dict
print(row["name"], row["salary"]) # Access by column name
conn.close()
UPDATE:修改数据
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Update a single field
cursor.execute(
"UPDATE employees SET salary = ? WHERE email = ?",
(110000, "alice@company.com")
)
print(f"Rows updated: {cursor.rowcount}")
# Update with a calculation
cursor.execute("""
UPDATE employees
SET salary = salary * 1.10
WHERE department = 'Engineering' AND salary < 100000
""")
print(f"Engineers with raises: {cursor.rowcount}")
# Conditional update
cursor.execute("""
UPDATE employees
SET is_active = 0
WHERE hire_date < '2023-01-01'
""")
conn.commit()
conn.close()
DELETE:删除数据
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Delete specific rows
cursor.execute("DELETE FROM employees WHERE is_active = 0")
print(f"Rows deleted: {cursor.rowcount}")
# Delete with a subquery
cursor.execute("""
DELETE FROM enrollments
WHERE student_id NOT IN (SELECT id FROM students)
""")
# Delete all rows (truncate equivalent)
cursor.execute("DELETE FROM employees")
conn.commit()
conn.close()
参数化查询与防止 SQL 注入
永远不要用字符串拼接、字符串格式化或 f-string 构造 SQL。务必使用参数化查询来防止 SQL 注入。
问题:SQL 注入
import sqlite3
# DANGEROUS: Never do this
user_input = "'; DROP TABLE employees; --"
query = f"SELECT * FROM employees WHERE name = '{user_input}'"
# This would execute: SELECT * FROM employees WHERE name = ''; DROP TABLE employees; --'
解决方案:参数化查询
SQLite3 支持两种占位符风格:
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Style 1: Question mark placeholders (positional)
cursor.execute(
"SELECT * FROM employees WHERE department = ? AND salary > ?",
("Engineering", 90000)
)
# Style 2: Named placeholders
cursor.execute(
"SELECT * FROM employees WHERE department = :dept AND salary > :min_salary",
{"dept": "Engineering", "min_salary": 90000}
)
# Named placeholders are clearer with many parameters
cursor.execute("""
INSERT INTO employees (name, email, department, salary, hire_date)
VALUES (:name, :email, :dept, :salary, :date)
""", {
"name": "Frank Green",
"email": "frank@company.com",
"dept": "Engineering",
"salary": 92000,
"date": "2025-09-01"
})
conn.commit()
conn.close()
参数化查询会自动处理转义与引号。数据库引擎会把参数值当作数据而不是 SQL 代码,无论输入内容多“恶意”,都无法被解释为 SQL,从根源上消除 SQL 注入风险。
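可以用一个小实验验证这一点(示意代码):即使传入典型的注入字符串,它也只会被当作普通字符串匹配,表不会被破坏:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (name TEXT)")
conn.execute("INSERT INTO employees VALUES ('Alice')")

malicious = "'; DROP TABLE employees; --"
# The placeholder passes the input as a plain string value, never as SQL
rows = conn.execute(
    "SELECT * FROM employees WHERE name = ?", (malicious,)
).fetchall()
print(rows)  # [] -- no employee has that literal name

# The table is untouched
still_there = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='employees'"
).fetchone()[0]
print(still_there)  # 1
conn.close()
```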
动态 WHERE 子句
有时需要按条件动态拼接查询(例如筛选条件是可选的)。下面是安全写法:
import sqlite3
def search_employees(department=None, min_salary=None, is_active=None):
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
conditions = []
params = []
if department is not None:
conditions.append("department = ?")
params.append(department)
if min_salary is not None:
conditions.append("salary >= ?")
params.append(min_salary)
if is_active is not None:
conditions.append("is_active = ?")
params.append(is_active)
query = "SELECT * FROM employees"
if conditions:
query += " WHERE " + " AND ".join(conditions)
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
return results
# Usage
engineers = search_employees(department="Engineering", min_salary=90000)
active_staff = search_employees(is_active=1)
everyone = search_employees() # No filters
使用事务(Transactions)
事务将多条 SQL 操作组合为一个原子单元:要么全部成功并提交(commit),要么全部撤销(rollback)。
手动控制事务
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
try:
# Transfer money between accounts
cursor.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
# Verify no negative balance
cursor.execute("SELECT balance FROM accounts WHERE id = 1")
balance = cursor.fetchone()[0]
if balance < 0:
raise ValueError("Insufficient funds")
conn.commit() # All good, save changes
print("Transfer complete")
except Exception as e:
conn.rollback() # Something went wrong, undo everything
print(f"Transfer failed: {e}")
finally:
conn.close()
Autocommit 与隔离级别(Isolation Levels)
默认情况下,sqlite3 运行在“延迟事务(deferred transaction)”模式。你可以改变行为:
import sqlite3
# Default behavior: transactions are deferred
conn = sqlite3.connect("myapp.db")
# Autocommit mode (each statement is its own transaction)
conn = sqlite3.connect("myapp.db", isolation_level=None)
# Immediate locking (prevents concurrent write conflicts)
conn = sqlite3.connect("myapp.db")
conn.execute("BEGIN IMMEDIATE")
# ... your operations ...
conn.commit()
使用 Savepoint 进行部分回滚
Savepoint 允许你只回滚事务中的一部分,而不必全部撤销:
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO orders (customer_id, total) VALUES (1, 100)")
# Start a savepoint
cursor.execute("SAVEPOINT order_items")
try:
cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Widget', 5)")
cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Gadget', -1)")
# This might fail due to CHECK constraint
cursor.execute("RELEASE SAVEPOINT order_items")
except sqlite3.IntegrityError:
cursor.execute("ROLLBACK TO SAVEPOINT order_items")
print("Some items failed, but order was still created")
conn.commit()
except Exception:
conn.rollback()
finally:
conn.close()
获取数据:fetchone、fetchall、fetchmany
cursor 对象提供三种方法来读取查询结果。
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees ORDER BY salary DESC")
# fetchone: Get the next single row (or None if no more rows)
top_earner = cursor.fetchone()
print(f"Top earner: {top_earner}")
# fetchmany: Get the next N rows
next_three = cursor.fetchmany(3)
print(f"Next 3: {next_three}")
# fetchall: Get all remaining rows
remaining = cursor.fetchall()
print(f"Remaining: {len(remaining)} rows")
conn.close()
更高效地遍历结果
对大结果集,直接遍历 cursor 比一次性加载到内存更省内存:
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
cursor.execute("SELECT name, salary FROM employees")
# Memory-efficient: processes one row at a time
for name, salary in cursor:
print(f"{name}: ${salary:,.0f}")
conn.close()
使用 fetchmany 进行批处理
当需要处理数百万行数据时,循环 fetchmany() 能更好地控制内存占用:
import sqlite3
conn = sqlite3.connect("large_data.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM sensor_readings")
batch_size = 1000
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
# Process batch
for row in rows:
pass # your processing logic
conn.close()
高级查询(Advanced Queries)
JOIN
import sqlite3
conn = sqlite3.connect("school.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# INNER JOIN: Students with their enrolled courses
cursor.execute("""
SELECT s.name AS student, c.title AS course, e.grade
FROM enrollments e
INNER JOIN students s ON e.student_id = s.id
INNER JOIN courses c ON e.course_id = c.id
ORDER BY s.name, c.title
""")
for row in cursor.fetchall():
print(f"{row['student']} - {row['course']}: {row['grade'] or 'In Progress'}")
# LEFT JOIN: All students, even those not enrolled
cursor.execute("""
SELECT s.name, COUNT(e.id) AS course_count
FROM students s
LEFT JOIN enrollments e ON s.id = e.student_id
GROUP BY s.id
ORDER BY course_count DESC
""")
for row in cursor.fetchall():
print(f"{row['name']}: {row['course_count']} courses")
conn.close()
聚合(Aggregation)
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Department statistics
cursor.execute("""
SELECT
department,
COUNT(*) AS headcount,
ROUND(AVG(salary), 2) AS avg_salary,
MIN(salary) AS min_salary,
MAX(salary) AS max_salary,
SUM(salary) AS total_payroll
FROM employees
WHERE is_active = 1
GROUP BY department
HAVING COUNT(*) >= 2
ORDER BY avg_salary DESC
""")
print(f"{'Department':<15} {'Count':<8} {'Avg Salary':<12} {'Min':<10} {'Max':<10}")
print("-" * 55)
for row in cursor.fetchall():
dept, count, avg_sal, min_sal, max_sal, total = row
print(f"{dept:<15} {count:<8} ${avg_sal:<11,.0f} ${min_sal:<9,.0f} ${max_sal:<9,.0f}")
conn.close()
子查询(Subqueries)
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Employees earning above their department average
cursor.execute("""
SELECT name, department, salary
FROM employees e
WHERE salary > (
SELECT AVG(salary) FROM employees
WHERE department = e.department
)
ORDER BY department, salary DESC
""")
for name, dept, salary in cursor.fetchall():
print(f"{name} ({dept}): ${salary:,.0f}")
# Most recent hire in each department
cursor.execute("""
SELECT name, department, hire_date
FROM employees
WHERE (department, hire_date) IN (
SELECT department, MAX(hire_date)
FROM employees
GROUP BY department
)
""")
for row in cursor.fetchall():
print(row)
conn.close()
窗口函数(SQLite 3.25+)
SQLite 支持窗口函数,用于运行累计值、排名等分析查询:
import sqlite3
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
# Rank employees by salary within each department
cursor.execute("""
SELECT
name,
department,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
RANK() OVER (ORDER BY salary DESC) AS overall_rank
FROM employees
WHERE is_active = 1
""")
for name, dept, salary, dept_rank, overall_rank in cursor.fetchall():
print(f"#{overall_rank} overall, #{dept_rank} in {dept}: {name} (${salary:,.0f})")
# Running total of payroll by hire date
cursor.execute("""
SELECT
name,
hire_date,
salary,
SUM(salary) OVER (ORDER BY hire_date) AS running_total
FROM employees
ORDER BY hire_date
""")
for name, date, salary, running in cursor.fetchall():
print(f"{date} | {name}: ${salary:,.0f} | Running total: ${running:,.0f}")
conn.close()
SQLite 与 pandas
pandas 通过 read_sql() 和 to_sql() 内置支持 SQLite。这让 DataFrame 与 SQLite 数据库之间的数据流转非常方便。
将 SQL 查询结果读取到 DataFrame
import sqlite3
import pandas as pd
conn = sqlite3.connect("myapp.db")
# Read a SQL query into a DataFrame
df = pd.read_sql("SELECT * FROM employees WHERE is_active = 1", conn)
print(df.head())
print(df.describe())
# Parameterized query with pandas
df = pd.read_sql(
"SELECT * FROM employees WHERE department = ? AND salary > ?",
conn,
params=("Engineering", 90000)
)
# Read an entire table
df = pd.read_sql("SELECT * FROM employees", conn, index_col="id")
conn.close()
将 DataFrame 写入 SQLite
import sqlite3
import pandas as pd
# Create a sample DataFrame
df = pd.DataFrame({
"product": ["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"],
"price": [999.99, 699.99, 449.99, 349.99, 79.99],
"stock": [50, 200, 75, 30, 500],
"category": ["Electronics", "Electronics", "Electronics", "Peripherals", "Peripherals"]
})
conn = sqlite3.connect("store.db")
# Write DataFrame to a new table
df.to_sql("products", conn, if_exists="replace", index=False)
# Append to an existing table
new_products = pd.DataFrame({
"product": ["Mouse", "Webcam"],
"price": [49.99, 89.99],
"stock": [300, 100],
"category": ["Peripherals", "Peripherals"]
})
new_products.to_sql("products", conn, if_exists="append", index=False)
# Verify
result = pd.read_sql("SELECT * FROM products", conn)
print(result)
conn.close()
往返工作流:CSV → SQLite → 分析
import sqlite3
import pandas as pd
# Step 1: Load CSV into SQLite
df = pd.read_csv("sales_data.csv")
conn = sqlite3.connect("analytics.db")
df.to_sql("sales", conn, if_exists="replace", index=False)
# Step 2: Run SQL analytics
summary = pd.read_sql("""
SELECT
region,
strftime('%Y-%m', sale_date) AS month,
COUNT(*) AS transactions,
SUM(amount) AS revenue,
AVG(amount) AS avg_transaction
FROM sales
GROUP BY region, month
ORDER BY month, revenue DESC
""", conn)
print(summary)
# Step 3: Export results
summary.to_csv("monthly_summary.csv", index=False)
conn.close()当你把查询结果放入 DataFrame 后,可以直接用 PyGWalker (opens in a new tab) 做可视化。PyGWalker 能把任意 pandas DataFrame 变成类似 Tableau 的交互式可视化界面,通过拖拽字段生成图表,无需编写绘图代码:
import pygwalker as pyg
# Turn your SQL query results into an interactive visualization
walker = pyg.walk(summary)
如果你希望工作流更顺滑,RunCell 提供了 AI 驱动的 Jupyter 环境:可以写 SQL、获得 AI 辅助调试,并在同一个 notebook 中即时可视化结果。
错误处理(Error Handling)
sqlite3 模块定义了多种异常类型。捕获更具体的异常有助于提供更清晰的错误信息与恢复策略。
import sqlite3
def safe_insert(db_path, name, email, salary):
"""Insert an employee with proper error handling."""
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO employees (name, email, salary, hire_date) VALUES (?, ?, ?, date('now'))",
(name, email, salary)
)
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError as e:
# Unique constraint violated (duplicate email, NOT NULL violated, etc.)
print(f"Data integrity error: {e}")
return None
except sqlite3.OperationalError as e:
# Table doesn't exist, SQL syntax error, database locked, etc.
print(f"Operational error: {e}")
return None
except sqlite3.DatabaseError as e:
# Corrupted database, disk full, etc.
print(f"Database error: {e}")
return None
except sqlite3.ProgrammingError as e:
# Using a closed cursor, wrong number of parameters, etc.
print(f"Programming error: {e}")
return None
finally:
if conn:
conn.close()
SQLite3 异常层级(Exception Hierarchy)
| Exception | Parent | 常见原因 |
|---|---|---|
| sqlite3.Warning | Exception | 非致命问题 |
| sqlite3.Error | Exception | sqlite3 全部错误的基类 |
| sqlite3.InterfaceError | Error | DB-API 接口误用 |
| sqlite3.DatabaseError | Error | 数据库相关错误 |
| sqlite3.DataError | DatabaseError | 数据处理问题 |
| sqlite3.OperationalError | DatabaseError | 数据库被锁、表缺失、SQL 错误 |
| sqlite3.IntegrityError | DatabaseError | UNIQUE、NOT NULL、FOREIGN KEY 约束违规 |
| sqlite3.InternalError | DatabaseError | sqlite3 模块内部错误 |
| sqlite3.ProgrammingError | DatabaseError | API 使用不正确 |
| sqlite3.NotSupportedError | DatabaseError | 不支持的特性 |
性能建议(Performance Tips)
启用 WAL 模式
WAL(Write-Ahead Logging)模式允许在写入时并发读取,并提升写性能:
import sqlite3
conn = sqlite3.connect("myapp.db")
conn.execute("PRAGMA journal_mode=WAL")
# Returns 'wal' if successful
当你有一个 reader(例如 Web 服务)和一个 writer(例如后台任务)同时访问同一个数据库时,WAL 模式尤其有用。
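注意 journal_mode 是持久化设置:对某个数据库文件设置一次 WAL 后,之后的新连接也会自动处于 WAL 模式(内存数据库不支持 WAL)。一个小验证(示意代码,使用临时文件路径):

```python
import os
import sqlite3
import tempfile

# WAL does not apply to :memory: databases, so use a real file
path = os.path.join(tempfile.mkdtemp(), "demo.db")

conn1 = sqlite3.connect(path)
mode1 = conn1.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn1.close()

# A brand-new connection sees WAL without setting it again
conn2 = sqlite3.connect(path)
mode2 = conn2.execute("PRAGMA journal_mode").fetchone()[0]
conn2.close()

print(mode1, mode2)  # wal wal
```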
批量插入使用 executemany
与在循环里反复调用 execute 相比,executemany 会显著更快,因为它减少了 Python 与 SQLite 之间的往返开销:
import sqlite3
import time
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE numbers (value INTEGER)")
data = [(i,) for i in range(100_000)]
# Slow: individual inserts
start = time.perf_counter()
for d in data:
conn.execute("INSERT INTO numbers VALUES (?)", d)
conn.commit()
slow_time = time.perf_counter() - start
conn.execute("DELETE FROM numbers")
# Fast: executemany
start = time.perf_counter()
conn.executemany("INSERT INTO numbers VALUES (?)", data)
conn.commit()
fast_time = time.perf_counter() - start
print(f"Individual inserts: {slow_time:.2f}s")
print(f"executemany: {fast_time:.2f}s")
print(f"Speedup: {slow_time / fast_time:.1f}x")
conn.close()
为高频查询列创建索引
索引能显著加速 SELECT 查询,但会带来:写入略慢、占用更多磁盘空间。
import sqlite3
conn = sqlite3.connect("myapp.db")
# Single column index
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_department ON employees(department)")
# Composite index (for queries that filter on both columns)
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_dept_salary ON employees(department, salary)")
# Unique index (also enforces uniqueness)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_employees_email ON employees(email)")
# Check existing indexes
cursor = conn.execute("SELECT name, sql FROM sqlite_master WHERE type='index'")
for name, sql in cursor.fetchall():
print(f"{name}: {sql}")
conn.close()
使用 EXPLAIN QUERY PLAN 检查性能
import sqlite3
conn = sqlite3.connect("myapp.db")
# Check how SQLite executes a query
result = conn.execute("""
EXPLAIN QUERY PLAN
SELECT * FROM employees WHERE department = 'Engineering' AND salary > 90000
""").fetchall()
for row in result:
print(row)
# Output shows whether an index is used or a full table scan occurs
conn.close()
其他有助于性能的 PRAGMA
import sqlite3
conn = sqlite3.connect("myapp.db")
# Increase cache size (recent SQLite versions default to about 2 MB)
conn.execute("PRAGMA cache_size = -64000") # 64MB (negative = KB)
# Reduce disk syncs for speed (less durable but much faster)
conn.execute("PRAGMA synchronous = NORMAL") # Options: OFF, NORMAL, FULL (default)
# Store temp tables in memory
conn.execute("PRAGMA temp_store = MEMORY")
# Check database integrity
result = conn.execute("PRAGMA integrity_check").fetchone()
print(f"Integrity: {result[0]}") # Should print 'ok'
conn.close()
真实案例:任务管理 CLI
下面是一个完整的任务管理应用,展示了本指南涉及的核心概念。它使用 SQLite 做持久化存储,支持优先级、标签和截止日期。
import sqlite3
import sys
from datetime import datetime, date
from contextlib import contextmanager
DB_PATH = "tasks.db"
@contextmanager
def get_db():
"""Get a database connection with Row factory and foreign keys enabled."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("PRAGMA journal_mode = WAL")
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db():
"""Create tables if they don't exist."""
with get_db() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority INTEGER DEFAULT 2 CHECK(priority BETWEEN 1 AND 4),
status TEXT DEFAULT 'todo' CHECK(status IN ('todo', 'in_progress', 'done')),
due_date TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
completed_at TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS task_tags (
task_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (task_id, tag_id),
FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_date)")
def add_task(title, description="", priority=2, due_date=None, tags=None):
"""Add a new task with optional tags."""
with get_db() as conn:
cursor = conn.execute(
"INSERT INTO tasks (title, description, priority, due_date) VALUES (?, ?, ?, ?)",
(title, description, priority, due_date)
)
task_id = cursor.lastrowid
if tags:
for tag_name in tags:
conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
tag_id = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()["id"]
conn.execute("INSERT INTO task_tags (task_id, tag_id) VALUES (?, ?)", (task_id, tag_id))
print(f"Task #{task_id} created: {title}")
return task_id
def list_tasks(status=None, priority=None, tag=None):
"""List tasks with optional filters."""
with get_db() as conn:
conditions = []
params = []
if status:
conditions.append("t.status = ?")
params.append(status)
if priority:
conditions.append("t.priority = ?")
params.append(priority)
if tag:
conditions.append("t.id IN (SELECT task_id FROM task_tags tt JOIN tags tg ON tt.tag_id = tg.id WHERE tg.name = ?)")
params.append(tag)
query = """
SELECT t.id, t.title, t.priority, t.status, t.due_date,
GROUP_CONCAT(tg.name, ', ') AS tags
FROM tasks t
LEFT JOIN task_tags tt ON t.id = tt.task_id
LEFT JOIN tags tg ON tt.tag_id = tg.id
"""
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " GROUP BY t.id ORDER BY t.priority ASC, t.due_date ASC"
rows = conn.execute(query, params).fetchall()
priority_labels = {1: "URGENT", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
print(f"\n{'ID':<5} {'Priority':<10} {'Status':<13} {'Due':<12} {'Title':<30} {'Tags'}")
print("-" * 85)
for row in rows:
due = row["due_date"] or "—"
tags_str = row["tags"] or "—"
p_label = priority_labels.get(row["priority"], "?")
print(f"{row['id']:<5} {p_label:<10} {row['status']:<13} {due:<12} {row['title']:<30} {tags_str}")
print(f"\nTotal: {len(rows)} tasks")
def complete_task(task_id):
"""Mark a task as done."""
with get_db() as conn:
result = conn.execute(
"UPDATE tasks SET status = 'done', completed_at = ? WHERE id = ?",
(datetime.now().isoformat(), task_id)
)
if result.rowcount:
print(f"Task #{task_id} marked as done.")
else:
print(f"Task #{task_id} not found.")
def stats():
"""Show task statistics."""
with get_db() as conn:
row = conn.execute("""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status = 'todo' THEN 1 ELSE 0 END) AS todo,
SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) AS in_progress,
SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) AS done,
SUM(CASE WHEN due_date < date('now') AND status != 'done' THEN 1 ELSE 0 END) AS overdue
FROM tasks
""").fetchone()
print(f"\nTask Statistics:")
print(f" Total: {row['total']}")
print(f" To Do: {row['todo']}")
print(f" In Progress: {row['in_progress']}")
print(f" Done: {row['done']}")
print(f" Overdue: {row['overdue']}")
# Initialize and demo
if __name__ == "__main__":
init_db()
# Add sample tasks
add_task("Design database schema", priority=1, due_date="2026-02-20", tags=["backend", "database"])
add_task("Write API endpoints", priority=2, due_date="2026-02-25", tags=["backend", "api"])
add_task("Create unit tests", priority=3, tags=["testing"])
add_task("Update documentation", priority=4, due_date="2026-03-01", tags=["docs"])
# List and stats
list_tasks()
stats()
complete_task(1)
list_tasks(status="done")
SQLite vs PostgreSQL vs MySQL vs MongoDB
| Feature | SQLite | PostgreSQL | MySQL | MongoDB |
|---|---|---|---|---|
| Type | Embedded | Client-server | Client-server | Document store |
| Setup | Zero config | Install + configure | Install + configure | Install + configure |
| Storage | Single file | Server directory | Server directory | Server directory |
| Max DB size | 281 TB | Unlimited | 256 TB | Unlimited |
| Concurrent writes | Limited (file lock) | Excellent (MVCC) | Good (row locks) | Good (document locks) |
| SQL support | Most of SQL92 | Full SQL + extensions | Full SQL | MQL (not SQL) |
| Data types | 5 storage classes | 40+ types | 30+ types | BSON types |
| Full-text search | FTS5 extension | Built-in tsvector | Built-in FULLTEXT | Built-in text index |
| JSON support | JSON1 extension | Native JSONB | Native JSON | Native (it is JSON) |
| Replication | Not built-in | Streaming + logical | Primary-replica | Replica sets |
| Best for | Embedded, local, prototypes | Web apps, analytics | Web apps, CMS | Flexible schema, documents |
| Python library | sqlite3 (built-in) | psycopg2 | mysql-connector | pymongo |
| Hosting required | No | Yes | Yes | Yes |
| Cost to start | Free, zero overhead | Free, but need server | Free, but need server | Free, but need server |
结论很直接:当你的应用是唯一写入数据库的进程时,用 SQLite。需要并发多用户写入、复制(replication)或网络访问时,用 PostgreSQL 或 MySQL。当你的数据以文档为中心、并不适合关系型 schema 时,用 MongoDB。
常见问题(Frequently Asked Questions)
SQLite 适合用于生产环境吗?
适合。SQLite 被 Android、iOS、所有主流浏览器、Airbus 飞行软件以及大量桌面应用用于生产环境。它可以处理高达 281TB 的数据和数百万行记录。它不适合“高并发、写入密集型”的 Web 应用(大量用户同时写入),但对大多数其他场景都表现良好。
如何查看 Python 使用的 SQLite 版本?
用 sqlite3.sqlite_version 查看底层 SQLite 库的版本,例如:import sqlite3; print(sqlite3.sqlite_version) 可能输出 3.45.1。注意:表示 sqlite3 模块自身版本的 sqlite3.version 自 Python 3.12 起已被弃用(并在 Python 3.14 中移除),不要再依赖它。
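一个快速检查的小片段:

```python
import sqlite3

# Version of the SQLite library bundled with this Python build
print(sqlite3.sqlite_version)       # e.g. '3.45.1'
print(sqlite3.sqlite_version_info)  # e.g. (3, 45, 1)
```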
多个进程可以同时读取同一个 SQLite 数据库吗?
可以。多个进程可以同时读取同一个 SQLite 数据库。写操作会对数据库文件加独占锁。启用 WAL 模式后,读不会阻塞写、写也不会阻塞读。但同一时刻仍只能有一个 writer。
如何安全地备份 SQLite 数据库?
使用 sqlite3 的 backup API:source.backup(dest)。如果在“没有写入进行”的情况下,你也可以直接复制数据库文件。切勿在写事务进行中拷贝文件,否则可能导致备份损坏。
import sqlite3
source = sqlite3.connect("myapp.db")
backup = sqlite3.connect("myapp_backup.db")
source.backup(backup)
backup.close()
source.close()
如何在 SQLite 中存储日期与时间?
SQLite 没有原生的 DATE 或 DATETIME 类型。建议把日期按 ISO 8601 格式存为 TEXT(例如 '2026-02-18' 或 '2026-02-18T14:30:00')。SQLite 内置日期函数(date()、time()、datetime()、strftime())可直接处理该格式。另一种方式是将 Unix 时间戳存为 INTEGER。
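按 ISO 8601 文本存储后,可以直接在 SQL 里做日期比较和格式化(示意代码,表结构为假设):

```python
import sqlite3
from datetime import date

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (name TEXT, event_date TEXT)")
conn.execute(
    "INSERT INTO events VALUES (?, ?)",
    ("launch", date(2026, 2, 18).isoformat()),  # stored as '2026-02-18'
)

# ISO 8601 strings compare correctly, and strftime() parses them directly
row = conn.execute("""
    SELECT name, strftime('%Y', event_date)
    FROM events
    WHERE event_date >= date('2026-01-01')
""").fetchone()
print(row)  # ('launch', '2026')
conn.close()
```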
SQLite 支持加密吗?
标准开源 SQLite 不内置加密。SQLite Encryption Extension (SEE) 是付费商业产品。免费的替代方案包括 SQLCipher(开源)以及用于 Python 集成的 pysqlcipher3。
如何在 Python sqlite3 中防止 SQL 注入?
始终使用带 ? 或 :name 占位符的参数化查询。不要用 f-string、.format() 或 % 格式化把用户输入拼接进 SQL 字符串。sqlite3 模块会在你单独传入参数时自动处理转义。
结语(Conclusion)
Python 的 sqlite3 模块为你提供了一个“零安装、零配置”的全功能关系型数据库。对于本地应用、原型、数据分析脚本和测试场景,SQLite 很难被击败。需要牢记的关键模式包括:始终使用参数化查询来防止 SQL 注入;用上下文管理器实现干净的连接管理;启用 WAL 模式以获得更好的并发体验;批量操作用 executemany 提升性能。
在数据分析工作流中,SQLite 与 pandas 的组合非常强大:用 to_sql() 导入数据,用 read_sql() 查询分析,并用 PyGWalker 可视化结果。当项目增长到超出 SQLite 并发能力时,迁移到 PostgreSQL 通常只需极少代码改动,因为 DB-API 2.0 接口在不同数据库驱动之间保持一致。