Développer une pipeline de traitement avec Delta Lake et Airflow déployée sur OVHCloud

Résumé : Rendre ses Data Lake plus vivants, alignés avec les cycles de vie des données et les exigences de réactivité des équipes métier.

Ce défi est à portée de main avec les technologies de type Delta Lake.

Nous l’illustrons dans cet article par un projet complet que Novagen a mis en oeuvre.

Cet article propose une solution de pipelines de traitements, de validation et de contrôles des données sur un Data Lake existant. La finalité est la création d’un moteur de recherche d’œuvres d’art.

Dans ce cadre, Novagen s’est intéressé à la configuration d’algorithmes de manipulation de données sur un Data Lake. Les données sont composées :

  • d’un ensemble de fichiers textuels, json et html, chargés en mode batch,
  • de données modifiées par les utilisateurs en temps réel transmis par des streams,
  • des exports de base de données sur le Data Lake.

Pipeline OVHCloud Delta Lake

Cet article propose une solution de pipelines de traitement, de validation et de contrôle de données sur un Data Lake existant. La finalité est la création d’un moteur de recherche.

Création de la pipeline

Technologies choisies

Delta Lake est une solution open source, développée par Databricks, qui accompagne le déploiement de pipelines fiables sur un Data Lake à travers une couche de stockage supplémentaire.

En voici les caractéristiques principales :

*

Les données sont insérées dans des tables Delta avec un schéma et des contraintes définis par l’utilisateur.

*

Les insertions dans les tables Delta sont transactionnelles.
 

*

Ces transactions sont ACID, évitant ainsi de se polluer les tables avec des données corrompues.

*

L’isolation des transactions permet un contrôle des modifications concurrentes sur la table.

Ce dernier point est particulièrement important dans le cas de notre service qui offre la possibilité à tous les utilisateurs d’effectuer des modifications en temps réel. Delta résout de manière optimiste les conflits : si 2 opérations simultanées modifient la même partition alors le moteur enregistre une opération puis retente la deuxième si les lignes concernées par la deuxième n’ont pas été modifiées.

A chaque opération sur la table, des logs transactionnels décrivant les modifications sont créés. Grâce à cette sauvegarde supplémentaire, il est possible de revenir à des versions antérieures de la table. Dans le cas de notre moteur de recherche, cette fonctionnalité est très intéressante notamment dans le cas de modifications erronées d’une ou d’en ensemble de données de la table. Cela évite d’avoir à relancer l’ensemble de la pipeline.

Les étapes de traitement sur les tables sont effectués avec Apache Spark. Il est possible d’utiliser le langage core de Spark ainsi que les fonctionnalités SQL. 

Delta Lake incite les développeurs a respecter le schéma de transformation appelé « Bronze Silver Gold ». Ce design pattern est un schéma de pipeline qui prend en entrée les données du DataLake ou d’autres sources Stream et Batch et effectue trois groupes d’opérations successives sur celles-ci.

  • Bronze : Sauvegarde des données entrantes dans une table qui garde les données dans leur format originel
  • Silver (Argent) : Les tables générées à l’étape Bronze sont transformées à travers des filtres, des augmentations, des suppressions, des corrections, …
  • Gold (Or): La dernière étape est adaptée à chaque cas métier. Des agrégations et des transformations métiers fournissent des tables prêtes à être consommées pour les besoins fonctionnels.

Développement

La première partie correspond à la transformation des fichiers d’un container du DataLake en une table Delta. On crée notre table delta avec un premier fichier JSON si celle-ci n’a pas encore été initialisée.

//Initialisation de la table Delta
val firstFile = spark.read().option("multiline", "true").json("$repoPath/$firstFileName")
if (!DeltaTable.isDeltaTable(spark, deltaTableFilePath)) {
    firstFile.write().format("delta").save(deltaTableFilePath)
}

On doit ensuite mettre en place un stream d’upsert (insertion ou mise à jour) de notre table avec tous les fichiers présents et arrivant sur le bucket.

//Création du Stream d’upsert dans les tables Delta
val sourceStream = spark.readStream().option("multiline","true").schema(schema).json(jsonPath)
val writeStream = sourceStream.writeStream().format("delta")
    .foreachBatch(VoidFunction2<Dataset<Row?>?, Long?> { dataset, batchId ->
        upsertToDelta(dataset, batchId, deltaTableFilePath, colMergeName)
    }).outputMode("update").start()
writeStream.awaitTermination()

