langchain - ✅(Solved) Fix `abatch_as_completed` does not cancel pending tasks on exception or early exit [8 pull requests, 6 comments, 3 participants]

GitHub stats

langchain-ai/langchain#35419 (fetched 2026-04-08 00:26:17)

  • Comments: 6
  • Participants: 3
  • Timeline events: 27
  • Reactions: 0
  • Timeline (top): cross-referenced ×9, commented ×6, referenced ×4, labeled ×3

I'm using Runnable.abatch_as_completed to run multiple LLM calls concurrently and process results as they complete. I expect that if one task raises (with the default return_exceptions=False), or if I break out of the async for loop early, any remaining pending work is cancelled. That would be consistent with the sync batch_as_completed, which cancels pending futures in a finally block. Instead, the remaining tasks keep running in the background and can still complete API calls, incurring extra cost even though their results will never be consumed.

Reproduction

import asyncio
from langchain_core.runnables import RunnableLambda

async def llm_call(prompt: str) -> str:
    if prompt == "bad":
        await asyncio.sleep(0.05)
        raise RuntimeError("429 rate limit")
    await asyncio.sleep(0.25)
    print(f"CHARGED: {prompt}")
    return f"ok:{prompt}"

r = RunnableLambda(llm_call)

async def main():
    try:
        async for _, out in r.abatch_as_completed(
            ["bad", "a", "b"],
            config={"max_concurrency": 3},
        ):
            print("got:", out)
    except Exception as e:
        print("caught:", repr(e))
    await asyncio.sleep(0.6)

await main()

Root Cause

abatch_as_completed passed bare coroutines to asyncio.as_completed, which wraps them in tasks internally. Because abatch_as_completed never held references to those tasks and had no try/finally around its yield loop, nothing cancelled them when the consumer stopped iterating, whether through an exception (with the default return_exceptions=False) or an early break. The sync batch_as_completed, by contrast, cancels pending futures in a finally block.

Fix Action

Fix / Workaround

The linked PRs converge on the same change in libs/core/langchain_core/runnables/base.py:

  • Create explicit asyncio.Task objects for the coroutines (most PRs use asyncio.ensure_future) so that abatch_as_completed holds references to them.
  • Wrap the yield loop in try/finally.
  • In the finally block, cancel every unfinished task and await asyncio.gather(*tasks, return_exceptions=True) so that cancellation settles and no "Task exception was never retrieved" warning is emitted.

The issue was reported against langchain_core 1.2.12; the full environment is listed under System Info.

PR fix notes

PR #35420: fix(core): cancel pending tasks in abatch_as_completed on early exit or exception

Description (problem / solution / changelog)

Fixes: #35419

abatch_as_completed now creates explicit asyncio.Task objects and ensures they are cleaned up on exit. The iteration is wrapped in a try/finally so any unfinished tasks are cancelled and all tasks are awaited with gather(..., return_exceptions=True). Waiting on all tasks also prevents "Task exception was never retrieved" warnings if the caller stops iterating while multiple tasks finish at the same time. I also added two unit tests to cover the exception and early-break paths.
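The cleanup pattern this PR describes can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not the code from the PR: as_completed_with_cleanup, indexed, and demo are hypothetical names, and the LangChain config handling and concurrency gating are left out.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable


async def as_completed_with_cleanup(
    func: Callable[[str], Awaitable[str]], inputs: list[str]
) -> AsyncIterator[tuple[int, str]]:
    """Yield (index, result) pairs as they finish; cancel leftovers on exit."""

    async def indexed(i: int, item: str) -> tuple[int, str]:
        return i, await func(item)

    # Explicit Task objects give us handles that can be cancelled later.
    tasks = [asyncio.create_task(indexed(i, item)) for i, item in enumerate(inputs)]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        # Await every task so cancellation settles and no exception is left unretrieved.
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> list[str]:
    finished: list[str] = []

    async def call(prompt: str) -> str:
        if prompt == "bad":
            await asyncio.sleep(0.01)
            raise RuntimeError("429 rate limit")
        await asyncio.sleep(0.1)
        finished.append(prompt)  # stands in for the "CHARGED" side effect
        return f"ok:{prompt}"

    try:
        async for _ in as_completed_with_cleanup(call, ["bad", "a", "b"]):
            pass
    except RuntimeError:
        pass
    await asyncio.sleep(0.2)  # give leaked tasks time to finish, if any survived
    return finished


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the exception is raised inside the generator, the finally block runs before the error reaches the consumer, so the two slow calls are cancelled while they are still sleeping.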

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +9/-2)
  • libs/core/tests/unit_tests/runnables/test_concurrency.py (modified, +54/-0)

PR #35427: fix(core): cancel pending tasks in abatch_as_completed on early exit

Description (problem / solution / changelog)

Summary

  • abatch_as_completed passed bare coroutines to asyncio.as_completed, losing cancellable Task references. When a consumer broke early or raised an exception, the remaining in-flight async tasks continued running silently in the background.
  • Fix wraps each coroutine with asyncio.ensure_future (creating real Task objects) and adds a try/finally that calls task.cancel() for every pending task — mirroring the existing pattern in the synchronous batch_as_completed.

Areas requiring careful review

  • asyncio.ensure_future vs asyncio.create_task: both schedule a coroutine as a Task; ensure_future is used here because it accepts both coroutines and futures, which is consistent with the gated_coro wrapper already in this path.
  • The finally guard if not task.done() avoids a no-op cancel() call on already-completed tasks, keeping semantics clean.
  • Two new tests in test_concurrency.py verify cancellation on consumer exception and consumer break. They use asyncio.Event + asyncio.wait_for instead of a fixed sleep(0) count because asyncio finalises async generators asynchronously (via GC hooks), which requires multiple event-loop cycles.
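The Event-based test approach mentioned in the last bullet can be sketched as follows. This is an assumption-laden illustration rather than the PR's actual test: results_with_cleanup and check_cancel_on_break are hypothetical names, and the sketch relies on CPython finalising the abandoned generator shortly after the break.

```python
import asyncio
from collections.abc import AsyncIterator


async def results_with_cleanup(cancelled: asyncio.Event) -> AsyncIterator[str]:
    async def fast() -> str:
        return "fast"

    async def slow() -> str:
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            cancelled.set()  # record that cancellation actually reached this task
            raise
        return "slow"

    tasks = [asyncio.ensure_future(fast()), asyncio.ensure_future(slow())]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def check_cancel_on_break() -> bool:
    cancelled = asyncio.Event()
    async for _ in results_with_cleanup(cancelled):
        break  # abandon the generator after the first result
    # The generator is closed asynchronously, so wait for the signal
    # instead of counting a fixed number of sleep(0) calls.
    await asyncio.wait_for(cancelled.wait(), timeout=1.0)
    return cancelled.is_set()


if __name__ == "__main__":
    print(asyncio.run(check_cancel_on_break()))
```

Waiting on the event with a timeout keeps the test from hanging if cancellation never happens, and it does not depend on how many loop iterations the finalisation takes.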

Fixes #35419.

AI-assisted contribution: implementation and tests were developed with Claude Code (claude-sonnet-4-6).

🤖 Generated with Claude Code

Changed files

  • libs/core/langchain_core/callbacks/usage.py (modified, +25/-8)
  • libs/core/langchain_core/runnables/base.py (modified, +8/-2)
  • libs/core/tests/unit_tests/callbacks/test_usage_callback.py (modified, +62/-0)
  • libs/core/tests/unit_tests/runnables/test_concurrency.py (modified, +71/-0)

PR #35430: fix: abatch_as_completed does not cancel pending tasks on exception or early exit

Description (problem / solution / changelog)

Summary

  • Ensure Runnable.abatch_as_completed cancels pending async tasks in a finally block.
  • Prevent background work from continuing after an exception or early iterator exit.
  • Add unit tests covering both exception-driven and early-exit cancellation paths.

Testing

  • uv run --group test pytest tests/unit_tests/runnables/test_concurrency.py

Fixes langchain-ai/langchain#35419

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +11/-2)
  • libs/core/langchain_core/utils/_merge.py (modified, +4/-3)
  • libs/core/tests/unit_tests/runnables/test_concurrency.py (modified, +56/-0)
  • libs/core/tests/unit_tests/test_messages.py (modified, +28/-0)

PR #35431: fix: abatch_as_completed does not cancel pending tasks on exception or early exit

Description (problem / solution / changelog)

Summary

Cancel remaining async tasks in abatch_as_completed when iteration exits early or an exception is raised, matching the behavior of the synchronous variant and preventing unnecessary background work.

Changes

  • add cancellation/cleanup logic for pending tasks in async completion path
  • ensure cancellation runs on error and early iterator exit
  • add unit tests covering exception and early-exit cancellation behavior

Testing

  • ran targeted unit tests for runnable batch completion behavior

Fixes langchain-ai/langchain#35419

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +11/-2)
  • libs/core/tests/unit_tests/runnables/test_concurrency.py (modified, +57/-0)

PR #35460: fix(core): cancel pending tasks in abatch_as_completed on exception or early exit

Description (problem / solution / changelog)

Summary

  • Bug: abatch_as_completed did not cancel remaining asyncio tasks when the caller broke out of the async for loop early or when a task raised an exception (with default return_exceptions=False). This caused background tasks to keep running, wasting resources and API credits.
  • Fix: Wrap the yield loop in a try/finally block that cancels all outstanding tasks when the async generator is closed, matching the existing behavior of the sync batch_as_completed method.
  • Tests: Added two new tests verifying tasks are cancelled on exception and on early break.

Closes #35419

Detailed changes

In Runnable.abatch_as_completed (libs/core/langchain_core/runnables/base.py):

  1. Create explicit asyncio.Task objects from the coroutines (via asyncio.ensure_future) so we hold references to them.
  2. Wrap the yield loop (for task in asyncio.as_completed(tasks): yield await task) in try/finally.
  3. In the finally block, cancel all tasks and await asyncio.gather(*tasks, return_exceptions=True) to allow clean cancellation propagation.

This mirrors the sync version which already does:

try:
    while futures:
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        while done:
            yield done.pop().result()
finally:
    for future in futures:
        future.cancel()
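For context on what future.cancel() does in the sync snippet, the standard-library behaviour can be checked in isolation. The sketch below is not LangChain code; cancel_semantics and blocker are hypothetical names. It shows that concurrent.futures can only cancel a future that has not started running.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_semantics() -> tuple[bool, bool]:
    started = threading.Event()
    release = threading.Event()

    def blocker() -> str:
        started.set()
        release.wait()  # hold the only worker thread until released
        return "done"

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        queued = pool.submit(lambda: "never runs")
        started.wait()  # make sure blocker is actually executing
        running_cancelled = running.cancel()  # already running: cannot be cancelled
        queued_cancelled = queued.cancel()  # still queued: cancelled
        release.set()
    return running_cancelled, queued_cancelled


if __name__ == "__main__":
    print(cancel_semantics())
```

An asyncio Task differs here: cancel() on a running task raises CancelledError inside it at its next await, which is why the async fix can stop calls that are already in flight.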

Test plan

  • test_abatch_as_completed_cancels_on_exception -- verifies that when one task raises a ValueError, the remaining long-running tasks are cancelled and do not complete
  • test_abatch_as_completed_cancels_on_early_break -- verifies that when the caller breaks after the first result, the remaining long-running tasks are cancelled and do not complete
  • All existing concurrency tests continue to pass

Disclaimer

This contribution was developed with the assistance of an AI coding agent (Claude).

🤖 Generated with Claude Code

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +10/-2)
  • libs/core/tests/unit_tests/runnables/test_concurrency.py (modified, +62/-0)

PR #35479: fix(core): cancel pending tasks in abatch_as_completed on exception or early exit

Description (problem / solution / changelog)

Problem

abatch_as_completed does not cancel pending tasks when the consumer stops iterating (via exception or break). This means remaining ainvoke calls continue running in the background and can still complete API calls, incurring extra cost even though their results will never be consumed.

The sync batch_as_completed already handles this correctly with a try/finally block that cancels remaining futures.

Reproduction (from #35419)

import asyncio
from langchain_core.runnables import RunnableLambda

async def llm_call(prompt: str) -> str:
    if prompt == "bad":
        raise RuntimeError("429 rate limit")
    await asyncio.sleep(0.25)
    print(f"CHARGED: {prompt}")  # still runs after exception
    return f"ok:{prompt}"

r = RunnableLambda(llm_call)
# Top-level "async for" needs a running event loop, e.g. a notebook or "python -m asyncio".
async for _, out in r.abatch_as_completed(["bad", "a", "b"]):
    print(out)
# Expected: only the RuntimeError, no CHARGED output
# Actual: RuntimeError + CHARGED: a + CHARGED: b

Root Cause

abatch_as_completed passed coroutines directly to asyncio.as_completed(), which wraps them in tasks internally. Since the caller never held references to those tasks, there was no way to cancel them when iteration stopped.
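The root cause can be reproduced without LangChain. In this minimal sketch (leak_demo and work are hypothetical names), bare coroutines are handed to asyncio.as_completed and the loop is abandoned after the first result; the second coroutine still runs to completion because nothing holds a handle that could cancel it.

```python
import asyncio


async def leak_demo() -> list[str]:
    finished: list[str] = []

    async def work(name: str, delay: float) -> str:
        await asyncio.sleep(delay)
        finished.append(name)
        return name

    # Bare coroutines: as_completed wraps them in tasks we never see.
    coros = [work("fast", 0.01), work("slow", 0.05)]
    for next_done in asyncio.as_completed(coros):
        await next_done
        break  # stop early; there is no Task handle for "slow" to cancel

    await asyncio.sleep(0.1)  # "slow" keeps running in the background
    return finished


if __name__ == "__main__":
    print(asyncio.run(leak_demo()))
```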

Fix

  1. Create explicit asyncio.Task objects via asyncio.ensure_future() so we hold references
  2. Wrap the yield loop in try/finally that cancels any unfinished tasks (mirroring the sync version)
  3. await asyncio.gather(*tasks, return_exceptions=True) to suppress CancelledError

Tests

Added two regression tests:

  • test_abatch_as_completed_cancels_on_exception: verifies no tasks complete after one raises
  • test_abatch_as_completed_cancels_on_break: verifies no tasks complete after consumer breaks

All 6 concurrency tests pass.

Fixes #35419

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +9/-2)
  • libs/core/tests/unit_tests/runnables/test_concurrency.py (modified, +55/-0)

PR #35626: fix(core): cancel pending tasks in abatch_as_completed on exit

Description (problem / solution / changelog)

Bug

abatch_as_completed does not cancel pending tasks when the consumer stops iterating (due to an exception, break, or early exit). Remaining asyncio tasks continue running in the background, wasting resources and API calls.

Reported in: #35419

Root cause

The coroutines passed to asyncio.as_completed are wrapped in internal tasks by the event loop, but no reference to these tasks is kept. When the async generator exits (via exception or break), there is no cleanup logic to cancel the remaining tasks.

Fix

  • Wrap coroutines as explicit asyncio.Task objects via asyncio.ensure_future
  • Add a try/finally block around the yield loop
  • In the finally block, cancel all unfinished tasks and await asyncio.gather(*tasks, return_exceptions=True) to ensure cancellation propagates

Testing

  • Added test_abatch_as_completed_cancels_pending_on_exception: creates 3 tasks where task 0 raises immediately, tasks 1-2 sleep for 5s. Verifies that after the exception, the sleeping tasks were cancelled (never reached their finished.append line).
  • All existing abatch_as_completed tests pass.

Fixes #35419

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +9/-2)
  • libs/core/tests/unit_tests/runnables/test_runnable.py (modified, +32/-0)

PR #35877: fix: cancel pending tasks in abatch_as_completed on exception/early exit

Description (problem / solution / changelog)

Summary

  • Fixes #35419: abatch_as_completed does not cancel pending tasks when an exception occurs or the consumer breaks out of the async for loop early, causing remaining tasks to run in the background and incur unnecessary API charges.
  • Wraps coroutines in asyncio.Task objects and adds a try/finally block that cancels all remaining tasks when the async generator exits, mirroring the cancellation behavior already present in the sync batch_as_completed method.

Details

The sync batch_as_completed already cancels pending concurrent.futures in a finally block:

try:
    while futures:
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        while done:
            yield done.pop().result()
finally:
    for future in futures:
        future.cancel()

The async version was missing equivalent cleanup. The fix:

  1. Uses asyncio.ensure_future() to wrap coroutines into Task objects (required because asyncio.as_completed wraps bare coroutines in tasks internally, leaving the caller without a reference it could cancel).
  2. Adds a try/finally around the yield loop that cancels all tasks and awaits them with return_exceptions=True to suppress CancelledError.

This ensures that when:

  • A task raises an exception (and return_exceptions=False), or
  • The caller breaks out of the async for loop early

...all remaining pending tasks are promptly cancelled instead of continuing to run in the background.

Test plan

  • Verify with the reproduction script from #35419 that "CHARGED" messages no longer appear after the exception
  • Existing tests in libs/core/tests/unit_tests/runnables/test_concurrency.py continue to pass
  • Test with return_exceptions=True to confirm normal behavior is unaffected
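The last check in the test plan can be sketched with a stand-in generator. This is an illustration under assumptions, not LangChain's implementation: iter_as_completed, guarded, and collect are hypothetical names that mimic a return_exceptions flag. With the flag set, the failing call is delivered as a value and every result is consumed, so the cleanup in finally has nothing left to cancel.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Union


async def call(prompt: str) -> str:
    await asyncio.sleep(0.01)
    if prompt == "bad":
        raise RuntimeError("429 rate limit")
    return f"ok:{prompt}"


async def iter_as_completed(
    prompts: list[str], *, return_exceptions: bool = False
) -> AsyncIterator[tuple[int, Union[str, Exception]]]:
    async def guarded(i: int, prompt: str) -> tuple[int, Union[str, Exception]]:
        try:
            return i, await call(prompt)
        except Exception as exc:
            if return_exceptions:
                return i, exc  # hand the error to the consumer as a value
            raise

    tasks = [asyncio.ensure_future(guarded(i, p)) for i, p in enumerate(prompts)]
    try:
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def collect() -> list[tuple[int, str]]:
    seen: list[tuple[int, str]] = []
    async for i, result in iter_as_completed(["bad", "a", "b"], return_exceptions=True):
        seen.append((i, result if isinstance(result, str) else type(result).__name__))
    return sorted(seen)


if __name__ == "__main__":
    print(asyncio.run(collect()))
```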

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.6 (1M context)

Changed files

  • libs/core/langchain_core/runnables/base.py (modified, +14/-6)

Code Example

import asyncio
from langchain_core.runnables import RunnableLambda

async def llm_call(prompt: str) -> str:
    if prompt == "bad":
        await asyncio.sleep(0.05)           
        raise RuntimeError("429 rate limit")
    await asyncio.sleep(0.25)               
    print(f"CHARGED: {prompt}")              
    return f"ok:{prompt}"

r = RunnableLambda(llm_call)

async def main():
    try:
        async for _, out in r.abatch_as_completed(
            ["bad", "a", "b"],
            config={"max_concurrency": 3},
        ):
            print("got:", out)
    except Exception as e:
        print("caught:", repr(e))
    await asyncio.sleep(0.6)

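await main()  # top-level await: run in a notebook or "python -m asyncio"; in a script, call asyncio.run(main()) instead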

---

Expected output:
caught: RuntimeError('429 rate limit')

Actual output:
caught: RuntimeError('429 rate limit')
CHARGED: a
CHARGED: b

Checked other resources

  • This is a bug, not a usage question.
  • I added a clear and descriptive title that summarizes this issue.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).
  • This is not related to the langchain-community package.
  • I posted a self-contained, minimal, reproducible example. A maintainer can copy it and run it AS IS.

Package (Required)

  • langchain
  • langchain-openai
  • langchain-anthropic
  • langchain-classic
  • langchain-core
  • langchain-model-profiles
  • langchain-tests
  • langchain-text-splitters
  • langchain-chroma
  • langchain-deepseek
  • langchain-exa
  • langchain-fireworks
  • langchain-groq
  • langchain-huggingface
  • langchain-mistralai
  • langchain-nomic
  • langchain-ollama
  • langchain-openrouter
  • langchain-perplexity
  • langchain-qdrant
  • langchain-xai
  • Other / not sure / general


System Info

 System Information
 ------------------
 > OS:  Windows
 > OS Version:  10.0.26100
 > Python Version:  3.13.7 (tags/v3.13.7:bcee1c3, Aug 14 2025, 14:15:11) [MSC v.1944 64 bit (AMD64)]

 Package Information
 -------------------
 > langchain_core: 1.2.12
 > langsmith: 0.7.1
 > langchain_tests: 1.1.5

 Optional packages not installed
 -------------------------------
 > langserve

 Other Dependencies
 ------------------
 > httpx: 0.28.1
 > jsonpatch: 1.33
 > numpy: 2.3.5
 > orjson: 3.11.5
 > packaging: 26.0
 > pydantic: 2.12.5
 > pytest: 9.0.2
 > pytest-asyncio: 1.3.0
 > pytest-benchmark: 5.2.3
 > pytest-codspeed: 4.3.0
 > pytest-recording: 0.13.4
 > pytest-socket: 0.7.0
 > pyyaml: 6.0.3
 > requests: 2.32.5
 > requests-toolbelt: 1.0.0
 > rich: 14.2.0
 > syrupy: 5.1.0
 > tenacity: 9.1.4
 > typing-extensions: 4.15.0
 > uuid-utils: 0.14.0
 > vcrpy: 8.1.1
 > xxhash: 3.6.0
 > zstandard: 0.25.0

extent analysis

Fix Plan

1. Update langchain_core

The issue was reported against langchain_core 1.2.12. Upgrade to a release that includes the fix; the PR notes above do not state which release that is, so check the langchain_core release notes before relying on it.

pip install --upgrade langchain_core

2. Leave the call site unchanged

The PRs above fix the behavior inside abatch_as_completed itself. None of them adds a cancel_pending_work option or any other config key, so no extra setting is needed:

async def main():
    try:
        async for _, out in r.abatch_as_completed(
            ["bad", "a", "b"],
            config={"max_concurrency": 3},
        ):
            print("got:", out)
    except Exception as e:
        print("caught:", repr(e))
    await asyncio.sleep(0.6)

3. Verify the fix

Run the reproduction again. If it is saved as a script, replace the final await main() with asyncio.run(main()).

python example.py

The output should contain only caught: RuntimeError('429 rate limit'), with no CHARGED lines.

Extra Tips

  • The early-break path depends on the async generator being closed, which asyncio may do a few event-loop cycles after the break (see the PR #35427 notes).
  • Test with return_exceptions=True as well to confirm that normal behavior is unaffected.
