Multithreading in Python
Multithreading in Python allows you to execute multiple threads (smaller units of a process) concurrently, making your programs more efficient, especially when dealing with I/O-bound operations. Python provides built-in modules like threading and concurrent.futures to implement multithreading.
Threading Fundamentals
Understanding Threads
A thread is the smallest unit of execution within a process. Multiple threads in the same process share memory space, which allows them to access the same data, but can also lead to issues like race conditions and deadlocks if not managed properly.
# Basic thread creation
import threading
import time

def task(name, delay):
    print(f"Task {name} started")
    time.sleep(delay)  # Simulate work
    print(f"Task {name} completed")

# Create threads
thread1 = threading.Thread(target=task, args=("A", 2))
thread2 = threading.Thread(target=task, args=("B", 1))

# Start threads
thread1.start()
thread2.start()

# Wait for threads to complete
thread1.join()
thread2.join()

print("All tasks completed")
The Global Interpreter Lock (GIL)
Python's Global Interpreter Lock (GIL) prevents multiple native threads from executing Python bytecode simultaneously. Because a thread releases the GIL while it waits on blocking I/O, threading in Python is best suited for I/O-bound tasks rather than CPU-bound tasks.
# I/O-bound task (good for threading)
import threading
import time

import requests  # third-party package: pip install requests

def download_site(url):
    response = requests.get(url)
    print(f"Downloaded {url}: {len(response.text)} bytes")

urls = ["https://example.com", "https://python.org", "https://github.com"] * 3

# Sequential download
start_time = time.time()
for url in urls:
    download_site(url)
print(f"Sequential time: {time.time() - start_time:.2f} seconds")

# Threaded download
start_time = time.time()
threads = []
for url in urls:
    thread = threading.Thread(target=download_site, args=(url,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()
print(f"Threaded time: {time.time() - start_time:.2f} seconds")
When to Use Threading
| Good Use Cases | Not Ideal For |
| --- | --- |
| Network I/O operations | CPU-intensive calculations |
| File I/O operations | Numerical processing |
| User interface responsiveness | Parallel data processing |
| Waiting for external resources | Heavy computational tasks |
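To see why the right-hand column resists threading, here is a minimal sketch (the busy_work function, loop size, and thread count are illustrative assumptions) that times the same pure-Python computation run sequentially and across four threads. Because the GIL serializes bytecode execution, the threaded run typically takes about as long as the sequential one, or slightly longer:

# CPU-bound task (threading typically does NOT help here)
import threading
import time

def busy_work(n):
    # Pure Python arithmetic holds the GIL for the whole loop
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 2_000_000  # Illustrative size; adjust for your machine

# Run four jobs sequentially
start = time.time()
for _ in range(4):
    busy_work(N)
print(f"Sequential: {time.time() - start:.2f} seconds")

# Run the same four jobs in threads: the GIL serializes them,
# so expect little or no speedup (often a slight slowdown)
start = time.time()
threads = [threading.Thread(target=busy_work, args=(N,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Threaded: {time.time() - start:.2f} seconds")

For genuinely parallel CPU work, the multiprocessing module (or ProcessPoolExecutor from concurrent.futures) runs tasks in separate processes, each with its own GIL.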
Thread Synchronization
Locks (Mutexes)
When multiple threads access shared resources, locks help prevent race conditions by allowing only one thread to access the resource at a time.
import threading

# Shared resource without lock (problematic)
counter = 0

def increment_without_lock():
    global counter
    for _ in range(100000):
        current = counter
        counter = current + 1  # Race condition can occur here

# Shared resource with lock (safe)
counter_with_lock = 0
lock = threading.Lock()

def increment_with_lock():
    global counter_with_lock
    for _ in range(100000):
        with lock:  # Acquire and release the lock automatically
            current = counter_with_lock
            counter_with_lock = current + 1

# Test without lock
threads = [threading.Thread(target=increment_without_lock) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(f"Without lock: {counter}")  # Often less than 500,000

# Test with lock
threads = [threading.Thread(target=increment_with_lock) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(f"With lock: {counter_with_lock}")  # Always 500,000
RLock (Reentrant Lock)
A reentrant lock can be acquired multiple times by the same thread, which is useful for recursive functions or nested lock usage.
import threading

# A regular Lock would cause a deadlock in this scenario
class RecursiveTask:
    def __init__(self):
        self.lock = threading.RLock()  # Use RLock instead of Lock
        self.data = {}

    def update_recursive(self, key, value, depth=3):
        with self.lock:  # First acquisition of the lock
            self.data[key] = value
            print(f"Updated {key} to {value}")
            if depth > 0:
                # Second acquisition of the lock by the same thread
                # A regular Lock would deadlock here
                with self.lock:
                    self.update_recursive(key + "_sub", value + 1, depth - 1)

task = RecursiveTask()
task.update_recursive("key", 1)  # Works with RLock, would deadlock with Lock
Semaphore
Semaphores limit the number of threads that can access a resource simultaneously, which is useful for controlling access to limited resources.
import threading
import time
import random

# Semaphore for limiting concurrent access
pool_semaphore = threading.Semaphore(3)  # Allow only 3 concurrent threads

def worker(name):
    with pool_semaphore:  # Acquire the semaphore (blocks if already at limit)
        print(f"Worker {name} is accessing the pool")
        time.sleep(random.uniform(1, 3))  # Simulate work
        print(f"Worker {name} is releasing the pool")

# Create and start 10 worker threads
workers = []
for i in range(10):
    thread = threading.Thread(target=worker, args=(f"Worker-{i}",))
    workers.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in workers:
    thread.join()
Events
Events provide a simple way to communicate between threads, allowing one thread to signal to others that something has occurred.
import threading
import time

# Event for thread communication
data_ready = threading.Event()

def producer():
    print("Producer: Preparing data...")
    time.sleep(2)  # Simulate data preparation
    print("Producer: Data is ready")
    data_ready.set()  # Signal that data is ready

def consumer():
    print("Consumer: Waiting for data...")
    data_ready.wait()  # Wait until data is ready
    print("Consumer: Consuming data")

# Create and start threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()
Condition Variables
Condition variables provide a more flexible synchronization mechanism: a thread can wait until a specific condition on shared state holds, and be notified by another thread when that state may have changed.
import threading
import time
import random

# Producer-consumer pattern with condition variables
class Buffer:
    def __init__(self, size):
        self.buffer = []
        self.size = size
        self.condition = threading.Condition()

    def add_item(self, item):
        with self.condition:
            while len(self.buffer) >= self.size:
                print("Buffer full, producer waiting...")
                self.condition.wait()  # Wait until there's space
            self.buffer.append(item)
            print(f"Produced: {item}, Buffer: {self.buffer}")
            self.condition.notify()  # Notify a waiting consumer

    def get_item(self):
        with self.condition:
            while not self.buffer:
                print("Buffer empty, consumer waiting...")
                self.condition.wait()  # Wait until there's an item
            item = self.buffer.pop(0)
            print(f"Consumed: {item}, Buffer: {self.buffer}")
            self.condition.notify()  # Notify a waiting producer
            return item

# Create a shared buffer
buffer = Buffer(size=3)

# Producer function
def producer():
    for i in range(10):
        time.sleep(random.uniform(0.1, 0.5))  # Simulate production time
        buffer.add_item(i)

# Consumer function
def consumer():
    for _ in range(10):
        time.sleep(random.uniform(0.2, 0.6))  # Simulate consumption time
        buffer.get_item()

# Create and start threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()
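For the bounded producer-consumer case specifically, the standard library's queue.Queue already implements this locking and waiting internally. A minimal sketch of the same buffer built on a bounded Queue (the item counts mirror the example above):

import queue
import threading

# queue.Queue(maxsize=...) blocks put() when full and get() when empty,
# replacing the manual Condition logic above
buffer = queue.Queue(maxsize=3)

def producer():
    for i in range(10):
        buffer.put(i)  # Blocks while the buffer is full
        print(f"Produced: {i}")

def consumer():
    for _ in range(10):
        item = buffer.get()  # Blocks while the buffer is empty
        print(f"Consumed: {item}")

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()

put() blocks while the queue is at maxsize and get() blocks while it is empty, which is exactly the behavior the Condition-based Buffer implements by hand.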
Thread Pools
ThreadPoolExecutor
Thread pools manage a set of worker threads for executing tasks, which is more efficient than creating and destroying threads for each task.
Python's concurrent.futures module provides a high-level interface for thread pools.
from concurrent.futures import ThreadPoolExecutor
import time

# Function to be executed in parallel
def process_item(item):
    print(f"Processing {item}")
    time.sleep(1)  # Simulate processing time
    return item * item

# Using ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
    # Method 1: Submit individual tasks
    future1 = executor.submit(process_item, 1)
    future2 = executor.submit(process_item, 2)
    future3 = executor.submit(process_item, 3)

    print(f"Result 1: {future1.result()}")
    print(f"Result 2: {future2.result()}")
    print(f"Result 3: {future3.result()}")

    # Method 2: Process a collection of items
    items = [4, 5, 6, 7, 8]
    results = list(executor.map(process_item, items))
    print(f"Batch results: {results}")

    # Method 3: Process tasks asynchronously
    futures = [executor.submit(process_item, i) for i in range(9, 12)]
    for future in futures:
        result = future.result()
        print(f"Async result: {result}")
Waiting for Results
When using thread pools, you can wait for results in different ways.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

# Function with varying completion times
def task_with_random_delay(name):
    delay = random.uniform(0.5, 3)
    time.sleep(delay)
    return f"Task {name} completed in {delay:.2f} seconds"

# Submit multiple tasks
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit tasks and store futures
    futures = {executor.submit(task_with_random_delay, f"Task-{i}"): i for i in range(10)}

    # Process results as they complete (not in submission order)
    print("Results as they complete:")
    for future in as_completed(futures):
        task_id = futures[future]
        try:
            result = future.result()
            print(f"Task ID {task_id}: {result}")
        except Exception as e:
            print(f"Task ID {task_id} generated an exception: {e}")

    # Process all results when all tasks are done
    print("\nAll results after completion:")
    for task_id, future in sorted([(task_id, future) for future, task_id in futures.items()]):
        print(f"Task ID {task_id}: {future.result()}")
Daemon Threads
Understanding Daemon Threads
Daemon threads run in the background and do not prevent the program from exiting when all non-daemon threads have finished. They are useful for tasks that don't need to run to completion, such as monitoring or cleanup.
import threading
import time

# Background task (daemon thread)
def background_task():
    count = 0
    while True:
        count += 1
        print(f"Background task running... Count: {count}")
        time.sleep(1)

# Main task (non-daemon thread)
def main_task():
    for i in range(3):
        print(f"Main task step {i}")
        time.sleep(0.5)
    print("Main task completed")

# Create and start threads
background_thread = threading.Thread(target=background_task, daemon=True)
main_thread = threading.Thread(target=main_task)

background_thread.start()
main_thread.start()

# Wait only for the main thread to complete
main_thread.join()
print("Program exit - daemon thread will be terminated")

# If we wanted to wait for the background thread too (which would never end):
# background_thread.join()  # This would prevent the program from exiting
Use Cases for Daemon Threads
- Monitoring services
- Periodic cleanup operations
- Heartbeat signals
- Background logging
- Watchdog processes
import threading
import time
import queue

# Simple background logger using a daemon thread
class BackgroundLogger:
    def __init__(self):
        self.log_queue = queue.Queue()
        self.daemon_thread = threading.Thread(target=self._process_logs, daemon=True)
        self.daemon_thread.start()

    def _process_logs(self):
        while True:
            log_entry = self.log_queue.get()
            if log_entry is None:  # Sentinel value to stop
                break
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] {log_entry}")
            time.sleep(0.1)  # Simulate I/O operation
            self.log_queue.task_done()

    def log(self, message):
        self.log_queue.put(message)

    def stop(self):
        self.log_queue.put(None)
        if self.daemon_thread.is_alive():
            self.daemon_thread.join()

# Usage
logger = BackgroundLogger()

# Main application logic
for i in range(5):
    logger.log(f"Processing item {i}")
    time.sleep(0.5)

# Since it's a daemon thread, we can just exit without calling stop()
# But for a clean shutdown, we can do:
logger.stop()
Best Practices
Thread Safety Principles
- Always use proper synchronization mechanisms when accessing shared resources
- Minimize the critical section (the code protected by locks); see the sketch after the example below
- Prefer thread-safe data structures or methods when available
- Be aware of the Global Interpreter Lock (GIL) limitations
- Use thread-local storage for thread-specific data
- Avoid creating too many threads (they have overhead)
- Prefer thread pools for managing multiple concurrent tasks
# Thread-local storage example
import threading

# Create thread-local data
thread_local = threading.local()

def process_request(request_id):
    # Store thread-specific data
    thread_local.request_id = request_id

    # Process the request
    print(f"Processing request {thread_local.request_id} in thread {threading.current_thread().name}")

    # Log something
    log_message("Request completed")

def log_message(message):
    # Access thread-specific data without passing it explicitly
    if hasattr(thread_local, 'request_id'):
        print(f"[Request {thread_local.request_id}] {message}")
    else:
        print(f"[Unknown request] {message}")

# Create and start threads
threads = []
for i in range(3):
    thread = threading.Thread(target=process_request, args=(f"REQ-{i}",))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()
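The "minimize the critical section" principle from the list above deserves a concrete illustration. In this sketch (compute_update is a hypothetical stand-in for expensive work), the slow computation runs outside the lock and only the shared-state mutation is protected:

import threading

lock = threading.Lock()
shared_results = []

def compute_update(item):
    # Hypothetical expensive work that needs no shared state
    return item * item

def worker(items):
    for item in items:
        result = compute_update(item)  # Do the slow part OUTSIDE the lock
        with lock:                     # Hold the lock only for the mutation
            shared_results.append(result)

threads = [threading.Thread(target=worker, args=([i, i + 1],)) for i in range(0, 6, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(shared_results))  # [0, 1, 4, 9, 16, 25]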
Avoiding Common Pitfalls
- Race Conditions: Use locks or other synchronization mechanisms
- Deadlocks: Acquire locks in a consistent order (see the lock-ordering sketch below)
- Thread Starvation: Don't hold locks for too long
- Resource Exhaustion: Limit the number of threads
- Thread Safety: Use thread-safe containers and operations
import threading
import time
from queue import Queue

# Thread-safe counter with Queue
def safe_counter():
    counter_queue = Queue()
    counter_queue.put(0)  # Initialize counter

    def increment():
        value = counter_queue.get()
        time.sleep(0.001)  # Simulate work
        counter_queue.put(value + 1)

    def get_value():
        value = counter_queue.get()
        counter_queue.put(value)  # Put it back
        return value

    return increment, get_value

increment, get_value = safe_counter()

# Use the counter in multiple threads
threads = []
for _ in range(100):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"Final counter value: {get_value()}")  # Always 100
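To make the deadlock advice concrete, here is a sketch of consistent lock ordering (the Account class and transfer amounts are illustrative). Because every transfer acquires the two account locks in the same global order, two opposing transfers can never each hold one lock while waiting for the other:

import threading

class Account:
    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src, dst, amount):
    # Always lock the two accounts in a fixed global order (here: by name),
    # so opposing transfers cannot deadlock each other
    first, second = sorted([src, dst], key=lambda account: account.name)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount

a = Account("A", 100)
b = Account("B", 100)

def run_transfers(src, dst):
    for _ in range(1000):
        transfer(src, dst, 1)

# Opposing transfers that would risk deadlock with naive src-then-dst locking
t1 = threading.Thread(target=run_transfers, args=(a, b))
t2 = threading.Thread(target=run_transfers, args=(b, a))
t1.start()
t2.start()
t1.join()
t2.join()
print(a.balance, b.balance)  # 100 100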
Practice Exercises
Try These:
- Create a multithreaded web scraper that downloads multiple pages concurrently.
- Implement a thread-safe cache for expensive function calls.
- Build a producer-consumer system with multiple producer and consumer threads.
- Create a thread pool that processes a large set of data in parallel.
- Implement a daemon thread that monitors changes to a directory.