Zichen Jiang
Airflow en bref
Airflow a été créé par Airbnb en 2014. L’outil est devenu un incubateur Apache en 2016, et a été adopté comme projet de l’Apache Software Foundation en 2019. C’est une plateforme codée en Python qui permet la planification et le suivi automatisés des flux de travail. Elle est conçue pour exécuter une série de tâches à partir de dépendances et de contraintes de temps préprogrammées. Son interface utilisateur riche présente tous les flux de travail en cours, ce qui facilite le suivi et la résolution de problèmes. Les flux de travail y sont d’ailleurs codifiés, pour une gestion facile des mises à jour, des versions et des étapes de test.
Comment fonctionne Apache Airflow?
Dans Airflow, chaque flux de travail est défini par un graphe orienté acyclique (ou DAG, pour Directed Acyclic Graph), lequel contient des « tâches », soit des unités de travail individuelles. Les tâches, elles, sont associées à des dépendances et à des branches.
Les instances du logiciel sont généralement composées d’un ordonnanceur, d’un serveur Web et d’une base de métadonnées. L’ordonnanceur déclenche des flux de travail préplanifiés et envoie des tâches à l’exécuteur, qui les exécute. Le serveur Web, lui, héberge une interface permettant aux utilisateurs de déclencher et de déboguer les DAG. Quant à la base de métadonnées, elle stocke les états, les registres et les paramètres de toutes les composantes.
Tous les DAG sont définis en Python et très extensibles. Vous pouvez définir les tâches qui conviennent à votre environnement, planifier le déclenchement des flux de travail (calendrier ou événements externes), et déployer facilement les instances vers n’importe quel service infonuagique.
Pourquoi choisir Apache Airflow?
- Dans Airflow, les flux de travail sont codifiés en Python. Les développeurs peuvent écrire des scripts qui les instancient de manière dynamique.
- Ils peuvent facilement définir leurs propres opérateurs et exécuteurs, et ajouter des dépendances au besoin.
- C’est une plateforme modulaire capable d’orchestrer autant de « workers » que souhaité, et donc utilisable à grande échelle.
- Grâce à la diversité des opérateurs disponibles dans Airflow, rien de plus simple que de convertir les flux de travail existants en DAG.
- Le déclenchement des flux de travail peut être automatisé (calendrier ou événements externes).
- Le tableau de bord analytique aide à optimiser ces derniers.
- Plusieurs flux de données peuvent être exécutés à différents moments dans la même instance.
Les opérateurs d’Airflow
Un opérateur est un gabarit de tâche prédéfinie qu’on peut définir de façon déclarative dans un DAG. Airflow propose une multitude d’opérateurs couvrant la plupart des besoins de base, notamment :
- BashOperator – exécute une commande Bash
- PythonOperator – appelle une fonction Python
- KubernetesPodOperator – crée et exécute un ensemble de conteneurs (pod) sur une grappe Kubernetes
- EmailOperator – envoie un courriel
- SimpleHttpOperator – envoie une requête HTTP
Vous pouvez bien entendu installer d’autres opérateurs depuis des progiciels fournisseurs (provider packages).
Exemple
Si vous n’avez pas déjà installé et configuré Airflow, créez votre première instance en dix minutes grâce à ce guide :
Airflow Docs (en anglais seulement).
Avant de suivre les étapes, vous devez installer Docker sur votre machine.
Voici un exemple de flux de données extrait d’un projet créé par Levio. Dans cet exemple, le flux s’articule en deux étapes :
- Attendre de recevoir un fichier .ZIP.
- Extraire le contenu du fichier .ZIP.
- Exécuter les fichiers dans des programmes Java et Python pour en extraire l’information.
- Appeler une API pour classer les fichiers en fonction de l’information extraite.
- Supprimer le fichier .ZIP reçu et tous les fichiers extraits.
- Envoyer un courriel de confirmation aux développeurs une fois que le flux s’est correctement exécuté.
Regardons de plus près le code du DAG présenté ci-dessus. Dans le script ci-dessous , nous initialisons un DAG avec des arguments prédéfinis. Grâce à l’argument « depends_on_past », le déclenchement du flux dépendra des résultats de l’exécution précédente. Si la valeur « True » est associée à l’argument, le flux ne s’exécutera que si le dernier cycle a fonctionné. L’argument « concurrency » limite le nombre d’instances simultanées du flux. L’argument « schedule_interval » utilise le format de travail « cron ». Dans cet exemple, le flux s’exécute tous les jours à 4 h.
args = {
"owner": "levio",
"email": ["info@levio.ca"],
"depends_on_past": False,
"start_date": "2021-08-03",
"email_on_failure": True,
"email_on_retry": True,
"retries": 2,
}
dag = DAG(
dag_id="example_pipeline",
default_args=args,
concurrency = 1,
schedule_interval="0 4 * * *")
Regardons maintenant la première tâche, « zipfile_sensor ». Cette tâche FileSensor vérifie toutes les 30 secondes s’il existe un fichier /input/pdf_files.zip. Si le fichier existe, la tâche activera la suivante. Vous remarquerez qu’il faut configurer le paramètre dans chaque tâche pour que celles-ci soient associées au bon DAG.
zipfile_sensor = FileSensor(
task_id="zipfile_sensor",
poke_interval=30,
fs_conn_id="fs_conn_1",
filepath="/input/pdf_files.zip",
dag=dag,
)
Le paramètre « fs_conn_id » renvoie à un identifiant de connexion (« Conn_id ») devant être configuré via l’interface Web d’Airflow (page « Connections », dans le menu « Admin »).
Configurer une nouvelle connexion s’affiche
Lorsque vous cliquez sur le « + » bleu, une page vous permettant de configurer une nouvelle connexion s’affiche. Vous pouvez choisir parmi une douzaine de connexions. Une fois que vous en avez configuré une, vous pouvez la désigner dans votre flux de travail en indiquant sa « Conn id ».
Appeler une commande Bash
Pour ce qui est de la tâche « unzip_files », un simple BashOperator (dont le rôle est d’appeler une commande Bash) nous permet d’extraire le fichier .ZIP récupéré. Évidemment, libre à vous de coder des scripts Bash plus complexes et de les exécuter en appelant leur nom. Par exemple : « bash_command=”bash your_custom_script.sh” ».
unzip_files = BashOperator(
task_id="unzip_files",
bash_command="unzip pdf_files.zip",
dag=dag,
)
Une fois le fichier décompressé, nous allons créer un pod sur une grappe Kubernetes, lequel exécutera un programme Java permettant de convertir les fichiers PDF en images. Les paramètres de KubernetesPodOperator sont très semblables à ceux que vous configureriez dans un fichier YAML pour créer le pod en question. « is_delete_operator_pod » permet de déterminer si le pod doit être supprimé de la grappe une fois le travail terminé, et « image_pull_secrets » est une variable d’environnement que vous configurez dans Airflow pour extraire une image depuis un répertoire de conteneurs privé.
volume = k8s.V1Volume(name = "ra-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="example-pvc"))
volumeMount = k8s.V1VolumeMount(mount_path = "/example-volume-mount",
name="example-volume", sub_path = None, read_only = False)
exampleSecrets = Secret("volume", "/secret-path", "example-credentials")
resourceLimit = {"limit_cpu":"4","limit_memory":"16G",
"request_memory":"200M", "request_cpu": "100m"}
java_program = KubernetesPodOperator(
image = "example_docker_image:latest",
namespace = "example_namespace",
env_vars = {
"INPUT_FOLDER": "/input/",
"OUTPUT_FOLDER": "/output/",
"DB_CRED_PATH": "/etc/secret-volume/db.properties",
},
image_pull_policy = "Always",
image_pull_secrets="example-secretes",
name = "java-program",
task_id = "java_program",
get_logs = True,
is_delete_operator_pod = True,
dag = dag,
startup_timeout_seconds = 120,
resources = resourceLimit,
volumes = [volume],
volume_mounts = [volumeMount],
secrets = [exampleSecrets]
)
Après avoir converti les fichiers en images, nous devons déterminer quel script Python utiliser pour en extraire l’information. Dans cet exemple, nous avons défini une fonction simple qui retourne l’identifiant d’une des tâches Python à partir d’un nombre aléatoire (la logique serait bien plus complexe dans un flux de production). Il est probablement préférable que le script Python soit conservé dans un fichier distinct. Vous pouvez même créer votre propre bibliothèque Python et l’importer dans le DAG.
def python_branch():
task_id = str(randint(1,2))
if task_id == 1:
return "python_script_1"
else:
return "python_script_2"
branch = BranchPythonOperator(
task_id="choose_python_script",
python_callable=python_branch,
dag=dag,
)
Ensuite, la tâche dont l’identifiant correspond à la valeur retournée par la fonction « branch » s’exécute. Ici, « PythonOperator » appelle une fonction Python. Dans cet exemple, nous avons défini une fonction simple dans le DAG (là encore, la fonction serait bien plus complexe dans un flux de production). Comme pour « PythonBranchOperator » décrit ci-dessus, vous pouvez importer les fonctions Python depuis votre propre bibliothèque.
def python_function_1():
print("processing data through python function 1")
# do the data processing
python_script_1 = PythonOperator(
task_id="python_script_1",
python_callable=python_function_1,
op_kwargs={"db":"dev", "input_folder":"/input/"},
dag=dag,
)
Une fois que le script Python a extrait l’information et l’a stockée dans notre base de données, nous appelons une API pour classer les documents en fonction de cette information. Nous pouvons utiliser « SimpleHttpOperator » pour créer une requête POST. L’identifiant « http_conn_id » est semblable à l’identifiant « fs_conn_id » retrouvé dans la tâche FileSensor. Il renvoie à une connexion que vous aurez configurée dans Airflow. La méthode de configuration est identique à celle décrite aux figures 3 et 4. Le paramètre « trigger_rule=”none_failed” » joue un rôle crucial dans cette tâche. Par défaut, il prend la valeur « all_success » : la tâche ne s’exécutera que si toutes les tâches en amont se sont exécutées correctement. Comme nous avons ignoré une des tâches de script Python, la condition « all_success » ne sera pas satisfaite. C’est pourquoi nous attribuerons la valeur « none_failed » au paramètre, qui permettra à la tâche de s’exécuter tant qu’aucune tâche en amont n’a rencontré d’erreur.
call_api = SimpleHttpOperator(
task_id="call_api",
http_conn_id="http_conn_1",
endpoint="/relative-path-to-api",
method="POST",
trigger_rule='none_failed',
dag=dag,
)
L’exemple ne le montre pas, mais « SimpleHttpOperator » peut aussi envoyer des données à une API cible grâce au paramètre « data » :
data={"param1": "value1", "param2": "value2"},
Par défaut, « SimpleHttpOperator » retourne le corps de réponse en texte brut et peut l’envoyer à la tâche suivante. Vous pouvez aussi choisir de modifier la réponse avant qu’elle soit envoyée, avec le paramètre « response_filter ». Exemple :
response_filter=lambda response: response.json()['nested']['property']
Une fois le traitement terminé, il faudrait idéalement supprimer le fichier .ZIP initial et tous les fichiers créés pour éviter de s’en servir par erreur lors du prochain cycle. Le script est semblable à celui de la tâche « unzip_files ». Vous pouvez aussi exécuter un script Bash plus complexe en attribuant la valeur « bash clean_up.script » au paramètre « bash_command ».
clean_up = BashOperator(
task_id="clean_up",
bash_command="rm /input/*.zip /input/*.pdf /output/*.jpg",
dag=dag,
)
Une fois le flux de travail terminé, nous souhaitons envoyer un courriel de notification aux développeurs. Pour ce faire, nous pouvons utiliser « EmailOperator ». Dans cet exemple, on envoie un courriel avec un objet et un contenu plutôt simples. Une fois de plus, vous pouvez accéder à l’API Airflow pour envoyer des courriels plus détaillés, indiquant notamment le temps d’exécution de chaque tâche, du flux de travail au complet, etc.
email_success = EmailOperator(
task_id="email_success",
to="info@indellient.com",
subject="Example Pipeline Success",
html_content="<p>The Example Pipeline ran successfully!</p>",
dag=dag,
)
À la fin, il nous faut explicitement indiquer à Airflow l’ordre dans lequel on souhaite que les tâches s’exécutent, et s’il y a des branches conditionnelles. Le symbole « >> » sert à connecter deux tâches ensemble. La direction des flèches précise que le flux s’exécute de la tâche de gauche à celle de droite. Nous insérons d’ailleurs les deux tâches Python conditionnelles dans un tableau pour en indiquer la relation parallèle. Le paramètre « trigger_rule='none_failed’ » doit être inséré dans la tâche qui suit directement les tâches parallèles.
zipfile_sensor >> unzip_files >> java_program >> branch >> \
[python_script_1, python_script_2] >> call_api >> clean_up >> email_success
Arranger des tâches en tableaux peut aussi servir à exécuter plusieurs tâches en parallèle pour réduire le temps de traitement. Imaginez que toutes les tâches suivantes sont des PythonOperators. Une fois que « task1 » s’est exécutée, « task2 », « task3 » et « task4 » se déclencheront toutes en parallèle.
task1 >> [task2, task3, task4]
Ce n’est qu’un exemple, c’est pourquoi nous utilisons des noms génériques. Par exemple, « KubernetesPodOperator » utilise une image qui n’existe pas, les « PythonOperators » appellent de simples fonctions « print » tout en haut du fichier DAG, et « SimpleHttpOperator » ne pointe pas vers une URL d’API valide.
Les cas d’utilisation
On utilise généralement Airflow pour automatiser des processus d’extraction, de transformation et de chargement des données (ETC), pour créer des flux de données et pour alimenter des entrepôts de données. Quand transformer vos données sources demande une dizaine d’étapes, les traiter manuellement n’est pas très pratique, et développer des logiciels personnalisés pour gérer ces tâches peut être coûteux et chronophage.
Grâce à Airflow, nous avons créé un flux de données qui récupère des données sources et les soumet à plus de 20 étapes de transformation. Notre client le faisait manuellement, et seulement une fois par mois, puisque le processus prenait plusieurs jours. Depuis que nous avons migré le flux dans Airflow, il s’exécute tous les jours aux heures creuses de manière presque complètement automatisée. Comme le flux avait été codé en Python, la migration s’est faite en un tournemain.
Airlfow révèle toute son utilité pour les flux qui s’exécutent habituellement entre plusieurs programmes indépendants. Lors d’un autre projet, nous avons travaillé avec un flux codé en Java et en Python. Son exécution reposait sur des fichiers externes et sur des API RESTful. C’est un cas de figure idéal pour Airflow. Nous avons utilisé « KubernetesPodOperator » pour créer des ensembles de conteneurs (pods) sur une grappe Kubernetes afin d’exécuter le programme Java et quelques-uns des scripts Python les plus gourmands. Nous avons utilisé « PythonOperator » pour les scripts Python les plus simples, et « SimpleHttpOperator » pour envoyer les requêtes HTTP aux API. Pour vérifier toutes les heures si les fichiers externes ont été récupérés, nous avons utilisé des détecteurs. Enfin, l’interface Web d’Airflow nous a permis de suivre et de déboguer le flux avec beaucoup de facilité. Grâce à elle, rien de plus simple que de détecter les étapes qui posent problème et de consulter le registre pour résoudre ces derniers.
Conclusion
Apache Airflow est une excellente plateforme pour exécuter vos flux de données avec fiabilité. Son extensibilité vous permet de créer un flux reposant sur des outils et des programmes codés dans des langages différents. En outre, c’est une plateforme utilisable à l’échelle qui a été adoptée par des milliers d’entreprises dans le monde. Elle bénéficie d’une riche communauté d’utilisateurs et s’intègre avec facilité dans de nombreuses plateformes infonuagiques – un choix fiable et populaire pour créer vos flux de données.