Реализация AI-контроля качества данных
Контроль качества данных — это не просто "проверить NULL". Зрелая система покрывает 7 измерений: полнота, уникальность, актуальность, достоверность, согласованность, точность и валидность. AI-подход добавляет автоматическую генерацию правил из исторических данных и умную классификацию важности проблем.
Многоуровневая система проверок
import json
import logging
from dataclasses import dataclass
from enum import Enum

import great_expectations as gx
import numpy as np
import pandas as pd
from anthropic import Anthropic
class Severity(Enum):
    """Importance level attached to a quality check.

    The member values are the lowercase strings used in the JSON check
    specifications ("critical", "warning", "info").
    """

    # Intended to block the pipeline.
    CRITICAL = "critical"
    # Intended to raise an alert while the pipeline keeps running.
    WARNING = "warning"
    # Intended to be logged only.
    INFO = "info"
@dataclass
class QualityCheck:
    """Declarative description of one data quality rule.

    Attributes:
        name: Identifier of the check, used in result reports.
        column: Name of the DataFrame column the check applies to.
        check_type: Kind of rule. ``AIQualityController._execute_check``
            recognises ``not_null``, ``unique``, ``value_range``, ``regex``,
            ``accepted_values`` and ``freshness``.
        params: Rule parameters; the keys read depend on ``check_type``
            (``min``/``max``, ``pattern``, ``values``, ``max_age_hours``).
        severity: How a failure of this check is classified.
        description: Human-readable explanation of the rule.

    Raises:
        ValueError: If ``severity`` is a string that does not name a
            ``Severity`` value.
    """

    name: str
    column: str
    check_type: str
    params: dict
    severity: Severity
    description: str

    def __post_init__(self):
        # Checks built from decoded JSON carry the severity as a plain string
        # ("critical"); result reporting reads ``severity.value``, so coerce
        # strings to the enum at construction time.
        if isinstance(self.severity, str):
            self.severity = Severity(self.severity.strip().lower())
