Розробка Workflow-рушія на базі Apache Airflow

Наша компанія займається розробкою, підтримкою та обслуговуванням сайтів будь-якої складності. Від простих односторінкових сайтів до масштабних кластерних систем, побудованих на мікро сервісах. Досвід розробників підтверджено сертифікатами від вендорів.

Розробка та обслуговування будь-яких видів сайтів:

Інформаційні сайти або веб-програми
Сайти візитки, landing page, корпоративні сайти, онлайн каталоги, квіз, промо-сайти, блоги, ресурси новин, інформаційні портали, форуми, агрегатори
Сайти або веб-програми електронної комерції
Інтернет-магазини, B2B-портали, маркетплейси, онлайн-обмінники, кешбек-сайти, біржі, дропшиппінг-платформи, парсери товарів
Веб-програми для управління бізнес-процесами
CRM-системи, ERP-системи, корпоративні портали, системи управління виробництвом, парсери інформації
Сайти або веб-програми електронних послуг
Дошки оголошень, онлайн-школи, онлайн-кінотеатри, конструктори сайтів, портали надання електронних послуг, відеохостинги, тематичні портали

Це лише деякі з технічних типів сайтів, з якими ми працюємо, і кожен із них може мати свої специфічні особливості та функціональність, а також бути адаптованим під конкретні потреби та цілі клієнта.

Пропоновані послуги
Показано 1 з 1 послугУсі 2065 послуг
Розробка Workflow-рушія на базі Apache Airflow
Складна
~2-4 тижні
Часті питання

Наші компетенції:

Етапи розробки

Останні роботи

  • image_website-b2b-advance_0.png
    Розробка сайту компанії B2B ADVANCE
    1262
  • image_web-applications_feedme_466_0.webp
    Розробка веб-додатків для компанії FEEDME
    1171
  • image_websites_belfingroup_462_0.webp
    Розробка веб-сайту для компанії БЕЛФІНГРУП
    874
  • image_ecommerce_furnoro_435_0.webp
    Розробка інтернет магазину для компанії FURNORO
    1094
  • image_crm_enviok_479_0.webp
    Розробка веб-додатків для компанії Enviok
    831
  • image_bitrix-bitrix-24-1c_fixper_448_0.png
    Розробка веб-сайту для компанії ФІКСПЕР
    851

Розроблення Workflow-рушія на базі Apache Airflow

Apache Airflow — платформа для оркестрації data pipelines та ETL-процесів. Workflow визначаються як Python-код у вигляді DAG (Directed Acyclic Graph). Airflow зберігає історію запусків, вміє робити backfill, моніторить стан завдань та підтримує паралельне виконання.

Коли Airflow замість Temporal/Camunda

Airflow оптимізований для batch-обробки даних:

  • ETL/ELT пайплайни (PostgreSQL → трансформація → Data Warehouse)
  • Щоденні звіти та виконавання
  • ML-пайплайни (підготовка даних → навчання → розгортання моделі)
  • Періодичні агрегації та синхронізації

Для подійво-керованих бізнес-процесів з human tasks — використовуйте Temporal або Camunda.

Встановлення через Helm (Kubernetes)

helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  --set executor=KubernetesExecutor \
  --set postgresql.enabled=true \
  --set redis.enabled=true \
  --values airflow-values.yaml
# airflow-values.yaml
airflow:
  image:
    repository: apache/airflow
    tag: 2.8.0
  config:
    AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: "3"
    AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "30"

dags:
  gitSync:
    enabled: true
    repo: https://github.com/company/airflow-dags.git
    branch: main
    subPath: dags/

DAG — приклад ETL-пайплайну

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['[email protected]'],
}

