Async Python: The Stuff Nobody Tells You

Async Python looks simple in tutorials. 'Just add async and await!' they say. Then you try to use it for real and discover all the gotchas that nobody mentioned. After using async Python in production for years, here's what I wish someone had told me from the start.

First, let's be clear about what async actually does

Async Python is about concurrency, not parallelism. The difference matters.

Parallelism means doing multiple things at the same time. You need multiple CPUs or cores. Python's GIL (Global Interpreter Lock) prevents true parallelism for CPU-bound code.

Concurrency means managing multiple things at once, but not necessarily doing them simultaneously. While one task waits for I/O (network, disk, etc.), another task can run.

Async Python excels at concurrency. It's perfect when your code spends most of its time waiting for external things - API responses, database queries, file reads. It's terrible when your code is actually doing computation.
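
To make this concrete, here's a minimal sketch (the URLs are placeholders) of three I/O-bound requests running concurrently - the total time is roughly the slowest request rather than the sum of all three:

import asyncio
import httpx

async def fetch_json(client, url):
    # Almost all of this coroutine's time is spent waiting on the network
    response = await client.get(url)
    return response.json()

async def main():
    async with httpx.AsyncClient(timeout=10.0) as client:
        # All three requests are in flight at once
        return await asyncio.gather(
            fetch_json(client, 'https://api.example.com/a'),
            fetch_json(client, 'https://api.example.com/b'),
            fetch_json(client, 'https://api.example.com/c'),
        )

results = asyncio.run(main())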

Gotcha #1: One slow thing blocks everything

This is the most common mistake. Async doesn't make slow code fast. If you do something CPU-intensive in an async function, you block the whole event loop. The 'async' keyword doesn't magically parallelize your code.

async def process_image(image_data):
    # BAD: This blocks the entire event loop!
    result = expensive_image_processing(image_data)
    return result

While that image is being processed, nothing else can run. No other requests get handled. Your whole application freezes.

The fix is to run CPU-bound work in a thread pool:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def process_image(image_data):
    loop = asyncio.get_running_loop()
    # Runs in a separate thread, doesn't block the event loop
    result = await loop.run_in_executor(
        executor, expensive_image_processing, image_data
    )
    return result

Or for truly CPU-intensive work, use a process pool to bypass the GIL entirely.
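
Here's a minimal sketch of the process-pool version, assuming expensive_image_processing is a top-level function whose arguments and return value are picklable:

import asyncio
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor(max_workers=4)

async def process_image(image_data):
    loop = asyncio.get_running_loop()
    # Runs in a separate process, so heavy computation doesn't fight
    # this process's GIL or block the event loop
    return await loop.run_in_executor(
        process_pool, expensive_image_processing, image_data
    )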

Gotcha #2: asyncio.gather() can fail silently (sort of)

When you're running multiple async tasks concurrently, gather() is the go-to:

results = await asyncio.gather(
    fetch_user(user_id),
    fetch_orders(user_id),
    fetch_notifications(user_id)
)

But here's the thing: if any of those tasks raises an exception, gather() raises immediately and you lose the results from the tasks that succeeded.

Usually what you actually want is:

results = await asyncio.gather(
    fetch_user(user_id),
    fetch_orders(user_id),
    fetch_notifications(user_id),
    return_exceptions=True  # <- This changes everything
)

# Now results might contain exceptions mixed with successful results
user, orders, notifications = results
if isinstance(orders, Exception):
    # Handle the error, but we still got user and notifications
    orders = []

With return_exceptions=True, failed tasks return their exception as a result instead of crashing everything. You can handle failures individually while still using the successful results.

Gotcha #3: You need to limit concurrency yourself

Async makes it easy to fire off thousands of concurrent operations. Too easy.

# This looks innocent but will probably crash something
urls = [f'https://api.example.com/item/{i}' for i in range(10000)]
results = await asyncio.gather(*[fetch(url) for url in urls])

You just tried to open 10,000 simultaneous connections. The target server is going to rate-limit you, or your OS will run out of file descriptors, or you'll just crash something.

The fix is a semaphore:

sem = asyncio.Semaphore(50)  # max 50 concurrent requests

async def limited_fetch(url):
    async with sem:
        return await fetch(url)

# Now only 50 requests run at a time
results = await asyncio.gather(*[limited_fetch(url) for url in urls])

The semaphore acts like a bouncer at a club. Only 50 requests can be 'inside' at a time. The rest wait their turn.

I usually wrap this pattern in a helper function because I use it so often:

async def gather_with_limit(coros, limit=50):
    sem = asyncio.Semaphore(limit)
    async def limited(coro):
        async with sem:
            return await coro
    return await asyncio.gather(*[limited(c) for c in coros])
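
With that helper, the 10,000-URL example from earlier becomes:

urls = [f'https://api.example.com/item/{i}' for i in range(10000)]
results = await gather_with_limit([fetch(url) for url in urls], limit=50)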

Gotcha #4: Debugging is genuinely harder

Stack traces in async code are confusing. When an exception happens, the traceback shows you the async machinery instead of where your actual logic lives. You'll see frames like _run_once and run_until_complete that tell you nothing useful.

The traceback might not show the line that actually caused the problem. It might show where you awaited something, which could be far from where the bug actually is.

Print statements become your friend again:

async def mysterious_function(data):
    print(f'DEBUG: entering with {data=}')
    result = await some_operation(data)
    print(f'DEBUG: got {result=}')
    processed = transform(result)
    print(f'DEBUG: transformed to {processed=}')
    return processed

Yeah, it's not sophisticated. But it works.

For more complex debugging, look into enabling asyncio's debug mode:

import asyncio

# Pass debug=True when starting the event loop
asyncio.run(main(), debug=True)

# Or set the PYTHONASYNCIODEBUG=1 environment variable

This gives you warnings about coroutines that were never awaited and slow callbacks that might be blocking the event loop.

Gotcha #5: Mixing sync and async code is tricky

You can't call an async function from sync code without some ceremony:

# This doesn't work - you just get a coroutine object
result = async_function()  # Returns <coroutine object>

# You need to actually run it
result = asyncio.run(async_function())  # Works, but creates a new event loop

And you can't easily call sync code from async without blocking:

async def handler(request):
    # This blocks the event loop!
    data = sync_database_call()  # Oops
    return Response(data)
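
If you're stuck with a sync-only library, push the call onto a worker thread so the event loop keeps running. Here's a minimal sketch using asyncio.to_thread (Python 3.9+), with sync_database_call and Response as the same placeholders as above:

import asyncio

async def handler(request):
    # The blocking call runs in a worker thread; the event loop stays free
    data = await asyncio.to_thread(sync_database_call)
    return Response(data)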

When you're in async land, everything you await needs to be async-native. Libraries like httpx (instead of requests), asyncpg (instead of psycopg2), aiofiles (instead of open). There's an async version of most popular libraries, but you have to actually use them.

Gotcha #6: Context doesn't propagate automatically

If you're using context variables (like for request IDs in logging), they don't automatically propagate to spawned tasks:

from contextvars import ContextVar

request_id_var = ContextVar('request_id')

async def handle_request(request_id):
    request_id_var.set(request_id)
    # This task sees request_id_var fine

    # But tasks created here might not, depending on how you create them
    asyncio.create_task(background_job())  # Inherits context (good)

The good news is that asyncio.create_task() copies the context by default. But if you're using thread pools or other mechanisms, you might need to handle context explicitly.
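
For the thread pool case, you can carry the context across explicitly with contextvars.copy_context(). Here's a sketch, with log_current_request as a hypothetical sync function that reads request_id_var:

import asyncio
import contextvars

def log_current_request():
    # Sync code running in a worker thread, reading the context variable
    print(f'request_id={request_id_var.get()}')

async def handle_request(request_id):
    request_id_var.set(request_id)
    loop = asyncio.get_running_loop()
    # Snapshot the current context and run the sync function inside that snapshot
    ctx = contextvars.copy_context()
    await loop.run_in_executor(None, ctx.run, log_current_request)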

Gotcha #7: Timeouts are not built-in where you think they are

I see this all the time in production code - someone writes an async function that calls an API and assumes there's some default timeout. There isn't.

async def fetch_user_data(user_id):
    # If this API is down, this will hang FOREVER
    async with httpx.AsyncClient() as client:
        response = await client.get(f'https://api.example.com/users/{user_id}')
    return response.json()

If that API server is stuck, your request will wait indefinitely. Your application will slowly accumulate stuck tasks until it grinds to a halt.

Always use timeouts:

import asyncio

async def fetch_user_data(user_id):
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await asyncio.wait_for(
                client.get(f'https://api.example.com/users/{user_id}'),
                timeout=5.0
            )
            return response.json()
        except asyncio.TimeoutError:
            # Handle timeout gracefully
            raise ServiceUnavailableError(f'User service timed out for {user_id}')

Notice I have two timeouts here - one at the HTTP client level (10 seconds) and one wrapping the specific request (5 seconds). The client-level timeout is a safety net; the request-level timeout is what I actually expect to trigger if things go wrong.

Gotcha #8: Task cancellation is more complex than it looks

Cancelling async tasks seems straightforward until you actually need to do cleanup properly.

async def long_running_task():
    try:
        await asyncio.sleep(100)
        print('Task completed')
    except asyncio.CancelledError:
        print('Task was cancelled')
        # WRONG: If you don't re-raise, cancellation is suppressed!

If you catch CancelledError and don't re-raise it, the task won't actually be cancelled. It'll keep running. This is especially dangerous with background tasks.

The correct pattern:

async def long_running_task():
    try:
        await asyncio.sleep(100)
        print('Task completed')
    except asyncio.CancelledError:
        # Do cleanup
        await cleanup_resources()
        # MUST re-raise
        raise

Even better, use context managers for cleanup when possible:

async def long_running_task():
    async with managed_resource() as resource:
        await asyncio.sleep(100)
        # Context manager handles cleanup even on cancellation
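
managed_resource() above is a stand-in. Here's a minimal sketch of what it could look like, using contextlib.asynccontextmanager (acquire_resource and release_resource are hypothetical):

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        # Runs on normal exit, on exceptions, and on cancellation
        await release_resource(resource)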

Real-world pattern: The retry decorator

Here's a pattern I use constantly in production. Most API calls should retry with exponential backoff, but writing that logic everywhere is tedious and error-prone.

import asyncio
import random
from functools import wraps

def async_retry(max_attempts=3, base_delay=1.0, max_delay=60.0):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e

                    if attempt == max_attempts - 1:
                        # Last attempt, give up
                        raise

                    # Exponential backoff with jitter
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    await asyncio.sleep(delay + jitter)

            raise last_exception
        return wrapper
    return decorator

# Usage
@async_retry(max_attempts=3, base_delay=1.0)
async def fetch_flaky_api(url):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

The jitter is important - without it, if multiple tasks are retrying simultaneously, they'll all retry at the exact same time and create thundering herd problems.

Real-world pattern: Batching requests

Sometimes you have many small items to process, and making individual API calls for each is inefficient. Batching helps:

class AsyncBatcher:
    def __init__(self, batch_size=50, flush_interval=1.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = []
        self.lock = asyncio.Lock()
        self.flush_task = None

    async def start(self):
        self.flush_task = asyncio.create_task(self._periodic_flush())

    async def _periodic_flush(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.flush()

    async def add(self, item):
        batch = None
        async with self.lock:
            self.queue.append(item)
            if len(self.queue) >= self.batch_size:
                # Drain under the lock; asyncio.Lock is not reentrant,
                # so awaiting self.flush() here would deadlock
                batch = self.queue[:]
                self.queue.clear()
        if batch:
            await self._process_batch(batch)

    async def flush(self):
        async with self.lock:
            if not self.queue:
                return

            batch = self.queue[:]
            self.queue.clear()

        # Process batch (outside the lock)
        await self._process_batch(batch)

    async def _process_batch(self, items):
        # Send all items in one API call
        await api.bulk_create(items)

# Usage
batcher = AsyncBatcher(batch_size=100, flush_interval=2.0)
await batcher.start()

# Later, items accumulate and get flushed automatically
for item in items:
    await batcher.add(item)

This pattern ensures you're not making tons of tiny API calls, but you also don't wait forever to process items.

Real-world pattern: Circuit breaker

When a service is down, you don't want to keep hammering it with requests. A circuit breaker helps:

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Service is down, fail fast
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time >= self.timeout:
                # Try again
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception('Circuit breaker is OPEN')

        try:
            result = await func(*args, **kwargs)
            # Success - reset
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60.0)

async def fetch_url(url):
    # The top-level httpx.get() is synchronous; async requests need AsyncClient
    async with httpx.AsyncClient(timeout=10.0) as client:
        return await client.get(url)

async def fetch_with_breaker(url):
    return await breaker.call(fetch_url, url)

When the service is down, the circuit breaker "opens" and fails fast instead of waiting for timeouts. After a cooldown period, it goes into "half-open" state to test if the service recovered.

Testing async code

Testing async code requires some special considerations. You can't just call an async function in a test - you need to run it in an event loop.

pytest makes this easier with pytest-asyncio:

import pytest

@pytest.mark.asyncio
async def test_fetch_user():
    result = await fetch_user(123)
    assert result['id'] == 123

# For testing timeout behavior
@pytest.mark.asyncio
async def test_timeout_handling():
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_function(), timeout=0.1)

# For testing concurrent behavior
@pytest.mark.asyncio
async def test_concurrent_requests():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3)
    )
    assert len(results) == 3

Mocking async functions requires async mocks:

from unittest.mock import AsyncMock

async def test_with_mock():
    mock_api = AsyncMock()
    mock_api.get_user.return_value = {'id': 123, 'name': 'Test'}

    result = await fetch_user_data(123, api=mock_api)
    assert result['name'] == 'Test'
    mock_api.get_user.assert_called_once_with(123)

Performance monitoring

In production, you need to monitor your async code's performance. Here's a pattern I use:

import time
from functools import wraps

def async_timing(name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                duration = time.perf_counter() - start
                # Log to your metrics system
                metrics.histogram(f'async.{name}.duration', duration)
        return wrapper
    return decorator

@async_timing('fetch_user')
async def fetch_user(user_id):
    # ... implementation
    pass

This helps you identify which async operations are slow and might be blocking your event loop.

Common async pitfalls in specific scenarios

Let me walk through some real scenarios I've debugged in production systems.

The database connection pool gotcha

You switched to an async database driver like asyncpg. Great! But did you configure the connection pool correctly?

# BAD: Creating a new pool for every request
async def get_user(user_id):
    pool = await asyncpg.create_pool(DATABASE_URL)  # Don't do this!
    async with pool.acquire() as conn:
        return await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)

# GOOD: Create pool once at startup
pool = None

async def startup():
    global pool
    pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=10,
        max_size=20,
        command_timeout=5.0
    )

async def get_user(user_id):
    async with pool.acquire() as conn:
        return await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)

Connection pools are expensive to create. Create one at application startup and reuse it. Also, set timeouts on your queries - database operations can hang just like HTTP requests.
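
asyncpg's fetch methods also accept a per-query timeout if you want something tighter than the pool-wide command_timeout - a small sketch:

async def get_user(user_id):
    async with pool.acquire() as conn:
        # timeout= here overrides command_timeout for just this query
        return await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1', user_id, timeout=2.0
        )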

The file I/O mistake

Reading files with regular open() in async code blocks the event loop. Use aiofiles:

# BAD: Blocks the event loop
async def read_config():
    with open('config.json') as f:
        return json.load(f)

# GOOD: Actually async
import aiofiles

async def read_config():
    async with aiofiles.open('config.json', mode='r') as f:
        contents = await f.read()
        return json.loads(contents)

This matters more than you'd think. On a busy server reading multiple config files or logs, synchronous file I/O can add noticeable latency.

The background task memory leak

Background tasks that never complete can leak memory. Always keep track of them:

# BAD: Tasks just float around forever
async def handle_request(request):
    # Fire and forget - but what if it never finishes?
    asyncio.create_task(send_analytics_event(request))
    return Response('OK')

# GOOD: Track and periodically clean up
background_tasks = set()

async def handle_request(request):
    task = asyncio.create_task(send_analytics_event(request))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return Response('OK')

The add_done_callback ensures completed tasks are removed from the set automatically. Without this, the set grows unbounded and you have a memory leak.

The sleep vs. wait_for confusion

asyncio.sleep() is not the same thing as a timeout. I see this mistake all the time:

# WRONG: This doesn't time out the operation!
async def fetch_with_timeout(url):
    await asyncio.sleep(5)  # This just waits 5 seconds, then continues
    return await fetch(url)  # ...and this can still hang forever

# RIGHT: This actually enforces a timeout
async def fetch_with_timeout(url):
    return await asyncio.wait_for(fetch(url), timeout=5)

sleep() is for delaying execution. wait_for() is for enforcing timeouts on operations.

The exception in asyncio.gather() detail

Remember how I said gather() fails if any task raises an exception? Here's the subtle part - it doesn't cancel the other tasks automatically:

async def slow_task():
    await asyncio.sleep(10)
    return 'slow'

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError('oops')

# The slow_task keeps running even though gather() raises after 1 second
try:
    results = await asyncio.gather(slow_task(), failing_task())
except ValueError:
    # failing_task raised, but slow_task is STILL RUNNING in the background!
    pass

If you want to cancel all tasks when one fails, you need to do it explicitly:

tasks = [asyncio.create_task(slow_task()), asyncio.create_task(failing_task())]
try:
    results = await asyncio.gather(*tasks)
except Exception:
    # Cancel all tasks
    for task in tasks:
        task.cancel()
    raise

Library-specific gotchas

Different async libraries have their own quirks. Here are some I've encountered:

aiohttp vs httpx

Both are popular async HTTP clients, but they have different APIs and behaviors:

# aiohttp requires a session, and response.json() is awaited
async with aiohttp.ClientSession() as session:
    async with session.get('https://api.example.com/data') as response:
        data = await response.json()

# httpx uses AsyncClient for async requests (the top-level httpx.get() is sync-only)
async with httpx.AsyncClient() as client:
    response = await client.get('https://api.example.com/data')
    data = response.json()  # Note: NOT awaited in httpx!

The response.json() difference has bitten me multiple times when switching between libraries.

Redis and connection handling

With aioredis (now merged into redis-py 4.2+ as redis.asyncio), forgetting to await can cause silent failures:

# BAD: Forgot to await, get a coroutine object instead of the value
import redis.asyncio as redis

r = redis.from_url('redis://localhost')
value = r.get('key')  # Returns a coroutine, not the value!

# GOOD: Actually await the operation
value = await r.get('key')

Python only emits a 'coroutine was never awaited' RuntimeWarning when the unawaited coroutine is garbage collected, and that warning is easy to miss in logs - so the operation silently never happens.

The "just use asyncio.run()" trap

For simple scripts, asyncio.run() seems perfect. But in more complex scenarios, it has limitations:

# This looks fine
async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

But asyncio.run() creates a new event loop, runs your coroutine, and closes the loop. If you need to run async code in an interactive environment (like a Jupyter notebook) or integrate with existing async code, you might need:

# For Jupyter/IPython
import nest_asyncio
nest_asyncio.apply()

# Or get the running loop if one exists
try:
    loop = asyncio.get_running_loop()
    # Use loop.create_task() instead of asyncio.run()
except RuntimeError:
    # No loop is running, safe to use asyncio.run()
    asyncio.run(main())

When async is worth it

Despite the gotchas, async Python is genuinely useful for:

  • Web servers: Handling many concurrent HTTP requests
  • API clients: Making many API calls concurrently
  • WebSockets: Managing many long-lived connections
  • Data pipelines: Fetching from multiple sources concurrently

The pattern is always the same: lots of waiting for I/O. If that describes your workload, async can give you significant performance improvements with relatively little code change.

When to avoid async

Don't use async for:

  • CPU-bound work: Use multiprocessing instead
  • Simple scripts: The overhead isn't worth it
  • Heavily sync dependencies: If your main library is sync-only, don't fight it
  • When you don't understand it: Sync code that works beats async code that doesn't

The bottom line

Async Python is powerful but has sharp edges. The tutorials make it look easy because they skip the hard parts. In production, you'll hit all of these gotchas and more.

But once you learn the patterns - semaphores for limiting concurrency, run_in_executor for CPU-bound work, return_exceptions for robust error handling - async Python becomes a genuinely useful tool. Just don't expect it to be as simple as the tutorials suggest.
