langchain: Ability to identify and selectively stream only the `AIMessage` tokens of the final answer to the user's request in `updates` stream mode (✅ solved) [1 pull request, 1 comment, 2 participants]

GitHub stats
langchain-ai/langchain#35276 (fetched 2026-04-08 00:26:51)
Comments: 1 · Participants: 2 · Timeline events: 9 · Reactions: 0
Timeline (top): labeled ×5, closed ×1, commented ×1, cross-referenced ×1

Fix status: Fixed

PR fix notes

PR #35394: feat(core): add streaming token classification metadata

Description (problem / solution / changelog)

Summary

Adds minimal, stable metadata to on_llm_new_token callback kwargs so downstream consumers can filter streamed tokens:

  • lc_token_type: "content" | "tool_call" | "other"
  • lc_is_assistant_content: bool
  • lc_is_tool_call: bool

Updates create_agent model invocations to include stable attribution tags in callback tags:

  • lc:agent, lc:agent_node:model, and lc:agent_name:<name> (when provided)
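A minimal sketch of reading these attribution tags from the tags list a callback receives. The helper names are hypothetical; only the tag strings come from the PR notes, and lc:agent_name:<name> is present only when the agent was given a name.

```python
from typing import Optional, Sequence

MODEL_NODE_TAG = "lc:agent_node:model"
AGENT_NAME_PREFIX = "lc:agent_name:"


def is_agent_model_call(tags: Optional[Sequence[str]]) -> bool:
    """True if the callback tags mark a model-node call made inside create_agent."""
    return MODEL_NODE_TAG in (tags or [])


def agent_name_from_tags(tags: Optional[Sequence[str]]) -> Optional[str]:
    """Return <name> from an lc:agent_name:<name> tag, or None if there is none."""
    for tag in tags or []:
        if tag.startswith(AGENT_NAME_PREFIX):
            return tag[len(AGENT_NAME_PREFIX):]
    return None
```

In a setup with several agents, comparing agent_name_from_tags(kwargs.get("tags")) with the name of the user-facing agent is one way to keep sub-agent tokens out of the client stream.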

Related to: #35276

Technical changes

  • Metadata is derived strictly from the streamed ChatGenerationChunk / AIMessageChunk (checks tool_call_chunks).
  • Agent tagging is additive and only affects model-node calls inside create_agent.
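The classification can be approximated as follows. This is a sketch under an assumed rule (any tool_call_chunks means "tool_call", otherwise non-empty content means "content", otherwise "other"); the exact logic in PR #35394 is not reproduced here. classify_chunk is a hypothetical helper that duck-types the content and tool_call_chunks attributes of AIMessageChunk, so it accepts either a real chunk message or a simple stand-in.

```python
from types import SimpleNamespace
from typing import Any, Dict


def classify_chunk(message: Any) -> Dict[str, Any]:
    """Approximate the PR's per-token metadata for one streamed message chunk.

    Assumed rule (not the PR's code): any tool_call_chunks -> "tool_call",
    otherwise non-empty content -> "content", otherwise "other".
    """
    tool_call_chunks = getattr(message, "tool_call_chunks", None) or []
    content = getattr(message, "content", "")
    if tool_call_chunks:
        token_type = "tool_call"  # tool-call deltas win even if content is also present
    elif content:
        token_type = "content"
    else:
        token_type = "other"
    return {
        "lc_token_type": token_type,
        "lc_is_assistant_content": token_type == "content",
        "lc_is_tool_call": token_type == "tool_call",
    }


# Stand-ins shaped like AIMessageChunk; kwargs["chunk"].message from a real callback also works.
text_chunk = SimpleNamespace(content="We", tool_call_chunks=[])
tool_chunk = SimpleNamespace(
    content="",
    tool_call_chunks=[{"name": "search", "args": '{"q": ', "id": "call_1", "index": 0}],
)
```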

Example usage

You can now filter on_llm_new_token events to stream only assistant content tokens (and ignore tool-call deltas) while running an agent with astream(..., stream_mode="updates").

from typing import Any
from langchain_core.callbacks import AsyncCallbackHandler

class TokenQueueStreamingHandler(AsyncCallbackHandler):
    def __init__(self, queue):
        self.queue = queue

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        if not token:
            return

        # Only stream assistant content tokens (ignore tool-call deltas)
        if kwargs.get("lc_is_assistant_content") is True:
            # Optional: only accept tokens emitted by the agent model node
            if "lc:agent_node:model" in (kwargs.get("tags") or []):
                await self.queue.put(token)
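The handler only fills the queue; something still has to drain it and know when the run is over. A minimal sketch, assuming a hypothetical run_agent coroutine function that runs the agent with the handler in config["callbacks"] (for example by iterating astream to completion), and a private sentinel object that marks the end of the run:

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

_DONE = object()  # sentinel: no more tokens will be put on the queue


async def stream_tokens(
    run_agent: Callable[[], Awaitable[None]],
    queue: asyncio.Queue,
) -> AsyncIterator[str]:
    """Run the agent in the background and yield queued tokens until it finishes."""

    async def producer() -> None:
        try:
            await run_agent()
        finally:
            # Always unblock the consumer, even if the agent run raised.
            await queue.put(_DONE)

    task = asyncio.create_task(producer())
    while True:
        item = await queue.get()
        if item is _DONE:
            break
        yield item
    await task  # re-raise any exception from the agent run
```

Putting the sentinel in a finally block means a failing agent run still ends the client stream, and awaiting the task afterwards surfaces the error instead of silently dropping it.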

Tests added

  • langchain-core: unit test asserts the streaming callback receives the new metadata and differentiates tool-call vs content chunks.
  • langchain_v1: unit test asserts create_agent passes the expected stable tags into the model call config.

Changed files

  • libs/core/langchain_core/language_models/chat_models.py (modified, +55/-8)
  • libs/core/tests/unit_tests/language_models/chat_models/test_base.py (modified, +70/-0)
  • libs/langchain_v1/langchain/agents/factory.py (modified, +19/-2)
  • libs/langchain_v1/tests/unit_tests/agents/test_streaming_final_only_tokens.py (added, +53/-0)


Checked other resources

  • This is a feature request, not a bug report or usage question.
  • I added a clear and descriptive title that summarizes the feature request.
  • I used the GitHub search to find a similar feature request and didn't find it.
  • I checked the LangChain documentation and API reference to see if this feature already exists.
  • This is not related to the langchain-community package.

Package (Required)

  • langchain
  • langchain-openai
  • langchain-anthropic
  • langchain-classic
  • langchain-core
  • langchain-model-profiles
  • langchain-tests
  • langchain-text-splitters
  • langchain-chroma
  • langchain-deepseek
  • langchain-exa
  • langchain-fireworks
  • langchain-groq
  • langchain-huggingface
  • langchain-mistralai
  • langchain-nomic
  • langchain-ollama
  • langchain-perplexity
  • langchain-qdrant
  • langchain-xai
  • Other / not sure / general

Feature Description

When using astream in updates stream mode, I would like to be able to identify the final answer from the agent so that I can stream it to the client of my agent's streaming API endpoint.

Code:

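# Each item is the output of one graph step; its shape depends on stream_mode and subgraphs.
async for item in self._agent.astream(
    {"messages": [{"role": "user", "content": message}]},
    stream_mode=modes,  # a list of modes (e.g. ["updates"]), so each item is tagged with its mode
    config=config,  # needed by the checkpointer (thread_id); also carries the callbacks
    subgraphs=subgraphs,
):
    # (mode, data) without subgraphs, (namespace, mode, data) with subgraphs=True
    *_, stream_mode, data = item
    if stream_mode != "updates":
        continue  # e.g. "messages" items are (message_chunk, metadata) tuples, not dicts
    for source, update in data.items():  # node name -> that node's state update
        if source == "model":
            await output_queue.put(update)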
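Without token-level metadata, the updates stream can still identify the final answer, just not token by token. Assuming the agent comes from create_agent, the "model" update carries the new AIMessage under "messages", and a model turn that requests no tool calls is normally the one that ends the agent loop (unless middleware routes back to the model). A minimal sketch; final_answer_from_update is a hypothetical helper, and the stand-in messages only mimic AIMessage's content and tool_calls attributes.

```python
from types import SimpleNamespace
from typing import Any, Mapping, Optional


def final_answer_from_update(source: str, update: Optional[Mapping[str, Any]]) -> Optional[Any]:
    """Return the model's message if this updates-mode item looks like the final answer.

    Assumption: the model node is named "model" and its update is
    {"messages": [AIMessage, ...]}; a last message with no tool_calls is final.
    """
    if source != "model" or not update:
        return None
    messages = update.get("messages") or []
    if not messages:
        return None
    last = messages[-1]
    return None if getattr(last, "tool_calls", None) else last


# Stand-ins for two "model" updates: one tool-calling turn, one answering turn.
tool_turn = {"messages": [SimpleNamespace(content="", tool_calls=[{"name": "search", "args": {}, "id": "call_1"}])]}
answer_turn = {"messages": [SimpleNamespace(content="Paris is the capital of France.", tool_calls=[])]}
```

Inside the loop above, msg = final_answer_from_update(source, update) followed by putting msg.content on the queue when msg is not None would forward only the answer, at the cost of sending it in one piece.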

I use RunnableConfig["callbacks"] to stream tokens:

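from asyncio import Queue
from typing import Any

from langchain_core.callbacks import AsyncCallbackHandler


class TokenQueueStreamingHandler(AsyncCallbackHandler):
    """LangChain callback handler for streaming LLM tokens to an asyncio queue."""

    def __init__(self, queue: Queue):
        self.queue = queue

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Forward every non-empty token, whichever model call or message part produced it.
        if token:
            await self.queue.put(token)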

The kwargs received by on_llm_new_token look like this:

# kwargs: {'chunk': ChatGenerationChunk(text='We', message=AIMessageChunk(content='We', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[], tool_call_chunks=[])), 
#          'run_id': UUID('019c028e-19a4-7213-92d0-26535292aaca'), 
#          'parent_run_id': UUID('019c028e-1983-7f42-8648-8f02f450018c'), 
#          'tags': ['seq:step:1'], 
#          'verbose': False}

When I use the values stream mode, I can collect the messages in a list and, once the async loop finishes, return the last message, which is the final answer to the user's query. Streaming the LLM tokens is much harder for this use case, because nothing in the callback tells me the context in which a token was produced or which LLM call produced it.
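The values-mode approach described above can be sketched as follows. final_answer_from_values is a hypothetical helper; it assumes the stream was started with stream_mode="values" alone, so each item is the full state dict with a "messages" list.

```python
import asyncio
from typing import Any, AsyncIterable, Mapping, Optional


async def final_answer_from_values(
    stream: AsyncIterable[Mapping[str, Any]],
) -> Optional[Any]:
    """Consume a values-mode stream and return the last message of the final state."""
    last_state: Optional[Mapping[str, Any]] = None
    async for state in stream:  # each item is the full graph state after a step
        last_state = state
    if not last_state or not last_state.get("messages"):
        return None
    return last_state["messages"][-1]


if __name__ == "__main__":
    async def fake_values_stream():
        # Stand-in for agent.astream(inputs, stream_mode="values"); strings replace messages.
        yield {"messages": ["What is 2 + 2?"]}
        yield {"messages": ["What is 2 + 2?", "4"]}

    print(asyncio.run(final_answer_from_values(fake_values_stream())))  # 4
```

This is exactly the trade-off the issue describes: the answer is reliable but only available after the whole run completes.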

Observations:

  1. additional_kwargs, response_metadata, tool_calls, invalid_tool_calls, and tool_call_chunks are not populated at all; they stay empty on the AIMessageChunk inside kwargs["chunk"].
  2. tags always carries the same value, ['seq:step:1'].
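Even when tags repeat, each on_llm_new_token call carries the run_id of the LLM call that produced the token, and kwargs["chunk"].message carries that chunk's tool_call_chunks. One workaround that needs no new metadata is to buffer tokens per run_id and release a run's text only when the run ends without having produced a tool-call chunk. A sketch with a hypothetical FinalAnswerBuffer class: add would be called from on_llm_new_token and finish from on_llm_end with the same run_id. The cost is that the final answer is released in one piece at the end of its model call instead of token by token.

```python
from typing import Dict, List, Optional, Set
from uuid import UUID, uuid4


class FinalAnswerBuffer:
    """Group streamed tokens by LLM run and keep only runs that made no tool calls."""

    def __init__(self) -> None:
        self._tokens: Dict[UUID, List[str]] = {}
        self._tool_runs: Set[UUID] = set()

    def add(self, run_id: UUID, token: str, has_tool_call_chunks: bool) -> None:
        """Record one streamed token and whether its chunk carried tool-call deltas."""
        if has_tool_call_chunks:
            self._tool_runs.add(run_id)
        if token:
            self._tokens.setdefault(run_id, []).append(token)

    def finish(self, run_id: UUID) -> Optional[str]:
        """Close a run; return its text only if it produced no tool-call chunks."""
        tokens = self._tokens.pop(run_id, [])
        if run_id in self._tool_runs:
            self._tool_runs.discard(run_id)
            return None
        return "".join(tokens) or None


if __name__ == "__main__":
    buffer = FinalAnswerBuffer()
    tool_run, answer_run = uuid4(), uuid4()
    buffer.add(tool_run, "Let me look that up.", has_tool_call_chunks=False)
    buffer.add(tool_run, "", has_tool_call_chunks=True)
    buffer.add(answer_run, "Paris", has_tool_call_chunks=False)
    print(buffer.finish(tool_run))  # None
    print(buffer.finish(answer_run))  # Paris
```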

Use Case

Use RunnableConfig["callbacks"] to selectively stream only the tokens of the final AIMessage that answers the user's request.

Proposed Solution

For inspiration, Ollama's chat API includes a thinking field in streamed responses, which clients use to show or hide reasoning traces before the final answer arrives: https://docs.ollama.com/capabilities/streaming
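The Ollama behaviour cited above can be illustrated with a sketch over plain dicts shaped like Ollama chat stream chunks with thinking enabled. The chunk layout (a "message" object with "thinking" and "content" strings) is assumed from Ollama's streaming docs rather than verified against a live server, and split_thinking is a hypothetical helper. The point is that a separate per-chunk field lets a client route text without guessing, which is what the requested LangChain metadata would provide.

```python
from typing import Any, Iterable, List, Mapping, Tuple


def split_thinking(chunks: Iterable[Mapping[str, Any]]) -> Tuple[str, str]:
    """Separate reasoning text from answer text in a stream of chat chunks."""
    thinking: List[str] = []
    answer: List[str] = []
    for chunk in chunks:
        message = chunk.get("message") or {}
        thinking.append(message.get("thinking") or "")  # reasoning trace, if any
        answer.append(message.get("content") or "")  # user-facing answer text
    return "".join(thinking), "".join(answer)
```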

Alternatives Considered

No response

Additional Context

No response

Extent analysis

Fix Plan

1. Introduce a final_answer field in the kwargs

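Add a final_answer field to the callback kwargs, set while the LLM is generating the final answer, so that handlers can identify final-answer tokens in the stream. Note that final_answer is only a proposed field: LangChain does not emit it, and the fix in PR #35394 added lc_token_type, lc_is_assistant_content, and lc_is_tool_call instead.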

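from asyncio import Queue
from typing import Any

from langchain_core.callbacks import AsyncCallbackHandler


class TokenQueueStreamingHandler(AsyncCallbackHandler):
    """LangChain callback handler for streaming LLM tokens to an asyncio queue."""

    def __init__(self, queue: Queue):
        self.queue = queue

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # "final_answer" is the proposed (hypothetical) kwarg; it is absent today,
        # so .get() returns None and no token is forwarded.
        if token and kwargs.get("final_answer"):
            await self.queue.put(token)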

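2. Populate the final_answer field in the kwargs

As originally written, this step cannot work inside the astream loop: the callback kwargs are not in scope there, and in updates mode the "model" update is the node's state update ({"messages": [...]}), which has no final_answer key. The closest check the loop can make is to treat a model update whose last message has no tool calls as the final answer. That update arrives only after the turn's tokens have already been streamed, so it cannot feed a flag back into on_llm_new_token; that requires metadata attached at the callback level.

async for item in self._agent.astream(
    {"messages": [{"role": "user", "content": message}]},
    stream_mode=modes,
    config=config,
    subgraphs=subgraphs,
):
    *_, stream_mode, data = item  # 2-tuple, or 3-tuple with subgraphs=True
    if stream_mode != "updates":
        continue
    for source, update in data.items():
        if source == "model":
            messages = (update or {}).get("messages") or []
            # A model turn without tool calls is treated as the final answer.
            is_final = bool(messages) and not getattr(messages[-1], "tool_calls", None)
            await output_queue.put({"update": update, "final_answer": is_final})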

3. Modify the TokenQueueStreamingHandler to stream the final answer

Modify the TokenQueueStreamingHandler so that it streams a token only when the final_answer field is populated in the kwargs.

class TokenQueueStreamingHandler(AsyncCallbackHandler):
    """LangChain callback handler for streaming LLM tokens to an asyncio queue."""

    def __init__(self, queue: Queue):
        self.queue = queue

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        if token and kwargs.get("final_answer"):
            await self.queue.put(token)