class AIQualityController:
    """Runs declarative quality checks on DataFrames, with LLM assistance.

    The LLM is used for two things: proposing checks from a statistical
    profile of the data (``generate_checks_from_data``) and writing a
    free-text diagnosis of critical failures (``_diagnose_failures``).
    The checks themselves are evaluated locally with pandas.

    Note that ``generate_checks_from_data`` returns the proposed checks and
    does not store them; ``self.checks`` is only what the caller assigns.
    """

    DEFAULT_MODEL = "claude-3-5-sonnet-20241022"

    _log = logging.getLogger(__name__)

    def __init__(self, model: str = DEFAULT_MODEL):
        """Create the LLM client and the Great Expectations context.

        Args:
            model: Model identifier passed to every LLM request.
        """
        self.llm = Anthropic()
        self.model = model
        self.checks = []
        self.context = gx.get_context()

    def generate_checks_from_data(self, df: pd.DataFrame,
                                  domain_context: str = "") -> list["QualityCheck"]:
        """Ask the LLM to propose quality checks from a profile of ``df``.

        Args:
            df: Data to profile. An empty frame is allowed.
            domain_context: Free-text description of the domain, inserted
                into the prompt verbatim.

        Returns:
            The checks that could be parsed from the model reply. Malformed
            entries are skipped with a warning; an unparseable reply yields
            an empty list.
        """
        profile = {str(name): self._profile_column(df[name]) for name in df.columns}
        # default=str keeps profiling from crashing on sample values that are
        # not JSON-native (timestamps, decimals, ...). The text is truncated
        # to bound the prompt size, so wide tables are only partly described.
        profile_json = json.dumps(profile, indent=2, default=str)[:1500]

        response = self.llm.messages.create(
            model=self.model,
            max_tokens=800,
            messages=[{
                "role": "user",
                "content": f"""Generate data quality checks as JSON array.
Data profile:
{profile_json}
Domain context: {domain_context}
Return JSON array of checks:
[
{{
"name": "user_id_not_null",
"column": "user_id",
"check_type": "not_null",
"params": {{}},
"severity": "critical",
"description": "User ID must never be null"
}},
{{
"name": "amount_positive",
"column": "amount",
"check_type": "value_range",
"params": {{"min": 0, "max": 1000000}},
"severity": "critical",
"description": "Transaction amount must be positive"
}},
...
]"""
            }]
        )

        try:
            reply_text = response.content[0].text
            entries = self._extract_json_array(reply_text)
        except (AttributeError, IndexError, TypeError, ValueError) as exc:
            self._log.warning(
                "Discarding generated checks: unparseable model reply (%s)", exc)
            return []
        return self._build_checks(entries)

    @staticmethod
    def _profile_column(s: pd.Series) -> dict:
        """Summarise one column for the check-generation prompt."""
        row_count = len(s)
        col_profile = {
            'dtype': str(s.dtype),
            # Guard the ratios so an empty frame does not divide by zero.
            'null_pct': float(s.isnull().mean()) if row_count else 0.0,
            'unique_pct': s.nunique() / row_count if row_count else 0.0,
        }
        # Booleans count as numeric in pandas, but quantiles are not
        # meaningful for them; describe them by sample values instead.
        is_number = (pd.api.types.is_numeric_dtype(s)
                     and not pd.api.types.is_bool_dtype(s))
        if is_number:
            col_profile.update({
                'q01': float(s.quantile(0.01)),
                'q99': float(s.quantile(0.99)),
                'min': float(s.min()),
                'max': float(s.max()),
            })
        else:
            # The five most frequent non-null values.
            col_profile['sample_values'] = (
                s.dropna().value_counts().head(5).index.tolist())
        return col_profile

    @staticmethod
    def _extract_json_array(text: str) -> list:
        """Parse a JSON array out of a model reply.

        Accepts a bare JSON array as well as one surrounded by other text
        (Markdown code fences, a leading sentence).

        Raises:
            ValueError: If no JSON array can be decoded from ``text``.
            TypeError: If ``text`` is not a string.
        """
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            start, end = text.find('['), text.rfind(']')
            if start == -1 or end <= start:
                raise
            parsed = json.loads(text[start:end + 1])
        if not isinstance(parsed, list):
            raise ValueError("expected a JSON array of checks")
        return parsed

    def _build_checks(self, entries: list) -> list["QualityCheck"]:
        """Turn decoded JSON objects into QualityCheck instances.

        Each entry is validated on its own so that one malformed entry does
        not discard the rest.
        """
        checks = []
        for entry in entries:
            try:
                spec = dict(entry)
                # JSON carries the severity as a string; reporting needs the enum.
                spec['severity'] = Severity(str(spec.get('severity')).lower())
                spec['params'] = spec.get('params') or {}
                checks.append(QualityCheck(**spec))
            except (TypeError, ValueError) as exc:
                self._log.warning(
                    "Skipping malformed generated check %r: %s", entry, exc)
        return checks

    def run_checks(self, df: pd.DataFrame,
                   checks: "list[QualityCheck] | None" = None) -> dict:
        """Execute checks against ``df`` and collect the outcome.

        Args:
            df: Data to validate.
            checks: Checks to run; defaults to ``self.checks``.

        Returns:
            A dict with:
              * ``passed`` - entries for checks that passed;
              * ``failed_critical`` - failed or errored CRITICAL checks;
              * ``failed_warning`` - failed or errored checks of any other
                severity (WARNING and INFO share this bucket);
              * ``stats`` - ``total``, ``passed``, ``failed`` and ``errors``
                counts, where ``errors`` is the subset of ``failed`` that
                raised instead of returning a verdict;
              * ``quality_score`` - ``passed / total`` (0.0 with no checks);
              * ``ai_diagnosis`` - present when a critical check failed;
                ``None`` plus ``ai_diagnosis_error`` if the LLM call failed.
        """
        if checks is None:
            checks = self.checks

        results = {
            'passed': [],
            'failed_critical': [],
            'failed_warning': [],
            'stats': {
                'total': len(checks),
                'passed': 0,
                'failed': 0,
                'errors': 0,
            },
        }
        stats = results['stats']

        for check in checks:
            try:
                passed, details = self._execute_check(df, check)
            except Exception as exc:
                # A check that cannot run has not passed: report it under its
                # own severity and keep evaluating the remaining checks.
                outcome = {'error': str(exc)}
                passed = False
                stats['errors'] += 1
            else:
                outcome = {'details': details}

            if passed:
                results['passed'].append({'check': check.name, 'details': details})
                stats['passed'] += 1
                continue

            severity = self._severity_of(check)
            entry = {
                'check': check.name,
                'column': check.column,
                'severity': severity.value,
                'description': check.description,
            }
            entry.update(outcome)
            bucket = ('failed_critical' if severity == Severity.CRITICAL
                      else 'failed_warning')
            results[bucket].append(entry)
            stats['failed'] += 1

        # AI diagnosis of critical failures. It is auxiliary: an LLM outage
        # must not throw away the check results collected above.
        if results['failed_critical']:
            try:
                results['ai_diagnosis'] = self._diagnose_failures(
                    results['failed_critical'], df)
            except Exception as exc:
                self._log.warning("AI diagnosis unavailable: %s", exc)
                results['ai_diagnosis'] = None
                results['ai_diagnosis_error'] = str(exc)

        results['quality_score'] = stats['passed'] / max(stats['total'], 1)
        return results

    @staticmethod
    def _severity_of(check) -> "Severity":
        """Return the check's severity as an enum member.

        Accepts enum members and their string values; anything unrecognised
        is treated as WARNING, the bucket used for non-critical failures.
        """
        raw = getattr(check, 'severity', None)
        if isinstance(raw, Severity):
            return raw
        try:
            return Severity(str(raw).lower())
        except ValueError:
            return Severity.WARNING

    def _execute_check(self, df: pd.DataFrame,
                       check: "QualityCheck") -> tuple[bool, dict]:
        """Evaluate a single check.

        Returns:
            ``(passed, details)``. An unknown ``check_type`` or a column that
            is missing from ``df`` yields ``(False, {'error': ...})`` rather
            than a silent pass.
        """
        handlers = {
            'not_null': self._check_not_null,
            'unique': self._check_unique,
            'value_range': self._check_value_range,
            'regex': self._check_regex,
            'accepted_values': self._check_accepted_values,
            'freshness': self._check_freshness,
        }
        handler = handlers.get(check.check_type)
        if handler is None:
            return False, {'error': f"Unknown check_type {check.check_type}"}
        if check.column not in df.columns:
            return False, {'error': f"Column {check.column} not found"}
        return handler(df[check.column], check.params or {})

    @staticmethod
    def _check_not_null(col: pd.Series, params: dict) -> tuple[bool, dict]:
        """Pass when the column has no null values."""
        null_count = int(col.isnull().sum())
        return null_count == 0, {'null_count': null_count}

    @staticmethod
    def _check_unique(col: pd.Series, params: dict) -> tuple[bool, dict]:
        """Pass when no value repeats (repeated nulls count as duplicates)."""
        dup_count = int(col.duplicated().sum())
        return dup_count == 0, {'duplicate_count': dup_count}

    @staticmethod
    def _check_value_range(col: pd.Series, params: dict) -> tuple[bool, dict]:
        """Pass when every non-null value lies within [min, max].

        Either bound may be omitted to leave that side open.
        """
        lower, upper = params.get('min'), params.get('max')
        values = col.dropna()
        violations = 0
        if lower is not None:
            violations += int((values < lower).sum())
        if upper is not None:
            violations += int((values > upper).sum())
        return violations == 0, {'violations': violations}

    @staticmethod
    def _check_regex(col: pd.Series, params: dict) -> tuple[bool, dict]:
        """Pass when every non-null value, as text, matches ``pattern``.

        Matching is anchored at the start of the string only; add ``$`` to
        the pattern to require a full match.
        """
        pattern = params.get('pattern', '.*')
        text = col.dropna().astype(str)
        if text.empty:
            return True, {'non_matching': 0}
        non_matching = int((~text.str.match(pattern)).sum())
        return non_matching == 0, {'non_matching': non_matching}

    @staticmethod
    def _check_accepted_values(col: pd.Series, params: dict) -> tuple[bool, dict]:
        """Pass when every non-null value is one of ``values``."""
        accepted = set(params.get('values', []))
        present = col.dropna()
        # Filter the already null-free series so mask and data share an index.
        rejected = present[~present.isin(accepted)]
        return rejected.empty, {
            'invalid_count': int(len(rejected)),
            'invalid_sample': rejected.head(3).tolist(),
        }

    @staticmethod
    def _check_freshness(col: pd.Series, params: dict) -> tuple[bool, dict]:
        """Pass when the newest timestamp is at most ``max_age_hours`` old."""
        max_age_hours = params.get('max_age_hours', 24)
        latest = pd.to_datetime(col).max()
        if pd.isna(latest):
            return False, {'error': "No valid timestamps to assess freshness"}
        # Take "now" in the column's own timezone: subtracting a tz-aware
        # timestamp from a naive one raises. Naive columns are compared with
        # naive local time.
        now = pd.Timestamp.now(tz=latest.tz)
        age_hours = (now - latest).total_seconds() / 3600
        return bool(age_hours <= max_age_hours), {'age_hours': round(age_hours, 1)}

    def _diagnose_failures(self, failures: list[dict], df: pd.DataFrame) -> str:
        """Ask the LLM for likely root causes of the given failures.

        Args:
            failures: Failure entries as produced by ``run_checks``.
            df: The validated data; only its shape is sent to the model.

        Returns:
            The model's free-text diagnosis.
        """
        # default=str: failure details may hold non-JSON-native sample values.
        failures_json = json.dumps(failures, indent=2, default=str)
        response = self.llm.messages.create(
            model=self.model,
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": f"""Diagnose these data quality failures and suggest root causes.
Failures:
{failures_json}
Dataset shape: {df.shape}
Provide: likely root cause for each failure group, recommended immediate actions."""
            }]
        )
        return response.content[0].text
Интеграция с Great Expectations
def setup_gx_suite(df: pd.DataFrame, suite_name: str) -> gx.ExpectationSuite:
    """Create a Great Expectations suite profiled from ``df`` and save it.

    Args:
        df: Data used to auto-generate the expectations.
        suite_name: Name under which the suite is registered and saved.

    Returns:
        The expectation suite produced by the profiler.
    """
    from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler

    context = gx.get_context()
    # Register the empty suite so the validator below can attach to it by name.
    # NOTE(review): this presumably fails if a suite with this name already
    # exists - confirm against the installed Great Expectations version.
    context.add_expectation_suite(expectation_suite_name=suite_name)

    # NOTE(review): the datasource and data connector named here are not
    # created in this function, so they must already be configured in the
    # context. Also confirm that RuntimeBatchRequest is exported at the
    # package top level in the installed version.
    batch_request = gx.RuntimeBatchRequest(
        datasource_name="pandas_datasource",
        data_connector_name="runtime_data_connector",
        data_asset_name="training_data",
        batch_identifiers={"default_identifier_name": "default_identifier"},
        runtime_parameters={"batch_data": df},
    )
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=suite_name,
    )

    # Auto-generate expectations with the GE profiler. build_suite() returns
    # the suite itself, not a (suite, results) pair, so it must not be
    # tuple-unpacked.
    profiler = UserConfigurableProfiler(profile_dataset=validator)
    suite = profiler.build_suite()
    context.save_expectation_suite(suite)
    return suite
Внедрение AI-контроля качества позволяет выявлять 85–95 % проблем с данными до их попадания в продакшн. Типичная экономия — предотвращение 1–2 инцидентов с данными в месяц, каждый из которых обходится в 4–8 часов диагностики и исправления.







