hermes - ✅(Solved) Fix Gemini native streaming: usageMetadata not extracted — token counts always 0 [2 pull requests, 1 participants]

GitHub stats
NousResearch/hermes-agent#15253 · Fetched 2026-04-25 06:23:24
Comments: 0 · Participants: 1 · Timeline: 9 · Reactions: 0
Timeline (top): labeled ×4, referenced ×3, cross-referenced ×2

Root Cause

translate_stream_event() in agent/gemini_native_adapter.py only extracts candidates from each SSE event — it never reads usageMetadata. The chunks it returns all have usage=None (set in _make_stream_chunk at line ~589).

In run_agent.py, the streaming loop collects usage from the final chunk:

if hasattr(chunk, "usage") and chunk.usage:
    usage_obj = chunk.usage

Since every Gemini streaming chunk has usage=None, usage_obj stays None, and the mock response built from the stream has no usage. The if hasattr(response, 'usage') and response.usage: guard at line ~10100 fails, so normalize_usage / update_token_counts are never called.
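The failure mode can be reproduced in isolation. The following is a minimal sketch, not the actual run_agent.py code: `collect_usage` is a hypothetical stand-in for the streaming loop, and the chunk objects are plain `SimpleNamespace` values invented for illustration. Only the `hasattr(chunk, "usage") and chunk.usage` guard is taken from the issue.

```python
from types import SimpleNamespace


def collect_usage(chunks):
    """Hypothetical stand-in for the streaming loop: keep the last truthy usage."""
    usage_obj = None
    for chunk in chunks:
        # Same guard as quoted from run_agent.py above.
        if hasattr(chunk, "usage") and chunk.usage:
            usage_obj = chunk.usage
    return usage_obj


# Before the fix: every chunk, including the finish chunk, has usage=None.
broken_stream = [
    SimpleNamespace(content="Hel", finish_reason=None, usage=None),
    SimpleNamespace(content="lo", finish_reason=None, usage=None),
    SimpleNamespace(content=None, finish_reason="stop", usage=None),
]

# After the fix: only the finish chunk carries a usage object.
fixed_stream = broken_stream[:2] + [
    SimpleNamespace(
        content=None,
        finish_reason="stop",
        usage=SimpleNamespace(prompt_tokens=12, completion_tokens=5, total_tokens=17),
    ),
]

print(collect_usage(broken_stream))               # None, so no token counts are recorded
print(collect_usage(fixed_stream).total_tokens)   # 17
```

With the broken stream the loop returns None, which is why the later `response.usage` guard fails and nothing is recorded.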

Meanwhile, the non-streaming path (translate_gemini_response) correctly reads usageMetadata:

usage_meta = resp.get("usageMetadata") or {}
usage = SimpleNamespace(
    prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
    completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
    ...
)

Fix Action

Fix

In translate_stream_event(), extract usageMetadata from the event and attach it to the last chunk emitted (the one with finish_reason set). This follows the OpenAI streaming convention that run_agent.py already relies on.

def translate_stream_event(event: Dict[str, Any], model: str, tool_call_indices: Dict[str, Dict[str, Any]]) -> List[_GeminiStreamChunk]:
    candidates = event.get("candidates") or []
    if not candidates:
        return []
    # ... existing code ...

    finish_reason_raw = str(cand.get("finishReason") or "")
    if finish_reason_raw:
        mapped = "tool_calls" if tool_call_indices else _map_gemini_finish_reason(finish_reason_raw)
-       chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
+       finish_chunk = _make_stream_chunk(model=model, finish_reason=mapped)
+       # Attach usage from this event's usageMetadata so run_agent.py
+       # can record token counts (mirrors translate_gemini_response).
+       usage_meta = event.get("usageMetadata") or {}
+       if usage_meta:
+           finish_chunk.usage = SimpleNamespace(
+               prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
+               completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
+               total_tokens=int(usage_meta.get("totalTokenCount") or 0),
+               prompt_tokens_details=SimpleNamespace(
+                   cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
+               ),
+           )
+       chunks.append(finish_chunk)
    return chunks

This is the minimal fix. Alternatively, usage could be emitted on every chunk (Gemini sends cumulative usageMetadata on each event), but the finish-chunk approach matches OpenAI semantics.
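For comparison, the every-chunk alternative might look like the sketch below. It assumes, as stated above, that each event carries cumulative usageMetadata; `to_usage` and `chunks_with_usage_everywhere` are hypothetical names, and the events are made-up sample data rather than captured Gemini output.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Iterator, Optional


def to_usage(event: Dict[str, Any]) -> Optional[SimpleNamespace]:
    """Build an OpenAI-style usage object, or None if the event has no metadata."""
    meta = event.get("usageMetadata") or {}
    if not meta:
        return None
    return SimpleNamespace(
        prompt_tokens=int(meta.get("promptTokenCount") or 0),
        completion_tokens=int(meta.get("candidatesTokenCount") or 0),
        total_tokens=int(meta.get("totalTokenCount") or 0),
    )


def chunks_with_usage_everywhere(events: Iterable[Dict[str, Any]]) -> Iterator[SimpleNamespace]:
    """Alternative design: every emitted chunk carries that event's usage."""
    for event in events:
        cand = (event.get("candidates") or [{}])[0]
        yield SimpleNamespace(
            finish_reason=cand.get("finishReason") or None,
            usage=to_usage(event),
        )


# Sample events with cumulative counts (illustrative values only).
events = [
    {"candidates": [{}],
     "usageMetadata": {"promptTokenCount": 12, "candidatesTokenCount": 2, "totalTokenCount": 14}},
    {"candidates": [{"finishReason": "STOP"}],
     "usageMetadata": {"promptTokenCount": 12, "candidatesTokenCount": 5, "totalTokenCount": 17}},
]

# A last-wins consumer ends up with the final totals.
last_usage = None
for chunk in chunks_with_usage_everywhere(events):
    if hasattr(chunk, "usage") and chunk.usage:
        last_usage = chunk.usage
```

Under that assumption a last-wins consumer sees the same final totals with either design, but a consumer that summed per-chunk usage would over-count, which is one more reason to prefer the finish-chunk approach.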

PR fix notes

PR #15264: fix(gemini): extract usageMetadata from streaming chunks for token tracking

Description (problem / solution / changelog)

Summary

The Gemini native streaming adapter never extracts usageMetadata from SSE events, so all streaming sessions record input_tokens=0, output_tokens=0 in state.db. Cost monitoring and usage tracking are completely broken for Gemini native users.

Fixes #15253

Root cause

translate_stream_event() only extracts candidates from each event and never reads usageMetadata. Every chunk has usage=None, so the streaming loop in run_agent.py never calls normalize_usage / update_token_counts.

Meanwhile, the non-streaming path (translate_gemini_response) correctly reads usageMetadata.

Fix

Attach usage on the finish chunk (the one with finish_reason set) from the event's usageMetadata. This follows OpenAI streaming conventions that run_agent.py already relies on.

Testing

  • Non-streaming path is unaffected
  • Existing streaming behavior (content, tool calls) is unaffected
  • Token counts will now be recorded when Gemini sends usageMetadata in the final streaming event

Changed files

  • .dockerignore (modified, +4/-0)
  • agent/gemini_native_adapter.py (modified, +15/-1)

PR #15493: fix(agent): extract usageMetadata from Gemini streaming events

Description (problem / solution / changelog)

Problem

Gemini's streaming API includes usageMetadata on the final SSE event (alongside finishReason), but translate_stream_event() was ignoring it. This caused all Gemini streaming sessions to report input_tokens=0, output_tokens=0 — complete cost blindness for the most popular provider.

The non-streaming path (translate_gemini_response()) already correctly extracts usageMetadata at lines 514-522. The streaming path had no equivalent logic.

Fix

  • Added usage parameter to _make_stream_chunk() (was hardcoded to None)
  • In translate_stream_event(), extract usageMetadata from the event dict and build a SimpleNamespace usage object (same shape as the non-streaming path)
  • Usage is attached only to the finish-reason chunk (matching OpenAI streaming semantics where the final chunk carries usage)
  • Intermediate content/tool_call/reasoning chunks keep usage=None
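The shape of this change can be sketched as follows. This is an illustration under assumptions, not the PR's code: `StreamChunk`, `make_stream_chunk`, and `finish_chunk_for` are hypothetical stand-ins for `_GeminiStreamChunk`, `_make_stream_chunk()`, and the finish branch of `translate_stream_event()`, whose real definitions are not shown on this page. The model name is a placeholder.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Dict, Optional


@dataclass
class StreamChunk:
    """Hypothetical stand-in for _GeminiStreamChunk."""
    model: str
    content: Optional[str] = None
    finish_reason: Optional[str] = None
    usage: Optional[SimpleNamespace] = None


def make_stream_chunk(*, model: str, content: Optional[str] = None,
                      finish_reason: Optional[str] = None,
                      usage: Optional[SimpleNamespace] = None) -> StreamChunk:
    # usage is now a parameter (default None) instead of being hardcoded to None.
    return StreamChunk(model=model, content=content,
                       finish_reason=finish_reason, usage=usage)


def finish_chunk_for(event: Dict[str, Any], model: str, finish_reason: str) -> StreamChunk:
    """Build the finish chunk, attaching usage only if the event has usageMetadata."""
    usage_meta = event.get("usageMetadata") or {}
    usage = None
    if usage_meta:
        usage = SimpleNamespace(
            prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
            completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
            total_tokens=int(usage_meta.get("totalTokenCount") or 0),
            prompt_tokens_details=SimpleNamespace(
                cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
            ),
        )
    return make_stream_chunk(model=model, finish_reason=finish_reason, usage=usage)


# Intermediate chunks keep usage=None; only the finish chunk carries usage.
content_chunk = make_stream_chunk(model="gemini-example", content="Hi")
finish_chunk = finish_chunk_for(
    {"usageMetadata": {"promptTokenCount": 12, "candidatesTokenCount": 5,
                       "totalTokenCount": 17, "cachedContentTokenCount": 3}},
    model="gemini-example",
    finish_reason="stop",
)
```

Passing usage through the factory, rather than assigning the attribute afterwards, keeps every chunk construction in one place.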

Testing

8 new tests in tests/agent/test_gemini_streaming_usage.py:

Test | Covers
test_stream_event_with_usage_metadata_attaches_to_finish_chunk | Happy path: usage on finish chunk
test_stream_event_with_usage_metadata_and_tool_calls | Usage works with tool calls
test_stream_event_without_usage_metadata_has_no_usage | Intermediate chunks: no usage
test_stream_event_with_empty_usage_metadata_has_no_usage | Empty {} dict: usage=None
test_stream_event_usage_with_missing_fields_defaults_to_zero | Partial usageMetadata defaults to 0
test_stream_event_with_reasoning_and_usage | Reasoning + usage together
test_stream_event_with_explicit_zero_usage_metadata | All-zero values still attach (distinct from None)
test_stream_event_usage_only_on_finish_not_content | usageMetadata on non-finish chunks ignored

All 19 tests pass (11 existing + 8 new). No regressions.
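Three of the edge cases listed above (empty dict, partial fields, explicit zeros) follow from how the truthiness check on the metadata dict behaves. The sketch below is not the PR's test file: `build_usage` is a hypothetical helper that mirrors the extraction logic, and the test names are shortened for illustration.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def build_usage(event: Dict[str, Any]) -> Optional[SimpleNamespace]:
    """Hypothetical helper mirroring the usageMetadata extraction logic."""
    usage_meta = event.get("usageMetadata") or {}
    if not usage_meta:  # missing key or empty {} means no usage at all
        return None
    return SimpleNamespace(
        prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
        completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
        total_tokens=int(usage_meta.get("totalTokenCount") or 0),
    )


def test_empty_usage_metadata_has_no_usage():
    assert build_usage({"usageMetadata": {}}) is None


def test_missing_fields_default_to_zero():
    usage = build_usage({"usageMetadata": {"promptTokenCount": 7}})
    assert usage.prompt_tokens == 7
    assert usage.completion_tokens == 0
    assert usage.total_tokens == 0


def test_explicit_zero_usage_still_attaches():
    # A dict with keys is truthy even when every value is 0,
    # so an all-zero usage object is attached rather than None.
    usage = build_usage({"usageMetadata": {
        "promptTokenCount": 0, "candidatesTokenCount": 0, "totalTokenCount": 0,
    }})
    assert usage is not None
    assert usage.total_tokens == 0
```

These functions can be collected by pytest or called directly.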

Closes #15253

Changed files

  • agent/gemini_native_adapter.py (modified, +15/-2)
  • tests/agent/test_gemini_streaming_usage.py (added, +227/-0)


Bug

When using the Gemini native adapter (provider=gemini with AI Studio base URL) in streaming mode, token usage is never recorded. All sessions show input_tokens=0, output_tokens=0 in state.db.

Affected versions: v0.11.0 (v2026.4.23), likely v0.10.0+ (since native Gemini streaming was added in #12674)

Impact: Cost monitoring and usage tracking are completely broken for Gemini native users. The dashboard shows $0.00 cost for all sessions.

extent analysis

TL;DR

The most likely fix is to modify the translate_stream_event function in agent/gemini_native_adapter.py to extract usageMetadata from the event and attach it to the last chunk emitted.

Guidance

  • Verify that usageMetadata is present in the SSE events that the Gemini native adapter receives from the API.
  • Update the translate_stream_event function to extract usageMetadata and attach it to the last chunk emitted, as shown in the provided fix.
  • Test the updated function to ensure that token usage is being recorded correctly in state.db.
  • Consider alternative approaches, such as emitting usage on every chunk, but ensure that the chosen approach aligns with the OpenAI streaming convention.
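The first check in the list above can be done offline against captured stream output. This is a minimal sketch assuming the stream arrives as standard SSE `data: <json>` lines; `parse_sse_data_lines` is a hypothetical helper, and the sample lines are invented rather than captured from the Gemini API.

```python
import json
from typing import Any, Dict, Iterable, List


def parse_sse_data_lines(lines: Iterable[str]) -> List[Dict[str, Any]]:
    """Decode the JSON payload of each SSE 'data:' line, skipping everything else."""
    events = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # blank separators, comments, other SSE fields
        payload = line[len("data:"):].strip()
        if not payload:
            continue
        events.append(json.loads(payload))
    return events


# Invented sample: a content event followed by a finish event with usage.
sample = [
    'data: {"candidates": [{"content": {"parts": [{"text": "Hi"}]}}]}',
    "",
    'data: {"candidates": [{"finishReason": "STOP"}], '
    '"usageMetadata": {"promptTokenCount": 12, "candidatesTokenCount": 5, "totalTokenCount": 17}}',
]

events = parse_sse_data_lines(sample)
has_usage = ["usageMetadata" in event for event in events]
print(has_usage)
```

If no captured event contains usageMetadata, the adapter fix alone cannot produce token counts and the request side needs to be examined first.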

Example

The provided fix code snippet demonstrates how to modify the translate_stream_event function to extract usageMetadata and attach it to the last chunk emitted:

def translate_stream_event(event: Dict[str, Any], model: str, tool_call_indices: Dict[str, Dict[str, Any]]) -> List[_GeminiStreamChunk]:
    # ...
    finish_chunk = _make_stream_chunk(model=model, finish_reason=mapped)
    usage_meta = event.get("usageMetadata") or {}
    if usage_meta:
        finish_chunk.usage = SimpleNamespace(
            prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
            completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
            total_tokens=int(usage_meta.get("totalTokenCount") or 0),
            prompt_tokens_details=SimpleNamespace(
                cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
            ),
        )
    chunks.append(finish_chunk)
    return chunks

Notes

The provided fix assumes that usageMetadata is present in the SSE events that the Gemini native adapter receives. If this is not the case, additional debugging may be required to determine why.
