LlamaIndex - ✅(Solved) Fix [Bug]: IngestionPipeline with `num_workers > 1` silently loses all cache writes [1 pull request, 2 comments, 3 participants]

GitHub stats
run-llama/llama_index#21300 · Fetched 2026-04-08 02:44:03
Comments: 2
Participants: 3
Timeline: 5
Reactions: 0
Timeline (top): commented ×2, labeled ×2, cross-referenced ×1

Root Cause

Root cause is that in the multi-worker path, self.cache is passed to worker processes and run_transformations() writes cache entries there, but only the transformed nodes are returned to the parent process. With the default IngestionCache(SimpleKVStore()), this means each worker mutates its own in-memory copy, and those writes are lost when the worker exits because no cache state is merged back into the parent.
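
The copy semantics behind this can be illustrated without LlamaIndex. The sketch below is a single-process simulation, assuming that arguments sent to a worker process are pickled and rebuilt on the other side; `worker`, `parent_cache`, and the key names are hypothetical stand-ins, not LlamaIndex code.

```python
import pickle


def worker(pickled_cache: bytes, doc_id: str) -> str:
    # A worker process receives a pickled copy of its arguments,
    # so it rebuilds its own private cache object here.
    cache = pickle.loads(pickled_cache)
    # The write lands in the worker's copy only.
    cache[doc_id] = {"nodes": [f"chunk of {doc_id}"]}
    # Only the transformed result travels back to the parent.
    return f"transformed:{doc_id}"


parent_cache = {}  # stand-in for an in-memory key-value cache
payload = pickle.dumps(parent_cache)
results = [worker(payload, doc_id) for doc_id in ("a", "b")]

# The parent still sees an empty cache even though both workers wrote to theirs.
parent_cache_size = len(parent_cache)
print(results, parent_cache_size)
```

The parent cache stays empty because nothing but the return value crosses the process boundary, which matches the `0` cache sizes reported for the multi-worker runs.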

Fix Action

Fixed

PR fix notes

PR #21301: Preserve cache writes from multiprocessing workers in IngestionPipeline

Description (problem / solution / changelog)

Description

After a multi-worker run, cache entries written by workers are merged back into the parent pipeline, so subsequent runs can reuse cached transformation results and pipeline.persist() includes entries created during multiprocessing runs.
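
A minimal sketch of that merge-back behaviour follows. It is not the code from PR #21301: the key scheme, `run_worker`, and `run_pipeline` are hypothetical, the "workers" run in a single process, and a plain dict stands in for the cache. It only shows why returning new cache entries alongside the results lets a second run skip the work.

```python
import hashlib


def cache_key(text: str, transform_name: str) -> str:
    # Hypothetical key scheme: hash of the transformation name plus the input text.
    return hashlib.sha256(f"{transform_name}:{text}".encode()).hexdigest()


def run_worker(batch, cache_snapshot):
    """Simulated worker: returns results, newly computed entries, and a compute count."""
    results, new_entries, computed = [], {}, 0
    for text in batch:
        key = cache_key(text, "upper")
        if key in cache_snapshot:
            results.append(cache_snapshot[key])
        else:
            value = text.upper()  # stand-in for an expensive transformation
            computed += 1
            new_entries[key] = value
            results.append(value)
    return results, new_entries, computed


def run_pipeline(docs, parent_cache, num_workers=2):
    batches = [docs[i::num_workers] for i in range(num_workers)]
    snapshot = dict(parent_cache)  # mimics the copy each worker process would get
    total_computed = 0
    for batch in batches:
        _, new_entries, computed = run_worker(batch, snapshot)
        parent_cache.update(new_entries)  # the merge step in the parent
        total_computed += computed
    return total_computed


parent_cache = {}
docs = [f"doc {i}" for i in range(8)]
first_run = run_pipeline(docs, parent_cache)   # everything is computed
second_run = run_pipeline(docs, parent_cache)  # everything is served from cache
print(first_run, second_run, len(parent_cache))
```

Dropping the `parent_cache.update(new_entries)` line reproduces the reported bug in this toy model: the second run recomputes every document.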

Fixes #21300

Type of Change

  • Bug fix (non-breaking change which fixes an issue)

Changed files

  • llama-index-core/llama_index/core/ingestion/pipeline.py (modified, +76/-7)
  • llama-index-core/tests/ingestion/test_pipeline.py (modified, +78/-0)

Bug Description

When num_workers > 1, cache entries written during a multi-worker pipeline.run() are not merged back into the parent IngestionPipeline cache. As a result, subsequent multi-worker runs may re-execute transformations that were expected to be cached, including expensive steps such as embedding, and pipeline.persist() does not include cache entries created only inside worker processes.

Root cause is that in the multi-worker path, self.cache is passed to worker processes and run_transformations() writes cache entries there, but only the transformed nodes are returned to the parent process. With the default IngestionCache(SimpleKVStore()), this means each worker mutates its own in-memory copy, and those writes are lost when the worker exits because no cache state is merged back into the parent.

Version

0.14.19

Steps to Reproduce


from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

def cache_size(pipeline):
    return len(pipeline.cache.cache.get_all(collection=pipeline.cache.collection))

if __name__ == "__main__":
    docs = [Document(text=f"Sample document {i}. " * 30) for i in range(8)]

    # sequential path
    pipeline_single = IngestionPipeline(transformations=[SentenceSplitter()])
    pipeline_single.run(documents=docs)
    print("sequential cache after first run:", cache_size(pipeline_single))
    pipeline_single.run(documents=docs)
    print("sequential cache after second run:", cache_size(pipeline_single))

    # multi-worker path
    pipeline_multi = IngestionPipeline(transformations=[SentenceSplitter()])
    pipeline_multi.run(documents=docs, num_workers=2)
    print("multi-worker cache after first run:", cache_size(pipeline_multi))
    pipeline_multi.run(documents=docs, num_workers=2)
    print("multi-worker cache after second run:", cache_size(pipeline_multi))

Relevant Logs/Tracebacks

sequential cache after first run: 1
sequential cache after second run: 1
WARNING:root:Removing unpickleable private attribute _chunking_tokenizer_fn
WARNING:root:Removing unpickleable private attribute _split_fns
WARNING:root:Removing unpickleable private attribute _sub_sentence_split_fns
WARNING:root:Removing unpickleable private attribute _chunking_tokenizer_fn
WARNING:root:Removing unpickleable private attribute _split_fns
WARNING:root:Removing unpickleable private attribute _sub_sentence_split_fns
multi-worker cache after first run: 0
WARNING:root:Removing unpickleable private attribute _chunking_tokenizer_fn
WARNING:root:Removing unpickleable private attribute _split_fns
WARNING:root:Removing unpickleable private attribute _sub_sentence_split_fns
WARNING:root:Removing unpickleable private attribute _chunking_tokenizer_fn
WARNING:root:Removing unpickleable private attribute _split_fns
WARNING:root:Removing unpickleable private attribute _sub_sentence_split_fns
multi-worker cache after second run: 0

extent analysis

TL;DR

  • Implementing a cache merge mechanism after multi-worker pipeline.run() calls is likely necessary to fix the issue.

Guidance

  • Identify the cache implementation used by IngestionPipeline and determine if it supports merging cache entries from multiple worker processes.
  • Modify the run_transformations() function so that each worker returns not only the transformed nodes but also the cache entries written during the transformation, allowing the parent process to merge these entries into its own cache.
  • Consider using a cache store that supports shared access and synchronization across multiple worker processes, such as a distributed cache or a database-based cache.
  • After implementing the cache merge mechanism, verify its correctness by checking the cache size after multiple runs of the pipeline.run() method with num_workers > 1.
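
The shared-store option in the guidance above can be sketched with a file-backed store. This is a minimal illustration using the standard-library `sqlite3` module; `SQLiteKV` is a hypothetical class, not a LlamaIndex store, and the two "workers" are simulated in one process by opening separate handles to the same file.

```python
import contextlib
import json
import os
import sqlite3
import tempfile


class SQLiteKV:
    """Hypothetical file-backed key-value store (not a LlamaIndex class)."""

    def __init__(self, path: str):
        self.path = path
        with self._connect() as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value TEXT)"
            )
            conn.commit()

    def _connect(self):
        # closing() makes sure the connection is closed when the block exits.
        return contextlib.closing(sqlite3.connect(self.path))

    def put(self, key: str, value: dict) -> None:
        with self._connect() as conn:
            conn.execute(
                "INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)",
                (key, json.dumps(value)),
            )
            conn.commit()

    def get_all(self) -> dict:
        with self._connect() as conn:
            rows = conn.execute("SELECT key, value FROM kv").fetchall()
        return {key: json.loads(value) for key, value in rows}


db_path = os.path.join(tempfile.mkdtemp(), "cache.db")
parent_store = SQLiteKV(db_path)

for worker_id in range(2):
    # Each worker would open its own handle to the same database file.
    worker_store = SQLiteKV(db_path)
    worker_store.put(f"key-{worker_id}", {"nodes": [f"chunk-{worker_id}"]})

# The parent sees every worker's write because the state lives in the file.
shared_entries = parent_store.get_all()
print(sorted(shared_entries))
```

Because the state lives outside any single process, no merge step is needed; the trade-off is I/O on every cache access and, under real concurrency, possible write contention on the database file.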

Example

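# Sketch of how a cache merge could be implemented.
# merge_worker_results is a hypothetical helper, not the code from PR #21301.
# Assumption: each worker returns a dict with "transformed_nodes" (a list of
# nodes) and "cache_entries" (a list of (key, value_dict) pairs it wrote).
def merge_worker_results(pipeline, worker_results):
    transformed_nodes = []
    cache_entries = []
    for worker_result in worker_results:
        transformed_nodes.extend(worker_result["transformed_nodes"])
        cache_entries.extend(worker_result["cache_entries"])
    # Merge the workers' cache entries into the parent pipeline's cache
    for key, value in cache_entries:
        pipeline.cache.cache.put(key, value, collection=pipeline.cache.collection)
    return transformed_nodes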

Notes

  • The provided example code and logs suggest that the issue is related to the cache implementation and the way it handles multi-worker scenarios.
  • Without more information about the IngestionCache and SimpleKVStore implementations, it's difficult to provide a more specific solution.

Recommendation

  • Apply workaround: Implement a custom cache merge mechanism so that cache entries written by worker processes are merged into the parent cache. This will likely require changes to the run_transformations() function and to how the multi-worker path of IngestionPipeline collects worker results.
