crewai - ✅(Solved) Fix [FEATURE] Streaming handlers on singleton event bus can fan out chunks across concurrent runs [4 pull requests, 1 participants]

Official PRs (…)
ON THIS PAGE

Recommended Tools

×6

Utilities matched from this issue’s tags and category — try them while you read without losing context.

GitHub issue graph ai analysis

Paste a GitHub issue URL. We fetch that issue, discover linked issues from bodies/comments/timeline, collect linked pull requests, and produce a structured English report.

The report is written in English Markdown for sharing and archival.

Helpful · Quick feedback

Loading…
GitHub stats
crewAIInc/crewAI#5376Fetched 2026-04-10 03:43:00
View on GitHub
Comments
0
Participants
1
Timeline
3
Reactions
0
Author
Participants
Timeline (top)
cross-referenced ×1labeled ×1referenced ×1

Fix Action

Fix / Workaround

In concurrent streaming runs, each run registers its own handler and queue. Event dispatch can invoke all active handlers for the event type, which may fan out chunks across multiple per-run queues when handler-level scoping/filtering is missing.

PR fix notes

PR #5377: fix: add run_id scoping to streaming handlers to prevent cross-run chunk contamination (#5376)

Description (problem / solution / changelog)

Summary

Fixes #5376 — the singleton CrewAIEventsBus fans out LLMStreamChunkEvent to all registered stream handlers. When multiple streaming runs execute concurrently, each run's handler receives chunks from every other run, causing cross-run chunk contamination.

Approach: Each streaming run is assigned a unique run_id (UUID) that flows through via contextvars.ContextVar. Events are stamped with the run_id at emission time, and handlers filter to only accept events matching their own run_id.

Changes:

  • LLMStreamChunkEvent — new optional run_id: str | None = None field
  • streaming.py — new _current_stream_run_id context var, get_current_stream_run_id() accessor; create_streaming_state() generates + sets run_id; _create_stream_handler() filters by run_id
  • base_llm.py / llm.py — all 4 LLMStreamChunkEvent emission sites stamp run_id=get_current_stream_run_id()
  • 5 new tests in TestStreamingRunIsolation covering handler filtering, concurrent state isolation, and multi-thread isolation

Review & Testing Checklist for Human

  • Filter semantics when run_id is None: The handler passes events through when either the handler's or event's run_id is None (streaming.py:148-150). This preserves backward compatibility but means un-stamped events still fan out to all handlers. Verify this is the desired behavior for your use cases.
  • Context propagation in all thread/async paths: contextvars.ContextVar does NOT auto-propagate to child threads. create_chunk_generator already uses contextvars.copy_context() — verify no other code paths spawn threads without context copying that would lose the run_id.
  • Test fragility in _get_run_id_from_handler: One test introspects handler closure cells to extract the captured run_id by matching UUID-shaped strings. This works but is coupled to closure internals.

Suggested test plan: Run two concurrent Crew(stream=True).kickoff() calls against a real or mocked LLM and verify each stream's chunks contain only content from its own run.

Notes

  • run_id defaults to None so all existing code/serialization is unaffected
  • All 39 existing streaming tests continue to pass alongside the 5 new ones

Link to Devin session: https://app.devin.ai/sessions/bc659bf095044869b49955ca3f222f65

<!-- CURSOR_SUMMARY -->

[!NOTE] Medium Risk Changes core streaming/event-bus plumbing by introducing a per-run run_id and filtering handlers, which could affect chunk delivery in concurrent or nested streaming scenarios (especially when run_id is missing). Covered by new concurrency-focused tests but still touches a central execution path.

Overview Fixes concurrent streaming chunk cross-talk by introducing a per-streaming-run run_id and scoping LLMStreamChunkEvent delivery to matching handlers.

create_streaming_state() now generates/reuses a UUID run scope via a ContextVar, LLM streaming emission sites stamp run_id onto each LLMStreamChunkEvent, and stream handlers drop chunks from other runs. Adds a new TestStreamingRunIsolation suite to verify handler filtering and isolation across contexts/threads.

<sup>Reviewed by Cursor Bugbot for commit 0a2230796233b1d40322a1c51eaf9747b7b82297. Bugbot is set up for automated code reviews on this repo. Configure here.</sup>

<!-- /CURSOR_SUMMARY -->

Changed files

  • lib/crewai/src/crewai/events/types/llm_events.py (modified, +1/-0)
  • lib/crewai/src/crewai/llm.py (modified, +4/-0)
  • lib/crewai/src/crewai/llms/base_llm.py (modified, +2/-0)
  • lib/crewai/src/crewai/utilities/streaming.py (modified, +36/-1)
  • lib/crewai/tests/test_streaming.py (modified, +259/-0)

