How do I make Spark reuse Python workers when we have done some costly init setup?

Apache Spark Worker Reuse Pitfall: Optimizing Costly UDF Initialization

Summary

A PySpark deployment using Pandas UDFs with expensive initialization (e.g., ML model loading) failed to reuse Python worker processes, causing the costly setup to run repeatedly. This occurred despite configuring spark.sql.execution.pyspark.udf.idleTimeoutSeconds, because the Python worker factory terminates idle workers after a separate hard-coded 60-second timeout, even when the UDF idle timeout is set higher.

Root Cause

Worker termination occurred due to two conflicting timeout mechanisms:

  • idleTimeoutSeconds setting: governs how long a worker may sit idle between UDF batches before UDF-level cleanup
  • Python worker factory timeout (hard-coded 60s): reclaims idle workers after 60 seconds, regardless of the UDF-level setting
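
The interaction between the two settings reduces to a minimum: the effective idle lifetime of a worker is whichever timeout fires first. A toy calculation (the constant and function name are illustrative, not Spark APIs):

```python
FACTORY_TIMEOUT_S = 60  # the hard-coded worker-factory limit described above

def effective_idle_lifetime(udf_idle_timeout_s: int) -> int:
    # The factory timeout wins whenever the UDF-level setting exceeds it,
    # so any idleTimeoutSeconds value above 60 has no practical effect.
    return min(udf_idle_timeout_s, FACTORY_TIMEOUT_S)

print(effective_idle_lifetime(300))  # → 60: the 300s setting is moot
```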

The factory timeout supersedes the UDF idle timeout. After 60 seconds:

# Illustrative pseudocode only: the actual logic lives in Spark's Scala
# PythonWorkerFactory, but the effect is equivalent to this sketch
def create_worker():
    ...
    launch_python_worker(timeout=60)  # hard-coded 60s factory timeout

This forces termination regardless of UDF activity, nullifying reuse attempts when idleTimeoutSeconds > 60.

Why This Happens in Real Systems

  1. Workload patterns: Gaps of more than 60 seconds between UDF batches are common in:
    • Low-frequency data pipelines
    • Variable batch processing times
    • Resource-contested clusters
  2. Costly initialization: ML/DL models, large broadcast variables, or connection pools amplify penalties when reinitialized
  3. Misconfiguration propensity: Engineers configure UDF idle timeout without awareness of factory timeout
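
A toy illustration of why the reinitialization penalty hurts: within one surviving worker process the load cost is paid once, but every worker kill throws the warm state away. Here functools.lru_cache stands in for a per-process model cache, and the 0.1s sleep is a stand-in for a multi-second model load:

```python
import time
from functools import lru_cache

@lru_cache(maxsize=1)  # caches within a single Python worker process only
def load_model():
    time.sleep(0.1)  # stand-in for a costly load (15s+ for a real ML model)
    return "model"

# First call in a process pays the full cost; repeat calls are near-free.
t0 = time.perf_counter(); load_model(); cold = time.perf_counter() - t0
t0 = time.perf_counter(); load_model(); warm = time.perf_counter() - t0
print(cold >= 0.1, warm < 0.01)  # → True True
```

When the worker is killed after 60 seconds, the next task lands in a fresh process with an empty cache, and the "cold" cost recurs on every batch gap longer than a minute.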

Real-World Impact

  • 200-300% longer job durations due to repeated model loads
  • CPU waste regenerating initialization states
  • Broadcast variable exhaustion (objects reloaded per worker)
  • Unpredictable latency spikes during worker recycling

Example

# Pitfall: worker killed after 60s despite a 300s idle timeout
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf

spark.conf.set("spark.sql.execution.pyspark.udf.idleTimeoutSeconds", "300")

model_bc = spark.sparkContext.broadcast(load_heavy_model())  # ~15s load time

@pandas_udf("long")  # iterator signature selects the scalar-iterator UDF type
def process(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = model_bc.value  # reloaded from scratch if the worker dies after 60s
    for series in iterator:
        yield pd.Series(model.predict(series))

How Senior Engineers Fix It

Augment timeout configuration to override the 60s factory limit:

# Override BOTH timeouts in cluster config
spark.conf.set("spark.sql.execution.pyspark.udf.idleTimeoutSeconds", "600")  # UDF timeout
spark.conf.set("spark.python.worker.reuse", "true")  # Critical for reuse
spark.conf.set("spark.python.worker.timeout", "0")   # Disables 60s factory timeout

  • Verify reuse: monitor PythonWorkerFactory logs for Created new worker events; once reuse takes effect, they should stop appearing after warm-up
  • Complementary optimizations:
    • Initialize outside the UDF via worker-level setup (__main__ in spark-submit files)
    • Validate serialization efficiency of broadcasted objects
  • Default caution: spark.python.worker.timeout=0 risks resource leaks; use job-based cleanup instead

Why Juniors Miss It

  1. Documentation gaps: Factory timeout separation not emphasized in common tuning guides
  2. Assumption pitfall: Belief that idleTimeoutSeconds alone controls worker lifetime
  3. Tooling limitations: No Spark UI metric directly showing worker kill reasons
  4. Stateless mindset: Underestimation of stateful worker requirements in distributed UDFs