crewai - ✅(Solved) Fix Support for Graceful Cancellation and Resource Cleanup via aclose()/cancel() on CrewStreamingOutput Streaming Objects [1 pull requests, 1 comments, 2 participants]

GitHub stats
crewAIInc/crewAI#5312 (fetched 2026-04-08 03:10:53)
Comments: 1 · Participants: 2 · Timeline: 6 · Reactions: 0
Timeline (top): labeled ×2, assigned ×1, commented ×1, cross-referenced ×1

Fix Action: Fixed

PR fix notes

PR #5313: feat: add graceful cancellation support (aclose/cancel) for streaming outputs

Description (problem / solution / changelog)

Summary

Adds aclose() and cancel() methods to CrewStreamingOutput and FlowStreamingOutput (via the shared StreamingOutputBase) so callers can abort in-flight streaming and release resources promptly — e.g. when an HTTP client disconnects from a FastAPI StreamingResponse.

Types (types/streaming.py):

  • New attributes on StreamingOutputBase: a _cancelled flag, _cancel_event (asyncio), _cancel_thread_event (threading), _background_task, and _background_thread
  • aclose() — async cancellation: sets cancel events, cancels the background asyncio.Task, awaits cleanup
  • cancel() — sync cancellation: sets cancel events, fires task cancellation (no await), marks completed
  • is_cancelled property
  • Both are idempotent (no-op on already-completed streams)
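As a rough illustration of how these pieces fit together, here is a simplified stand-in for StreamingOutputBase. It is not crewAI's code: the constructor, the `_completed` flag, and the `_signal()` helper are assumptions made for this sketch; only the attribute and method names listed above come from the PR.

```python
import asyncio
import threading


class StreamingOutputBase:
    """Simplified stand-in for the PR's base class (illustrative sketch only)."""

    def __init__(self) -> None:
        self._completed = False  # assumed flag: stream finished or aborted
        self._cancelled = False
        self._cancel_event: asyncio.Event | None = None  # wired by the async generator
        self._cancel_thread_event: threading.Event | None = None  # wired by the sync generator
        self._background_task: asyncio.Task | None = None
        self._background_thread: threading.Thread | None = None  # stored, never killed

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled

    def _signal(self) -> None:
        # Hypothetical helper: set whichever cancel primitives have been wired in.
        self._cancelled = True
        if self._cancel_event is not None:
            self._cancel_event.set()
        if self._cancel_thread_event is not None:
            self._cancel_thread_event.set()

    def cancel(self) -> None:
        # Sync path: signal, request task cancellation without awaiting, mark completed.
        if self._completed:
            return  # no-op on finished streams, so repeated calls are safe
        self._signal()
        if self._background_task is not None and not self._background_task.done():
            self._background_task.cancel()
        self._completed = True

    async def aclose(self) -> None:
        # Async path: signal, cancel the background task, and await its unwinding.
        if self._completed:
            return
        self._signal()
        task = self._background_task
        if task is not None and not task.done():
            task.cancel()
            # return_exceptions=True collects the task's CancelledError instead of raising it.
            await asyncio.gather(task, return_exceptions=True)
        self._completed = True
```

Marking the stream completed is what turns repeated calls into no-ops. Note that asyncio tasks are not thread-safe, so a `cancel()` invoked from a thread other than the event loop's would need to go through `loop.call_soon_threadsafe`; this is one of the races the review checklist asks about.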

Utilities (utilities/streaming.py):

  • create_chunk_generator (sync): switched from blocking queue.get() to polling with 0.1s timeout so cancellation can be detected between chunks
  • create_async_chunk_generator: switched from await queue.get() to asyncio.wait() racing queue-get vs cancel-event, with proper task cancellation in the finally block
  • Both generators wire their cancel primitives to the streaming output object via a _wire_cancel() helper called each iteration
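The async change can be sketched as follows, assuming chunks arrive on an `asyncio.Queue` and a `None` sentinel marks the end of the stream (both assumptions; the real create_async_chunk_generator signature may differ, and `racing_chunk_generator` is a hypothetical name). Each iteration races a `queue.get()` future against `cancel_event.wait()`, and the `finally` block cancels whichever waiter is still pending so no future leaks.

```python
import asyncio
from collections.abc import AsyncIterator


async def racing_chunk_generator(
    queue: asyncio.Queue, cancel_event: asyncio.Event
) -> AsyncIterator[str]:
    """Hypothetical stand-in for create_async_chunk_generator."""
    while True:
        get_task = asyncio.ensure_future(queue.get())
        cancel_task = asyncio.ensure_future(cancel_event.wait())
        try:
            done, _pending = await asyncio.wait(
                {get_task, cancel_task}, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # Cancel whichever waiter lost the race (or both, if we were cancelled).
            for task in (get_task, cancel_task):
                if not task.done():
                    task.cancel()
        if cancel_task in done:
            return  # cancellation requested: stop yielding
        chunk = get_task.result()
        if chunk is None:
            return  # sentinel: the producer finished normally
        yield chunk
```

Each iteration allocates two futures and one `asyncio.wait()` call, which is the overhead the review checklist asks about; in exchange, setting the cancel event ends iteration even while no chunk is pending.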

Tests: 14 new tests in TestStreamingCancellation covering async close, sync cancel, background task cancellation, idempotency, no-op on completed streams, FlowStreamingOutput support, and multi-call safety.

Closes #5312

Review & Testing Checklist for Human

  • Performance regression in sync streaming: create_chunk_generator now polls with a 0.1s timeout instead of a blocking queue.get(). Verify this doesn't introduce noticeable latency in chunk delivery for real LLM streaming workloads.
  • Async generator overhead: create_async_chunk_generator now creates two futures (ensure_future) and calls asyncio.wait per chunk. Confirm this is acceptable overhead vs the original single await queue.get().
  • Thread safety of _wire_cancel: The wiring mutates output_holder[0]'s internal attributes on every iteration. Verify no race with concurrent cancel()/aclose() calls from another thread/task.
  • cancel() does not join the background thread: By design, sync cancel() only signals and marks complete — the daemon thread may still run briefly. Verify this is acceptable for the intended FastAPI disconnect use case.
  • Integration test gap: All new tests use mocked generators. Consider testing aclose() through the full Crew.akickoff(stream=True) → cancel path to validate end-to-end behavior with the real event bus and queue wiring.
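For the first checklist item, it helps to note that `queue.Queue.get(timeout=0.1)` returns as soon as an item is put, so the 0.1 s timeout bounds how quickly cancellation is noticed rather than adding a fixed delay per chunk. A minimal sketch, assuming a `queue.Queue` of chunks with a `None` end sentinel and a `threading.Event` for cancellation (`polling_chunk_generator` is a hypothetical name, not the PR's function):

```python
import queue
import threading
from collections.abc import Iterator


def polling_chunk_generator(
    chunks: queue.Queue, cancel_event: threading.Event, poll_interval: float = 0.1
) -> Iterator[str]:
    """Hypothetical stand-in for the sync create_chunk_generator."""
    while not cancel_event.is_set():
        try:
            # Wakes immediately when a chunk is put; times out only when idle.
            chunk = chunks.get(timeout=poll_interval)
        except queue.Empty:
            continue  # idle: loop back and re-check the cancel flag
        if chunk is None:
            return  # sentinel: the producer finished normally
        yield chunk
```

Under these assumptions, worst-case cancellation latency is about one `poll_interval`, and the cost is one wakeup per idle interval.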

Notes

  • The _background_thread attribute is stored but not used for active cleanup (Python threads can't be cancelled). Cleanup relies on the polling loop checking the threading.Event.
  • _wire_cancel() is called every loop iteration; it could be optimized to wire once, but the cost is minimal (just attribute assignment + truthiness check).
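Because Python provides no way to kill a thread from outside, the producer thread has to notice cancellation itself. A minimal sketch of that cooperative pattern, with a hypothetical `produce_chunks` worker and `start_producer` helper (a `time.sleep` stands in for one blocking unit of work such as an LLM call):

```python
import queue
import threading
import time


def produce_chunks(out: queue.Queue, cancel_event: threading.Event, steps: int) -> None:
    """Hypothetical daemon-thread producer that stops cooperatively."""
    try:
        for i in range(steps):
            if cancel_event.is_set():
                return  # observed cancellation between units of work
            time.sleep(0.01)  # stand-in for one blocking LLM/tool call
            out.put(f"chunk-{i}")
    finally:
        out.put(None)  # always emit the end sentinel so consumers unblock


def start_producer(steps: int) -> tuple[threading.Thread, queue.Queue, threading.Event]:
    out: queue.Queue = queue.Queue()
    cancel_event = threading.Event()
    thread = threading.Thread(
        target=produce_chunks, args=(out, cancel_event, steps), daemon=True
    )
    thread.start()
    return thread, out, cancel_event
```

In this sketch the thread stops within about one unit of work after the event is set, but a single long blocking call cannot be interrupted, which is consistent with sync cancel() only signaling rather than joining the thread.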

Link to Devin session: https://app.devin.ai/sessions/d15a57dd90ab46c2a89fa4288605d1f4


Note (Medium Risk): Adds new cancellation paths that alter sync/async streaming control flow and background task/thread lifecycle handling; mistakes could cause leaked handlers/tasks or prematurely completed streams under race conditions.

Overview: Adds graceful cancellation to streaming outputs via new StreamingOutputBase.aclose() (async) and cancel() (sync), plus an is_cancelled property, so callers can abort in-flight crew/flow streaming and mark streams completed.

Updates the sync and async chunk generators to support cooperative cancellation by wiring per-stream cancel events into the output object and changing queue waits to be interruptible (sync polling with timeout; async wait() racing queue-get vs cancel-event), with adjusted cleanup logic for background tasks/threads.

Extends test_streaming.py with a new cancellation-focused test suite covering idempotency, completed-stream no-ops, background task cancellation, and both CrewStreamingOutput and FlowStreamingOutput behavior.

Reviewed by Cursor Bugbot for commit da65140cf8599d1aa46fd874e939aaa368319a4d.


Changed files

  • lib/crewai/src/crewai/types/streaming.py (modified, +86/-0)
  • lib/crewai/src/crewai/utilities/streaming.py (modified, +55/-4)
  • lib/crewai/tests/test_streaming.py (modified, +216/-0)


Feature Area

Performance optimization

Is your feature request related to an existing bug? Please link it here.

none

Describe the solution you'd like

Hi crewAI Team,

Background:
When using async streaming execution via Crew.akickoff (with stream=True), the returned object is a CrewStreamingOutput containing an async iterator over stream chunks. In web-serving scenarios (e.g., with FastAPI + StreamingResponse), if the HTTP client disconnects, Python's async generator protocol triggers cancellation of the surrounding coroutine/async generator.

Problem / Current Behavior:
Currently, there is:

  • No aclose()/cancel() or equivalent method on CrewStreamingOutput or the underlying async iterator returned by akickoff,
  • No hook or mechanism that allows application code (or a FastAPI middleware, etc.) to signal that downstream/unfinished LLM sub-tasks, streams, or resource consumers should be immediately canceled/cleaned up,
  • As a result, if the HTTP client disconnects, agent/crew processing continues in the background until all tasks complete naturally, potentially wasting tokens and compute and tying up resources.
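A small asyncio sketch of how work can outlive the client, assuming (for illustration only) that execution runs as a separate background task feeding chunks through a queue, roughly the shape the PR's `_background_task` attribute suggests. Cancelling the reader does not reach the producer; only an explicit cancel of the producer task stops it. `producer`, `consumer`, and `main` are hypothetical names:

```python
import asyncio


async def producer(queue: asyncio.Queue, produced: list[int]) -> None:
    """Hypothetical background worker standing in for crew/LLM execution."""
    for i in range(20):
        await asyncio.sleep(0.01)  # stand-in for an LLM call
        produced.append(i)
        await queue.put(i)


async def consumer(queue: asyncio.Queue) -> None:
    while True:
        await queue.get()


async def main(cancel_producer: bool) -> int:
    queue: asyncio.Queue = asyncio.Queue()
    produced: list[int] = []
    worker = asyncio.create_task(producer(queue, produced))
    reader = asyncio.create_task(consumer(queue))
    await asyncio.sleep(0.05)
    reader.cancel()  # the client "disconnects"
    if cancel_producer:
        worker.cancel()  # what an aclose()-style hook would add
    await asyncio.gather(worker, reader, return_exceptions=True)
    return len(produced)
```

With `cancel_producer=False` the worker still produces all 20 items after the reader is gone; with `True` it stops after a handful.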

Why this matters:

  • In real-world APIs and web applications, client-initiated disconnects are common.
  • Automatic and/or explicit cleanup (aclose, cancel, etc.) is critical for efficient resource management in LLM inference; without it, abandoned requests can lead to resource exhaustion, quota waste, or stuck threads.
  • Python's own async generators expose aclose() (PEP 525), and many async streaming libraries (aiostream, some OpenAI Python libraries, etc.) provide aclose() or an equivalent for this exact reason.

Request:

  • Please support an async cancellation/cleanup protocol for streaming:
    • Implement aclose() (and/or cancel()) methods on CrewStreamingOutput and all objects returned by the streaming kickoff pipeline (create_async_chunk_generator, etc.).
    • Make sure calling aclose() (or a similar method) aggressively cancels/aborts ALL in-flight agent/LLM sub-tasks, and releases any allocations, so that compute/GPU is promptly freed and no work is done after cancellation.
    • If cancellation is not possible (e.g. due to LLM provider constraints), at least ensure that resources will be freed at the earliest possible opportunity.

Example Usage:
In frameworks like FastAPI:

streaming = await crew.akickoff(inputs=inputs)
try:
    async for chunk in streaming:
        ...
finally:
    # This should exist:
    await streaming.aclose()  # Should cancel agents, tasks, LLM calls, etc.
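One way to connect a client disconnect to the requested cleanup is to wrap the stream in an async generator that always calls `aclose()` in `finally`, then hand that wrapper to the web framework as the response body. A sketch, assuming the streaming object exposes `aclose()` as requested here; `FakeStreaming` and `body_iterator` are hypothetical names used so the example runs without crewAI:

```python
import asyncio
from collections.abc import AsyncIterator


class FakeStreaming:
    """Hypothetical stand-in for CrewStreamingOutput with an aclose() method."""

    def __init__(self) -> None:
        self.closed = False

    async def __aiter__(self) -> AsyncIterator[str]:
        for i in range(1_000):
            await asyncio.sleep(0.01)  # stand-in for waiting on the next chunk
            yield f"chunk-{i}"

    async def aclose(self) -> None:
        self.closed = True  # real code would cancel tasks and free resources here


async def body_iterator(streaming: FakeStreaming) -> AsyncIterator[str]:
    # Runs cleanup on normal completion, on errors, and on task cancellation alike.
    try:
        async for chunk in streaming:
            yield chunk
    finally:
        await streaming.aclose()
```

In FastAPI the wrapper would be passed as the content of a StreamingResponse. One caveat: Starlette runs responses under anyio, whose cancel scopes may cancel an `await` inside `finally` again before it completes, so cleanup that genuinely awaits may need shielding (for example with `anyio.CancelScope(shield=True)`).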

Thank you for your work. This feature would make crewAI much more robust and production-friendly for real-world streaming applications!

Describe alternatives you've considered

No response

Additional context

No response

Willingness to Contribute

Yes, I'd be happy to submit a pull request

Extent analysis

TL;DR

Implementing an aclose() method on CrewStreamingOutput to support async cancellation and cleanup of in-flight tasks and resource allocations is the most likely fix.

Guidance

  • Implement aclose() on CrewStreamingOutput to cancel all in-flight agent/LLM sub-tasks and release allocations.
  • Ensure aclose() is called when the HTTP client disconnects to prevent resource waste and quota exhaustion.
  • Consider adding a hook or mechanism for application code to signal cancellation of downstream tasks and resource consumers.
  • Review the create_async_chunk_generator pipeline to ensure it supports cancellation and cleanup.

Example

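import asyncio

class CrewStreamingOutput:
    _background_task: "asyncio.Task | None" = None  # task producing chunks (sketch)
    _completed = False

    async def aclose(self) -> None:
        # Cancel in-flight work and wait for it to unwind; safe to call repeatedly.
        if self._completed or self._background_task is None:
            return
        self._completed = True
        self._background_task.cancel()  # no-op if the task already finished
        await asyncio.gather(self._background_task, return_exceptions=True)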

Notes

The implementation of aclose() will depend on the underlying architecture of CrewStreamingOutput and the create_async_chunk_generator pipeline. Additionally, consideration should be given to handling cases where cancellation is not possible due to LLM provider constraints.
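For the provider-constraint case, one useful distinction: if a blocking provider call is offloaded with `asyncio.to_thread`, cancelling the awaiting task returns control to the event loop right away, but the worker thread keeps running until the call returns. A sketch with a hypothetical `blocking_llm_call` (a `time.sleep` stand-in) showing that behavior, which is why such work can only be freed at the earliest opportunity rather than instantly:

```python
import asyncio
import threading
import time


def blocking_llm_call(finished: threading.Event, seconds: float) -> str:
    """Hypothetical blocking provider call; cannot be interrupted once started."""
    time.sleep(seconds)
    finished.set()
    return "completion"


async def main() -> tuple[bool, bool]:
    finished = threading.Event()
    task = asyncio.create_task(asyncio.to_thread(blocking_llm_call, finished, 0.2))
    await asyncio.sleep(0.05)
    task.cancel()  # the awaiting task stops waiting right away...
    try:
        await task
    except asyncio.CancelledError:
        pass
    finished_at_cancel = finished.is_set()
    # ...but the worker thread still runs the call to completion.
    finished_later = await asyncio.to_thread(finished.wait, 1.0)
    return finished_at_cancel, finished_later
```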

Recommendation

Implement aclose() on CrewStreamingOutput (as PR #5313 does, alongside a sync cancel()) to support async cancellation and cleanup; this makes crewAI more robust and production-friendly for real-world streaming applications.
