Conversion Metrics

Track and analyze migration operations for optimization and reporting.

Overview

The metrics module collects data about migration operations, helping you:

  • Track success rates across migration projects
  • Identify common patterns and coverage gaps
  • Analyze warning patterns to improve workflows
  • Generate reports for stakeholders

Enabling Metrics

Metrics collection is opt-in. Enable it by setting the AIRFLOW_UNFACTOR_METRICS environment variable:

# Enable metrics
export AIRFLOW_UNFACTOR_METRICS=1

# Or use other truthy values
export AIRFLOW_UNFACTOR_METRICS=true
export AIRFLOW_UNFACTOR_METRICS=yes

When disabled (default), record_conversion() returns None and no data is stored.
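To make the truthy-value check concrete, here is a minimal sketch of how such a flag could be read. The helper name env_flag_enabled and the TRUTHY_VALUES set are hypothetical, and the case-insensitive comparison is an assumption; the library's own parsing may accept a different set of values.

```python
import os

# Hypothetical set of accepted values, taken from the examples above.
# The library's actual list may differ.
TRUTHY_VALUES = {"1", "true", "yes"}


def env_flag_enabled(name: str = "AIRFLOW_UNFACTOR_METRICS") -> bool:
    """Return True when the variable is set to one of the truthy values."""
    raw = os.environ.get(name, "")
    # Assumption: surrounding whitespace and letter case are ignored.
    return raw.strip().lower() in TRUTHY_VALUES
```

With this approach an unset variable, an empty string, or a value such as "0" all leave metrics disabled.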

Recording Conversions

from airflow_unfactor.metrics import record_conversion

# Record a successful conversion
metrics = record_conversion(
    dag_id="my_etl_dag",
    success=True,
    operators_total=10,
    operators_converted=9,
    operators_unknown=1,
    warnings=["Unknown operator: MyCustomOperator"],
    features_detected=["taskflow", "datasets"],
    execution_time_ms=150.5,
)

# Record a failed conversion
metrics = record_conversion(
    dag_id="problem_dag",
    success=False,
    error_message="Parse error on line 42",
)

Parameters

Parameter            Type          Description
dag_id               string        ID of the DAG being converted
success              boolean       Whether conversion succeeded (default: True)
operators_total      integer       Total operators in the DAG
operators_converted  integer       Successfully converted operators
operators_unknown    integer       Unknown/unsupported operators
warnings             list[string]  Warning messages generated
features_detected    list[string]  Features like "taskflow", "datasets"
execution_time_ms    float         Conversion time in milliseconds
error_message        string        Error message if conversion failed
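One way to fill execution_time_ms and error_message consistently is to wrap each conversion in a small timing helper. The sketch below is illustrative only: timed_conversion and its convert callable are hypothetical names, and it assumes that the recording function accepts the keyword arguments listed in the table, including execution_time_ms on a failed conversion.

```python
import time
from typing import Any, Callable, Dict


def timed_conversion(
    dag_id: str,
    convert: Callable[[], Dict[str, Any]],
    record: Callable[..., Any],
) -> Any:
    """Run convert(), time it, and pass the outcome to record().

    convert is a hypothetical callable that returns a dict of counters
    such as operators_total and operators_converted.
    record is expected to be record_conversion or a compatible function.
    """
    start = time.perf_counter()
    try:
        counters = convert()
    except Exception as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000.0
        # Failed conversion: keep the error text and the time spent.
        return record(
            dag_id=dag_id,
            success=False,
            error_message=str(exc),
            execution_time_ms=elapsed_ms,
        )
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    # Successful conversion: forward the counters as keyword arguments.
    return record(
        dag_id=dag_id,
        success=True,
        execution_time_ms=elapsed_ms,
        **counters,
    )
```

Passing record_conversion as the record argument keeps the helper independent of the metrics module, which also makes it easy to test with a stand-in function.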

Aggregate Statistics

from airflow_unfactor.metrics import get_aggregate_stats

stats = get_aggregate_stats()

print(f"Total conversions: {stats.total_conversions}")
print(f"Success rate: {stats.success_rate:.1%}")
print(f"Operator coverage: {stats.operator_coverage:.1%}")
print(f"Average time: {stats.avg_execution_time_ms:.0f}ms")

AggregateStats Fields

Field                   Type     Description
total_conversions       integer  Total conversion attempts
successful_conversions  integer  Successful conversions
failed_conversions      integer  Failed conversions
success_rate            float    Ratio of successful to total
total_operators         integer  Total operators across all DAGs
operators_converted     integer  Successfully converted operators
operators_unknown       integer  Unknown operators encountered
operator_coverage       float    Ratio of converted to total
total_warnings          integer  Total warnings generated
warning_frequency       dict     Count by warning category
features_frequency      dict     Count by feature detected
avg_execution_time_ms   float    Average conversion time
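The two ratio fields follow directly from the counters in the table. The sketch below shows the arithmetic with made-up numbers; the Totals class and the ratio helper are hypothetical, and returning 0.0 for an empty denominator is an assumption rather than documented behavior.

```python
from dataclasses import dataclass


@dataclass
class Totals:
    """Illustrative stand-in for the counter fields of AggregateStats."""
    total_conversions: int
    successful_conversions: int
    total_operators: int
    operators_converted: int


def ratio(numerator: int, denominator: int) -> float:
    """Divide safely; an empty denominator yields 0.0 (assumed behavior)."""
    return numerator / denominator if denominator else 0.0


# Made-up example counts, not real project data.
totals = Totals(
    total_conversions=50,
    successful_conversions=46,
    total_operators=200,
    operators_converted=174,
)

success_rate = ratio(totals.successful_conversions, totals.total_conversions)
operator_coverage = ratio(totals.operators_converted, totals.total_operators)

print(f"Success rate: {success_rate:.1%}")
print(f"Operator coverage: {operator_coverage:.1%}")
```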

Exporting Data

JSON Export

from airflow_unfactor.metrics import export_json

json_data = export_json()
print(json_data)

Output format:

{
  "metrics": [
    {
      "dag_id": "my_etl",
      "timestamp": "2024-01-15T10:30:00",
      "success": true,
      "operators_total": 10,
      "operators_converted": 9,
      "warnings": ["Unknown operator: MyCustom"]
    }
  ],
  "aggregate": {
    "total_conversions": 50,
    "success_rate": 0.92,
    "operator_coverage": 0.87
  },
  "exported_at": "2024-01-15T12:00:00"
}
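Because the export is plain JSON, it can be post-processed with the standard library alone. The following sketch parses a document shaped like the output above and lists DAGs whose per-DAG operator coverage falls below a threshold. The function low_coverage_dags and the 0.95 threshold are hypothetical, and the sample string only mirrors the fields shown in this section.

```python
import json

# Sample document mirroring the output format shown above.
SAMPLE_EXPORT = """
{
  "metrics": [
    {
      "dag_id": "my_etl",
      "timestamp": "2024-01-15T10:30:00",
      "success": true,
      "operators_total": 10,
      "operators_converted": 9,
      "warnings": ["Unknown operator: MyCustom"]
    }
  ],
  "aggregate": {"total_conversions": 50, "success_rate": 0.92, "operator_coverage": 0.87},
  "exported_at": "2024-01-15T12:00:00"
}
"""


def low_coverage_dags(report: dict, threshold: float = 0.95) -> list:
    """Return (dag_id, coverage) pairs whose coverage is below threshold."""
    flagged = []
    for entry in report.get("metrics", []):
        total = entry.get("operators_total", 0)
        converted = entry.get("operators_converted", 0)
        if total == 0:
            continue  # no operators recorded, so coverage is undefined
        coverage = converted / total
        if coverage < threshold:
            flagged.append((entry["dag_id"], coverage))
    return flagged


report = json.loads(SAMPLE_EXPORT)
for dag_id, coverage in low_coverage_dags(report):
    print(f"{dag_id}: {coverage:.0%} of operators converted")
```

In practice the string passed to json.loads would be the return value of export_json() or the contents of an exported file.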

File Export

from airflow_unfactor.metrics import export_to_file

export_to_file("metrics_report.json")
export_to_file("/path/to/reports/migration_metrics.json")

Integration with Migration Workflow

The metrics module tracks LLM-assisted conversions. Record the result after each DAG migration, then summarize and export once the batch is done:

import os
from airflow_unfactor.metrics import (
    record_conversion,
    get_aggregate_stats,
    export_to_file,
    clear_metrics,
)

# Enable metrics
os.environ["AIRFLOW_UNFACTOR_METRICS"] = "1"

# Clear any previous data
clear_metrics()

# After each LLM-assisted conversion, record the result
record_conversion(
    dag_id="my_etl",
    success=True,
    operators_total=10,
    operators_converted=10,
    features_detected=["taskflow", "datasets"],
)

# Generate report
stats = get_aggregate_stats()
print("\nMigration Summary:")
print(f"  Converted: {stats.successful_conversions}/{stats.total_conversions}")
print(f"  Success rate: {stats.success_rate:.1%}")
print(f"  Operator coverage: {stats.operator_coverage:.1%}")

# Export for analysis
export_to_file("migration_report.json")

CI/CD Integration

# .github/workflows/migration.yml
- name: Run migration
  env:
    AIRFLOW_UNFACTOR_METRICS: "1"
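  run: |
    # The script is expected to call export_to_file("metrics_report.json")
    # so that the upload step below finds the file.
    python scripts/migrate_dags.py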

- name: Upload metrics
  uses: actions/upload-artifact@v4
  with:
    name: migration-metrics
    path: metrics_report.json
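A pipeline can also fail the build when migration health drops. The sketch below is one possible gate, assuming the exported file contains the "aggregate" object shown under JSON Export. The function name check_report and the 0.9 threshold are hypothetical choices, not part of the library.

```python
import json
from pathlib import Path


def check_report(path: str, min_success_rate: float = 0.9) -> int:
    """Return a process exit code: 0 if the report passes, 1 otherwise."""
    report_path = Path(path)
    if not report_path.exists():
        print(f"Report not found: {report_path}")
        return 1

    report = json.loads(report_path.read_text(encoding="utf-8"))
    # Assumption: the file has the "aggregate" section shown under JSON Export.
    rate = report.get("aggregate", {}).get("success_rate", 0.0)

    if rate < min_success_rate:
        print(f"Success rate {rate:.1%} is below {min_success_rate:.1%}")
        return 1

    print(f"Success rate {rate:.1%} meets the threshold")
    return 0
```

A CI step could run a small script that ends with sys.exit(check_report("metrics_report.json")), so a non-zero exit code marks the job as failed.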

Utility Functions

Clear Metrics

from airflow_unfactor.metrics import clear_metrics

clear_metrics()

Get All Metrics

from airflow_unfactor.metrics import get_all_metrics

all_metrics = get_all_metrics()
for m in all_metrics:
    print(f"{m.dag_id}: {'Y' if m.success else 'N'}")
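The per-conversion records can also be used to group warnings. The sketch below counts warnings by category using stand-in records. It assumes that each record exposes a warnings list, matching the warnings parameter of record_conversion, and that a category is the text before the first colon. Both are assumptions for illustration, and warning_categories is a hypothetical helper.

```python
from collections import Counter
from types import SimpleNamespace


def warning_categories(records) -> Counter:
    """Count warnings by category across all records.

    Assumption: the category is the text before the first colon,
    e.g. "Unknown operator: MyCustomOperator" -> "Unknown operator".
    """
    counts = Counter()
    for rec in records:
        for warning in rec.warnings:
            category = warning.split(":", 1)[0].strip()
            counts[category] += 1
    return counts


# Stand-in records; in practice these would come from get_all_metrics().
records = [
    SimpleNamespace(dag_id="my_etl_dag", warnings=["Unknown operator: MyCustomOperator"]),
    SimpleNamespace(dag_id="other_dag", warnings=["Unknown operator: LegacyOperator"]),
    SimpleNamespace(dag_id="clean_dag", warnings=[]),
]

for category, count in warning_categories(records).most_common():
    print(f"{category}: {count}")
```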

Check if Enabled

from airflow_unfactor.metrics import metrics_enabled

if metrics_enabled():
    print("Metrics collection is active")
else:
    print("Set AIRFLOW_UNFACTOR_METRICS=1 to enable")

Best Practices

  1. Enable in batch operations — Track progress across many DAGs
  2. Export regularly — Save metrics before clearing
  3. Analyze warnings — Identify patterns to improve conversions
  4. Use in CI/CD — Track migration health over time
  5. Share reports — Keep stakeholders informed of progress