L’analytique scalable avec Druid, déployé sur Kubernetes

 

Après avoir implémenté Apache Kylin sur AWS EMR et sur Google Cloud DataProc et avoir démontré qu’il peut répondre à des scénarios très complexes d’utilisation de la donnée tout en ayant des temps de réponses inférieurs à la seconde, nous avons voulu nous attaquer à Apache Druid sur EKS, le service managé Kubernetes de chez AWS, et tout cela via Terraform.

 

Pourquoi Apache Druid ?

 
 

Apache Druid est un datastore distribué qui a été open-sourcé par Metamarkets en 2012 et mis sous licence Apache en 2015. C’est une solution mature et déjà mise en place par les grands de la Tech, que ce soit Airbnb, Ebay, Pinterest ou encore Cisco, leurs retours d’expérience peut nous rassurer sur cette maturité.

Druid fait la promesse de pouvoir assimiler en temps réel ou en batch des grandes quantitées de données tout en permettant leur accessibilité avec peu de latence. Bienque ces promesses se retrouvent dans beaucoup de solutions OLAP, nous voulions en avoir le coeur net ( technique > marketing ).

Ce qui nous a immédiatement séduit dans Druid est que sa philosophie de cluster n’est pas une relation maître – esclaves mais un ensemble de services qui travaillent ensemble et sont coordonnés par Zookeeper. Cette philosophie peut rendre la solution résiliente et scalable à moindre effort (et à moindre coût). Druid possède aussi un grand nombre d’avantages analytiques bien connus (Pré-aggregation, data versioning, etc… ) mais surtout communauté particulièment dynamique !

 

Pourquoi Kubernetes ?

 

Comme dit juste au-dessus, Druid est un ensemble de services (noeuds) qui travaillent ensemble. Face à une philosophie de services sous forme de noeuds, Kubernetes (K8S) est une solution qui est adéquate, de plus K8S va nous permettre de mettre en place de l’auto scaling via la combinaison de Stateful Sets K8S, de l’autoscaler EKS et des AutoScaling Groups EC2 chez AWS.

Les bénéfices d’une automatisation sont multiples : accélération des temps de développement, limitation des erreurs de déploiement, adaptation de l’infrastructure aux fluctuations de demandes.

 

Architecture Druid:

 

Druid possède plusieurs composants bien distincts avec chacun un rôle bien précis. Dans notre cas, nous avons voulu séparer l’ensemble des composants (vous pouvez déployer plusieurs composants sur la même machine si vous le souhaitez).

Les composants sont les suivants :

  • Zookeeper : Plus besoin de présenter le service de configuration distribué, pierre angulaire de Hadoop, il va servir à coordonner l’orchestration de l’ensemble des services Druid.
  •  
  • Postgres / MySQL : Druid utilise une base de données internes afin de stocker l’ensemble de ses metadatas (essentiellement l’interface du router).
  •  
  • Router (Optionnel) : Composant optionnel, il permet d’avoir une API unifiée et un point d’entrée pour les requêtes qu’il redirigera vers le(s) broker(s). Il permet aussi d’avoir une interface.
  •  
  • Broker : Il va permettre de correctement diriger les requêtes Druid, il est capable de comprendre les metadata Zookeeper (emplacement des segments de données sur les Historicals). Il est aussi capable de merger les résultats des processus venant des Historicals.
  •  
  • Coordinator : Le coordinateur Druid est responsable du management des segments et de la distribution de ceux-ci. Il communique principalement avec les processus Historicals afin de charger / décharger les segments de données depuis / vers le deep storage (nous avons utilisé S3).
  •  
  • Overlords : L’Overlord Druid est responsable de l’acceptation et de la coordination des tâches et des mécanismes de locks autour celles-ci.
  •  
  • Middle-managers : Le Middle-manager est un processus d’exécution des tâches d’ingestion / compaction des données dans le cluster. Chaque Middle-manager va posséder 1 à N péons qui vont chacun réaliser une tâche bien précise lors des phases de calculs (La philosophie du péon est comparable aux exécuteurs Spark même si leurs fonctionnement sont très différents).
  •  
  • Historicals : Chaque processus Historicals maintient une connexion constante à Zookeeper et regarde si il existe certains chemins Zookeeper très précis pour ingestion de nouveaux segments (a.k.a Zookeeper load paths). Les processus Historicals reposent entièrement sur Zookeeper pour leurs coordinations et ne communiquent pas entre eux si il existe plusieurs processus Historicals.

Apache Druid est relativement riche en termes de processus qui opèrent entre eux. Maintenant, voyons comment nous pouvons organiser cela sur un cluster Kubernetes.

Architecture K8S:

 

Notre architecture est composée d’un VPC AWS, de 2 subnets appartenant à ce VPC, chacun de ces subnets sont dans des zones de disponibilités distinctes. Nous avons créé 8 groupes d’auto scaling différents pour chacun des services nécessaires à Apache Druid.

Ce choix vient du fait que nous voulions pouvoir contrôler de bout en bout notre architecture en terme de coût, certains services Druid nécessitant beaucoup moins de ressources que d’autres pour fonctionner.

