Spark As A Service – avec Spark Job Server.

Votre algorithme de machine learning est maintenant bien entraîné, vous avez joué avec les paramètres que la librairie ML de Spark vous offre pour optimiser sa pertinence mais…. vous voudriez l’utiliser en mode interactif.

Il vous faut alors une solution de type Spark As A Service. De manière simplifié, on peut selon ce dispositif:

  • compiler un jar pour le soumettre au serveur via une API Rest;
  • exécuter un job sous forme de requête de cette Api qui le délègue au cluster distant.
Le premier bénéfice est celui d’un gain de vitesse important grâce au « long term running Spark Context »: En lançant le job une première fois le Spark Context va charger le modèle en mémoire et permettra lors des appels suivants d’économiser le temps de chargement du modèle.

Ceci est très bénéfique quand le modèle est particulièrement conséquent. Il est aussi possible de mettre en cache une grande variété de données pour éviter les chargements successifs avec les “namedObject” qui seront détaillé un peu plus loin.

Cette utilisation as A service est rendu possible grâce à l’API REST Job server https://github.com/spark-jobserver/spark-jobserver  

Et nous allons voir comment mettre en place et utiliser cette API, la paramétrer pour utiliser Spark 2.0 ainsi que des exemples d’utilisation avec des jobs différents.

Tout d’abord, il faut savoir que cette API a été écrite pour la version 1.6 de Spark et qu’il va donc falloir l’adapter légèrement avec les Jobs , afin de les faire marcher avec Spark 2.0.
Je précise que toute la configuration que je vais donner vaut pour la date d’aujourd’hui

1. Lancement du Service

Pour lancer l’API rien de plus simple en utilisant la puissance des containers Docker. Il sera même possible pour une utilisation en production de déployer cette API sur un cluster pour augmenter le nombre de requêtes simultanées. Si cela vous intéresse je vous invite à consulter un précédent article que j’ai écrit http://www.novagen.tech/deployer-cluster-applicatif-10mn-docker-swarm-_/

Pour lancer le serveur on va donc “simplement” lancer une image Docker composée de 3 éléments :

  • Spark Job Server pour l’API en elle même
  • Spark Core pour le noyau Spark
  • Mesos pour la gestion des ressources à l’intérieur du conteneur

Par rapport à l’image présente sur  le docker hub de spark jobserver j’ai modifié la version des logiciels en mettant à jour les 3 précédents éléments vers leurs dernières versions (et surtout Spark 2.0)

Pour lancer l’API / le containeur exécuter la commande suivante sur une machine possédant suffisamment de RAM pour faire tourner Spark

docker run -d -p 8090:8090 lucienfregosi/spark-job-server_2.0

Cette commande va lancer l’API sur le port 8090 de la “machine Centrale”, pour accéder à l’API : <IP_MACHINE_CENTRALE>:8090 (ou locahost:8090) si vous travaillez sur une unique machine.
Sur l’interface graphique on aperçoit 3 onglets distincts :

  • Un onglet Jobs
  • Un onglet Contexts
  • Un onglet Jars

2. Initialisation du squelette du projet en Spark 2.0

Pour envoyer des jobs à l’API nous allons utiliser un template de projet fourni par Spark Jobserver.