PR #5375: fix(streaming): prefer event.task_id when building StreamChunk.task_id

Description (problem / solution / changelog)

Summary

This PR makes StreamChunk.task_id use event.task_id as the primary source, with a backward-compatible fallback to current_task_info["id"].

  • Before:
    • StreamChunk.task_id could be empty when current_task_info["id"] was empty.
  • After:
    • StreamChunk.task_id = event.task_id or current_task_info["id"] or "".

Why this matters

LLMStreamChunkEvent.task_id is populated from from_task in event normalization, while current_task_info["id"] may remain empty in some streaming contexts.

So even when task identity exists in the event, downstream consumers can still receive chunks with empty chunk.task_id if StreamChunk.task_id is built only from current_task_info.

Real downstream example (SSE filtering)

In a downstream FastAPI SSE endpoint, we collect crew task IDs and filter stream chunks by chunk.task_id:

crew = CrewAgent().crew()
task_id_set = {str(task.id) for task in crew.tasks}

async for chunk in streaming:
    content = getattr(chunk, "content", None)
    task_id = getattr(chunk, "task_id", None)
    if content and task_id in task_id_set:
        await queue.put(f"data: {json.dumps(content, ensure_ascii=False)}\n\n")

If chunk.task_id is empty, valid chunks can be dropped by filtering logic.

Related concurrency concern (important, not fully solved by this PR)

While each request/run typically has its own queue, stream handlers are registered on the singleton crewai_event_bus. That means with concurrent streaming runs:

  • Request A registers handler_A -> writes to queue_A
  • Request B registers handler_B -> writes to queue_B
  • A single LLMStreamChunkEvent can be dispatched to all active handlers

So this is not a "shared queue" problem; it is an event fan-out to multiple handlers problem on a global event bus, which can cause cross-run chunk contamination if handler-level scoping/filtering is missing.

This PR addresses task_id correctness in StreamChunk. A follow-up improvement could add stronger run/stream scoping for handler dispatch.

Compatibility

Backward compatible:
If event.task_id is missing, existing fallback behavior (current_task_info["id"]) is preserved.

Test plan

  • Verify stream chunks now carry non-empty task_id when event contains task context.
  • Verify fallback still works when event.task_id is unavailable.
  • Validate downstream filtering by task IDs no longer drops valid chunks due to empty task_id.

Changed files

  • lib/crewai/src/crewai/utilities/streaming.py (modified, +1/-1)

PR #5406: fix: add run-scoped isolation to event bus for concurrent streaming

Description (problem / solution / changelog)

Summary

Fixes #5376

The singleton crewai_event_bus dispatches events to all registered handlers regardless of which concurrent run emitted them. When multiple runs (e.g. kickoff_for_each, parallel streams) register their own LLMStreamChunkEvent handlers, events from run A fan out to run B's handler — causing cross-run chunk contamination.

Changes

  • BaseEvent.run_id — new optional field on every event, auto-stamped from a ContextVar during emit() so each event knows which run it belongs to
  • run_scope() context manager — establishes a unique run_id for the current execution context (uses contextvars for proper thread/async isolation)
  • get_current_run_id() / set_current_run_id() — low-level helpers exported from crewai.events
  • Stream handler filtering_create_stream_handler() captures the run_id at creation time and silently skips events whose run_id does not match
  • Backwards compatible — when no run_scope is active, run_id is None and handlers accept all events (existing single-run behavior unchanged)

Files changed

FileChange
lib/crewai/src/crewai/events/base_events.pyAdd run_id field to BaseEvent
lib/crewai/src/crewai/events/event_bus.pyAdd _current_run_id ContextVar, run_scope(), stamp run_id in _prepare_event()
lib/crewai/src/crewai/events/__init__.pyExport run_scope, get_current_run_id, set_current_run_id
lib/crewai/src/crewai/utilities/streaming.pyPass run_id to stream handler, filter mismatched events
lib/crewai/tests/events/test_concurrent_run_isolation.py381-line test suite

Test plan

  • TestRunScope — context manager sets/resets run_id, nesting, explicit IDs
  • TestEventRunIdStamping — emit stamps run_id, no-scope gives None, pre-set not overwritten
  • TestStreamHandlerIsolation — handler accepts matching, rejects mismatched, no-filter accepts all
  • TestConcurrentRunIsolation — 2 concurrent threads, 10-thread stress test, single-run regression

🤖 Generated with Claude Code

<!-- CURSOR_SUMMARY -->