L’option la plus intéressante à déployer se situe au niveau du bootstrap de l’AMI sur chaque worker node de EKS. En effet, vous pouvez passer des arguments en plus sur votre kubelet, nous avons donc construit ainsi 8 bootstraps différents pour chacun des workers nodes qui serait déployé dans chacun des auto scaling group.

Le fait de pouvoir ajouter un rôle à chaque node va nous permettre par la suite de paramétrer facilement un nodeSelector pour les stateFullSets Kubernetes.

Dans notre code Terraform, cela se représente de la façon suivante :

AMI EKS:

data "aws_ami" "eks-worker" {

  filter {

    name   = "name"

    values = ["amazon-eks-node-${aws_eks_cluster.tf_eks.version}-v*"]

  }

  most_recent = true

  owners      = ["602401143452"] # Amazon EKS AMI Account ID

}

EKS bootstrap configuration for zookeepers nodes :

locals {

  tf-eks-node-zookeepers = <<USERDATA

  #!/bin/bash

  set -o xtrace

  /etc/eks/bootstrap.sh 

–apiserver-endpoint '${aws_eks_cluster.tf_eks.endpoint}'

–b64-cluster-ca '${aws_eks_cluster.tf_eks.certificate_authority.0.data}' –kubelet-extra-args '–node-labels=kubernetes.io/role=zookeepers' '${var.cluster_name}'

  USERDATA}

AWS launch configuration :

resource "aws_launch_configuration" "zookeepers_tf_eks" {

  iam_instance_profile     = "${aws_iam_instance_profile.node.name}"

  image_id          = "${data.aws_ami.eks-worker.id}"

  instance_type     = "m5.large"

  name_prefix       = "zookeepers-terraform-eks"

  security_groups   = ["${aws_security_group.tf-eks-node.id}"]

  # Bootstrap user data

  user_data_base64  = "${base64encode(local.tf-eks-node-zookeepers)}"

}

AWS auto scaling group :

resource "aws_autoscaling_group" "zookeepers_tf_eks" {

  desired_capacity     = "3"

  launch_configuration = "${aws_launch_configuration.zookeepers_tf_eks.id}"

  max_size             = "5"

  min_size             = "1"

  name                 = "zookeepers-tf-eks"

  vpc_zone_identifier  = ["${aws_subnet.subnet_A.id}","${aws_subnet.subnet_B.id}"]

  tag {

    key                 = "Name"

    value               = "zookeepers-tf-eks"

    propagate_at_launch = true

  }

  tag {

    key                 = "kubernetes.io/cluster/${var.cluster_name}"

    value               = "owned"

    propagate_at_launch = true

  }

  tag {

    key                 = "k8s.io/cluster-autoscaler/enabled"

    value               = true

    propagate_at_launch = true

  }

  tag {

    key                 = "k8s.io/cluster-autoscaler/${var.cluster_name}"

    value               = true

    propagate_at_launch = true

  }

}

L’ensemble de ces configurations nous permet comme dit plus haut de pouvoir sélectionner sur quelles instances précises nous allons déployer nos pods kubernetes et cela via le paramètre nodeSelector, configuré ainsi :

    spec:

      nodeSelector:

        kubernetes.io/role: zookeepers

Le bootstrap des worker nodes EKS nous a permis de manager notre partie hardware sur Kubernetes selon les besoins demandés par chacun des services Druid (on peut voir que les nodes historicals qui contiennent la donnée sur Druid sont des instances i3 et optimisées pour la lecture sur disque par exemple).

Conclusion :

Cet article traite seulement du moyen de faire le lien entre vos machines et le déploiement de vos pods kubernetes, mais pour le déploiement complet d’un cluster Druid sous Kubernetes vous devez réfléchir aussi à :

  • Votre réseau, vos groupes de sécurités.
  • Vos rôles IAM.
  • La création de vos / votre image Docker à déployer.
  • Vos méthodes de déploiement des configuration des services k8s.
  • Votre auto-scaler EKS (qui permet de déclencher les AWS autoscaling groups)
  • Votre système de supervision du cluster Kubernetes.
  • Lier vos métriques de supervision avec votre auto-scaler EKS afin d’automatiser complètement l’autoscaling du cluster.

Un cas client

Pour un de nos clients, nous avons démontré que l’on pouvait intégrer facilement 80 Gb de fichiers CSV en moins de 30 minutes pour une architecture similaire à celle présentée ci-dessus.
Le coût annuel est légèrement inférieur à 30k$ en termes de machines on-demands, divisant par 3 le coût de l’infrastructure en place. L’ingestion a demandé un peu de scalabilité au niveau des middle-managers (+ 10 machines).

Le coût d’ingestion a été évalué à environ 1,5$ le temps d’insérer ces 80Gb dans le cluster, après quoi le cluster effectue un scale down pour retourner à un fonctionnement classique.

Lucas

Apache Druid
Utilisation de Druid à AirBnb
Utilisation de Druid à Pinterest