Skip to content
トピック
Python
Python SQLite3 チュートリアル: Python での SQLite データベース完全ガイド

Python SQLite3 チュートリアル: Python での SQLite データベース完全ガイド

更新日

Python アプリケーションで構造化データを保存する必要があります。ユーザー設定、センサーの読み取り値、ログエントリ、あるいはタスクリストかもしれません。CSV ファイルを使おうとしますが、データが増えるにつれてクエリがつらくなってきます。PostgreSQL や MySQL も考えますが、ローカルツールやプロトタイプのためにデータベースサーバーを立てるのは大げさに感じられます。必要なのは、オーバーヘッドなしで構造化データを確実に保存し、検索し、更新する方法だけです。

まさにこの問題を解決するのが SQLite です。SQLite は自己完結型でサーバーレスのデータベースエンジンで、すべてを 1 つのファイルに保存します。Python には標準ライブラリとして sqlite3 モジュールが含まれているため、追加インストールは不要です。フル SQL サポート、ACID トランザクション、最大 281 テラバイトのデータセットを扱う能力を、サーバープロセスの起動や設定ファイルの管理なしで利用できます。

このガイドでは、Python で SQLite を効果的に使うために必要なことをすべて解説します。最初のデータベース作成から、高度なクエリ、pandas 連携、パフォーマンス調整、そして実運用に近い完全なプロジェクトまでを扱います。

SQLite とは何か、なぜ使うのか?

SQLite は埋め込み型のリレーショナルデータベースエンジンです。PostgreSQL や MySQL と違い、別のサーバープロセスとして動作しません。データベース全体はディスク上の 1 つのファイル(またはメモリ)に存在します。Python 標準ライブラリの sqlite3 モジュールは、SQLite に対する DB-API 2.0 準拠のインターフェースを提供します。

SQLite の主な特徴:

  • ゼロ設定: サーバー設定不要、ユーザー管理不要、権限管理不要
  • サーバーレス: データベースエンジンはアプリケーションのプロセス内で動作
  • 単一ファイル: データベース全体(テーブル、インデックス、データ)が 1 つの .db ファイル
  • クロスプラットフォーム: データベースファイルは変換なしでどの OS でも利用可能
  • ACID 準拠: commit と rollback を備えた完全なトランザクションサポート
  • 軽量: ライブラリサイズは約 750 KB で、多くの画像より小さい

SQLite を使う場面

Use CaseSQLitePostgreSQL/MySQL
Desktop/mobile appsBest choiceOverkill
PrototypingBest choiceSlower to set up
Embedded devices / IoTBest choiceNot feasible
Unit testingBest choiceRequires test server
Data analysis scriptsGood choiceUnnecessary overhead
Web app with < 100K daily visitsWorks wellMore scalable
High-concurrency web appNot idealBest choice
Multiple write-heavy clientsNot idealBest choice

SQLite は、複数の同時書き込み接続を必要としない多くのアプリケーションに対応できます。Android と iOS は標準のローカルデータベースとして SQLite を使っています。Firefox、Chrome、macOS も内部で利用しています。世界で最も広く導入されているデータベースエンジンです。

データベースへの接続

sqlite3.connect() 関数はデータベースファイルへの接続を作成します。ファイルが存在しない場合は、SQLite が自動的に作成します。pathlib を使うと、プラットフォームに依存しない形でデータベースファイルのパスを構築できます。

import sqlite3
 
# Connect to a database file (creates it if it doesn't exist)
conn = sqlite3.connect("myapp.db")
 
# Create a cursor to execute SQL statements
cursor = conn.cursor()
 
# Always close the connection when done
conn.close()

インメモリデータベース

特別な文字列 :memory: を使うと、RAM 上にのみ存在するデータベースを作成できます。これはテストや一時的なデータ処理に最適です。

import sqlite3
 
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
 
cursor.execute("CREATE TABLE test (id INTEGER, value TEXT)")
cursor.execute("INSERT INTO test VALUES (1, 'hello')")
cursor.execute("SELECT * FROM test")
print(cursor.fetchone())  # (1, 'hello')
 
conn.close()  # Database is gone after this

コンテキストマネージャーの使用

推奨されるパターンは、コンテキストマネージャー(with ステートメント)を使って、エラーが発生しても接続が確実に閉じられるようにする方法です。SQLite の接続は、with ブロック内で成功時に自動 commit、例外時に自動 rollback も行います。

import sqlite3
 
# Connection as context manager handles commit/rollback
with sqlite3.connect("myapp.db") as conn:
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
    cursor.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
    # Auto-commits if no exception
    # Auto-rolls back if an exception occurs
 
# Note: the connection is NOT closed by 'with' -- it just commits/rolls back
# For full cleanup, close explicitly or use a wrapper:
conn.close()

commit/rollback と close の両方を扱う、きれいなヘルパー関数:

import sqlite3
from contextlib import contextmanager
 
@contextmanager
def get_db(db_path="myapp.db"):
    """Context manager that commits on success, rolls back on error, and always closes."""
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
# Usage
with get_db() as conn:
    conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))

