Conversion Metrics
Track and analyze migration operations for optimization and reporting.
Overview
The metrics module collects data about migration operations, helping you:
- Track success rates across migration projects
- Identify common patterns and coverage gaps
- Analyze warning patterns to improve workflows
- Generate reports for stakeholders
Enabling Metrics
Metrics collection is opt-in via environment variable:
# Enable metrics
export AIRFLOW_UNFACTOR_METRICS=1
# Or use other truthy values
export AIRFLOW_UNFACTOR_METRICS=true
export AIRFLOW_UNFACTOR_METRICS=yes
When disabled (default), record_conversion() returns None and no data is stored.
Recording Conversions
from airflow_unfactor.metrics import record_conversion
# Record a successful conversion
metrics = record_conversion(
dag_id="my_etl_dag",
success=True,
operators_total=10,
operators_converted=9,
operators_unknown=1,
warnings=["Unknown operator: MyCustomOperator"],
features_detected=["taskflow", "datasets"],
execution_time_ms=150.5,
)
# Record a failed conversion
metrics = record_conversion(
dag_id="problem_dag",
success=False,
error_message="Parse error on line 42",
)
Parameters
| Parameter | Type | Description |
|---|---|---|
dag_id | string | ID of the DAG being converted |
success | boolean | Whether conversion succeeded (default: True) |
operators_total | integer | Total operators in the DAG |
operators_converted | integer | Successfully converted operators |
operators_unknown | integer | Unknown/unsupported operators |
warnings | list[string] | Warning messages generated |
features_detected | list[string] | Features like "taskflow", "datasets" |
execution_time_ms | float | Conversion time in milliseconds |
error_message | string | Error message if conversion failed |
Aggregate Statistics
from airflow_unfactor.metrics import get_aggregate_stats
stats = get_aggregate_stats()
print(f"Total conversions: {stats.total_conversions}")
print(f"Success rate: {stats.success_rate:.1%}")
print(f"Operator coverage: {stats.operator_coverage:.1%}")
print(f"Average time: {stats.avg_execution_time_ms:.0f}ms")
AggregateStats Fields
| Field | Type | Description |
|---|---|---|
total_conversions | integer | Total conversion attempts |
successful_conversions | integer | Successful conversions |
failed_conversions | integer | Failed conversions |
success_rate | float | Ratio of successful to total |
total_operators | integer | Total operators across all DAGs |
operators_converted | integer | Successfully converted operators |
operators_unknown | integer | Unknown operators encountered |
operator_coverage | float | Ratio of converted to total |
total_warnings | integer | Total warnings generated |
warning_frequency | dict | Count by warning category |
features_frequency | dict | Count by feature detected |
avg_execution_time_ms | float | Average conversion time |
Exporting Data
JSON Export
from airflow_unfactor.metrics import export_json
json_data = export_json()
print(json_data)
Output format:
{
"metrics": [
{
"dag_id": "my_etl",
"timestamp": "2024-01-15T10:30:00",
"success": true,
"operators_total": 10,
"operators_converted": 9,
"warnings": ["Unknown operator: MyCustom"]
}
],
"aggregate": {
"total_conversions": 50,
"success_rate": 0.92,
"operator_coverage": 0.87
},
"exported_at": "2024-01-15T12:00:00"
}
File Export
from airflow_unfactor.metrics import export_to_file
export_to_file("metrics_report.json")
export_to_file("/path/to/reports/migration_metrics.json")
Integration with Migration Workflow
The metrics module tracks LLM-assisted conversions. After each DAG migration:
import os
from pathlib import Path
from airflow_unfactor.metrics import (
record_conversion,
get_aggregate_stats,
export_to_file,
clear_metrics,
)
# Enable metrics
os.environ["AIRFLOW_UNFACTOR_METRICS"] = "1"
# Clear any previous data
clear_metrics()
# After each LLM-assisted conversion, record the result
record_conversion(
dag_id="my_etl",
success=True,
operators_total=10,
operators_converted=10,
features_detected=["taskflow", "datasets"],
)
# Generate report
stats = get_aggregate_stats()
print(f"\nMigration Summary:")
print(f" Converted: {stats.successful_conversions}/{stats.total_conversions}")
print(f" Success rate: {stats.success_rate:.1%}")
print(f" Operator coverage: {stats.operator_coverage:.1%}")
# Export for analysis
export_to_file("migration_report.json")
CI/CD Integration
# .github/workflows/migration.yml
- name: Run migration
env:
AIRFLOW_UNFACTOR_METRICS: "1"
run: |
python scripts/migrate_dags.py
- name: Upload metrics
uses: actions/upload-artifact@v4
with:
name: migration-metrics
path: metrics_report.json
Utility Functions
Clear Metrics
from airflow_unfactor.metrics import clear_metrics
clear_metrics()
Get All Metrics
from airflow_unfactor.metrics import get_all_metrics
all_metrics = get_all_metrics()
for m in all_metrics:
print(f"{m.dag_id}: {'Y' if m.success else 'N'}")
Check if Enabled
from airflow_unfactor.metrics import metrics_enabled
if metrics_enabled():
print("Metrics collection is active")
else:
print("Set AIRFLOW_UNFACTOR_METRICS=1 to enable")
Best Practices
- Enable in batch operations — Track progress across many DAGs
- Export regularly — Save metrics before clearing
- Analyze warnings — Identify patterns to improve conversions
- Use in CI/CD — Track migration health over time
- Share reports — Keep stakeholders informed of progress