Multithreading in Python

Multithreading in Python allows you to execute multiple threads (smaller units of execution within a process) concurrently, which can make your programs more efficient, especially when dealing with I/O-bound operations. Python's standard library provides the threading and concurrent.futures modules to implement multithreading.

Threading Fundamentals

Understanding Threads

A thread is the smallest unit of execution within a process. Multiple threads in the same process share memory space, which allows them to access the same data, but can also lead to issues like race conditions and deadlocks if not managed properly.

# Basic thread creation
import threading
import time

def task(name, delay):
    """Simulate a unit of work labelled *name* that takes *delay* seconds."""
    label = f"Task {name}"
    print(f"{label} started")
    time.sleep(delay)  # blocking stand-in for real work; other threads keep running
    print(f"{label} completed")

# Two threads running the same function with different arguments
thread1 = threading.Thread(target=task, args=("A", 2))
thread2 = threading.Thread(target=task, args=("B", 1))

# start() launches each thread; from here on both run concurrently
for t in (thread1, thread2):
    t.start()

# join() blocks the main thread until the given thread has finished
for t in (thread1, thread2):
    t.join()

print("All tasks completed")

The Global Interpreter Lock (GIL)

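In CPython (the standard Python interpreter), the Global Interpreter Lock (GIL) prevents multiple native threads from executing Python bytecode at the same time. Blocking I/O calls release the GIL, so threads still overlap while they wait on the network or disk. This means that threading in Python is best suited for I/O-bound tasks rather than CPU-bound tasks; for CPU-bound work, use multiprocessing or ProcessPoolExecutor instead. (CPython 3.13+ also offers an optional free-threaded build without the GIL.)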

# I/O-bound task (good for threading)
import threading
import requests
import time

def download_site(url, timeout=10):
    """Fetch *url* and print the size of the response body.

    Args:
        url: Address to download.
        timeout: Seconds to wait for the server (connect and read) before
            giving up. Without a timeout, ``requests`` can wait forever on an
            unresponsive server, which would also hang any ``join()`` waiting
            on the thread running this function.

    Raises:
        requests.RequestException: on network errors, including timeouts.
    """
    response = requests.get(url, timeout=timeout)
    # ``response.content`` is the raw body as bytes; ``response.text`` is the
    # decoded str, whose length counts characters rather than bytes.
    print(f"Downloaded {url}: {len(response.content)} bytes")

urls = 3 * ["https://example.com", "https://python.org", "https://github.com"]

# Baseline: one request at a time, so total time is roughly the sum of latencies
start_time = time.time()
for site in urls:
    download_site(site)
elapsed = time.time() - start_time
print(f"Sequential time: {elapsed:.2f} seconds")

# Threaded: one thread per URL, so the network waits overlap
start_time = time.time()
threads = [threading.Thread(target=download_site, args=(site,)) for site in urls]
for worker_thread in threads:
    worker_thread.start()
for worker_thread in threads:
    worker_thread.join()
elapsed = time.time() - start_time
print(f"Threaded time: {elapsed:.2f} seconds")

When to Use Threading

| Good Use Cases                 | Not Ideal For              |
|--------------------------------|----------------------------|
| Network I/O operations         | CPU-intensive calculations |
| File I/O operations            | Numerical processing       |
| User interface responsiveness  | Parallel data processing   |
| Waiting for external resources | Heavy computational tasks  |

Thread Synchronization

Locks (Mutexes)

When multiple threads access shared resources, locks help prevent race conditions by allowing only one thread to access the resource at a time.

import threading

# Shared counter updated WITHOUT synchronization (unsafe)
counter = 0


def increment_without_lock():
    """Add 100,000 to ``counter`` via an unprotected read-modify-write."""
    global counter
    for _ in range(100000):
        snapshot = counter      # read ...
        counter = snapshot + 1  # ... then write: another thread may have
                                # changed ``counter`` in between (lost update)


# Shared counter guarded by a lock (safe)
counter_with_lock = 0
lock = threading.Lock()


def increment_with_lock():
    """Add 100,000 to ``counter_with_lock``, holding ``lock`` for each update."""
    global counter_with_lock
    for _ in range(100000):
        with lock:  # acquired on entry, released on exit (even on error)
            snapshot = counter_with_lock
            counter_with_lock = snapshot + 1

def run_in_threads(target, num_threads=5):
    """Start *num_threads* threads that each run *target*, then wait for all of them."""
    workers = [threading.Thread(target=target) for _ in range(num_threads)]
    for thread in workers:
        thread.start()
    for thread in workers:
        thread.join()


# Unsafe version: 5 threads x 100,000 increments each
run_in_threads(increment_without_lock)
# Can print less than 500,000 because of lost updates. How often the race
# actually shows up depends on where the interpreter switches threads; on
# recent CPython versions the output is frequently exactly 500,000 anyway.
# A correct-looking result does not make unsynchronized code safe.
print(f"Without lock: {counter}")

# Same workload on the lock-protected version
run_in_threads(increment_with_lock)
print(f"With lock: {counter_with_lock}")  # Always 500,000

RLock (Reentrant Lock)

A reentrant lock can be acquired multiple times by the same thread, which is useful for recursive functions or nested lock usage.

import threading

# A plain Lock is not reentrant: the nested `with self.lock` below would block
# forever on the second acquire. An RLock lets the owning thread re-acquire it.
class RecursiveTask:
    """Writes a chain of derived keys into ``data`` while holding a reentrant lock."""

    def __init__(self):
        self.lock = threading.RLock()  # same thread may acquire it repeatedly
        self.data = {}

    def update_recursive(self, key, value, depth=3):
        """Store *value* under *key*, then recurse *depth* more levels.

        Each level appends ``"_sub"`` to the key and adds 1 to the value;
        every level re-acquires the lock from the same thread.
        """
        with self.lock:
            self.data[key] = value
            print(f"Updated {key} to {value}")

            if depth <= 0:
                return
            # Nested acquisition by the thread that already owns the lock
            with self.lock:
                self.update_recursive(key + "_sub", value + 1, depth - 1)


task = RecursiveTask()
task.update_recursive("key", 1)  # succeeds with RLock; a plain Lock would deadlock

Semaphore

Semaphores limit the number of threads that can access a resource simultaneously, which is useful for controlling access to limited resources.

import threading
import time
import random

# At most three threads may hold this semaphore at the same time
pool_semaphore = threading.Semaphore(3)


def worker(name):
    """Use the shared pool for a random 1-3 s without exceeding the semaphore limit."""
    pool_semaphore.acquire()  # blocks while three other workers hold it
    try:
        print(f"Worker {name} is accessing the pool")
        time.sleep(random.uniform(1, 3))  # Simulate work
        print(f"Worker {name} is releasing the pool")
    finally:
        pool_semaphore.release()  # always free the slot, even if work fails

# Create and start 10 worker threads; only 3 can be inside the pool at once.
# worker() already prefixes its messages with "Worker", so pass just the
# index as its argument; the full label becomes the thread's name instead.
workers = []
for i in range(10):
    thread = threading.Thread(target=worker, args=(i,), name=f"Worker-{i}")
    workers.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in workers:
    thread.join()

Events

Events provide a simple way to communicate between threads, allowing one thread to signal to others that something has occurred.

import threading
import time

# One-shot flag: set() by the producer, awaited by the consumer
data_ready = threading.Event()


def producer():
    """Spend about 2 s "preparing" data, then set ``data_ready`` to release waiters."""
    print("Producer: Preparing data...")
    time.sleep(2)  # stand-in for real preparation work
    print("Producer: Data is ready")
    data_ready.set()  # every thread blocked in data_ready.wait() now proceeds


def consumer():
    """Block until ``data_ready`` is set, then consume the data."""
    print("Consumer: Waiting for data...")
    data_ready.wait()  # returns immediately if the event is already set
    print("Consumer: Consuming data")

# The consumer blocks in wait() until the producer signals the event
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

for t in (producer_thread, consumer_thread):
    t.start()
for t in (producer_thread, consumer_thread):
    t.join()

Condition Variables

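Condition variables provide a more flexible synchronization mechanism than events. Threads wait until a specific condition on shared state becomes true (for example, "the buffer is not empty"), and the thread that changes the state notifies the waiters. A woken thread should always re-check the condition in a while loop, because another thread may have changed the state first.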

import threading
import time
import random

# Producer-consumer pattern with condition variables
class Buffer:
    """Bounded FIFO buffer shared by producer and consumer threads.

    A single Condition guards ``buffer``: producers wait while it holds
    ``size`` items, consumers wait while it is empty.
    """

    def __init__(self, size):
        self.buffer = []
        self.size = size
        self.condition = threading.Condition()

    def add_item(self, item):
        """Append *item*, blocking while the buffer already holds ``size`` items."""
        with self.condition:
            # Loop, not `if`: a woken thread must re-check the predicate
            # because another thread may have changed the state first.
            while len(self.buffer) >= self.size:
                print("Buffer full, producer waiting...")
                self.condition.wait()  # Wait until there's space

            self.buffer.append(item)
            print(f"Produced: {item}, Buffer: {self.buffer}")

            # notify_all, not notify: producers and consumers wait on the same
            # condition, so notify() could wake another producer instead of a
            # consumer. With several of each, that can leave every thread waiting.
            self.condition.notify_all()

    def get_item(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.condition:
            while not self.buffer:
                print("Buffer empty, consumer waiting...")
                self.condition.wait()  # Wait until there's an item

            item = self.buffer.pop(0)
            print(f"Consumed: {item}, Buffer: {self.buffer}")

            # Wake waiting producers (notify_all for the reason given in add_item)
            self.condition.notify_all()
            return item

# One bounded buffer shared by both threads (holds at most 3 items)
buffer = Buffer(size=3)


def producer():
    """Put the integers 0-9 into ``buffer``, pausing 0.1-0.5 s before each."""
    for item in range(10):
        time.sleep(random.uniform(0.1, 0.5))  # Simulate production time
        buffer.add_item(item)


def consumer():
    """Take ten items from ``buffer``, pausing 0.2-0.6 s before each."""
    for _ in range(10):
        time.sleep(random.uniform(0.2, 0.6))  # Simulate consumption time
        buffer.get_item()


# Run one producer and one consumer concurrently and wait for both
pair = [threading.Thread(target=producer), threading.Thread(target=consumer)]
producer_thread, consumer_thread = pair
for t in pair:
    t.start()
for t in pair:
    t.join()

Thread Pools

ThreadPoolExecutor

Thread pools manage a set of worker threads for executing tasks, which is more efficient than creating and destroying threads for each task. Python's concurrent.futures module provides a high-level interface for thread pools.

from concurrent.futures import ThreadPoolExecutor
import time

# Function to be executed in parallel
def process_item(item, delay=1):
    """Return ``item * item`` after simulating *delay* seconds of work.

    Args:
        item: Number to square.
        delay: Simulated processing time in seconds (default 1). Pass 0 to
            skip the pause, e.g. in tests.
    """
    print(f"Processing {item}")
    time.sleep(delay)  # Simulate processing time
    return item * item

# The with-block shuts the pool down and waits for pending work on exit
with ThreadPoolExecutor(max_workers=4) as executor:
    # Approach 1: submit() each call and keep the returned Future
    first_batch = [executor.submit(process_item, n) for n in (1, 2, 3)]
    for label, fut in enumerate(first_batch, start=1):
        print(f"Result {label}: {fut.result()}")

    # Approach 2: map() over an iterable; results come back in input order
    items = [4, 5, 6, 7, 8]
    results = list(executor.map(process_item, items))
    print(f"Batch results: {results}")

    # Approach 3: submit a whole batch first, then collect in submission order
    futures = [executor.submit(process_item, n) for n in range(9, 12)]
    for fut in futures:
        print(f"Async result: {fut.result()}")

Waiting for Results

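When using thread pools, you can collect results in different ways. as_completed() yields futures in the order they finish. Iterating over the futures yourself and calling result() on each returns results in submission order.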

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

# Function with varying completion times
def task_with_random_delay(name, min_delay=0.5, max_delay=3):
    """Sleep for a random time between *min_delay* and *max_delay* seconds.

    Args:
        name: Label included in the returned message.
        min_delay: Lower bound of the random sleep, in seconds.
        max_delay: Upper bound of the random sleep, in seconds.

    Returns:
        A message naming the task and the time it slept (2 decimals).
    """
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)
    return f"Task {name} completed in {delay:.2f} seconds"

# Submit multiple tasks
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each Future back to the numeric ID of the task it runs. The ID is
    # passed directly; the returned message already says "Task ...".
    futures = {executor.submit(task_with_random_delay, i): i for i in range(10)}

    # Process results as they complete (not in submission order)
    print("Results as they complete:")
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result()
        except Exception as e:
            print(f"Task ID {task_id} generated an exception: {e}")
        else:
            print(f"Task ID {task_id}: {result}")

    # Every future has finished by now, so exception()/result() return at once.
    # Check exception() first: result() would re-raise a failed task's error
    # here, outside any try block, and abort the listing.
    print("\nAll results after completion:")
    for future, task_id in sorted(futures.items(), key=lambda pair: pair[1]):
        error = future.exception()
        if error is not None:
            print(f"Task ID {task_id} generated an exception: {error}")
        else:
            print(f"Task ID {task_id}: {future.result()}")

Daemon Threads

Understanding Daemon Threads

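Daemon threads run in the background and do not prevent the program from exiting when all non-daemon threads have finished. They are useful for tasks that don't need to complete, like monitoring or cleanup. Note that daemon threads are stopped abruptly at interpreter shutdown. Any resources they hold (open files, database transactions, and so on) may not be released properly.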

import threading
import time

def background_task():
    """Print a running tick count once per second, forever (run it as a daemon)."""
    ticks = 0
    while True:
        ticks += 1
        print(f"Background task running... Count: {ticks}")
        time.sleep(1)


def main_task():
    """Print three progress steps half a second apart, then report completion."""
    for step in range(3):
        print(f"Main task step {step}")
        time.sleep(0.5)
    print("Main task completed")

# daemon=True: this thread does not keep the interpreter alive
background_thread = threading.Thread(target=background_task, daemon=True)
# Ordinary (non-daemon) thread, named to avoid confusion with threading.main_thread()
foreground_thread = threading.Thread(target=main_task)

for t in (background_thread, foreground_thread):
    t.start()

# Join only the non-daemon thread; the daemon is stopped when the program exits
foreground_thread.join()
print("Program exit - daemon thread will be terminated")

# Joining the daemon too would block forever, since background_task never returns:
# background_thread.join()

Use Cases for Daemon Threads

  • Monitoring services
  • Periodic cleanup operations
  • Heartbeat signals
  • Background logging
  • Watchdog processes
import threading
import time
import queue

# Simple background logger using a daemon thread
class BackgroundLogger:
    """Print log messages from a daemon thread so callers never block on output.

    ``log()`` queues a message; a background thread prints each one with a
    timestamp. Call ``stop()`` to drain the queue and end the thread. If the
    program simply exits instead, the daemon thread is killed and any messages
    still queued are never printed.
    """

    # Private stop marker. A dedicated object (not None) means that logging
    # None is just another message and cannot shut the worker down by accident.
    _STOP = object()

    def __init__(self):
        self.log_queue = queue.Queue()
        self.daemon_thread = threading.Thread(target=self._process_logs, daemon=True)
        self.daemon_thread.start()

    def _process_logs(self):
        """Print queued entries in FIFO order until the stop marker arrives."""
        while True:
            log_entry = self.log_queue.get()
            try:
                if log_entry is self._STOP:
                    break
                timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
                print(f"[{timestamp}] {log_entry}")
                time.sleep(0.1)  # Simulate I/O operation
            finally:
                # Balance every get(), including the stop marker and entries
                # whose printing raised, so log_queue.join() cannot hang.
                self.log_queue.task_done()

    def log(self, message):
        """Queue *message* (any object, including None) for printing; never blocks."""
        self.log_queue.put(message)

    def stop(self):
        """Let the worker finish the messages already queued, then wait for it to exit."""
        self.log_queue.put(self._STOP)
        if self.daemon_thread.is_alive():
            self.daemon_thread.join()

# Usage
logger = BackgroundLogger()

# Simulated application work: queue one message every half second
for n in range(5):
    logger.log(f"Processing item {n}")
    time.sleep(0.5)

# The daemon thread would not block interpreter exit, but anything still
# queued at that moment would never be printed. stop() lets the worker
# finish the queued messages and then waits for it to exit.
logger.stop()

Best Practices

Thread Safety Principles

  • Always use proper synchronization mechanisms when accessing shared resources
  • Minimize the critical section (the code protected by locks)
  • Prefer thread-safe data structures or methods when available
  • Be aware of the Global Interpreter Lock (GIL) limitations
  • Use thread-local storage for thread-specific data
  • Avoid creating too many threads (they have overhead)
  • Prefer thread pools for managing multiple concurrent tasks
# Thread-local storage example
import threading

# Create thread-local data
thread_local = threading.local()  # each thread sees its own attributes on this object


def process_request(request_id):
    """Handle *request_id*, recording it in thread-local storage for helpers."""
    thread_local.request_id = request_id  # visible only to the current thread

    worker_name = threading.current_thread().name
    print(f"Processing request {thread_local.request_id} in thread {worker_name}")

    # log_message() looks the request ID up itself; no need to pass it along
    log_message("Request completed")


def log_message(message):
    """Print *message* tagged with the current thread's request ID, if it has one."""
    try:
        request_id = thread_local.request_id
    except AttributeError:
        print(f"[Unknown request] {message}")
    else:
        print(f"[Request {request_id}] {message}")

# Three threads, each handling its own request ID
threads = [
    threading.Thread(target=process_request, args=(f"REQ-{n}",))
    for n in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Avoiding Common Pitfalls

  • Race Conditions: Use locks or other synchronization mechanisms
  • Deadlocks: Acquire locks in a consistent order
  • Thread Starvation: Don't hold locks for too long
  • Resource Exhaustion: Limit the number of threads
  • Thread Safety: Use thread-safe containers and operations
import threading
import time
from queue import Queue

# Thread-safe counter with Queue
def safe_counter():
    """Build an ``(increment, get_value)`` pair sharing one thread-safe counter.

    The counter is the single item stored in a Queue. A thread takes it out
    with get(); any other thread calling get() blocks until it is put back,
    so the queue acts as a mutex around the read-modify-write.
    """
    box = Queue()
    box.put(0)  # starting value

    def increment():
        current = box.get()   # take exclusive ownership of the value
        time.sleep(0.001)     # Simulate work while holding it
        box.put(current + 1)  # hand it back with the new value

    def get_value():
        current = box.get()
        box.put(current)      # Put it back unchanged
        return current

    return increment, get_value

increment, get_value = safe_counter()

# 100 threads each increment the shared counter exactly once
threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final counter value: {get_value()}")  # Always 100

Practice Exercises

Try These:

  1. Create a multithreaded web scraper that downloads multiple pages concurrently.
  2. Implement a thread-safe cache for expensive function calls.
  3. Build a producer-consumer system with multiple producer and consumer threads.
  4. Create a thread pool that processes a large set of data in parallel.
  5. Implement a daemon thread that monitors changes to a directory.
Back to Cheat Sheet