Python Threading: Complete Guide to Multithreading with Examples
Your Python program makes 50 API calls, one after another. Each call takes 200 milliseconds of waiting. The math is brutal: 10 seconds of your program's life wasted staring at network responses. Your CPU sits idle at near-zero utilization while your script crawls through I/O-bound operations that could run simultaneously.
This problem compounds fast. Web scrapers that fetch thousands of pages sequentially. File processing scripts that read and write one file at a time. Database queries that block the entire application while waiting for results. Every second of idle waiting is a second your program could be doing useful work.
Python's threading module solves this by running multiple operations concurrently within a single process. Threads share memory, start quickly, and excel at I/O-bound workloads where the program spends most of its time waiting. This guide covers everything from basic thread creation to advanced synchronization patterns, with production-ready code examples you can use immediately.
What is Threading in Python?
Threading allows a program to run multiple operations concurrently within the same process. Each thread shares the same memory space, making communication between threads fast and straightforward.
Python's threading module provides a high-level interface for creating and managing threads. But there is an important caveat: the Global Interpreter Lock (GIL).
The Global Interpreter Lock (GIL)
The GIL is a mutex in CPython that allows only one thread to execute Python bytecode at a time. This means threads cannot achieve true parallelism for CPU-bound operations. However, the GIL releases during I/O operations (network calls, file reads, database queries), allowing other threads to run while one waits for I/O.
```python
import threading
import time

def cpu_bound(n):
    """CPU-bound: GIL prevents parallel execution"""
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound(url):
    """I/O-bound: GIL releases during network wait"""
    import urllib.request
    return urllib.request.urlopen(url).read()

# CPU-bound: 4 threads run one-at-a-time (no speedup)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound with threads: {time.time() - start:.2f}s")

# I/O-bound: 4 threads overlap their waiting time (big speedup)
```

This means threading is ideal for I/O-bound tasks but not for CPU-heavy computation. For CPU-bound work, use the multiprocessing module instead.
When to Use Threading vs Multiprocessing vs Asyncio
| Feature | threading | multiprocessing | asyncio |
|---|---|---|---|
| Best for | I/O-bound tasks | CPU-bound tasks | High-concurrency I/O |
| Parallelism | Concurrent (GIL limited) | True parallel | Concurrent (single thread) |
| Memory | Shared (lightweight) | Separate per process | Shared (lightweight) |
| Startup cost | Low (~1ms) | High (~50-100ms) | Very low |
| Communication | Direct memory access | Pipes, Queues, shared memory | Awaitable coroutines |
| Scalability | 10s-100s of threads | Limited by CPU cores | 1000s of coroutines |
| Complexity | Medium (locking needed) | Medium (serialization) | High (async/await syntax) |
| Use case | Web scraping, file I/O, API calls | Data processing, ML training | Web servers, chat apps |
Rule of thumb: If your program waits on network or disk, use threading. If it crunches numbers, use multiprocessing. If you need thousands of concurrent connections, use asyncio.
Thread Basics: Creating and Running Threads
The threading.Thread Class
The simplest way to create a thread is by passing a target function to threading.Thread:
```python
import threading
import time

def download_file(filename):
    print(f"[{threading.current_thread().name}] Downloading {filename}...")
    time.sleep(2)  # Simulate download
    print(f"[{threading.current_thread().name}] Finished {filename}")

# Create threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))

# Start threads
t1.start()
t2.start()

# Wait for both to finish
t1.join()
t2.join()
print("All downloads complete")
```

Both downloads run concurrently, finishing in roughly 2 seconds instead of 4.
start() and join()
- `start()` begins thread execution. A thread can only be started once.
- `join(timeout=None)` blocks the calling thread until the target thread finishes. Pass a `timeout` in seconds to avoid waiting forever.
```python
import threading
import time

def slow_task():
    time.sleep(10)

t = threading.Thread(target=slow_task)
t.start()

# Wait at most 3 seconds
t.join(timeout=3)
if t.is_alive():
    print("Thread still running after 3 seconds")
else:
    print("Thread finished")
```

Naming Threads
Named threads make debugging easier:
```python
import threading

def worker():
    name = threading.current_thread().name
    print(f"Running in thread: {name}")

t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()
```

Daemon Threads
Daemon threads are background threads that automatically terminate when the main program exits. Non-daemon threads keep the program alive until they finish.
```python
import threading
import time

def background_monitor():
    while True:
        print("Monitoring system health...")
        time.sleep(5)

# Daemon thread: dies when main program exits
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()

# Main program does its work
time.sleep(12)
print("Main program exiting")
# monitor thread is killed automatically
```

Use daemon threads for background logging, monitoring, or cleanup tasks that should not prevent program exit.
Subclassing Thread
For more complex thread behavior, subclass threading.Thread:
```python
import threading
import time

class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None

    def run(self):
        """Override run() with thread logic"""
        print(f"Processing {self.filepath}")
        time.sleep(1)  # Simulate work
        self.result = f"Processed: {self.filepath}"

# Create and run
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)
```

Passing Arguments to Threads
Using args and kwargs
Pass positional arguments with args (a tuple) and keyword arguments with kwargs (a dict):
```python
import threading

def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"Fetching {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")

# Positional args as tuple
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))

# Keyword args as dict
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)

t1.start()
t2.start()
t1.join()
t2.join()
```

Common mistake: forgetting the trailing comma in a single-element tuple. `args=("hello",)` is a tuple; `args=("hello")` is just a string in parentheses.
Collecting Results from Threads
Threads do not return values directly. Use shared data structures or a list to collect results:
```python
import threading

results = {}
lock = threading.Lock()

def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result

threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}
```

A cleaner approach uses ThreadPoolExecutor (covered next), which handles result collection automatically.
ThreadPoolExecutor: The Modern Approach
The concurrent.futures module provides ThreadPoolExecutor, a high-level interface that manages a pool of worker threads. It handles thread creation, result collection, and exception propagation automatically.
Basic Usage with submit()
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url):
    time.sleep(1)  # Simulate network request
    return f"Content from {url}"

urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks and get Future objects
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}

    # Process results as they complete
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")
```

Using map() for Ordered Results
executor.map() returns results in the same order as the input, similar to the built-in map():
```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item.upper()

items = ["apple", "banana", "cherry", "date"]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))

print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']
```

submit() vs map()
| | submit() | map() |
|---|---|---|
| Returns | Future objects | Iterator of results |
| Result order | Completion order (with as_completed) | Input order |
| Error handling | Per-task via future.result() | Raises on first failure |
| Arguments | Single function call | Applies function to each item |
| Best for | Heterogeneous tasks, early results | Homogeneous batch processing |
Exception Handling with Futures
```python
# TimeoutError must be imported on Python < 3.11; since 3.11 it aliases the builtin
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

def risky_task(n):
    if n == 3:
        raise ValueError(f"Bad input: {n}")
    return n * 10

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}

    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"Task {task_id}: {result}")
        except ValueError as e:
            print(f"Task {task_id} failed: {e}")
        except TimeoutError:
            print(f"Task {task_id} timed out")
```

Cancelling Tasks
```python
from concurrent.futures import ThreadPoolExecutor
import time

def long_task(n):
    time.sleep(5)
    return n

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]

    # Cancel pending tasks (already-running tasks cannot be cancelled)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"Cancelled: {cancelled}")
```

Thread Synchronization Primitives
When multiple threads access shared data, you need synchronization to prevent race conditions.
Lock
A Lock ensures only one thread enters a critical section at a time:
```python
import threading

class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

    def withdraw(self, amount):
        with self.lock:  # Only one thread at a time
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False

    def deposit(self, amount):
        with self.lock:
            self.balance += amount

account = BankAccount(1000)

def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)

threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Final balance: {account.balance}")  # Always 1000
```

Without the lock, concurrent reads and writes produce incorrect results (a race condition).
RLock (Reentrant Lock)
An RLock can be acquired multiple times by the same thread. This prevents deadlocks when a function holding a lock calls another function that also needs the same lock:
```python
import threading

class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()

    def get(self, key):
        with self._lock:
            return self._data.get(key)

    def set(self, key, value):
        with self._lock:
            self._data[key] = value

    def get_or_set(self, key, default):
        with self._lock:
            # This calls get(), which also acquires _lock.
            # RLock allows this; a regular Lock would deadlock.
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing
```

Semaphore
A Semaphore allows a fixed number of threads to access a resource simultaneously:
```python
import threading
import time

# Allow max 3 concurrent database connections
db_semaphore = threading.Semaphore(3)

def query_database(query_id):
    with db_semaphore:
        # _value is a private attribute, read here for demonstration only
        print(f"Query {query_id}: connected (active connections: {3 - db_semaphore._value})")
        time.sleep(2)  # Simulate query
        print(f"Query {query_id}: done")

threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
```

Event
An Event allows one thread to signal other waiting threads:
```python
import threading
import time

data_ready = threading.Event()
shared_data = []

def producer():
    print("Producer: preparing data...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Producer: data ready, signaling consumers")
    data_ready.set()

def consumer(name):
    print(f"Consumer {name}: waiting for data...")
    data_ready.wait()  # Blocks until event is set
    print(f"Consumer {name}: got data = {shared_data}")

threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()
```

Condition
A Condition combines a lock with the ability to wait for a notification. It is the foundation for producer-consumer patterns:
```python
import threading
import time
import random

buffer = []
MAX_SIZE = 5
condition = threading.Condition()

def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # Wait until space available
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"Produced: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)

def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # Wait until item available
            item = buffer.pop(0)
            print(f"Consumer {name} consumed: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
```

Synchronization Primitives Summary
| Primitive | Purpose | When to Use |
|---|---|---|
| `Lock` | Mutual exclusion | Protect shared mutable state |
| `RLock` | Reentrant mutex | Nested locking in same thread |
| `Semaphore` | Limit concurrency | Rate limiting, connection pools |
| `Event` | One-time signal | Initialization complete, shutdown signal |
| `Condition` | Wait/notify pattern | Producer-consumer, state changes |
| `Barrier` | Synchronize N threads | All threads must reach a point before continuing |
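`Barrier` appears in the summary but has no example above, so here is a minimal sketch: three workers each finish their own setup, then block at the barrier until all of them have arrived before continuing (worker names are illustrative).

```python
import threading

# All 3 workers must reach the barrier before any of them proceeds
barrier = threading.Barrier(3)
results = []

def worker(name):
    print(f"{name}: doing setup")
    index = barrier.wait()  # Blocks until all 3 parties arrive; returns a unique arrival index
    results.append(name)
    print(f"{name}: passed the barrier (arrival index {index})")

threads = [threading.Thread(target=worker, args=(f"W{i}",)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()
print(f"All {len(results)} workers passed the barrier")
```

If a worker raises before reaching the barrier, the remaining parties would wait forever; `Barrier(3, timeout=...)` or calling `barrier.abort()` guards against that.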
Thread-Safe Data Structures
queue.Queue
queue.Queue is the go-to thread-safe data structure. It handles all locking internally:
```python
import threading
import queue

task_queue = queue.Queue()
results = queue.Queue()

def worker():
    while True:
        item = task_queue.get()  # Blocks until item available
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()

# Start 4 workers
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)

# Submit tasks
for i in range(20):
    task_queue.put(i)

# Wait for all tasks to complete
task_queue.join()

# Stop workers
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()

# Collect results
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"Results: {sorted(all_results)}")
```

queue.Queue also supports:

- `Queue(maxsize=10)`: blocks `put()` when full
- `PriorityQueue()`: items sorted by priority
- `LifoQueue()`: last-in, first-out (stack behavior)
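As a quick illustration of the `PriorityQueue()` variant: items are typically `(priority, data)` tuples, and `get()` always returns the entry with the lowest priority number first (the task names here are made up).

```python
import queue

pq = queue.PriorityQueue()

# Lower number = higher priority
pq.put((3, "send newsletter"))
pq.put((1, "handle outage"))
pq.put((2, "review PR"))

order = []
while not pq.empty():
    priority, task = pq.get()
    order.append(task)

print(order)  # ['handle outage', 'review PR', 'send newsletter']
```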
collections.deque
collections.deque is thread-safe for append() and popleft() operations (atomic at the C level in CPython), making it a fast alternative for simple producer-consumer patterns. For a deeper look at deque and other specialized containers, see our Python collections guide:
```python
from collections import deque
import threading
import time

buffer = deque(maxlen=1000)

def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)

def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"Consumed {consumed} items")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
```

Note: While individual append and popleft operations are thread-safe, checking `len(buffer)` and then popping is not atomic. For full thread safety, use queue.Queue.
Common Threading Patterns
Producer-Consumer Pattern
The classic pattern for decoupling data production from data processing:
```python
import threading
import queue
import time
import random

def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Producer {name}: created {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Producer {name}: done")

def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumer {name}: processing {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumer {name}: shutting down")

task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()

producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]

for c in consumers: c.start()
for p in producers: p.start()

for p in producers: p.join()
task_queue.join()   # Wait for all items to be processed
stop_event.set()    # Signal consumers to stop
for c in consumers: c.join()
```

Worker Thread Pool (Manual)
When you need more control than ThreadPoolExecutor provides:
```python
import threading
import queue
import itertools

class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self._id_counter = itertools.count()  # Unique IDs even when the same function is resubmitted
        self.workers = []
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)

    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()

    def submit(self, func, *args, **kwargs):
        future_id = next(self._id_counter)
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id

    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()

# Usage
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()
```

Rate-Limited Thread Pool
Control how fast threads make external requests:
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0

    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()

limiter = RateLimiter(max_per_second=5)

def rate_limited_fetch(url):
    limiter.wait()
    print(f"Fetching {url} at {time.time():.2f}")
    time.sleep(0.5)  # Simulate request
    return f"Data from {url}"

urls = [f"https://api.example.com/item/{i}" for i in range(20)]

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))
```

Thread Safety Pitfalls and How to Avoid Them
Race Conditions
A race condition occurs when the outcome depends on the timing of thread execution:
```python
import threading

# BAD: Race condition
counter = 0

def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # Read, increment, write: NOT atomic

threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Often less than 500000

# GOOD: Protected with lock
counter = 0
lock = threading.Lock()

def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1

threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Always 500000
```

Deadlocks
A deadlock happens when two threads each hold a lock the other needs:
```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1():
    with lock_a:
        print("Thread 1: acquired lock_a")
        with lock_b:  # Waits forever if thread_2 holds lock_b
            print("Thread 1: acquired lock_b")

def thread_2():
    with lock_b:
        print("Thread 2: acquired lock_b")
        with lock_a:  # Waits forever if thread_1 holds lock_a
            print("Thread 2: acquired lock_a")

# This WILL deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()
```
# t1.start(); t2.start()How to prevent deadlocks:
- Always acquire locks in the same order:

```python
def thread_1_fixed():
    with lock_a:  # Always lock_a first
        with lock_b:
            print("Thread 1: acquired both locks")

def thread_2_fixed():
    with lock_a:  # Always lock_a first (same order)
        with lock_b:
            print("Thread 2: acquired both locks")
```

- Use timeouts:
```python
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("Could not acquire lock_a, backing off")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("Could not acquire lock_b, releasing lock_a")
            return
        try:
            print("Acquired both locks safely")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
```

- Minimize lock scope: Hold locks for the shortest time possible.
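The "minimize lock scope" rule can be sketched like this: do the expensive computation outside the lock and hold it only for the brief shared-state update (`record` and `shared_totals` are illustrative names).

```python
import threading

lock = threading.Lock()
shared_totals = {}

def record(key, values):
    # Expensive work happens outside the lock...
    subtotal = sum(values)
    # ...and the lock is held only for the quick shared-state update
    with lock:
        shared_totals[key] = shared_totals.get(key, 0) + subtotal

threads = [threading.Thread(target=record, args=("sales", [1, 2, 3])) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(shared_totals)  # {'sales': 24}
```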
Thread Safety Checklist
- Protect all shared mutable state with locks
- Use `queue.Queue` instead of shared lists or dicts when possible
- Avoid global mutable state; pass data through function arguments
- Use `ThreadPoolExecutor` instead of manual thread management
- Never assume operation order between threads
- Test with `threading.active_count()` and logging to detect thread leaks
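The last checklist item can be as simple as comparing `threading.active_count()` before and after a run; a minimal sketch (the worker is hypothetical):

```python
import threading
import time

baseline = threading.active_count()  # Typically 1: the main thread

def short_worker():
    time.sleep(0.2)

# Start threads without keeping references or joining them
for _ in range(5):
    threading.Thread(target=short_worker).start()

print(f"Active threads right after start: {threading.active_count()}")

time.sleep(0.5)  # Give workers time to finish
print(f"After work completes: {threading.active_count()}")
```

If the count never returns to the baseline, some thread is stuck in a loop or blocked on I/O — a thread leak.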
Real-World Examples
Concurrent Web Scraping
For making HTTP requests in threaded code, you can use urllib.request (shown below) or the popular requests library for a friendlier API.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time

def fetch_page(url):
    """Fetch a web page and return its content length"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
        return url, len(content), None
    except Exception as e:
        return url, 0, str(e)

urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]

# Sequential
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start

# Concurrent with threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start

print(f"\nSequential: {sequential_time:.2f}s")
print(f"Threaded:   {threaded_time:.2f}s")
print(f"Speedup:    {sequential_time / threaded_time:.1f}x")
```

Parallel File I/O
```python
from concurrent.futures import ThreadPoolExecutor
import os
import glob
import hashlib

def process_file(filepath):
    """Read file and compute its SHA-256 hash"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size

def hash_all_files(directory, pattern="*.py"):
    """Hash all matching files in a directory using threads"""
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"Error processing {futures[future]}: {e}")
    return results

# Usage
# file_hashes = hash_all_files("/path/to/project")
```

Concurrent API Calls with Retry Logic
```python
from concurrent.futures import ThreadPoolExecutor
import urllib.request
import json
import time

def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """Fetch API endpoint with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
            return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}

endpoints = [f"/posts/{i}" for i in range(1, 21)]

start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
elapsed = time.time() - start

success = sum(1 for r in results if r["error"] is None)
print(f"Fetched {success}/{len(endpoints)} endpoints in {elapsed:.2f}s")
```

Periodic Background Tasks
```python
import threading
import time

class PeriodicTask:
    """Run a function at fixed intervals in a background thread"""

    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)

    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()

# Usage
def check_health():
    print(f"Health check at {time.strftime('%H:%M:%S')}")

task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Stopped")
```

Performance: Threading vs Multiprocessing vs Asyncio
The right concurrency tool depends on the workload. Here is a comparison of wall-clock time for common tasks:
| Task | Sequential | Threading (4) | Multiprocessing (4) | Asyncio |
|---|---|---|---|---|
| 100 HTTP requests (200ms each) | 20.0s | 5.1s | 5.8s | 4.9s |
| 100 file reads (10ms each) | 1.0s | 0.28s | 0.35s | 0.26s |
| 100 CPU tasks (100ms each) | 10.0s | 10.2s | 2.7s | 10.0s |
| 50 DB queries (50ms each) | 2.5s | 0.68s | 0.85s | 0.62s |
| Mixed I/O + CPU | 15.0s | 8.2s | 4.1s | 9.5s |
Key takeaways:
- Threading delivers 3-5x speedup on I/O-bound workloads with minimal code changes
- Multiprocessing is the only option for true CPU parallelism but adds process overhead
- Asyncio edges threading on high-concurrency I/O but requires rewriting code with async/await
- For mixed workloads, consider combining threading for I/O and multiprocessing for CPU tasks
```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task():
    time.sleep(0.2)

def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))

# Benchmark a thread pool on I/O-bound vs CPU-bound work
NUM_TASKS = 20

# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")

# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")
```

Experimenting with Threading in RunCell
Debugging and profiling threaded code can be challenging. When you need to test thread synchronization, visualize timing overlaps, or diagnose race conditions interactively, RunCell (www.runcell.dev (opens in a new tab)) provides an AI-powered Jupyter environment designed for this workflow.
RunCell's AI agent can analyze your threading code, identify potential deadlocks before they happen, suggest optimal worker counts based on your workload, and help you understand why threads behave unexpectedly. When a thread pool produces incorrect results intermittently, RunCell traces the execution timeline to pinpoint the exact moment shared state is corrupted.
If you want to visualize the performance characteristics of different threading configurations, PyGWalker (github.com/Kanaries/pygwalker) can turn your benchmark DataFrames into interactive charts. Run threading benchmarks, collect timing data into a pandas DataFrame, and explore the results with drag-and-drop visualizations to find the optimal thread count for your workload.
FAQ
What is the difference between threading and multiprocessing in Python?
Threading runs multiple threads within a single process, sharing memory. The Global Interpreter Lock (GIL) prevents threads from executing Python bytecode in parallel, making threading effective only for I/O-bound tasks like network requests and file operations. Multiprocessing creates separate processes, each with its own Python interpreter and memory space, enabling true parallel execution for CPU-bound tasks. Threading has lower overhead (faster startup, less memory), while multiprocessing bypasses the GIL for genuine parallelism.
Is Python threading truly parallel?
No, Python threading is concurrent but not parallel for CPU-bound code due to the GIL. Only one thread executes Python bytecode at a time. However, the GIL releases during I/O operations (network, disk, database), so multiple threads effectively run in parallel when waiting on I/O. For CPU-bound parallelism, use the multiprocessing module or C extensions that release the GIL (like NumPy).
How many threads should I use in Python?
For I/O-bound tasks, start with 5-20 threads depending on the external service's rate limits and your network bandwidth. Too many threads hitting a single server can cause connection refusals or throttling. For mixed workloads, experiment with thread counts between the number of CPU cores and 4x the core count. Use ThreadPoolExecutor and benchmark with different max_workers values to find the optimal count for your specific workload. The default for ThreadPoolExecutor is min(32, os.cpu_count() + 4).
How do I return a value from a Python thread?
Threads do not return values directly from their target function. The three main approaches are: (1) Use ThreadPoolExecutor.submit() which returns a Future object where you call future.result() to get the return value. (2) Pass a mutable container (like a dictionary or list) as an argument and have the thread write results into it, protected by a Lock. (3) Use queue.Queue where the thread puts results into the queue and the main thread reads from it. ThreadPoolExecutor is the cleanest approach for most use cases.
What happens if a Python thread raises an exception?
In a raw threading.Thread, an unhandled exception terminates that thread silently and the exception is lost. The main thread and other threads continue running without any notification. With ThreadPoolExecutor, exceptions are captured and re-raised when you call future.result(), making error handling much more reliable. Always use try/except blocks inside thread target functions or use ThreadPoolExecutor to ensure exceptions are properly caught and handled.
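A small sketch contrasting the two behaviors (`boom` is a made-up function):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def boom():
    raise ValueError("lost in a raw thread")

# Raw thread: the traceback goes to stderr, but join() raises nothing
t = threading.Thread(target=boom)
t.start()
t.join()  # No exception propagates here

# ThreadPoolExecutor: the exception is stored and re-raised by result()
caught = None
with ThreadPoolExecutor() as executor:
    future = executor.submit(boom)
    try:
        future.result()
    except ValueError as e:
        caught = e

print(f"Caught from future: {caught}")
```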
Conclusion
Python threading is a powerful tool for speeding up I/O-bound programs. By running network requests, file operations, and database queries concurrently, you can turn a 20-second sequential script into one that finishes in 5 seconds with minimal code changes.
The key points to remember:
- Use threading for I/O-bound work. The GIL prevents CPU parallelism, but threads overlap I/O waiting time effectively.
- Use `ThreadPoolExecutor` for most threading needs. It manages threads, collects results, and propagates exceptions cleanly.
- Protect shared state with locks. Race conditions are the most common threading bug, and `queue.Queue` eliminates most locking concerns.
- Avoid deadlocks by acquiring locks in a consistent order and using timeouts.
- Choose the right tool: threading for I/O, multiprocessing for CPU, asyncio for thousands of concurrent connections.
Start with ThreadPoolExecutor and a simple executor.map() call. Measure the speedup. Add synchronization only where shared mutable state demands it. Threading does not require a complete rewrite of your code. A few lines of concurrent.futures can deliver dramatic performance improvements for any program that spends time waiting.
Related Guides
- Python asyncio -- Async/await alternative for high-concurrency I/O
- Python subprocess -- Run external commands from threaded applications
- Python generators -- Memory-efficient data processing with lazy evaluation
- Python requests -- HTTP library commonly used in threaded web scrapers