Когда AQE в Spark не сработал?

Представьте: у нас тяжёлый запрос на обновление строк по ключам в Iceberg-таблице: сначала удаляем старые записи, потом вставляем новые. Таблица — 1,4 TiB, Spark 3.5.5, продакшн. Удаление выглядит так:

DELETE FROM table AS trg WHERE (hours(trg.create_dttm) BETWEEN 'start' AND 'finish' OR ISNULL(trg.create_dttm)) AND EXISTS ( SELECT 1 FROM src_diff AS src WHERE src.load_key = trg.load_key AND diff_change_oper = 'U' )

Мы выделили значительные ресурсы: 78 executor’ов, по 4 ядра и 20 GiB локального диска на ядро — итого 6240 GiB диска под shuffle. Потюнили spark.sql.shuffle.partitions = 11000 Ожидаемый объём shuffle write для нашего запроса был около 1500 GiB. Вроде всё хорошо?

Но запрос стабильно падал с ошибкой Missing an output location for shuffle на этапе Exchange по ключу партиционирования — day(created_dttm) перед финальной вставкой в таблицу. AQE (Adaptive Query Execution) анализирует статистику shuffle после Map-фазы и принимает решение: объединять ли мелкие партиции (coalesce) или дробить перекошенные (skew handling). Spark сам собирает статистику по партициям. Но делает это используя две структуры данных:

MapStatus — точная статистика по каждой партиции. Используется, если spark.sql.shuffle.partitions < spark.shuffle.minNumPartitionsToHighlyCompress

HighlyCompressedMapStatus — приближённая, компактная статистика. Применяется при большом числе партиций, чтобы снизить нагрузку на память driver’а и ускорить передачу данных. Активируется при spark.sql.shuffle.partitions >= spark.shuffle.minNumPartitionsToHighlyCompress

По умолчанию порог применения приближенной статистики — 2000 партиций. У нас — 11000, а значит, использовалась приближённая статистика.

Проблема в том, что HighlyCompressedMapStatus недостаточно точно оценивает размеры партиций. Это привело к тому, что AQE некорректно определил перекосы, не разбил перегруженные партиции — и в результате одна из них получила колоссальный объём данных, что вызвало сбой.

Мы изменили параметр: spark.shuffle.minNumPartitionsToHighlyCompress=200000 Теперь Spark использует точную статистику (MapStatus) даже при высоком числе партиций. Ошибка исчезла — AQE стал корректно распознавать и обрабатывать перекосы.

У нашего решения есть минус — точная статистика требует больше памяти на driver’е: объём данных — spark.sql.shuffle.partitions × количество executors. При росте этих параметров есть риск OOM на driver’е.

#deepdive #spark #iceberg #performance #tuning