テーブルの作成

CREATE TABLE を使ってスキーマを定義します。SQLite は 5 つのストレージクラスを持つ簡略化された型システムをサポートしています。

SQLite のデータ型

Storage ClassDescriptionPython Equivalent
NULLNull valueNone
INTEGERSigned integer (1, 2, 3, 4, 6, or 8 bytes)int
REALFloating point (8-byte IEEE float)float
TEXTUTF-8 or UTF-16 stringstr
BLOBBinary data, stored exactly as inputbytes

SQLite は動的型付けを採用しています。宣言された型に関係なく、任意の列に任意の型を保存できます。宣言された型は制約ではなくヒントです。これは PostgreSQL や MySQL とは異なります。

制約付きテーブルの作成

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS employees (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        department TEXT DEFAULT 'General',
        salary REAL CHECK(salary > 0),
        hire_date TEXT NOT NULL,
        is_active INTEGER DEFAULT 1
    )
""")
 
conn.commit()
conn.close()

主な制約:

  • PRIMARY KEY: 各行の一意な識別子。INTEGER PRIMARY KEY は内部 rowid のエイリアスです。
  • AUTOINCREMENT: rowid が必ず増加することを保証します(削除済み ID を再利用しません)。
  • NOT NULL: 列に NULL 値を入れられません。
  • UNIQUE: この列で同じ値を複数行に持てません。
  • DEFAULT: INSERT 時に値が指定されない場合の既定値。
  • CHECK: 条件に対してデータを検証します。

関連する複数テーブルの作成

import sqlite3
 
conn = sqlite3.connect("school.db")
cursor = conn.cursor()
 
# Enable foreign key enforcement (off by default in SQLite)
cursor.execute("PRAGMA foreign_keys = ON")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS students (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE
    )
""")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS courses (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        credits INTEGER DEFAULT 3
    )
""")
 
cursor.execute("""
    CREATE TABLE IF NOT EXISTS enrollments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        student_id INTEGER NOT NULL,
        course_id INTEGER NOT NULL,
        grade TEXT,
        enrolled_date TEXT DEFAULT CURRENT_DATE,
        FOREIGN KEY (student_id) REFERENCES students(id) ON DELETE CASCADE,
        FOREIGN KEY (course_id) REFERENCES courses(id) ON DELETE CASCADE,
        UNIQUE(student_id, course_id)
    )
""")
 
conn.commit()
conn.close()

CRUD 操作

CRUD は Create, Read, Update, Delete の略で、永続化ストレージに対する 4 つの基本操作です。

INSERT: データの追加

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Insert a single row
cursor.execute(
    "INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
    ("Alice Johnson", "alice@company.com", "Engineering", 95000, "2024-01-15")
)
 
# Insert multiple rows with executemany
employees = [
    ("Bob Smith", "bob@company.com", "Marketing", 72000, "2024-03-01"),
    ("Carol White", "carol@company.com", "Engineering", 98000, "2023-11-20"),
    ("David Brown", "david@company.com", "Sales", 68000, "2024-06-10"),
    ("Eve Davis", "eve@company.com", "Engineering", 105000, "2023-08-05"),
]
 
cursor.executemany(
    "INSERT INTO employees (name, email, department, salary, hire_date) VALUES (?, ?, ?, ?, ?)",
    employees
)
 
# Get the last inserted row ID
print(f"Last inserted ID: {cursor.lastrowid}")
 
# Get the number of rows affected
print(f"Rows inserted: {cursor.rowcount}")
 
conn.commit()
conn.close()

INSERT OR REPLACE(Upsert)

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Insert or update if email already exists
cursor.execute("""
    INSERT INTO employees (name, email, department, salary, hire_date)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(email) DO UPDATE SET
        name = excluded.name,
        salary = excluded.salary
""", ("Alice Johnson", "alice@company.com", "Engineering", 100000, "2024-01-15"))
 
conn.commit()
conn.close()

SELECT: データの読み取り

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Select all rows
cursor.execute("SELECT * FROM employees")
all_rows = cursor.fetchall()
for row in all_rows:
    print(row)  # Returns tuples by default
 
# Select specific columns with a condition
cursor.execute(
    "SELECT name, salary FROM employees WHERE department = ?",
    ("Engineering",)
)
engineers = cursor.fetchall()
for name, salary in engineers:
    print(f"{name}: ${salary:,.0f}")
 
# Select with ordering and limit
cursor.execute("""
    SELECT name, salary FROM employees
    ORDER BY salary DESC
    LIMIT 3
