Show HN: Pyleak – Detect asyncio issues causing AI agent latency
4 months ago
68
Detect leaked asyncio tasks, threads, and event loop blocking in Python. Inspired by Go's goleak.
importasynciofrompyleakimportno_task_leaks, no_thread_leaks, no_event_loop_blocking# Detect leaked asyncio tasksasyncdefmain():
asyncwithno_task_leaks():
asyncio.create_task(asyncio.sleep(10)) # This will be detectedawaitasyncio.sleep(0.1)
# Detect leaked threads defsync_main():
withno_thread_leaks():
threading.Thread(target=lambda: time.sleep(10)).start() # This will be detected# Detect event loop blockingasyncdefasync_main():
withno_event_loop_blocking():
time.sleep(0.5) # This will be detected
@no_task_leaks()asyncdefmy_async_function():
# Any leaked tasks will be detectedpass@no_thread_leaks()defmy_threaded_function():
# Any leaked threads will be detected pass@no_event_loop_blocking()asyncdefmy_potentially_blocking_function():
# Any event loop blocking will be detectedpass
From leaked asyncio tasks
When using no_task_leaks, you get detailed stack trace information showing exactly where leaked tasks are executing and where they were created.
creating a long running task
background task started
Detected 1 leaked asyncio tasks
Leaked Task: Task-2
ID: 4345977088
State: TaskState.RUNNING
Current Stack:
File "/tmp/example.py", line 9, in background_task
await asyncio.sleep(10)
Include creation stack trace
You can also include the creation stack trace by passing enable_creation_tracking=True to no_task_leaks.
creating a long running task
background task started
Detected 1 leaked asyncio tasks
Leaked Task: Task-2
ID: 4392245504
State: TaskState.RUNNING
Current Stack:
File "/tmp/example.py", line 9, in background_task
await asyncio.sleep(10)
Creation Stack:
File "/tmp/example.py", line 24, in <module>
asyncio.run(main())
File "/opt/homebrew/.../asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/.../asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/.../asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/tmp/example.py", line 18, in main
await leaky_function()
File "/tmp/example.py", line 12, in leaky_function
asyncio.create_task(background_task())
TaskLeakError has a leaked_tasks attribute that contains a list of LeakedTask objects including the stack trace details.
Note: enable_creation_tracking monkey patches asyncio.create_task to include the creation stack trace. It is not recommended to be used in production to avoid unnecessary side effects.
When using no_event_loop_blocking, you get detailed stack trace information showing exactly where the event loop is blocked and where the blocking code is executing.
starting
done
Detected 1 event loop blocks
Event Loop Block: block-1
Duration: 0.605s (threshold: 0.200s)
Timestamp: 1749051796.302
Blocking Stack:
File "/private/tmp/example.py", line 22, in <module>
asyncio.run(main())
File "/opt/homebrew/.../asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/.../asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/.../asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/.../asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/.../asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/.../asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 16, in main
await some_function_with_blocking_code()
File "/private/tmp/example.py", line 9, in some_function_with_blocking_code
time.sleep(1)
Control what happens when leaks/blocking are detected:
Note: Event loop blocking detection doesn't support name filtering.
no_task_leaks(
action="warn", # Action to take on detectionname_filter=None, # Filter by task namelogger=None# Custom logger
)
no_thread_leaks(
action="warn", # Action to take on detectionname_filter=None, # Filter by thread namelogger=None, # Custom loggerexclude_daemon=True, # Exclude daemon threads
)
no_event_loop_blocking(
action="warn", # Action to take on detectionlogger=None, # Custom loggerthreshold=0.1, # Minimum blocking time to report (seconds)check_interval=0.01# How often to check (seconds)
)
importhttpxfromstarlette.testclientimportTestClientasyncdeftest_sync_vs_async_http():
# This will detect blockingwithno_event_loop_blocking(action="warn"):
response=TestClient(app).get("/endpoint") # Synchronous!# This will not detect blocking withno_event_loop_blocking(action="warn"):
asyncwithhttpx.AsyncClient() asclient:
response=awaitclient.get("/endpoint") # Asynchronous!
Ensuring Proper Resource Cleanup
asyncdeftest_background_task_cleanup():
asyncwithno_task_leaks(action="raise"):
# This would fail the testasyncio.create_task(long_running_task())
# This would passtask=asyncio.create_task(long_running_task())
task.cancel()
try:
awaittaskexceptasyncio.CancelledError:
pass
Worker 0 starting
Worker 1 starting
Worker 2 starting
Found 3 leaked worker tasks:
--- worker-2 ---
Currently executing:
File "/private/tmp/example.py", line 33, in worker
await asyncio.sleep(sleep_time) # Simulate work
Created at:
File "/private/tmp/example.py", line 65, in <module>
asyncio.run(debug_task_leaks())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 47, in debug_task_leaks
await spawn_workers()
File "/private/tmp/example.py", line 39, in spawn_workers
asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")
--- worker-0 ---
Currently executing:
File "/private/tmp/example.py", line 33, in worker
await asyncio.sleep(sleep_time) # Simulate work
Created at:
File "/private/tmp/example.py", line 65, in <module>
asyncio.run(debug_task_leaks())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 47, in debug_task_leaks
await spawn_workers()
File "/private/tmp/example.py", line 39, in spawn_workers
asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")
--- worker-1 ---
Currently executing:
File "/private/tmp/example.py", line 33, in worker
await asyncio.sleep(sleep_time) # Simulate work
Created at:
File "/private/tmp/example.py", line 65, in <module>
asyncio.run(debug_task_leaks())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 47, in debug_task_leaks
await spawn_workers()
File "/private/tmp/example.py", line 39, in spawn_workers
asyncio.create_task(worker(i, random.randint(1, 10)), name=f"worker-{i}")
Debugging event loop blocking
importasynciofrompyleakimportEventLoopBlockError, no_event_loop_blockingasyncdefprocess_user_data(user_id: int):
"""Simulates cpu intensive work - contains blocking operations!"""print(f"Processing user {user_id}...")
returnsum(i*iforiinrange(100_000_000))
asyncdefmain():
try:
asyncwithno_event_loop_blocking(action="raise", threshold=0.5):
user1=awaitprocess_user_data(1)
user2=awaitprocess_user_data(2)
exceptEventLoopBlockErrorase:
print(f"\n🚨 Found {e.block_count} blocking events:")
print(e)
if__name__=="__main__":
asyncio.run(main())
Toggle to see the output
Processing user 1...
Processing user 2...
🚨 Found 5 blocking events:
Detected 5 event loop blocks
Event Loop Block: block-1
Duration: 1.507s (threshold: 0.500s)
Timestamp: 1749052720.456
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 27, in main
user1 = await process_user_data(1)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-2
Duration: 1.516s (threshold: 0.500s)
Timestamp: 1749052722.054
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 27, in main
user1 = await process_user_data(1)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-3
Duration: 1.518s (threshold: 0.500s)
Timestamp: 1749052723.648
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 28, in main
user2 = await process_user_data(2)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-4
Duration: 1.517s (threshold: 0.500s)
Timestamp: 1749052725.247
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 28, in main
user2 = await process_user_data(2)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
Event Loop Block: block-5
Duration: 1.513s (threshold: 0.500s)
Timestamp: 1749052726.839
Blocking Stack:
File "/private/tmp/example.py", line 36, in <module>
asyncio.run(main())
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 671, in run_until_complete
self.run_forever()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 638, in run_forever
self._run_once()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/base_events.py", line 1971, in _run_once
handle._run()
File "/opt/homebrew/anaconda3/envs/ffa/lib/python3.12/asyncio/events.py", line 84, in _run
self._context.run(self._callback, *self._args)
File "/private/tmp/example.py", line 28, in main
user2 = await process_user_data(2)
File "/private/tmp/example.py", line 21, in process_user_data
return sum(i * i for i in range(100_000_000))
File "/private/tmp/example.py", line 21, in <genexpr>
return sum(i * i for i in range(100_000_000))
The pytest plugin automatically wraps tests with pyleak detectors based on pytest markers.
By default, all detectors are enabled. You can selectively enable or disable detectors using the no_leaks marker. For example, to only detect task leaks and event loop blocking, you can use the following:
@pytest.mark.no_leaks(tasks=True, blocking=True, threads=False)@pytest.mark.asyncioasyncdeftest_async_no_leaks():
asyncio.create_task(asyncio.sleep(10)) # This will be detectedtime.sleep(0.5) # This will be detectedthreading.Thread(target=lambda: time.sleep(10)).start() # This will not be detected
no_leaks marker configuration
Name
Default
Description
tasks
True
Whether to detect task leaks
task_action
raise
Action to take when a task leak is detected
task_name_filter
None
Filter to apply to task names
enable_task_creation_tracking
False
Whether to enable task creation tracking
threads
True
Whether to detect thread leaks
thread_action
raise
Action to take when a thread leak is detected
thread_name_filter
None
Filter to apply to thread names
exclude_daemon_threads
True
Whether to exclude daemon threads
blocking
True
Whether to detect event loop blocking
blocking_action
raise
Action to take when a blocking event loop is detected
blocking_threshold
0.1
Threshold for blocking event loop detection
blocking_check_interval
0.01
Interval for checking for blocking event loop
AsyncIO Tasks: Leaked tasks can cause memory leaks, prevent graceful shutdown, and make debugging difficult.
Threads: Leaked threads consume system resources and can prevent proper application termination.
Event Loop Blocking: Synchronous operations in async code destroy performance and can cause timeouts.
pyleak helps you catch these issues during development and testing, optionally using a pytest plugin, before they reach production.