Интеграция Snowflake ML для аналитики

Проектируем и внедряем системы искусственного интеллекта: от прототипа до production-ready решения. Наша команда объединяет экспертизу в машинном обучении, дата-инжиниринге и MLOps, чтобы AI работал не в лаборатории, а в реальном бизнесе.
Показано 1 из 1Все 1566 услуг
Интеграция Snowflake ML для аналитики
Средний
~1-2 недели
Часто задаваемые вопросы

Направления AI-разработки

Этапы разработки AI-решения

Последние работы

  • image_website-b2b-advance_0.webp
    Разработка сайта компании B2B ADVANCE
    1284
  • image_web-applications_feedme_466_0.webp
    Разработка веб-приложения для компании FEEDME
    1196
  • image_websites_belfingroup_462_0.webp
    Разработка веб-сайта для компании БЕЛФИНГРУПП
    901
  • image_ecommerce_furnoro_435_0.webp
    Разработка интернет магазина для компании FURNORO
    1119
  • image_logo-advance_0.webp
    Разработка логотипа компании B2B Advance
    586
  • image_crm_enviok_479_0.webp
    Разработка веб-приложения для компании Enviok
    853

Интеграция Snowflake ML для аналитики и машинного обучения

Snowflake ML — это ML непосредственно внутри data warehouse: обучение моделей на Snowpark Python без экспорта данных, Feature Store в Snowflake, и инференс через user-defined functions. Преимущество: данные не покидают DWH, что решает compliance-требования и устраняет data movement bottleneck.

Snowpark ML Pipeline

from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import StandardScaler, OrdinalEncoder
from snowflake.ml.modeling.ensemble import GradientBoostingClassifier
from snowflake.ml.modeling.model_selection import cross_validate
from snowflake.snowpark import Session

# Создание Snowpark сессии
session = Session.builder.configs({
    "account": "your-account",
    "user": "ml_user",
    "password": "...",
    "role": "ML_ROLE",
    "warehouse": "ML_WH",
    "database": "ML_DB",
    "schema": "FEATURES"
}).create()

# Загрузка данных как Snowpark DataFrame (данные остаются в Snowflake)
df = session.table("TRAINING_DATA")

# Feature engineering в Snowpark SQL
from snowflake.snowpark import functions as F

features_df = df.select(
    "USER_ID",
    "LABEL",
    F.col("AMOUNT").cast("float").alias("AMOUNT"),
    F.datediff("day", F.col("LAST_TX_DATE"), F.current_date()).alias("DAYS_SINCE_TX"),
    (F.col("TX_COUNT_30D") / F.col("TX_COUNT_90D")).alias("TX_ACCELERATION"),
    F.col("MERCHANT_CATEGORY"),
    F.col("COUNTRY")
)

# Train/test split
train_df, test_df = features_df.random_split([0.8, 0.2], seed=42)

# Snowpark ML Pipeline
pipeline = Pipeline(steps=[
    ("encoder", OrdinalEncoder(
        input_cols=["MERCHANT_CATEGORY", "COUNTRY"],
        output_cols=["MERCHANT_CATEGORY_ENC", "COUNTRY_ENC"]
    )),
    ("scaler", StandardScaler(
        input_cols=["AMOUNT", "DAYS_SINCE_TX", "TX_ACCELERATION"],
        output_cols=["AMOUNT_SCALED", "DAYS_SCALED", "TX_ACCEL_SCALED"]
    )),
    ("model", GradientBoostingClassifier(
        input_cols=["AMOUNT_SCALED", "DAYS_SCALED", "TX_ACCEL_SCALED",
                    "MERCHANT_CATEGORY_ENC", "COUNTRY_ENC"],
        label_col="LABEL",
        output_cols=["PREDICTED_LABEL"],
        n_estimators=200,
        learning_rate=0.05,
        max_depth=5
    ))
])

# Обучение — выполняется в Snowflake, данные не покидают DWH
fitted_pipeline = pipeline.fit(train_df)

# Предсказания
predictions = fitted_pipeline.transform(test_df)

Snowflake Feature Store

from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity
import snowflake.ml.feature_store as fstore