""")
top_earners = cursor.fetchall()
print("Top 3 earners:", top_earners)
 
conn.close()

結果を辞書として取得する

デフォルトでは fetchall() はタプルを返します。辞書(列名をキー)として取得するには、sqlite3.Row を使います。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
conn.row_factory = sqlite3.Row  # Set row factory
 
cursor = conn.cursor()
cursor.execute("SELECT * FROM employees WHERE department = ?", ("Engineering",))
 
for row in cursor.fetchall():
    print(dict(row))  # Convert Row to dict
    print(row["name"], row["salary"])  # Access by column name
 
conn.close()

UPDATE: データの変更

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Update a single field
cursor.execute(
    "UPDATE employees SET salary = ? WHERE email = ?",
    (110000, "alice@company.com")
)
print(f"Rows updated: {cursor.rowcount}")
 
# Update with a calculation
cursor.execute("""
    UPDATE employees
    SET salary = salary * 1.10
    WHERE department = 'Engineering' AND salary < 100000
""")
print(f"Engineers with raises: {cursor.rowcount}")
 
# Conditional update
cursor.execute("""
    UPDATE employees
    SET is_active = 0
    WHERE hire_date < '2023-01-01'
""")
 
conn.commit()
conn.close()

DELETE: データの削除

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Delete specific rows
cursor.execute("DELETE FROM employees WHERE is_active = 0")
print(f"Rows deleted: {cursor.rowcount}")
 
# Delete with a subquery
cursor.execute("""
    DELETE FROM enrollments
    WHERE student_id NOT IN (SELECT id FROM students)
""")
 
# Delete all rows (truncate equivalent)
cursor.execute("DELETE FROM employees")
 
conn.commit()
conn.close()

パラメータ化クエリと SQL インジェクション対策

文字列フォーマットや f-string で SQL クエリを組み立ててはいけません。SQL インジェクション攻撃を防ぐため、常にパラメータ化クエリを使ってください。

問題: SQL インジェクション

import sqlite3
 
# DANGEROUS: Never do this
user_input = "'; DROP TABLE employees; --"
query = f"SELECT * FROM employees WHERE name = '{user_input}'"
# This would execute: SELECT * FROM employees WHERE name = ''; DROP TABLE employees; --'

解決策: パラメータ化クエリ

SQLite3 は 2 種類のプレースホルダースタイルをサポートしています。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Style 1: Question mark placeholders (positional)
cursor.execute(
    "SELECT * FROM employees WHERE department = ? AND salary > ?",
    ("Engineering", 90000)
)
 
# Style 2: Named placeholders
cursor.execute(
    "SELECT * FROM employees WHERE department = :dept AND salary > :min_salary",
    {"dept": "Engineering", "min_salary": 90000}
)
 
# Named placeholders are clearer with many parameters
cursor.execute("""
    INSERT INTO employees (name, email, department, salary, hire_date)
    VALUES (:name, :email, :dept, :salary, :date)
