MapReduce: Simplified Data Processing on Large Clusters
AGENDA
Motivation
Lots of machines
Google needed a good distributed file system. Why not use the existing file systems?
Google's problems are different from anyone else's. GFS is designed for Google apps and workloads, and Google apps are designed for GFS.
NFS Disadvantages
- Network congestion.
- Heavy disk activity on the NFS server adversely affects NFS performance.
- When the client attempts to mount, the client system hangs, although this can be mitigated using a specific mount.
- If the server hosting the exported file system becomes unavailable for any reason, no one can access the resource.
- NFS has security problems because its design assumes a trusted network.
GFS Assumptions
Perhaps Concurrently
- Each chunk is replicated across 3+ chunkservers.
- Single master to coordinate access and keep metadata: simple centralized management.
- No data caching: little benefit due to large datasets and streaming reads.
GFS Architecture
Single Master
GFS solutions
- Shadow masters
- Minimize master involvement
Metadata (1/2)
- Easily Accessible.
Metadata (2/2)
Master has an operation log for persistent logging of critical metadata updates.
- Persistent on local disk
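The operation log idea can be sketched as follows. This is a minimal illustration, not GFS code: the class name `MetadataStore`, the JSON record layout, and the file and chunk names are all hypothetical. Every metadata change is appended to a log on local disk before it is applied in memory, so the state can be rebuilt by replaying the log.

```python
import json
import os
import tempfile


class MetadataStore:
    """Toy master metadata store backed by an append-only operation log."""

    def __init__(self, log_path):
        self.log_path = log_path
        self.files = {}  # file name -> list of chunk handles
        self._replay()

    def _replay(self):
        # Rebuild in-memory metadata by re-applying every logged record.
        if not os.path.exists(self.log_path):
            return
        with open(self.log_path) as log:
            for line in log:
                self._apply(json.loads(line))

    def _apply(self, record):
        if record["op"] == "create":
            self.files[record["file"]] = []
        elif record["op"] == "add_chunk":
            self.files[record["file"]].append(record["chunk"])

    def mutate(self, record):
        # Log first and force the record to disk, then update memory.
        with open(self.log_path, "a") as log:
            log.write(json.dumps(record) + "\n")
            log.flush()
            os.fsync(log.fileno())
        self._apply(record)


log_path = os.path.join(tempfile.mkdtemp(), "oplog.jsonl")
master = MetadataStore(log_path)
master.mutate({"op": "create", "file": "/logs/web.log"})
master.mutate({"op": "add_chunk", "file": "/logs/web.log", "chunk": "chunk-0001"})

# Simulate a restart: a fresh store recovers its state from the log alone.
recovered = MetadataStore(log_path)
print(recovered.files)
```

Writing the log record before touching the in-memory state means a crash between the two steps loses nothing that was acknowledged.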
Deployment in Google
- 50+ GFS clusters
- Each with thousands of storage nodes
- Managing petabytes of data
- GFS sits underneath Bigtable
Conclusion of GFS
GFS demonstrates how to support large scale processing workloads on commodity hardware
- Designed to tolerate frequent component failures.
- Optimized for huge files that are mostly appended and read.
- Go for simple solutions.
Page 1: the weather is good
Page 2: today is good
Page 3: good weather is good
Map output
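Worker 1: (the 1), (weather 1), (is 1), (good 1)
Worker 2: (today 1), (is 1), (good 1)
Worker 3: (good 1), (weather 1), (is 1), (good 1)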
Reduce Input
Worker 1: (the 1)
Worker 2: (is 1), (is 1), (is 1)
Worker 3: (weather 1), (weather 1)
Worker 4: (today 1)
Worker 5: (good 1), (good 1), (good 1), (good 1)
Reduce Output
Worker 1: (the 1)
Worker 2: (is 3)
Worker 3: (weather 2)
Worker 4: (today 1)
Worker 5: (good 4)
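The three-page example above can be reproduced with a short single-process sketch. The function names `map_page`, `shuffle`, and `reduce_word` are illustrative only; a real MapReduce run spreads these steps across workers.

```python
from collections import defaultdict

pages = {
    "Page 1": "the weather is good",
    "Page 2": "today is good",
    "Page 3": "good weather is good",
}


def map_page(text):
    """Map: emit a (word, 1) pair for every word on the page."""
    return [(word, 1) for word in text.split()]


def shuffle(mapped_pairs):
    """Shuffle: collect all counts emitted for the same word."""
    groups = defaultdict(list)
    for word, count in mapped_pairs:
        groups[word].append(count)
    return groups


def reduce_word(word, counts):
    """Reduce: sum the counts gathered for one word."""
    return word, sum(counts)


map_output = [pair for text in pages.values() for pair in map_page(text)]
reduce_input = shuffle(map_output)
reduce_output = dict(reduce_word(w, c) for w, c in reduce_input.items())
print(reduce_output)
```

Each key in `reduce_input` corresponds to one reduce worker in the slides, for example `good` arrives with four `1` values and leaves as `(good 4)`.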
MapReduce Architecture
Parallel Execution
Fault Tolerance
Network Failure:
- Detect failure via periodic heartbeats
- Re-execute completed and in-progress map tasks
- Re-execute in-progress reduce tasks
- Task completion committed through master
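A rough sketch of the heartbeat rule listed above. The `Master` and `Task` classes and the 10-second timeout are assumptions made for illustration, not the actual implementation. The comments reflect the reasoning given in the MapReduce paper: map output is kept on the worker's local disk, while reduce output goes to the global file system.

```python
from dataclasses import dataclass
from typing import Optional

HEARTBEAT_TIMEOUT = 10.0  # seconds; hypothetical value


@dataclass
class Task:
    task_id: str
    kind: str   # "map" or "reduce"
    state: str  # "idle", "in_progress", or "completed"
    worker: Optional[str] = None


class Master:
    def __init__(self):
        self.last_heartbeat = {}  # worker name -> time of last heartbeat
        self.tasks = []

    def heartbeat(self, worker, now):
        self.last_heartbeat[worker] = now

    def check_workers(self, now):
        """Mark silent workers as failed and reset the tasks that must rerun."""
        dead = [w for w, t in self.last_heartbeat.items()
                if now - t > HEARTBEAT_TIMEOUT]
        for worker in dead:
            del self.last_heartbeat[worker]
            for task in self.tasks:
                if task.worker != worker:
                    continue
                # Map output sits on the failed worker's local disk, so even
                # completed map tasks have to be executed again.
                redo_map = (task.kind == "map"
                            and task.state in ("in_progress", "completed"))
                # Completed reduce output is already in the global file
                # system, so only in-progress reduce tasks are redone.
                redo_reduce = task.kind == "reduce" and task.state == "in_progress"
                if redo_map or redo_reduce:
                    task.state, task.worker = "idle", None
        return dead


master = Master()
master.tasks = [
    Task("m1", "map", "completed", "w1"),
    Task("m2", "map", "in_progress", "w1"),
    Task("r1", "reduce", "completed", "w1"),
    Task("r2", "reduce", "in_progress", "w1"),
    Task("m3", "map", "completed", "w2"),
]
master.heartbeat("w1", now=0.0)
master.heartbeat("w2", now=8.0)
dead = master.check_workers(now=12.0)
states = {t.task_id: t.state for t in master.tasks}
print(dead, states)
```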
Master failure:
Refinement
- Different partitioning functions
- Combiner function
- Different input/output types
- Skipping bad records
- Local execution
- Status info
- Counters
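Two of these refinements, the partitioning function and the combiner, can be sketched in a few lines. This is an illustration under assumptions: `R = 5` reduce tasks is an arbitrary choice, and `zlib.crc32` stands in for whatever hash function a real system uses, picked here only because it gives the same result on every run.

```python
import zlib
from collections import Counter

R = 5  # number of reduce tasks; hypothetical


def partition(key, num_reducers=R):
    """Hash partitioning: the reduce task for a key is hash(key) mod R."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def combine(pairs):
    """Combiner: pre-sum counts on the map worker before they cross the network."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


# Output of one map task for the page "good weather is good".
map_output = [("good", 1), ("weather", 1), ("is", 1), ("good", 1)]
combined = combine(map_output)

# Group the combined pairs by the reduce task that will receive them.
buckets = {}
for word, count in combined:
    buckets.setdefault(partition(word), []).append((word, count))
print(combined, buckets)
```

The combiner shrinks four pairs to three here; the saving grows with how often a key repeats within one map task.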
What's Hadoop?
- Framework for running applications on large clusters of commodity hardware
- Scale: petabytes of data on thousands of nodes
- Includes:
  - Storage: HDFS
  - Processing: MapReduce
- Supports the Map/Reduce programming model
- Requirements:
  - Economy: use clusters of commodity computers
  - Easy to use: users do not need to deal with the complexity of distributed computing
  - Reliable: can handle node failures automatically
Hadoop is a software platform that lets one easily write and run applications that process vast amounts of data.
Here's what makes Hadoop especially useful:
HDFS
Hadoop implements MapReduce, using the Hadoop Distributed File System (HDFS) (see figure below). MapReduce divides applications into many small blocks of work. HDFS creates multiple replicas of data blocks for reliability, placing them on compute nodes around the cluster. MapReduce can then process the data where it is located. Hadoop has been demonstrated on clusters with 2,000 nodes. The current design target is 10,000-node clusters.
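The idea of processing data where it is located can be sketched as below. This is a toy model, not HDFS or Hadoop code: the replica count of 3, the round-robin placement, and the names `place_replicas` and `pick_node` are all assumptions for illustration.

```python
REPLICATION = 3  # hypothetical replica count


def place_replicas(block_ids, nodes, replication=REPLICATION):
    """Assign each block to `replication` distinct nodes, round-robin.

    Assumes replication <= len(nodes) so the chosen nodes are distinct.
    """
    placement = {}
    for i, block in enumerate(block_ids):
        placement[block] = [nodes[(i + k) % len(nodes)] for k in range(replication)]
    return placement


def pick_node(block, placement, free_nodes):
    """Prefer a free node that already stores the block; else any free node."""
    if not free_nodes:
        raise ValueError("no free nodes available")
    for node in placement[block]:
        if node in free_nodes:
            return node, True   # data-local: no block transfer needed
    return sorted(free_nodes)[0], False  # remote: block must be fetched


nodes = ["node1", "node2", "node3", "node4"]
placement = place_replicas(["block1", "block2", "block3"], nodes)

local_choice = pick_node("block2", placement, {"node1", "node3"})
remote_choice = pick_node("block1", placement, {"node4"})
print(placement, local_choice, remote_choice)
```

With several replicas per block, the scheduler has more than one chance of finding a free node that already holds the data.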
Hadoop Architecture
[Figure: Data flows into the Hadoop Cluster, where it is stored as replicated DFS blocks (DFS Block 1, DFS Block 3) and processed by MAP and Reduce tasks to produce Results.]
Sample text-files as input:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
Run the application:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
Output:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
Contd
Notice that the inputs differ from the first version we looked at, and how they affect the outputs.
Now, let's plug in a pattern-file which lists the word-patterns to be ignored, via the DistributedCache.
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to
Run it again, this time with more options:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
As expected, the output:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
Contd
Run it once more, this time switching off case-sensitivity:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
Sure enough, the output:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
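The effect of the skip patterns and the case-sensitivity switch can be imitated locally with a small Python sketch. It is not the WordCount Java program run above; the function `word_count` and its parameters are hypothetical, and the patterns are simply removed from each line as regular expressions before the line is split into words.

```python
import re
from collections import Counter

files = {
    "file01": "Hello World, Bye World!",
    "file02": "Hello Hadoop, Goodbye to hadoop.",
}
skip_patterns = [r"\.", r"\,", r"\!", "to"]  # contents of patterns.txt


def word_count(texts, patterns=(), case_sensitive=True):
    """Count whitespace-separated words after deleting every skip pattern."""
    counts = Counter()
    for text in texts:
        line = text if case_sensitive else text.lower()
        for pattern in patterns:
            # Patterns match anywhere, so "to" would also be cut out of
            # longer words; none of the sample words contain it.
            line = re.sub(pattern, "", line)
        counts.update(line.split())
    return dict(sorted(counts.items()))


plain = word_count(files.values())
skipped = word_count(files.values(), skip_patterns, case_sensitive=True)
folded = word_count(files.values(), skip_patterns, case_sensitive=False)
for result in (plain, skipped, folded):
    print(result)
```

The three results line up with the three outputs listed in the runs above.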
Hadoop
HDFS assumes that hardware is unreliable and will eventually fail.
Similar to RAID, except that HDFS can replicate data across several machines.
Provides:
- Fault tolerance
- Extremely high-capacity storage
Hadoop
- Facebook uses Hadoop to analyze user behavior and the effectiveness of ads on the site.
- The tech team at The New York Times rented computing power on Amazon's cloud and used Hadoop to convert 11 million archived articles, dating back to 1851, to digital and searchable documents. They turned around in a single day a job that otherwise would have taken months.
Besides Yahoo!, many other organizations are using Hadoop to run large distributed computations. Some of them include:
- A9.com
- Facebook
- Fox Interactive Media
- IBM
- ImageShack
- ISI
- Joost
- Last.fm
- Powerset
- The New York Times
- Rackspace
- Veoh
Yahoo! recently launched what we believe is the world's largest Apache Hadoop production application. The Yahoo! Search Webmap is a Hadoop application that runs on a more than 10,000-core Linux cluster and produces data that is now used in every Yahoo! Web search query. The Webmap build starts with every Web page crawled by Yahoo! and produces a database of all known Web pages and sites on the Internet and a vast array of data about every page and site. This derived data feeds the machine-learned ranking algorithms at the heart of Yahoo! Search.
Yahoo's Hadoop
One of Yahoo's Hadoop clusters sorted 1 terabyte of data in 209 seconds, beating the previous record of 297 seconds in the annual general-purpose (Daytona) terabyte sort benchmark. The sort benchmark, created in 1998 by Jim Gray, specifies the input data (10 billion 100-byte records), which must be completely sorted and written to disk. This is the first time that either a Java or an open source program has won. Yahoo is both the largest user of Hadoop, with 13,000+ nodes running hundreds of thousands of jobs a month, and the largest contributor, although non-Yahoo usage and contributions are increasing rapidly. The cluster statistics were:
- 910 nodes, 2 quad-core Xeons @ 2.0 GHz per node
- 4 SATA disks per node, 8 GB RAM per node
- 1 gigabit Ethernet on each node, 40 nodes per rack
- 8 gigabit Ethernet uplinks from each rack to the core
- Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
- Sun Java JDK 1.6.0_05-b13
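The benchmark figures above imply some simple derived numbers. The arithmetic below uses only values stated on the slide (10 billion 100-byte records, 209 and 297 seconds, 910 nodes) and treats a terabyte as 10^12 bytes; the throughput values are back-of-the-envelope averages, not measured results.

```python
records = 10_000_000_000      # 10 billion records
record_bytes = 100            # bytes per record
seconds = 209                 # Yahoo's winning time
previous_seconds = 297        # previous record
nodes = 910                   # nodes in the cluster

total_bytes = records * record_bytes          # 10^12 bytes = 1 TB
throughput = total_bytes / seconds            # average bytes sorted per second
per_node = throughput / nodes                 # average share per node
speedup = previous_seconds / seconds          # improvement over previous record

print(f"total data:       {total_bytes / 1e12:.0f} TB")
print(f"cluster average:  {throughput / 1e9:.2f} GB/s")
print(f"per-node average: {per_node / 1e6:.2f} MB/s")
print(f"speedup:          {speedup:.2f}x")
```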
Process Diagram
Map/Reduce Processes
- Launching Application
  - User application code
  - Submits a specific kind of Map/Reduce job
- JobTracker
  - Handles all jobs
  - Makes all scheduling decisions
- TaskTracker
  - Manager for all tasks on a given node
- Task
  - Runs an individual map or reduce fragment for a given job
  - Forks from the TaskTracker
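The division of roles among these processes can be modeled in a few lines. This is a toy model only: the classes below are not Hadoop's, tasks are plain strings rather than forked processes, and round-robin assignment is an assumption standing in for the real scheduling decisions.

```python
class TaskTracker:
    """Manages all tasks on one node."""

    def __init__(self, node):
        self.node = node
        self.tasks = []

    def launch(self, task):
        # A real TaskTracker would fork a child process for the task;
        # this sketch only records the assignment.
        self.tasks.append(task)


class JobTracker:
    """Receives jobs and decides which TaskTracker runs each task."""

    def __init__(self, trackers):
        self.trackers = trackers

    def submit(self, job_name, num_maps, num_reduces):
        tasks = [f"{job_name}-map-{i}" for i in range(num_maps)]
        tasks += [f"{job_name}-reduce-{i}" for i in range(num_reduces)]
        # Hypothetical policy: hand tasks out round-robin across nodes.
        for i, task in enumerate(tasks):
            self.trackers[i % len(self.trackers)].launch(task)
        return tasks


# The launching application submits one job with 3 map and 1 reduce tasks.
trackers = [TaskTracker("node1"), TaskTracker("node2")]
job_tracker = JobTracker(trackers)
submitted = job_tracker.submit("wordcount", num_maps=3, num_reduces=1)
for tracker in trackers:
    print(tracker.node, tracker.tasks)
```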
failure
Imp Links