langchain - ✅(Solved) Fix feat(openai): concurrent batch API calls in async embedding methods [1 pull request, 4 comments, 2 participants]

GitHub stats
langchain-ai/langchain#36547 · Fetched 2026-04-08 02:52:02
Comments: 4
Participants: 2
Timeline: 9
Reactions: 0
Timeline (top): commented ×4, labeled ×3, cross-referenced ×1, issue_type_added ×1

Fix Action

Fixed

PR fix notes

PR #36551: feat(openai): enhance async embedding with concurrent batch processing

Description (problem / solution / changelog)

Fixes #36547

Replaces sequential await loops with asyncio.gather in OpenAIEmbeddings async methods to fire all batch embedding API calls concurrently, delivering near-linear speedup for large document ingestion.

  1. PR title (should follow the format TYPE(SCOPE): DESCRIPTION): feat(openai): concurrent batch API calls in async embedding methods

  2. PR description:

Fixes #36547

Replaces sequential await loops with asyncio.gather in OpenAIEmbeddings._aget_len_safe_embeddings and the aembed_documents fast path to fire all batch embedding API calls concurrently, delivering near-linear speedup for large document ingestion. No breaking changes. Only libs/partners/openai is touched.

This PR was developed with assistance from an AI coding agent (Claude Code).

  3. Run make format, make lint and make test from the root of the package(s) you've modified.
     • We will not consider a PR unless these three are passing in CI.
  4. How did you verify your code works?

make format, make lint, make test all pass (320 tests). Added 3 new async unit tests verifying concurrent batch execution, single-batch edge case, and the check_embedding_ctx_length=False fast path. asyncio.gather preserves input order by contract, verified by asserting embeddings match original batch positions.
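The ordering check described above can be sketched roughly as follows. This is a minimal illustration, not the tests added in the PR: `embed_batch` and `run_batches` are hypothetical names, and the sleeps stand in for API latency. Later batches are given shorter delays so that they finish first, which makes it visible that `asyncio.gather` still returns results in submission order.

```python
import asyncio


async def embed_batch(index: int, delay: float, completion_log: list[int]) -> int:
    """Hypothetical stand-in for one batch call; returns its own batch index."""
    await asyncio.sleep(delay)  # simulated network latency
    completion_log.append(index)  # record the order in which batches finish
    return index


async def run_batches(n: int) -> tuple[list[int], list[int]]:
    completion_log: list[int] = []
    # Later batches get shorter delays, so they complete before earlier ones.
    coros = [embed_batch(i, 0.02 * (n - i), completion_log) for i in range(n)]
    results = await asyncio.gather(*coros)
    return list(results), completion_log


if __name__ == "__main__":
    results, log = asyncio.run(run_batches(4))
    print("gather order:    ", results)
    print("completion order:", log)
```

The returned list follows the order in which the awaitables were passed to `gather`, while the completion log shows the reverse order in which they actually finished.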

Social handles (optional)

Twitter: @ LinkedIn: https://www.linkedin.com/in/sharad-mishra-1568b566/

Changed files

  • libs/partners/openai/langchain_openai/embeddings/base.py (modified, +29/-14)
  • libs/partners/openai/tests/unit_tests/embeddings/test_base.py (modified, +84/-1)
  • libs/partners/openai/uv.lock (modified, +1/-1)


Checked other resources

  • This is a feature request, not a bug report or usage question.
  • I added a clear and descriptive title that summarizes the feature request.
  • I used the GitHub search to find a similar feature request and didn't find it.
  • I checked the LangChain documentation and API reference to see if this feature already exists.
  • This is not related to the langchain-community package.


Feature Description

Description (suggested):

Feature

OpenAIEmbeddings._aget_len_safe_embeddings and the aembed_documents fast path currently process batch API calls sequentially — each batch awaits the previous one. For large document sets (e.g., 5000 docs with chunk_size=1000), this means 5 serial HTTP round-trips.

Proposal

Replace the sequential while/await loops with asyncio.gather to fire all batch API calls concurrently. This follows the same pattern already used by MistralAIEmbeddings.aembed_documents.

The change is limited to libs/partners/openai/langchain_openai/embeddings/base.py. No changes to the Embeddings ABC or any vector store. Every async consumer benefits automatically.
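To make the sequential-versus-concurrent difference concrete, here is a small simulation, assuming a fixed per-request latency. `fake_embed_batch`, `make_batches`, `embed_sequential`, `embed_concurrent` and `timed` are hypothetical helpers written for this sketch; none of them is part of `langchain-openai` or the OpenAI SDK, and the timings say nothing about real API behaviour or rate limits.

```python
import asyncio
import time


async def fake_embed_batch(batch: list[str], latency: float = 0.05) -> list[list[float]]:
    """Hypothetical stand-in for one embeddings API call."""
    await asyncio.sleep(latency)  # simulated HTTP round-trip
    return [[float(len(text))] for text in batch]


def make_batches(texts: list[str], chunk_size: int) -> list[list[str]]:
    """Split texts into consecutive batches of at most chunk_size items."""
    return [texts[i : i + chunk_size] for i in range(0, len(texts), chunk_size)]


async def embed_sequential(texts: list[str], chunk_size: int) -> list[list[float]]:
    out: list[list[float]] = []
    for batch in make_batches(texts, chunk_size):
        out.extend(await fake_embed_batch(batch))  # each batch waits for the previous one
    return out


async def embed_concurrent(texts: list[str], chunk_size: int) -> list[list[float]]:
    batches = make_batches(texts, chunk_size)
    results = await asyncio.gather(*(fake_embed_batch(b) for b in batches))
    return [vec for batch in results for vec in batch]  # flatten, order preserved


async def timed(coro):
    """Await a coroutine and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


if __name__ == "__main__":
    texts = [f"doc {i}" for i in range(5000)]
    _, t_seq = asyncio.run(timed(embed_sequential(texts, 1000)))
    _, t_con = asyncio.run(timed(embed_concurrent(texts, 1000)))
    print(f"sequential: {t_seq:.3f}s, concurrent: {t_con:.3f}s")
```

With 5000 texts and a chunk size of 1000, the sequential version pays the simulated latency five times, while the concurrent version pays it roughly once.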

Motivation

Near-linear speedup for large document ingestion via aadd_documents on any vector store using OpenAI embeddings.

Use Case


RAG document ingestion at scale

When building a RAG (Retrieval-Augmented Generation) pipeline, a common step is embedding thousands of documents into a vector store:

```python
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

store = InMemoryVectorStore(OpenAIEmbeddings())
await store.aadd_documents(documents)  # 5000+ documents
```


### Proposed Solution


Replace sequential `await` loops with `asyncio.gather` in two methods of 
`OpenAIEmbeddings` (`libs/partners/openai/langchain_openai/embeddings/base.py`):

**1. `_aget_len_safe_embeddings`** (main async path)
**2. `aembed_documents` fast path** (when `check_embedding_ctx_length=False`)

Both currently do:
```python
# Sequential: each batch waits for the previous one
while i < len(tokens):
    response = await self.async_client.create(input=batch, ...)
    i = batch_end
```

Change to:

```python
# Pre-compute batch boundaries (no I/O dependency between batches)
batch_ranges = [...]

# Fire all API calls concurrently, results stay in order
batch_results = await asyncio.gather(
    *[_embed_batch(s, e) for s, e in batch_ranges]
)
```

Why this approach:

  • asyncio.gather preserves input order, so no result reordering is needed
  • The fix lives in the embedding provider (not the vector store), so every consumer (InMemoryVectorStore, Chroma, Pinecone, etc.) benefits automatically
  • No changes to the Embeddings ABC or any vector store code
  • No new parameters or breaking changes
  • Matches the existing pattern in MistralAIEmbeddings.aembed_documents
  • Sync methods (embed_documents) are left unchanged, so no threading risk is introduced
  • The OpenAI SDK's built-in retry/backoff handles rate limiting naturally

Scope: Only libs/partners/openai is touched (1 source file + tests).
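The pseudocode above elides how `batch_ranges` and `_embed_batch` are built. A self-contained sketch of one way to fill those in follows. It is an illustration under stated assumptions, not the code in the PR: `FakeAsyncClient`, its response shape, and `embed_all` are hypothetical, and fixed-size slicing is assumed for the batch boundaries.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for the async embeddings client (not the OpenAI SDK)."""

    async def create(self, input: list[list[int]]) -> dict:
        await asyncio.sleep(0.01)  # simulated network latency
        # One fake "embedding" per input item: the sum of its token ids.
        return {"data": [{"embedding": [float(sum(item))]} for item in input]}


async def embed_all(
    client: FakeAsyncClient, tokens: list[list[int]], chunk_size: int
) -> list[list[float]]:
    # Pre-compute (start, end) boundaries; no batch depends on another's result.
    batch_ranges = [
        (start, min(start + chunk_size, len(tokens)))
        for start in range(0, len(tokens), chunk_size)
    ]

    async def _embed_batch(start: int, end: int) -> list[list[float]]:
        response = await client.create(input=tokens[start:end])
        return [row["embedding"] for row in response["data"]]

    # gather returns results in the order the awaitables were passed in.
    batch_results = await asyncio.gather(
        *(_embed_batch(s, e) for s, e in batch_ranges)
    )

    # Flatten the per-batch lists back into one list aligned with `tokens`.
    return [embedding for batch in batch_results for embedding in batch]


if __name__ == "__main__":
    demo_tokens = [[i, i + 1] for i in range(7)]
    print(asyncio.run(embed_all(FakeAsyncClient(), demo_tokens, chunk_size=3)))
```

Because the flattening step walks `batch_results` in order, the output stays index-aligned with the input, which is what callers that zip vectors with documents rely on.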

### Alternatives Considered


**1. Multithreading with `ThreadPoolExecutor` in sync `embed_documents`**
Rejected — threads are heavyweight (~8MB stack each), require GIL 
contention management, and add thread-safety risks for the HTTP client. 
The async path with `asyncio.gather` achieves the same concurrency at 
near-zero overhead (~240 bytes per coroutine).

**2. Concurrency at the vector store layer (`InMemoryVectorStore.aadd_documents`)**
Rejected — the vector store doesn't know the provider's optimal batch size, 
token limits, or API constraints. Splitting texts at this layer would create 
redundant nested batching (vector store batches → provider batches internally). 
Wrong abstraction layer.

**3. Adding a concurrent method to the `Embeddings` ABC**
Rejected — would force every embedding provider to implement batching/
concurrency logic, even providers where it doesn't apply (local models, 
single-request APIs). The ABC should stay minimal.

**4. `asyncio.as_completed` instead of `asyncio.gather`**
Rejected — returns results in completion order, not submission order. Would 
require extra bookkeeping to maintain the positional mapping that 
`InMemoryVectorStore.aadd_documents` relies on (it zips vectors with 
documents by index). `asyncio.gather` preserves order by contract.

**5. `gather_with_concurrency` with semaphore (bounded concurrency)**
Considered but deferred — `langchain-core` has this utility at 
`runnables/utils.py`, but unbounded `asyncio.gather` matches the existing 
MistralAI precedent and the OpenAI SDK already handles rate limiting via 
built-in retry/backoff. A `max_concurrency` parameter can be added later 
if needed without breaking changes.
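For reference, the bounded variant deferred in alternative 5 could look roughly like the sketch below. It is a standalone approximation using only `asyncio`, not the `langchain-core` utility itself. `gather_bounded` and `demo` are hypothetical names, and `max_concurrency` is not an existing parameter of `OpenAIEmbeddings`.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any


async def gather_bounded(max_concurrency: int, *coros: Coroutine[Any, Any, Any]) -> list[Any]:
    """Run coroutines with at most max_concurrency in flight; results keep input order."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run(coro: Coroutine[Any, Any, Any]) -> Any:
        async with semaphore:  # wait for a free slot before starting the call
            return await coro

    return list(await asyncio.gather(*(run(c) for c in coros)))


async def demo(n: int, max_concurrency: int) -> tuple[list[int], int]:
    """Run n fake batch calls and report (results, peak number in flight)."""
    active = 0
    peak = 0

    async def fake_batch(i: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # simulated network latency
        active -= 1
        return i

    results = await gather_bounded(max_concurrency, *(fake_batch(i) for i in range(n)))
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo(10, 3)))
```

The semaphore only limits how many calls run at once. Result order is still determined by `asyncio.gather`, so the positional mapping to the input is unaffected.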


### Additional Context


**Existing precedent in this repo:**
MistralAI embeddings already implement this exact pattern — concurrent 
batch API calls via `asyncio.gather`:
https://github.com/langchain-ai/langchain/blob/master/libs/partners/mistralai/langchain_mistralai/embeddings.py#L312-L314

```python
# MistralAI's existing implementation
batch_responses = await asyncio.gather(
    *[_aembed_batch(batch) for batch in self._get_batches(texts)]
)
```

extent analysis

TL;DR

Replace sequential await loops with asyncio.gather in OpenAIEmbeddings methods _aget_len_safe_embeddings and aembed_documents to enable concurrent batch API calls.

Guidance

  • Identify the methods _aget_len_safe_embeddings and aembed_documents in libs/partners/openai/langchain_openai/embeddings/base.py that need to be modified.
  • Replace the sequential while/await loops with asyncio.gather to fire all batch API calls concurrently, preserving the input order of results.
  • Pre-compute batch boundaries before using asyncio.gather to ensure no I/O dependency between batches.
  • Verify that the change improves performance for large document ingestion via aadd_documents on any vector store using OpenAI embeddings.

Example

```python
# Replace sequential await loops with asyncio.gather
batch_ranges = [...]  # pre-compute batch boundaries
batch_results = await asyncio.gather(
    *[_embed_batch(s, e) for s, e in batch_ranges]
)
```

Notes

The proposed solution only touches the libs/partners/openai package and does not introduce any breaking changes. The asyncio.gather approach matches the existing pattern in MistralAIEmbeddings.aembed_documents and preserves the input order of results.

Recommendation

Apply the fix by replacing the sequential await loops with asyncio.gather in the two methods named above. It is expected to give near-linear speedup for large document ingestion without introducing breaking changes.
