Skip to content

Python Threading: Complete Guide to Multithreading with Examples

Updated on

Your Python program makes 50 API calls, one after another. Each call takes 200 milliseconds of waiting. The math is brutal: 10 seconds of your program's life wasted staring at network responses. Your CPU sits idle at near-zero utilization while your script crawls through I/O-bound operations that could run simultaneously.

This problem compounds fast. Web scrapers that fetch thousands of pages sequentially. File processing scripts that read and write one file at a time. Database queries that block the entire application while waiting for results. Every second of idle waiting is a second your program could be doing useful work.

Python's threading module solves this by running multiple operations concurrently within a single process. Threads share memory, start quickly, and excel at I/O-bound workloads where the program spends most of its time waiting. This guide covers everything from basic thread creation to advanced synchronization patterns, with production-ready code examples you can use immediately.

📚

What is Threading in Python?

Threading allows a program to run multiple operations concurrently within the same process. Each thread shares the same memory space, making communication between threads fast and straightforward.

Python's threading module provides a high-level interface for creating and managing threads. But there is an important caveat: the Global Interpreter Lock (GIL).

The Global Interpreter Lock (GIL)

The GIL is a mutex in CPython that allows only one thread to execute Python bytecode at a time. This means threads cannot achieve true parallelism for CPU-bound operations. However, the GIL releases during I/O operations (network calls, file reads, database queries), allowing other threads to run while one waits for I/O.

import threading
import time
 
def cpu_bound(n):
    """CPU-bound: GIL prevents parallel execution"""
    total = 0
    for i in range(n):
        total += i * i
    return total
 
def io_bound(url):
    """I/O-bound: GIL releases during network wait"""
    import urllib.request
    return urllib.request.urlopen(url).read()
 
