llamaIndex - 💡(How to fix) Fix [Bug]: ElasticsearchStore sync methods use run_until_complete — breaks in ASGI with pooled clients [1 participants]

Official PRs (…)
ON THIS PAGE

Recommended Tools

×6

Utilities matched from this issue’s tags and category — try them while you read without losing context.

GitHub issue graph ai analysis

Paste a GitHub issue URL. We fetch that issue, discover linked issues from bodies/comments/timeline, collect linked pull requests, and produce a structured English report.

The report is written in English Markdown for sharing and archival.

Helpful · Quick feedback

Loading…
GitHub stats
run-llama/llama_index#21324Fetched 2026-04-08 03:01:09
View on GitHub
Comments
0
Participants
1
Timeline
1
Reactions
0
Participants
Timeline (top)
closed ×1

Error Message

RuntimeError: Task <Task pending name='Task-3958' coro=<AsyncVectorStore.close()>> got Future <Task pending> attached to a different loop

Fix Action

Fix / Workaround

We ended up writing our own store with this pattern as a workaround. Happy to contribute a PR if there's interest.

Code Example

RuntimeError: Task <Task pending name='Task-3958' coro=<AsyncVectorStore.close()>>
got Future <Task pending> attached to a different loop

---

# llama_index/vector_stores/opensearch/base.py:80-81
os_client: Optional[OSClient] = None,          # sync
os_async_client: Optional[OSClient] = None,    # async

---

# line 844-890 (OpensearchVectorClient.query)
res = self._os_client.search(index=self._index, body=search_query, params=params)

---

# line 892-938 (OpensearchVectorClient.aquery)
res = await self._os_async_client.search(index=self._index, body=search_query, params=params)

---

File "llama_index/vector_stores/elasticsearch/base.py", line 286, in close
    return asyncio.get_event_loop().run_until_complete(self._store.close())
File "elasticsearch/helpers/vectorstore/_async/vectorstore.py", line 102, in close
    return await self.client.close()
File "aiohttp/client.py", line 1349, in close
    await self._connector.close()
RuntimeError: Task got Future attached to a different loop
RAW_BUFFERClick to expand / collapse

Bug Description

ElasticsearchStore only accepts AsyncElasticsearch and bridges all sync methods via asyncio.get_event_loop().run_until_complete(). This causes two problems in ASGI environments (Gunicorn + Uvicorn):

1. Can't pool the async client across requests. Pooling avoids per-request aiohttp.ClientSession creation (which causes memory issues — see https://github.com/run-llama/workflows-py/issues/485). But a pooled AsyncElasticsearch client is bound to one event loop. When a sync method calls run_until_complete() from a different thread, it fails:

RuntimeError: Task <Task pending name='Task-3958' coro=<AsyncVectorStore.close()>>
got Future <Task pending> attached to a different loop

2. close() fails in ASGI. ElasticsearchStore.close() calls run_until_complete(self._store.close()) (line 286). In ASGI, this is called from a sync thread (e.g. VectorStoreService.__exit__() via sync_to_async), hitting the same "different loop" error.

Both problems stem from the same design: there is no sync code path. Every sync method delegates to async via run_until_complete, requiring nest_asyncio.apply() (line 225) and tying the store to one event loop.

OpensearchVectorStore doesn't have this problem

OpensearchVectorClient in the same repo holds separate sync and async clients:

# llama_index/vector_stores/opensearch/base.py:80-81
os_client: Optional[OSClient] = None,          # sync
os_async_client: Optional[OSClient] = None,    # async

Sync methods use the sync client directly:

# line 844-890 (OpensearchVectorClient.query)
res = self._os_client.search(index=self._index, body=search_query, params=params)

Async methods use the async client:

# line 892-938 (OpensearchVectorClient.aquery)
res = await self._os_async_client.search(index=self._index, body=search_query, params=params)

No run_until_complete, no nest_asyncio, no event loop conflicts.

Suggestion

Apply the same sync/async separation to ElasticsearchStore:

  • Accept both Elasticsearch (sync) and AsyncElasticsearch (async) clients
  • query(), add(), delete(), close() → use sync client directly
  • aquery(), async_add(), adelete() → use async client directly
  • Remove nest_asyncio.apply() and run_until_complete bridges

We ended up writing our own store with this pattern as a workaround. Happy to contribute a PR if there's interest.

Version

llama-index-vector-stores-elasticsearch 0.5.1

Steps to Reproduce

  1. Deploy an ASGI application (Gunicorn + Uvicorn)
  2. Pool AsyncElasticsearch clients across requests (to avoid per-request aiohttp session creation)
  3. Call ElasticsearchStore.close() or any sync method from a sync thread

Relevant Logs/Tracebacks

File "llama_index/vector_stores/elasticsearch/base.py", line 286, in close
    return asyncio.get_event_loop().run_until_complete(self._store.close())
File "elasticsearch/helpers/vectorstore/_async/vectorstore.py", line 102, in close
    return await self.client.close()
File "aiohttp/client.py", line 1349, in close
    await self._connector.close()
RuntimeError: Task got Future attached to a different loop

extent analysis

TL;DR

Apply the same sync/async separation to ElasticsearchStore as in OpensearchVectorStore to resolve event loop conflicts.

Guidance

  • Accept both Elasticsearch (sync) and AsyncElasticsearch (async) clients in ElasticsearchStore to handle sync and async methods separately.
  • Use the sync client directly for sync methods like query(), add(), delete(), and close().
  • Use the async client directly for async methods like aquery(), async_add(), and adelete().
  • Remove nest_asyncio.apply() and run_until_complete bridges to avoid event loop conflicts.

Example

class ElasticsearchStore:
    def __init__(self, es_client: Optional[Elasticsearch] = None, es_async_client: Optional[AsyncElasticsearch] = None):
        self._es_client = es_client
        self._es_async_client = es_async_client

    def query(self, index, body, params):
        return self._es_client.search(index=index, body=body, params=params)

    async def aquery(self, index, body, params):
        return await self._es_async_client.search(index=index, body=body, params=params)

Notes

This solution assumes that the Elasticsearch and AsyncElasticsearch clients are properly configured and handle their respective sync and async operations correctly.

Recommendation

Apply the workaround by separating sync and async clients in ElasticsearchStore, as this approach has been successfully implemented in OpensearchVectorStore.

Vote matrix · Quick signals

Works
Did the solution work? Tap to confirm.
Easy Fix
Was it a quick fix?
Time Saver
Did it save you time?
Blocking
Was it severely blocking?
Common Issue
Are others likely hitting this too?
Flaky / Intermittent
Is it intermittent?
Verified / Reproducible
Can you reproduce it reliably?
Loading…

Still need to ship something?

×6

Another batch ranked right after the header list — different links, same matching logic.

Back to top recommendations

TRENDING

llamaIndex - 💡(How to fix) Fix [Bug]: ElasticsearchStore sync methods use run_until_complete — breaks in ASGI with pooled clients [1 participants]