TP Spark

L'objectif de ce TP est de prendre en main Spark, un autre moteur de calcul distribué sur HDFS (entre autres).

Ce TP est à faire en binôme (les mêmes que pour le TP Hadoop). Il est demandé de rendre un rapport décrivant le déroulement du TP et contenant en particulier le code commenté pour le lundi 14/11/2016 au soir sous forme d'un fichier pdf à déposer dans la case tomuss rendu_spark.

Chaque binôme dispose d'une machine cloud (ip dans tomuus) pour exécuter des job Spark. La machine héberge un conteneur docker pour faire tourner une instance de la distribution Cloudera (cloudera-quickstart).

Ce mode de fonctionnement a les conséquences suivantes:

  • Il est conseillé de lancer screen, puis de lancer le conteneur depuis screen.
  • Docker est uniquement accessible par le root de vorte machine cloud → penser à utiliser sudo
  • Une fois le conteneur démarré avec la commande fournie, un shell est ouvert sur le conteneur. :!: Fermer ce shell éteint le conteneur. :!:
  • Pour copier un fichier dans le conteneur: sudo docker cp le.fichier.a.copier cloudera:/
    • Pour copier un fichier dans HDFS, il faut d'abord le copier dans le conteneur, puis copier le fichier depuis le conteneur dans HDFS
    • L'utilisateur du conteneur (root) a les droits pour exécuter des jobs spark et pour écrire dans HDFS

Lancer le conteneur (depuis screen pour ne pas le perdre en cas d'interruption de la connexion, attention à copier toute la ligne):

sudo docker run --hostname=quickstart.cloudera --privileged=true -t -i --name cloudera cloudera/quickstart /usr/bin/docker-quickstart

Récupérer l'archive data.zip qui contient les fichiers de données.

Importer les données dans HDFS (dans le répertoire HDFS /home/root) en conservant la structure de répertoires de l'archive.

Dans la suite de ce TP, on va reproduire sur Spark les requêtes des TPs (tp1 et tp2) précédents

Tutoriel et Wordcount

Lire le tutoriel spark. Implémenter le compte de mots sur les fichiers du répertoire textes lu dans le HDFS.

Remarques:

  • Spark permet d'utiliser plusieurs langages (Scala, Java et Python). Python est probablement le plus simple a utiliser ici.
  • Utiliser --master yarn avec pyspark (ou spark-shell) pour que Spark accède au HDFS.
  • On peut remarquer que les calculs effectifs produisent des logs visibles depuis le shell Spark. Essayer de comprendre à quel moment les calculs sont effectués.

Statistiques sur Source

Reprendre les questions de calcul de statistiques pour chaque objet dans les sources (répertoire Source) du tp1. Calculer le résultat de la requête suivante:

SELECT object_id, COUNT(*), MAX(source_id), MIN(ra), MAX(ra), MIN(decl), MAX(decl)
FROM SOURCE
WHERE object_id IS NOT NULL
GROUP BY object_id

Donner ensuite l'objet ayant le plus grand déplacement dans le ciel. Comme dans le TP Hadoop, on se contentera d'une approximation grossière du déplacement qui consistera en une distance euclidienne calculée à partir des valeurs extrémales de ra et decl calculées par le job précédent (dont la sortie sera ainsi utilisée pour ce job).

Remarques:

  • Un RDD Spark est une source de données comprenant éventuellement plusieurs fichiers comme c'est le cas dans Hadoop Map/Reduce.
  • Spark s'appuie sur des primitives classiques de programmation fonctionnelle qui modifient les RDDs qu'il convient de connaître pour réussir à coder la requête (doc):
    • filtrer les données d'un RDD (équivalent du WHERE en SQL): filter;
    • transformer chaque ligne d'un RDD: map, ou flatMap pour transformer chaque ligne en plusieurs résultats (renvoie une liste de résultats, les listes de chaque ligne étant concaténées les une autres);
    • aggréger (équivalent à GROUP BY + fonctions d'aggrégation): reduceByKey.
  • Les RRD n'ont pas vocation à être lus directement mais peuvent exploités comme suit:
    • il est possible de sauver un RDD dans le HDFS via e.g. saveAsTextFile (doc);
    • il est également possible de calculer un unique aggrégat comme résultat via reduce.
    • d'autre actions sont possibles pour exploiter des RDDs comme count, take, takeSample, collect1)
    • schémas: source.sql

Jointure avec Object

Reprendre la requête suivante issue du TP2 Hadoop:

SELECT o.objectId, COUNT(*) AS cnt, avg(flux_Gaussian) flxG_avg
FROM SOURCE s JOIN Object o ON s.objectId = o.objectId
WHERE earliestObsTime <= 50985.0
  AND s.objectId IS NOT NULL
GROUP BY o.objectId

Coder cette requête dans Spark. On pourra utiliser la transformation join pour effectuer la jointure entre Source et Object (schéma: object.sql).

1)
Attention à collect sur des données volumineuses, car toutes les lignes du RDD sont renvoyées