Table des matières

TP Jointure et partionnement sur Hadoop

Remarque: dans ce TP, ont souhaite traiter de grand volumes de données, ce qui signifie des temps de traitement longs. Ci-dessous, quelques conseils pour y faire face:

Modalités de rendu

Ce TP est à rendre pour le dimanche 23 octobre 2016. Il est demandé de déposer un rapport sur tomuss dans la case rendu_hadoop1) qui comprendra:

Données et requête

On considère le jeu de données du TP précédent, mais dans son ensemble, c'est-à-dire les deux tables Source et Object et tous les fichiers de données de ces tables. Une description plus détaillée de ce schéma est disponible ici.

On souhaite répondre à la question suivante: calculer pour chaque objet observé avant le point temporel (earliestObsTime) 50980.0 50985.0 le nombre d'observations (i.e. de tuples de la table Source) ainsi que la moyenne de la mesure sourceWidth_SG flux_Gaussian:

SELECT o.objectId, count(*) as cnt, avg(flux_Gaussian) flxG_avg
FROM Source s join Object o ON s.objectId = o.objectId
WHERE earliestObsTime <= 50985.0 -- AND s.objectId IS NOT NULL
GROUP BY o.objectId

Il y a deux possibilités pour répondre à cette requête: effectuer une jointure en utilisant la valeur de l'attribut de jointure (objectId) comme clé lors du shuffle ou effectuer la jointure au niveau du map (ce qui suppose que les données concernées par cette jointure se trouvent dans le même noeud). Dans ce TP on va d'abord implémenter la première puis la seconde version.

Si on se limite au contenu des fichiers Source-001 et Object-001, on doit obtenir 152 ou 153 résultats.

Jointure via la clé de shuffle/reduce

Chargement des données

Écrire un script de chargement des données de la machine data dans le HDFS. Pour les besoins de ce TP on ne fera pas de réplication. Changer le nombre de réplicats peut se faire via hadoop fs -setrep 1 fichier_ou_rep (c.f. documentation). Ce script se contentera de récupérer sur votre master les fichiers sur la machine data, les décompresser et les charger dans le HDFS. Attention, il faut faire ces opérations pour un fichier puis effacer ce dernier de votre master avant de passer au fichier suivant sous peine de saturer le disque du master.

Requête

Créer des classes pour les opérations de map et reduce nécessaires pour répondre à la requête du TP via une jointure dans le shuffle/reduce. On rappelle les points suivants:

Réfléchir au bon moment (i.e. map ou reduce ?) pour effectuer le filtre (partie WHERE dans la requête).

Lancer la requête et évaluer le temps de calcul.

Partionnement préalable

Avant de pouvoir procéder à la suite, il est nécessaire d'avoir la réponse à la requête suivante (nombre maximum de source par objet):

SELECT MAX(cnt)
FROM (SELECT objectId, count(*) as cnt
      FROM Source
      WHERE objectId IS NOT NULL
      GROUP BY objectId)

Créer un ou polusieurs jobs map/reduce pour répondre à cette question.

Supprimer ensuite le jeu de données du HDFS afin de faire de la place pour en charger une version partionnée à la main. On souhaite faire maintenant faire la jointure dans le map. Cela ne peut fonctionner que si tous les tuples à combiner sont traités par le même mapper, c'est-à-dire se situent sur le même noeud. Dans HDFS cela signifie qu'ils doivent se trouver dans le même fichier de données, celui-ci devant avoir une taille inférieure à la taille d'un bloc (64 Mo). Une solution pourrait être de créer un fichier par valeur de l'attribut de jointure, mais on risque de tomber dans le “small files problem ” (voir ici par exemple). Il faut donc trouver un compromis entre les deux extrêmes suivants:

Il faut donc regrouper au sein d'un même fichier plusieurs valeurs de l'attribut de jointure.

Jointure dans le map

Une fois la partition effectuée coder et lancer un job map/reduce pour répondre à la requête en effectuant la jointure dans le mapper. Il peut être nécessaire de munir la classe du mapper de champs permettant de mémoriser les tuples déjà rencontrés. Pour cela, il peut être utile de gérer le démarrage et la fin d'une tâche en surchargeant les méthodes setup et cleanup de la classe Mapper

De même que pour la première version, évaluer le temps nécessaire pour répondre à la requête.

Attention cependant, le TextInputFormat (utilisé dans le main) utilisé ne garanti pas que le fichier soit intégralement lu par le même mapper. Il faut pour cela créer une sous-classe de TextInputFormat en surchargeant la méthode isSplitable de façon à toujours renvoyer false (cf doc)

1)
un rapport par binôme, le fichier étant déposé automatiquement pour les deux étudiants