with DAG(
    'daily_orders_etl',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # щодня о 02:00 UTC
    catchup=False,
    tags=['etl', 'orders'],
    description='Завантаження та трансформація замовлень у DWH',
) as dag:

    # Крок 1: Витяг даних з production DB
    def extract_orders(**context):
        hook = PostgresHook(postgres_conn_id='production_db')
        ds = context['ds']  # дата виконання: 2026-03-28

        df = hook.get_pandas_df(f"""
            SELECT o.id, o.customer_id, o.total, o.status,
                   o.created_at, c.email, c.country
            FROM orders o
            JOIN customers c ON c.id = o.customer_id
            WHERE o.created_at::date = '{ds}'
              AND o.status IN ('paid', 'shipped', 'delivered')
        """)

        # Зберегти в XCom для наступного кроку
        context['ti'].xcom_push(key='orders_count', value=len(df))
        df.to_parquet(f'/tmp/orders_{ds}.parquet')
        return len(df)

    # Крок 2: Трансформація
    def transform_orders(**context):
        ds = context['ds']
        df = pd.read_parquet(f'/tmp/orders_{ds}.parquet')

        # Трансформації
        df['order_date'] = pd.to_datetime(df['created_at']).dt.date
        df['revenue_usd'] = df['total'] / 100  # центи → доларі
        df['is_international'] = df['country'] != 'RU'
        df['customer_tier'] = df['revenue_usd'].apply(
            lambda x: 'vip' if x >= 500 else 'regular'
        )

        df.to_parquet(f'/tmp/orders_transformed_{ds}.parquet')

    # Крок 3: Завантаження у DWH
    def load_to_dwh(**context):
        ds = context['ds']
        df = pd.read_parquet(f'/tmp/orders_transformed_{ds}.parquet')

        hook = PostgresHook(postgres_conn_id='datawarehouse')
        engine = hook.get_sqlalchemy_engine()

        # Upsert у DWH
        df.to_sql('fact_orders', engine, schema='dwh',
                  if_exists='append', index=False,
                  method='multi', chunksize=1000)

    # Крок 4: Агрегації для дашборду
    aggregate_metrics = PostgresOperator(
        task_id='aggregate_metrics',
        postgres_conn_id='datawarehouse',
        sql="""
        INSERT INTO dwh.daily_metrics (date, total_revenue, orders_count, avg_order)
        SELECT
          '{{ ds }}'::date,
          SUM(revenue_usd),
          COUNT(*),
          AVG(revenue_usd)
        FROM dwh.fact_orders
        WHERE order_date = '{{ ds }}'
        ON CONFLICT (date) DO UPDATE SET
          total_revenue = EXCLUDED.total_revenue,
          orders_count = EXCLUDED.orders_count,
          avg_order = EXCLUDED.avg_order;
        """,
    )

    extract = PythonOperator(task_id='extract_orders', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform_orders', python_callable=transform_orders)
    load = PythonOperator(task_id='load_to_dwh', python_callable=load_to_dwh)

    # Залежності
    extract >> transform >> load >> aggregate_metrics

Паралельне виконання

from airflow.utils.task_group import TaskGroup

with TaskGroup('process_regions') as process_regions:
    for region in ['EU', 'US', 'APAC']:
        PythonOperator(
            task_id=f'process_{region.lower()}',
            python_callable=process_region_data,
            op_kwargs={'region': region}
        )

# Обробити всі регіони паралельно, потім агрегувати
extract >> process_regions >> aggregate_all

Sensors — очікування умов

from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor

# Чекати поки файл з'явиться
wait_for_file = FileSensor(
    task_id='wait_for_export',
    filepath='/data/exports/daily_export_{{ ds }}.csv',
    timeout=3600,
    poke_interval=60,
)

# Чекати поки API повернеться успіхом
wait_for_api = HttpSensor(
    task_id='wait_for_processing',
    http_conn_id='data_api',
    endpoint='/status/{{ ds }}',
    response_check=lambda response: response.json()['status'] == 'ready',
    timeout=1800,
    poke_interval=120,
)

KubernetesExecutor

З KubernetesExecutor кожне завдання запускається в окремому Pod:

# Конфігурація Pod для конкретного завдання
executor_config = {
    'KubernetesExecutor': {
        'request_memory': '2Gi',
        'request_cpu': '500m',
        'limit_memory': '4Gi',
        'image': 'custom-airflow:2.8.0-pandas',  # користувацький образ з залежностями
    }
}

heavy_transform = PythonOperator(
    task_id='heavy_transform',
    python_callable=transform_large_dataset,
    executor_config=executor_config
)

Backfill

Пересчитати історичні дані за минулий період:

airflow dags backfill daily_orders_etl \
  --start-date 2026-01-01 \
  --end-date 2026-03-27 \
  --reset-dagruns

Строки виконання

  • Airflow deployment (Helm/Docker) + перший DAG — 3–5 днів
  • ETL-пайплайн з 5–8 завданнями, трансформаціями та DWH-завантаженням — 1–2 тижні
  • Складний пайплайн з паралелізмом, sensors та backfill — 2–4 тижні