# CPU-bound: 4 threads run one-at-a-time (no speedup)
start = time.time()
threads = [threading.Thread(target=cpu_bound, args=(10_000_000,)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"CPU-bound with threads: {time.time() - start:.2f}s")
 
# I/O-bound: 4 threads overlap their waiting time (big speedup)

This means threading is ideal for I/O-bound tasks but not for CPU-heavy computation. For CPU-bound work, use the multiprocessing module instead.

When to Use Threading vs Multiprocessing vs Asyncio

Featurethreadingmultiprocessingasyncio
Best forI/O-bound tasksCPU-bound tasksHigh-concurrency I/O
ParallelismConcurrent (GIL limited)True parallelConcurrent (single thread)
MemoryShared (lightweight)Separate per processShared (lightweight)
Startup costLow (~1ms)High (~50-100ms)Very low
CommunicationDirect memory accessPipes, Queues, shared memoryAwaitable coroutines
Scalability10s-100s of threadsLimited by CPU cores1000s of coroutines
ComplexityMedium (locking needed)Medium (serialization)High (async/await syntax)
Use caseWeb scraping, file I/O, API callsData processing, ML trainingWeb servers, chat apps

Rule of thumb: If your program waits on network or disk, use threading. If it crunches numbers, use multiprocessing. If you need thousands of concurrent connections, use asyncio.

Thread Basics: Creating and Running Threads

The threading.Thread Class

The simplest way to create a thread is by passing a target function to threading.Thread:

import threading
import time
 
def download_file(filename):
    print(f"[{threading.current_thread().name}] Downloading {filename}...")
    time.sleep(2)  # Simulate download
    print(f"[{threading.current_thread().name}] Finished {filename}")
 
# Create threads
t1 = threading.Thread(target=download_file, args=("data.csv",))
t2 = threading.Thread(target=download_file, args=("report.pdf",))
 
# Start threads
t1.start()
t2.start()
 
# Wait for both to finish
t1.join()
t2.join()
 
print("All downloads complete")

Both downloads run concurrently, finishing in roughly 2 seconds instead of 4.

start() and join()

  • start() begins thread execution. A thread can only be started once.
  • join(timeout=None) blocks the calling thread until the target thread finishes. Pass a timeout in seconds to avoid waiting forever.
import threading
import time
 
def slow_task():
    time.sleep(10)
 
t = threading.Thread(target=slow_task)
t.start()
 
# Wait at most 3 seconds
t.join(timeout=3)
 
if t.is_alive():
    print("Thread still running after 3 seconds")
else:
    print("Thread finished")

Naming Threads

Named threads make debugging easier:

import threading
 
def worker():
    name = threading.current_thread().name
    print(f"Running in thread: {name}")
 
t = threading.Thread(target=worker, name="DataProcessor")
t.start()
t.join()

Daemon Threads

Daemon threads are background threads that automatically terminate when the main program exits. Non-daemon threads keep the program alive until they finish.

import threading
import time
 
def background_monitor():
    while True:
        print("Monitoring system health...")
        time.sleep(5)
 
# Daemon thread: dies when main program exits
monitor = threading.Thread(target=background_monitor, daemon=True)
monitor.start()
 
# Main program does its work
time.sleep(12)
print("Main program exiting")
# monitor thread is killed automatically

Use daemon threads for background logging, monitoring, or cleanup tasks that should not prevent program exit.

Subclassing Thread

For more complex thread behavior, subclass threading.Thread:

import threading
import time
 
class FileProcessor(threading.Thread):
    def __init__(self, filepath):
        super().__init__()
        self.filepath = filepath
        self.result = None
 
    def run(self):
        """Override run() with thread logic"""
        print(f"Processing {self.filepath}")
        time.sleep(1)  # Simulate work
        self.result = f"Processed: {self.filepath}"
 
# Create and run
processor = FileProcessor("/data/report.csv")
processor.start()
processor.join()
print(processor.result)

Passing Arguments to Threads

Using args and kwargs

Pass positional arguments with args (a tuple) and keyword arguments with kwargs (a dict):

import threading
 
def fetch_data(url, timeout, retries=3, verbose=False):
    print(f"Fetching {url} (timeout={timeout}s, retries={retries}, verbose={verbose})")
 
# Positional args as tuple
t1 = threading.Thread(target=fetch_data, args=("https://api.example.com", 30))
 
# Keyword args as dict
t2 = threading.Thread(
    target=fetch_data,
    args=("https://api.example.com",),
    kwargs={"timeout": 30, "retries": 5, "verbose": True}
)
 
t1.start()
t2.start()
t1.join()
t2.join()

Common mistake: Forgetting the trailing comma in a single-element tuple. args=("hello",) is a tuple; args=("hello") is just a string in parentheses.

Collecting Results from Threads

Threads do not return values directly. Use shared data structures or a list to collect results:

import threading
 
results = {}
lock = threading.Lock()
 
def compute(task_id, value):
    result = value ** 2
    with lock:
        results[task_id] = result
 
threads = []
for i in range(5):
    t = threading.Thread(target=compute, args=(i, i * 10))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()
 
print(results)  # {0: 0, 1: 100, 2: 400, 3: 900, 4: 1600}

A cleaner approach uses ThreadPoolExecutor (covered next), which handles result collection automatically.

ThreadPoolExecutor: The Modern Approach

The concurrent.futures module provides ThreadPoolExecutor, a high-level interface that manages a pool of worker threads. It handles thread creation, result collection, and exception propagation automatically.

Basic Usage with submit()

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def fetch_url(url):
    time.sleep(1)  # Simulate network request
    return f"Content from {url}"
 
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
    "https://example.com/page4",
    "https://example.com/page5",
]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks and get Future objects
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
 
    # Process results as they complete
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url}: {data}")
        except Exception as e:
            print(f"{url} generated an exception: {e}")

Using map() for Ordered Results

executor.map() returns results in the same order as the input, similar to the built-in map():

from concurrent.futures import ThreadPoolExecutor
 
def process_item(item):
    return item.upper()
 
items = ["apple", "banana", "cherry", "date"]
 
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, items))
 
print(results)  # ['APPLE', 'BANANA', 'CHERRY', 'DATE']

submit() vs map()

submit()map()
ReturnsFuture objectsIterator of results
Result orderCompletion order (with as_completed)Input order
Error handlingPer-task via future.result()Raises on first failure
ArgumentsSingle function callApplies function to each item
Best forHeterogeneous tasks, early resultsHomogeneous batch processing

Exception Handling with Futures

from concurrent.futures import ThreadPoolExecutor, as_completed
 
