TP Introduction à Hadoop

Mise en place

Le TP se déroulera en binôme. Choisir votre partenaire pour ce TP et informer Emmanuel Coquery qui attribuera les machines (IP de la machine master Hadoop). Ce TP consistuera le point de départ du TP de la semaine suivante qui sera à rendre.

On réalisera de préférence ce TP sous Unix (Mac OS X ou Linux).

La clé ssh nécessaire à la connexion sur la machine vous a été envoyée par mail. Pour se connecter sur la machine on peut utiliser la commande suivante (en supposant que la machine a pour IP 192.168.xx.yy):

ssh -i pedabdcloud ubuntu@192.168.xx.yy

L'utilisation de ssh-agent permet d'éviter l'option -i pedabdcloud pour les commandes ssh et scp du reste du TP:

ssh-add pedabdcloud
ssh ubuntu@192.168.xx.yy # pour se connecter en ssh sans resaisir la clé ssh
scp toto.txt ubuntu@192.168.xx.yy: # pour copier toto.txt sur la machine distante (attention aux : )

Les opérations à exécuter sur hadoop (hdfs, hadoop) sont à faire en tant qu'utilisateur hdfs:

sudo su hdfs # devenir l'utilisateur hdfs
hdfs dfs mkdir /user/hdfs # a faire une seule fois

Exécution d'un job Hadoop

Algorithme de comptage de mots

Le comptage de mots (Word Count) est un algorithme classique de type Map/Reduce. Il permet de compter le nombre d'occurrences de chaque mot dans un fichier. Sur Hadoop en Java, les fonctions map et reduce sont distribuées sur deux classes. Le pseudo code de ces classes est donné ci-dessous:

class Mapper
    method map(docid a, doc d)
        for all term t in d
            emit(t,1)
        end for
    end map
end Mapper

class Reducer
    method reduce(term t, int list counts)
        sum := 0
        for all int c in counts
            sum = sum + c
        end for
        emit(t, sum)
    end reduce
end Reducer    

Version Java

Exécution du job Java précompilé

L'archive bda-tp1.zip contient un projet de départ permettant de lancer le WordCount sur un ensemble d'exemples. Ces exemples se situent dans le répertoire input de l'archive. Copier le répertoire input ainsi que le fichier target/bda-tp1-1.0-SNAPSHOT.jar sur la machine hadoop:

scp -r input target/bda-tp1-1.0-SNAPSHOT.jar ubuntu@192.168.xx.yy:

Copier le répertoire input dans le système de fichier distribué d'Hadoop:

hdfs dfs -copyFromLocal input input

Lancer ensuite le job hadoop via la commande suivante:

hadoop jar bda-tp1-1.0-SNAPSHOT.jar fr.univlyon1.bda.WordCount input outputwc0

Observer les informations affichées lors de l'exécution du job, puis vérifier le contenu du résultat:

hdfs dfs -ls outputwc0
hdfs dfs -cat outputwc0/part-r-00000 | less 

Recompilation du job Java

Sur les machines des salles TP, s'assurer de la bonne configuration de maven.

Sur votre machine, le job peut être recompilé en ligne de commande

mvn package

Ouvrez le projet dans un IDE. La classe WordCount contient le code qui a été exécuté précédement. Modifier la fonction map afin de ne compter que les occurrences de mots de taille > 3. Exécuter le nouveau job et vérifier la cohérence des résultats.

Quelques explications sur le projet fourni

Le projet utilise le système de construction Maven qui s'occupe entre autres de la compilation et de la génération des archives jar. La configuration d'un projet Maven se fait via le fichier pom.xml qui contient en particulier les bibliothèques dont le projet dépend. La bibliothèque hadoop-client contient les classes de la distribution Hadoop. Ces classes n'ont pas besoin de figurer dans l'archive fournie à Hadoop pour exécuter le projet (d'où le scope provided).

Lecture d'un fichier CSV et compte d'occurrences

La machine 192.168.238.6 contient des données issue d'observation astronomiques. Vous pouvez vous connecter en utilisant le login data à cette machine, en utilisant la clé SSH envoyée par mail.

On s'intéressera au contenu du répertoire /data. Dans ce premier TP, on manipulera une quantité modeste de données. Le fichier Source/Source-001.gz est un fichier CSV compressé contenant des données d'observation. Le nom des colonnes (absent du fichier CSV) peut être trouvé dans le fichier Source.sql. Décompresser et ajouter le fichier Source-001 dans le HDFS. Créer une nouvelle classe de job Hadoop dans le projet Java qui lira le contenu de ce fichier et extraira le nombre d'occurrence de chaque object_id présent dans ce fichier, ce qui revient à la requête SQL suivante:

SELECT object_id, count(*)
FROM source
WHERE object_id IS NOT NULL
GROUP BY object_id

Pour découper en colonnes chaque ligne du CSV lu, on pourra utiliser , comme séparateur pour découper chaque ligne:

String[] tokens = value.toString().split(",");

Remarques:

  • les lignes ayant un object_id à NULL ne doivent pas produire de résultat.
  • le fichier src/test/resources/Source.sample.csv contient deux lignes extraites du fichier Source-001.gz
  • source_id correspond à une observation
  • object_id correspond à un objet astronomique (par exemple une étoile)

Calcul de valeurs extrémales

Créer une nouvelle classe de job Hadoop pour pour obtenir en plus du nombre d'occurrences:

  • le plus grand source_id pour chaque object_id (i.e. la dernière observation de l'objet)
  • les valeurs minimales et maximales de ra et decl (les coordonnées angulaires)

soit en SQL:

SELECT object_id, count(*), max(source_id), min(ra), max(ra), min(decl), max(decl)
FROM source
WHERE object_id IS NOT NULL
GROUP BY object_id

Remarque: Il faudra changer les types qui paramètrent les classes de mapper et reducers, voir:

L'objet qui s'est le plus déplacé

Créer une dernière classe de job qui va trouver l'objet qui s'est le plus déplacé dans le ciel. On se contentera ici d'une approximation grossière du déplacement qui consistera en une distance euclidienne calculée à partir des valurs extrémales de ra et decl calculées par le job précédent (dont la sortie sera ainsi utilisée pour ce job).