Интеграция Snowflake ML для аналитики и машинного обучения
Snowflake ML — это машинное обучение непосредственно внутри data warehouse: обучение моделей на Snowpark Python без экспорта данных, Feature Store в Snowflake и инференс через user-defined functions. Преимущество: данные не покидают DWH, что помогает выполнять compliance-требования и устраняет data movement bottleneck.
Snowpark ML Pipeline
import os

from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import StandardScaler, OrdinalEncoder
from snowflake.ml.modeling.ensemble import GradientBoostingClassifier
# NOTE(review): cross_validate is not used anywhere in this snippet — confirm
# that snowflake.ml.modeling.model_selection actually exports it.
from snowflake.ml.modeling.model_selection import cross_validate
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F

# Create the Snowpark session.
# The password is read from the SNOWFLAKE_PASSWORD environment variable so the
# secret never lives in source control (raises KeyError if it is not set).
session = Session.builder.configs({
    "account": "your-account",
    "user": "ml_user",
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": "ML_ROLE",
    "warehouse": "ML_WH",
    "database": "ML_DB",
    "schema": "FEATURES",
}).create()

# Load the data as a Snowpark DataFrame (the data stays in Snowflake).
df = session.table("TRAINING_DATA")

# Feature engineering, pushed down to Snowflake as SQL.
features_df = df.select(
    "USER_ID",
    "LABEL",
    F.col("AMOUNT").cast("float").alias("AMOUNT"),
    F.datediff("day", F.col("LAST_TX_DATE"), F.current_date()).alias("DAYS_SINCE_TX"),
    # DIV0 yields 0 instead of a division-by-zero error when the 90-day
    # transaction count is 0.
    F.call_function(
        "DIV0", F.col("TX_COUNT_30D"), F.col("TX_COUNT_90D")
    ).alias("TX_ACCELERATION"),
    F.col("MERCHANT_CATEGORY"),
    F.col("COUNTRY"),
)

# Train/test split (fixed seed for reproducibility).
train_df, test_df = features_df.random_split([0.8, 0.2], seed=42)

# Snowpark ML pipeline: encode categoricals, scale numerics, then classify.
pipeline = Pipeline(steps=[
    ("encoder", OrdinalEncoder(
        input_cols=["MERCHANT_CATEGORY", "COUNTRY"],
        output_cols=["MERCHANT_CATEGORY_ENC", "COUNTRY_ENC"],
    )),
    ("scaler", StandardScaler(
        input_cols=["AMOUNT", "DAYS_SINCE_TX", "TX_ACCELERATION"],
        output_cols=["AMOUNT_SCALED", "DAYS_SCALED", "TX_ACCEL_SCALED"],
    )),
    ("model", GradientBoostingClassifier(
        input_cols=["AMOUNT_SCALED", "DAYS_SCALED", "TX_ACCEL_SCALED",
                    "MERCHANT_CATEGORY_ENC", "COUNTRY_ENC"],
        label_col="LABEL",
        output_cols=["PREDICTED_LABEL"],
        n_estimators=200,
        learning_rate=0.05,
        max_depth=5,
    )),
])

# Training runs inside Snowflake; the data does not leave the DWH.
fitted_pipeline = pipeline.fit(train_df)

# Predictions. The final step is a classifier, so the pipeline is scored with
# predict(); transform() is only available when the last step is a transformer.
predictions = fitted_pipeline.predict(test_df)
Snowflake Feature Store
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity
import snowflake.ml.feature_store as fstore

# Open the feature store that lives in ML_DB.
# NOTE(review): on first use the feature store may need an explicit
# creation_mode to be created — confirm against the installed library version.
fs = FeatureStore(
    session=session,
    database="ML_DB",
    name="PRODUCTION_FS",
    default_warehouse="ML_WH",
)

# Entity definitions (join keys used to attach features to a spine).
user_entity = Entity(name="USER", join_keys=["USER_ID"])
merchant_entity = Entity(name="MERCHANT", join_keys=["MERCHANT_ID"])
fs.register_entity(user_entity)
fs.register_entity(merchant_entity)

