Airflow

Supported OS Linux Windows Mac OS

Intégration5.0.1

Présentation

L’Agent Datadog recueille de nombreuses métriques à partir d’Airflow, notamment pour :

  • Les DAG (Directed Acyclic Graphs, ou graphes orientés acycliques) : nombre de processus DAG, taille des dagbags, etc.
  • Les tâches : tâches non réussies, réussies, arrêtées, etc.
  • Les pools : emplacements libres, emplacements utilisés, etc.
  • Les exécuteurs : emplacements libres, tâches en attente, tâches en cours d’exécution, etc.

Les métriques sont recueillies via le plugin StatsD pour Airflow puis envoyées à Datadog DogStatsD.

En plus des métriques, l’Agent Datadog envoie également des checks de service liés à l’état de santé d’Airflow.

Configuration

Installation

Les étapes ci-dessous sont toutes nécessaires pour faire fonctionner l’intégration Airflow. Avant de commencer, installez l’Agent Datadog version >=6.17 ou >=7.17 pour bénéficier de la fonctionnalité de mappage StatsD/DogStatsD.

Configuration

L’intégration Airflow se présente sous deux formes. Il y a d’abord l’intégration de l’Agent Datadog, qui effectue des requêtes vers un endpoint spécifié pour permettre à Airflow de signaler s’il peut établir une connexion et s’il est fonctionnel. Puis il y a la portion StatsD pour Airflow, grâce à laquelle il est possible de configurer Airflow de façon à envoyer des métriques à l’Agent Datadog, qui peut alors remapper la notation Airflow vers une notation Datadog.

Host

Configurer l’intégration Airflow de l’Agent Datadog

Configurez le check Airflow inclus avec le package de l’Agent Datadog pour recueillir ses métriques de santé et ses checks de service. Pour ce faire, modifiez le paramètre url du fichier airflow.d/conf.yaml dans le dossier conf.d/ à la racine du répertoire de configuration de votre Agent pour commencer à recueillir vos checks de service Airflow. Consultez le fichier d’exemple airflow.d/conf.yaml pour découvrir toutes les options de configuration disponibles.

Vérifiez que le paramètre url possède la même valeur que le paramètre base_url du serveur Web d’Airflow (l’URL permettant de se connecter à votre instance Airflow).

Connecter Airflow à DogStatsD

Connectez Airflow à DogStatsD (inclus avec l’Agent Datadog) via la fonctionnalité statsd pour recueillir des métriques. Pour en savoir plus sur les métriques transmises par la version d’Airflow utilisée et sur les options de configuration supplémentaires, consultez la documentation Airflow ci-dessous :

