Show HN: Pyleak – Detect asyncio issues causing AI agent latency

4 months ago 68

PyPI PyPI Downloads

Detect leaked asyncio tasks, threads, and event loop blocking in Python. Inspired by Go's goleak.

import asyncio from pyleak import no_task_leaks, no_thread_leaks, no_event_loop_blocking # Detect leaked asyncio tasks async def main(): async with no_task_leaks(): asyncio.create_task(asyncio.sleep(10)) # This will be detected await asyncio.sleep(0.1) # Detect leaked threads def sync_main(): with no_thread_leaks(): threading.Thread(target=lambda: time.sleep(10)).start() # This will be detected # Detect event loop blocking async def async_main(): with no_event_loop_blocking(): time.sleep(0.5) # This will be detected

All detectors can be used as context managers:

# AsyncIO tasks (async context) async with no_task_leaks(): # Your async code here pass # Threads (sync context) with no_thread_leaks(): # Your threaded code here pass # Event loop blocking (async context only) async def main(): with no_event_loop_blocking(): # Your potentially blocking code here pass

All detectors can also be used as decorators:

@no_task_leaks() async def my_async_function(): # Any leaked tasks will be detected pass @no_thread_leaks() def my_threaded_function(): # Any leaked threads will be detected pass @no_event_loop_blocking() async def my_potentially_blocking_function(): # Any event loop blocking will be detected pass

From leaked asyncio tasks

When using no_task_leaks, you get detailed stack trace information showing exactly where leaked tasks are executing and where they were created.

import asyncio from pyleak import TaskLeakError, no_task_leaks async def leaky_function(): async def background_task(): print("background task started") await asyncio.sleep(10) print("creating a long running task") asyncio.create_task(background_task()) async def main(): try: async with no_task_leaks(action="raise"): await leaky_function() except TaskLeakError as e: print(e) if __name__ == "__main__": asyncio.run(main())

Output:

creating a long running task background task started Detected 1 leaked asyncio tasks Leaked Task: Task-2 ID: 4345977088 State: TaskState.RUNNING Current Stack: File "/tmp/example.py", line 9, in background_task await asyncio.sleep(10)

Include creation stack trace

You can also include the creation stack trace by passing enable_creation_tracking=True to no_task_leaks.

async def main(): try: async with no_task_leaks(action="raise", enable_creation_tracking=True): await leaky_function() except TaskLeakError as e: print(e)

Output:

creating a long running task background task started Detected 1 leaked asyncio tasks Leaked Task: Task-2 ID: 4392245504 State: TaskState.RUNNING Current Stack: File "/tmp/example.py", line 9, in background_task await asyncio.sleep(10) Creation Stack: File "/tmp/example.py", line 24, in <module> asyncio.run(main()) File "/opt/homebrew/.../asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/.../asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/.../asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/tmp/example.py", line 18, in main await leaky_function() File "/tmp/example.py", line 12, in leaky_function asyncio.create_task(background_task())

TaskLeakError has a leaked_tasks attribute that contains a list of LeakedTask objects including the stack trace details.

Note: enable_creation_tracking monkey patches asyncio.create_task to include the creation stack trace. It is not recommended to be used in production to avoid unnecessary side effects.

When using no_event_loop_blocking, you get detailed stack trace information showing exactly where the event loop is blocked and where the blocking code is executing.

import asyncio import time from pyleak import EventLoopBlockError, no_event_loop_blocking async def some_function_with_blocking_code(): print("starting") time.sleep(1) print("done") async def main(): try: async with no_event_loop_blocking(action="raise"): await some_function_with_blocking_code() except EventLoopBlockError as e: print(e) if __name__ == "__main__": asyncio.run(main())

Output:

starting done Detected 1 event loop blocks Event Loop Block: block-1 Duration: 0.605s (threshold: 0.200s) Timestamp: 1749051796.302 Blocking Stack: File "/private/tmp/example.py", line 22, in <module> asyncio.run(main()) File "/opt/homebrew/.../asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/.../asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/.../asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 16, in main await some_function_with_blocking_code() File "/private/tmp/example.py", line 9, in some_function_with_blocking_code time.sleep(1)

Control what happens when leaks/blocking are detected:

Action AsyncIO Tasks Threads Event Loop Blocking
"warn" (default) ✅ Issues ResourceWarning ✅ Issues ResourceWarning ✅ Issues ResourceWarning
"log" ✅ Writes to logger ✅ Writes to logger ✅ Writes to logger
"cancel" ✅ Cancels leaked tasks ❌ Warns (can't force-stop) ❌ Warns (can't cancel)
"raise" ✅ Raises TaskLeakError ✅ Raises ThreadLeakError ✅ Raises EventLoopBlockError
# Examples async with no_task_leaks(action="cancel"): # Cancels leaked tasks pass with no_thread_leaks(action="raise"): # Raises exception on thread leaks pass with no_event_loop_blocking(action="log"): # Logs blocking events pass

Filter detection by resource names (tasks and threads only):

