Интеграция Apache Spark MLlib для обработки больших данных

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Интеграция Apache Spark MLlib для обработки больших данных
Средний
~1-2 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Интеграция Apache Spark MLlib для больших данных

Spark MLlib используется когда данные не помещаются в RAM одной машины, а значит pandas и sklearn не работают. Типичный порог: датасеты > 100GB или > 100M строк. MLlib предоставляет те же алгоритмы (логистическая регрессия, градиентный бустинг, k-means), но распределённо на кластере.

Настройка Spark ML пайплайна

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (VectorAssembler, StringIndexer,
                                  StandardScaler, Imputer)
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder \
    .appName("ML Pipeline") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "10") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.ml.param.maxParallelism", "4") \
    .getOrCreate()

# Загрузка данных
df = spark.read.parquet("s3://data/training/*.parquet")
df = df.repartition(200)  # Оптимальное число партиций

# Feature engineering
numeric_cols = ['amount', 'age', 'days_since_last_tx', 'tx_count_30d']
categorical_cols = ['category', 'country', 'device_type']

# Imputer для числовых
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=[f"{c}_imputed" for c in numeric_cols],
    strategy="median"
)

# Кодирование категориальных
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_idx",
                  handleInvalid="keep")
    for col in categorical_cols
]

# Сборка вектора признаков
all_feature_cols = (
    [f"{c}_imputed" for c in numeric_cols] +
    [f"{c}_idx" for c in categorical_cols]
)

assembler = VectorAssembler(
    inputCols=all_feature_cols,
    outputCol="features_raw",
    handleInvalid="keep"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True
)

# Модель
gbt = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    maxIter=100,
    maxDepth=5,
    stepSize=0.05,
    subsamplingRate=0.8,
    seed=42
)

# Pipeline
pipeline = Pipeline(stages=[
    imputer,
    *indexers,
    assembler,
    scaler,
    gbt
])

# Train/test split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Обучение
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

# Оценка
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.4f}")

Гиперпараметрическая оптимизация

# Cross-validation на кластере
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [4, 6, 8]) \
    .addGrid(gbt.maxIter, [50, 100]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4,  # Параллельный запуск фолдов
    seed=42
)

cv_model = cv.fit(train_df)
best_model = cv_model.bestModel
print(f"Best params: {cv_model.bestModel.stages[-1].extractParamMap()}")

Feature Importance и интерпретация

# Извлечение feature importance
gbt_model = best_model.stages[-1]
importance = gbt_model.featureImportances

# Маппинг на имена признаков
feature_names = all_feature_cols
importance_df = spark.createDataFrame(
    [(name, float(imp)) for name, imp in zip(feature_names, importance.toArray())],
    ["feature", "importance"]
).orderBy("importance", ascending=False)

importance_df.show(20)

# SHAP через pandas на выборке
sample_pandas = predictions.sample(fraction=0.01).toPandas()
# ... далее стандартный TreeExplainer

Сохранение и деплой модели

import mlflow
import mlflow.spark

# Логирование в MLflow
with mlflow.start_run():
    mlflow.log_param("max_depth", gbt.getMaxDepth())
    mlflow.log_param("max_iter", gbt.getMaxIter())
    mlflow.log_metric("auc", auc)

    # Сохранение Spark модели
    mlflow.spark.log_model(best_model, "spark_model")

    # Экспорт в ONNX для быстрого инференса
    from onnxmltools import convert_sparkml
    onnx_model = convert_sparkml(best_model, "GBT Model", test_df.limit(5))
    mlflow.onnx.log_model(onnx_model, "onnx_model")

# Загрузка для предсказаний
loaded_model = mlflow.spark.load_model("runs:/RUN_ID/spark_model")
batch_predictions = loaded_model.transform(new_data_df)

Оптимизация производительности

Параметр Default Рекомендованное Эффект
spark.sql.shuffle.partitions 200 2x cores Избежать skew
executor.memory 1g 4-8g Кэш датасета
spark.ml.param.maxParallelism 1 4-8 CV параллелизм
repartition перед fit нет 200-400 Равномерная нагрузка
caching train_df нет да 3-5x ускорение CV

Обучение GBT на 500M строк в типичной конфигурации (10 executor × 4 CPU × 8GB): 15-40 минут в зависимости от числа признаков и итераций. Cross-validation с 6 конфигурациями × 3 фолда: 4-6 часов без параллелизма, 1-2 часа с parallelism=4.