fs = FeatureStore(
    session=session,
    database="ML_DB",
    name="PRODUCTION_FS",
    default_warehouse="ML_WH"
)

# Определение сущностей
user_entity = Entity(name="USER", join_keys=["USER_ID"])
merchant_entity = Entity(name="MERCHANT", join_keys=["MERCHANT_ID"])

fs.register_entity(user_entity)
fs.register_entity(merchant_entity)

# Feature View — SQL запрос + обновление
user_feature_view = FeatureView(
    name="USER_TX_FEATURES",
    entities=[user_entity],
    feature_df=session.sql("""
        SELECT
            USER_ID,
            COUNT(*) OVER (PARTITION BY USER_ID ORDER BY TX_DATE
                RANGE BETWEEN 30 PRECEDING AND CURRENT ROW) as TX_COUNT_30D,
            SUM(AMOUNT) OVER (PARTITION BY USER_ID ORDER BY TX_DATE
                RANGE BETWEEN 30 PRECEDING AND CURRENT ROW) as TX_AMOUNT_30D,
            AVG(AMOUNT) OVER (PARTITION BY USER_ID ORDER BY TX_DATE
                RANGE BETWEEN 7 PRECEDING AND CURRENT ROW) as TX_AVG_7D
        FROM TRANSACTIONS
    """),
    refresh_freq="1 day",  # Автообновление через Snowflake task
    desc="User transaction features, rolling windows"
)

registered_fv = fs.register_feature_view(user_feature_view, version="v1")

# Генерация обучающего датасета
dataset = fs.generate_dataset(
    spine_df=session.table("TRAINING_LABELS"),
    features=[registered_fv],
    spine_timestamp_col="TX_DATE",
    name="fraud_training_v1",
    desc="Fraud detection training set"
)

Регистрация и деплой модели

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name="ML_DB", schema_name="MODELS")

# Логирование модели
model_ref = registry.log_model(
    fitted_pipeline,
    model_name="FRAUD_DETECTION",
    version_name="v1_0",
    comment="GBT fraud detection model, trained on 6M transactions",
    metrics={"test_auc": 0.934, "test_f1": 0.812},
    tags={"team": "risk", "env": "production"}
)

# Деплой как SQL функция
model_ref.deploy(
    deployment_name="fraud_scoring",
    platform="WAREHOUSE",  # или "SPCS" для Snowpark Container Services
    target_method="predict",
    options={"compute_pool": "ML_COMPUTE_POOL"}
)

# Использование в SQL
session.sql("""
    SELECT
        t.TRANSACTION_ID,
        t.AMOUNT,
        FRAUD_DETECTION!PREDICT(t.AMOUNT_SCALED, ...) as FRAUD_SCORE
    FROM TRANSACTIONS t
    WHERE TX_DATE = CURRENT_DATE()
""").show()

Snowflake Cortex ML (встроенные LLM функции)

# Встроенные AI-функции через SQL — без кода Python
sql_examples = """
-- Sentiment analysis
SELECT SNOWFLAKE.CORTEX.SENTIMENT(REVIEW_TEXT) as sentiment_score
FROM CUSTOMER_REVIEWS;

-- Classification без обучения
SELECT SNOWFLAKE.CORTEX.CLASSIFY_TEXT(
    SUPPORT_TICKET,
    ['billing', 'technical', 'account', 'general']
) as ticket_category
FROM SUPPORT_TICKETS;

-- Summarization
SELECT SNOWFLAKE.CORTEX.SUMMARIZE(LONG_DOCUMENT) as summary
FROM DOCUMENTS
WHERE LENGTH(LONG_DOCUMENT) > 1000;

-- Extract structured info
SELECT SNOWFLAKE.CORTEX.EXTRACT_ANSWER(
    QUESTION => 'What is the order amount?',
    CONTEXT => ORDER_EMAIL_TEXT
) as extracted_amount
FROM ORDER_EMAILS;
"""

Snowflake ML оправдан когда данные уже в Snowflake и их объём > 500GB (data movement был бы дорог). Для команд с SQL-культурой: Snowpark ML + Cortex снижает порог входа для аналитиков без Python-опыта. Snowflake compute дороже dedicated ML cluster на 30-50%, но экономится инфраструктурный overhead.