All Posts

Async Python: asyncio, Coroutines, and Event Loops Without the Confusion

Why async Python is not about parallelism โ€” it is about not wasting CPU while waiting for I/O

Abstract AlgorithmsAbstract Algorithms
ยทยท28 min read
Share
AI Share on X / Twitter
AI Share on LinkedIn
Copy link

AI-assisted content. This post may have been written or enhanced with the help of AI tools. While efforts are made to ensure accuracy, the content may contain errors or inaccuracies. Please verify critical information independently.


๐Ÿ“– The 500-Second Problem: What Cooperative Multitasking Actually Fixes

Suppose your monitoring pipeline checks the health endpoint of 1,000 internal microservices. Each HTTP call takes about 500 milliseconds โ€” network round-trip, DNS, TLS handshake, server response. A straightforward implementation fires each request one after the other:

import requests
import time

urls = [f"https://service-{i}.internal/health" for i in range(1000)]

start = time.time()
for url in urls:
    response = requests.get(url, timeout=5)
    print(response.status_code)

print(f"Total time: {time.time() - start:.1f}s")  # ~500 seconds

Five hundred seconds โ€” over eight minutes โ€” to do something that should feel instant. Look at what the CPU is doing during those 500 seconds: essentially nothing. Every requests.get call parks the entire program while the kernel waits for a network response. The CPU sits at 0% utilization for ~99.9% of the run. This is the problem asyncio was built to eliminate.

Now consider the same task written with asyncio and aiohttp:

import asyncio
import aiohttp
import time

async def fetch(session, url):
    async with session.get(url) as response:
        return response.status

