Balancing Throughput and Parquet File Size in DataFusion Pipelines

Summary

A production pipeline designed to ingest data from multiple remote APIs encountered a fundamental architectural conflict between throughput optimization and storage efficiency. The system aimed to process large responses immediately to maximize CPU/IO utilization while simultaneously buffering small responses to prevent the “small file problem” in Parquet storage.

The core issue arose from treating a stateful sink (the file writer) as a stateless operator (the DataFrame write operation). In DataFusion, calling write_table (or write_parquet) on individual DataFrames creates independent execution plans that share no state for file rotation logic, so the high-level write APIs alone cannot satisfy both goals.

Root Cause

The failure to achieve the desired behavior stems from the decoupling of the execution plan from the physical storage state.

  • Statelessness of DataFrames: Each time write_table is called, DataFusion constructs a new execution plan. This plan has no intrinsic knowledge of previously opened file handles or the current byte-size of existing Parquet files on disk.
  • Granularity Mismatch:
    • Immediate Writes: Triggered per API response, leading to fragmentation (too many small files) for small payloads.
    • Unioned Writes: Require waiting for all responses to complete, leading to high latency and poor resource utilization for large payloads.
  • Schema Divergence: Because the input schemas vary before transformation, a single unified TableProvider cannot be easily implemented without complex, runtime-determined schema mapping.

Why This Happens in Real Systems

This is a classic Resource Contention vs. Cost Optimization trade-off. In high-scale distributed systems, we face two competing forces:

  • Compute Efficiency: We want to “push” data through the pipeline as fast as possible to free up memory and CPU cycles (Stream Processing).
  • Storage Efficiency: We want to batch data to satisfy the requirements of columnar formats like Parquet, which rely on large row groups and large files to maintain high compression ratios and efficient metadata (Batch Processing).

When these two requirements collide, a naive implementation ends up favoring one at the expense of the other: write per response and fragment storage, or wait for everything and stall the pipeline.

Real-World Impact

  • Increased Cloud Costs: A massive influx of small files increases S3/GCS PUT request costs and metadata overhead.
  • Query Performance Degradation: Downstream analytical engines (like Trino, DuckDB, or Spark) suffer from high latency when scanning thousands of tiny Parquet files, because every file costs its own open request and its own footer metadata parse.
  • Resource Starvation: Attempting to buffer too much data in memory to avoid small files can lead to OOM (Out of Memory) kills during high-concurrency spikes.
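
To put rough numbers on the small-file cost, the sketch below compares one file per response with size-targeted batching. The workload figures (10,000 responses, 200 KiB each, a 128 MiB target file) are assumptions chosen for illustration, not measurements from the pipeline, and the calculation ignores any compression gain from larger row groups.

```rust
/// Number of files needed to hold `total_bytes` when each file is capped at
/// `target_file_bytes` (ceiling division). Returns 0 for an empty input.
fn files_needed(total_bytes: u64, target_file_bytes: u64) -> u64 {
    assert!(target_file_bytes > 0, "target file size must be non-zero");
    total_bytes.div_ceil(target_file_bytes)
}

/// Hypothetical workload: returns (files without batching, files with batching).
fn small_file_comparison() -> (u64, u64) {
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;
    let responses: u64 = 10_000; // assumed number of API responses
    let bytes_per_response = 200 * KIB; // assumed encoded size of each response
    let target = 128 * MIB; // assumed target Parquet file size

    let unbatched = responses; // one file, and at least one PUT, per response
    let batched = files_needed(responses * bytes_per_response, target);
    (unbatched, batched)
}
```

Under these assumptions the same data lands in 16 files instead of 10,000, which is the gap that downstream scanners and object-store request billing both feel.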

Example or Code

The following conceptual Rust snippet demonstrates why the standard approach fails to bridge the gap:

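use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

// This pattern fails to solve the problem because each call is isolated.
// The write_parquet arguments follow recent DataFusion releases; older
// releases took (path, Option<WriterProperties>) instead.
async fn process_api_responses(responses: Vec<DataFrame>) -> Result<()> {
    for df in responses {
        // PROBLEM: If df is small, this creates a tiny file.
        // PROBLEM: If df is large, we can't "merge" it into an existing open file.
        // Each call plans and runs its own write; nothing carries over to the next iteration.
        df.write_parquet("output.parquet", DataFrameWriteOptions::new(), None).await?;
    }
    Ok(())
}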

How Senior Engineers Fix It

A senior engineer moves away from high-level “all-in-one” functions and implements a Layered Buffering Architecture.

  1. Decouple Ingestion from Persistence: Instead of writing directly to Parquet, write the transformed records to an intermediate, high-throughput buffer (e.g., an in-memory RecordBatch queue or a specialized stream).
  2. Implement a Custom Sink/Writer: Create a dedicated Sink Manager that manages a pool of FileSink objects.
  3. Threshold-Based Rotation: The Sink Manager rotates the active file when either of two kinds of threshold is crossed:
    • Byte Size: Total bytes written to the current file.
    • Time/Count: Time elapsed since the last write or number of batches received.
  4. Hybrid Execution:
    • Large Payloads: Bypass the buffer and are written immediately to a new file of their own.
    • Small Payloads: Route through the Sink Manager to be aggregated into the current active file.
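
The steps above can be sketched as a small state machine. This is a minimal illustration using only the standard library, not DataFusion's API: SinkManager, Thresholds, Batch, and ClosedFile are hypothetical names, a batch is reduced to its byte size, and "closing a file" just records its name and size where a real sink would finish a Parquet writer. File age is measured from when the active file was opened, which is one possible reading of the time threshold.

```rust
use std::time::{Duration, Instant};

/// Stand-in for an encoded Arrow RecordBatch; only its size matters here.
struct Batch {
    bytes: usize,
}

/// Rotation thresholds (all values are assumptions to tune per workload).
struct Thresholds {
    max_file_bytes: usize,  // rotate once the active file reaches this size
    max_file_age: Duration, // rotate once the active file has been open this long
    bypass_bytes: usize,    // payloads at least this large get their own file
}

/// Record of a finished file. A real sink would close a Parquet writer here.
#[derive(Debug, PartialEq)]
struct ClosedFile {
    name: String,
    bytes: usize,
}

struct SinkManager {
    thresholds: Thresholds,
    active_bytes: usize,
    active_opened: Option<Instant>, // None while no file is open
    next_id: u64,
    closed: Vec<ClosedFile>,
}

impl SinkManager {
    fn new(thresholds: Thresholds) -> Self {
        Self { thresholds, active_bytes: 0, active_opened: None, next_id: 0, closed: Vec::new() }
    }

    /// Names are assigned when a file is closed, to keep the sketch short.
    fn next_name(&mut self) -> String {
        let name = format!("part-{:05}.parquet", self.next_id);
        self.next_id += 1;
        name
    }

    /// Route one batch: large payloads bypass the buffer, small ones aggregate.
    fn write(&mut self, batch: Batch, now: Instant) {
        if batch.bytes >= self.thresholds.bypass_bytes {
            let name = self.next_name();
            self.closed.push(ClosedFile { name, bytes: batch.bytes });
            return;
        }
        if self.active_opened.is_none() {
            self.active_opened = Some(now);
        }
        self.active_bytes += batch.bytes;
        self.rotate_if_due(now);
    }

    /// Close the active file when it is large enough or old enough.
    fn rotate_if_due(&mut self, now: Instant) {
        let Some(opened) = self.active_opened else { return };
        let too_big = self.active_bytes >= self.thresholds.max_file_bytes;
        let too_old = now.duration_since(opened) >= self.thresholds.max_file_age;
        if too_big || too_old {
            self.flush();
        }
    }

    /// Close whatever is buffered; also intended to be called on shutdown.
    fn flush(&mut self) {
        if self.active_opened.take().is_some() {
            let name = self.next_name();
            let bytes = std::mem::take(&mut self.active_bytes);
            self.closed.push(ClosedFile { name, bytes });
        }
    }
}
```

Passing `now` in as an argument, rather than reading the clock inside `write`, keeps the rotation logic deterministic and easy to test. Note that the age check only runs when a batch arrives; a production version would probably also need a timer so that an idle file is eventually closed.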

Why Juniors Miss It

  • API Over-Reliance: Juniors tend to look for a single function call (like write_table) that does “everything,” rather than decomposing the problem into Ingestion, Buffering, and Persistence.
  • Ignoring Physical Realities: They treat files as abstract logical entities rather than physical objects with IOPS, metadata, and compression constraints.
  • Missing the State Problem: They often fail to realize that a stateless function (the DataFrame API) cannot manage stateful resources (open file handles and byte counts) without an external orchestrator.
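
One way to picture the external orchestrator from the last bullet is a single writer that owns all rotation state, fed by a bounded queue. The sketch below uses standard-library threads and `sync_channel` purely for illustration; `run_pipeline` and its parameters are hypothetical, payloads are reduced to byte counts, and in an async DataFusion pipeline a bounded `tokio::sync::mpsc` channel would likely play the same role.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical driver: `producers` threads each send `per_producer` payloads of
/// `payload_bytes`; one writer thread owns the file-size state.
/// Returns the sizes of the files the writer closed.
fn run_pipeline(
    producers: usize,
    per_producer: usize,
    payload_bytes: usize,
    max_file_bytes: usize,
) -> Vec<usize> {
    // Bounded queue: `send` blocks once 8 payloads are pending, which caps
    // how much data can pile up in memory ahead of the writer.
    let (tx, rx) = sync_channel::<usize>(8);

    // The writer is the only place that knows the current file size.
    let writer = thread::spawn(move || {
        let mut files = Vec::new();
        let mut current = 0usize;
        for bytes in rx {
            // This loop ends when every sender has been dropped.
            current += bytes;
            if current >= max_file_bytes {
                files.push(std::mem::take(&mut current)); // rotate
            }
        }
        if current > 0 {
            files.push(current); // final flush on shutdown
        }
        files
    });

    let handles: Vec<_> = (0..producers)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..per_producer {
                    tx.send(payload_bytes).expect("writer hung up");
                }
            })
        })
        .collect();
    drop(tx); // drop the original sender so the writer loop can terminate

    for handle in handles {
        handle.join().expect("producer panicked");
    }
    writer.join().expect("writer panicked")
}
```

Because the byte counter lives in exactly one thread, no locking is needed around rotation decisions, and the bounded queue gives producers backpressure instead of letting buffers grow until the process is killed.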