""", {
    "name": "Frank Green",
    "email": "frank@company.com",
    "dept": "Engineering",
    "salary": 92000,
    "date": "2025-09-01"
})
 
conn.commit()
conn.close()

パラメータ化クエリは、エスケープと引用符の処理を自動的に行います。データベースエンジンは、パラメータ値を SQL コードではなくデータとして扱います。これにより、入力内容に関係なく SQL インジェクションを排除できます。

動的な WHERE 句

フィルターが任意のときなど、クエリを動的に組み立てる必要がある場合があります。以下は安全なパターンです。

import sqlite3
 
def search_employees(department=None, min_salary=None, is_active=None):
    conn = sqlite3.connect("myapp.db")
    cursor = conn.cursor()
 
    conditions = []
    params = []
 
    if department is not None:
        conditions.append("department = ?")
        params.append(department)
 
    if min_salary is not None:
        conditions.append("salary >= ?")
        params.append(min_salary)
 
    if is_active is not None:
        conditions.append("is_active = ?")
        params.append(is_active)
 
    query = "SELECT * FROM employees"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
 
    cursor.execute(query, params)
    results = cursor.fetchall()
    conn.close()
    return results
 
# Usage
engineers = search_employees(department="Engineering", min_salary=90000)
active_staff = search_employees(is_active=1)
everyone = search_employees()  # No filters

トランザクションの扱い

トランザクションは複数の SQL 操作を 1 つの原子的な単位にまとめます。すべて成功すれば commit、どれかが失敗すれば rollback です。

手動のトランザクション制御

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
try:
    # Transfer money between accounts
    cursor.execute("UPDATE accounts SET balance = balance - 500 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 500 WHERE id = 2")
 
    # Verify no negative balance
    cursor.execute("SELECT balance FROM accounts WHERE id = 1")
    balance = cursor.fetchone()[0]
    if balance < 0:
        raise ValueError("Insufficient funds")
 
    conn.commit()  # All good, save changes
    print("Transfer complete")
 
except Exception as e:
    conn.rollback()  # Something went wrong, undo everything
    print(f"Transfer failed: {e}")
 
finally:
    conn.close()

Autocommit と Isolation Level

デフォルトでは、sqlite3 は「deferred transaction」モードで動作します。挙動は変更できます。

import sqlite3
 
# Default behavior: transactions are deferred
conn = sqlite3.connect("myapp.db")
 
# Autocommit mode (each statement is its own transaction)
conn = sqlite3.connect("myapp.db", isolation_level=None)
 
# Immediate locking (prevents concurrent write conflicts)
conn = sqlite3.connect("myapp.db")
conn.execute("BEGIN IMMEDIATE")
# ... your operations ...
conn.commit()

部分的な rollback のための savepoint

savepoint を使うと、トランザクション全体を取り消さずに一部だけ rollback できます。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
try:
    cursor.execute("INSERT INTO orders (customer_id, total) VALUES (1, 100)")
 
    # Start a savepoint
    cursor.execute("SAVEPOINT order_items")
 
    try:
        cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Widget', 5)")
        cursor.execute("INSERT INTO order_items (order_id, product, qty) VALUES (1, 'Gadget', -1)")
        # This might fail due to CHECK constraint
        cursor.execute("RELEASE SAVEPOINT order_items")
    except sqlite3.IntegrityError:
        cursor.execute("ROLLBACK TO SAVEPOINT order_items")
        print("Some items failed, but order was still created")
 
    conn.commit()
except Exception:
    conn.rollback()
finally:
    conn.close()

データの取得: fetchone, fetchall, fetchmany

cursor オブジェクトは、クエリ結果を取得するための 3 つのメソッドを提供します。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("SELECT * FROM employees ORDER BY salary DESC")
 
# fetchone: Get the next single row (or None if no more rows)
top_earner = cursor.fetchone()
print(f"Top earner: {top_earner}")
 
# fetchmany: Get the next N rows
next_three = cursor.fetchmany(3)
print(f"Next 3: {next_three}")
 
# fetchall: Get all remaining rows
remaining = cursor.fetchall()
print(f"Remaining: {len(remaining)} rows")
 
conn.close()

結果を効率的に反復する

大きな結果セットでは、すべてをメモリに読み込む代わりに cursor を直接反復するのが有効です。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
cursor.execute("SELECT name, salary FROM employees")
 
# Memory-efficient: processes one row at a time
for name, salary in cursor:
    print(f"{name}: ${salary:,.0f}")
 
conn.close()

バッチ処理での fetchmany

何百万行も処理する場合は、fetchmany() をループで使うことでメモリ使用量を制御できます。

import sqlite3
 
conn = sqlite3.connect("large_data.db")
cursor = conn.cursor()
 
cursor.execute("SELECT * FROM sensor_readings")
 
batch_size = 1000
while True:
    rows = cursor.fetchmany(batch_size)
    if not rows:
        break
    # Process batch
    for row in rows:
        pass  # your processing logic
 
conn.close()

高度なクエリ

JOIN

import sqlite3
 
conn = sqlite3.connect("school.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
 
# INNER JOIN: Students with their enrolled courses
cursor.execute("""
    SELECT s.name AS student, c.title AS course, e.grade
    FROM enrollments e
    INNER JOIN students s ON e.student_id = s.id
    INNER JOIN courses c ON e.course_id = c.id
    ORDER BY s.name, c.title
""")
 
for row in cursor.fetchall():
    print(f"{row['student']} - {row['course']}: {row['grade'] or 'In Progress'}")
 
# LEFT JOIN: All students, even those not enrolled
cursor.execute("""
    SELECT s.name, COUNT(e.id) AS course_count
    FROM students s
    LEFT JOIN enrollments e ON s.id = e.student_id
    GROUP BY s.id
    ORDER BY course_count DESC
""")
 
for row in cursor.fetchall():
    print(f"{row['name']}: {row['course_count']} courses")
 
conn.close()

集計

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Department statistics
cursor.execute("""
    SELECT
        department,
        COUNT(*) AS headcount,
        ROUND(AVG(salary), 2) AS avg_salary,
        MIN(salary) AS min_salary,
        MAX(salary) AS max_salary,
        SUM(salary) AS total_payroll
    FROM employees
    WHERE is_active = 1
    GROUP BY department
    HAVING COUNT(*) >= 2
    ORDER BY avg_salary DESC
""")
 
print(f"{'Department':<15} {'Count':<8} {'Avg Salary':<12} {'Min':<10} {'Max':<10}")
print("-" * 55)
for row in cursor.fetchall():
    dept, count, avg_sal, min_sal, max_sal, total = row
    print(f"{dept:<15} {count:<8} ${avg_sal:<11,.0f} ${min_sal:<9,.0f} ${max_sal:<9,.0f}")
 
conn.close()

サブクエリ

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Employees earning above their department average
cursor.execute("""
    SELECT name, department, salary
    FROM employees e
    WHERE salary > (
        SELECT AVG(salary) FROM employees
        WHERE department = e.department
    )
    ORDER BY department, salary DESC
""")
 
for name, dept, salary in cursor.fetchall():
    print(f"{name} ({dept}): ${salary:,.0f}")
 
# Most recent hire in each department
cursor.execute("""
    SELECT name, department, hire_date
    FROM employees
    WHERE (department, hire_date) IN (
        SELECT department, MAX(hire_date)
        FROM employees
        GROUP BY department
    )
