🎁 New User? Get 20% off your first purchase with code NEWUSER20 Β· ⚑ Instant download Β· πŸ”’ Secure checkout Register Now β†’
Menu

Categories

asyncio in Python 3.13: TaskGroups, ExceptionGroups, and Real-World Patterns

asyncio in Python 3.13: TaskGroups, ExceptionGroups, and Real-World Patterns

Quick summary: The structured-concurrency revolution in Python asyncio is complete in 3.13. TaskGroup replaces the error-prone asyncio.gather() pattern; ExceptionGroup and except* handle the multi-error reality of concurrent code; the cancellation model is finally clean enough that "task cancellation safety" is achievable rather than aspirational. This guide walks through the modern patterns: when to use TaskGroup vs gather, how to structure long-running services, the cancellation gotchas that still bite, the testing patterns that catch async bugs early, and the migration path for legacy asyncio codebases.

asyncio Python 3.13 TaskGroups ExceptionGroups patterns 2026

Why Structured Concurrency Matters

The original asyncio API (Python 3.4-3.10) suffered from a problem common to early concurrency libraries: tasks could be created freely and never tracked. asyncio.create_task() returns a task; if you forget to await it or save a reference, exceptions vanish silently and tasks leak. The result: production asyncio code became "callback hell" with extra steps, where reasoning about task lifecycles required studying the entire program.

Structured concurrency, popularized by Trio and added to asyncio in Python 3.11 with TaskGroup, fixes this. The contract: every task must belong to a "nursery" (TaskGroup); tasks cannot outlive their nursery; if any task in a nursery raises, the others are cancelled and the exception(s) propagate up. This is the same kind of guarantee you get from "every variable has a defined scope" in normal code β€” and it transforms asyncio from a footgun into a usable tool.

TaskGroup: The New Default

The old pattern (asyncio.gather)

import asyncio

async def fetch(url):
    # ... do HTTP fetch ...
    pass

async def main():
    results = await asyncio.gather(
        fetch("https://api.example.com/users"),
        fetch("https://api.example.com/posts"),
        fetch("https://api.example.com/comments"),
    )

Problems: if one of the three fetches raises, the others continue running in the background (they may complete or may not), and you only see one exception. If main() is cancelled, the running tasks may or may not be cancelled depending on how gather is configured.

The new pattern (TaskGroup)

import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:
        users = tg.create_task(fetch("https://api.example.com/users"))
        posts = tg.create_task(fetch("https://api.example.com/posts"))
        comments = tg.create_task(fetch("https://api.example.com/comments"))
    # Block here until ALL tasks complete or one fails
    # If any raised, all others are cancelled and we get an ExceptionGroup
    print(users.result(), posts.result(), comments.result())

What changed:

  • The async with block defines the lifetime of all tasks. They cannot escape it.
  • If any task raises, the others are immediately cancelled.
  • The block exits with an ExceptionGroup containing all unhandled exceptions.
  • If the outer code is cancelled, the TaskGroup propagates cancellation to all child tasks.

For 90% of "I want to do N things concurrently and then collect results" use cases, TaskGroup is the right tool in 2026. asyncio.gather still has a place for specific cases (when you genuinely want failed tasks to not affect other tasks via return_exceptions=True), but TaskGroup is the new default.

ExceptionGroup and except*

Concurrent operations can fail in multiple ways simultaneously. The pre-3.11 pattern was to bury this complexity (gather returned only the first exception). The 3.11+ pattern surfaces it via ExceptionGroup and the new except* syntax:

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(fetch_users())
        tg.create_task(fetch_posts())
        tg.create_task(fetch_comments())
except* HTTPError as eg:
    # Handle all HTTP errors as a group
    log_http_failures(eg.exceptions)
except* TimeoutError as eg:
    # Handle all timeouts
    log_timeouts(eg.exceptions)
except* Exception as eg:
    # Catch-all for anything else
    raise

except* matches each exception in the group separately and lets you handle different categories distinctly. This is clean β€” pre-3.11, you had to manually unwrap exception chains and check each one.

When to add ExceptionGroup awareness to your code

If your function uses TaskGroup internally, callers will see ExceptionGroup. Document this. The standard pattern: catch the group and re-raise the relevant single exception if appropriate, or let it propagate.

async def fetch_all() -> tuple[Users, Posts, Comments]:
    """Returns all three. Raises HTTPError if any fetch fails."""
    try:
        async with asyncio.TaskGroup() as tg:
            users_task = tg.create_task(fetch_users())
            posts_task = tg.create_task(fetch_posts())
            comments_task = tg.create_task(fetch_comments())
    except* HTTPError as eg:
        # Caller wants a regular HTTPError, not a group
        raise eg.exceptions[0] from None
    return users_task.result(), posts_task.result(), comments_task.result()