def risky_task(n):
    if n == 3:
        raise ValueError(f"Bad input: {n}")
    return n * 10
 
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, i): i for i in range(5)}
 
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result(timeout=5)
            print(f"Task {task_id}: {result}")
        except ValueError as e:
            print(f"Task {task_id} failed: {e}")
        except TimeoutError:
            print(f"Task {task_id} timed out")

Cancelling Tasks

from concurrent.futures import ThreadPoolExecutor
import time
 
def long_task(n):
    time.sleep(5)
    return n
 
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(long_task, i) for i in range(10)]
 
    # Cancel pending tasks (already-running tasks cannot be cancelled)
    for f in futures[4:]:
        cancelled = f.cancel()
        print(f"Cancelled: {cancelled}")

Thread Synchronization Primitives

When multiple threads access shared data, you need synchronization to prevent race conditions.

Lock

A Lock ensures only one thread enters a critical section at a time:

import threading
 
class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()
 
    def withdraw(self, amount):
        with self.lock:  # Only one thread at a time
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
 
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
 
account = BankAccount(1000)
 
def make_transactions():
    for _ in range(100):
        account.deposit(10)
        account.withdraw(10)
 
threads = [threading.Thread(target=make_transactions) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(f"Final balance: {account.balance}")  # Always 1000

Without the lock, concurrent reads and writes produce incorrect results (a race condition).

RLock (Reentrant Lock)

An RLock can be acquired multiple times by the same thread. This prevents deadlocks when a function holding a lock calls another function that also needs the same lock:

import threading
 
class SafeCache:
    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()
 
    def get(self, key):
        with self._lock:
            return self._data.get(key)
 
    def set(self, key, value):
        with self._lock:
            self._data[key] = value
 
    def get_or_set(self, key, default):
        with self._lock:
            # This calls get(), which also acquires _lock
            # RLock allows this; a regular Lock would deadlock
            existing = self.get(key)
            if existing is None:
                self.set(key, default)
                return default
            return existing

Semaphore

A Semaphore allows a fixed number of threads to access a resource simultaneously:

import threading
import time
 
# Allow max 3 concurrent database connections
db_semaphore = threading.Semaphore(3)
 
def query_database(query_id):
    with db_semaphore:
        print(f"Query {query_id}: connected (active connections: {3 - db_semaphore._value})")
        time.sleep(2)  # Simulate query
        print(f"Query {query_id}: done")
 
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Event

An Event allows one thread to signal other waiting threads:

import threading
import time
 
data_ready = threading.Event()
shared_data = []
 
def producer():
    print("Producer: preparing data...")
    time.sleep(3)
    shared_data.extend([1, 2, 3, 4, 5])
    print("Producer: data ready, signaling consumers")
    data_ready.set()
 
def consumer(name):
    print(f"Consumer {name}: waiting for data...")
    data_ready.wait()  # Blocks until event is set
    print(f"Consumer {name}: got data = {shared_data}")
 
threads = [
    threading.Thread(target=producer),
    threading.Thread(target=consumer, args=("A",)),
    threading.Thread(target=consumer, args=("B",)),
]
for t in threads: t.start()
for t in threads: t.join()

Condition

A Condition combines a lock with the ability to wait for a notification. It is the foundation for producer-consumer patterns:

import threading
import time
import random
 
buffer = []
MAX_SIZE = 5
condition = threading.Condition()
 
def producer():
    for i in range(20):
        with condition:
            while len(buffer) >= MAX_SIZE:
                condition.wait()  # Wait until space available
            item = random.randint(1, 100)
            buffer.append(item)
            print(f"Produced: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.1)
 
def consumer(name):
    for _ in range(10):
        with condition:
            while len(buffer) == 0:
                condition.wait()  # Wait until item available
            item = buffer.pop(0)
            print(f"Consumer {name} consumed: {item} (buffer size: {len(buffer)})")
            condition.notify_all()
        time.sleep(0.15)
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=("X",))
t3 = threading.Thread(target=consumer, args=("Y",))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

Synchronization Primitives Summary

PrimitivePurposeWhen to Use
LockMutual exclusionProtect shared mutable state
RLockReentrant mutexNested locking in same thread
SemaphoreLimit concurrencyRate limiting, connection pools
EventOne-time signalInitialization complete, shutdown signal
ConditionWait/notify patternProducer-consumer, state changes
BarrierSynchronize N threadsAll threads must reach a point before continuing

Thread-Safe Data Structures

queue.Queue

queue.Queue is the go-to thread-safe data structure. It handles all locking internally:

import threading
import queue
import time
 
task_queue = queue.Queue()
results = queue.Queue()
 
def worker():
    while True:
        item = task_queue.get()  # Blocks until item available
        if item is None:
            break
        result = item ** 2
        results.put(result)
        task_queue.task_done()
 
# Start 4 workers
workers = []
for _ in range(4):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    workers.append(t)
 
# Submit tasks
for i in range(20):
    task_queue.put(i)
 
# Wait for all tasks to complete
task_queue.join()
 
# Stop workers
for _ in range(4):
    task_queue.put(None)
for w in workers:
    w.join()
 
# Collect results
all_results = []
while not results.empty():
    all_results.append(results.get())
print(f"Results: {sorted(all_results)}")

queue.Queue also supports:

  • Queue(maxsize=10): Blocks put() when full
  • PriorityQueue(): Items sorted by priority
  • LifoQueue(): Last-in, first-out (stack behavior)

collections.deque

collections.deque is thread-safe for append() and popleft() operations (atomic at the C level in CPython), making it a fast alternative for simple producer-consumer patterns. For a deeper look at deque and other specialized containers, see our Python collections guide:

from collections import deque
import threading
import time
 
buffer = deque(maxlen=1000)
 
def producer():
    for i in range(100):
        buffer.append(i)
        time.sleep(0.01)
 
def consumer():
    consumed = 0
    while consumed < 100:
        if buffer:
            item = buffer.popleft()
            consumed += 1
        else:
            time.sleep(0.01)
    print(f"Consumed {consumed} items")
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

Note: While individual append and popleft operations are thread-safe, checking len(buffer) and then popping is not atomic. For full thread safety, use queue.Queue.

Common Threading Patterns

Producer-Consumer Pattern

The classic pattern for decoupling data production from data processing:

import threading
import queue
import time
import random
 
def producer(q, name, num_items):
    for i in range(num_items):
        item = f"{name}-item-{i}"
        q.put(item)
        print(f"Producer {name}: created {item}")
        time.sleep(random.uniform(0.05, 0.15))
    print(f"Producer {name}: done")
 
def consumer(q, name, stop_event):
    while not stop_event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"Consumer {name}: processing {item}")
            time.sleep(random.uniform(0.1, 0.2))
            q.task_done()
        except queue.Empty:
            continue
    print(f"Consumer {name}: shutting down")
 