""")
 
for row in cursor.fetchall():
    print(row)
 
conn.close()

ウィンドウ関数(SQLite 3.25+)

SQLite は、累計、ランキング、その他の分析クエリのためのウィンドウ関数をサポートしています。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
cursor = conn.cursor()
 
# Rank employees by salary within each department
cursor.execute("""
    SELECT
        name,
        department,
        salary,
        RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank,
        RANK() OVER (ORDER BY salary DESC) AS overall_rank
    FROM employees
    WHERE is_active = 1
""")
 
for name, dept, salary, dept_rank, overall_rank in cursor.fetchall():
    print(f"#{overall_rank} overall, #{dept_rank} in {dept}: {name} (${salary:,.0f})")
 
# Running total of payroll by hire date
cursor.execute("""
    SELECT
        name,
        hire_date,
        salary,
        SUM(salary) OVER (ORDER BY hire_date) AS running_total
    FROM employees
    ORDER BY hire_date
""")
 
for name, date, salary, running in cursor.fetchall():
    print(f"{date} | {name}: ${salary:,.0f} | Running total: ${running:,.0f}")
 
conn.close()

pandas で SQLite を使う

pandas には read_sql()to_sql() による SQLite サポートが組み込まれています。これにより、DataFrame と SQLite データベース間でデータを簡単に移動できます。

SQL 結果を DataFrame に読み込む

import sqlite3
import pandas as pd
 
conn = sqlite3.connect("myapp.db")
 
# Read a SQL query into a DataFrame
df = pd.read_sql("SELECT * FROM employees WHERE is_active = 1", conn)
print(df.head())
print(df.describe())
 
# Parameterized query with pandas
df = pd.read_sql(
    "SELECT * FROM employees WHERE department = ? AND salary > ?",
    conn,
    params=("Engineering", 90000)
)
 
# Read an entire table
df = pd.read_sql("SELECT * FROM employees", conn, index_col="id")
 
conn.close()

DataFrame を SQLite に書き込む

import sqlite3
import pandas as pd
 
# Create a sample DataFrame
df = pd.DataFrame({
    "product": ["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"],
    "price": [999.99, 699.99, 449.99, 349.99, 79.99],
    "stock": [50, 200, 75, 30, 500],
    "category": ["Electronics", "Electronics", "Electronics", "Peripherals", "Peripherals"]
})
 
conn = sqlite3.connect("store.db")
 
# Write DataFrame to a new table
df.to_sql("products", conn, if_exists="replace", index=False)
 
# Append to an existing table
new_products = pd.DataFrame({
    "product": ["Mouse", "Webcam"],
    "price": [49.99, 89.99],
    "stock": [300, 100],
    "category": ["Peripherals", "Peripherals"]
})
new_products.to_sql("products", conn, if_exists="append", index=False)
 
# Verify
result = pd.read_sql("SELECT * FROM products", conn)
print(result)
 
conn.close()

Round-Trip ワークフロー: CSV から SQLite、そして分析へ

よくあるワークフローは、CSV ファイル(pandas read_csv を参照)から SQLite にデータを読み込み、SQL ベースで分析し、その結果をエクスポートする流れです。

import sqlite3
import pandas as pd
 
# Step 1: Load CSV into SQLite
df = pd.read_csv("sales_data.csv")
conn = sqlite3.connect("analytics.db")
df.to_sql("sales", conn, if_exists="replace", index=False)
 
# Step 2: Run SQL analytics
summary = pd.read_sql("""
    SELECT
        region,
        strftime('%Y-%m', sale_date) AS month,
        COUNT(*) AS transactions,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_transaction
    FROM sales
    GROUP BY region, month
    ORDER BY month, revenue DESC
