====== TP Spark ====== L'objectif de ce TP est de prendre en main [[http://spark.apache.org/|Spark]], un autre moteur de calcul distribué sur HDFS (entre autres). Ce TP est à faire en binôme (les mêmes que pour le TP Hadoop). Il est demandé de rendre un rapport décrivant le déroulement du TP et contenant en particulier le code commenté pour le **lundi 14/11/2016** au soir sous forme d'un fichier pdf à déposer dans la case tomuss ''rendu_spark''. Chaque binôme dispose d'une machine cloud (ip dans tomuus) pour exécuter des job Spark. La machine héberge un conteneur docker pour faire tourner une instance de la distribution Cloudera (cloudera-quickstart). Ce mode de fonctionnement a les conséquences suivantes: * Il est conseillé de lancer ''screen'', puis de lancer le conteneur depuis screen. * Docker est uniquement accessible par le ''root'' de vorte machine cloud -> penser à utiliser ''sudo'' * Une fois le conteneur démarré avec la commande fournie, un shell est ouvert sur le conteneur. :!: Fermer ce shell éteint le conteneur. :!: * Pour copier un fichier dans le conteneur: ''sudo docker cp le.fichier.a.copier cloudera:/'' * Pour copier un fichier dans HDFS, il faut d'abord le copier dans le conteneur, puis copier le fichier depuis le conteneur dans HDFS * L'utilisateur du conteneur (''root'') a les droits pour exécuter des jobs spark et pour écrire dans HDFS Lancer le conteneur (depuis ''screen'' pour ne pas le perdre en cas d'interruption de la connexion, attention à copier toute la ligne): sudo docker run --hostname=quickstart.cloudera --privileged=true -t -i --name cloudera cloudera/quickstart /usr/bin/docker-quickstart Récupérer l'archive [[https://box.univ-lyon1.fr/p/3f8282|data.zip]] qui contient les fichiers de données. Importer les données dans HDFS (dans le répertoire HDFS ''/home/root'') en conservant la structure de répertoires de l'archive. Dans la suite de ce TP, on va reproduire sur Spark les requêtes des TPs ([[enseignement:tp:bda:hadoop:tp1:2016|tp1]] et [[enseignement:tp:bda:hadoop:tp2:2016|tp2]]) précédents ===== Tutoriel et Wordcount ===== Lire le [[http://spark.apache.org/docs/1.6.0/quick-start.html|tutoriel spark]]. Implémenter le compte de mots sur les fichiers du répertoire ''textes'' lu dans le HDFS. Remarques: * Spark permet d'utiliser plusieurs langages (Scala, Java et Python). Python est probablement le plus simple a utiliser ici. * Utiliser ''%%--master yarn%%'' avec ''pyspark'' (ou ''spark-shell'') pour que Spark accède au HDFS. * On peut remarquer que les calculs effectifs produisent des logs visibles depuis le shell Spark. Essayer de comprendre à quel moment les calculs sont effectués. ===== Statistiques sur Source ===== Reprendre les questions de calcul de statistiques pour chaque objet dans les sources (répertoire ''Source'') du [[enseignement:tp:bda:hadoop:tp1:2016|tp1]]. Calculer le résultat de la requête suivante: SELECT object_id, count(*), max(source_id), min(ra), max(ra), min(decl), max(decl) FROM source WHERE object_id IS NOT NULL GROUP BY object_id Donner ensuite l'objet ayant le plus grand déplacement dans le ciel. Comme dans le TP Hadoop, on se contentera d'une approximation grossière du déplacement qui consistera en une distance euclidienne calculée à partir des valeurs extrémales de ra et decl calculées par le job précédent (dont la sortie sera ainsi utilisée pour ce job). Remarques: * Un RDD Spark est une source de données comprenant éventuellement plusieurs fichiers comme c'est le cas dans Hadoop Map/Reduce. * Spark s'appuie sur des primitives classiques de programmation fonctionnelle qui modifient les RDDs qu'il convient de connaître pour réussir à coder la requête ([[http://spark.apache.org/docs/1.6.0/programming-guide.html#transformations|doc]]): * filtrer les données d'un RDD (équivalent du WHERE en SQL): ''filter''; * transformer chaque ligne d'un RDD: ''map'', ou ''flatMap'' pour transformer chaque ligne en plusieurs résultats (renvoie une liste de résultats, les listes de chaque ligne étant concaténées les une autres); * aggréger (équivalent à GROUP BY + fonctions d'aggrégation): ''reduceByKey''. * Les RRD n'ont pas vocation à être lus directement mais peuvent exploités comme suit: * il est possible de sauver un RDD dans le HDFS via //e.g.// ''saveAsTextFile'' ([[http://spark.apache.org/docs/1.6.0/programming-guide.html#actions|doc]]); * il est également possible de calculer un unique aggrégat comme résultat via ''reduce''. * d'autre actions sont possibles pour exploiter des RDDs comme ''count'', ''take'', ''takeSample'', ''collect''((Attention à ''collect'' sur des données volumineuses, car toutes les lignes du RDD sont renvoyées)) * schémas: {{:enseignement:tp:bda:spark:source.sql|}} ===== Jointure avec Object ===== Reprendre la requête suivante issue du [[enseignement:tp:bda:hadoop:tp2:2016|TP2]] Hadoop: SELECT o.objectId, count(*) as cnt, avg(flux_Gaussian) flxG_avg FROM Source s join Object o ON s.objectId = o.objectId WHERE earliestObsTime <= 50985.0 AND s.objectId IS NOT NULL GROUP BY o.objectId Coder cette requête dans Spark. On pourra utiliser la transformation ''join'' pour effectuer la jointure entre Source et Object (schéma: {{:enseignement:tp:bda:spark:object.sql|}}).