Cancellation: Now Actually Workable

asyncio cancellation was the trickiest part of the original API. A task could be cancelled at any await point, including in cleanup blocks, including in code that you couldn't reasonably make cancellation-safe. Python 3.11 added asyncio.shield() improvements and 3.13 finalized the cancellation model.

The rules in 2026

  1. Cancellation arrives at the next await point in the cancelled task.
  2. The task receives asyncio.CancelledError. You can catch it, but you should re-raise unless you have a specific reason not to.
  3. try/finally blocks run as expected. asyncio.shield() protects an awaitable from cancellation.
  4. If you swallow CancelledError without re-raising, your task continues running and the canceller assumes you handled it gracefully. This is sometimes correct, often a bug.

The cancellation-safe cleanup pattern

async def with_resource():
    resource = await acquire_resource()
    try:
        return await use_resource(resource)
    finally:
        # Cleanup runs even on cancellation
        await asyncio.shield(release_resource(resource))

The shield ensures that release_resource completes even if the calling task is being cancelled β€” important for cleanup that must finish (releasing locks, returning database connections to a pool, flushing buffers).

The cancellation-checking pattern

async def long_loop():
    for i in range(1_000_000):
        await asyncio.sleep(0)  # Yield, allowing cancellation
        do_iteration(i)

If your tight loop never awaits, it can never be cancelled. Inserting await asyncio.sleep(0) periodically yields to the event loop and gives cancellation a chance to land.

Production Patterns

1. Long-running services as TaskGroups

The "main loop" of a service is naturally a TaskGroup that lives for the entire service lifetime:

async def run_service():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(http_server())
        tg.create_task(metrics_publisher())
        tg.create_task(health_check_responder())
        tg.create_task(background_processor())
    # Block forever; only exits when one task crashes
    # When that happens, all others get cancelled cleanly

Pair with proper signal handling so SIGTERM cancels the outer task and the TaskGroup unwinds cleanly:

import signal

def main():
    loop = asyncio.new_event_loop()
    main_task = loop.create_task(run_service())
    loop.add_signal_handler(signal.SIGTERM, main_task.cancel)
    loop.add_signal_handler(signal.SIGINT, main_task.cancel)
    try:
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        pass

This is the modern systemd-style "shut down cleanly on SIGTERM" pattern that integrates correctly with the systemd unit files we covered in the systemd article.

2. Bounded concurrency with semaphores

"Make 1000 HTTP requests, but no more than 50 concurrently" is a common pattern. Combine TaskGroup with a Semaphore:

async def fetch_many(urls):
    sem = asyncio.Semaphore(50)
    
    async def fetch_with_limit(url):
        async with sem:
            return await fetch(url)
    
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_with_limit(u)) for u in urls]
    
    return [t.result() for t in tasks]

The semaphore caps concurrency; the TaskGroup gives you cancellation safety.

3. Timeouts with asyncio.timeout

async def fetch_with_timeout():
    async with asyncio.timeout(5.0):
        return await slow_fetch()

Cleaner than the old asyncio.wait_for() β€” it cancels the inner code at the timeout and raises TimeoutError. Composes nicely with TaskGroup.

4. Producer-consumer with Queue

async def pipeline():
    queue = asyncio.Queue(maxsize=100)
    
    async def producer():
        for item in source():
            await queue.put(item)
        # Sentinel to signal end
        for _ in range(NUM_CONSUMERS):
            await queue.put(None)
    
    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            await process(item)
    
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        for _ in range(NUM_CONSUMERS):
            tg.create_task(consumer())

Backpressure is automatic via the queue's maxsize; consumers naturally block if the queue empties; cancellation propagates correctly via the TaskGroup.

Common Pitfalls in 2026

1. Mixing sync and async carelessly

Calling a blocking sync function from async code blocks the entire event loop. Use asyncio.to_thread() for blocking I/O or concurrent.futures.ProcessPoolExecutor for CPU-bound work:

# Wrong - blocks event loop
data = open("largefile").read()

# Right - blocking call in worker thread
data = await asyncio.to_thread(read_large_file)

# Right for CPU-bound work
with ProcessPoolExecutor() as executor:
    result = await asyncio.get_event_loop().run_in_executor(executor, cpu_heavy)

2. Forgetting to handle CancelledError correctly

# Wrong - swallows cancellation
try:
    await something()
except Exception:
    log_error()

# Right - re-raise CancelledError specifically
try:
    await something()
except asyncio.CancelledError:
    raise  # Don't swallow
except Exception:
    log_error()

In Python 3.8+, CancelledError no longer inherits from Exception (it inherits from BaseException), so naked except Exception doesn't catch it. But many codebases have except BaseException patterns that do β€” those need fixing.