""", conn)
 
print(summary)
 
# Step 3: Export results
summary.to_csv("monthly_summary.csv", index=False)
 
conn.close()

クエリ結果を DataFrame にしたら、PyGWalker (opens in a new tab) を使って直接可視化できます。PyGWalker は任意の pandas DataFrame を Tableau のような対話的可視化インターフェースに変換し、プロットコードを書かずにドラッグ&ドロップでチャートを作成できます。

import pygwalker as pyg
 
# Turn your SQL query results into an interactive visualization
walker = pyg.walk(summary)

よりスムーズなワークフローには、RunCell (opens in a new tab) が便利です。AI 支援の Jupyter 環境で、SQL クエリの作成、AI を使ったデバッグ、結果の即時可視化を、すべて 1 つのノートブック内で行えます。

エラーハンドリング

sqlite3 モジュールは複数の例外型を定義しています。特定の例外を捕捉すると、より良いエラーメッセージと復旧戦略につながります。

import sqlite3
 
def safe_insert(db_path, name, email, salary):
    """Insert an employee with proper error handling."""
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO employees (name, email, salary, hire_date) VALUES (?, ?, ?, date('now'))",
            (name, email, salary)
        )
        conn.commit()
        return cursor.lastrowid
 
    except sqlite3.IntegrityError as e:
        # Unique constraint violated (duplicate email, NOT NULL violated, etc.)
        print(f"Data integrity error: {e}")
        return None
 
    except sqlite3.OperationalError as e:
        # Table doesn't exist, SQL syntax error, database locked, etc.
        print(f"Operational error: {e}")
        return None
 
    except sqlite3.DatabaseError as e:
        # Corrupted database, disk full, etc.
        print(f"Database error: {e}")
        return None
 
    except sqlite3.ProgrammingError as e:
        # Using a closed cursor, wrong number of parameters, etc.
        print(f"Programming error: {e}")
        return None
 
    finally:
        if conn:
            conn.close()

SQLite3 の例外階層

ExceptionParentCommon Causes
sqlite3.WarningExceptionNon-fatal issues
sqlite3.ErrorExceptionBase class for all sqlite3 errors
sqlite3.InterfaceErrorErrorMisuse of the DB-API interface
sqlite3.DatabaseErrorErrorDatabase-related errors
sqlite3.DataErrorDatabaseErrorData processing issues
sqlite3.OperationalErrorDatabaseErrorDatabase locked, table missing, SQL error
sqlite3.IntegrityErrorDatabaseErrorUNIQUE, NOT NULL, FOREIGN KEY violations
sqlite3.InternalErrorDatabaseErrorInternal sqlite3 module error
sqlite3.ProgrammingErrorDatabaseErrorIncorrect API usage
sqlite3.NotSupportedErrorDatabaseErrorUnsupported feature

パフォーマンスのヒント

WAL モードを有効にする

Write-Ahead Logging(WAL)モードは、書き込み中でも同時読み取りを可能にし、書き込み性能を向上させます。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
conn.execute("PRAGMA journal_mode=WAL")
# Returns 'wal' if successful

WAL モードは、同じデータベースにアクセスする読み手(たとえば Web サーバー)と書き手(たとえばバックグラウンドジョブ)がいる場合に特に有効です。

一括挿入には executemany を使う

executemany は、Python と SQLite の往復回数を減らせるため、execute をループで呼ぶより大幅に高速です。

import sqlite3
import time
 
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE numbers (value INTEGER)")
 
data = [(i,) for i in range(100_000)]
 
# Slow: individual inserts
start = time.perf_counter()
for d in data:
    conn.execute("INSERT INTO numbers VALUES (?)", d)
conn.commit()
slow_time = time.perf_counter() - start
 
conn.execute("DELETE FROM numbers")
 
# Fast: executemany
start = time.perf_counter()
conn.executemany("INSERT INTO numbers VALUES (?)", data)
conn.commit()
fast_time = time.perf_counter() - start
 
print(f"Individual inserts: {slow_time:.2f}s")
print(f"executemany:        {fast_time:.2f}s")
print(f"Speedup:            {slow_time / fast_time:.1f}x")
 
conn.close()

頻繁に検索する列にインデックスを作成する

インデックスは SELECT を大幅に高速化しますが、その代わりに書き込みが少し遅くなり、ディスク容量を多く使います。

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Single column index
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_department ON employees(department)")
 
# Composite index (for queries that filter on both columns)
conn.execute("CREATE INDEX IF NOT EXISTS idx_employees_dept_salary ON employees(department, salary)")
 
# Unique index (also enforces uniqueness)
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_employees_email ON employees(email)")
 
# Check existing indexes
cursor = conn.execute("SELECT name, sql FROM sqlite_master WHERE type='index'")
for name, sql in cursor.fetchall():
    print(f"{name}: {sql}")
 
conn.close()

EXPLAIN QUERY PLAN で性能を確認する

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Check how SQLite executes a query
result = conn.execute("""
    EXPLAIN QUERY PLAN
    SELECT * FROM employees WHERE department = 'Engineering' AND salary > 90000
""").fetchall()
 
for row in result:
    print(row)
# Output shows whether an index is used or a full table scan occurs
 
conn.close()

パフォーマンス向上のための追加 PRAGMA

import sqlite3
 
conn = sqlite3.connect("myapp.db")
 
# Increase cache size (default is 2000 pages = ~8MB)
conn.execute("PRAGMA cache_size = -64000")  # 64MB (negative = KB)
 
# Reduce disk syncs for speed (less durable but much faster)
conn.execute("PRAGMA synchronous = NORMAL")  # Options: OFF, NORMAL, FULL (default)
 
# Store temp tables in memory
conn.execute("PRAGMA temp_store = MEMORY")
 
# Check database integrity
result = conn.execute("PRAGMA integrity_check").fetchone()
print(f"Integrity: {result[0]}")  # Should print 'ok'
 
conn.close()

実践例: タスク管理 CLI

以下は、このガイドで扱った概念をすべて示す完全なタスク管理アプリケーションです。SQLite を永続ストレージとして使い、優先度、タグ、期限をサポートします。

import sqlite3
import sys
from datetime import datetime, date
from contextlib import contextmanager
 
DB_PATH = "tasks.db"
 
@contextmanager
def get_db():
    """Get a database connection with Row factory and foreign keys enabled."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("PRAGMA journal_mode = WAL")
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
 