import re # Exact match async with no_task_leaks(name_filter="background-worker"): pass with no_thread_leaks(name_filter="worker-thread"): pass # Regex pattern async with no_task_leaks(name_filter=re.compile(r"worker-\d+")): pass with no_thread_leaks(name_filter=re.compile(r"background-.*")): pass

Note: Event loop blocking detection doesn't support name filtering.

no_task_leaks( action="warn", # Action to take on detection name_filter=None, # Filter by task name logger=None # Custom logger )
no_thread_leaks( action="warn", # Action to take on detection name_filter=None, # Filter by thread name logger=None, # Custom logger exclude_daemon=True, # Exclude daemon threads )
no_event_loop_blocking( action="warn", # Action to take on detection logger=None, # Custom logger threshold=0.1, # Minimum blocking time to report (seconds) check_interval=0.01 # How often to check (seconds) )

Perfect for catching issues in tests:

import pytest from pyleak import no_task_leaks, no_thread_leaks, no_event_loop_blocking @pytest.mark.asyncio async def test_no_leaked_tasks(): async with no_task_leaks(action="raise"): await my_async_function() def test_no_leaked_threads(): with no_thread_leaks(action="raise"): my_threaded_function() @pytest.mark.asyncio async def test_no_event_loop_blocking(): with no_event_loop_blocking(action="raise", threshold=0.1): await my_potentially_blocking_function()

Detecting Synchronous HTTP Calls in Async Code

import httpx from starlette.testclient import TestClient async def test_sync_vs_async_http(): # This will detect blocking with no_event_loop_blocking(action="warn"): response = TestClient(app).get("/endpoint") # Synchronous! # This will not detect blocking with no_event_loop_blocking(action="warn"): async with httpx.AsyncClient() as client: response = await client.get("/endpoint") # Asynchronous!

Ensuring Proper Resource Cleanup

async def test_background_task_cleanup(): async with no_task_leaks(action="raise"): # This would fail the test asyncio.create_task(long_running_task()) # This would pass task = asyncio.create_task(long_running_task()) task.cancel() try: await task except asyncio.CancelledError: pass

Debugging complex task leaks

import asyncio import random import re from pyleak import TaskLeakError, no_task_leaks async def debug_task_leaks(): """Example showing how to debug complex task leaks.""" async def worker(worker_id: int, sleep_time: int): print(f"Worker {worker_id} starting") await asyncio.sleep(sleep_time) # Simulate work print(f"Worker {worker_id} done") async def spawn_workers(): for i in range(3): asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}") try: async with no_task_leaks( action="raise", enable_creation_tracking=True, name_filter=re.compile(r"worker-\d+"), # Only catch worker tasks ): await spawn_workers() await asyncio.sleep(0.1) # Let workers start except TaskLeakError as e: print(f"\nFound {e.task_count} leaked worker tasks:") for task_info in e.leaked_tasks: print(f"\n--- {task_info.name} ---") print("Currently executing:") print(task_info.format_current_stack()) print("Created at:") print(task_info.format_creation_stack()) # Cancel the leaked task if task_info.task_ref: task_info.task_ref.cancel() if __name__ == "__main__": asyncio.run(debug_task_leaks())
Toggle to see the output
Worker 0 starting Worker 1 starting Worker 2 starting Found 3 leaked worker tasks: --- worker-2 --- Currently executing: File "/private/tmp/example.py", line 33, in worker await asyncio.sleep(sleep_time) # Simulate work Created at: File "/private/tmp/example.py", line 65, in <module> asyncio.run(debug_task_leaks()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 47, in debug_task_leaks await spawn_workers() File "/private/tmp/example.py", line 39, in spawn_workers asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}") --- worker-0 --- Currently executing: File "/private/tmp/example.py", line 33, in worker await asyncio.sleep(sleep_time) # Simulate work Created at: File "/private/tmp/example.py", line 65, in <module> asyncio.run(debug_task_leaks()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 47, in debug_task_leaks await spawn_workers() File "/private/tmp/example.py", line 39, in spawn_workers asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}") --- worker-1 --- Currently executing: File "/private/tmp/example.py", line 33, in worker await asyncio.sleep(sleep_time) # Simulate work Created at: File "/private/tmp/example.py", line 65, in <module> asyncio.run(debug_task_leaks()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 47, in debug_task_leaks await spawn_workers() File "/private/tmp/example.py", line 39, in spawn_workers asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")

Debugging event loop blocking

import asyncio from pyleak import EventLoopBlockError, no_event_loop_blocking async def process_user_data(user_id: int): """Simulates cpu intensive work - contains blocking operations!""" print(f"Processing user {user_id}...") return sum(i * i for i in range(100_000_000)) async def main(): try: async with no_event_loop_blocking(action="raise", threshold=0.5): user1 = await process_user_data(1) user2 = await process_user_data(2) except EventLoopBlockError as e: print(f"\n🚨 Found {e.block_count} blocking events:") print(e) if __name__ == "__main__": asyncio.run(main())
Toggle to see the output
Processing user 1... Processing user 2... 🚨 Found 5 blocking events: Detected 5 event loop blocks Event Loop Block: block-1 Duration: 1.507s (threshold: 0.500s) Timestamp: 1749052720.456 Blocking Stack: File "/private/tmp/example.py", line 36, in <module> asyncio.run(main()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 27, in main user1 = await process_user_data(1) File "/private/tmp/example.py", line 21, in process_user_data return sum(i * i for i in range(100_000_000)) File "/private/tmp/example.py", line 21, in <genexpr> return sum(i * i for i in range(100_000_000)) Event Loop Block: block-2 Duration: 1.516s (threshold: 0.500s) Timestamp: 1749052722.054 Blocking Stack: File "/private/tmp/example.py", line 36, in <module> asyncio.run(main()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 27, in main user1 = await process_user_data(1) File "/private/tmp/example.py", line 21, in process_user_data return sum(i * i for i in range(100_000_000)) File "/private/tmp/example.py", line 21, in <genexpr> return sum(i * i for i in range(100_000_000)) Event Loop Block: block-3 Duration: 1.518s (threshold: 0.500s) Timestamp: 1749052723.648 Blocking Stack: File "/private/tmp/example.py", line 36, in <module> asyncio.run(main()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 28, in main user2 = await process_user_data(2) File "/private/tmp/example.py", line 21, in process_user_data return sum(i * i for i in range(100_000_000)) File "/private/tmp/example.py", line 21, in <genexpr> return sum(i * i for i in range(100_000_000)) Event Loop Block: block-4 Duration: 1.517s (threshold: 0.500s) Timestamp: 1749052725.247 Blocking Stack: File "/private/tmp/example.py", line 36, in <module> asyncio.run(main()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 28, in main user2 = await process_user_data(2) File "/private/tmp/example.py", line 21, in process_user_data return sum(i * i for i in range(100_000_000)) File "/private/tmp/example.py", line 21, in <genexpr> return sum(i * i for i in range(100_000_000)) Event Loop Block: block-5 Duration: 1.513s (threshold: 0.500s) Timestamp: 1749052726.839 Blocking Stack: File "/private/tmp/example.py", line 36, in <module> asyncio.run(main()) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run return runner.run(main) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run return self._loop.run_until_complete(task) File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete self.run_forever() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever self._run_once() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once handle._run() File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run self._context.run(self._callback, *self._args) File "/private/tmp/example.py", line 28, in main user2 = await process_user_data(2) File "/private/tmp/example.py", line 21, in process_user_data return sum(i * i for i in range(100_000_000)) File "/private/tmp/example.py", line 21, in <genexpr> return sum(i * i for i in range(100_000_000))

The pytest plugin automatically wraps tests with pyleak detectors based on pytest markers.

Add the plugin to your pytest configuration

pyproject.toml

[tool.pytest.ini_options] markers = [ "no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking" ]

pytest.ini

[tool:pytest] markers = no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking

You can also add it to the conftest.py file.

# conftest.py import pytest def pytest_configure(config): config.addinivalue_line( "markers", "no_leaks: detect asyncio task leaks, thread leaks, and event loop blocking" )
@pytest.mark.no_leaks @pytest.mark.asyncio async def test_no_task_leaks(): asyncio.create_task(asyncio.sleep(10))

By default, all detectors are enabled. You can selectively enable or disable detectors using the no_leaks marker. For example, to only detect task leaks and event loop blocking, you can use the following:

@pytest.mark.no_leaks(tasks=True, blocking=True, threads=False) @pytest.mark.asyncio async def test_async_no_leaks(): asyncio.create_task(asyncio.sleep(10)) # This will be detected time.sleep(0.5) # This will be detected threading.Thread(target=lambda: time.sleep(10)).start() # This will not be detected

no_leaks marker configuration

Name Default Description
tasks True Whether to detect task leaks
task_action raise Action to take when a task leak is detected
task_name_filter None Filter to apply to task names
enable_task_creation_tracking False Whether to enable task creation tracking
threads True Whether to detect thread leaks
thread_action raise Action to take when a thread leak is detected
thread_name_filter None Filter to apply to thread names
exclude_daemon_threads True Whether to exclude daemon threads
blocking True Whether to detect event loop blocking
blocking_action raise Action to take when a blocking event loop is detected
blocking_threshold 0.1 Threshold for blocking event loop detection
blocking_check_interval 0.01 Interval for checking for blocking event loop

AsyncIO Tasks: Leaked tasks can cause memory leaks, prevent graceful shutdown, and make debugging difficult.

Threads: Leaked threads consume system resources and can prevent proper application termination.

Event Loop Blocking: Synchronous operations in async code destroy performance and can cause timeouts.

pyleak helps you catch these issues during development and testing, optionally using a pytest plugin, before they reach production.

More examples can be found in the test files:


Disclaimer: Most of the code and tests are written by Claude.

Read Entire Article