async def main():
    urls = [f"https://service-{i}.internal/health" for i in range(1000)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    print(results[:5])

start = time.time()
asyncio.run(main())
print(f"Total time: {time.time() - start:.1f}s")  # ~5 seconds

Five seconds instead of 500. Same single Python thread. No multiprocessing. No threads. No locks. No magic.

The reason is cooperative multitasking. While one coroutine is waiting for a response from service-1, the event loop does not stall โ€” it hands control to the coroutine that is waiting for service-2. When service-2's response arrives, that coroutine resumes. The OS kernel multiplexes the I/O using epoll (Linux), kqueue (macOS), or select (Windows), and the event loop calls the appropriate callback when data is ready. From Python's perspective, hundreds of I/O waits happen "at the same time" on one thread.

This is not parallelism. No two coroutines execute simultaneously. There is no true concurrency โ€” only the cooperative elimination of idle waiting time. That distinction is not a footnote; it is the whole design philosophy of asyncio, and it determines exactly when the tool is appropriate and when it is not.


๐Ÿ” async def, await, and Why a Coroutine Is Not a Function Call

Before you can read asyncio code fluently, you need to internalize one counterintuitive fact: calling an async def function does not execute its body. This surprises almost everyone coming from synchronous Python or other languages.

A regular function executes immediately when called:

def greet(name):
    return f"Hello, {name}"

result = greet("Ana")  # Body runs right now โ€” result is "Hello, Ana"

An async def function โ€” called a coroutine function โ€” returns a coroutine object when called. The body has not run. The coroutine object is a suspended computation waiting to be driven by an event loop:

async def greet(name):
    return f"Hello, {name}"

coro = greet("Ana")   # Body has NOT run โ€” coro is a coroutine object
print(coro)           # <coroutine object greet at 0x10f3a4b20>

Python will even emit a RuntimeWarning: coroutine 'greet' was never awaited if you let that coroutine object get garbage collected without running it. Forgetting await is one of the most common bugs in async codebases.

To actually execute a coroutine you have two options:

  1. asyncio.run(coro) โ€” creates a fresh event loop, runs the coroutine to completion, tears down the loop, and returns the result. This is your program's top-level entry point for async code. Never call it from inside an already-running event loop.

  2. await coro โ€” inside another async def function, suspends the current coroutine and gives control back to the event loop until the awaited coroutine finishes. The event loop is free to run other ready coroutines while this one waits.

async def main():
    result = await greet("Ana")   # Body of greet() runs here
    print(result)                  # "Hello, Ana"

asyncio.run(main())

The await keyword does two things simultaneously. First, it evaluates the awaitable expression โ€” which can be a coroutine, a Task, a Future, or any object implementing __await__. Second, it yields control back to the event loop, which is then free to schedule other ready coroutines. If the awaitable completes instantly, the yield is invisible to you, but it still happens at the bytecode level.

asyncio.sleep(0) is the canonical no-op yield โ€” a way of telling the event loop "run anything else that is ready before coming back to me" without actually waiting for anything:

async def cooperative_loop(name, count):
    for i in range(count):
        print(f"{name}: step {i}")
        await asyncio.sleep(0)  # Give other coroutines a turn

The three primitives you will encounter in every async codebase:

PrimitiveWhat it doesWhen to use
async def f()Defines a coroutine functionWhenever the function needs to await anything
await exprSuspends current coroutine; resumes when awaitable is doneInside async def; every I/O call
asyncio.run(coro)Runs a coroutine as the top-level entry pointMain function, test harness, script entry point

โš™๏ธ How the Event Loop Schedules, Suspends, and Resumes Coroutines

The event loop is the heart of asyncio. It is a scheduler that maintains a queue of ready-to-run coroutines and an I/O selector that watches file descriptors for readiness events. Its work cycle repeats continuously:

  1. Pick the next ready coroutine from the run queue.
  2. Resume it โ€” let it execute Python bytecode until it hits an await.
  3. If the await is on an I/O operation, register the file descriptor with the OS selector and move on.
  4. Poll the OS selector for I/O readiness events.
  5. Mark any coroutines whose I/O is now complete as ready.
  6. Repeat until the queue is empty.

The flowchart below traces one complete turn of this scheduling loop:

flowchart TD
    A[Event Loop starts] --> B[Pick next ready coroutine from queue]
    B --> C[Resume coroutine until it hits await]
    C --> D{What did the coroutine await?}
    D -->|Coroutine completed| E[Store result and mark Task done]
    E --> F{More coroutines in queue?}
    F -->|Yes| B
    F -->|No| G[Event Loop exits]
    D -->|I/O await - socket or file| H[Register fd with OS selector]
    H --> I[Poll OS selector with timeout]
    I --> J{Any file descriptor ready?}
    J -->|Yes| K[Move waiting coroutine back to run queue]
    K --> B
    J -->|No - still waiting| F

When a coroutine suspends on an I/O await, it is moved off the run queue entirely and registered against a file descriptor in the OS selector. The event loop does not spin-wait โ€” it calls select() or epoll_wait() with a timeout, allowing the OS kernel to block efficiently until I/O arrives. This is the mechanism that allows a single thread to appear to handle many concurrent I/O operations simultaneously.

asyncio.create_task() versus Awaiting a Coroutine Directly

Using await on a bare coroutine runs it sequentially โ€” one completes before the next starts:

async def main():
    result1 = await fetch(session, url1)   # Waits for full completion
    result2 = await fetch(session, url2)   # Only then starts this

asyncio.create_task() wraps a coroutine in a Task object, immediately schedules it on the running event loop, and returns the Task without blocking. Both tasks can now make progress at await points:

async def main():
    task1 = asyncio.create_task(fetch(session, url1))   # Scheduled immediately
    task2 = asyncio.create_task(fetch(session, url2))   # Scheduled immediately
    result1 = await task1   # Now both can interleave during I/O waits
    result2 = await task2

asyncio.gather() for Fan-Out Concurrency

asyncio.gather(*coroutines_or_tasks) is the idiomatic way to run many coroutines concurrently and collect all their results in one call:

results = await asyncio.gather(
    fetch(session, url1),
    fetch(session, url2),
    fetch(session, url3),
)
# results[0] corresponds to url1, results[1] to url2, etc.

All three coroutines start and can interleave freely at their await points. Results are returned in the same order as the inputs, regardless of which coroutine finished first. If any coroutine raises an exception, gather cancels the remaining tasks and re-raises the exception by default. Pass return_exceptions=True to collect exceptions as result values instead.

Timeouts with asyncio.wait_for()

Wrap any awaitable with asyncio.wait_for(coro, timeout=seconds) to cancel it automatically if it runs too long:

try:
    result = await asyncio.wait_for(fetch(session, slow_url), timeout=2.0)
except asyncio.TimeoutError:
    print("Request exceeded 2 seconds โ€” skipping")

When the timeout fires, asyncio cancels the inner task and raises asyncio.TimeoutError. The cancelled coroutine receives a CancelledError at its current await point. You can catch CancelledError inside the coroutine to run cleanup before re-raising โ€” for example, to close an open file or release a database connection.


๐Ÿง  Under the Hood: How Python's asyncio Scheduler Actually Works

Understanding the internals of asyncio transforms mysterious behavior into predictable cause and effect. This section covers the cooperative scheduling model and where its performance profile holds up versus where it degrades.

The Internals of the Event Loop

Python's asyncio event loop is single-threaded and cooperative. These two adjectives explain both its elegance and its failure modes.

"Single-threaded" means no two coroutines ever run at the same Python instruction simultaneously. There are no data races on shared Python objects โ€” not because of locking, but because only one coroutine can be executing at any given moment. You can freely share lists, dicts, and custom Python objects between coroutines without mutexes, as long as you remain alert to mutation at await boundaries (where another coroutine may modify shared state before you resume).

"Cooperative" means the event loop cannot preempt a running coroutine mid-execution. The scheduler can only regain control when the coroutine voluntarily yields via an await. This is the fundamental gotcha: if you call a CPU-intensive function or a blocking I/O call โ€” like time.sleep(), requests.get(), or open().read() โ€” inside an async def without an async wrapper, you block the entire OS thread for the duration of that call. Every other coroutine in the program stalls. There is no error, no warning in production, just silently degraded throughput.

Under the hood, asyncio uses the OS I/O notification facility โ€” epoll on Linux, kqueue on macOS, IOCP on Windows โ€” to monitor file descriptors. When a socket becomes readable (HTTP response arrived) or writable (ready to send more data), the OS notifies the event loop, which looks up the coroutine registered on that file descriptor and marks it ready for resumption.

At the Python level, coroutine objects are built on generators. await compiles to yield from in CPython bytecode. The event loop calls .send(None) on the coroutine's generator frame to resume it. When the coroutine yields (via await), control returns to the loop's _run_once() method, which calls the selector's select() with an appropriate timeout, processes ready I/O callbacks, and picks the next coroutine to drive.

uvloop is a drop-in replacement event loop written in Cython on top of libuv โ€” the same async I/O library that powers Node.js. It replaces the default asyncio event loop implementation with a substantially faster one:

import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# asyncio.run() now uses the uvloop implementation automatically

Benchmarks consistently show uvloop delivering 2โ€“4ร— higher throughput than the default loop for network-heavy workloads. The gain comes from reducing Python-level overhead in the I/O polling inner loop โ€” libuv's selector loop is implemented in C with zero Python object creation per iteration.

Performance Analysis: The I/O-Bound vs CPU-Bound Matrix

The decision between asyncio, threading, and multiprocessing depends entirely on where your workload bottlenecks. The following matrix is the definitive guide:

Workload typeBottleneckBest toolCore reason
HTTP API calls, database queries, cache readsNetwork / disk I/O latencyasyncioOne event loop handles 10k+ concurrent waits; zero thread overhead
CPU-intensive math, ML inference, image encodingCPU cyclesmultiprocessingSpawns separate Python interpreters; true multi-core parallelism; bypasses GIL
Blocking C library calls (PIL, sqlite3, cv2)C-level blocking syscallsthreadingGIL releases on blocking C code; threads parallel-wait at OS level
Mixed I/O + light CPU computation per requestNetwork + computeasyncio + run_in_executorOffload CPU work to thread or process pool; keep event loop unblocked
Simple sequential script, one or two I/O callsโ€”Synchronous codeasyncio adds boilerplate complexity with zero benefit for non-concurrent workloads

The reason asyncio beats threading for I/O is overhead, not raw speed. A Python thread requires 8 MB of stack memory, a kernel scheduler context, and GIL acquisition/release for every Python opcode executed. At 1,000 concurrent connections, threads consume ~8 GB of memory and spend significant CPU time context-switching. An asyncio event loop handles 1,000 concurrent coroutines with roughly 200 bytes of Python overhead per coroutine and zero OS context-switching cost.

The reason asyncio loses to multiprocessing for CPU work is not the GIL per se โ€” it is the fundamental nature of cooperative scheduling. A CPU-bound function never reaches an await point, so the event loop never gets a turn during its execution. Even if you wrap CPU-heavy code in async def, it runs synchronously from the event loop's perspective. The fix is loop.run_in_executor(executor, blocking_func, *args), which delegates the blocking work to a ThreadPoolExecutor or ProcessPoolExecutor and returns an awaitable that resolves when the executor is done.

The canonical formula: I/O-bound workload โ†’ asyncio. CPU-bound workload โ†’ multiprocessing. Blocking C library โ†’ threading. Everything else โ†’ synchronous code. When genuinely unsure, profile first with asyncio debug mode (PYTHONASYNCIODEBUG=1) before reaching for multiprocessing complexity.


๐Ÿ“Š Two Coroutines, One Thread: Visualizing Interleaved I/O Execution

The sequence diagram below shows how two coroutines โ€” fetch_user and fetch_orders โ€” share a single event loop thread. Neither blocks the other. When one coroutine dispatches its database query and hits await, the event loop immediately switches to the other. Both queries are in-flight simultaneously, and the program's total wall-clock time is the latency of the slower query alone โ€” not the sum of both.

sequenceDiagram
    participant EL as Event Loop
    participant FU as fetch_user
    participant DB1 as User DB
    participant FO as fetch_orders
    participant DB2 as Orders DB

    EL->>FU: Resume coroutine
    FU->>DB1: Send query to User DB
    FU->>EL: Yield control - waiting for DB1
    EL->>FO: Resume coroutine
    FO->>DB2: Send query to Orders DB
    FO->>EL: Yield control - waiting for DB2
    DB1-->>EL: I/O ready notification
    EL->>FU: Resume - User DB responded
    FU->>EL: Return user data
    DB2-->>EL: I/O ready notification
    EL->>FO: Resume - Orders DB responded
    FO->>EL: Return orders data

Notice the critical moment: fetch_user sends its query and immediately hands control back to the event loop (FU->>EL: Yield control). The event loop then starts fetch_orders, which similarly yields after dispatching its own query. Both queries are now in-flight on a single thread. When DB1 responds first, the event loop resumes fetch_user. When DB2 responds, it resumes fetch_orders. The event loop never spins idle โ€” it either runs a coroutine or waits on the OS selector for the next I/O event.

This interleaving is why asyncio.gather() achieves concurrency without threads. The apparent parallelism is an illusion of overlapping I/O wait times, not simultaneous execution. And for I/O-bound workloads, overlapping wait times is all you need.


๐ŸŒ Where asyncio Shows Up in Real Python Production Stacks

asyncio is not a niche academic feature. It underpins the entire modern Python async ecosystem, and you will encounter it every time you touch production Python web services or data pipelines.

Async HTTP clients โ€” aiohttp and httpx

aiohttp is the most widely deployed async HTTP library. It provides both an async client (for making outbound requests) and an async server (for handling inbound requests). Teams at Discord, Sentry, and countless SaaS companies run their service-to-service communication through aiohttp clients. httpx is a newer alternative offering a near-identical interface to the familiar requests library, with first-class async support and HTTP/2 out of the box. Both eliminate the blocking I/O of requests.get and integrate seamlessly with the event loop.

Async database drivers โ€” asyncpg and SQLAlchemy async

asyncpg is a high-performance async driver for PostgreSQL. It is consistently 3โ€“5ร— faster than psycopg2 in high-concurrency benchmarks because it never blocks the event loop on query execution. SQLAlchemy 1.4 and later ships an async extension (sqlalchemy.ext.asyncio) that wraps async drivers like asyncpg with the familiar ORM interface, so you can use session.execute() and session.query() patterns without rewriting all your data access logic:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async with AsyncSessionLocal() as session:
    result = await session.execute(select(User).where(User.id == 42))
    user = result.scalar_one()

FastAPI request handling

FastAPI is built on Starlette, which runs entirely on asyncio. Every route handler you declare as async def is executed directly on the event loop, allowing FastAPI to handle hundreds of concurrent in-flight requests on a single Uvicorn worker without blocking. Handlers declared as plain def (synchronous) are automatically offloaded to a thread pool via run_in_executor, so they do not stall the event loop even if they do blocking work. This design means you can mix legacy synchronous database drivers with async route handlers during a migration, paying only the thread pool overhead for the blocking portions.

Async file I/O with aiofiles

Python's built-in open() is a blocking call โ€” it will stall the event loop for the duration of any disk read or write. aiofiles wraps file operations in a ThreadPoolExecutor, giving you the async with open() syntax without blocking the event loop. This matters especially for services that read large configuration files, write audit logs, or process file uploads:

import aiofiles

async def write_audit_log(path, entry):
    async with aiofiles.open(path, mode='a') as f:
        await f.write(f"{entry}\n")

โš–๏ธ asyncio vs Threads vs Multiprocessing: Choosing the Right Concurrency Model

The three Python concurrency tools solve different problems. Picking the wrong one does not just underperform โ€” it can introduce subtle bugs that are very hard to debug in production.

asyncio versus threading

Both run inside a single OS process. The GIL applies to both. The fundamental difference is scheduling strategy: threads are preemptively scheduled by the OS โ€” the OS can switch between threads at any Python bytecode boundary, without the thread's cooperation. Coroutines are cooperatively scheduled โ€” the event loop can only switch at explicit await points.

Cooperative scheduling makes async code easier to reason about. You know exactly where context switches can happen: only at await. Between two consecutive await calls, your coroutine runs atomically. In threaded code, you can never be sure โ€” any list append, dict update, or counter increment could be preempted mid-operation, and you must protect everything with locks.

The resource overhead comparison is stark. Each Python thread requires around 8 MB of OS stack memory and generates GIL pressure on every Python object access. Spawning 1,000 threads consumes 8 GB just for stacks, before any application state. An asyncio coroutine object is roughly 200 bytes. The event loop can manage tens of thousands of concurrent coroutines comfortably within normal memory budgets.

asyncio versus multiprocessing

multiprocessing spawns separate OS processes, each with its own Python interpreter and its own GIL. This is the only way to achieve genuine multi-core parallelism for CPU-bound Python code. The tradeoff is substantial: each process requires 30โ€“100 MB of memory (for the interpreter, loaded modules, and heap), inter-process communication requires serialization via pickle, and spawning a new process takes 100โ€“500 milliseconds.

asyncio bridges the gap with loop.run_in_executor(executor, func, *args). You can offload CPU-heavy work to a ProcessPoolExecutor without blocking the event loop, receiving an awaitable that resolves when the executor finishes. This is the production pattern for services that mix I/O-heavy request handling with occasional CPU-intensive operations (report generation, image resizing, ML inference on a request path):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(data):
    # Runs in a separate process โ€” does not block the event loop
    return sum(x ** 2 for x in data)

async def handle_request(data):
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy, data)
    return result

