litellm - ✅(Solved) Fix [Bug]: Memory Leak + Quadratic Re-sends: `DataDogLogger.async_send_batch()` Never Clears `log_queue` [1 pull request, 1 participant]

GitHub stats
BerriAI/litellm#25660 (fetched 2026-04-14 05:38:21)
Comments: 0 · Participants: 1 · Timeline events: 5 (cross-referenced ×2, labeled ×2, referenced ×1) · Reactions: 0


Root Cause

_log_async_event() appends each event to self.log_queue and then, when len(log_queue) >= batch_size (default DD_MAX_BATCH_SIZE = 1000), calls async_send_batch() directly:

# datadog.py:426-432
self.log_queue.append(dd_payload)

if len(self.log_queue) >= self.batch_size:
    await self.async_send_batch()   # ← sends queue but NEVER clears it

async_send_batch() transmits the queue to Datadog but contains no self.log_queue.clear() call:

# datadog.py:311-362
async def async_send_batch(self):
    try:
        if not self.log_queue:
            ...
            return

        response = await self.async_send_compressed_data(self.log_queue)  # sends all
        response.raise_for_status()
        ...
        # ← log_queue.clear() is MISSING here
    except Exception as e:
        verbose_logger.exception(...)

The queue is only cleared inside the base class flush_queue(), called by the background periodic_flush() task every flush_interval seconds (default 5 s):

# custom_batch_logger.py:44-55
async def flush_queue(self):
    async with self.flush_lock:
        if self.log_queue:
            await self.async_send_batch()
            self.log_queue.clear()    # ← the ONLY place that clears

The same defect is present in async_post_call_failure_hook() (lines 301–304), which also calls async_send_batch() directly after appending to the queue.
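To make the defect concrete, here is a minimal, self-contained sketch that reproduces only the queue logic quoted above. BuggyBatchLogger and its sent_batches list are hypothetical stand-ins (no HTTP, no Datadog client); the append-then-threshold-send shape mirrors _log_async_event(), and the missing clear mirrors async_send_batch().

```python
import asyncio


class BuggyBatchLogger:
    """Hypothetical stand-in for DataDogLogger: same queue/threshold logic, no HTTP."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.log_queue: list[dict] = []
        self.sent_batches: list[list[dict]] = []  # what each "send" would transmit

    async def async_send_batch(self) -> None:
        if not self.log_queue:
            return
        # Stand-in for async_send_compressed_data(): snapshot the outgoing payload.
        self.sent_batches.append(list(self.log_queue))
        # Mirrors the bug: no self.log_queue.clear() here.

    async def _log_async_event(self, payload: dict) -> None:
        self.log_queue.append(payload)
        if len(self.log_queue) >= self.batch_size:
            await self.async_send_batch()


async def main() -> BuggyBatchLogger:
    logger = BuggyBatchLogger(batch_size=3)
    for i in range(5):
        await logger._log_async_event({"id": i})
    return logger


logger = asyncio.run(main())
print(len(logger.log_queue))                  # the queue never shrinks
print([len(b) for b in logger.sent_batches])  # each send re-transmits everything
```

With batch_size=3 and five events, the queue ends with five entries and the three sends carry 3, 4 and 5 events, so the first three events are each transmitted three times.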

Fix Action

Fixed

PR fix notes

PR #25663: fix: drain datadog batches safely

Description (problem / solution / changelog)

Summary

  • detach and drain the Datadog batch queue before sending so threshold flushes do not keep growing memory or resend old events
  • route threshold-based flushes through the logger flush lock and requeue unsent events on send failures
  • add focused regression tests for concurrent appends during flush and failure-hook threshold flushing

Fixes #25660

Changed files

  • litellm/integrations/datadog/datadog.py (modified, +25/-5)
  • tests/test_litellm/integrations/datadog/test_datadog_logger_batching.py (added, +267/-0)
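The PR summary describes the approach only at a high level. The sketch below is one possible way to implement "detach and drain" under stated assumptions; it is not the code merged in PR #25663. DrainingBatchLogger, the injected send callable, and the bare except branch are hypothetical simplifications (a real logger would log the error and bound how much it requeues).

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical transport type: sends one batch, raises on failure.
SendFn = Callable[[list[dict]], Awaitable[None]]


class DrainingBatchLogger:
    """Sketch of "detach and drain"; not the code merged in PR #25663."""

    def __init__(self, batch_size: int, send: SendFn) -> None:
        self.batch_size = batch_size
        self.log_queue: list[dict] = []
        self.flush_lock = asyncio.Lock()
        self._send = send

    async def flush_queue(self) -> None:
        async with self.flush_lock:
            if not self.log_queue:
                return
            # Detach before awaiting: appends made during the send land in
            # the fresh list and are never wiped by this flush.
            batch, self.log_queue = self.log_queue, []
            try:
                await self._send(batch)
            except Exception:
                # Requeue unsent events ahead of anything appended meanwhile.
                # (A real logger would also log the error here.)
                self.log_queue[:0] = batch

    async def log_event(self, payload: dict) -> None:
        self.log_queue.append(payload)
        # Threshold flushes go through the same lock as the periodic flush.
        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()


async def demo() -> list[dict]:
    delivered: list[dict] = []

    async def fake_send(batch: list[dict]) -> None:
        await asyncio.sleep(0)  # yield, so other tasks can append mid-send
        delivered.extend(batch)

    logger = DrainingBatchLogger(batch_size=3, send=fake_send)
    await asyncio.gather(*(logger.log_event({"id": i}) for i in range(7)))
    await logger.flush_queue()  # stand-in for the periodic flush
    return delivered


delivered = asyncio.run(demo())
print(sorted(e["id"] for e in delivered))  # every event delivered exactly once
```

The central design choice is swapping in a fresh list before the await: events appended while a request is in flight go to the new list, so nothing is cleared by mistake and nothing already sent is re-sent. Requeueing on failure keeps delivery at-least-once, at the cost of letting the queue grow while Datadog is unreachable.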


Check for existing issues

  • I have searched the existing issues and checked that my issue is not a duplicate.

What happened?

DataDogLogger accumulates an unbounded in-memory queue between periodic flushes and, once the batch threshold is crossed, re-sends every previously queued event on every request. This causes both memory growth proportional to throughput × flush interval and a quadratic number of duplicate event transmissions to Datadog.

Severity

Critical — on the hot path for every LLM response when Datadog logging is enabled. At 1,000 req/s this consumes ~18 MB per 5-second flush cycle; at 10,000 req/s ~150 MB; at 100,000 req/s ~1.5 GB.

Affected files

  • litellm/integrations/datadog/datadog.py — lines 293–310, 311–365, 418–432
  • litellm/integrations/custom_batch_logger.py — lines 44–55 (has the only correct log_queue.clear())


How to reproduce

  1. Configure a LiteLLM proxy with Datadog logging enabled.
  2. Send ≥ 1,001 requests within a single flush_interval window (default 5 s).
  3. Observe that:
    • Memory: the proxy's RSS grows proportionally to throughput × flush_interval.
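    • Datadog: once the queue reaches batch_size, every new event triggers a re-send of the whole queue, so all queued events (including the first 1,000) appear multiple times in Datadog; for example, the event at zero-based index 1,000 appears (T × F − batch_size) times, where T is throughput (req/s) and F is flush_interval (s).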
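The duplicate counts follow directly from the append-then-send loop. The function below is a hypothetical, HTTP-free replay of that loop; it ignores the final periodic flush (which sends the whole queue one more time) and counts threshold sends, total events transmitted, and how often each event index went out.

```python
def simulate_threshold_resends(n_events: int, batch_size: int) -> dict[str, object]:
    """Replay the buggy loop: append, and once len >= batch_size, send the whole queue."""
    queue_len = 0
    sends = 0
    events_transmitted = 0
    sends_before: list[int] = []  # threshold sends made before event i was appended
    for _ in range(n_events):
        queue_len += 1  # log_queue.append(...); nothing ever removes entries
        sends_before.append(sends)
        if queue_len >= batch_size:
            sends += 1
            events_transmitted += queue_len  # the entire queue goes out again
    # An event rides along on every send made at or after its own append.
    times_sent = [sends - before for before in sends_before]
    return {
        "threshold_sends": sends,
        "events_transmitted": events_transmitted,
        "times_sent": times_sent,
    }


# T = 1,000 req/s for F = 5 s, with the default batch_size of 1,000.
result = simulate_threshold_resends(n_events=5_000, batch_size=1_000)
print(result["threshold_sends"])     # 4001 API calls in one flush window
print(result["events_transmitted"])  # 12003000 events shipped for 5,000 unique ones
print(result["times_sent"][1_000])   # 4000, i.e. T * F - batch_size
print(result["times_sent"][0])       # 4001: the first events are re-sent too
```

The number of API calls grows linearly with T × F, while the number of events transmitted grows roughly with its square.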

Impact

| Throughput (req/s) | Queue size at flush (F = 5 s) | ~Memory held | Re-sends per flush cycle |
|---|---|---|---|
| 100 req/s | ~500 events | ~1.5 MB | 0 (never crosses batch_size) |
| 1,000 req/s | ~6,000 events | ~18 MB | ~12,500 duplicate sends |
| 10,000 req/s | ~51,000 events | ~150 MB | ~1.25M duplicate sends |
| 100,000 req/s | ~501,000 events | ~1.5 GB | ~125M duplicate sends |

Call chain (static trace)

litellm proxy HTTP request
  → FastAPI route handler (proxy_server.py)
    → litellm.acompletion()
      → Logging.success_handler()
        → DataDogLogger.async_log_success_event()    [datadog.py:184]
          → DataDogLogger._log_async_event()         [datadog.py:418]
            → self.log_queue.append(dd_payload)      [datadog.py:426]  ← GROWS
            → self.async_send_batch()                [datadog.py:432]  ← NO CLEAR

Parallel failure path:
  → DataDogLogger.async_post_call_failure_hook()     [datadog.py:221]
    → self.log_queue.append(dd_payload)              [datadog.py:301]  ← GROWS
    → self.async_send_batch()                        [datadog.py:304]  ← NO CLEAR

Proposed fix

Option A (recommended) — Add self.log_queue.clear() inside async_send_batch() in datadog.py, after a successful send. This mirrors the correct pattern already used in SlackAlerting:

# datadog.py — DataDogLogger.async_send_batch()
async def async_send_batch(self):
    try:
        if not self.log_queue:
            verbose_logger.exception("Datadog: log_queue does not exist")
            return

        response = await self.async_send_compressed_data(self.log_queue)
        if response.status_code == 413:
            verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value)
            return

        response.raise_for_status()
        if response.status_code != 202:
            raise Exception(...)

        self.log_queue.clear()   # ← ADD THIS after successful send
        verbose_logger.debug(...)
    except Exception as e:
        verbose_logger.exception(...)

Option B — Replace direct async_send_batch() calls in _log_async_event() and async_post_call_failure_hook() with flush_queue(), which already handles both sending and clearing under the flush_lock:

# datadog.py:431-432 (and line 303-304)
if len(self.log_queue) >= self.batch_size:
    await self.flush_queue()   # ← uses the lock AND clears

Option A is preferred because it also closes a race between _log_async_event() (which bypasses flush_lock) and the concurrent periodic_flush() task — both can hold a reference to the same queue contents simultaneously.
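One caveat worth checking against the race argument above: asyncio switches tasks only at await points, and _log_async_event() appends without taking flush_lock. If the clear runs after the awaited send, anything appended while the request was in flight is wiped without being sent. The same window exists in the quoted flush_queue(), because holding the lock there does not stop unlocked appends. The toy below uses plain lists, with asyncio.sleep(0) standing in for network I/O; all names are hypothetical, and it assumes the payload is serialized before the request is awaited.

```python
import asyncio


async def main() -> tuple[list[int], list[int]]:
    log_queue: list[int] = [0, 1, 2]
    delivered: list[int] = []

    async def send_then_clear() -> None:
        payload = list(log_queue)  # serialized before the request goes out
        await asyncio.sleep(0)     # stand-in for network I/O; other tasks run here
        delivered.extend(payload)
        log_queue.clear()          # clear-after-await, as in Option A and flush_queue()

    async def late_append() -> None:
        log_queue.append(3)        # e.g. another request's success callback

    await asyncio.gather(send_then_clear(), late_append())
    return delivered, log_queue


delivered, remaining = asyncio.run(main())
print(delivered, remaining)  # [0, 1, 2] []: event 3 was never sent and is gone
```

Detaching the list before the await, as the PR #25663 summary describes, closes this window.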

Reference

  • SlackAlerting.async_send_batch() (correct pattern): clears the queue inside async_send_batch().
  • CustomBatchLogger.flush_queue() (custom_batch_logger.py:44–55): the base-class flush that currently does the only safe clear.

Environment

  • Component: litellm/integrations/datadog/
  • Trigger condition: Datadog logging enabled + throughput ≥ DD_MAX_BATCH_SIZE (1000) events within a single flush_interval (default 5 s)
  • Python: 3.10+
  • LiteLLM versions affected: all versions with DataDogLogger

What part of LiteLLM is this about?

Proxy

What LiteLLM version are you on ?

v1.83.7

Twitter / LinkedIn details

No response

Extent analysis

TL;DR

To fix the memory growth and duplicate API calls issue, add self.log_queue.clear() inside async_send_batch() in datadog.py after a successful send, as proposed in Option A.

Guidance

  • Identify the lines of code where async_send_batch() is called directly and consider replacing them with flush_queue() to ensure the queue is cleared after sending.
  • Verify that the flush_interval is set to a reasonable value to prevent excessive memory growth.
  • Review the DD_MAX_BATCH_SIZE value and adjust it if necessary to balance between memory usage and API call efficiency.
  • Test the proposed fix with different throughput scenarios to ensure the memory growth and duplicate API calls are resolved.

Example

The proposed fix in Option A involves adding self.log_queue.clear() after a successful send in async_send_batch():

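async def async_send_batch(self):
    try:
        # ... (empty-queue check unchanged)
        response = await self.async_send_compressed_data(self.log_queue)
        response.raise_for_status()
        self.log_queue.clear()  # Add this line: drop events only after a successful send
        # ...
    except Exception as e:
        verbose_logger.exception(...)  # existing error logging (unchanged)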

Notes

The fix assumes that the async_send_batch() method is the primary cause of the issue. However, other factors like the flush_interval and DD_MAX_BATCH_SIZE values may also impact the behavior.

Recommendation

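Apply the workaround proposed in Option A by adding self.log_queue.clear() inside async_send_batch() to stop the memory growth and the duplicate sends. Note that Option A on its own does not take flush_lock, so it does not by itself close the race between _log_async_event() and the concurrent periodic_flush() task. The merged fix (PR #25663) goes further: it detaches and drains the queue before sending, routes threshold-based flushes through the flush lock, and requeues unsent events on send failures.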
