💫 Spark для аналитика (ч. 1)
Раньше в 💙 я работал немного со Spark, большая часть задач спокойно решалась внутри одной БД или выгрузкой в pandas.
Сейчас для моих задач Spark - это необходимость, чтобы не падал JupyterHub по оперативной памяти: все вычисления выполняются распределённо на кластере с большим объёмом ресурсов. Но это не волшебная таблетка, т.к. важно следить за тем, как используются ресурсы, грамотно настраивать Spark-приложения и оптимизировать запросы. На самом деле подход к работе с ресурсами здесь другой, и есть ряд ограничений, о которых расскажу в следующих постах 🙃 🥳 Как я использую сейчас 1. Собираю данные из разных источников В реальных задачах часто нужно объединять сразу несколько источников: выгрузки из разных баз, parquet и тд. Пока всё влезает в pandas - норм, но когда данных слишком много, pandas начинает падать. Spark позволяет легко подтянуть все необходимые источники и собрать их в одну большую таблицу, не заботясь об ограничениях памяти.
2. Выполняю тяжёлые вычисления и агрегации После того как все данные собраны, начинаются подсчеты метрик по большим объёмам данных. Здесь Spark выигрывает за счёт распределённых вычислений: вся тяжёлая работа идёт на кластере, а не на ноутбуке. Как только нужные агрегаты посчитаны, можно забрать результат и уже дальше анализировать, строить графики и т.д.
😍 Spark реально может работать дольше, чем pandas, если данных немного. Всё из-за архитектуры: Spark каждый раз поднимает распределённую систему, разбивает задачи на части, отправляет их на кластер и только потом собирает результат. Pandas же считает всё в оперативке и на небольших данных это быстрее почти всегда.
🔽Ниже прикрепляю основные функции для работы в Spark, которые я использую для решения задач аналитики `from pyspark.sql import SparkSession from pyspark.sql.functions import avg, count
#запускаем Spark-сессию, тут еще можно закопаться в настройки приложения (если будет много 🐳, выложу)spark = SparkSession.builder.appName("zasql_python").getOrCreate() # название приложения может быть произвольным
#читаем csv и кучу источниковdf_csv = spark.read.csv("file.csv", header=True, inferSchema=True) df_parquet = spark.read.parquet("file.parquet") df_json = spark.read.json("file.json")
#джойним таблицы между собойdf_joined = df_csv.join(df_parquet, on="user_id", how="inner")
#фильтруем данныеdf_filtered = df_joined.filter(df_joined["is_active"] == 1)
#применяем агрегирующие функции, считаем сумму строчек, среднее значение по заказамdf_grouped = ( df_filtered .groupBy("country") .agg( count("*").alias("users_count"), avg("order_sum").alias("avg_order_sum") ) )
df_pandas = df_grouped.toPandas()
🐼 Очень похоже на pandas + можно в любой момент пересесть на spark.sql и писать в sql-формате любой запрос. `from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("zasql_python_sql").getOrCreate() # произвольное название приложения, должно быть другим, если запускаем параллельно
df_orders = spark.read.parquet("orders.parquet") # читаем в Spark DataFrame первый источник источник df_users = spark.read.csv("users.csv", header=True, inferSchema=True) # читаем в Spark DataFrame второй источник
df_orders.createOrReplaceTempView("orders") # создаем темповые таблицы заказов df_users.createOrReplaceTempView("users") # создаем темповые таблицы юзеров
#теперь читаем тут в sql-форматеquery = """ SELECT u.country, COUNT(DISTINCT o.user_id) AS active_users, AVG(o.order_sum) AS avg_order_sum FROM orders o JOIN users u ON o.user_id = u.user_id WHERE o.is_active = 1 GROUP BY u.country ORDER BY avg_order_sum DESC """
result = spark.sql(query) # читаем в spark.sql, результат тот же получаем, но в SQL-формате result.show() # показать значения, но можно перевести и в pandas, но ресурсов много сожрет
**Spark спасает, когда надо соединить и обработать десятки миллионов строк из разных источников, и обычный pandas падает по памяти, ядро умирает.
Ставьте** 🐳****, если хотите продолжение истории про Spark 💫``
· 30.07.2025
История интересная, но смайла с китом тут нет
ответить
коммент удалён