When NOT to reach for async

ScenarioWhy async is the wrong choice
CPU-intensive computation: ML training, video encodingNever yields at await; blocks the event loop; use multiprocessing
Simple scripts with one or two I/O callsAdds boilerplate complexity for zero concurrency benefit
Libraries with no async-native APICalling requests.get() inside async def blocks the entire loop
Code that must run on Python 3.6 or earlierasyncio.run() and stable Task API require Python 3.7+
Teams unfamiliar with cooperative scheduling semanticsSubtle event-loop stall bugs require deep understanding to diagnose

The rule of thumb: if your code does not spend measurable time waiting for I/O, asyncio will not help and may silently hurt.


๐Ÿงญ Which Concurrency Tool Should You Reach For?

Use this decision matrix as your first filter when choosing between Python's concurrency models. After picking the model, the second question is whether every library your code touches has a native async API โ€” if not, you need an executor bridge:

ScenarioNeeds parallel execution across cores?Recommended toolNotes
HTTP calls, DB queries, cache readsNo โ€” overlapping waits is enoughasyncio + aiohttp/asyncpgThe sweet spot for asyncio
Same โ€” but want multi-core distributionYesasyncio + uvloop per process, behind a load balancerUse Gunicorn + Uvicorn workers
CPU-heavy: number crunching, encoding, MLNo (single-core)Synchronous codeLeast overhead; no scheduler tax
CPU-heavy: number crunching, encoding, MLYes (multi-core)multiprocessing or concurrent.futures.ProcessPoolExecutorTrue parallelism; bypass GIL
Blocking C library (PIL, sqlite3, OpenCV)Nothreading or run_in_executor(ThreadPoolExecutor)GIL releases on C-level blocking; threads parallel-wait
Mixed I/O + CPU per requestNoasyncio + run_in_executor for CPU portionKeep event loop free; offload heavy work
Simple script, one file or one HTTP callโ€”Synchronous codeDo not over-engineer

