Just Use httpx
If you're doing async Python, stop using requests. Use httpx.
Why?
requests is synchronous: every call blocks the event loop until the response arrives. You can push it onto a thread with run_in_executor or asyncio.to_thread, but then you're managing a thread pool inside async code, and every in-flight request ties up a thread.
httpx is async-native:
async with httpx.AsyncClient() as client:
    response = await client.get('https://api.example.com')
The API is almost identical to requests, so migration is easy. If you know requests, you already know 90% of httpx.
Concurrent requests are trivial
This is where httpx really shines. Need to fetch 10 URLs?
import asyncio
import httpx

async def fetch_all(urls):
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        return await asyncio.gather(*tasks)
This fires off all the requests at once. With requests, you'd have to use threads or something like concurrent.futures. With httpx, it's just normal async code that does what you'd expect.
I used this pattern to speed up a data pipeline at work that was making sequential API calls. Went from 30 seconds to about 3 seconds. Just by switching to httpx and making the calls concurrent.
But here's something not obvious: you want to reuse the client. Creating a new AsyncClient for every request is wasteful. The client maintains a connection pool, and reusing it means you're not establishing new TCP connections every time.
# Bad: creates a new client (and a new connection pool) for each request
async def fetch_user(user_id):
    async with httpx.AsyncClient(base_url='https://api.example.com') as client:
        return await client.get(f'/users/{user_id}')

# Good: share the client
async def fetch_users(client, user_ids):
    tasks = [client.get(f'/users/{uid}') for uid in user_ids]
    return await asyncio.gather(*tasks)

async def main():
    # base_url is what lets the relative '/users/...' paths resolve
    async with httpx.AsyncClient(base_url='https://api.example.com') as client:
        users = await fetch_users(client, [1, 2, 3, 4, 5])
The connection pool keeps TCP connections alive and reuses them. This matters a lot when you're making hundreds of requests to the same API.
Rate limiting without pain
APIs have rate limits. An asyncio.Semaphore caps how many requests are in flight at once, which is often enough to stay under them:
async def fetch_with_limit(client, semaphore, url):
    async with semaphore:
        return await client.get(url)

async def main(urls):
    # Max 10 concurrent requests
    semaphore = asyncio.Semaphore(10)
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)
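This lets you fire off hundreds of tasks, but only 10 will actually be making requests at once. The rest wait their turn. No complicated thread pools or queues. Keep in mind that this caps concurrency, not requests per second: if the API enforces a strict per-second quota, you also need to space the requests out.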
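A semaphore bounds how many requests run at once, not how many start per second. If the API enforces a hard quota, one option is to space out request starts. A minimal sketch, assuming a hypothetical RateLimiter helper (it is not part of httpx or asyncio) and a stand-in coroutine where the real client.get call would go:

```python
import asyncio
import time


class RateLimiter:
    """Hypothetical helper: allow at most one start every `interval` seconds."""

    def __init__(self, interval: float):
        self.interval = interval
        self._lock = asyncio.Lock()
        self._next_slot = 0.0

    async def wait(self) -> None:
        # The lock makes waiters take their slots one at a time, in order.
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            self._next_slot = now + self.interval


async def fake_request(limiter: RateLimiter, i: int, started: list) -> int:
    await limiter.wait()
    started.append(time.monotonic())
    await asyncio.sleep(0)  # stand-in for `await client.get(url)`
    return i


async def main():
    limiter = RateLimiter(interval=0.05)  # roughly 20 request starts per second
    started: list = []
    results = await asyncio.gather(
        *(fake_request(limiter, i, started) for i in range(5))
    )
    return results, started


results, started = asyncio.run(main())
print(f"5 starts spread over {started[-1] - started[0]:.2f}s")
```

The two tools combine fine: take a slot from the limiter first, then acquire the semaphore, and you get both a rate cap and a concurrency cap.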
Streaming responses
Downloading a 500MB file? Don't load it all into memory:
async with httpx.AsyncClient() as client:
    async with client.stream('GET', 'https://example.com/large-file.zip') as response:
        with open('large-file.zip', 'wb') as file:
            async for chunk in response.aiter_bytes(chunk_size=8192):
                # Write each chunk to disk as it arrives
                file.write(chunk)
With requests, you'd use response.iter_content(). Same idea, but httpx makes it async so you're not blocking while waiting for chunks.
This pattern also works great for parsing streaming JSON APIs or processing CSV files without loading everything into RAM.
File uploads and multipart forms
File uploads work just like requests:
with open('report.pdf', 'rb') as f:
    response = await client.post('https://api.example.com/upload', files={'file': f})
For more control, you can build multipart forms manually:
with open('report.pdf', 'rb') as report, open('thumb.jpg', 'rb') as thumb:
    files = {
        'file': ('report.pdf', report, 'application/pdf'),
        'thumbnail': ('thumb.jpg', thumb, 'image/jpeg'),
    }
    data = {'user_id': '123', 'description': 'Monthly report'}
    response = await client.post(url, files=files, data=data)
The tuple format is (filename, file_object, content_type). httpx handles the multipart encoding for you.
Authentication is straightforward
Basic auth:
client = httpx.AsyncClient(auth=('username', 'password'))
Bearer tokens:
headers = {'Authorization': f'Bearer {token}'}
client = httpx.AsyncClient(headers=headers)
Or write custom auth if you need something weird:
class CustomAuth(httpx.Auth):
    def __init__(self, token):
        self.token = token

    def auth_flow(self, request):
        request.headers['X-Custom-Auth'] = self.token
        yield request

client = httpx.AsyncClient(auth=CustomAuth('secret-token'))
The auth_flow pattern is powerful. You can intercept requests, modify headers, handle token refresh, whatever you need.
Error handling differences
httpx raises different exceptions than requests, so your except blocks need updating. As with requests, a 4xx/5xx response is not an exception by itself: httpx raises HTTPStatusError only if you call response.raise_for_status().
try:
    response = await client.get(url)
    response.raise_for_status()
except httpx.HTTPStatusError as e:
    print(f"HTTP error: {e.response.status_code}")
except httpx.RequestError as e:
    print(f"Request failed: {e}")
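RequestError covers network errors, timeouts, connection failures. HTTPStatusError is specifically for bad status codes. Both inherit from httpx.HTTPError, so you can catch that when you don't care which one it was. I find this split easier to work with than catching requests' catch-all RequestException.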
Testing with custom transports
httpx has a transport layer that makes testing way easier than mocking requests:
from httpx import AsyncClient, MockTransport, Response

async def mock_handler(request):
    if request.url.path == "/users/1":
        return Response(200, json={"id": 1, "name": "Alice"})
    return Response(404)

async def test_my_function():
    transport = MockTransport(mock_handler)
    async with AsyncClient(transport=transport, base_url="https://testserver") as client:
        # Your code that uses client goes here; a direct call keeps the example short
        response = await client.get("/users/1")
        assert response.json()["name"] == "Alice"
No more mocking the entire requests module. You just provide a handler function that returns responses. Clean, explicit, easy to understand.
The sync client exists too
You don't have to use async. httpx has a sync client that works just like requests:
import httpx
response = httpx.get('https://api.example.com')
print(response.json())
So even in sync code, httpx is a solid choice. You get the same API in both worlds, so moving a code path to async later is mostly a matter of switching to AsyncClient and adding await.
Timeouts that make sense
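One thing that always bugged me about requests was the timeout behavior: there is no default timeout, so a call can hang indefinitely. httpx defaults to 5 seconds and lets you set different timeouts for different phases:
timeout = httpx.Timeout(10.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as client:
    response = await client.get(url)
5 seconds to connect, 10 seconds for each of the other phases (read, write, and waiting for a connection from the pool). This is not a total budget: a response that keeps trickling bytes can run well past 10 seconds overall. Still, no more requests hanging forever because you forgot to set a timeout.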
Bonus: HTTP/2 support
httpx supports HTTP/2, but it's opt-in: install the extra (pip install httpx[http2]) and pass http2=True to the client. If you're making lots of requests to the same host, this is a noticeable performance improvement. HTTP/2 multiplexes requests over a single connection, so concurrent requests don't each need a connection of their own.
async with httpx.AsyncClient(http2=True) as client:
    # All requests to the same host share one connection
    ...
I tested this with a microservice that makes about 50 requests to the same API endpoint. With HTTP/1.1, total time was around 800ms. With HTTP/2 enabled, it dropped to about 400ms. Same code, just http2=True. Not every API supports HTTP/2 yet, but when they do, it's free performance.
Connection pooling deep dive
Understanding connection pooling can save you a ton of headaches in production. By default, httpx maintains a pool of persistent connections. This is great, but you need to tune it for your use case.
limits = httpx.Limits(max_keepalive_connections=20, max_connections=100)
async with httpx.AsyncClient(limits=limits) as client:
    # Client keeps up to 20 idle connections alive
    # and allows up to 100 total connections
    ...
These values are httpx's defaults. max_keepalive_connections is how many idle connections to keep open. If you're hitting the same 5 APIs over and over, keeping 20 connections alive is wasteful. Lower it to 5-10.
max_connections is the hard limit on total connections. If you hit this limit, new requests wait for a free connection and raise PoolTimeout if none frees up within the pool timeout. I've seen apps hang because they set this too low and all connections got tied up waiting for slow responses.
Here's a production pattern I use:
# config.py
import httpx

HTTPX_LIMITS = httpx.Limits(
    max_keepalive_connections=10,
    max_connections=50,
    keepalive_expiry=30.0,  # Close idle connections after 30 seconds
)
HTTPX_TIMEOUT = httpx.Timeout(
    connect=5.0,
    read=30.0,
    write=10.0,
    pool=5.0,  # Max time to wait for a connection from the pool
)

# app.py
import httpx
from config import HTTPX_LIMITS, HTTPX_TIMEOUT

class APIClient:
    def __init__(self):
        self._client = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            limits=HTTPX_LIMITS,
            timeout=HTTPX_TIMEOUT,
            http2=True,  # needs the httpx[http2] extra installed
        )
        return self._client

    async def __aexit__(self, *args):
        await self._client.aclose()
This gives you fine-grained control over timeouts and connection behavior. The pool timeout is especially important—it prevents your app from hanging if the connection pool is exhausted.
Retries and backoff strategies
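APIs fail. Networks are unreliable. You need retry logic. httpx doesn't retry failed requests for you: the only built-in option is the transport-level retries argument, as in httpx.AsyncHTTPTransport(retries=3), and that only retries failed connection attempts, not 5xx responses or timeouts mid-response. That's actually good, because you can implement exactly the retry behavior you need.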
Here's a simple exponential backoff:
async def fetch_with_retry(client, url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = await client.get(url, timeout=10.0)
            response.raise_for_status()
            return response
        except (httpx.HTTPStatusError, httpx.RequestError) as e:
            if attempt == max_retries - 1:
                raise
            # Don't retry client errors (4xx), only server errors (5xx) and network issues
            if isinstance(e, httpx.HTTPStatusError) and 400 <= e.response.status_code < 500:
                raise
            wait_time = 2 ** attempt  # 1s, then 2s, doubling with each further attempt
            await asyncio.sleep(wait_time)
This retries on 5xx errors and network failures, but not 4xx errors, which generally won't succeed on retry. The usual exception is 429 Too Many Requests, which may deserve its own branch. The backoff prevents hammering a struggling server.
For production, I usually add jitter to prevent thundering herd problems:
import random
wait_time = (2 ** attempt) + random.uniform(0, 1)
If 100 clients all hit a 503 at the same time and retry after exactly 2 seconds, you've just created another spike. Jitter spreads out the retries.
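The additive jitter above keeps the delays close to the exponential schedule. A common alternative is "full jitter", where the whole delay is drawn at random below a capped exponential ceiling. A minimal sketch of that variant; backoff_delay and its parameters are hypothetical names, not anything httpx provides.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0, rng=random) -> float:
    """Full-jitter backoff: uniform between 0 and min(cap, base * 2**attempt)."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


rng = random.Random(42)  # seeded only so the demo is repeatable
delays = [backoff_delay(attempt, rng=rng) for attempt in range(6)]
for attempt, delay in enumerate(delays):
    print(f"attempt {attempt}: sleep {delay:.2f}s")
```

The cap matters in long retry loops: without it, the tenth attempt would already be waiting on the order of minutes.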
Real-world gotcha: closing the client properly
This bit me hard. If you don't properly close the httpx client, you'll leak connections and eventually hit OS limits. In a long-running service, this means your app slowly stops working.
# Bad: client never gets closed if there's an exception
async def fetch_data():
    client = httpx.AsyncClient()
    response = await client.get(url)
    await client.aclose()  # This might not run!
    return response

# Good: use context manager
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response
In FastAPI or similar frameworks, you want a single client for the app lifetime:
import httpx
from fastapi import FastAPI
from config import HTTPX_LIMITS, HTTPX_TIMEOUT

app = FastAPI()

@app.on_event("startup")
async def startup():
    app.state.http_client = httpx.AsyncClient(
        limits=HTTPX_LIMITS,
        timeout=HTTPX_TIMEOUT,
    )

@app.on_event("shutdown")
async def shutdown():
    await app.state.http_client.aclose()

@app.get("/fetch")
async def fetch_endpoint():
    response = await app.state.http_client.get("https://api.example.com/data")
    return response.json()
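One client for the entire app. It's created on startup and closed on shutdown. All your endpoints share it. This is way more efficient than creating a new client per request. Recent FastAPI versions deprecate on_event in favor of a lifespan handler; the create-once, close-on-shutdown idea carries over unchanged.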
Debugging techniques
When requests fail, you need visibility. httpx makes debugging easier than requests did.
import logging
# Enable httpx debug logging
logging.basicConfig(level=logging.DEBUG)
In recent versions, this makes httpx and the underlying httpcore library log each step of a request: opening the TCP connection, the TLS handshake, sending the request, and receiving the response headers. It's verbose, but when something's broken, it's invaluable.
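Setting the root logger to DEBUG also turns on debug output from every other library in the process. A narrower sketch raises the level only for the two loggers involved; the format string here is just an example.

```python
import logging

logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=logging.WARNING,  # keep everything else quiet
)
# "httpx" and "httpcore" are the logger names the two libraries log under.
logging.getLogger("httpx").setLevel(logging.DEBUG)
logging.getLogger("httpcore").setLevel(logging.DEBUG)
```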
For production monitoring, log the important stuff:
import logging
import time

logger = logging.getLogger(__name__)

async def fetch_with_logging(client, url):
    # perf_counter is monotonic, so durations survive system clock adjustments
    start = time.perf_counter()
    try:
        response = await client.get(url)
        duration = time.perf_counter() - start
        logger.info(
            "HTTP request completed",
            extra={
                "url": url,
                "status_code": response.status_code,
                "duration_ms": int(duration * 1000),
                "response_size": len(response.content),
            },
        )
        return response
    except Exception as e:
        duration = time.perf_counter() - start
        logger.error(
            "HTTP request failed",
            extra={
                "url": url,
                "duration_ms": int(duration * 1000),
                "error": str(e),
            },
        )
        raise
This gives you metrics on request latency, failure rates, and response sizes. Pipe this to your logging infrastructure and you can build dashboards showing API health.
Working with webhooks and callbacks
Sometimes you're on the receiving end—your app needs to handle incoming HTTP requests and make outbound calls. httpx works great here too.
import httpx
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def process_webhook(client: httpx.AsyncClient, data: dict):
    # Do something with the webhook data
    result = await client.post(
        "https://internal-api.example.com/process",
        json=data,
    )
    return result.json()

@app.post("/webhook")
async def webhook_handler(data: dict, background_tasks: BackgroundTasks):
    # Don't block the webhook response
    background_tasks.add_task(process_webhook, app.state.http_client, data)
    return {"status": "accepted"}
The webhook responds immediately while processing happens in the background. The outbound httpx call doesn't block the webhook sender.
Performance optimization tricks
Here are some patterns that made my code noticeably faster:
1. Pre-serialize JSON
If you're sending the same JSON payload multiple times, serialize it once:
import json

payload = {"key": "value", "items": [1, 2, 3]}
json_bytes = json.dumps(payload).encode('utf-8')
async with httpx.AsyncClient() as client:
    for url in urls:
        await client.post(url, content=json_bytes, headers={"Content-Type": "application/json"})
Serializing JSON isn't free. Do it once, not N times.
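2. Leave redirect following off for APIs
Most APIs don't redirect. Unlike requests, httpx doesn't follow redirects unless you ask it to, so you already get this for free; spelling it out just makes the intent explicit:
async with httpx.AsyncClient(follow_redirects=False) as client:  # False is the default
    response = await client.get(url)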
3. Compression
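If you're transferring lots of data, make sure compression is in play. httpx already sends an Accept-Encoding header advertising gzip and deflate (plus br if the brotli package is installed), so you only need to set it yourself to change that list:
headers = {"Accept-Encoding": "gzip, deflate"}
async with httpx.AsyncClient(headers=headers) as client:
    response = await client.get(url)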
httpx automatically decompresses responses. This can reduce transfer time significantly for large JSON payloads.
When not to use httpx
httpx isn't always the right choice. Here's when to consider alternatives:
- WebSocket connections: Use the websockets library instead
- gRPC: Use the official gRPC library
- GraphQL: Libraries like gql provide better GraphQL-specific features
- Very high concurrency (1000+ simultaneous connections): Consider aiohttp, which is slightly more performant at extreme scale
But for 95% of HTTP use cases, httpx is the sweet spot between features and simplicity.
Migrating from requests
If you're already using requests, migration is painless. Most code just needs import changes:
# Before
import requests
response = requests.get('https://api.example.com')
# After (sync)
import httpx
response = httpx.get('https://api.example.com')
# After (async)
import httpx
async with httpx.AsyncClient() as client:
    response = await client.get('https://api.example.com')
The response object has the same .json(), .text, .content attributes. If you're using requests.Session, just swap it for httpx.Client or httpx.AsyncClient.
The gotchas are behavioral. Update your except blocks to catch httpx exceptions instead of requests exceptions, remember that httpx doesn't follow redirects unless you pass follow_redirects=True, and expect a 5-second default timeout where requests had none.
Just use httpx.