3. Sharing state across tasks without synchronization

asyncio is single-threaded but tasks switch at await points. State that is read and written across awaits needs synchronization:

# Wrong - race condition
self.counter += 1  # Non-atomic if done in concurrent tasks

# Right - use a Lock
async with self.counter_lock:
    self.counter += 1

4. Creating tasks outside a TaskGroup

The old pattern asyncio.create_task(coro) creates a "fire and forget" task. If you don't keep a reference, it can be garbage collected before completing. If it raises, the exception goes to the unhandled-exception handler (often unnoticed). Use TaskGroup unless you genuinely need fire-and-forget.

5. Not testing async code under cancellation

Most async bugs surface only when cancellation hits at an inconvenient moment. Tests should explicitly cancel tasks and verify cleanup happens correctly:

async def test_cleanup_on_cancel():
    task = asyncio.create_task(my_function())
    await asyncio.sleep(0.1)  # Let it start
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    # Verify cleanup happened
    assert resource_was_released()

The Library Ecosystem in 2026

  • httpx β€” the modern async HTTP client. Use this instead of aiohttp for new code.
  • asyncpg β€” high-performance async PostgreSQL driver. Mature, fast, the standard.
  • aiokafka, aiopika β€” Kafka and RabbitMQ async clients. Production-ready.
  • fastapi β€” async web framework. Built on Starlette + Pydantic. Dominant in new Python web services.
  • aiosqlite, motor (MongoDB), aioredis β€” async drivers for the obvious databases.
  • anyio β€” abstraction over asyncio and trio. Useful if you write libraries that need to support both.

Frequently Asked Questions

Should I migrate gather() calls to TaskGroup?

For new code, yes. For existing code, only when you touch it for other reasons or when the gather pattern has caused bugs. Pure churn migrations are usually not worth the diff noise.

What about asyncio vs Trio?

Trio remains a beautifully-designed library and is the spiritual ancestor of structured concurrency. In 2026, asyncio has caught up enough that Trio's advantages are smaller. New projects almost always use asyncio for ecosystem reasons.

Is async/await faster than threads in Python?

For I/O-bound workloads with many concurrent operations, yes β€” by orders of magnitude. For CPU-bound workloads, neither helps; you need processes or a different language for the hot path.

How do I debug async code?

asyncio.run() with debug=True enables additional checks (slow callback warnings, unawaited coroutines, etc.). Modern IDEs (PyCharm, VS Code with Python extension) have async-aware debuggers. aiomonitor gives you a live view of running tasks.

What about Python's GIL changes?

Python 3.13 introduced an experimental "free-threaded" build (no GIL). This affects threading more than asyncio (which is already single-threaded by design). For the typical async workload, free-threaded Python doesn't change anything.

Can I use asyncio in production?

Absolutely. Major Python services at large companies (Uber, Microsoft, Netflix, many others) run on asyncio in production. The patterns in this article are what experienced production teams use.

One Real Production Service

A team we know runs a Python webhook ingestion service that processes roughly 50,000 incoming webhooks per minute (peak), each requiring a database write, an outbound HTTP call to a downstream service, and a queue publish. Originally written with threading (one thread per request), the service required 32 vCPU instances to handle peak load. After rewriting in asyncio with httpx + asyncpg + aiokafka in early 2025, the same load runs comfortably on 4 vCPU instances. The TaskGroup-based concurrency made the cancellation model trivial (graceful shutdown, deadline propagation through nested calls). Total rewrite effort: roughly 6 weeks of one engineer's time. Annual infrastructure savings: roughly $48,000. Operational simplicity: huge β€” fewer moving parts, simpler debugging, no thread-pool tuning. Net assessment: best engineering investment they made all year.

Further Reading from the Dargslan Library

The Bottom Line

Modern asyncio in Python 3.13 is genuinely good β€” TaskGroups make concurrent code reasonable to reason about, ExceptionGroups handle multi-failure cleanly, and the cancellation model finally makes sense. For new Python code, async/await is the default unless you have a reason not to use it. For legacy codebases on older asyncio patterns, gradual migration to TaskGroups pays off in reduced incident count and clearer code. The structured-concurrency revolution has reached Python; teams that learn the patterns early benefit the most.

Share this article:
Mikkel Sorensen
About the Author

Mikkel Sorensen

UX/UI Design, Java Development, User-Centered Application Design, Technical Documentation

Mikkel SΓΈrensen is a UX/UI-focused software developer with a strong background in Java-based application development.

He works at the intersection of user experience design and software engineering, creating applications that are both technically robust and user-centered. His experience includes interface design, inter...

UX Design UI Design Java Applications User Experience Engineering Accessibility Basics

Stay Updated

Subscribe to our newsletter for the latest tutorials, tips, and exclusive offers.