TP2 MapReduce
TP2 MapReduce
TP2 MapReduce
MapReduce
1 Introduction et objectifs du TP
Le TP précédent était consacré à l’installation d’un cluster Hadoop. Nous
allons maintenant exploiter ce cluster pour effectuer les premiers traitements
à l’aide du patron de programmation MapReduce. Mis au point par Google,
ce patron permet de traiter de façon parallèle de gros volumes de données.
Ce patron, largement populaire chez les acteurs du Big Data, repose sur deux
primitives, Map et Reduce, inspirées par la programmation fonctionnelle. Au
cours de ce TP, nous utiliserons l’implémentation du patron MapReduce
fournie par le framework Hadoop et mettrons en place les processus Ma-
pReduce en Java. La Figure 1 illustre les 5 étapes d’un process (appelé
communément job) MapReduce :
1. Un job MapReduce prend un jeu de données en entrée initiale-
ment stocké sur une partition HDFS. Ces données peuvent être de
différentes natures, e.g., documents textuels, n-uplets, données numé-
riques, images. La première étape consiste à découper ce jeu de don-
nées de sorte à produire un ensemble de paires de type hClé, V aleuri.
Chacune de ces paires sera ensuite envoyée à un nœud qui exécutera
la seconde étape, la fonction Map.
2. Dans chaque nœud où elle s’exécute, la fonction Map reçoit en entrée
une paire de type hClé, V aleuri, effectue un traitement sur la valeur
et retourne un ensemble de paires de type hCléInter, V aleurInteri.
Chaque nœud émet ainsi un ensemble de paires. Cette étape est à la
charge du programmeur.
3. Les paires de type hCléInter, V aleurInteri émises par tous les nœuds
Map lors de la phase précédente sont triées dans cette troisième étape.
Sur le principe des tables de hachages, les paires sont regroupées selon
les valeurs de CléInter. Le résultat de cette étape est ainsi la géné-
1
PAIRES PAIRES PAIRES
ENTREE SORTIE
en ENTREE INTERMEDIAIRES en SORTIE
REDUCE <oClé1,oVal1>
<iClé2,iVal300>
<clé198,val198> MAP
<iClé3,iVal301>
2
ration de nouvelles paires, de type hCléInter, V aleurInter[]i. Cette
étape est automatique.
4. Chaque noœud où s’exécutera une fonction Reduce reçoit ainsi une
paire de type hCléInter, V aleurInter[]i. La fonction Reduce effectue
alors un traitement sur cette paire et retourne une nouvelle paire
de type hCléSortie, V aleurSortiei. Cette étape est à la charge du
programmeur.
5. Enfin, ces paires de type hCléSortie, V aleurSortiei sont agrégées et
stockées sur le cluster. Dans le cadre de l’implémentation de MapRe-
duce par Hadoop, le résultat du processus sera stocké sur le système
de fichier HDFS.
Nous voyons ainsi que l’utilisateur ne s’occupe pas de la parallélisation
de la tâche mais se consacre uniquement au traitement des données à travers
les fonctions Map et Reduce, i.e., le reste étant géré par le framework. Néan-
moins, la conception des fonctions Map et Reduce peut s’avérer non triviale
et demander une profonde réflexion de la part du programmeur.
3
PAIRES PAIRES PAIRES
ENTREE SORTIE
en ENTREE INTERMEDIAIRES en SORTIE
<banane,1>
pomme : 3
poire : 2
<pomme,1> banane : 1
melon : 1
pomme, poire, <2, « pomme,
melon poire,melon »>
MAP REDUCE <banane,1>
<poire,1>
<melon,1>
REDUCE <melon,1>
2.3 Instructions
1. Lancez le cluster que vous avez créé lors du TP précédent ;
2. Téléchargez l’archive sur le nœud principal et décompressez là dans le
répertoire de votre choix (par exemple dans /home/hduser/TP/wc1) ;
3. Copiez les données sur la partition HDFS et vérifiez leur présence ;
4. Compilez le code source Java et créez l’archive jar à l’aide des com-
mandes :
— hadoop com.sun.tools.javac.Main WordCount1.java
— jar cf wc1.jar WordCount*.class
5. Lancez le job avec la commande
hadoop jar wc1.jar WordCount1 <INPUT_DIR> <OUTPUT_DIR> ;
4
6. Vérifier que le job a correctement fait son travail en regardant le
résultat produit.
7. L’exemple précédent est relativement inefficace en terme de communi-
cation entre les nœuds. En effet, chaque fonction Map émettra autant
de paires qu’il y aura de mots dans le document qu’elle traite. Dans
un environnement distribué, il est important de minimiser les coûts
de communication et donc de réfléchir à des solutions qui minimisent
le nombre de paires émises par la fonction Map. Adaptez le code pré-
cédent pour optimiser le processus de comptage de mots.
Les fichiers en cache peuvent être spécifiés dans le code Java via l’API en
utilisant les fonctions Job.addCacheFile(URI), Job.addCacheArchive(URI),
Job.setCacheFiles(URI[]) et Job.setCacheArchives(URI[]) où URI est
de la forme hdfs://host:port/absolute-path#fileName. Vous trouverez à
l’adresse www.irit.fr/~Yoann.Pitarch/Docs/SID/BigData/WordCount2.java,
le code commenté du programme Java qui permet de prendre en compte une
liste de mots vides.
5
1. La fonction setup qui permet d’effectuer quelques traitements sur
chaque Mapper avant que la fonction Map ne s’exécute.
2. La notion de cache distribué comme discutée précédemment.
3. La notion de configuration qui permet de définir des paramètres qui
seront connus par tous les nœuds du cluster.
3.1 Instructions
1. En reprenant ce qui a été dit lors du précédent exercice, compilez et
lancez ce programme WordCount2. Testez son comportement.
2. Nous nous proposons d’ajouter une nouvelle fonctionnalité au pro-
gramme à travers l’ajout d’une option : permettre de ne pas considérer
la casse lors du comptage des mots. Modifiez le programme précédent
pour ajouter cette fonctionnalité.
4.1 Instructions
1. Téléchargez l’archive à l’adresse www.irit.fr/~Yoann.Pitarch/Docs/
SID/BigData/tfidf.zip et décompressez là. Vous trouverez à l’inté-
6
rieur les classes Java et les scripts nécessaires au calcul du TF-IDF.
2. Les fonctions Map et Reduce des 3 jobs MapReduce ont volontaire-
ment été effacés. Seules les spécifications demeurent. Vous devez donc
écrire ces fonctions.
3. Une fois l’écriture des fonctions terminée, compilez le code et créez
l’archive jar avec la commande ant (installez ant via la commande
sudo yum install ant si cette commande n’existe pas)
4. Lancez le script run_tfidf.sh <INPUT_DIR> <OUTPUT_DIR>
5. Vérifiez les sorties.