Remarque: dans ce TP, ont souhaite traiter de grand volumes de données, ce qui signifie des temps de traitement longs. Ci-dessous, quelques conseils pour y faire face:
screen
(documentation) qui permettra de continuer les requêtes tout en étant déconnecté du master (pour l'installer, lancer la commande suivante sur le master: sudo apt-get install screen
).
Ce TP est à rendre pour le dimanche 23 octobre 2016. Il est demandé de déposer un rapport sur tomuss dans la case rendu_hadoop
1) qui comprendra:
On considère le jeu de données du TP précédent, mais dans son ensemble, c'est-à-dire les deux tables Source
et Object
et tous les fichiers de données de ces tables. Une description plus détaillée de ce schéma est disponible ici.
On souhaite répondre à la question suivante: calculer pour chaque objet observé avant le point temporel (earliestObsTime
) 50980.0 50985.0 le nombre d'observations (i.e. de tuples de la table Source
) ainsi que la moyenne de la mesure sourceWidth_SG
flux_Gaussian
:
SELECT o.objectId, count(*) as cnt, avg(flux_Gaussian) flxG_avg FROM Source s join Object o ON s.objectId = o.objectId WHERE earliestObsTime <= 50985.0 -- AND s.objectId IS NOT NULL GROUP BY o.objectId
Il y a deux possibilités pour répondre à cette requête: effectuer une jointure en utilisant la valeur de l'attribut de jointure (objectId
) comme clé lors du shuffle ou effectuer la jointure au niveau du map (ce qui suppose que les données concernées par cette jointure se trouvent dans le même noeud). Dans ce TP on va d'abord implémenter la première puis la seconde version.
Si on se limite au contenu des fichiers Source-001
et Object-001
, on doit obtenir 152 ou 153 résultats.
Écrire un script de chargement des données de la machine data
dans le HDFS. Pour les besoins de ce TP on ne fera pas de réplication. Changer le nombre de réplicats peut se faire via hadoop fs -setrep 1 fichier_ou_rep
(c.f. documentation). Ce script se contentera de récupérer sur votre master les fichiers sur la machine data
, les décompresser et les charger dans le HDFS. Attention, il faut faire ces opérations pour un fichier puis effacer ce dernier de votre master avant de passer au fichier suivant sous peine de saturer le disque du master.
Créer des classes pour les opérations de map et reduce nécessaires pour répondre à la requête du TP via une jointure dans le shuffle/reduce. On rappelle les points suivants:
map
et du reduce
est la valeur de l'attribut de jointure
Réfléchir au bon moment (i.e. map
ou reduce
?) pour effectuer le filtre (partie WHERE
dans la requête).
Lancer la requête et évaluer le temps de calcul.
Avant de pouvoir procéder à la suite, il est nécessaire d'avoir la réponse à la requête suivante (nombre maximum de source par objet):
SELECT MAX(cnt) FROM (SELECT objectId, count(*) as cnt FROM Source WHERE objectId IS NOT NULL GROUP BY objectId)
Créer un ou polusieurs jobs map/reduce pour répondre à cette question.
Supprimer ensuite le jeu de données du HDFS afin de faire de la place pour en charger une version partionnée à la main. On souhaite faire maintenant faire la jointure dans le map
. Cela ne peut fonctionner que si tous les tuples à combiner sont traités par le même mapper, c'est-à-dire se situent sur le même noeud. Dans HDFS cela signifie qu'ils doivent se trouver dans le même fichier de données, celui-ci devant avoir une taille inférieure à la taille d'un bloc (64 Mo). Une solution pourrait être de créer un fichier par valeur de l'attribut de jointure, mais on risque de tomber dans le “small files problem ” (voir ici par exemple). Il faut donc trouver un compromis entre les deux extrêmes suivants:
Il faut donc regrouper au sein d'un même fichier plusieurs valeurs de l'attribut de jointure.
Une fois la partition effectuée coder et lancer un job map/reduce pour répondre à la requête en effectuant la jointure dans le mapper. Il peut être nécessaire de munir la classe du mapper de champs permettant de mémoriser les tuples déjà rencontrés. Pour cela, il peut être utile de gérer le démarrage et la fin d'une tâche en surchargeant les méthodes setup
et cleanup
de la classe Mapper
De même que pour la première version, évaluer le temps nécessaire pour répondre à la requête.
Attention cependant, le TextInputFormat
(utilisé dans le main
) utilisé ne garanti pas que le fichier soit intégralement lu par le même mapper. Il faut pour cela créer une sous-classe de TextInputFormat
en surchargeant la méthode isSplitable
de façon à toujours renvoyer false
(cf doc)