llamaIndex - ✅(Solved) Fix [Bug]: Async retry backoff blocks event loop due to `time.sleep` usage [1 pull requests, 2 comments, 2 participants]

GitHub stats
run-llama/llama_index#20763 · Fetched 2026-04-08 00:31:06
Comments: 2 · Participants: 2 · Timeline: 6 · Reactions: 0
Timeline (top): cross-referenced ×2, labeled ×2, closed ×1, commented ×1

Error Message

No exception is raised. This is a correctness and performance issue rather than a runtime error. Benchmarks show a large difference: about 10 seconds total runtime under high concurrency before the fix, versus about 0.25 seconds after it.

Root Cause

The async retry helper aretry_on_exceptions_with_backoff calls time.sleep() during backoff inside an async def, which blocks the asyncio event loop. This affects any LlamaIndex async component that relies on retries, such as LLM, embedding, and reranker calls, agent and workflow execution, ingestion and indexing pipelines, and async query and retrieval flows, because a single retry backoff can pause unrelated tasks sharing the same event loop.

Fix Action

Fixed

PR fix notes

PR #20764: fix: async retry backoff to avoid blocking event loop

Description (problem / solution / changelog)

Fixes #20763

Description

This PR fixes a blocking behavior in the async retry helper aretry_on_exceptions_with_backoff. The function previously used time.sleep() for retry backoff inside an async def, which blocks the asyncio event loop and prevents other coroutines from running concurrently. The backoff logic has been updated to use await asyncio.sleep() instead.
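
To illustrate the difference the description refers to, here is a minimal stdlib-only sketch (it does not use LlamaIndex; `blocking_wait`, `cooperative_wait`, `timed`, and `measure` are names made up for this example). Five coroutines each wait 0.05 s: with `time.sleep()` the waits run one after another, while with `await asyncio.sleep()` they overlap.

```python
import asyncio
import time
from typing import Awaitable, Callable, Tuple


async def blocking_wait(delay: float) -> None:
    # time.sleep() never yields to the event loop, so no other task can run.
    time.sleep(delay)


async def cooperative_wait(delay: float) -> None:
    # asyncio.sleep() suspends this coroutine and lets other tasks proceed.
    await asyncio.sleep(delay)


async def timed(
    waiter: Callable[[float], Awaitable[None]], n: int = 5, delay: float = 0.05
) -> float:
    # Run n waiters concurrently and return the wall-clock time taken.
    start = time.perf_counter()
    await asyncio.gather(*(waiter(delay) for _ in range(n)))
    return time.perf_counter() - start


def measure() -> Tuple[float, float]:
    blocking = asyncio.run(timed(blocking_wait))
    cooperative = asyncio.run(timed(cooperative_wait))
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = measure()
    print(f"time.sleep: {blocking:.2f}s, asyncio.sleep: {cooperative:.2f}s")
```

The blocking variant should take roughly n × delay, while the cooperative variant should take roughly a single delay; exact timings depend on the machine.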

I also added a regression test for this change. The fix actually showed a significant performance improvement in async workloads. Since there is no existing place in the repository for benchmarks, I just added the code below so that it can be used to measure performance before and after the fix. I have also attached the benchmark results from my local machine for reference.
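
The regression test itself is not reproduced on this page. As a sketch of how such a test could detect a blocked loop (not the PR's actual test), the example below runs a heartbeat task alongside a retry and counts how often the heartbeat gets to run during the backoff. `retry_once` is a hypothetical stand-in for a retry helper with an injectable sleep function, and `blocking_sleep` mimics the old behavior.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


async def retry_once(
    fn: Callable[[], Awaitable[Any]],
    backoff: float,
    sleep: Callable[[float], Awaitable[None]],
) -> Any:
    # Hypothetical stand-in for a retry helper: retry a single time after `backoff`.
    try:
        return await fn()
    except RuntimeError:
        await sleep(backoff)
        return await fn()


async def blocking_sleep(delay: float) -> None:
    # Mimics the pre-fix behavior: a synchronous sleep inside a coroutine.
    time.sleep(delay)


async def count_ticks_during_retry(sleep: Callable[[float], Awaitable[None]]) -> int:
    ticks = 0
    calls = 0

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls == 1:
            raise RuntimeError("transient")
        return "ok"

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    result = await retry_once(flaky, 0.2, sleep)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    assert result == "ok"
    return ticks
```

With `blocking_sleep` the heartbeat never gets a chance to run, so the count stays at zero; with `asyncio.sleep` it ticks several times during the 0.2 s backoff. A test can assert on that difference without depending on precise wall-clock timings.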

<details> <summary>Benchmark Code</summary>
import asyncio
import time
import json
import os
import statistics
from dataclasses import dataclass, asdict
from typing import Optional, Any, Callable, List, Dict

from llama_index.core.utils import aretry_on_exceptions_with_backoff, ErrorToRetry

class TransientError(Exception):
    pass

@dataclass
class Scenario:
    name: str
    concurrency: int
    backoff: float
    work_delay: float
    fail_times: Optional[int]
    mixed_ratio: float
    repeats: int

def make_task(task_id: int, work_delay: float, fail_times: Optional[int]) -> Callable[[], Any]:
    attempts = {"n": 0}
    async def fn():
        attempts["n"] += 1
        await asyncio.sleep(work_delay)
        if fail_times is None or fail_times == 0:
            return f"ok-{task_id}"
        if fail_times == -1:
            raise TransientError(f"task {task_id} always fails")
        if attempts["n"] <= fail_times:
            raise TransientError(f"task {task_id} transient failure")
        return f"ok-{task_id}"
    return fn

async def run_once(s: Scenario) -> float:
    start = time.perf_counter()
    async def one(i: int):
        use_fail = s.fail_times
        if s.mixed_ratio < 1.0:
            cutoff = int(s.concurrency * s.mixed_ratio)
            if i >= cutoff:
                use_fail = 0
        fn = make_task(i, s.work_delay, use_fail)
        if use_fail == -1:
            max_tries = 3
        elif use_fail is None or use_fail == 0:
            max_tries = 1
        else:
            max_tries = use_fail + 1
        return await aretry_on_exceptions_with_backoff(
            fn,
            errors_to_retry=[ErrorToRetry(TransientError)],
            max_tries=max_tries,
            min_backoff_secs=s.backoff,
            max_backoff_secs=s.backoff,
        )
    try:
        await asyncio.gather(*(one(i) for i in range(s.concurrency)))
    except Exception:
        # Expected in the always_fail scenarios; only the elapsed time matters here.
        pass
    return time.perf_counter() - start

async def run_scenario(s: Scenario) -> Dict[str, Any]:
    times = []
    for _ in range(s.repeats):
        times.append(await run_once(s))
    return {
        "scenario": asdict(s),
        "n": len(times),
        "mean_s": statistics.mean(times),
        "median_s": statistics.median(times),
        "min_s": min(times),
        "max_s": max(times),
        "samples_s": times,
    }

def scenarios() -> List[Scenario]:
    base = [
        ("always_ok", None, 1.0),
        ("fail_once", 1, 1.0),
        ("fail_twice", 2, 1.0),
        ("always_fail", -1, 1.0),
        ("mixed_50pct_fail_once", 1, 0.5),
    ]
    concurrencies = [10, 50]
    backoffs = [0.01, 0.2]
    work_delay = 0.01
    reps = 3
    out = []
    for c in concurrencies:
        for b in backoffs:
            for (name, fail_times, mixed_ratio) in base:
                out.append(
                    Scenario(
                        name=name,
                        concurrency=c,
                        backoff=b,
                        work_delay=work_delay,
                        fail_times=fail_times,
                        mixed_ratio=mixed_ratio,
                        repeats=reps,
                    )
                )
    return out

def save_json(path: str, payload: Any) -> None:
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)

def load_json(path: str) -> Any:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

def key_of(r: Dict[str, Any]) -> str:
    s = r["scenario"]
    return f'{s["name"]}|c={s["concurrency"]}|b={s["backoff"]}|d={s["work_delay"]}|f={s["fail_times"]}|m={s["mixed_ratio"]}'

def fmt(x: float) -> str:
    return f"{x:.3f}s"

def compare(before_path: str, after_path: str) -> None:
    before = load_json(before_path)["results"]
    after = load_json(after_path)["results"]
    bmap = {key_of(r): r for r in before}
    amap = {key_of(r): r for r in after}
    keys = sorted(set(bmap.keys()) & set(amap.keys()))
    header = ["scenario", "before(median)", "after(median)", "speedup", "before(mean)", "after(mean)"]
    rows = []
    for k in keys:
        br = bmap[k]
        ar = amap[k]
        bmed = br["median_s"]
        amed = ar["median_s"]
        sp = (bmed / amed) if amed > 0 else float("inf")
        rows.append([k, fmt(bmed), fmt(amed), f"{sp:.2f}x", fmt(br["mean_s"]), fmt(ar["mean_s"])])
    widths = [max(len(str(row[i])) for row in ([header] + rows)) for i in range(len(header))]
    def print_row(row):
        print(" | ".join(str(row[i]).ljust(widths[i]) for i in range(len(row))))
    print_row(header)
    print("-+-".join("-" * w for w in widths))
    for r in rows:
        print_row(r)

async def run_all(out_path: str) -> None:
    res = []
    for s in scenarios():
        res.append(await run_scenario(s))
    save_json(out_path, {"results": res})

def main():
    import sys
    if len(sys.argv) < 2 or sys.argv[1] not in {"before", "after", "compare"}:
        print("Usage: python bench_async_retry.py before|after|compare")
        raise SystemExit(2)
    mode = sys.argv[1]
    before_path = os.path.join("bench_out", "before.json")
    after_path = os.path.join("bench_out", "after.json")
    if mode == "before":
        asyncio.run(run_all(before_path))
        print(f"Wrote {before_path}")
    elif mode == "after":
        asyncio.run(run_all(after_path))
        print(f"Wrote {after_path}")
    else:
        compare(before_path, after_path)

if __name__ == "__main__":
    main()
</details>
Benchmark results (c = concurrency, b = backoff in seconds, d = work_delay in seconds, f = fail_times, m = mixed_ratio):

| scenario | c | b | d | f | m | before (median) | after (median) | speedup | before (mean) | after (mean) |
|---|---|---|---|---|---|---|---|---|---|---|
| always_fail | 10 | 0.01 | 0.01 | -1 | 1.0 | 0.284s | 0.086s | 3.29x | 0.323s | 0.088s |
| always_fail | 10 | 0.2 | 0.01 | -1 | 1.0 | 4.234s | 0.482s | 8.79x | 4.221s | 0.511s |
| always_fail | 50 | 0.01 | 0.01 | -1 | 1.0 | 1.342s | 0.110s | 12.21x | 1.381s | 0.105s |
| always_fail | 50 | 0.2 | 0.01 | -1 | 1.0 | 20.411s | 0.469s | 43.53x | 20.447s | 0.477s |
| always_ok | 10 | 0.01 | 0.01 | None | 1.0 | 0.016s | 0.016s | 1.03x | 0.017s | 0.014s |
| always_ok | 10 | 0.2 | 0.01 | None | 1.0 | 0.015s | 0.016s | 0.94x | 0.014s | 0.016s |
| always_ok | 50 | 0.01 | 0.01 | None | 1.0 | 0.020s | 0.016s | 1.21x | 0.020s | 0.016s |
| always_ok | 50 | 0.2 | 0.01 | None | 1.0 | 0.016s | 0.016s | 1.02x | 0.016s | 0.016s |
| fail_once | 10 | 0.01 | 0.01 | 1 | 1.0 | 0.134s | 0.048s | 2.80x | 0.137s | 0.048s |
| fail_once | 10 | 0.2 | 0.01 | 1 | 1.0 | 2.042s | 0.238s | 8.59x | 2.049s | 0.253s |
| fail_once | 50 | 0.01 | 0.01 | 1 | 1.0 | 0.834s | 0.048s | 17.51x | 0.801s | 0.048s |
| fail_once | 50 | 0.2 | 0.01 | 1 | 1.0 | 10.221s | 0.239s | 42.72x | 10.223s | 0.249s |
| fail_twice | 10 | 0.01 | 0.01 | 2 | 1.0 | 0.304s | 0.080s | 3.82x | 0.319s | 0.085s |
| fail_twice | 10 | 0.2 | 0.01 | 2 | 1.0 | 4.074s | 0.460s | 8.87x | 4.078s | 0.459s |
| fail_twice | 50 | 0.01 | 0.01 | 2 | 1.0 | 1.339s | 0.096s | 13.99x | 1.335s | 0.095s |
| fail_twice | 50 | 0.2 | 0.01 | 2 | 1.0 | 20.478s | 0.474s | 43.17x | 20.479s | 0.537s |
| mixed_50pct_fail_once | 10 | 0.01 | 0.01 | 1 | 0.5 | 0.102s | 0.032s | 3.19x | 0.097s | 0.033s |
| mixed_50pct_fail_once | 10 | 0.2 | 0.01 | 1 | 0.5 | 1.039s | 0.253s | 4.10x | 1.038s | 0.251s |
| mixed_50pct_fail_once | 50 | 0.01 | 0.01 | 1 | 0.5 | 0.338s | 0.048s | 7.07x | 0.338s | 0.064s |
| mixed_50pct_fail_once | 50 | 0.2 | 0.01 | 1 | 0.5 | 5.177s | 0.238s | 21.77x | 5.189s | 0.238s |
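
A rough back-of-the-envelope model is consistent with these numbers. It is an approximation made for this write-up, not something stated in the PR: when backoff sleeps block the loop they add up across tasks, and when they are awaited they overlap. The sketch below assumes one backoff sleep between consecutive attempts (so two sleeps for the three-try always_fail case, which is inferred from the measured times) and ignores scheduling overhead; the function names are hypothetical.

```python
def blocking_estimate(retrying_tasks: int, sleeps_per_task: int, backoff: float) -> float:
    # Blocking sleeps serialize: every sleep of every retrying task adds to the total.
    return retrying_tasks * sleeps_per_task * backoff


def cooperative_estimate(sleeps_per_task: int, backoff: float, work_delay: float) -> float:
    # Awaited sleeps overlap: the total is roughly one task's own timeline.
    attempts = sleeps_per_task + 1
    return sleeps_per_task * backoff + attempts * work_delay


if __name__ == "__main__":
    # fail_once, c=50, b=0.2: measured medians were 10.221s before and 0.239s after.
    print(round(blocking_estimate(50, 1, 0.2), 2), round(cooperative_estimate(1, 0.2, 0.01), 2))
    # always_fail, c=50, b=0.2: measured medians were 20.411s before and 0.469s after.
    print(round(blocking_estimate(50, 2, 0.2), 2), round(cooperative_estimate(2, 0.2, 0.01), 2))
    # mixed_50pct_fail_once, c=50, b=0.2: only 25 tasks retry; measured 5.177s before.
    print(round(blocking_estimate(25, 1, 0.2), 2))
```

The estimates land close to the measured medians, which fits the picture of retries being serialized before the fix and overlapping after it.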

New Package?

Did I fill in the tool.llamahub section in the pyproject.toml and provide a detailed README.md for my new integration or package?

  • Yes
  • No

Version Bump?

Did I bump the version in the pyproject.toml file of the package I am updating? (Except for the llama-index-core package)

  • Yes
  • No

Type of Change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)

How Has This Been Tested?

Your pull-request will likely not be merged unless it is covered by some form of impactful unit testing.

  • I added new unit tests to cover this change
  • I believe this change is already covered by existing unit tests

Suggested Checklist:

  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have added Google Colab support for the newly added notebooks.
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • I ran uv run make format; uv run make lint to appease the lint gods

Changed files

  • llama-index-core/llama_index/core/utils.py (modified, +1/-1)
  • llama-index-core/tests/test_utils.py (modified, +29/-0)


Bug Description

The async retry helper aretry_on_exceptions_with_backoff calls time.sleep() during backoff inside an async def. This blocks the asyncio event loop, so other coroutines cannot run while a retry is waiting. In async workloads this degrades performance and can make concurrent execution behave like it is serialized during retries.

This would affect any LlamaIndex async component that relies on retries, such as LLM, embedding, and reranker calls, agent and workflow execution, ingestion and indexing pipelines, and async query and retrieval flows, because a single retry backoff can pause unrelated tasks sharing the same event loop.

Version

0.14.15

Steps to Reproduce

import asyncio
import time
from llama_index.core.utils import aretry_on_exceptions_with_backoff, ErrorToRetry

class TransientError(Exception):
    pass

def make_flaky():
    state = {"n": 0}
    async def flaky():
        state["n"] += 1
        await asyncio.sleep(0.01)
        if state["n"] == 1:
            raise TransientError("retry once")
        return "ok"
    return flaky

async def run_batch(concurrency=50, backoff=0.2):
    async def one():
        return await aretry_on_exceptions_with_backoff(
            make_flaky(),
            errors_to_retry=[ErrorToRetry(TransientError)],
            min_backoff_secs=backoff,
            max_backoff_secs=backoff,
            max_tries=2,
        )

    start = time.perf_counter()
    await asyncio.gather(*(one() for _ in range(concurrency)))
    elapsed = time.perf_counter() - start
    return elapsed

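# asyncio.run() makes this runnable as a script; in a notebook, use `await run_batch(...)` instead.
elapsed = asyncio.run(run_batch(concurrency=50, backoff=0.2))
print("Elapsed seconds:", round(elapsed, 2))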

Relevant Logs/Tracebacks

No exception is raised. This is a correctness and performance issue rather than a runtime error. Performance benchmarks show a large difference:
- Before the fix: ~10 seconds total runtime under high concurrency
- After the fix (`await asyncio.sleep`): ~0.25 seconds total runtime

This shows that the event loop is being blocked by `time.sleep()`.

Extent analysis

The async retry helper in LlamaIndex uses time.sleep() inside an async function. Because time.sleep() is synchronous, it blocks the entire event loop, so when several tasks run concurrently they all stall while one retry is waiting.

The fix is to replace time.sleep() with await asyncio.sleep() in aretry_on_exceptions_with_backoff so that the backoff is non-blocking. In the reporter's test case, the blocking version took about 10 seconds, and switching to asyncio.sleep() brought it down to about 0.25 seconds.

The issue was reported against version 0.14.15. Users who cannot upgrade to a release that contains the fix would need to patch the library locally or use a custom retry function built on asyncio.sleep().
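
For users pinned to an affected version, one possible workaround is a small stand-alone retry helper that awaits `asyncio.sleep()` during backoff. The sketch below is an illustration only: `aretry_with_backoff` and its parameters are hypothetical, it is not the LlamaIndex implementation, and the doubling backoff schedule is an assumption made for the example.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def aretry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    max_tries: int = 3,
    min_backoff_secs: float = 0.5,
    max_backoff_secs: float = 60.0,
) -> T:
    # Hypothetical helper: retry `fn` on the given exception types,
    # doubling the backoff after each failure up to `max_backoff_secs`.
    backoff = min_backoff_secs
    tries = 0
    while True:
        try:
            return await fn()
        except retry_on:
            tries += 1
            if tries >= max_tries:
                raise  # out of attempts: surface the last error
            # Non-blocking wait: other tasks keep running during the backoff.
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, max_backoff_secs)
```

A call such as `await aretry_with_backoff(fetch, (TimeoutError,), max_tries=3)` would make up to three attempts, where `fetch` is any zero-argument coroutine function supplied by the caller.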