Remarque : les métriques StatsD transmises par Airflow peuvent varier en fonction de l’exécuteur Airflow utilisé. Par exemple, les métriques airflow.ti_failures/successes, airflow.operator_failures/successes et airflow.dag.task.duration ne sont pas transmises pour KubernetesExecutor.

  1. Installez le plugin StatsD pour Airflow.

    pip install 'apache-airflow[statsd]'
    
  2. Modifiez le fichier de configuration airflow.cfg d’Airflow pour y ajouter les paramètres suivants :

    Ne définissez pas `statsd_datadog_enabled` sur true. L'activation de `statsd_datadog_enabled` peut entraîner des conflits. Pour éviter tout problème, vérifiez que la variable est définie sur `false`.
    [scheduler]
    statsd_on = True
    # Hostname or IP of server running the Datadog Agent
    statsd_host = localhost  
    # DogStatsD port configured in the Datadog Agent
    statsd_port = 8125
    statsd_prefix = airflow
    
  3. Modifiez le fichier de configuration principal de l’Agent Datadog datadog.yaml pour y ajouter les paramètres suivants :

    # dogstatsd_mapper_cache_size: 1000  # default to 1000
    dogstatsd_mapper_profiles:
      - name: airflow
        prefix: "airflow."
        mappings:
          - match: "airflow.*_start"
            name: "airflow.job.start"
            tags:
              job_name: "$1"
          - match: "airflow.*_end"
            name: "airflow.job.end"
            tags:
              job_name: "$1"
          - match: "airflow.*_heartbeat_failure"
            name: airflow.job.heartbeat.failure
            tags:
              job_name: "$1"
          - match: "airflow.operator_failures_*"
            name: "airflow.operator_failures"
            tags:
              operator_name: "$1"
          - match: "airflow.operator_successes_*"
            name: "airflow.operator_successes"
            tags:
              operator_name: "$1"
          - match: 'airflow\.dag_processing\.last_runtime\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_runtime"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dag_processing\.last_run\.seconds_ago\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_run.seconds_ago"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dag\.loading-duration\.(.*)'
            match_type: "regex"
            name: "airflow.dag.loading_duration"
            tags:
              dag_file: "$1"
          - match: "airflow.dagrun.*.first_task_scheduling_delay"
            name: "airflow.dagrun.first_task_scheduling_delay"
            tags:
              dag_id: "$1"
          - match: "airflow.pool.open_slots.*"
            name: "airflow.pool.open_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.queued_slots.*"
            name: "airflow.pool.queued_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.running_slots.*"
            name: "airflow.pool.running_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.used_slots.*"
            name: "airflow.pool.used_slots"
            tags:
              pool_name: "$1"
          - match: "airflow.pool.starving_tasks.*"
            name: "airflow.pool.starving_tasks"
            tags:
              pool_name: "$1"
          - match: 'airflow\.dagrun\.dependency-check\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.dependency_check"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dag\.(.*)\.([^.]*)\.duration'
            match_type: "regex"
            name: "airflow.dag.task.duration"
            tags:
              dag_id: "$1"
              task_id: "$2"
          - match: 'airflow\.dag_processing\.last_duration\.(.*)'
            match_type: "regex"
            name: "airflow.dag_processing.last_duration"
            tags:
              dag_file: "$1"
          - match: 'airflow\.dagrun\.duration\.success\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.duration.success"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dagrun\.duration\.failed\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.duration.failed"
            tags:
              dag_id: "$1"
          - match: 'airflow\.dagrun\.schedule_delay\.(.*)'
            match_type: "regex"
            name: "airflow.dagrun.schedule_delay"
            tags:
              dag_id: "$1"
          - match: 'airflow.scheduler.tasks.running'
            name: "airflow.scheduler.tasks.running"
          - match: 'airflow.scheduler.tasks.starving'
            name: "airflow.scheduler.tasks.starving"
          - match: 'airflow.sla_email_notification_failure'
            name: 'airflow.sla_email_notification_failure'
          - match: 'airflow\.task_removed_from_dag\.(.*)'
            match_type: "regex"
            name: "airflow.dag.task_removed"
            tags:
              dag_id: "$1"
          - match: 'airflow\.task_restored_to_dag\.(.*)'
            match_type: "regex"
            name: "airflow.dag.task_restored"
            tags:
              dag_id: "$1"
          - match: "airflow.task_instance_created-*"
            name: "airflow.task.instance_created"
            tags:
              task_class: "$1"
          - match: 'airflow\.ti\.start\.(.+)\.(\w+)'
            match_type: regex
            name: airflow.ti.start
            tags: 
              dag_id: "$1"
              task_id: "$2"
          - match: 'airflow\.ti\.finish\.(\w+)\.(.+)\.(\w+)'
            name: airflow.ti.finish
            match_type: regex
            tags: 
              dag_id: "$1"
              task_id: "$2"
              state: "$3"
    
Redémarrer l’Agent Datadog et Airflow
  1. Redémarrez l’Agent.
  2. Redémarrez Airflow pour commencer à envoyer vos métriques Airflow à l’endpoint DogStatsD de l’Agent Datadog.
Checks de service de l’intégration

Utilisez la configuration par défaut de votre fichier airflow.d/conf.yaml pour activer vos checks de service Airflow. Consultez le fichier d’exemple airflow.d/conf.yaml pour découvrir toutes les options de configuration disponibles.

Collecte de logs