Le template utilise Giter8 (http://www.foundweekends.org/giter8/) qui est un outil en ligne de commande pour adapter des repository github aux différentes versions des logiciels sur l’environnement de celui qui cherche à cloner le repo. Pour ce qui va suivre assurez vous d’être en sudo pendant toute la phase d’installation/copie du projet.

a. Installation de Giter 8

Pour installer giter8, il faut tout d’abord installer Conscript (http://www.foundweekends.org/conscript/index.html oui on s’y perd un peu des fois..) sur lequel se base giter8 (pour l’anecdote c’est un mécanisme de distribution pour les app scala comprenne qui pourra..)

Lancer les 3 commandes suivantes pour exporter les variables de session:

export CONSCRIPT_HOME=″$HOME/.conscript″ &&
export CONSCRIPT_OPTS=″-XX:MaxPermSize=512M -Dfile.encoding=UTF-8″ &&
export PATH=$CONSCRIPT_HOME/bin:$PATH`

Pour récupérer les sources de conscript avec Curl

curl https://raw.githubusercontent.com/foundweekends/conscript/master/setup.sh | sh

Ensuite installer giter8

cs foundweekends/giter8

 

b. Copie et modification du template

Pour cloner le dossier nous allons utiliser SBT (http://www.scala-sbt.org/ assurez vous de l’avoir installé `sbt sbtVersion`  et d’avoir au minimum la version 0.13.13 au minimum, sinon suivez ce lien pour mettre sbt à jour https://askubuntu.com/questions/678787/how-to-update-sbt/678788)

Pour cloner le projet, utilisez la commande sbt new:

sbt new spark-jobserver/spark-jobserver.g8

Giter8 va nous demander un certain nombre d’informations pour mettre à jour les bonnes versions dans le dossier du projet.

Remplir les champs comme suit en adaptant à votre projet, cela adaptera le template générique de Github.
Attention ! veillez à laisser la version de scala par défaut car sans cela, impossible de compiler même si cela correspond bien à la version sur votre client..

Une fois les différentes informations indiquées, cela devrait créer un dossier avec le champ “name” que vous avez renseigné.

C’est presque terminé mais, malheureusement, le template est configuré pour la version 1.6 de Spark et nous devons passer sur la version 2

c. Mise à jour du template vers Spark 2.0

Dans l’arborescence du projet ouvrir le fichier <nom_projet>/projet/Versions.scala

qui devrait être composé des lignes suivantes :

object Versions {
lazy val netty = ″4.0.33.Final″
lazy val jobServer = ″-SNAPSHOT″
lazy val spark = ″1.6.2″
lazy val typesafeConfig = ″1.2.1″
}

Et remplacez les par les lignes suivantes :

import scala.util.Properties.isJavaAtLeast

object Versions {
lazy val netty = ″4.0.42.Final″
lazy val jobServer = ″-SNAPSHOT″
lazy val spark = ″2.0.0″
lazy val typesafeConfig = if (isJavaAtLeast(« 1.8 »)) « 1.3.0 » else « 1.2.1 »
}

Ce faisant le projet compilera en Spark version 2.0

3. Soumission d’un Job à l’API

Maintenant que le projet est correctement configuré pour permettre d’exécuter des jobs Spark 2.0, nous allons pouvoir les soumettre.

Dans le template de projet chargé sur Github, un exemple de projet est disponible, un Word Count bien évidemment le “Hello Word” du Big Data.
Ce fichier est situé au chemin suivant : /src/main/scala/<package_name>/WordCountExample.scala

En ouvrant ce fichier on remarque deux choses :

  • Il y a deux versions du code. En effet, l’API Spark jobserver a radicalement changé et le fichier d’exemple présente les deux façon d’utiliser cette API, nous nous concentrerons sur la nouvelle utilisation plutôt que l’ancienne.
  • Le code est extrêmement simple et l’utilisation de l’API triviale

On déclare un objet héritant de la classe NewSparkJob qui implémente toutes les méthodes de l’API

Puis la définition du format des données d’entrées (JobData) et des données de sortie (JobOutput) ce qui dans notre cas correspondant à une collection de mots (donc string) et le nombre de fois où ils apparaissent (donc Long)

Le reste du code est composé de deux fonctions :

  • Validate() qui comme son nom l’indique permet de valider la présence et le format des données d’entrées
  • runJob() la fonction ou le code à exécuter sera placé. Cette fonction a 3 arguments d’entrée : un SparkContext qui permet d’implémenter toutes les fonctions de Spark (Datasets, DataFrame…) un JobEnvironment qui contient les spécificités liés aux options du Job (mémoire utilisé, nombre de coeurs etc) et enfin les données d’entrées

Dans le cas de notre WordCount le code est extrêmement simple :

Une fois le Job Écrit, il va falloir compiler un Jar que l’on va envoyer sur le cluster
Pour ce faire avec sbt utiliser la commande

sbt package

Cette commande crée un Jar situé sur le chemin suivant target/scala-2.10/job_2.10-0.1.0-SNAPSHOT.jar

Pour uploader le Jar sur le cluster distant avec une requête Curl

curl –data-binary @target/scala-2.10/job_2.10-0.1.0-SNAPSHOT.jar <IP_DU_CLUSTER>:8090/jars/<NOM_DU_JAR>

Il est nécessaire ensuite de créer un contexte, en effet pour profiter du long term running Spark Context il est important de le définir et de l’utiliser lors de chaque appel à l’API. Pour rappel le long term spark context permet de charger en mémoire des modèle ou des données qu’une seule fois et ainsi profiter de gain de temps considérable lors des appels à l’API suivants.

Le contexte  va permettre de définir les ressources allouées aux jobs contenus dans le Jar (mémoire, nombre de coeurs etc).

Pour le créer avec par exemple ici 4 coeurs et 1024 méga de RAM par noeuds :

curl -d ″″ ‘localhost:8090/contexts/test-context?num-cpu-cores=4&memory-per-node=1024m’

On exécute le job WordCount avec la commande Curl suivante :

curl -d ″input.string = ceci est le texte exemple pour le fonctionnement de spark job server″ ‘<IP_DU_CLUSTER>:8090/jobs?appName=<NOM_DU_JAR>&classPath=<NOM_DU_PACKAGE>.WordCountExample&sync=true’&context=test-context

Le paramètre sync=true permet de recevoir la réponse de façon asynchrone (on attend que le job se soit exécuté pour recevoir la réponse). Il peut être nécessaire de rajouter une option timeout élevée pour ne pas déclencher une erreur.

4. Utilisation des Named Objects

Un des avantages de Spark Jobserver est l’utilisation des NamedObjects qui permettent de persister les données en mémoire. Néanmoins les données sont mises en cache pour un contexte donné, il est donc indispensable d’utiliser le même contexte pour la soumission du Job pour avoir les avantages de la mise en cache.

La première chose à faire est d’ajouter ces deux lignes avant la fonction run() pour pouvoir faire persister nos objets et de les réutiliser.

implicit def rddPersister[T] : NamedObjectPersister[NamedRDD[T]] = new RDDPersister[T]
implicit val dataFramePersister = new DataFramePersister

Ensuite dans le code de la fonction run(), il faut utiliser la fonction getOrElseCreate (qui comme son nom l’indique crée le named Object s’il n’existe pas et le récupérer sinon)

La syntaxe pour créer le named Object est la suivante :

runtime.namedObjects.getOrElseCreate(″dfKey″, {
NamedDataFrame(″le,contenu,de,nimporte,quelle,dataframe,″.split(″,″).map {
t => Seq(t)
}.toSeq.toDF, true, StorageLevel.MEMORY_ONLY) })

La premier argument est la clef texte du named Object. Le deuxième est un tableau contenant le code de la dataframe ainsi que deux options : le booléen forceComputation et le niveau de stockage dans la mémoire.

Enfin pour obtenir le contenu de la dataframe la commande est la suivante :

val dfNamed = runtime.namedObjects.get[NamedDataFrame](″dfKey″).get.df

En lançant des jobs avec le même contexte plusieurs fois vous pourrez vous rendre compte que les données ne sont pas rechargées à chaque fois mais sont persisté dans le cache.

5. Ouverture

Ce tutoriel vous a permis de voir le fonctionnement de l’API Spark Job Server, il ne reste plus qu’à écrire vos propres Jobs !

Pour aller plus loin, Spark Job Server propose des fonctionnalitées avancées comme :

  • La communication chiffrée entre le client et le serveur via SSL
  • L’authentification via Apache Shiro qui peut s’interfacer avec un Active Directory
  • Un cookbook Chef pour le déploiement en production de Spark Job Server