GH-39808: [C++][Parquet] Evict pre-buffered row-group bytes after decode #49855
justinli500 wants to merge 2 commits
…er decode
Dataset.to_batches() on parquet files accumulates memory as iteration proceeds because ReadRangeCache has no eviction API. PreBuffer() is called once with every row group up front, entries stay resident until the FileReader is destroyed, and users see roughly 10x more memory than the equivalent ParquetFile.iter_batches() path. This is one of the longest-standing open issues on the tracker.
Add a new ReadRangeCache::EvictEntriesInRange(start, length) method that removes cache entries fully contained in the given window. Entries that span past the window (for example, because range coalescing merged them with an adjacent row group's column chunk) are deliberately left in place, so eviction is safe in the presence of coalescing.
Expose the primitive through ParquetFileReader::EvictPreBufferedData and call it from the Arrow RowGroupGenerator's .Then callback once a row group has been decoded into Arrow arrays. At that point the raw column-chunk bytes held by the cache are no longer needed, and releasing them gives each row group a bounded per-row-group memory footprint.
Thread safety: promote the existing mutex from LazyImpl into base Impl so that Cache, Read, Wait, WaitFor, and EvictEntriesInRange all acquire it before touching the entries vector. Concurrent Read from one thread and Evict from another was previously undefined behaviour in the non-lazy cache, and the dataset scanner's batch_readahead path is exactly the concurrent call pattern that would trigger it. Read now drops the lock before blocking on the I/O future, so the new locking does not serialize readers more tightly than before.
Measured on a 458 MB / 10-row-group / 10M-row test file:
* Dataset.to_batches, before fix: 598 MB peak
* Dataset.to_batches, after fix: 331 MB peak (-267 MB)
* ParquetFile.iter_batches (no pre-buffer): 59 MB peak
Savings scale linearly with row-group count, so on the multi-GB files from the issue thread this single fix recovers several GB of peak allocation.
The remaining gap between Dataset.to_batches and iter_batches comes from a second source of accumulation in the Dataset infrastructure that is unrelated to the ReadRangeCache and should be tracked as a follow-up issue.
New tests:
* RangeReadCache.EvictEntriesInRange
* RangeReadCache.EvictEntriesInRangeSpanningEntry
* RangeReadCache.ConcurrentReadAndEvict
* TestArrowReadWrite.EvictPreBufferedData
* TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups
Full regression sweep: 824/824 parquet-arrow-reader-writer-test, 57/57 arrow-io-memory-test.
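The locking change described in the commit message (every operation takes the mutex before touching the entries vector, and Read releases it before blocking) can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyCache`, `ToyEntry`, and `Payload` are hypothetical names, the shared payload stands in for the I/O future, and none of this is Arrow's actual implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the I/O future held by a cache entry. It is
// shared so a reader can keep it alive after the entry has been evicted.
using Payload = std::shared_ptr<std::vector<uint8_t>>;

// Hypothetical entry type; not Arrow's real cache entry.
struct ToyEntry {
  int64_t offset;
  int64_t length;
  Payload payload;
};

class ToyCache {
 public:
  void Cache(int64_t offset, int64_t length) {
    assert(length >= 0);
    std::lock_guard<std::mutex> guard(mutex_);
    entries_.push_back(ToyEntry{
        offset, length,
        std::make_shared<std::vector<uint8_t>>(static_cast<std::size_t>(length))});
  }

  // Returns the payload covering [offset, offset + length), or nullptr.
  Payload Read(int64_t offset, int64_t length) {
    Payload found;
    {
      // Hold the lock only while scanning entries_.
      std::lock_guard<std::mutex> guard(mutex_);
      for (const ToyEntry& entry : entries_) {
        if (entry.offset <= offset &&
            offset + length <= entry.offset + entry.length) {
          found = entry.payload;  // copy the handle, not the bytes
          break;
        }
      }
    }
    // Lock released here; a real cache would block on the I/O future now.
    return found;
  }

  // Removes entries fully contained in [start, start + length).
  void EvictEntriesInRange(int64_t start, int64_t length) {
    std::lock_guard<std::mutex> guard(mutex_);
    entries_.erase(
        std::remove_if(entries_.begin(), entries_.end(),
                       [&](const ToyEntry& entry) {
                         return start <= entry.offset &&
                                entry.offset + entry.length <= start + length;
                       }),
        entries_.end());
  }

  std::size_t size() {
    std::lock_guard<std::mutex> guard(mutex_);
    return entries_.size();
  }

 private:
  std::mutex mutex_;
  std::vector<ToyEntry> entries_;
};
```

In this sketch a reader that obtained a payload before an eviction still holds valid bytes afterwards, because only the vector slot is removed under the lock.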
Reformat the pre-buffer eviction changes with clang-format 18.1.8 to satisfy the CI lint job. Whitespace and line-wrapping only; no behavior change.
Just pushed a |
@wgtmac @pitrou This is a huge issue for Ray users, and we're trying to decide whether we need to move away from PyArrow for our Parquet reading implementation. Will you be able to prioritize this PR? I know maintaining a project is a lot of work, so I'd understand if you can't take a look immediately.
wgtmac
left a comment
Thanks for the fix! I have two concerns with this patch.
First, eviction does not seem to account for coalesced cache entries that span row groups. See my inline comment.
Second, many of the comments feel too verbose and restate what the code is already doing. I’d suggest trimming them down to only the invariants or non-obvious reasoning.
    if (!cached_source_) {
      return;
    }
    for (int row : row_groups) {
Eviction is done one row group at a time, but cache entries are only removed if they are fully contained in that row group’s byte window. With default coalescing, a single cache entry can span adjacent row groups, so evicting row group 0 leaves the entry because it extends past the window, and evicting row group 1 also leaves it because the entry starts before the window. That entry is then never released, so the memory growth this PR is meant to fix can still occur for small or adjacent row groups.
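The scenario in this comment can be reproduced with a small stand-alone model. It is a sketch only: `ByteRange`, `FullyContained`, `EvictFullyContained`, and `EntriesLeftAfterPerRowGroupEvict` are hypothetical names, the byte offsets are made up, and the containment rule is assumed to match the "fully contained in the window" behaviour the PR describes.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Half-open byte range [offset, offset + length).
struct ByteRange {
  int64_t offset;
  int64_t length;
};

// True when `entry` lies entirely inside `window`.
bool FullyContained(const ByteRange& entry, const ByteRange& window) {
  return window.offset <= entry.offset &&
         entry.offset + entry.length <= window.offset + window.length;
}

// Drops every entry that is fully contained in `window`.
void EvictFullyContained(std::vector<ByteRange>* entries,
                         const ByteRange& window) {
  entries->erase(std::remove_if(entries->begin(), entries->end(),
                                [&](const ByteRange& entry) {
                                  return FullyContained(entry, window);
                                }),
                 entries->end());
}

// Evicts once per window (one window per row group) and reports how many
// cache entries are still resident afterwards.
std::size_t EntriesLeftAfterPerRowGroupEvict(
    std::vector<ByteRange> entries, const std::vector<ByteRange>& windows) {
  for (const ByteRange& window : windows) {
    EvictFullyContained(&entries, window);
  }
  return entries.size();
}
```

With row groups at [0, 100) and [100, 200) and a single coalesced entry at [50, 150), neither per-row-group window contains the entry, so it stays resident after both evictions. Only a window that covers both row groups, such as [0, 200), removes it.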
Rationale for this change
Dataset.to_batches() accumulates memory because ReadRangeCache has no eviction API. PreBuffer() is called with every row group upfront and entries stay resident until the FileReader is destroyed, so users see ~10x more peak memory than ParquetFile.iter_batches().
Issue #39808 has been open for over a year; downstream projects have worked around it by disabling pre_buffer (Ray) or dropping the dataset API (Marin), both of which give up features or throughput.
What changes are included in this PR?
* Add ReadRangeCache::EvictEntriesInRange(start, length). Removes entries fully contained in the window; leaves coalesced entries alone so eviction is safe under range coalescing.
* Expose ParquetFileReader::EvictPreBufferedData(row_groups, column_indices) and call it from RowGroupGenerator::ReadOneRowGroup after the row group has been decoded into Arrow arrays.
* Promote the LazyImpl mutex into base Impl so concurrent Read and Evict across row groups is defined behaviour on both cache variants.
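The effect of evicting after decode can be pictured with a deliberately simplified model of resident cache bytes. It is a sketch under loud assumptions: `PeakResidentBytes` is a hypothetical helper, a row group's raw bytes are assumed to become resident when it is first read, every cache entry is assumed to belong to exactly one row group, and decoded Arrow arrays and readahead are ignored.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Peak raw bytes resident in a toy cache while row groups are read in order.
// row_group_bytes[i] is the raw size of row group i (illustrative input).
int64_t PeakResidentBytes(const std::vector<int64_t>& row_group_bytes,
                          bool evict_after_decode) {
  int64_t resident = 0;
  int64_t peak = 0;
  for (int64_t bytes : row_group_bytes) {
    resident += bytes;  // reading the row group materialises its cached bytes
    peak = std::max(peak, resident);
    // Decoding into Arrow arrays would happen here; afterwards the raw
    // column-chunk bytes are no longer needed.
    if (evict_after_decode) {
      resident -= bytes;
    }
  }
  return peak;
}
```

Without eviction the peak in this model is the sum of all row-group sizes; with eviction it is the size of the largest single row group, which is the bounded per-row-group footprint the change aims for.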
Performance
Measured on a 458 MB / 10 row group / 10M row parquet file
(6 columns: 3 float64, 2 int64, 1 large_string; Snappy; macOS arm64;
Release build). Fix toggled via a one-line A/B test:
Peak allocated memory (458 MB / 10 row groups, lower is better):
| Path | Peak total_allocated_bytes (MB) |
| Dataset.to_batches, fix disabled | 598 |
| Dataset.to_batches, fix enabled | 331 |
| Dataset.to_batches, pre_buffer=False | 151 |
| ParquetFile.iter_batches | 59 |
Per-row-group progression during iteration (max(total_allocated_bytes) in MB, sampled every 10k of 100k batches):
| Batches consumed (thousands) | 10 | 20 | 30 | 40 | 50 | 60 | 70 | 80 | 90 | 100 |
| Without fix | 159 | 232 | 294 | 323 | 386 | 415 | 477 | 507 | 569 | 598 |
| With fix | 129 | 202 | 205 | 234 | 237 | 266 | 270 | 299 | 302 | 331 |
Savings scale linearly with row-group count, so on the multi-GB files from the issue thread this single fix recovers several GB of peak.
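As a quick check on the progression figures, the average growth per sample can be computed directly from the two series. This is plain arithmetic on the numbers reported above; `MeanStep`, `WithoutFix`, and `WithFix` are hypothetical helper names introduced only for this illustration.

```cpp
#include <cassert>
#include <vector>

// Max allocated MB at each 10k-batch sample, copied from the measurements.
std::vector<double> WithoutFix() {
  return {159, 232, 294, 323, 386, 415, 477, 507, 569, 598};
}
std::vector<double> WithFix() {
  return {129, 202, 205, 234, 237, 266, 270, 299, 302, 331};
}

// Average increase between consecutive samples.
double MeanStep(const std::vector<double>& samples) {
  assert(samples.size() >= 2);
  return (samples.back() - samples.front()) /
         static_cast<double>(samples.size() - 1);
}
```

On these series the average step is about 49 MB per sample without the fix and about 22 MB with it, and the final samples differ by 267 MB.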
Related work/commits
Downstream projects have shipped workarounds while this issue has
been open, all of them in their own code rather than upstream:
* ray-project/ray#62745 (merged 2026-04-20): injects ParquetFragmentScanOptions(pre_buffer=False, use_buffered_stream=True) in Ray Data's parquet reader. Gets peak alloc down to ~75 MB but gives up the pre_buffer=True coalesced-read optimization that makes S3 fast.
* marin-community/marin#4344 (merged): replaces dataset-API usage with ParquetFile.iter_batches, giving up hive-partition discovery, filter pushdown, and dataset-level schema unification.
No open PR against apache/arrow addresses the cache-side accumulation. This PR is the upstream fix that lets both workarounds be reverted without losing features or throughput.
Scope of this fix
This PR fixes the ReadRangeCache accumulation that dominates peak memory on the default pre_buffer=true path.
A second source of growth, visible as the 151 MB vs 59 MB gap in the pre_buffer=false row of the table above, lives in the dataset async generator pipeline and is unrelated to the cache. It should be tracked as a follow-up issue.
Partially closes #39808.
Test plan
New tests in arrow/io/memory_test.cc:
* RangeReadCache.EvictEntriesInRange - basic eviction semantics across lazy and eager caches. Covers no-op windows, partial overlaps, wide windows that drop multiple entries, and evict on an empty cache.
* RangeReadCache.EvictEntriesInRangeSpanningEntry - forces coalescing via hole_size_limit=100 and verifies a coalesced entry is refused for a partial-window evict and dropped for a wide window that fully contains it.
* RangeReadCache.ConcurrentReadAndEvict - 4 reader threads in a tight Read() loop against the upper half of the cache, 1 evictor thread running 50 cycles of EvictEntriesInRange + re-Cache against the lower half. Runs for both lazy=true and lazy=false. Under the pre-refactor code the lazy=false case would race the entries vector; both cases now pass cleanly.
New tests in parquet/arrow/arrow_reader_writer_test.cc:
* TestArrowReadWrite.EvictPreBufferedData - PreBuffers a 4-row-group file, calls EvictPreBufferedData({0}, ...), confirms row group 0's cache entries are gone while row groups 1-3 remain readable, and that evicting twice or evicting on a reader that never PreBuffered are both safe no-ops.
* TestArrowReadWrite.GetRecordBatchGeneratorReleasesPreBufferedRowGroups - runs the generator with pre_buffer=true and confirms correctness of every emitted batch.
Full-suite regression on Release build, macOS arm64:
* parquet-arrow-reader-writer-test: 824/826 passing, 0 failing (the 2 skips are pre-existing dictionary-write variants not built in this configuration).
* arrow-io-memory-test: 57/57 passing.
Are there any user-facing changes?
One new public method: parquet::ParquetFileReader::EvictPreBufferedData.
No behaviour change for existing callers beyond strictly lower peak memory on the default pre_buffer=true path. No API deprecations, no format changes.
This PR contains a "Critical Fix": No (memory usage improvement,
not correctness).