task_queue = queue.Queue(maxsize=10)
stop_event = threading.Event()
 
producers = [
    threading.Thread(target=producer, args=(task_queue, "P1", 10)),
    threading.Thread(target=producer, args=(task_queue, "P2", 10)),
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, "C1", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C2", stop_event)),
    threading.Thread(target=consumer, args=(task_queue, "C3", stop_event)),
]
 
for c in consumers: c.start()
for p in producers: p.start()
for p in producers: p.join()
 
task_queue.join()  # Wait for all items to be processed
stop_event.set()   # Signal consumers to stop
for c in consumers: c.join()

Worker Thread Pool (Manual)

When you need more control than ThreadPoolExecutor provides:

import threading
import queue
 
class WorkerPool:
    def __init__(self, num_workers):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
 
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)
 
    def _worker(self):
        while True:
            func, args, kwargs, future_id = self.task_queue.get()
            if func is None:
                break
            try:
                result = func(*args, **kwargs)
                self.result_queue.put((future_id, result, None))
            except Exception as e:
                self.result_queue.put((future_id, None, e))
            finally:
                self.task_queue.task_done()
 
    def submit(self, func, *args, **kwargs):
        future_id = id(func)  # Simple ID
        self.task_queue.put((func, args, kwargs, future_id))
        return future_id
 
    def shutdown(self):
        for _ in self.workers:
            self.task_queue.put((None, None, None, None))
        for w in self.workers:
            w.join()
 
# Usage
pool = WorkerPool(4)
for i in range(10):
    pool.submit(lambda x: x * x, i)
pool.task_queue.join()
pool.shutdown()

Rate-Limited Thread Pool

Control how fast threads make external requests:

import threading
import time
from concurrent.futures import ThreadPoolExecutor
 
class RateLimiter:
    def __init__(self, max_per_second):
        self.interval = 1.0 / max_per_second
        self.lock = threading.Lock()
        self.last_call = 0
 
    def wait(self):
        with self.lock:
            elapsed = time.time() - self.last_call
            wait_time = self.interval - elapsed
            if wait_time > 0:
                time.sleep(wait_time)
            self.last_call = time.time()
 
limiter = RateLimiter(max_per_second=5)
 
def rate_limited_fetch(url):
    limiter.wait()
    print(f"Fetching {url} at {time.time():.2f}")
    time.sleep(0.5)  # Simulate request
    return f"Data from {url}"
 
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
 
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(rate_limited_fetch, urls))

Thread Safety Pitfalls and How to Avoid Them

Race Conditions

A race condition occurs when the outcome depends on the timing of thread execution:

import threading
 
# BAD: Race condition
counter = 0
 
def increment_unsafe():
    global counter
    for _ in range(100_000):
        counter += 1  # Read, increment, write: NOT atomic
 
threads = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Often less than 500000
 
# GOOD: Protected with lock
counter = 0
lock = threading.Lock()
 
def increment_safe():
    global counter
    for _ in range(100_000):
        with lock:
            counter += 1
 
threads = [threading.Thread(target=increment_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 500000, Got: {counter}")  # Always 500000

Deadlocks

A deadlock happens when two threads each hold a lock the other needs:

import threading
 
lock_a = threading.Lock()
lock_b = threading.Lock()
 
def thread_1():
    with lock_a:
        print("Thread 1: acquired lock_a")
        with lock_b:  # Waits forever if thread_2 holds lock_b
            print("Thread 1: acquired lock_b")
 
def thread_2():
    with lock_b:
        print("Thread 2: acquired lock_b")
        with lock_a:  # Waits forever if thread_1 holds lock_a
            print("Thread 2: acquired lock_a")
 
# This WILL deadlock
# t1 = threading.Thread(target=thread_1)
# t2 = threading.Thread(target=thread_2)
# t1.start(); t2.start()

How to prevent deadlocks:

  1. Always acquire locks in the same order:
def thread_1_fixed():
    with lock_a:    # Always lock_a first
        with lock_b:
            print("Thread 1: acquired both locks")
 
def thread_2_fixed():
    with lock_a:    # Always lock_a first (same order)
        with lock_b:
            print("Thread 2: acquired both locks")
  1. Use timeouts:
def safe_acquire():
    acquired_a = lock_a.acquire(timeout=2)
    if not acquired_a:
        print("Could not acquire lock_a, backing off")
        return
    try:
        acquired_b = lock_b.acquire(timeout=2)
        if not acquired_b:
            print("Could not acquire lock_b, releasing lock_a")
            return
        try:
            print("Acquired both locks safely")
        finally:
            lock_b.release()
    finally:
        lock_a.release()
  1. Minimize lock scope: Hold locks for the shortest time possible.

Thread Safety Checklist

  • Protect all shared mutable state with locks
  • Use queue.Queue instead of shared lists or dicts when possible
  • Avoid global mutable state; pass data through function arguments
  • Use ThreadPoolExecutor instead of manual thread management
  • Never assume operation order between threads
  • Test with threading.active_count() and logging to detect thread leaks

Real-World Examples

Concurrent Web Scraping

For making HTTP requests in threaded code, you can use urllib.request (shown below) or the popular requests library for a friendlier API.

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import time
 
def fetch_page(url):
    """Fetch a web page and return its content length"""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            content = response.read()
            return url, len(content), None
    except Exception as e:
        return url, 0, str(e)
 
urls = [
    "https://python.org",
    "https://docs.python.org",
    "https://pypi.org",
    "https://realpython.com",
    "https://github.com",
    "https://stackoverflow.com",
    "https://news.ycombinator.com",
    "https://httpbin.org",
]
 
# Sequential
start = time.time()
for url in urls:
    fetch_page(url)
sequential_time = time.time() - start
 
# Concurrent with threads
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(fetch_page, url): url for url in urls}
    for future in as_completed(futures):
        url, size, error = future.result()
        if error:
            print(f"  FAIL {url}: {error}")
        else:
            print(f"  OK   {url}: {size:,} bytes")
threaded_time = time.time() - start
 
print(f"\nSequential: {sequential_time:.2f}s")
print(f"Threaded:   {threaded_time:.2f}s")
print(f"Speedup:    {sequential_time / threaded_time:.1f}x")

Parallel File I/O

from concurrent.futures import ThreadPoolExecutor
import os
import hashlib
 
def process_file(filepath):
    """Read file and compute its SHA-256 hash"""
    with open(filepath, 'rb') as f:
        content = f.read()
    file_hash = hashlib.sha256(content).hexdigest()
    size = os.path.getsize(filepath)
    return filepath, file_hash, size
 
def hash_all_files(directory, pattern="*.py"):
    """Hash all matching files in a directory using threads"""
    import glob
    files = glob.glob(os.path.join(directory, "**", pattern), recursive=True)
 
    results = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_file, f): f for f in files}
        for future in futures:
            try:
                path, hash_val, size = future.result()
                results[path] = {"hash": hash_val, "size": size}
            except Exception as e:
                print(f"Error processing {futures[future]}: {e}")
 
    return results
 