The column "Needs parallel execution across cores?" is the primary split. For I/O-bound work, you almost never need true multi-core parallelism โ€” the bottleneck is network or disk latency, not CPU capacity. For CPU-bound work, you almost always do โ€” Python's GIL prevents a single process from using more than one core for Python bytecode.


๐Ÿงช Three Worked Examples: HTTP Fetching, Task Queues, and DB Connection Pooling

These examples build in complexity from the most common production use case to a full async infrastructure pattern. Each demonstrates a distinct asyncio primitive. Pay attention to where await appears โ€” every await is a potential yield point where the event loop can schedule other work.

Example 1: Concurrent HTTP Fetcher with Bounded Concurrency

This example demonstrates how to fetch many URLs concurrently while bounding the number of simultaneous in-flight requests using asyncio.Semaphore. Without a semaphore, asyncio.gather would fire all requests simultaneously, potentially overwhelming target servers or exhausting local OS socket limits. The semaphore acts as a concurrency valve โ€” at most 50 requests can be active at the same time.

import asyncio
import aiohttp

async def fetch_url(session, semaphore, url):
    async with semaphore:   # Suspends here if 50 slots are all taken
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(url, timeout=timeout) as resp:
                body = await resp.read()
                return {"url": url, "status": resp.status, "bytes": len(body)}
        except Exception as exc:
            return {"url": url, "error": str(exc)}