Dans le cas contraire (whenNotMatched) on insère la ligne.

private fun upsertToDelta(microBatchOutputDF: Dataset<Row?>,batchId: Long,
    deltaTableFilePath: String, colMergeName: String) {
    val deltaTable = DeltaTable.forPath(deltaTableFilePath)
    deltaTable.`as`("oldData").merge(
            microBatchOutputDF.`as`("newData"),
            "oldData.$colMergeName = newData.$colMergeName")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
        .execute()
}

Dans la même logique, pour créer les table Silver, on peut ensuite fusionner les différentes tables Delta en créant des streams (flux de données) qui ont pour données entrantes la table Delta à fusionner et pour sortie la table avec les données agrégées.

On peut appliquer différentes options de modification des données entrantes pour garder uniquement les données validées.

val stream = spark.readStream().format("delta")
    .load(inputTablePath)
    .withColumnRenamed(...)
    .select(...)
    .filter(...)
    .toDF(...)
    .withColumn(...)
    .writeStream()
...

Déployer

Data Processing est un service Spark dédié fournit par OVH Cloud afin de simplifier et d’accélérer les traitements big data. Cette fonctionnalité permet de réduire le coût de déploiement car le service se lance rapidement et seule la durée d’utilisation est facturée. Ce comportement est particulièrement adapté au projet dans lequel certains jobs Batch ne nécessitent pas un lancement en continu d’une instance.
Il faut tout d’abord créer un conteneur dans OVHcloud Object Storage et y ajouter le jar de l’application.
Ovh propose ensuite une interface ligne de commande Data Processing permettant de soumettre des jobs à l’aide de la commande « ovh-spark-submit ».

$ ./ovh-spark-submit --projectid yourProjectId --class org.apache.spark.examples.SparkPi --driver-cores 1 --driver-memory 4G --executor-cores 1 --executor-memory 4G --num-executors 1 swift://odp/spark-examples.jar 1000

Il est nécessaire de spécifier les différentes configurations identique à un spark-submit, le jar, le nom du job, les ressources… Data Processing gère alors le déploiement et l’exécution

Pour plus de précisions, se reporter à notre article complet sur la solution

Orchestration de la pipeline par Airflow

Pour orchestrer notre pipeline nous avons opté pour Airflow, solution open source de référence pour l’orchestration de flux. Le choix de cet outil repose sur sa flexibilité, ses nombreux opérateurs, et sa communauté d’utilisateurs de plus en plus active.
Une orchestration est nécessaire sur notre Data Lake du fait de la dépendance entre certaines tâches. Airflow permet en effet de démarrer un job à partir du résultat d’un autre.

Concepts de base :

Airflow est basé sur la notion de DAG (Directed Acyclic Graph), un ensemble de tasks à exécuter. Ces dernières doivent s’exécuter dans un ordre précis (en parallèle ou séquentiellement), sans retour possible ; donc pas de boucles infinies. Ces tasks sont ordonnancées par un scheduler (ou planificateur).

Une interface de visualisation, WebUI, permet de monitorer, et d’assurer un suivi de d’avancement dans l’exécution des jobs.

Différentes informations sont disponibles sur WebUI, notamment un schéma illustrant le graphe d’exécution (DAG), visible ci-dessous.

On retrouve l’implémentation du design pattern Bronze, Silver et Gold.

*

L’étape Bronze correspond aux 3 tâches parallèles de parsing des fichiers jsons vers des tables delta.

Cette première étape est directement suivie d’une étape de compaction de fichiers (expliquée dans la dernière partie de cette article).

*

L’étape Silver correspond à la fusion entre les tables issues du parsing permettant l’agrégation des informations.

*

Enfin, l’étape Gold correspond à la préparation des informations pour leur export hors du Data Lake.

Composants d’architecture 

L’exécution des tasks se fait sur des Executors. Par défaut, Airflow utilise SequentialExecutor qui impose une exécution séquentielle. Notre pipeline demande un parallélisme sur cette étape d’où l’intérêt d’utiliser LocalExecutor, plus performant pour notre besoin.
Les configurations s’effectuent dans le fichier airflow.cfg. On ajoute donc la ligne suivante afin de définir un nouvel executor.

executor = LocalExecutor

Airflow sauvegarde les métadonnées, les informations d’identification, les connexions, l’historique et la configuration…etc, mais ne dispose d’aucun stockage de données traitées.

Cette fonctionnalité n’est pas disponible par défaut avec le LocalExecutor. Il faut donc ajouter une connexion à une base de données externe. Pour cela, nous avons configuré une base PostgresSQL à l’aides des opérations suivantes.