# Feature view: a SQL query plus a refresh schedule.
# The query produces one row per transaction, so TX_DATE is selected and
# declared as timestamp_col; generate_dataset() below relies on it for the
# point-in-time join against spine_timestamp_col.
# NOTE(review): the numeric RANGE offsets assume TX_DATE is a DATE column —
# confirm; a TIMESTAMP ordering key would need INTERVAL offsets.
user_feature_view = FeatureView(
    name="USER_TX_FEATURES",
    entities=[user_entity],
    feature_df=session.sql("""
SELECT
USER_ID,
TX_DATE,
COUNT(*) OVER (PARTITION BY USER_ID ORDER BY TX_DATE
RANGE BETWEEN 30 PRECEDING AND CURRENT ROW) as TX_COUNT_30D,
SUM(AMOUNT) OVER (PARTITION BY USER_ID ORDER BY TX_DATE
RANGE BETWEEN 30 PRECEDING AND CURRENT ROW) as TX_AMOUNT_30D,
AVG(AMOUNT) OVER (PARTITION BY USER_ID ORDER BY TX_DATE
RANGE BETWEEN 7 PRECEDING AND CURRENT ROW) as TX_AVG_7D
FROM TRANSACTIONS
"""),
    timestamp_col="TX_DATE",
    refresh_freq="1 day",  # refreshed automatically by Snowflake once a day
    desc="User transaction features, rolling windows",
)
registered_fv = fs.register_feature_view(user_feature_view, version="v1")

# Generate the training dataset by joining features onto the label spine.
dataset = fs.generate_dataset(
    spine_df=session.table("TRAINING_LABELS"),
    features=[registered_fv],
    spine_timestamp_col="TX_DATE",
    name="fraud_training_v1",
    desc="Fraud detection training set",
)
Регистрация и деплой модели
from snowflake.ml.registry import Registry

# Open the model registry stored in ML_DB.MODELS.
registry = Registry(session=session, database_name="ML_DB", schema_name="MODELS")

# Log the fitted pipeline as a new model version.
# Registry.log_model() accepts no `tags` argument; tags are attached to the
# model object after logging (see below).
model_ref = registry.log_model(
    fitted_pipeline,
    model_name="FRAUD_DETECTION",
    version_name="v1_0",
    comment="GBT fraud detection model, trained on 6M transactions",
    metrics={"test_auc": 0.934, "test_f1": 0.812},
)

# Tag the model. The tag objects must already exist in Snowflake (CREATE TAG);
# unqualified tag names are resolved in the model's database and schema.
fraud_model = registry.get_model("FRAUD_DETECTION")
fraud_model.set_tag("team", "risk")
fraud_model.set_tag("env", "production")

# No separate deploy step exists for warehouse inference: a logged model
# version is immediately callable, from Python via run() and from SQL via
# MODEL!METHOD(...). Serving on Snowpark Container Services (a compute pool
# such as ML_COMPUTE_POOL) goes through ModelVersion.create_service() instead.
scored_df = model_ref.run(test_df, function_name="predict")

# Usage from SQL (argument list abbreviated for illustration).
session.sql("""
SELECT
t.TRANSACTION_ID,
t.AMOUNT,
FRAUD_DETECTION!PREDICT(t.AMOUNT_SCALED, ...) as FRAUD_SCORE
FROM TRANSACTIONS t
WHERE TX_DATE = CURRENT_DATE()
""").show()
Snowflake Cortex ML (встроенные LLM функции)
# Built-in Cortex AI functions, callable straight from SQL with no Python code.
# EXTRACT_ANSWER takes positional arguments: the source document first, then
# the question.
sql_examples = """
-- Sentiment analysis
SELECT SNOWFLAKE.CORTEX.SENTIMENT(REVIEW_TEXT) as sentiment_score
FROM CUSTOMER_REVIEWS;
-- Classification без обучения
SELECT SNOWFLAKE.CORTEX.CLASSIFY_TEXT(
SUPPORT_TICKET,
['billing', 'technical', 'account', 'general']
) as ticket_category
FROM SUPPORT_TICKETS;
-- Summarization
SELECT SNOWFLAKE.CORTEX.SUMMARIZE(LONG_DOCUMENT) as summary
FROM DOCUMENTS
WHERE LENGTH(LONG_DOCUMENT) > 1000;
-- Extract structured info
SELECT SNOWFLAKE.CORTEX.EXTRACT_ANSWER(
ORDER_EMAIL_TEXT,
'What is the order amount?'
) as extracted_amount
FROM ORDER_EMAILS;
"""
Snowflake ML оправдан, когда данные уже находятся в Snowflake и их объём превышает 500 ГБ (перемещение данных было бы дорогим). Для команд с SQL-культурой Snowpark ML + Cortex снижает порог входа для аналитиков без Python-опыта. Snowflake compute дороже dedicated ML cluster на 30–50%, но экономится инфраструктурный overhead.







