Async Python: asyncio, Coroutines, and Event Loops Without the Confusion
Why async Python is not about parallelism: it is about not wasting CPU while waiting for I/O
Abstract Algorithms
The 500-Second Problem: What Cooperative Multitasking Actually Fixes
Suppose your monitoring pipeline checks the health endpoint of 1,000 internal microservices. Each HTTP call takes about 500 milliseconds: network round-trip, DNS, TLS handshake, server response. A straightforward implementation fires each request one after the other:
```python
import requests
import time

urls = [f"https://service-{i}.internal/health" for i in range(1000)]
start = time.time()
for url in urls:
    response = requests.get(url, timeout=5)
    print(response.status_code)
print(f"Total time: {time.time() - start:.1f}s")  # ~500 seconds
```
Five hundred seconds, over eight minutes, to do something that should feel instant. Look at what the CPU is doing during those 500 seconds: essentially nothing. Every requests.get call parks the entire program while the kernel waits for a network response. The CPU sits at 0% utilization for ~99.9% of the run. This is the problem asyncio was built to eliminate.
Now consider the same task written with asyncio and aiohttp:
```python
import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return response.status

async def main():
    urls = [f"https://service-{i}.internal/health" for i in range(1000)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results[:5])

start = time.time()
asyncio.run(main())
print(f"Total time: {time.time() - start:.1f}s")  # ~5 seconds
```
Five seconds instead of 500. Same single Python thread. No multiprocessing. No threads. No locks. No magic.
The reason is cooperative multitasking. While one coroutine is waiting for a response from service-1, the event loop does not stall; it hands control to the coroutine that is waiting for service-2. When service-2's response arrives, that coroutine resumes. The OS kernel multiplexes the I/O using epoll (Linux), kqueue (macOS), or IOCP (Windows), and the event loop invokes the appropriate callback when data is ready. From Python's perspective, hundreds of I/O waits happen "at the same time" on one thread.
This is not parallelism. No two coroutines execute simultaneously. There is no true parallelism, only the cooperative elimination of idle waiting time. That distinction is not a footnote; it is the whole design philosophy of asyncio, and it determines exactly when the tool is appropriate and when it is not.
async def, await, and Why a Coroutine Is Not a Function Call
Before you can read asyncio code fluently, you need to internalize one counterintuitive fact: calling an async def function does not execute its body. This surprises almost everyone coming from synchronous Python or other languages.
A regular function executes immediately when called:
```python
def greet(name):
    return f"Hello, {name}"

result = greet("Ana")  # Body runs right now; result is "Hello, Ana"
```
An async def function, called a coroutine function, returns a coroutine object when called. The body has not run. The coroutine object is a suspended computation waiting to be driven by an event loop:
```python
async def greet(name):
    return f"Hello, {name}"

coro = greet("Ana")  # Body has NOT run; coro is a coroutine object
print(coro)          # <coroutine object greet at 0x10f3a4b20>
```
Python will even emit a RuntimeWarning: coroutine 'greet' was never awaited if you let that coroutine object get garbage collected without running it. Forgetting await is one of the most common bugs in async codebases.
To actually execute a coroutine you have two options:
- asyncio.run(coro) creates a fresh event loop, runs the coroutine to completion, tears down the loop, and returns the result. This is your program's top-level entry point for async code. Never call it from inside an already-running event loop.
- await coro, inside another async def function, suspends the current coroutine and gives control back to the event loop until the awaited coroutine finishes. The event loop is free to run other ready coroutines while this one waits.
```python
async def main():
    result = await greet("Ana")  # Body of greet() runs here
    print(result)                # "Hello, Ana"

asyncio.run(main())
```
The await keyword does two things simultaneously. First, it evaluates the awaitable expression, which can be a coroutine, a Task, a Future, or any object implementing __await__. Second, it yields control back to the event loop, which is then free to schedule other ready coroutines. If the awaitable completes instantly, the yield is invisible to you, but it still happens at the bytecode level.
asyncio.sleep(0) is the canonical no-op yield, a way of telling the event loop "run anything else that is ready before coming back to me" without actually waiting for anything:
```python
import asyncio

async def cooperative_loop(name, count):
    for i in range(count):
        print(f"{name}: step {i}")
        await asyncio.sleep(0)  # Give other coroutines a turn
```
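Running two of these loops together makes the yield visible: they interleave one step at a time. A minimal sketch, using a shared trace list (an illustrative detail, not from the original) to record the execution order:

```python
import asyncio

trace = []

async def step_loop(name, count):
    for i in range(count):
        trace.append(f"{name}{i}")
        await asyncio.sleep(0)  # yield: let the other coroutine take a step

async def main():
    await asyncio.gather(step_loop("A", 3), step_loop("B", 3))

asyncio.run(main())
print(trace)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2'] -- strict alternation
```

Because the ready queue is first-in, first-out, each sleep(0) sends the coroutine to the back of the line and the other one runs next.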
The three primitives you will encounter in every async codebase:
| Primitive | What it does | When to use |
| --- | --- | --- |
| async def f() | Defines a coroutine function | Whenever the function needs to await anything |
| await expr | Suspends current coroutine; resumes when awaitable is done | Inside async def; every I/O call |
| asyncio.run(coro) | Runs a coroutine as the top-level entry point | Main function, test harness, script entry point |
How the Event Loop Schedules, Suspends, and Resumes Coroutines
The event loop is the heart of asyncio. It is a scheduler that maintains a queue of ready-to-run coroutines and an I/O selector that watches file descriptors for readiness events. Its work cycle repeats continuously:
1. Pick the next ready coroutine from the run queue.
2. Resume it: let it execute Python bytecode until it hits an await.
3. If the await is on an I/O operation, register the file descriptor with the OS selector and move on.
4. Poll the OS selector for I/O readiness events.
5. Mark any coroutines whose I/O is now complete as ready.
6. Repeat until the queue is empty.
The flowchart below traces one complete turn of this scheduling loop:
```mermaid
flowchart TD
    A[Event Loop starts] --> B[Pick next ready coroutine from queue]
    B --> C[Resume coroutine until it hits await]
    C --> D{What did the coroutine await?}
    D -->|Coroutine completed| E[Store result and mark Task done]
    E --> F{More coroutines in queue?}
    F -->|Yes| B
    F -->|No| G[Event Loop exits]
    D -->|I/O await - socket or file| H[Register fd with OS selector]
    H --> I[Poll OS selector with timeout]
    I --> J{Any file descriptor ready?}
    J -->|Yes| K[Move waiting coroutine back to run queue]
    K --> B
    J -->|No - still waiting| F
```
When a coroutine suspends on an I/O await, it is moved off the run queue entirely and registered against a file descriptor in the OS selector. The event loop does not spin-wait; it calls select() or epoll_wait() with a timeout, allowing the OS kernel to block efficiently until I/O arrives. This is the mechanism that allows a single thread to appear to handle many concurrent I/O operations simultaneously.
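The selector mechanism can be demonstrated directly with the stdlib selectors module, the same facility asyncio's selector event loop builds on. This sketch registers one end of a socket pair, blocks in select() until data arrives, and then runs the callback stored alongside the file descriptor; the socket pair and callback are illustrative stand-ins for a real connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()   # epoll/kqueue/etc., chosen per platform
r, w = socket.socketpair()
r.setblocking(False)

def on_ready(sock):
    # In asyncio this would resume the coroutine waiting on this fd
    return sock.recv(1024)

sel.register(r, selectors.EVENT_READ, data=on_ready)
w.send(b"response arrived")         # simulate the server replying

# One turn of the loop: block until a registered fd is ready, run its callback
for key, _ in sel.select(timeout=1.0):
    payload = key.data(key.fileobj)
    print(payload)  # b'response arrived'

sel.unregister(r)
r.close()
w.close()
```

An event loop is essentially this select-and-dispatch cycle run forever, with coroutine resumption as the callback.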
asyncio.create_task() versus Awaiting a Coroutine Directly
Using await on a bare coroutine runs it sequentially; one completes before the next starts:
```python
async def main():
    result1 = await fetch(session, url1)  # Waits for full completion
    result2 = await fetch(session, url2)  # Only then starts this
```
asyncio.create_task() wraps a coroutine in a Task object, immediately schedules it on the running event loop, and returns the Task without blocking. Both tasks can now make progress at await points:
```python
async def main():
    task1 = asyncio.create_task(fetch(session, url1))  # Scheduled immediately
    task2 = asyncio.create_task(fetch(session, url2))  # Scheduled immediately
    result1 = await task1  # Now both can interleave during I/O waits
    result2 = await task2
```
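A timing comparison makes the difference concrete. This sketch uses asyncio.sleep as a stand-in for network latency (the 0.2-second delays are arbitrary); the sequential version takes the sum of the waits, while the task version takes only the longest one:

```python
import asyncio
import time

async def io_task(delay):
    await asyncio.sleep(delay)  # stand-in for an HTTP call or DB query

async def sequential():
    await io_task(0.2)
    await io_task(0.2)

async def with_tasks():
    t1 = asyncio.create_task(io_task(0.2))
    t2 = asyncio.create_task(io_task(0.2))
    await t1
    await t2

start = time.perf_counter()
asyncio.run(sequential())
seq = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(with_tasks())
conc = time.perf_counter() - start

print(f"sequential: {seq:.2f}s, tasks: {conc:.2f}s")  # ~0.4s vs ~0.2s
```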
asyncio.gather() for Fan-Out Concurrency
asyncio.gather(*coroutines_or_tasks) is the idiomatic way to run many coroutines concurrently and collect all their results in one call:
```python
results = await asyncio.gather(
    fetch(session, url1),
    fetch(session, url2),
    fetch(session, url3),
)
# results[0] corresponds to url1, results[1] to url2, etc.
```
All three coroutines start and can interleave freely at their await points. Results are returned in the same order as the inputs, regardless of which coroutine finished first. If any coroutine raises an exception, gather propagates the first exception to the awaiting code by default; the other awaitables are not cancelled and continue to run. Pass return_exceptions=True to collect exceptions as result values instead.
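A minimal sketch of the return_exceptions=True behavior, using a deliberately failing coroutine (the names and delays are illustrative):

```python
import asyncio

async def ok(v):
    await asyncio.sleep(0.01)
    return v

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("bad upstream response")

async def main():
    # return_exceptions=True: the error arrives as a value, in input order
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
print(results)  # [1, ValueError('bad upstream response'), 3]
```

This is the usual choice for fan-out work where one failed request should not discard the other results.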
Timeouts with asyncio.wait_for()
Wrap any awaitable with asyncio.wait_for(coro, timeout=seconds) to cancel it automatically if it runs too long:
```python
try:
    result = await asyncio.wait_for(fetch(session, slow_url), timeout=2.0)
except asyncio.TimeoutError:
    print("Request exceeded 2 seconds; skipping")
```
When the timeout fires, asyncio cancels the inner task and raises asyncio.TimeoutError. The cancelled coroutine receives a CancelledError at its current await point. You can catch CancelledError inside the coroutine to run cleanup before re-raising, for example to close an open file or release a database connection.
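A sketch of that cleanup pattern, with asyncio.sleep standing in for a query that overruns its deadline and a list recording the order of events (both illustrative details):

```python
import asyncio

log = []

async def slow_query():
    try:
        await asyncio.sleep(10)  # stand-in for a query that never returns in time
    except asyncio.CancelledError:
        log.append("released connection")  # cleanup runs at the await point
        raise                              # re-raise so cancellation propagates

async def main():
    try:
        await asyncio.wait_for(slow_query(), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("timed out")

asyncio.run(main())
print(log)  # ['released connection', 'timed out']
```

Note the ordering: wait_for waits for the inner cancellation to finish before raising TimeoutError, so the cleanup always runs first.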
Under the Hood: How Python's asyncio Scheduler Actually Works
Understanding the internals of asyncio transforms mysterious behavior into predictable cause and effect. This section covers the cooperative scheduling model and where its performance profile holds up versus where it degrades.
The Internals of the Event Loop
Python's asyncio event loop is single-threaded and cooperative. These two adjectives explain both its elegance and its failure modes.
"Single-threaded" means no two coroutines ever run at the same Python instruction simultaneously. There are no data races on shared Python objects โ not because of locking, but because only one coroutine can be executing at any given moment. You can freely share lists, dicts, and custom Python objects between coroutines without mutexes, as long as you remain alert to mutation at await boundaries (where another coroutine may modify shared state before you resume).
"Cooperative" means the event loop cannot preempt a running coroutine mid-execution. The scheduler can only regain control when the coroutine voluntarily yields via an await. This is the fundamental gotcha: if you call a CPU-intensive function or a blocking I/O call โ like time.sleep(), requests.get(), or open().read() โ inside an async def without an async wrapper, you block the entire OS thread for the duration of that call. Every other coroutine in the program stalls. There is no error, no warning in production, just silently degraded throughput.
Under the hood, asyncio uses the OS I/O notification facility (epoll on Linux, kqueue on macOS, IOCP on Windows) to monitor file descriptors. When a socket becomes readable (HTTP response arrived) or writable (ready to send more data), the OS notifies the event loop, which looks up the coroutine registered on that file descriptor and marks it ready for resumption.
At the Python level, coroutine objects share their implementation with generators; await suspends a frame using the same machinery as yield from. The event loop calls .send(None) on the coroutine's frame to resume it. When the coroutine yields (via await), control returns to the loop's _run_once() method, which calls the selector's select() with an appropriate timeout, processes ready I/O callbacks, and picks the next coroutine to drive.
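You can drive this machinery by hand. Calling .send(None) on a coroutine object resumes its frame exactly the way the event loop does; a coroutine with no await points runs straight to its return, which surfaces as StopIteration carrying the return value:

```python
async def add(a, b):
    return a + b

coro = add(2, 3)
try:
    coro.send(None)       # resume the frame, exactly as the event loop would
except StopIteration as exc:
    result = exc.value    # the return value rides on the StopIteration
print(result)  # 5
```

This is the whole contract between the loop and a coroutine: send to resume, catch StopIteration for the result.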
uvloop is a drop-in replacement event loop written in Cython on top of libuv, the same async I/O library that powers Node.js. It replaces the default asyncio event loop implementation with a substantially faster one:
```python
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# asyncio.run() now uses the uvloop implementation automatically
```
Benchmarks consistently show uvloop delivering 2–4× higher throughput than the default loop for network-heavy workloads. The gain comes from reducing Python-level overhead in the I/O polling inner loop; libuv's selector loop is implemented in C with zero Python object creation per iteration.
Performance Analysis: The I/O-Bound vs CPU-Bound Matrix
The decision between asyncio, threading, and multiprocessing depends entirely on where your workload bottlenecks. The following matrix is the definitive guide:
| Workload type | Bottleneck | Best tool | Core reason |
| --- | --- | --- | --- |
| HTTP API calls, database queries, cache reads | Network / disk I/O latency | asyncio | One event loop handles 10k+ concurrent waits; zero thread overhead |
| CPU-intensive math, ML inference, image encoding | CPU cycles | multiprocessing | Spawns separate Python interpreters; true multi-core parallelism; bypasses GIL |
| Blocking C library calls (PIL, sqlite3, cv2) | C-level blocking syscalls | threading | GIL releases on blocking C code; threads parallel-wait at OS level |
| Mixed I/O + light CPU computation per request | Network + compute | asyncio + run_in_executor | Offload CPU work to thread or process pool; keep event loop unblocked |
| Simple sequential script, one or two I/O calls | N/A | Synchronous code | asyncio adds boilerplate complexity with zero benefit for non-concurrent workloads |
The reason asyncio beats threading for I/O is overhead, not raw speed. A Python thread reserves about 8 MB of stack address space by default, carries a kernel scheduler context, and contends for the GIL whenever it runs Python bytecode. At 1,000 concurrent connections, threads reserve ~8 GB of address space and spend significant CPU time context-switching. An asyncio event loop handles 1,000 concurrent coroutines with roughly 200 bytes of Python overhead per coroutine and zero OS context-switching cost.
The reason asyncio loses to multiprocessing for CPU work is not the GIL per se; it is the fundamental nature of cooperative scheduling. A CPU-bound function never reaches an await point, so the event loop never gets a turn during its execution. Even if you wrap CPU-heavy code in async def, it runs synchronously from the event loop's perspective. The fix is loop.run_in_executor(executor, blocking_func, *args), which delegates the blocking work to a ThreadPoolExecutor or ProcessPoolExecutor and returns an awaitable that resolves when the executor is done.
The canonical formula: I/O-bound workload → asyncio. CPU-bound workload → multiprocessing. Blocking C library → threading. Everything else → synchronous code. When genuinely unsure, profile first with asyncio debug mode (PYTHONASYNCIODEBUG=1) before reaching for multiprocessing complexity.
Two Coroutines, One Thread: Visualizing Interleaved I/O Execution
The sequence diagram below shows how two coroutines, fetch_user and fetch_orders, share a single event loop thread. Neither blocks the other. When one coroutine dispatches its database query and hits await, the event loop immediately switches to the other. Both queries are in-flight simultaneously, and the program's total wall-clock time is the latency of the slower query alone, not the sum of both.
```mermaid
sequenceDiagram
    participant EL as Event Loop
    participant FU as fetch_user
    participant DB1 as User DB
    participant FO as fetch_orders
    participant DB2 as Orders DB
    EL->>FU: Resume coroutine
    FU->>DB1: Send query to User DB
    FU->>EL: Yield control - waiting for DB1
    EL->>FO: Resume coroutine
    FO->>DB2: Send query to Orders DB
    FO->>EL: Yield control - waiting for DB2
    DB1-->>EL: I/O ready notification
    EL->>FU: Resume - User DB responded
    FU->>EL: Return user data
    DB2-->>EL: I/O ready notification
    EL->>FO: Resume - Orders DB responded
    FO->>EL: Return orders data
```
Notice the critical moment: fetch_user sends its query and immediately hands control back to the event loop (FU->>EL: Yield control). The event loop then starts fetch_orders, which similarly yields after dispatching its own query. Both queries are now in-flight on a single thread. When DB1 responds first, the event loop resumes fetch_user. When DB2 responds, it resumes fetch_orders. The event loop never spins idle; it either runs a coroutine or waits on the OS selector for the next I/O event.
This interleaving is why asyncio.gather() achieves concurrency without threads. The apparent parallelism is an illusion of overlapping I/O wait times, not simultaneous execution. And for I/O-bound workloads, overlapping wait times is all you need.
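The diagram's timeline can be reproduced with asyncio.sleep standing in for both database round-trips (the 0.2-second latencies and the log list are illustrative). Both "queries" are dispatched before either "response" arrives, and the total runtime is one latency, not two:

```python
import asyncio
import time

log = []

async def fetch_user():
    log.append("user: query sent")
    await asyncio.sleep(0.2)    # stand-in for User DB latency
    log.append("user: response received")

async def fetch_orders():
    log.append("orders: query sent")
    await asyncio.sleep(0.2)    # stand-in for Orders DB latency
    log.append("orders: response received")

async def main():
    await asyncio.gather(fetch_user(), fetch_orders())

start = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - start

print(log[:2])            # both queries in flight before any response
print(f"{elapsed:.2f}s")  # ~0.2s, not ~0.4s
```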
Where asyncio Shows Up in Real Python Production Stacks
asyncio is not a niche academic feature. It underpins the entire modern Python async ecosystem, and you will encounter it every time you touch production Python web services or data pipelines.
Async HTTP clients: aiohttp and httpx
aiohttp is the most widely deployed async HTTP library. It provides both an async client (for making outbound requests) and an async server (for handling inbound requests). Teams at Discord, Sentry, and countless SaaS companies run their service-to-service communication through aiohttp clients. httpx is a newer alternative offering a near-identical interface to the familiar requests library, with first-class async support and HTTP/2 out of the box. Both eliminate the blocking I/O of requests.get and integrate seamlessly with the event loop.
Async database drivers: asyncpg and SQLAlchemy async
asyncpg is a high-performance async driver for PostgreSQL. It is consistently 3–5× faster than psycopg2 in high-concurrency benchmarks because it never blocks the event loop on query execution. SQLAlchemy 1.4 and later ships an async extension (sqlalchemy.ext.asyncio) that wraps async drivers like asyncpg with the familiar ORM interface, so you can use session.execute() and session.query() patterns without rewriting all your data access logic:
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async with AsyncSessionLocal() as session:
    result = await session.execute(select(User).where(User.id == 42))
    user = result.scalar_one()
```
FastAPI request handling
FastAPI is built on Starlette, which runs entirely on asyncio. Every route handler you declare as async def is executed directly on the event loop, allowing FastAPI to handle hundreds of concurrent in-flight requests on a single Uvicorn worker without blocking. Handlers declared as plain def (synchronous) are automatically offloaded to a thread pool via run_in_executor, so they do not stall the event loop even if they do blocking work. This design means you can mix legacy synchronous database drivers with async route handlers during a migration, paying only the thread pool overhead for the blocking portions.
Async file I/O with aiofiles
Python's built-in open() is a blocking call; it will stall the event loop for the duration of any disk read or write. aiofiles wraps file operations in a ThreadPoolExecutor, giving you async with aiofiles.open() syntax without blocking the event loop. This matters especially for services that read large configuration files, write audit logs, or process file uploads:
```python
import aiofiles

async def write_audit_log(path, entry):
    async with aiofiles.open(path, mode='a') as f:
        await f.write(f"{entry}\n")
```
asyncio vs Threads vs Multiprocessing: Choosing the Right Concurrency Model
The three Python concurrency tools solve different problems. Picking the wrong one does not just underperform; it can introduce subtle bugs that are very hard to debug in production.
asyncio versus threading
Both run inside a single OS process. The GIL applies to both. The fundamental difference is scheduling strategy: threads are preemptively scheduled by the OS, which can switch between threads at any Python bytecode boundary, without the thread's cooperation. Coroutines are cooperatively scheduled; the event loop can only switch at explicit await points.
Cooperative scheduling makes async code easier to reason about. You know exactly where context switches can happen: only at await. Between two consecutive await calls, your coroutine runs atomically. In threaded code, you can never be sure โ any list append, dict update, or counter increment could be preempted mid-operation, and you must protect everything with locks.
The resource overhead comparison is stark. Each Python thread reserves around 8 MB of stack address space by default and generates GIL pressure on every Python object access. Spawning 1,000 threads reserves 8 GB of address space just for stacks, before any application state. An asyncio coroutine object is roughly 200 bytes. The event loop can manage tens of thousands of concurrent coroutines comfortably within normal memory budgets.
asyncio versus multiprocessing
multiprocessing spawns separate OS processes, each with its own Python interpreter and its own GIL. This is the only way to achieve genuine multi-core parallelism for CPU-bound Python code. The tradeoff is substantial: each process requires 30–100 MB of memory (for the interpreter, loaded modules, and heap), inter-process communication requires serialization via pickle, and spawning a new process takes 100–500 milliseconds.
asyncio bridges the gap with loop.run_in_executor(executor, func, *args). You can offload CPU-heavy work to a ProcessPoolExecutor without blocking the event loop, receiving an awaitable that resolves when the executor finishes. This is the production pattern for services that mix I/O-heavy request handling with occasional CPU-intensive operations (report generation, image resizing, ML inference on a request path):
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(data):
    # Runs in a separate process, so it does not block the event loop
    return sum(x ** 2 for x in data)

async def handle_request(data):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy, data)
    return result
```
When NOT to reach for async
| Scenario | Why async is the wrong choice |
| --- | --- |
| CPU-intensive computation: ML training, video encoding | Never yields at await; blocks the event loop; use multiprocessing |
| Simple scripts with one or two I/O calls | Adds boilerplate complexity for zero concurrency benefit |
| Libraries with no async-native API | Calling requests.get() inside async def blocks the entire loop |
| Code that must run on Python 3.6 or earlier | asyncio.run() and stable Task API require Python 3.7+ |
| Teams unfamiliar with cooperative scheduling semantics | Subtle event-loop stall bugs require deep understanding to diagnose |
The rule of thumb: if your code does not spend measurable time waiting for I/O, asyncio will not help and may silently hurt.
Which Concurrency Tool Should You Reach For?
Use this decision matrix as your first filter when choosing between Python's concurrency models. After picking the model, the second question is whether every library your code touches has a native async API; if not, you need an executor bridge:
| Scenario | Needs parallel execution across cores? | Recommended tool | Notes |
| --- | --- | --- | --- |
| HTTP calls, DB queries, cache reads | No; overlapping waits is enough | asyncio + aiohttp/asyncpg | The sweet spot for asyncio |
| Same, but want multi-core distribution | Yes | asyncio + uvloop per process, behind a load balancer | Use Gunicorn + Uvicorn workers |
| CPU-heavy: number crunching, encoding, ML | No (single-core) | Synchronous code | Least overhead; no scheduler tax |
| CPU-heavy: number crunching, encoding, ML | Yes (multi-core) | multiprocessing or concurrent.futures.ProcessPoolExecutor | True parallelism; bypass GIL |
| Blocking C library (PIL, sqlite3, OpenCV) | No | threading or run_in_executor(ThreadPoolExecutor) | GIL releases on C-level blocking; threads parallel-wait |
| Mixed I/O + CPU per request | No | asyncio + run_in_executor for CPU portion | Keep event loop free; offload heavy work |
| Simple script, one file or one HTTP call | N/A | Synchronous code | Do not over-engineer |
The column "Needs parallel execution across cores?" is the primary split. For I/O-bound work, you almost never need true multi-core parallelism; the bottleneck is network or disk latency, not CPU capacity. For CPU-bound work, you almost always do; Python's GIL prevents a single process from using more than one core for Python bytecode.
Three Worked Examples: HTTP Fetching, Task Queues, and DB Connection Pooling
These examples build in complexity from the most common production use case to a full async infrastructure pattern. Each demonstrates a distinct asyncio primitive. Pay attention to where await appears; every await is a potential yield point where the event loop can schedule other work.
Example 1: Concurrent HTTP Fetcher with Bounded Concurrency
This example demonstrates how to fetch many URLs concurrently while bounding the number of simultaneous in-flight requests using asyncio.Semaphore. Without a semaphore, asyncio.gather would fire all requests simultaneously, potentially overwhelming target servers or exhausting local OS socket limits. The semaphore acts as a concurrency valve: at most 50 requests can be active at the same time.
```python
import asyncio
import aiohttp

async def fetch_url(session, semaphore, url):
    async with semaphore:  # Suspends here if all 50 slots are taken
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(url, timeout=timeout) as resp:
                body = await resp.read()
                return {"url": url, "status": resp.status, "bytes": len(body)}
        except Exception as exc:
            return {"url": url, "error": str(exc)}

async def fetch_all(urls, max_concurrent=50):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = [f"https://httpbin.org/delay/{i % 2}" for i in range(30)]
    results = asyncio.run(fetch_all(urls))
    for r in results:
        print(r)
```
asyncio.Semaphore(50) is an async-native counter. async with semaphore decrements it on entry and increments it on exit, suspending the coroutine (not the thread) if the counter is already at zero. Other coroutines continue running during the suspension; the event loop is never blocked.
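The valve behavior can be verified without any network at all. This sketch replaces the HTTP call with asyncio.sleep and tracks how many coroutines are inside the semaphore at once; the limit of 5, the 30 tasks, and the state dict are all illustrative choices:

```python
import asyncio

async def fetch(sem, i, state):
    async with sem:  # suspends here once all 5 slots are taken
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.02)  # stand-in for request latency
        state["active"] -= 1
        return i

async def main():
    sem = asyncio.Semaphore(5)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(fetch(sem, i, state) for i in range(30)))
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # 5: concurrency never exceeds the semaphore limit
```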
Example 2: Async Producer/Consumer Task Queue
This example builds a producer/consumer pipeline where a producer enqueues work items and three worker coroutines consume and process them concurrently. This is the async equivalent of the thread-safe queue.Queue + worker thread pattern, but with zero thread overhead.
```python
import asyncio

async def producer(queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # Send a None sentinel to each worker to signal shutdown
    for _ in range(3):
        await queue.put(None)

async def worker(worker_id, queue):
    while True:
        item = await queue.get()
        if item is None:
            print(f"Worker {worker_id} shutting down")
            queue.task_done()
            break
        await asyncio.sleep(0.05)  # Simulate async I/O work
        print(f"Worker {worker_id} processed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)  # Bounded queue: backpressure on producer
    items = [f"task-{i}" for i in range(15)]
    producer_task = asyncio.create_task(producer(queue, items))
    worker_tasks = [asyncio.create_task(worker(i, queue)) for i in range(3)]
    await asyncio.gather(producer_task, *worker_tasks)

asyncio.run(main())
```
asyncio.Queue(maxsize=10) creates bounded backpressure: if the queue is full, queue.put() suspends the producer until a worker drains a slot. The sentinel None pattern gracefully signals each worker to stop after all items are consumed. queue.task_done() lets you use queue.join() elsewhere to wait for all work to complete.
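An alternative shutdown style, sketched below, skips the sentinels entirely: the coordinator awaits queue.join() and then cancels the idle workers. The doubling step is an illustrative stand-in for real processing:

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        results.append(item * 2)   # stand-in for real processing
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    for i in range(10):
        await queue.put(i)
    await queue.join()             # resumes once every item got a task_done()
    for w in workers:
        w.cancel()                 # workers are parked at queue.get(); cancel them
    return sorted(results)

out = asyncio.run(main())
print(out)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

This version is convenient when the number of workers may change, since no one has to know how many sentinels to enqueue.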
Example 3: Async Context Manager for Database Connection Pooling
This example demonstrates how to write a custom async context manager using __aenter__ and __aexit__. Connection pooling is one of the most important production patterns for async database access; reusing a fixed pool of pre-opened connections avoids the latency and resource cost of opening a new connection for every query.
```python
import asyncio
import asyncpg

class AsyncDBPool:
    def __init__(self, dsn, min_size=2, max_size=10):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool = None

    async def __aenter__(self):
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._pool.close()

    async def fetch_user(self, user_id):
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT id, name, email FROM users WHERE id = $1",
                user_id
            )

async def main():
    dsn = "postgresql://user:pass@localhost/mydb"
    async with AsyncDBPool(dsn, max_size=20) as db:
        tasks = [db.fetch_user(i) for i in range(1, 101)]
        users = await asyncio.gather(*tasks)
        print(f"Fetched {len([u for u in users if u])} users")

asyncio.run(main())
```
asyncpg.create_pool maintains a pool of pre-opened database connections. pool.acquire() checks out a connection (suspending if all are in use) and returns it when the async with block exits. Running 100 concurrent queries against a pool of 20 connections means at most 20 queries are active at any moment; the other 80 wait inside asyncio's scheduler, not the OS, consuming negligible resources.
asyncio Ecosystem: The Libraries That Make It Production-Ready
The standard library asyncio module provides the scheduler, synchronization primitives (Queue, Semaphore, Lock, Event, Condition), and high-level utilities (gather, wait_for, create_task, shield). In production, these companion libraries complete the stack:
asyncio (stdlib): The foundation. Included with Python 3.4+; API stabilized in 3.7; asyncio.run() added in 3.7. No installation required. Start here for all scheduling primitives and synchronization needs.
aiohttp: Full-featured async HTTP client and server. Supports connection pooling, cookie jars, streaming responses, multipart uploads, WebSockets, and middleware. Install: pip install aiohttp. Minimal usage:
```python
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.github.com/repos/python/cpython") as r:
            data = await r.json()
            print(data["stargazers_count"])
```
httpx: Modern async HTTP client with an API nearly identical to requests, making it a frictionless upgrade for existing codebases. Supports HTTP/2, certificate pinning, and sync/async modes in one package. Install: pip install httpx. Usage:
```python
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.github.com/repos/python/cpython")
        data = resp.json()
        print(data["stargazers_count"])
```
uvloop: A Cython-based drop-in event loop built on libuv. Delivers 2–4× throughput improvements for network-heavy services with a single configuration line. Used in production at companies running high-throughput Python API services. Install: pip install uvloop. Usage:
```python
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# All subsequent asyncio.run() calls use uvloop automatically
```
For a production FastAPI service, the recommended async stack is: uvloop (event loop) + httpx or aiohttp (outbound HTTP) + asyncpg + SQLAlchemy async (database) + aiofiles (file I/O) + aiocache or Redis with an async driver (caching). Every layer is non-blocking end to end; the event loop never stalls on I/O.
Lessons From the Trenches: Common asyncio Mistakes
Blocking the event loop silently. This is the most widespread mistake, and it is invisible without monitoring. Calling time.sleep(1), requests.get(url), open(path).read(), or any synchronous blocking function inside an async def freezes the entire program for the duration of that call. Every other in-flight coroutine stalls. There is no error. Throughput craters. The fix: time.sleep → await asyncio.sleep; requests → aiohttp or httpx; open → aiofiles; CPU-heavy functions → await loop.run_in_executor(None, blocking_func, arg). Enable PYTHONASYNCIODEBUG=1 in staging to get warnings when a single event loop iteration takes longer than 100ms.
Forgetting await before a coroutine call. Writing result = fetch(url) instead of result = await fetch(url) creates a coroutine object but never executes its body. Python will emit RuntimeWarning: coroutine 'fetch' was never awaited in development. In production, the bug often manifests as result being None or a coroutine object, silently corrupting downstream logic. Enable PYTHONASYNCIODEBUG=1 and asyncio.get_event_loop().set_debug(True) to catch these at development time.
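A minimal demonstration of the forgotten-await bug, using a simulated fetch coroutine (no real network):

```python
import asyncio

async def fetch(url):
    # Simulated async fetch; the body only runs when awaited.
    await asyncio.sleep(0)
    return f"body of {url}"

async def main():
    wrong = fetch("https://example.com")        # coroutine object; nothing ran
    assert asyncio.iscoroutine(wrong)
    wrong.close()   # discard it cleanly to avoid the "never awaited" warning
    right = await fetch("https://example.com")  # now the body executes
    return right

print(asyncio.run(main()))  # body of https://example.com
```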
Mixing synchronous and async code without a bridge. If you have a synchronous callback, click handler, or third-party hook that needs to trigger async work, you cannot simply await inside it: there is no running event loop in a synchronous context. Use asyncio.run(coro) to create a temporary loop if calling from a script entrypoint, or asyncio.run_coroutine_threadsafe(coro, loop) to submit work to an already-running loop from a different thread.
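Here is a self-contained sketch of the thread-to-loop bridge. The sync_callback and do_async_work names are illustrative; the pattern is what matters: the callback runs in a worker thread, submits a coroutine to the running loop with asyncio.run_coroutine_threadsafe, and blocks only its own thread on the result:

```python
import asyncio

async def do_async_work(n):
    await asyncio.sleep(0.01)
    return n + 1

def sync_callback(loop, n):
    # Runs in a plain worker thread with no event loop of its own.
    # Submit the coroutine to the running loop, then block this thread
    # (not the loop) until the result is ready.
    future = asyncio.run_coroutine_threadsafe(do_async_work(n), loop)
    return future.result(timeout=5)

async def main():
    loop = asyncio.get_running_loop()
    # Simulate a third-party sync hook by running it in the thread pool.
    return await loop.run_in_executor(None, sync_callback, loop, 41)

print(asyncio.run(main()))  # 42
```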
Creating tasks and ignoring them. asyncio.create_task(coro) schedules a coroutine but does not wait for it. If the parent coroutine returns before the task finishes, or if the task raises an exception, the task may be silently dropped (the loop holds only a weak reference) or its exception may go unhandled. Always store a reference and await it, or use asyncio.gather() to collect all tasks before the parent exits. The asyncio.TaskGroup API (Python 3.11+) makes this safer by automatically waiting for all child tasks and propagating exceptions.
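A portable sketch of the safe pattern (TaskGroup is the nicer option on 3.11+, but the reference-and-gather version below works on any supported Python; the worker coroutine is a hypothetical example):

```python
import asyncio

async def worker(i):
    await asyncio.sleep(0.01)
    if i == 2:
        raise ValueError(f"worker {i} failed")
    return i

async def main():
    # Keep a reference to every task and collect them before returning,
    # so nothing is silently dropped and no exception goes unobserved.
    tasks = [asyncio.create_task(worker(i)) for i in range(4)]
    return await asyncio.gather(*tasks, return_exceptions=True)

results = asyncio.run(main())
print(results)  # failure from worker 2 arrives as a ValueError result value
```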
Assuming await boundaries are safe for shared mutable state. Between two await points, your coroutine runs atomically: no other coroutine can interfere. But at every await, another coroutine may run and modify shared state. Reading a value, yielding, then using the value again without re-reading it is a logical race condition. Use asyncio.Lock for critical sections, or design shared state as immutable snapshots to avoid the issue entirely.
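The race is easy to reproduce without any network at all. In this sketch, 100 coroutines each read a counter, yield at an await, then write back: the unlocked version loses nearly every increment, while wrapping the read-modify-write in an asyncio.Lock preserves all 100:

```python
import asyncio

counter = 0

async def unsafe_increment():
    global counter
    value = counter          # read...
    await asyncio.sleep(0)   # ...yield: other coroutines run here...
    counter = value + 1      # ...write back a stale value: updates are lost

async def safe_increment(lock):
    global counter
    async with lock:         # read-modify-write is one critical section
        value = counter
        await asyncio.sleep(0)
        counter = value + 1

async def main():
    global counter
    counter = 0
    await asyncio.gather(*[unsafe_increment() for _ in range(100)])
    lost = counter           # far fewer than 100 increments survive
    counter = 0
    lock = asyncio.Lock()    # create the lock inside the running loop
    await asyncio.gather(*[safe_increment(lock) for _ in range(100)])
    return lost, counter

lost, safe = asyncio.run(main())
print(lost, safe)
```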
Summary and Key Takeaways
TLDR:
asyncio lets a single Python thread handle thousands of concurrent I/O operations by cooperatively yielding control between coroutines at every await. It is not parallelism: it is CPU-efficient waiting. Use it for network and database workloads; reach for multiprocessing when the bottleneck is CPU, not I/O.
Five ideas to walk away with:
A coroutine is a suspended computation, not a function call.
async def f() defines a coroutine function. Calling it returns a coroutine object with no code executed. await f() or asyncio.run(f()) actually drives it to completion.
The event loop is a cooperative single-threaded scheduler. It runs one coroutine at a time and can only switch at await points. Any blocking call inside async def freezes every other coroutine in the program.
asyncio.gather() is your primary concurrency tool. It schedules multiple coroutines and lets them interleave at await points, collecting results in input order. Use asyncio.create_task() for fire-and-schedule patterns.
Match the tool to the bottleneck. asyncio for I/O-bound work. multiprocessing for CPU-bound work. threading for blocking C libraries. Synchronous code for everything else.
Use async-native libraries end to end. One blocking call (requests.get, time.sleep, psycopg2.execute) undoes all the concurrency benefits. Every library in your async stack must either be async-native or wrapped in run_in_executor.
Practice Quiz
Test your understanding of Python asyncio, the event loop, and concurrency model selection.
- You write coro = my_async_function() without await. What does coro contain, and has any code inside my_async_function run yet?
Correct Answer: coro is a coroutine object, a suspended computation wrapped around the function's bytecode frame. No code inside my_async_function has executed. The function body only runs when the coroutine is driven by await coro inside another async def, or by asyncio.run(coro) at the top level. Python will emit RuntimeWarning: coroutine 'my_async_function' was never awaited when the object is garbage collected.
- What is the behavioral difference between result = await fetch(url) and task = asyncio.create_task(fetch(url))?
Correct Answer: await fetch(url) suspends the current coroutine and waits for fetch to complete fully before the current coroutine can continue: sequential execution. asyncio.create_task(fetch(url)) schedules fetch on the event loop immediately and returns a Task without blocking the caller. Both the calling coroutine and the task can make progress concurrently at subsequent await points. To get the task result, you later await task or include it in asyncio.gather().
- A colleague wraps a blocking HTTP call in async def fetch(url): return requests.get(url). They claim this makes it non-blocking because it is now an async def. Are they correct? What would you say?
Correct Answer: They are incorrect. Adding async def around a blocking call does not make it non-blocking. requests.get() is a synchronous function that blocks the OS thread. When an asyncio coroutine calls it, the event loop thread blocks for the entire duration of the HTTP request. All other in-flight coroutines stall. There is no await inside the function, so the event loop never gets a turn. The fix is to replace requests with an async-native library like aiohttp (async with session.get(url)) or to offload the blocking call with await loop.run_in_executor(None, requests.get, url).
- asyncio.gather(task_a, task_b, task_c) is running. task_b raises a ValueError. What happens to task_a and task_c by default, and how do you change the behavior to collect all results including exceptions?
Correct Answer: By default (return_exceptions=False), asyncio.gather propagates the first exception to the caller immediately, but it does not cancel the other awaitables: task_a and task_c keep running on the event loop, and their eventual results are simply no longer collected by this gather call. (They are only cancelled if the gather future itself is cancelled.) To collect everything, pass return_exceptions=True: all tasks run to completion regardless of individual failures, and exceptions are returned as result values in the output list alongside normal return values. The caller can then inspect each result with isinstance(r, Exception).
- Your service performs 200 independent PostgreSQL queries per incoming request and is clearly I/O-bound. A teammate proposes rewriting it with multiprocessing.Pool to scale across all 16 CPU cores. Is this a good idea? What would you recommend instead?
Correct Answer: For I/O-bound workloads like database queries, multiprocessing adds significant overhead (process spawn time of 100-500 ms, memory duplication of roughly 50-100 MB per process, and pickle serialization for inter-process communication) without addressing the actual bottleneck, which is network and disk latency. The right tool is asyncio with asyncpg and a connection pool. You can run all 200 queries concurrently on a single event loop thread with asyncio.gather(), limited by pool size via asyncpg.create_pool(max_size=20). This achieves far greater effective throughput with a fraction of the resource cost.
- Open-ended challenge: You need to build an async web crawler that fetches up to 50,000 pages while: (a) respecting a maximum of 30 simultaneous requests, (b) retrying each failed URL up to 3 times with exponential backoff, (c) writing results to disk without blocking the event loop, and (d) stopping gracefully when a cancellation signal is received. What asyncio primitives would you use for each constraint, and what would your coroutine structure look like?
There is no single correct answer. A strong design uses: asyncio.Semaphore(30) for concurrency bounding (a); a retry loop with await asyncio.sleep(2 ** attempt) and a max_retries=3 counter inside the fetch coroutine, catching aiohttp.ClientError and asyncio.TimeoutError (b); aiofiles.open() for non-blocking file writes (c); asyncio.Event as a cancellation flag polled at the top of the worker loop, plus asyncio.TaskGroup or asyncio.gather wrapping a try/except asyncio.CancelledError block for graceful shutdown (d). The overall structure: a producer coroutine enqueues seed URLs into an asyncio.Queue, N worker coroutines drain the queue and enqueue newly discovered URLs, a writer coroutine drains a result queue to aiofiles. A visited-URL set prevents re-crawling.
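Constraints (a) and (b) can be sketched without any real network. In the runnable toy below, fake_fetch and the example URLs are hypothetical stand-ins for aiohttp requests; the semaphore bounds concurrency and the retry loop applies exponential backoff:

```python
import asyncio
import random

random.seed(0)  # deterministic failures for the demo

async def fake_fetch(url):
    # Stand-in for a real HTTP request; fails ~30% of the time.
    await asyncio.sleep(0.001)
    if random.random() < 0.3:
        raise ConnectionError(url)
    return f"content of {url}"

async def fetch_with_retry(sem, url, max_retries=3):
    async with sem:                      # constraint (a): bounded concurrency
        for attempt in range(max_retries):
            try:
                return await fake_fetch(url)
            except ConnectionError:
                if attempt == max_retries - 1:
                    return None          # give up after the final attempt
                # constraint (b): exponential backoff between attempts
                await asyncio.sleep(0.001 * 2 ** attempt)

async def main():
    sem = asyncio.Semaphore(30)          # at most 30 fetches in flight
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    return await asyncio.gather(*(fetch_with_retry(sem, u) for u in urls))

results = asyncio.run(main())
print(f"{sum(r is not None for r in results)} of {len(results)} fetched")
```

Constraints (c) and (d) slot into the same skeleton: replace the return value with a put onto a result queue drained by an aiofiles writer, and check an asyncio.Event at the top of each worker iteration.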
Written by
Abstract Algorithms
@abstractalgorithms