a- Création de la Base de données
b- Création d’un utilisateur Airflow
c- Accord des privilèges à l’utilisateur sur la base de données
d- Ajout des lignes suivantes dans le fichier de configuration

sql_alchemy_conn= postgresql+psycopg2://airflow-user:airflow-password@postgresIP:5432/airflow_database

Création du DAG

la création d’un DAG nécessite la réception de paramètres en entrée tel que l’identifiant du DAG, la date de début, schedule_interval (dans l’exemple suivant le dag s’exécute toutes les heures), ..

Une fois les paramètres définis, il faut créer les tâches à exécuter en se basant sur des operators. Dans cet exemple, nous utilisons des commandes bash à l’aide du bashOperator.

La dernière étape à gérer dans airflow est la définition d’un ordre d’exécution des taches :

Ici les taches t1, t2 et t3 s’exécutent en parallèle après le succès de l’exécution de t0.

Optimisation possibles

Réduire le coût des Streams

Les Streams Spark peuvent être particulièrement coûteux en ressources du fait qu’ils restent tout le temps actifs.

Dans notre cas d’utilisation, les jsons décrivant un artiste sont chargés uniquement une fois par mois en mode batch. Il n’est donc pas nécessaire d’utiliser un Stream écoutant un dossier en permanence. Pour autant, passer en mode Batch implique de devoir redévelopper certaines fonctionnalité des Streams :

  • Checkpoint : Les streams gardent un historique des fichiers déjà utilisés pour déterminer lesquels sont nouveaux
  • Atomicité : Les fichiers créés sont commités après chaque déclenchement (trigger) réussi. Il n’y a donc pas de risque de création de données par des jobs qui échoueraient
  • Gestion des duplicata : En mode Stream il est possible d’ajouter la méthode dropDuplicates() pour implémenter la sémantique « exactly once », ce qui est impossible en mode Batch

Afin d’éviter ce travail supplémentaire, il est possible de déclencher un stream avec Trigger.Once(). Cela permet de ne déclencher le Stream qu’une seule fois puis de l’arrêter. On a alors le comportement d’un Batch avec toutes les fonctionnalités d’un Stream.

Réduction du nombre de fichiers dans les tables Delta

Spark n’est pas très efficace lorsqu’il s’agit de traiter un nombre important de petits fichiers. Pour accélérer le temps de traitement à l’intérieur de la pipeline il est possible de réduire ce nombre de petits fichiers en les compactant pour résulter en peu de fichiers de taille plus grande.
Pour cela on peut réduire le nombre de fichiers créés à chaque écriture.

spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true
spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true

Ou bien compacter le nombre de fichiers d’une table existante.

fun compactDeltaTableFiles(deltaTableFilePath: String, numberOfFile: Int, spark: SparkSession) {
    try {
        spark.read()
            .format("delta")
            .load(deltaTableFilePath)
            .repartition(numberOfFile)
            .write()
            .option("dataChange", "false")
            .format("delta")
            .mode("overwrite")
            .save(deltaTableFilePath)

        //Ne supprime pas le fichier sauf si les fichiers sont plus anciens que la période de rétention par défaut = 7 jours
        // Nous pouvons également supprimer tous les anciens fichiers dès maintenant en ajoutant le paramètre .vacuum (0.000001)
        // et en désactivant la vérification de la durée de rétention (spark.databricks.delta.retentionDurationCheck.enabled = false)
        DeltaTable.forPath(deltaTableFilePath).vacuum()
    } catch (exception: org.apache.spark.sql.AnalysisException) {
        logger.severe(exception.message())
    }
}

Monitoring des temps d’exécution

Grâce à la représentation graphique des temps d’exécution par Airflow nous avons pu vérifier que ces derniers ont baissé d’environ 30% après avoir appliqué les optimisations sur les jobs spark et cela en lançant l’exécution avec la même quantité de données et de ressources avant et après avoir rajouté la compaction.

Dynamiser ses Data Lake par des supports plus fonctionnels, plus faciles à mobiliser dans des chaînes de valorisation de données plus agiles : voici un sujet critique pour lequel Delta Lake propose une approche élégante et intégrée à des développements réalisés avec Apache Spark.

N’hésitez pas à nous solliciter pour toute question que vous auriez,
Les Data ingénieurs de Novagen sont à votre disposition pour vous conseiller.

contact@novagen.tech