def init_db():
    """Create tables if they don't exist."""
    with get_db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT DEFAULT '',
                priority INTEGER DEFAULT 2 CHECK(priority BETWEEN 1 AND 4),
                status TEXT DEFAULT 'todo' CHECK(status IN ('todo', 'in_progress', 'done')),
                due_date TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                completed_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS task_tags (
                task_id INTEGER NOT NULL,
                tag_id INTEGER NOT NULL,
                PRIMARY KEY (task_id, tag_id),
                FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
                FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_priority ON tasks(priority)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tasks_due ON tasks(due_date)")
 
def add_task(title, description="", priority=2, due_date=None, tags=None):
    """Add a new task with optional tags."""
    with get_db() as conn:
        cursor = conn.execute(
            "INSERT INTO tasks (title, description, priority, due_date) VALUES (?, ?, ?, ?)",
            (title, description, priority, due_date)
        )
        task_id = cursor.lastrowid
 
        if tags:
            for tag_name in tags:
                conn.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag_name,))
                tag_id = conn.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)).fetchone()["id"]
                conn.execute("INSERT INTO task_tags (task_id, tag_id) VALUES (?, ?)", (task_id, tag_id))
 
        print(f"Task #{task_id} created: {title}")
        return task_id
 
def list_tasks(status=None, priority=None, tag=None):
    """List tasks with optional filters."""
    with get_db() as conn:
        conditions = []
        params = []
 
        if status:
            conditions.append("t.status = ?")
            params.append(status)
        if priority:
            conditions.append("t.priority = ?")
            params.append(priority)
        if tag:
            conditions.append("t.id IN (SELECT task_id FROM task_tags tt JOIN tags tg ON tt.tag_id = tg.id WHERE tg.name = ?)")
            params.append(tag)
 
        query = """
            SELECT t.id, t.title, t.priority, t.status, t.due_date,
                   GROUP_CONCAT(tg.name, ', ') AS tags
            FROM tasks t
            LEFT JOIN task_tags tt ON t.id = tt.task_id
            LEFT JOIN tags tg ON tt.tag_id = tg.id
        """
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " GROUP BY t.id ORDER BY t.priority ASC, t.due_date ASC"
 
        rows = conn.execute(query, params).fetchall()
 
        priority_labels = {1: "URGENT", 2: "HIGH", 3: "MEDIUM", 4: "LOW"}
        print(f"\n{'ID':<5} {'Priority':<10} {'Status':<13} {'Due':<12} {'Title':<30} {'Tags'}")
        print("-" * 85)
        for row in rows:
            due = row["due_date"] or "—"
            tags_str = row["tags"] or "—"
            p_label = priority_labels.get(row["priority"], "?")
            print(f"{row['id']:<5} {p_label:<10} {row['status']:<13} {due:<12} {row['title']:<30} {tags_str}")
 
        print(f"\nTotal: {len(rows)} tasks")
 
def complete_task(task_id):
    """Mark a task as done."""
    with get_db() as conn:
        result = conn.execute(
            "UPDATE tasks SET status = 'done', completed_at = ? WHERE id = ?",
            (datetime.now().isoformat(), task_id)
        )
        if result.rowcount:
            print(f"Task #{task_id} marked as done.")
        else:
            print(f"Task #{task_id} not found.")
 
def stats():
    """Show task statistics."""
    with get_db() as conn:
        row = conn.execute("""
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN status = 'todo' THEN 1 ELSE 0 END) AS todo,
                SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) AS in_progress,
                SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) AS done,
                SUM(CASE WHEN due_date < date('now') AND status != 'done' THEN 1 ELSE 0 END) AS overdue
            FROM tasks
        """).fetchone()
 
        print(f"\nTask Statistics:")
        print(f"  Total:       {row['total']}")
        print(f"  To Do:       {row['todo']}")
        print(f"  In Progress: {row['in_progress']}")
        print(f"  Done:        {row['done']}")
        print(f"  Overdue:     {row['overdue']}")
 
# Initialize and demo
if __name__ == "__main__":
    init_db()
 
    # Add sample tasks
    add_task("Design database schema", priority=1, due_date="2026-02-20", tags=["backend", "database"])
    add_task("Write API endpoints", priority=2, due_date="2026-02-25", tags=["backend", "api"])
    add_task("Create unit tests", priority=3, tags=["testing"])
    add_task("Update documentation", priority=4, due_date="2026-03-01", tags=["docs"])
 
    # List and stats
    list_tasks()
    stats()
    complete_task(1)
    list_tasks(status="done")

SQLite vs PostgreSQL vs MySQL vs MongoDB