async def fetch_all(urls, max_concurrent=50):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = [f"https://httpbin.org/delay/{i % 2}" for i in range(30)]
    results = asyncio.run(fetch_all(urls))
    for r in results:
        print(r)

asyncio.Semaphore(50) is an async-native counter. async with semaphore decrements it on entry and increments it on exit, suspending the coroutine (not the thread) if the counter is already at zero. Other coroutines continue running during the suspension โ€” the event loop is never blocked.

Example 2: Async Producer/Consumer Task Queue

This example builds a producer/consumer pipeline where a producer enqueues work items and three worker coroutines consume and process them concurrently. This is the async equivalent of the thread-safe queue.Queue + worker thread pattern, but with zero thread overhead.

import asyncio

async def producer(queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    # Send a None sentinel to each worker to signal shutdown
    for _ in range(3):
        await queue.put(None)

async def worker(worker_id, queue):
    while True:
        item = await queue.get()
        if item is None:
            print(f"Worker {worker_id} shutting down")
            queue.task_done()
            break
        await asyncio.sleep(0.05)   # Simulate async I/O work
        print(f"Worker {worker_id} processed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)   # Bounded queue โ€” backpressure on producer
    items = [f"task-{i}" for i in range(15)]

    producer_task = asyncio.create_task(producer(queue, items))
    worker_tasks = [asyncio.create_task(worker(i, queue)) for i in range(3)]

    await asyncio.gather(producer_task, *worker_tasks)

asyncio.run(main())

asyncio.Queue(maxsize=10) creates bounded backpressure: if the queue is full, queue.put() suspends the producer until a worker drains a slot. The sentinel None pattern gracefully signals each worker to stop after all items are consumed. queue.task_done() lets you use queue.join() elsewhere to wait for all work to complete.

Example 3: Async Context Manager for Database Connection Pooling

This example demonstrates how to write a custom async context manager using __aenter__ and __aexit__. Connection pooling is one of the most important production patterns for async database access โ€” reusing a fixed pool of pre-opened connections avoids the latency and resource cost of opening a new connection for every query.

import asyncio
import asyncpg

class AsyncDBPool:
    def __init__(self, dsn, min_size=2, max_size=10):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool = None

    async def __aenter__(self):
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._pool.close()

    async def fetch_user(self, user_id):
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT id, name, email FROM users WHERE id = $1",
                user_id
            )

async def main():
    dsn = "postgresql://user:pass@localhost/mydb"
    async with AsyncDBPool(dsn, max_size=20) as db:
        tasks = [db.fetch_user(i) for i in range(1, 101)]
        users = await asyncio.gather(*tasks)
        print(f"Fetched {len([u for u in users if u])} users")

asyncio.run(main())

asyncpg.create_pool maintains a pool of pre-opened database connections. pool.acquire() checks out a connection (suspending if all are in use) and returns it when the async with block exits. Running 100 concurrent queries against a pool of 20 connections means at most 20 queries are active at any moment โ€” the other 80 wait inside asyncio's scheduler, not the OS, consuming negligible resources.


๐Ÿ› ๏ธ asyncio Ecosystem: The Libraries That Make It Production-Ready

The standard library asyncio module provides the scheduler, synchronization primitives (Queue, Semaphore, Lock, Event, Condition), and high-level utilities (gather, wait_for, create_task, shield). In production, these companion libraries complete the stack:

asyncio (stdlib) โ€” The foundation. Included with Python 3.4+; API stabilized in 3.7; asyncio.run() added in 3.7. No installation required. Start here for all scheduling primitives and synchronization needs.

aiohttp โ€” Full-featured async HTTP client and server. Supports connection pooling, cookie jars, streaming responses, multipart uploads, WebSockets, and middleware. Install: pip install aiohttp. Minimal usage:

async with aiohttp.ClientSession() as session:
    async with session.get("https://api.github.com/repos/python/cpython") as r:
        data = await r.json()
        print(data["stargazers_count"])

httpx โ€” Modern async HTTP client with an API nearly identical to requests, making it a frictionless upgrade for existing codebases. Supports HTTP/2, certificate pinning, and sync/async modes in one package. Install: pip install httpx. Usage:

async with httpx.AsyncClient() as client:
    resp = await client.get("https://api.github.com/repos/python/cpython")
    data = resp.json()
    print(data["stargazers_count"])

uvloop โ€” A Cython-based drop-in event loop built on libuv. Delivers 2โ€“4ร— throughput improvements for network-heavy services with a single configuration line. Used in production at companies running high-throughput Python API services. Install: pip install uvloop. Usage:

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# All subsequent asyncio.run() calls use uvloop automatically

For a production FastAPI service, the recommended async stack is: uvloop (event loop) + httpx or aiohttp (outbound HTTP) + asyncpg + SQLAlchemy async (database) + aiofiles (file I/O) + aiocache or Redis with an async driver (caching). Every layer is non-blocking end to end โ€” the event loop never stalls on I/O.


๐Ÿ“š Lessons From the Trenches: Common asyncio Mistakes

Blocking the event loop silently. This is the most widespread mistake, and it is invisible without monitoring. Calling time.sleep(1), requests.get(url), open(path).read(), or any synchronous blocking function inside an async def freezes the entire program for the duration of that call. Every other in-flight coroutine stalls. There is no error. Throughput craters. The fix: time.sleep โ†’ await asyncio.sleep; requests โ†’ aiohttp or httpx; open โ†’ aiofiles; CPU-heavy functions โ†’ await loop.run_in_executor(None, blocking_func, arg). Enable PYTHONASYNCIODEBUG=1 in staging to get warnings when a single event loop iteration takes longer than 100ms.

Forgetting await before a coroutine call. Writing result = fetch(url) instead of result = await fetch(url) creates a coroutine object but never executes its body. Python will emit RuntimeWarning: coroutine 'fetch' was never awaited in development. In production, the bug often manifests as result being None or a coroutine object, silently corrupting downstream logic. Enable PYTHONASYNCIODEBUG=1 and asyncio.get_event_loop().set_debug(True) to catch these at development time.

Mixing synchronous and async code without a bridge. If you have a synchronous callback, click handler, or third-party hook that needs to trigger async work, you cannot simply await inside it โ€” there is no running event loop in synchronous context. Use asyncio.run(coro) to create a temporary loop if calling from a script entrypoint, or loop.run_coroutine_threadsafe(coro, loop) to submit work to an already-running loop from a different thread.

Creating tasks and ignoring them. asyncio.create_task(coro) schedules a coroutine but does not wait for it. If the parent coroutine returns before the task finishes โ€” or if an exception is raised โ€” the task may be silently cancelled or its exception may go unhandled. Always store a reference and await it, or use asyncio.gather() to collect all tasks before the parent exits. The asyncio.TaskGroup API (Python 3.11+) makes this safer by automatically waiting for all child tasks and propagating exceptions.

Assuming await boundaries are safe for shared mutable state. Between two await calls, your coroutine runs atomically โ€” no other coroutine can interfere. But at every await, another coroutine may run and modify shared state. Reading a value, yielding, then using the value again without re-reading it is a logical race condition. Use asyncio.Lock for critical sections, or design shared state as immutable snapshots to avoid the issue entirely.


๐Ÿ“Œ Summary and Key Takeaways

TLDR: asyncio lets a single Python thread handle thousands of concurrent I/O operations by cooperatively yielding control between coroutines at every await. It is not parallelism โ€” it is CPU-efficient waiting. Use it for network and database workloads; reach for multiprocessing when the bottleneck is CPU, not I/O.

Five ideas to walk away with:

  1. A coroutine is a suspended computation, not a function call. async def f() defines a coroutine function. Calling it returns a coroutine object with no code executed. await f() or asyncio.run(f()) actually drives it to completion.

  2. The event loop is a cooperative single-threaded scheduler. It runs one coroutine at a time and can only switch at await points. Any blocking call inside async def freezes every other coroutine in the program.

  3. asyncio.gather() is your primary concurrency tool. It schedules multiple coroutines and lets them interleave at await points, collecting results in input order. Use asyncio.create_task() for fire-and-schedule patterns.

  4. Match the tool to the bottleneck. asyncio for I/O-bound work. multiprocessing for CPU-bound work. threading for blocking C libraries. Synchronous code for everything else.

  5. Use async-native libraries end to end. One blocking call โ€” requests.get, time.sleep, psycopg2.execute โ€” undoes all the concurrency benefits. Every library in your async stack must either be async-native or wrapped in run_in_executor.


๐Ÿ“ Practice Quiz

Test your understanding of Python asyncio, the event loop, and concurrency model selection.

  1. You write coro = my_async_function() without await. What does coro contain, and has any code inside my_async_function run yet?
Show answer Correct Answer: coro is a coroutine object โ€” a suspended computation wrapped around the function's bytecode frame. No code inside my_async_function has executed. The function body only runs when the coroutine is driven by await coro inside another async def, or by asyncio.run(coro) at the top level. Python will emit RuntimeWarning: coroutine 'my_async_function' was never awaited when the object is garbage collected.
  1. What is the behavioral difference between result = await fetch(url) and task = asyncio.create_task(fetch(url))?
Show answer Correct Answer: await fetch(url) suspends the current coroutine and waits for fetch to complete fully before the current coroutine can continue โ€” sequential execution. asyncio.create_task(fetch(url)) schedules fetch on the event loop immediately and returns a Task without blocking the caller. Both the calling coroutine and the task can make progress concurrently at subsequent await points. To get the task result, you later await task or include it in asyncio.gather().
  1. A colleague wraps a blocking HTTP call in async def fetch(url): return requests.get(url). They claim this makes it non-blocking because it is now an async def. Are they correct? What would you say?
Show answer Correct Answer: They are incorrect. Adding async def around a blocking call does not make it non-blocking. requests.get() is a synchronous function that blocks the OS thread. When an asyncio coroutine calls it, the event loop thread blocks for the entire duration of the HTTP request. All other in-flight coroutines stall. There is no await inside the function, so the event loop never gets a turn. The fix is to replace requests with an async-native library like aiohttp (async with session.get(url)) or to offload the blocking call with await loop.run_in_executor(None, requests.get, url).
  1. asyncio.gather(task_a, task_b, task_c) is running. task_b raises a ValueError. What happens to task_a and task_c by default, and how do you change the behavior to collect all results including exceptions?
Show answer Correct Answer: By default, when any task raises an exception, asyncio.gather immediately cancels the remaining tasks and propagates the exception to the caller. task_a and task_c receive a CancelledError at their current await point. To change this, pass return_exceptions=True: all tasks run to completion regardless of individual failures, and exceptions are returned as result values in the output list alongside normal return values. The caller can then inspect each result with isinstance(r, Exception).
  1. Your service performs 200 independent PostgreSQL queries per incoming request and is clearly I/O-bound. A teammate proposes rewriting it with multiprocessing.Pool to scale across all 16 CPU cores. Is this a good idea? What would you recommend instead?
Show answer Correct Answer: For I/O-bound workloads like database queries, multiprocessing adds significant overhead โ€” process spawn time (100โ€“500ms), memory duplication (~50โ€“100 MB per process), and pickle serialization for inter-process communication โ€” without addressing the actual bottleneck, which is network and disk latency. The right tool is asyncio with asyncpg and a connection pool. You can run all 200 queries concurrently on a single event loop thread with asyncio.gather(), limited by pool size via asyncpg.create_pool(max_size=20). This achieves far greater effective throughput with a fraction of the resource cost.
  1. Open-ended challenge: You need to build an async web crawler that fetches up to 50,000 pages while: (a) respecting a maximum of 30 simultaneous requests, (b) retrying each failed URL up to 3 times with exponential backoff, (c) writing results to disk without blocking the event loop, and (d) stopping gracefully when a cancellation signal is received. What asyncio primitives would you use for each constraint, and what would your coroutine structure look like?
Show answer There is no single correct answer. A strong design uses: asyncio.Semaphore(30) for concurrency bounding (a); a retry loop with await asyncio.sleep(2 ** attempt) and a max_retries=3 counter inside the fetch coroutine, catching aiohttp.ClientError and asyncio.TimeoutError (b); aiofiles.open() for non-blocking file writes (c); asyncio.Event as a cancellation flag polled at the top of the worker loop, plus asyncio.TaskGroup or asyncio.gather wrapping a try/except asyncio.CancelledError block for graceful shutdown (d). The overall structure: a producer coroutine enqueues seed URLs into an asyncio.Queue, N worker coroutines drain the queue and enqueue newly discovered URLs, a writer coroutine drains a result queue to aiofiles. A visited-URL set prevents re-crawling.

Abstract Algorithms

Written by

Abstract Algorithms

@abstractalgorithms