Async Python: The Stuff Nobody Tells You
Async Python looks simple in tutorials. 'Just add async and await!' they say. Then you try to use it for real and discover all the gotchas that nobody mentioned. After using async Python in production for years, here's what I wish someone had told me from the start.
First, let's be clear about what async actually does
Async Python is about concurrency, not parallelism. The difference matters.
Parallelism means doing multiple things at the same time. You need multiple CPUs or cores. Python's GIL (Global Interpreter Lock) prevents true parallelism for CPU-bound code.
Concurrency means managing multiple things at once, but not necessarily doing them simultaneously. While one task waits for I/O (network, disk, etc.), another task can run.
Async Python excels at concurrency. It's perfect when your code spends most of its time waiting for external things - API responses, database queries, file reads. It's terrible when your code is actually doing computation.
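To make that concrete, here's a toy sketch (the function names are made up): three simulated one-second I/O waits run concurrently, so the whole thing finishes in roughly one second instead of three.
import asyncio
import time
async def fake_io(name, delay):
    # Stands in for a network call or database query that mostly waits
    await asyncio.sleep(delay)
    return name
async def main():
    start = time.perf_counter()
    # The three waits overlap, so total time is ~1 second, not ~3
    results = await asyncio.gather(fake_io('a', 1), fake_io('b', 1), fake_io('c', 1))
    print(results, f'{time.perf_counter() - start:.1f}s')
asyncio.run(main())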
Gotcha #1: One slow thing blocks everything
This is the most common mistake. Async doesn't make slow code fast. If you do something CPU-intensive in an async function, you block the whole event loop. The 'async' keyword doesn't magically parallelize your code.
async def process_image(image_data):
# BAD: This blocks the entire event loop!
result = expensive_image_processing(image_data)
return result
While that image is being processed, nothing else can run. No other requests get handled. Your whole application freezes.
The fix is to run CPU-bound work in a thread pool:
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
async def process_image(image_data):
    loop = asyncio.get_running_loop()
# Runs in a separate thread, doesn't block the event loop
result = await loop.run_in_executor(
executor, expensive_image_processing, image_data
)
return result
Or for truly CPU-intensive work, use a process pool to bypass the GIL entirely.
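Here's a minimal sketch of that variant, reusing expensive_image_processing from above. The main caveats: the function has to be importable at module level, and its arguments and return value must be picklable.
import asyncio
from concurrent.futures import ProcessPoolExecutor
process_pool = ProcessPoolExecutor(max_workers=4)
async def process_image(image_data):
    loop = asyncio.get_running_loop()
    # Runs in a separate process, so CPU-heavy work doesn't fight the GIL
    return await loop.run_in_executor(
        process_pool, expensive_image_processing, image_data
    )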
Gotcha #2: asyncio.gather() discards successful results when one task fails
When you're running multiple async tasks concurrently, gather() is the go-to:
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id)
)
But here's the thing: if any of those tasks raises an exception, gather() raises immediately and you lose the results from the tasks that succeeded.
Usually what you actually want is:
results = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
return_exceptions=True # <- This changes everything
)
# Now results might contain exceptions mixed with successful results
user, orders, notifications = results
if isinstance(orders, Exception):
# Handle the error, but we still got user and notifications
orders = []
With return_exceptions=True, failed tasks return their exception as a result instead of crashing everything. You can handle failures individually while still using the successful results.
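If you do this in many places, a tiny helper keeps the isinstance checks in one spot. This is my own convenience wrapper (the name gather_settled is made up, nothing standard), and note it drops the positional mapping, so it only makes sense when the results are homogeneous - like the URL-fetching example coming up.
import asyncio
async def gather_settled(*coros):
    # Like gather(return_exceptions=True), but pre-sorted into wins and losses
    results = await asyncio.gather(*coros, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures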
Gotcha #3: You need to limit concurrency yourself
Async makes it easy to fire off thousands of concurrent operations. Too easy.
# This looks innocent but will probably crash something
urls = [f'https://api.example.com/item/{i}' for i in range(10000)]
results = await asyncio.gather(*[fetch(url) for url in urls])
You just tried to open 10,000 simultaneous connections. The target server will rate-limit you, your OS will run out of file descriptors, or your own process will simply fall over.
The fix is a semaphore:
sem = asyncio.Semaphore(50) # max 50 concurrent requests
async def limited_fetch(url):
async with sem:
return await fetch(url)
# Now only 50 requests run at a time
results = await asyncio.gather(*[limited_fetch(url) for url in urls])
The semaphore acts like a bouncer at a club. Only 50 requests can be 'inside' at a time. The rest wait their turn.
I usually wrap this pattern in a helper function because I use it so often:
async def gather_with_limit(coros, limit=50):
sem = asyncio.Semaphore(limit)
async def limited(coro):
async with sem:
return await coro
return await asyncio.gather(*[limited(c) for c in coros])
Gotcha #4: Debugging is genuinely harder
Stack traces in async code are confusing. When an exception happens, the traceback shows you the async machinery instead of where your actual logic lives. You'll see frames like _run_once and run_until_complete that tell you nothing useful.
The traceback might not show the line that actually caused the problem. It might show where you awaited something, which could be far from where the bug actually is.
Print statements become your friend again:
async def mysterious_function(data):
print(f'DEBUG: entering with {data=}')
result = await some_operation(data)
print(f'DEBUG: got {result=}')
processed = transform(result)
print(f'DEBUG: transformed to {processed=}')
return processed
Yeah, it's not sophisticated. But it works.
For more complex debugging, look into enabling asyncio's debug mode:
import asyncio
# Pass debug=True when starting the event loop (or set PYTHONASYNCIODEBUG=1)
asyncio.run(main(), debug=True)
This gives you warnings about coroutines that were never awaited and slow callbacks that might be blocking the event loop.
Gotcha #5: Mixing sync and async code is tricky
You can't call an async function from sync code without some ceremony:
# This doesn't work - you just get a coroutine object
result = async_function() # Returns <coroutine object>
# You need to actually run it
result = asyncio.run(async_function()) # Works, but creates a new event loop
And you can't easily call sync code from async without blocking:
async def handler(request):
# This blocks the event loop!
data = sync_database_call() # Oops
return Response(data)
When you're in async land, everything you await needs to be async-native. Libraries like httpx (instead of requests), asyncpg (instead of psycopg2), aiofiles (instead of open). There's an async version of most popular libraries, but you have to actually use them.
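When you're stuck with a sync-only call, the escape hatch is the same trick as in Gotcha #1: push it onto a thread. Here's a sketch using the handler above (sync_database_call and Response are the same placeholders from that example). asyncio.to_thread exists on Python 3.9+; on older versions, loop.run_in_executor does the same job.
import asyncio
async def handler(request):
    # Runs the blocking call on a worker thread so the event loop stays free
    data = await asyncio.to_thread(sync_database_call)
    return Response(data)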
Gotcha #6: Context doesn't propagate automatically
If you're using context variables (like for request IDs in logging), they don't automatically propagate to spawned tasks:
from contextvars import ContextVar
request_id_var = ContextVar('request_id')
async def handle_request(request_id):
request_id_var.set(request_id)
# This task sees request_id_var fine
# But tasks created here might not, depending on how you create them
asyncio.create_task(background_job()) # Inherits context (good)
The good news is that asyncio.create_task() copies the context by default. But if you're using thread pools or other mechanisms, you might need to handle context explicitly.
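Here's one way to carry the context across a thread pool boundary - just a sketch, and the helper name is mine, not a stdlib function:
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
async def run_in_thread_with_context(func, *args):
    loop = asyncio.get_running_loop()
    # copy_context() snapshots the current context vars; ctx.run() makes them
    # visible inside the worker thread too
    ctx = contextvars.copy_context()
    return await loop.run_in_executor(executor, lambda: ctx.run(func, *args))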
Gotcha #7: Timeouts are not built-in where you think they are
I see this all the time in production code - someone writes an async function that calls an API and assumes a sensible default timeout will save them. Often there isn't one, or it's far looser than you'd want: requests and raw sockets have no timeout at all, aiohttp's default total timeout is five minutes, and even httpx's five-second default may not match what your application needs.
async def fetch_user_data(user_id):
    # No explicit timeout - you're relying on whatever default the client library ships with
    async with httpx.AsyncClient() as client:
        response = await client.get(f'https://api.example.com/users/{user_id}')
        return response.json()
If that default is missing or too generous, requests to a stuck upstream service pile up, and your application slowly accumulates hung tasks until it grinds to a halt.
Always use timeouts:
import asyncio
async def fetch_user_data(user_id):
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await asyncio.wait_for(
client.get(f'https://api.example.com/users/{user_id}'),
timeout=5.0
)
return response.json()
except asyncio.TimeoutError:
# Handle timeout gracefully
raise ServiceUnavailableError(f'User service timed out for {user_id}')
Notice I have two timeouts here - one at the HTTP client level (10 seconds) and one wrapping the specific request (5 seconds). The client-level timeout is a safety net; the request-level timeout is what I actually expect to trigger if things go wrong.
Gotcha #8: Task cancellation is more complex than it looks
Cancelling async tasks seems straightforward until you actually need to do cleanup properly.
async def long_running_task():
try:
await asyncio.sleep(100)
print('Task completed')
except asyncio.CancelledError:
print('Task was cancelled')
# WRONG: If you don't re-raise, cancellation is suppressed!
If you catch CancelledError and don't re-raise it, the task won't actually be cancelled. It'll keep running. This is especially dangerous with background tasks.
The correct pattern:
async def long_running_task():
try:
await asyncio.sleep(100)
print('Task completed')
except asyncio.CancelledError:
# Do cleanup
await cleanup_resources()
# MUST re-raise
raise
Even better, use context managers for cleanup when possible:
async def long_running_task():
async with managed_resource() as resource:
await asyncio.sleep(100)
# Context manager handles cleanup even on cancellation
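For completeness, managed_resource() isn't defined anywhere above; one way to write something like it is with contextlib.asynccontextmanager. The acquire and close calls here are placeholders for your own setup and teardown.
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()  # placeholder: your setup code
    try:
        yield resource
    finally:
        # Runs on normal exit, on exceptions, and on cancellation alike
        await resource.close()  # placeholder: your teardown code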
Real-world pattern: The retry decorator
Here's a pattern I use constantly in production. Most API calls should retry with exponential backoff, but writing that logic everywhere is tedious and error-prone.
import asyncio
import random
from functools import wraps
def async_retry(max_attempts=3, base_delay=1.0, max_delay=60.0):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt == max_attempts - 1:
# Last attempt, give up
raise
# Exponential backoff with jitter
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
await asyncio.sleep(delay + jitter)
raise last_exception
return wrapper
return decorator
# Usage
@async_retry(max_attempts=3, base_delay=1.0)
async def fetch_flaky_api(url):
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
The jitter is important - without it, if multiple tasks are retrying simultaneously, they'll all retry at the exact same time and create thundering herd problems.
Real-world pattern: Batching requests
Sometimes you have many small items to process, and making individual API calls for each is inefficient. Batching helps:
class AsyncBatcher:
def __init__(self, batch_size=50, flush_interval=1.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.queue = []
self.lock = asyncio.Lock()
self.flush_task = None
async def start(self):
self.flush_task = asyncio.create_task(self._periodic_flush())
async def _periodic_flush(self):
while True:
await asyncio.sleep(self.flush_interval)
await self.flush()
    async def add(self, item):
        flush_now = False
        async with self.lock:
            self.queue.append(item)
            flush_now = len(self.queue) >= self.batch_size
        # Flush outside the lock - asyncio.Lock is not reentrant, so calling
        # flush() while still holding it would deadlock
        if flush_now:
            await self.flush()
    async def flush(self):
        async with self.lock:
            if not self.queue:
                return
            batch = self.queue[:]
            self.queue.clear()
        # Process batch (outside the lock)
        await self._process_batch(batch)
async def _process_batch(self, items):
# Send all items in one API call
await api.bulk_create(items)
# Usage
batcher = AsyncBatcher(batch_size=100, flush_interval=2.0)
await batcher.start()
# Later, items accumulate and get flushed automatically
for item in items:
await batcher.add(item)
This pattern ensures you're not making tons of tiny API calls, but you also don't wait forever to process items.
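One thing the sketch leaves out is shutdown. A stop() method you could add to the class (my addition, not part of the original pattern) cancels the periodic flush and drains whatever is still queued:
    async def stop(self):
        # Stop the periodic flush task first...
        if self.flush_task is not None:
            self.flush_task.cancel()
            try:
                await self.flush_task
            except asyncio.CancelledError:
                pass
        # ...then drain anything still sitting in the queue
        await self.flush()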
Real-world pattern: Circuit breaker
When a service is down, you don't want to keep hammering it with requests. A circuit breaker helps:
from enum import Enum
import time
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Service is down, fail fast
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60.0):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time >= self.timeout:
# Try again
self.state = CircuitState.HALF_OPEN
else:
raise Exception('Circuit breaker is OPEN')
try:
result = await func(*args, **kwargs)
# Success - reset
self.failure_count = 0
self.state = CircuitState.CLOSED
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60.0)
async def fetch_with_breaker(url):
    async with httpx.AsyncClient() as client:
        return await breaker.call(client.get, url)
When the service is down, the circuit breaker "opens" and fails fast instead of waiting for timeouts. After a cooldown period, it goes into "half-open" state to test if the service recovered.
Testing async code
Testing async code requires some special considerations. You can't just call an async function in a test - you need to run it in an event loop.
pytest makes this easier with pytest-asyncio:
import pytest
@pytest.mark.asyncio
async def test_fetch_user():
result = await fetch_user(123)
assert result['id'] == 123
# For testing timeout behavior
@pytest.mark.asyncio
async def test_timeout_handling():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_function(), timeout=0.1)
# For testing concurrent behavior
@pytest.mark.asyncio
async def test_concurrent_requests():
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3)
)
assert len(results) == 3
Mocking async functions requires async mocks:
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_with_mock():
    mock_api = AsyncMock()
    mock_api.get_user.return_value = {'id': 123, 'name': 'Test'}
    result = await fetch_user_data(123, api=mock_api)
    assert result['name'] == 'Test'
    mock_api.get_user.assert_called_once_with(123)
Performance monitoring
In production, you need to monitor your async code's performance. Here's a pattern I use:
import time
from functools import wraps
def async_timing(name):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = await func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
# Log to your metrics system
metrics.histogram(f'async.{name}.duration', duration)
return wrapper
return decorator
@async_timing('fetch_user')
async def fetch_user(user_id):
# ... implementation
pass
This helps you identify which async operations are slow and might be blocking your event loop.
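Alongside per-operation timings, I also like to watch the event loop itself. Here's a rough lag monitor - just a sketch, reusing the same metrics placeholder as above; start it once at startup with asyncio.create_task(monitor_event_loop_lag()).
import asyncio
import time
async def monitor_event_loop_lag(interval=1.0):
    # If something blocks the loop, asyncio.sleep(interval) wakes up late;
    # the overshoot is a rough measure of event loop lag
    while True:
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lag = time.perf_counter() - start - interval
        metrics.histogram('async.event_loop.lag', lag)  # same metrics stub as above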
Common async pitfalls in specific scenarios
Let me walk through some real scenarios I've debugged in production systems.
The database connection pool gotcha
You switched to an async database driver like asyncpg. Great! But did you configure the connection pool correctly?
# BAD: Creating a new pool for every request
async def get_user(user_id):
pool = await asyncpg.create_pool(DATABASE_URL) # Don't do this!
async with pool.acquire() as conn:
return await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
# GOOD: Create pool once at startup
pool = None
async def startup():
global pool
pool = await asyncpg.create_pool(
DATABASE_URL,
min_size=10,
max_size=20,
command_timeout=5.0
)
async def get_user(user_id):
async with pool.acquire() as conn:
return await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
Connection pools are expensive to create. Create one at application startup and reuse it. Also, set timeouts on your queries - database operations can hang just like HTTP requests.
The file I/O mistake
Reading files with regular open() in async code blocks the event loop. Use aiofiles:
# BAD: Blocks the event loop
async def read_config():
with open('config.json') as f:
return json.load(f)
# GOOD: Actually async
import aiofiles
async def read_config():
async with aiofiles.open('config.json', mode='r') as f:
contents = await f.read()
return json.loads(contents)
This matters more than you'd think. On a busy server reading multiple config files or logs, synchronous file I/O can add noticeable latency.
The background task memory leak
Fire-and-forget tasks are easy to lose track of: the event loop only keeps a weak reference to tasks, so a task you never store anywhere can even be garbage-collected before it finishes - and tasks that pile up without completing are a memory leak. Always keep a reference:
# BAD: Tasks just float around forever
async def handle_request(request):
# Fire and forget - but what if it never finishes?
asyncio.create_task(send_analytics_event(request))
return Response('OK')
# GOOD: Track and periodically clean up
background_tasks = set()
async def handle_request(request):
task = asyncio.create_task(send_analytics_event(request))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
return Response('OK')
The add_done_callback ensures completed tasks are removed from the set automatically. Without this, the set grows unbounded and you have a memory leak.
The sleep vs. wait_for confusion
asyncio.sleep() is not the same as timeout. I see this mistake all the time:
# WRONG: This doesn't time out the request!
async def fetch_with_timeout(url):
    await asyncio.sleep(5)  # This just waits 5 seconds, then continues
    async with httpx.AsyncClient() as client:
        return await client.get(url)
# RIGHT: This actually enforces a timeout
async def fetch_with_timeout(url):
    async with httpx.AsyncClient() as client:
        return await asyncio.wait_for(client.get(url), timeout=5)
sleep() is for delaying execution. wait_for() is for enforcing timeouts on operations.
The exception in asyncio.gather() detail
Remember how I said gather() fails if any task raises an exception? Here's the subtle part - it doesn't cancel the other tasks automatically:
async def slow_task():
await asyncio.sleep(10)
return 'slow'
async def failing_task():
await asyncio.sleep(1)
raise ValueError('oops')
# The slow_task keeps running even though gather() raises after 1 second
try:
results = await asyncio.gather(slow_task(), failing_task())
except ValueError:
# failing_task raised, but slow_task is STILL RUNNING in the background!
pass
If you want to cancel all tasks when one fails, you need to do it explicitly:
tasks = [asyncio.create_task(slow_task()), asyncio.create_task(failing_task())]
try:
results = await asyncio.gather(*tasks)
except Exception:
# Cancel all tasks
for task in tasks:
task.cancel()
raise
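If you're on Python 3.11 or newer, asyncio.TaskGroup bakes this cancel-the-rest behavior in. A sketch using the two tasks from above; failures come back wrapped in an ExceptionGroup, which you catch with except*.
async def run_both():
    async with asyncio.TaskGroup() as tg:
        slow = tg.create_task(slow_task())
        failing = tg.create_task(failing_task())
    # Only reached if every task succeeded; otherwise the remaining tasks
    # are cancelled and an ExceptionGroup is raised
    return slow.result(), failing.result()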
Library-specific gotchas
Different async libraries have their own quirks. Here are some I've encountered:
aiohttp vs httpx
Both are popular async HTTP clients, but they have different APIs and behaviors:
# aiohttp requires a session
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
data = await response.json()
# httpx's top-level httpx.get() is sync-only, so the async API also goes through a client
async with httpx.AsyncClient() as client:
    response = await client.get('https://api.example.com/data')
    data = response.json()  # Note: NOT awaited in httpx!
The response.json() difference has bitten me multiple times when switching between libraries.
Redis and connection handling
With aioredis (now merged into redis-py 4.2+), forgetting to await can cause silent failures:
# BAD: Forgot to await, so you get a coroutine object instead of the value
from redis import asyncio as aioredis
redis = aioredis.from_url('redis://localhost')
value = redis.get('key')  # Returns a coroutine, not the value!
# GOOD: Actually await the operation
value = await redis.get('key')
Python only emits a 'coroutine was never awaited' RuntimeWarning when the forgotten coroutine is garbage collected, and that warning is easy to miss in production logs - so the operation can silently never happen.
The "just use asyncio.run()" trap
For simple scripts, asyncio.run() seems perfect. But in more complex scenarios, it has limitations:
# This looks fine
async def main():
result = await fetch_data()
print(result)
asyncio.run(main())
But asyncio.run() creates a new event loop, runs your coroutine, and closes the loop. If you need to run async code in an interactive environment (like a Jupyter notebook) or integrate with existing async code, you might need:
# For Jupyter/IPython
import nest_asyncio
nest_asyncio.apply()
# Or get the running loop if one exists
try:
loop = asyncio.get_running_loop()
# Use loop.create_task() instead of asyncio.run()
except RuntimeError:
# No loop is running, safe to use asyncio.run()
asyncio.run(main())
When async is worth it
Despite the gotchas, async Python is genuinely useful for:
- Web servers: Handling many concurrent HTTP requests
- API clients: Making many API calls concurrently
- WebSockets: Managing many long-lived connections
- Data pipelines: Fetching from multiple sources concurrently
The pattern is always the same: lots of waiting for I/O. If that describes your workload, async can give you significant performance improvements with relatively little code change.
When to avoid async
Don't use async for:
- CPU-bound work: Use multiprocessing instead
- Simple scripts: The overhead isn't worth it
- Heavily sync dependencies: If your main library is sync-only, don't fight it
- When you don't understand it: Sync code that works beats async code that doesn't
The bottom line
Async Python is powerful but has sharp edges. The tutorials make it look easy because they skip the hard parts. In production, you'll hit all of these gotchas and more.
But once you learn the patterns - semaphores for limiting concurrency, run_in_executor for CPU-bound work, return_exceptions for robust error handling - async Python becomes a genuinely useful tool. Just don't expect it to be as simple as the tutorials suggest.