[!NOTE] High Risk High risk because it deletes core repo tooling (CI workflows, pre-commit config, test environment/config files) which can silently break testing, linting, security scanning, and contributor workflows despite not changing runtime code.

Overview This PR removes a large set of repository-level configuration and automation: GitHub Actions workflows for linting, tests, type-checking, CodeQL, vulnerability scanning, stale issues, PR labeling/title checks, nightly/publish automation, and related CodeQL/dependabot configs.

It also deletes various project scaffolding and dev/test assets including .gitignore, .editorconfig, .pre-commit-config.yaml, .python-version, root conftest.py/.env.test, plus top-level documentation and policy files like README.md, LICENSE, CONTRIBUTING.md, issue templates, and substantial docs/ar/* content.

<sup>Reviewed by Cursor Bugbot for commit cb375e656d08c1c648de965d307abe2baa0e9f73. Bugbot is set up for automated code reviews on this repo. Configure here.</sup>

<!-- /CURSOR_SUMMARY -->

Changed files

  • .editorconfig (removed, +0/-14)
  • .env.test (removed, +0/-160)
  • .github/CONTRIBUTING.md (removed, +0/-173)
  • .github/ISSUE_TEMPLATE/bug_report.yml (removed, +0/-115)
  • .github/ISSUE_TEMPLATE/config.yml (removed, +0/-1)
  • .github/ISSUE_TEMPLATE/feature_request.yml (removed, +0/-65)
  • .github/codeql/codeql-config.yml (removed, +0/-33)
  • .github/dependabot.yml (removed, +0/-16)
  • .github/security.md (removed, +0/-12)
  • .github/workflows/build-uv-cache.yml (removed, +0/-48)
  • .github/workflows/codeql.yml (removed, +0/-103)
  • .github/workflows/docs-broken-links.yml (removed, +0/-35)
  • .github/workflows/generate-tool-specs.yml (removed, +0/-63)
  • .github/workflows/linter.yml (removed, +0/-87)
  • .github/workflows/nightly.yml (removed, +0/-127)
  • .github/workflows/pr-size.yml (removed, +0/-32)
  • .github/workflows/pr-title.yml (removed, +0/-41)
  • .github/workflows/publish.yml (removed, +0/-166)
  • .github/workflows/stale.yml (removed, +0/-29)
  • .github/workflows/tests.yml (removed, +0/-137)
  • .github/workflows/type-checker.yml (removed, +0/-91)
  • .github/workflows/update-test-durations.yml (removed, +0/-71)
  • .github/workflows/vulnerability-scan.yml (removed, +0/-105)
  • .gitignore (removed, +0/-32)
  • .pre-commit-config.yaml (removed, +0/-33)
  • .python-version (removed, +0/-1)
  • LICENSE (removed, +0/-19)
  • README.md (removed, +0/-780)
  • conftest.py (removed, +0/-296)
  • docs/ar/api-reference/inputs.mdx (removed, +0/-8)
  • docs/ar/api-reference/introduction.mdx (removed, +0/-135)
  • docs/ar/api-reference/kickoff.mdx (removed, +0/-8)
  • docs/ar/api-reference/resume.mdx (removed, +0/-6)
  • docs/ar/api-reference/status.mdx (removed, +0/-6)
  • docs/ar/changelog.mdx (removed, +0/-841)
  • docs/ar/concepts/agent-capabilities.mdx (removed, +0/-147)
  • docs/ar/concepts/agents.mdx (removed, +0/-357)
  • docs/ar/concepts/checkpointing.mdx (removed, +0/-229)
  • docs/ar/concepts/cli.mdx (removed, +0/-287)
  • docs/ar/concepts/collaboration.mdx (removed, +0/-363)
  • docs/ar/concepts/crews.mdx (removed, +0/-204)
  • docs/ar/concepts/event-listener.mdx (removed, +0/-236)
  • docs/ar/concepts/files.mdx (removed, +0/-267)
  • docs/ar/concepts/flows.mdx (removed, +0/-1068)
  • docs/ar/concepts/knowledge.mdx (removed, +0/-1095)
  • docs/ar/concepts/llms.mdx (removed, +0/-1464)
  • docs/ar/concepts/memory.mdx (removed, +0/-878)
  • docs/ar/concepts/planning.mdx (removed, +0/-155)
  • docs/ar/concepts/processes.mdx (removed, +0/-67)
  • docs/ar/concepts/production-architecture.mdx (removed, +0/-154)
  • docs/ar/concepts/reasoning.mdx (removed, +0/-148)
  • docs/ar/concepts/skills.mdx (removed, +0/-306)
  • docs/ar/concepts/tasks.mdx (removed, +0/-1085)
  • docs/ar/concepts/testing.mdx (removed, +0/-49)
  • docs/ar/concepts/tools.mdx (removed, +0/-290)
  • docs/ar/concepts/training.mdx (removed, +0/-197)
  • docs/ar/enterprise/features/agent-repositories.mdx (removed, +0/-155)
  • docs/ar/enterprise/features/automations.mdx (removed, +0/-104)
  • docs/ar/enterprise/features/crew-studio.mdx (removed, +0/-88)
  • docs/ar/enterprise/features/flow-hitl-management.mdx (removed, +0/-558)
  • docs/ar/enterprise/features/hallucination-guardrail.mdx (removed, +0/-251)
  • docs/ar/enterprise/features/marketplace.mdx (removed, +0/-45)
  • docs/ar/enterprise/features/pii-trace-redactions.mdx (removed, +0/-342)
  • docs/ar/enterprise/features/rbac.mdx (removed, +0/-256)
  • docs/ar/enterprise/features/tools-and-integrations.mdx (removed, +0/-261)
  • docs/ar/enterprise/features/traces.mdx (removed, +0/-148)
  • docs/ar/enterprise/features/webhook-streaming.mdx (removed, +0/-172)
  • docs/ar/enterprise/guides/automation-triggers.mdx (removed, +0/-321)
  • docs/ar/enterprise/guides/azure-openai-setup.mdx (removed, +0/-54)
  • docs/ar/enterprise/guides/build-crew.mdx (removed, +0/-48)
  • docs/ar/enterprise/guides/capture_telemetry_logs.mdx (removed, +0/-39)
  • docs/ar/enterprise/guides/custom-mcp-server.mdx (removed, +0/-136)
  • docs/ar/enterprise/guides/deploy-to-amp.mdx (removed, +0/-445)
  • docs/ar/enterprise/guides/enable-crew-studio.mdx (removed, +0/-182)
  • docs/ar/enterprise/guides/gmail-trigger.mdx (removed, +0/-97)
  • docs/ar/enterprise/guides/google-calendar-trigger.mdx (removed, +0/-83)
  • docs/ar/enterprise/guides/google-drive-trigger.mdx (removed, +0/-80)
  • docs/ar/enterprise/guides/hubspot-trigger.mdx (removed, +0/-61)
  • docs/ar/enterprise/guides/human-in-the-loop.mdx (removed, +0/-157)
  • docs/ar/enterprise/guides/kickoff-crew.mdx (removed, +0/-178)
  • docs/ar/enterprise/guides/microsoft-teams-trigger.mdx (removed, +0/-70)
  • docs/ar/enterprise/guides/onedrive-trigger.mdx (removed, +0/-69)
  • docs/ar/enterprise/guides/outlook-trigger.mdx (removed, +0/-69)
  • docs/ar/enterprise/guides/prepare-for-deployment.mdx (removed, +0/-311)
  • docs/ar/enterprise/guides/private-package-registry.mdx (removed, +0/-263)
  • docs/ar/enterprise/guides/react-component-export.mdx (removed, +0/-112)
  • docs/ar/enterprise/guides/salesforce-trigger.mdx (removed, +0/-50)
  • docs/ar/enterprise/guides/slack-trigger.mdx (removed, +0/-62)
  • docs/ar/enterprise/guides/team-management.mdx (removed, +0/-91)
  • docs/ar/enterprise/guides/tool-repository.mdx (removed, +0/-154)
  • docs/ar/enterprise/guides/training-crews.mdx (removed, +0/-132)
  • docs/ar/enterprise/guides/update-crew.mdx (removed, +0/-91)
  • docs/ar/enterprise/guides/webhook-automation.mdx (removed, +0/-157)
  • docs/ar/enterprise/guides/zapier-trigger.mdx (removed, +0/-105)
  • docs/ar/enterprise/integrations/asana.mdx (removed, +0/-271)
  • docs/ar/enterprise/integrations/box.mdx (removed, +0/-280)
  • docs/ar/enterprise/integrations/clickup.mdx (removed, +0/-301)
  • docs/ar/enterprise/integrations/github.mdx (removed, +0/-0)
  • docs/ar/enterprise/integrations/gmail.mdx (removed, +0/-0)
  • docs/ar/enterprise/integrations/google_calendar.mdx (removed, +0/-0)

PR #5505: fix: scope streaming handlers to prevent cross-run chunk contamination

Description (problem / solution / changelog)

Summary

  • Streaming handlers on the singleton crewai_event_bus received ALL LLMStreamChunkEvent emissions, causing cross-run chunk fan-out when multiple streaming runs executed concurrently
  • Adds a ContextVar-based stream scope ID (_current_stream_id) so each handler only accepts events emitted within its own execution context
  • LLMStreamChunkEvent handlers run synchronously in the emitter's thread, so the context var naturally reflects the correct stream scope

Closes #5376

Test plan

  • Added TestConcurrentStreamIsolation regression test: sets up two streams in separate contexts, emits chunks concurrently, asserts zero cross-contamination
  • All 35 existing streaming tests pass unchanged
<!-- CURSOR_SUMMARY -->

[!NOTE] Medium Risk Touches core streaming/event-bus integration and relies on ContextVar propagation across threads/tasks; mistakes could lead to missing chunks or continued fan-out under concurrency.

Overview Prevents concurrent streaming runs from cross-contaminating by scoping each registered LLMStreamChunkEvent handler to a per-run stream_id propagated via a ContextVar.

create_streaming_state now generates and stores a UUID stream_id, handlers drop events outside their context, and both sync/async chunk generators temporarily push the stream_id into the copied execution context so only in-scope emitters are observed. Adds a regression test that runs two concurrent streams and asserts each receives only its own chunks.

<sup>Reviewed by Cursor Bugbot for commit 7359c5c1f67216198cab566586797f5e4c1cfbb9. Bugbot is set up for automated code reviews on this repo. Configure here.</sup>

<!-- /CURSOR_SUMMARY -->

Changed files

  • lib/crewai/src/crewai/utilities/streaming.py (modified, +29/-9)
  • lib/crewai/tests/test_streaming.py (modified, +88/-0)
RAW_BUFFERClick to expand / collapse

Feature Area

Core functionality

Is your feature request related to a an existing bug? Please link it here.

Problem

LLMStreamChunkEvent handlers are registered on the singleton crewai_event_bus.

In concurrent streaming runs, each run registers its own handler and queue. Event dispatch can invoke all active handlers for the event type, which may fan out chunks across multiple per-run queues when handler-level scoping/filtering is missing.

This is not a shared-queue issue; it is a global event fan-out issue.

Describe the solution you'd like

Why this is risky

With two concurrent runs:

  • run A: handler_A -> queue_A
  • run B: handler_B -> queue_B

An event emitted by run A may still be observed by handler_B unless strictly scoped, potentially causing cross-run chunk contamination.

Suggested direction

  • Add explicit run/stream correlation scope in streaming event handling.
  • Ensure handlers only accept events belonging to their own run/stream context.
  • Add concurrent streaming regression tests to verify isolation.

Describe alternatives you've considered

No response

Additional context

No response

Willingness to Contribute

I could provide more detailed specifications

extent analysis

TL;DR

To prevent cross-run chunk contamination, add explicit run/stream correlation scope in streaming event handling to ensure handlers only accept events belonging to their own run/stream context.

Guidance

  • Identify the current event handling mechanism in LLMStreamChunkEvent handlers and assess how events are dispatched to multiple handlers.
  • Introduce a run/stream identifier in the event data to enable handlers to filter events based on their own run/stream context.
  • Develop and integrate concurrent streaming regression tests to verify the isolation of event handling across different runs.
  • Consider implementing a mechanism for handlers to register with a specific run/stream scope to prevent global event fan-out.

Example

# Pseudo-code example of run/stream scoped event handling
class LLMStreamChunkEvent:
    def __init__(self, run_id, stream_id, data):
        self.run_id = run_id
        self.stream_id = stream_id
        self.data = data

class EventHandler:
    def __init__(self, run_id, stream_id):
        self.run_id = run_id
        self.stream_id = stream_id

    def handle_event(self, event):
        if event.run_id == self.run_id and event.stream_id == self.stream_id:
            # Process the event
            pass
        else:
            # Ignore the event
            pass

Notes

The provided solution direction assumes that the event handling mechanism can be modified to include run/stream correlation scope. The actual implementation may vary depending on the existing architecture and requirements.

Recommendation

Apply workaround by adding explicit run/stream correlation scope in streaming event handling, as this approach directly addresses the identified issue of global event fan-out and cross-run chunk contamination.

Vote matrix · Quick signals

Works
Did the solution work? Tap to confirm.
Easy Fix
Was it a quick fix?
Time Saver
Did it save you time?
Blocking
Was it severely blocking?
Common Issue
Are others likely hitting this too?
Flaky / Intermittent
Is it intermittent?
Verified / Reproducible
Can you reproduce it reliably?
Loading…

Still need to ship something?

×6

Another batch ranked right after the header list — different links, same matching logic.

Back to top recommendations

TRENDING