Disponible à partir des versions > 6.0 de l’Agent

  1. La collecte de logs est désactivée par défaut dans l’Agent Datadog. Vous devez l’activer dans datadog.yaml :

    logs_enabled: true
    
  2. Supprimez la mise en commentaire du bloc de configuration suivant en bas de votre fichier airflow.d/conf.yaml, puis modifiez-le : Modifiez les valeurs des paramètres path et service et configurez-les pour votre environnement.

    • Configuration des logs pour le planificateur et le gestionnaire de processeurs DAG :

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_processor_manager/dag_processor_manager.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/scheduler/latest/*.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      

      Il est conseillé d’effectuer un nettoyage régulier des logs du planificateur, en réalisant une rotation quotidienne des logs.

    • Configuration supplémentaire pour les logs des tâches DAG :

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/*/*/*/*.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      

      Attention : par défaut, Airflow utilise le modèle de fichier de log suivant pour les tâches : log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log. Le nombre de logs augmentera considérablement si vous n’effectuez pas un nettoyage régulier. Ce modèle est utilisé par l’interface d’Airflow pour afficher tous les logs de chaque tâche exécutée.

      Si vous ne consultez pas vos logs depuis l’interface Airflow, nous vous conseillons plutôt d’utiliser la configuration suivante dans airflow.cfg : log_filename_template = dag_tasks.log. Effectuez ensuite une rotation des logs de ce fichier et utilisez cette configuration :

      logs:
        - type: file
          path: "<PATH_TO_AIRFLOW>/logs/dag_tasks.log"
          source: airflow
          log_processing_rules:
            - type: multi_line
              name: new_log_start_with_date
              pattern: \[\d{4}\-\d{2}\-\d{2}
      
  3. Redémarrez l’Agent.

Environnement conteneurisé

Configurer l’intégration Airflow de l’Agent Datadog

Consultez la documentation relative aux modèles d’intégration Autodiscovery pour découvrir comment appliquer les paramètres ci-dessous à un environnement conteneurisé.

ParamètreValeur
<NOM_INTÉGRATION>airflow
<CONFIG_INIT>vide ou {}
<CONFIG_INSTANCE>{"url": "http://%%host%%:8080"}

Vérifiez que le paramètre url possède la même valeur que le paramètre base_url du serveur Web d’Airflow (l’URL permettant de se connecter à votre instance Airflow). Remplacez localhost par la template variable %%host%%.

Connecter Airflow à DogStatsD

Connectez Airflow à DogStatsD (inclus avec l’Agent Datadog) via la fonctionnalité statsd pour recueillir des métriques. Pour en savoir plus sur les métriques transmises par la version d’Airflow utilisée et sur les options de configuration supplémentaires, consultez la documentation Airflow ci-dessous :

Remarque : les métriques StatsD transmises par Airflow peuvent varier en fonction de l’exécuteur Airflow utilisé. Par exemple, les métriques airflow.ti_failures/successes, airflow.operator_failures/successes et airflow.dag.task.duration ne sont pas transmises pour KubernetesExecutor.

Remarque : les variables d’environnement utilisées pour Airflow peuvent varier d’une version à l’autre. Par exemple, Airflow 2.0.0 utilise la variable d’environnement AIRFLOW__METRICS__STATSD_HOST, tandis qu’Airflow 1.10.15 utilise AIRFLOW__SCHEDULER__STATSD_HOST.

La configuration de StatsD pour Airflow peut être activée avec les variables d’environnement suivantes dans un déploiement Kubernetes :

env:
  - name: AIRFLOW__SCHEDULER__STATSD_ON
    value: "True"
  - name: AIRFLOW__SCHEDULER__STATSD_PORT
    value: "8125"
  - name: AIRFLOW__SCHEDULER__STATSD_PREFIX
    value: "airflow"
  - name: AIRFLOW__SCHEDULER__STATSD_HOST
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP

La variable d’environnement pour l’endpoint de host AIRFLOW__SCHEDULER__STATSD_HOST est fournie avec l’adresse IP du host du nœud pour acheminer les données StatsD au pod de l’Agent Datadog sur le même nœud que le pod Airflow. Cette configuration nécessite également que l’Agent dispose d’un hostPort ouvert pour ce port 8125 et accepte le trafic StatsD non local. Pour en savoir plus, consultez la section sur la configuration de DogStatsD sur Kubernetes.

Cette configuration doit rediriger le trafic StatsD provenant du conteneur Airflow vers un Agent Datadog prêt à accepter les données entrantes. La dernière étape consiste à mettre à jour l’Agent Datadog avec les dogstatsd_mapper_profiles correspondants. Pour ce faire, copiez les dogstatsd_mapper_profiles fournis dans l’installation de host dans votre fichier datadog.yaml. Vous pouvez également déployer votre Agent Datadog avec la configuration JSON équivalente dans la variable d’environnement DD_DOGSTATSD_MAPPER_PROFILES. Pour Kubernetes, la notation de variable d’environnement équivalente est la suivante :

env: 
  - name: DD_DOGSTATSD_MAPPER_PROFILES
    value: >
      [{"name":"airflow","prefix":"airflow.","mappings":[{"match":"airflow.*_start","name":"airflow.job.start","tags":{"job_name":"$1"}},{"match":"airflow.*_end","name":"airflow.job.end","tags":{"job_name":"$1"}},{"match":"airflow.*_heartbeat_failure","name":"airflow.job.heartbeat.failure","tags":{"job_name":"$1"}},{"match":"airflow.operator_failures_*","name":"airflow.operator_failures","tags":{"operator_name":"$1"}},{"match":"airflow.operator_successes_*","name":"airflow.operator_successes","tags":{"operator_name":"$1"}},{"match":"airflow\\.dag_processing\\.last_runtime\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_runtime","tags":{"dag_file":"$1"}},{"match":"airflow\\.dag_processing\\.last_run\\.seconds_ago\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_run.seconds_ago","tags":{"dag_file":"$1"}},{"match":"airflow\\.dag\\.loading-duration\\.(.*)","match_type":"regex","name":"airflow.dag.loading_duration","tags":{"dag_file":"$1"}},{"match":"airflow.dagrun.*.first_task_scheduling_delay","name":"airflow.dagrun.first_task_scheduling_delay","tags":{"dag_id":"$1"}},{"match":"airflow.pool.open_slots.*","name":"airflow.pool.open_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.queued_slots.*","name":"airflow.pool.queued_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.running_slots.*","name":"airflow.pool.running_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.used_slots.*","name":"airflow.pool.used_slots","tags":{"pool_name":"$1"}},{"match":"airflow.pool.starving_tasks.*","name":"airflow.pool.starving_tasks","tags":{"pool_name":"$1"}},{"match":"airflow\\.dagrun\\.dependency-check\\.(.*)","match_type":"regex","name":"airflow.dagrun.dependency_check","tags":{"dag_id":"$1"}},{"match":"airflow\\.dag\\.(.*)\\.([^.]*)\\.duration","match_type":"regex","name":"airflow.dag.task.duration","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.dag_processing\\.last_duration\\.(.*)","match_type":"regex","name":"airflow.dag_processing.last_duration","tags":{"dag_file":"$1"}},{"match":"airflow\\.dagrun\\.duration\\.success\\.(.*)","match_type":"regex","name":"airflow.dagrun.duration.success","tags":{"dag_id":"$1"}},{"match":"airflow\\.dagrun\\.duration\\.failed\\.(.*)","match_type":"regex","name":"airflow.dagrun.duration.failed","tags":{"dag_id":"$1"}},{"match":"airflow\\.dagrun\\.schedule_delay\\.(.*)","match_type":"regex","name":"airflow.dagrun.schedule_delay","tags":{"dag_id":"$1"}},{"match":"airflow.scheduler.tasks.running","name":"airflow.scheduler.tasks.running"},{"match":"airflow.scheduler.tasks.starving","name":"airflow.scheduler.tasks.starving"},{"match":"airflow.sla_email_notification_failure","name":"airflow.sla_email_notification_failure"},{"match":"airflow\\.task_removed_from_dag\\.(.*)","match_type":"regex","name":"airflow.dag.task_removed","tags":{"dag_id":"$1"}},{"match":"airflow\\.task_restored_to_dag\\.(.*)","match_type":"regex","name":"airflow.dag.task_restored","tags":{"dag_id":"$1"}},{"match":"airflow.task_instance_created-*","name":"airflow.task.instance_created","tags":{"task_class":"$1"}},{"match":"airflow\\.ti\\.start\\.(.+)\\.(\\w+)","match_type":"regex","name":"airflow.ti.start","tags":{"dag_id":"$1","task_id":"$2"}},{"match":"airflow\\.ti\\.finish\\.(\\w+)\\.(.+)\\.(\\w+)","name":"airflow.ti.finish","match_type":"regex","tags":{"dag_id":"$1","task_id":"$2","state":"$3"}}]}]      
Collecte de logs

Disponible à partir des versions > 6.0 de l’Agent

La collecte des logs est désactivée par défaut dans l’Agent Datadog. Pour l’activer, consultez la section Collecte de logs Kubernetes.

ParamètreValeur
<CONFIG_LOG>{"source": "airflow", "service": "<NOM_APPLICATION>"}

Validation

Lancez la sous-commande status de l’Agent et cherchez airflow dans la section Checks.

Annexe

Hook Datadog pour Airflow

Un hook Datadog pour Airflow peut également être utilisé pour interagir avec Datadog :

  • Envoyer des métriques
  • Interroger des métriques
  • Envoyer des événements

Données collectées

Métriques

Événements

Le check Airflow n’inclut aucun événement.

Checks de service

Dépannage

Besoin d’aide ? Contactez l’assistance Datadog.

PREVIEWING: may/embedded-workflows