FeatureSQLitePostgreSQLMySQLMongoDB
TypeEmbeddedClient-serverClient-serverDocument store
SetupZero configInstall + configureInstall + configureInstall + configure
StorageSingle fileServer directoryServer directoryServer directory
Max DB size281 TBUnlimited256 TBUnlimited
Concurrent writesLimited (file lock)Excellent (MVCC)Good (row locks)Good (document locks)
SQL supportMost of SQL92Full SQL + extensionsFull SQLMQL (not SQL)
Data types5 storage classes40+ types30+ typesBSON types
Full-text searchFTS5 extensionBuilt-in tsvectorBuilt-in FULLTEXTBuilt-in text index
JSON supportJSON1 extensionNative JSONBNative JSONNative (it is JSON)
ReplicationNot built-inStreaming + logicalPrimary-replicaReplica sets
Best forEmbedded, local, prototypesWeb apps, analyticsWeb apps, CMSFlexible schema, documents
Python librarysqlite3 (built-in)psycopg2mysql-connectorpymongo
Hosting requiredNoYesYesYes
Cost to startFree, zero overheadFree, but need serverFree, but need serverFree, but need server

判断はシンプルです。アプリケーションがデータベースへの唯一の書き込みプロセスであるなら SQLite を使います。複数ユーザーの同時書き込み、レプリケーション、ネットワークアクセスが必要なら PostgreSQL や MySQL を使います。データが文書指向で、リレーショナルスキーマにうまく収まらない場合は MongoDB を使います。

よくある質問

SQLite は本番利用に適していますか?

はい。SQLite は Android、iOS、主要な Web ブラウザー、Airbus のフライトソフトウェア、そして数え切れないほどのデスクトップアプリケーションで本番利用されています。最大 281 テラバイト、数百万行を扱えます。高い同時書き込みがある Web アプリケーションには向きませんが、それ以外の多くのシナリオでは十分に機能します。

Python が使っている SQLite のバージョンを確認するには?

sqlite3.sqlite_version で SQLite ライブラリのバージョンを、sqlite3.version で Python モジュールのバージョンを確認できます。たとえば import sqlite3; print(sqlite3.sqlite_version) のようにすると、3.45.1 などが出力されます。

複数プロセスで同じ SQLite データベースを読み取れますか?

はい。複数プロセスが同じ SQLite データベースを同時に読み取れます。書き込み操作はデータベースファイルに排他ロックを取得します。WAL モードを有効にすると、読み取りは書き込みをブロックせず、書き込みも読み取りをブロックしません。ただし、同時に動作できる書き手は 1 つだけです。

SQLite データベースを安全にバックアップするには?

sqlite3 の backup API を使います: source.backup(dest)。書き込みが行われていないときであれば、データベースファイルを安全にコピーすることもできます。書き込みトランザクションが進行中にファイルをコピーしてはいけません。バックアップが破損する可能性があります。

import sqlite3
 
source = sqlite3.connect("myapp.db")
backup = sqlite3.connect("myapp_backup.db")
source.backup(backup)
backup.close()
source.close()

SQLite で日付と時刻を保存するには?

SQLite にはネイティブの DATE や DATETIME 型はありません。日付は ISO 8601 形式の TEXT として保存してください('2026-02-18''2026-02-18T14:30:00')。SQLite の組み込み日付関数(date()time()datetime()strftime())はこの形式で動作します。代わりに、Unix タイムスタンプを INTEGER 値として保存する方法もあります。

SQLite は暗号化をサポートしていますか?

標準のオープンソース SQLite には暗号化機能は含まれていません。SQLite Encryption Extension(SEE)は有償の商用製品です。無料の代替としては、SQLCipher(オープンソース)や Python 連携用の pysqlcipher3 があります。

Python sqlite3 で SQL インジェクションを防ぐには?

常に ? または :name プレースホルダーを使ったパラメータ化クエリを使用してください。f-string、.format()% 文字列フォーマットでユーザー入力から SQL 文字列を構築してはいけません。sqlite3 モジュールは、パラメータを別々に渡すとエスケープと引用を自動的に処理します。

結論

Python の sqlite3 モジュールを使えば、セットアップ不要で高機能なリレーショナルデータベースを利用できます。ローカルアプリケーション、プロトタイプ、データ分析スクリプト、テスト用途では、SQLite は非常に優れています。覚えておくべき重要なパターンは、SQL インジェクション防止のために常にパラメータ化クエリを使うこと、接続管理にはコンテキストマネージャーを使うこと、同時実行性を高めるために WAL モードを有効にすること、一括操作には executemany を使うことです。

データ分析のワークフローでは、SQLite と pandas の組み合わせが強力です。to_sql() でデータを読み込み、read_sql() でクエリし、PyGWalker (opens in a new tab) で結果を可視化できます。Python コード内でデータベースレコードを構造化するには、dataclasses を使って、テーブル行にきれいに対応する型付きデータモデルを作ることを検討してください。プロジェクトが SQLite の同時実行制限を超えるようになったら、DB-API 2.0 インターフェースが各データベースドライバーで共通しているため、PostgreSQL への移行も通常は最小限のコード変更で済みます。

関連ガイド

📚