# Usage
# file_hashes = hash_all_files("/path/to/project")

Concurrent API Calls with Retry Logic

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
import json
import time
 
def fetch_api(endpoint, max_retries=3, backoff=1.0):
    """Fetch API endpoint with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            url = f"https://jsonplaceholder.typicode.com{endpoint}"
            req = urllib.request.Request(url)
            with urllib.request.urlopen(req, timeout=10) as response:
                data = json.loads(response.read())
                return {"endpoint": endpoint, "data": data, "error": None}
        except Exception as e:
            if attempt < max_retries - 1:
                wait = backoff * (2 ** attempt)
                time.sleep(wait)
            else:
                return {"endpoint": endpoint, "data": None, "error": str(e)}
 
endpoints = [f"/posts/{i}" for i in range(1, 21)]
 
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_api, ep) for ep in endpoints]
    results = [f.result() for f in futures]
 
elapsed = time.time() - start
success = sum(1 for r in results if r["error"] is None)
print(f"Fetched {success}/{len(endpoints)} endpoints in {elapsed:.2f}s")

Periodic Background Tasks

import threading
import time
 
class PeriodicTask:
    """Run a function at fixed intervals in a background thread"""
    def __init__(self, interval, func, *args, **kwargs):
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._stop_event = threading.Event()
        self._thread = None
 
    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
 
    def _run(self):
        while not self._stop_event.is_set():
            self.func(*self.args, **self.kwargs)
            self._stop_event.wait(self.interval)
 
    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()
 
# Usage
def check_health():
    print(f"Health check at {time.strftime('%H:%M:%S')}")
 
task = PeriodicTask(2.0, check_health)
task.start()
time.sleep(7)
task.stop()
print("Stopped")

Performance: Threading vs Multiprocessing vs Asyncio

The right concurrency tool depends on the workload. Here is a comparison of wall-clock time for common tasks:

TaskSequentialThreading (4)Multiprocessing (4)Asyncio
100 HTTP requests (200ms each)20.0s5.1s5.8s4.9s
100 file reads (10ms each)1.0s0.28s0.35s0.26s
100 CPU tasks (100ms each)10.0s10.2s2.7s10.0s
50 DB queries (50ms each)2.5s0.68s0.85s0.62s
Mixed I/O + CPU15.0s8.2s4.1s9.5s

Key takeaways:

  • Threading delivers 3-5x speedup on I/O-bound workloads with minimal code changes
  • Multiprocessing is the only option for true CPU parallelism but adds process overhead
  • Asyncio edges threading on high-concurrency I/O but requires rewriting code with async/await
  • For mixed workloads, consider combining threading for I/O and multiprocessing for CPU tasks
import time
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def io_task():
    time.sleep(0.2)
 
def cpu_task(n=2_000_000):
    return sum(i * i for i in range(n))
 
# Benchmark threading vs multiprocessing
NUM_TASKS = 20
 
# Threading - I/O bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: io_task(), range(NUM_TASKS)))
print(f"Threading (I/O): {time.time() - start:.2f}s")
 
# Threading - CPU bound
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: cpu_task(), range(NUM_TASKS)))
print(f"Threading (CPU): {time.time() - start:.2f}s")

Experimenting with Threading in RunCell

Debugging and profiling threaded code can be challenging. When you need to test thread synchronization, visualize timing overlaps, or diagnose race conditions interactively, RunCell (www.runcell.dev (opens in a new tab)) provides an AI-powered Jupyter environment designed for this workflow.

RunCell's AI agent can analyze your threading code, identify potential deadlocks before they happen, suggest optimal worker counts based on your workload, and help you understand why threads behave unexpectedly. When a thread pool produces incorrect results intermittently, RunCell traces the execution timeline to pinpoint the exact moment shared state is corrupted.

If you want to visualize the performance characteristics of different threading configurations, PyGWalker (github.com/Kanaries/pygwalker) can turn your benchmark DataFrames into interactive charts. Run threading benchmarks, collect timing data into a pandas DataFrame, and explore the results with drag-and-drop visualizations to find the optimal thread count for your workload.

FAQ

What is the difference between threading and multiprocessing in Python?

Threading runs multiple threads within a single process, sharing memory. The Global Interpreter Lock (GIL) prevents threads from executing Python bytecode in parallel, making threading effective only for I/O-bound tasks like network requests and file operations. Multiprocessing creates separate processes, each with its own Python interpreter and memory space, enabling true parallel execution for CPU-bound tasks. Threading has lower overhead (faster startup, less memory), while multiprocessing bypasses the GIL for genuine parallelism.

Is Python threading truly parallel?

No, Python threading is concurrent but not parallel for CPU-bound code due to the GIL. Only one thread executes Python bytecode at a time. However, the GIL releases during I/O operations (network, disk, database), so multiple threads effectively run in parallel when waiting on I/O. For CPU-bound parallelism, use the multiprocessing module or C extensions that release the GIL (like NumPy).

How many threads should I use in Python?

For I/O-bound tasks, start with 5-20 threads depending on the external service's rate limits and your network bandwidth. Too many threads to a single server can cause connection refusals or throttling. For mixed workloads, experiment with thread counts between the number of CPU cores and 4x the core count. Use ThreadPoolExecutor and benchmark with different max_workers values to find the optimal count for your specific workload. The default for ThreadPoolExecutor is min(32, os.cpu_count() + 4).

How do I return a value from a Python thread?

Threads do not return values directly from their target function. The three main approaches are: (1) Use ThreadPoolExecutor.submit() which returns a Future object where you call future.result() to get the return value. (2) Pass a mutable container (like a dictionary or list) as an argument and have the thread write results into it, protected by a Lock. (3) Use queue.Queue where the thread puts results into the queue and the main thread reads from it. ThreadPoolExecutor is the cleanest approach for most use cases.

What happens if a Python thread raises an exception?

In a raw threading.Thread, an unhandled exception terminates that thread silently and the exception is lost. The main thread and other threads continue running without any notification. With ThreadPoolExecutor, exceptions are captured and re-raised when you call future.result(), making error handling much more reliable. Always use try/except blocks inside thread target functions or use ThreadPoolExecutor to ensure exceptions are properly caught and handled.

Conclusion

Python threading is a powerful tool for speeding up I/O-bound programs. By running network requests, file operations, and database queries concurrently, you can turn a 20-second sequential script into one that finishes in 5 seconds with minimal code changes.

The key points to remember:

  • Use threading for I/O-bound work. The GIL prevents CPU parallelism, but threads overlap I/O waiting time effectively.
  • Use ThreadPoolExecutor for most threading needs. It manages threads, collects results, and propagates exceptions cleanly.
  • Protect shared state with locks. Race conditions are the most common threading bug, and queue.Queue eliminates most locking concerns.
  • Avoid deadlocks by acquiring locks in a consistent order and using timeouts.
  • Choose the right tool: threading for I/O, multiprocessing for CPU, asyncio for thousands of concurrent connections.

Start with ThreadPoolExecutor and a simple executor.map() call. Measure the speedup. Add synchronization only where shared mutable state demands it. Threading does not require a complete rewrite of your code. A few lines of concurrent.futures can deliver dramatic performance improvements for any program that spends time waiting.

Related Guides

📚