The key takeaways are that Hadoop is a framework for distributed storage and processing of large datasets across clusters of computers using simple programming models. It introduced the concepts of MapReduce and HDFS.

The main components of Hadoop are the Hadoop Distributed File System (HDFS) for storage and MapReduce for distributed processing of large datasets across clusters.

Pig Latin is a high-level language for analyzing large datasets. It can be used to express data analysis programs, involving operations like joins, grouping, filtering etc in a simple way without writing MapReduce programs directly.

Practical Problem Solving with Hadoop and Pig

Milind Bhandarkar ([email protected])

Introduction Hadoop Distributed File System Map-Reduce Pig Q &A
Agenda: Morning (8.30 - 12.00)

Introduction Motivating Examples Hadoop Distributed File System Hadoop Map-Reduce Q &A
Agenda: Afternoon (1.30 - 5.00)

Performance Tuning Hadoop Examples Pig Pig Latin Language & Examples Architecture Q &A
About Me
Lead Yahoo! Grid Solutions Team since June

Contributor to Hadoop since January 2006 Trained 1000+ Hadoop users at Yahoo! &

20+ years of experience in Parallel

Hadoop At Yahoo!

Hadoop At Yahoo! (Some Statistics)

25,000 + machines in 10+ clusters Largest cluster is 3,000 machines 3 Petabytes of data (compressed,

700+ users 10,000+ jobs/week

Sample Applications
Data analysis is the inner loop of Web 2.0 Data Information Value Log processing: reporting, buzz Search index Machine learning: Spam lters Competitive intelligence
Prominent Hadoop Users

Yahoo! A9.com EHarmony Facebook Fox Interactive Media IBM

Quantcast Joost Last.fm Powerset New York Times Rackspace

Yahoo! Search Assist


Search Assist
Insight: Related concepts appear close
together in text corpus

Input: Web pages 1 Billion Pages, 10K bytes each 10 TB of input data Output: List(word, List(related words))
Search Assist
// Input: List(URL, Text) foreach URL in Input : Words = Tokenize(Text(URL)); foreach word in Tokens : Insert (word, Next(word, Tokens)) in Pairs; Insert (word, Previous(word, Tokens)) in Pairs; // Result: Pairs = List (word, RelatedWord) Group Pairs by word; // Result: List (word, List(RelatedWords) foreach word in Pairs : Count RelatedWords in GroupedPairs; // Result: List (word, List(RelatedWords, count)) foreach word in CountedPairs : Sort Pairs(word, *) descending by count; choose Top 5 Pairs; // Result: List (word, Top5(RelatedWords))

You Might Also Know

You Might Also Know

Insight:You might also know Joe Smith if a
lot of folks you know, know Joe Smith

Numbers: 300 MM users Average connections per user is 100

if you dont know Joe Smith already

You Might Also Know

// Input: List(UserName, List(Connections)) foreach u in UserList : // 300 MM foreach x in Connections(u) : // 100 foreach y in Connections(x) : // 100 if (y not in Connections(u)) : Count(u, y)++; // 3 Trillion Iterations Sort (u,y) in descending order of Count(u,y); Choose Top 3 y; Store (u, {y0, y1, y2}) for serving;


101 Random accesses for each user Assume 1 ms per random access 100 ms per user 300 MM users 300 days on a single machine
MapReduce Paradigm


Map & Reduce

Primitives in Lisp (& Other functional
languages) 1970s

Google Paper 2004 http://labs.google.com/papers/

Output_List = Map (Input_List)

Square (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) = (1, 4, 9, 16, 25, 36,49, 64, 81, 100)


Output_Element = Reduce (Input_List)

Sum (1, 4, 9, 16, 25, 36,49, 64, 81, 100) = 385


Map is inherently parallel Each list element processed

Reduce is inherently sequential Unless processing multiple lists Grouping to produce multiple lists
Search Assist Map

// Input: http://hadoop.apache.org Pairs = Tokenize_And_Pair ( Text ( Input ) )

Output = { (apache, hadoop) (hadoop, mapreduce) (hadoop, streaming) (hadoop, pig) (apache, pig) (hadoop, DFS) (streaming, commandline) (hadoop, java) (DFS, namenode) (datanode, block) (replication, default)... }


Search Assist Reduce

// Input: GroupedList (word, GroupedList(words)) CountedPairs = CountOccurrences (word, RelatedWords)

Output = { (hadoop, apache, 7) (hadoop, DFS, 3) (hadoop, streaming, 4) (hadoop, mapreduce, 9) ... }


Issues with Large Data

Map Parallelism: Splitting input data Shipping input data Reduce Parallelism: Grouping related data Dealing with failures Load imbalance
Apache Hadoop
January 2006: Subproject of Lucene January 2008: Top-level Apache project Latest Version: 0.21 Stable Version: 0.20.x Major contributors:Yahoo!, Facebook,
Apache Hadoop
Reliable, Performant Distributed le system MapReduce Programming framework Sub-Projects: HBase, Hive, Pig, Zookeeper,
Chukwa, Avro

Related Projects: Mahout, Hama, Cascading,

Scribe, Cassandra, Dumbo, Hypertable, KosmosFS
Problem: Bandwidth to Data

Scan 100TB Datasets on 1000 node cluster Remote storage @ 10MB/s = 165 mins Local storage @ 50-200MB/s = 33-8 mins Moving computation is more efcient than
moving data

Need visibility into data placement

Failure is not an option, its a rule ! 1000 nodes, MTBF < 1 day 4000 disks, 8000 cores, 25 switches, 1000
NICs, 2000 DIMMS (16TB RAM)

Problem: Scaling Reliably

Need fault tolerant store with reasonable

availability guarantees

Handle hardware faults transparently

Hadoop Goals

Scalable: Petabytes (1015 Bytes) of data on thousands on nodes

Economical: Commodity components only Reliable Engineering reliability into every

application is expensive
Hadoop Distributed File System


Data is organized into les and directories Files are divided into uniform sized blocks
(default 64MB) and distributed across cluster nodes computation can be migrated to data

HDFS exposes block placement so that

Blocks are replicated (default 3) to handle
hardware failure

Replication for performance and fault

tolerance (Rack-Aware placement)

HDFS keeps checksums of data for

corruption detection and recovery
Master-Worker Architecture Single NameNode Many (Thousands) DataNodes
Middleware 2009 34

HDFS Master (NameNode)

Manages lesystem namespace File metadata (i.e. inode) Mapping inode to list of blocks + locations Authorization & Authentication Checkpoint & journal namespace changes
Middleware 2009 35

Mapping of datanode to list of blocks Monitor datanode health Replicate missing blocks Keeps ALL namespace in memory 60M objects (File/Block) in 16GB
Middleware 2009 36

Handle block storage on multiple volumes
& block integrity nodes

Clients access the blocks directly from data Periodically send heartbeats and block
reports to Namenode

Blocks are stored as underlying OSs les

HDFS Architecture

A les replication factor can be changed
dynamically (default 3)

Block placement is rack aware Block under-replication & over-replication

is detected by Namenode

Balancer application rebalances blocks to

balance datanode utilization
Accessing HDFS
hadoop fs [-fs <local | file system URI>] [-conf <configuration file>] [-D <property=value>] [-ls <path>] [-lsr <path>] [-du <path>] [-dus <path>] [-mv <src> <dst>] [-cp <src> <dst>] [-rm <src>] [-rmr <src>] [-put <localsrc> ... <dst>] [-copyFromLocal <localsrc> ... <dst>] [-moveFromLocal <localsrc> ... <dst>] [-get [-ignoreCrc] [-crc] <src> <localdst> [-getmerge <src> <localdst> [addnl]] [-cat <src>] [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>] [-moveToLocal <src> <localdst>] [-mkdir <path>] [-report] [-setrep [-R] [-w] <rep> <path/file>] [-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>] [-tail [-f] <path>] [-text <path>] [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] [-chown [-R] [OWNER][:[GROUP]] PATH...] [-chgrp [-R] GROUP PATH...] [-count[-q] <path>] [-help [cmd]]



// Get default file system instance fs = Filesystem.get(new Configuration()); // Or Get file system instance from URI fs = Filesystem.get(URI.create(uri), new Configuration()); // Create, open, list, OutputStream out = fs.create(path, ); InputStream in = fs.open(path, ); boolean isDone = fs.delete(path, recursive); FileStatus[] fstat = fs.listStatus(path);

#include hdfs.h hdfsFS fs = hdfsConnectNewInstance("default", 0); hdfsFile writeFile = hdfsOpenFile(fs, /tmp/test.txt, O_WRONLY|O_CREAT, 0, 0, 0); tSize num_written = hdfsWrite(fs, writeFile, (void*)buffer, sizeof(buffer)); hdfsCloseFile(fs, writeFile); hdfsFile readFile = hdfsOpenFile(fs, /tmp/test.txt, O_RDONLY, 0, 0, 0); tSize num_read = hdfsRead(fs, readFile, (void*)buffer, sizeof(buffer)); hdfsCloseFile(fs, readFile); hdfsDisconnect(fs);

Installing Hadoop
Check requirements Java 1.6+ bash (Cygwin on Windows) Download Hadoop release Change conguration Launch daemons
Download Hadoop
$ wget http://www.apache.org/dist/hadoop/core/ hadoop-0.18.3/hadoop-0.18.3.tar.gz $ tar zxvf hadoop-0.18.3.tar.gz $ cd hadoop-0.18.3 $ ls -cF conf commons-logging.properties hadoop-site.xml configuration.xsl log4j.properties hadoop-default.xml masters hadoop-env.sh slaves hadoop-metrics.properties sslinfo.xml.example


Set Environment
# Modify conf/hadoop-env.sh $ $ $ $ export export export export JAVA_HOME=.... HADOOP_HOME=.... HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves HADOOP_CONF_DIR=${HADOOP_HOME}/conf

# Enable password-less ssh # Assuming $HOME is shared across all nodes $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys


Make Directories
# On Namenode, create metadata storage and tmp space $ mkdir -p /home/hadoop/dfs/name $ mkdir -p /tmp/hadoop # Create slaves file $ cat > conf/slaves slave00 slave01 slave02 ... ^D # Create data directories on each slave $ bin/slaves.sh "mkdir -p /tmp/hadoop" $ bin/slaves.sh "mkdir -p /home/hadoop/dfs/data"

Start Daemons
# Modify hadoop-site.xml with appropriate # fs.default.name, mapred.job.tracker, etc. $ mv ~/myconf.xml conf/hadoop-site.xml # On Namenode $ bin/hadoop namenode -format # Start all daemons $ bin/start-all.sh # Done !

Check Namenode

Cluster Summary

Browse Filesystem

Browse Filesystem

Browse Filesystem

Questions ?

Hadoop MapReduce


Think MR
Record = (Key,Value) Key : Comparable, Serializable Value: Serializable Input, Map, Shufe, Reduce, Output
Seems Familiar ?

cat /var/log/auth.log* | \ grep session opened | cut -d -f10 | \ sort | \ uniq -c > \ ~/userlist


Input: (Key ,Value ) Output: List(Key ,Value ) Projections, Filtering, Transformation
1 1 2 2
Input: List(Key ,Value ) Output Sort(Partition(List(Key , List(Value )))) Provided by Hadoop
2 2 2 2
Middleware 2009 58

Input: List(Key , List(Value )) Output: List(Key ,Value ) Aggregation
2 2 3 3
Middleware 2009 59

Example: Unigrams
Input: Huge text corpus Wikipedia Articles (40GB uncompressed) Output: List of words sorted in descending
order of frequency

$ cat ~/wikipedia.txt | \ sed -e 's/ /\n/g' | grep . | \ sort | \ uniq -c > \ ~/frequencies.txt $ cat ~/frequencies.txt | \ # cat | \ sort -n -k1,1 -r | # cat > \ ~/unigrams.txt


MR for Unigrams
mapper (filename, file-contents): for each word in file-contents: emit (word, 1) reducer (word, values): sum = 0 for each value in values: sum = sum + value emit (word, sum)


MR for Unigrams

mapper (word, frequency): emit (frequency, word) reducer (frequency, words): for each word in words: emit (word, frequency)



MR Dataow

Unigrams: Java Mapper

public static class MapClass extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { Text word = new Text(itr.nextToken()); output.collect(word, new IntWritable(1)); } } }

Unigrams: Java Reducer

public static class Reduce extends MapReduceBase implements Reducer <Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }

Unigrams: Driver
public void run(String inputPath, String outputPath) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); conf.setMapperClass(MapClass.class); conf.setReducerClass(Reduce.class); FileInputFormat.addInputPath(conf, new Path(inputPath)); FileOutputFormat.setOutputPath(conf, new Path(outputPath)); JobClient.runJob(conf); }

MapReduce Pipeline

Pipeline Details

Unied Mechanism for Conguring Daemons Runtime environment for Jobs/Tasks Defaults: *-default.xml Site-Specic: *-site.xml nal parameters
Middleware 2009 71

<configuration> <property> <name>mapred.job.tracker</name> <value>head.server.node.com:9001</value> </property> <property> <name>fs.default.name</name> <value>hdfs://head.server.node.com:9000</value> </property> <property> <name>mapred.child.java.opts</name> <value>-Xmx512m</value> <final>true</final> </property> .... </configuration>

TextInputFormat (Default) KeyValueInputFormat SequenceFileInputFormat

Key Type
File Offset Text (upto \t) User-Dened

Value Type
Text Line Remaining Text User-Dened

TextOutputFormat (default)

Key \t Value \n

Binary Serialized keys and SequenceFileOutputFormat values NullOutputFormat Discards Output

Hadoop Streaming
Hadoop is written in Java Java MapReduce code is native What about Non-Java Programmers ? Perl, Python, Shell, R grep, sed, awk, uniq as Mappers/Reducers Text Input and Output
Hadoop Streaming
Thin Java wrappers for Map & Reduce Tasks Forks actual Mapper & Reducer IPC via stdin, stdout, stderr Key.toString() \t Value.toString() \n Slower than Java programs Allows for quick prototyping / debugging
Middleware 2009 76

Hadoop Streaming
$ bin/hadoop jar hadoop-streaming.jar \ -input in-files -output out-dir \ -mapper mapper.sh -reducer reducer.sh # mapper.sh sed -e 's/ /\n/g' | grep . # reducer.sh uniq -c | awk '{print $2 "\t" $1}'


Hadoop Pipes
Library for C/C++ Key & Value are std::string (binary) Communication through Unix pipes High numerical performance legacy C/C++ code (needs modication)
Pipes Program
#include "hadoop/Pipes.hh" #include "hadoop/TemplateFactory.hh" #include "hadoop/StringUtils.hh" int main(int argc, char *argv[]) { return HadoopPipes::runTask( HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>()); }


Pipes Mapper
class WordCountMap: public HadoopPipes::Mapper { public: WordCountMap(HadoopPipes::TaskContext& context){} void map(HadoopPipes::MapContext& context) { std::vector<std::string> words = HadoopUtils::splitString( context.getInputValue(), " "); for(unsigned int i=0; i < words.size(); ++i) { context.emit(words[i], "1"); } } };


Pipes Reducer
class WordCountReduce: public HadoopPipes::Reducer { public: WordCountReduce(HadoopPipes::TaskContext& context){} void reduce(HadoopPipes::ReduceContext& context) { int sum = 0; while (context.nextValue()) { sum += HadoopUtils::toInt(context.getInputValue()); } context.emit(context.getInputKey(), HadoopUtils::toString(sum)); } };

Running Pipes
# upload executable to HDFS $ bin/hadoop fs -put wordcount /examples/bin # Specify configuration $ vi /tmp/word.xml ... // Set the binary path on DFS <property> <name>hadoop.pipes.executable</name> <value>/examples/bin/wordcount</value> </property> ... # Execute job # bin/hadoop pipes -conf /tmp/word.xml \ -input in-dir -output out-dir

MR Architecture

Job Submission




Map Task

Sort Buffer

Reduce Task

Questions ?

Running Hadoop Jobs


Running a Job
[milindb@gateway ~]$ hadoop jar \ $HADOOP_HOME/hadoop-examples.jar wordcount \ /data/newsarchive/20080923 /tmp/newsout input.FileInputFormat: Total input paths to process : 4 mapred.JobClient: Running job: job_200904270516_5709 mapred.JobClient: map 0% reduce 0% mapred.JobClient: map 3% reduce 0% mapred.JobClient: map 7% reduce 0% .... mapred.JobClient: map 100% reduce 21% mapred.JobClient: map 100% reduce 31% mapred.JobClient: map 100% reduce 33% mapred.JobClient: map 100% reduce 66% mapred.JobClient: map 100% reduce 100% mapred.JobClient: Job complete: job_200904270516_5709

Running a Job
mapred.JobClient: Counters: 18 mapred.JobClient: Job Counters mapred.JobClient: Launched reduce tasks=1 mapred.JobClient: Rack-local map tasks=10 mapred.JobClient: Launched map tasks=25 mapred.JobClient: Data-local map tasks=1 mapred.JobClient: FileSystemCounters mapred.JobClient: FILE_BYTES_READ=491145085 mapred.JobClient: HDFS_BYTES_READ=3068106537 mapred.JobClient: FILE_BYTES_WRITTEN=724733409 mapred.JobClient: HDFS_BYTES_WRITTEN=377464307


Running a Job
mapred.JobClient: mapred.JobClient: mapred.JobClient: mapred.JobClient: mapred.JobClient: mapred.JobClient: mapred.JobClient: mapred.JobClient: mapred.JobClient: Map-Reduce Framework Combine output records=73828180 Map input records=36079096 Reduce shuffle bytes=233587524 Spilled Records=78177976 Map output bytes=4278663275 Combine input records=371084796 Map output records=313041519 Reduce input records=15784903


JobTracker WebUI

JobTracker Status

Jobs Status

Job Details

Job Counters

Job Progress

All Tasks

Task Details

Task Counters

Task Logs

Run job with the Local Runner Set mapred.job.tracker to local Runs application in a single thread Run job on a small data set on a 1 node
Set keep.failed.task.les to keep les from
failed tasks

Use the IsolationRunner to run just the

failed task

Java Debugging hints Send a kill -QUIT to the Java process to

get the call stack, locks held, deadlocks
Hadoop Performance Tuning


Bob wants to count records in AdServer
logs (several hundred GB) reducer

Used Identity Mapper & Single counting What is he doing wrong ? This happened, really !
Middleware 2009 109

MapReduce Performance
Reduce intermediate data size map outputs + reduce inputs Maximize map input transfer rate Pipelined writes from reduce Opportunity to load balance
Middleware 2009 110

Often the most expensive component M * R Transfers over the network Sort map outputs (intermediate data) Merge reduce inputs
Middleware 2009 111

Improving Shufe
Avoid shufing/sorting if possible Minimize redundant transfers Compress intermediate data
Middleware 2009 112

Avoid Shufe
Set mapred.reduce.tasks to zero Known as map-only computations Filters, Projections, Transformations Number of output les = number of input
splits = number of input blocks

May overwhelm namenode

Middleware 2009 113

Minimize Redundant Transfers

Combiners Intermediate data compression

When Maps produce many repeated keys Combiner: Local aggregation after Map &
before Reduce

Side-effect free Same interface as Reducers, and often the

same class
Middleware 2009 115

Often yields huge performance gains Set mapred.output.compress to true to
compress job output

Set mapred.compress.map.output to true to

compress map outputs native gzip

Codecs: Java zlib (default), LZO, bzip2,

Middleware 2009 116

Load Imbalance
Inherent in application Imbalance in input splits Imbalance in computations Imbalance in partitions Heterogenous hardware Degradation over time
Middleware 2009 117

Optimal Number of Nodes

T = Map slots per TaskTracker N = optimal number of nodes S = N * T = Total Map slots in cluster M = Map tasks in application Rule of thumb: 5*S < M < 10*S
m m m m m
Middleware 2009 118

Conguring Task Slots

mapred.tasktracker.map.tasks.maximum mapred.tasktracker.reduce.tasks.maximum Tradeoffs: Number of cores, RAM, number
and size of disks

Also consider resources consumed by

TaskTracker & DataNode
Middleware 2009 119

Speculative Execution
Runs multiple instances of slow tasks Instance that nishes rst, succeeds mapred.map.speculative.execution=true mapred.reduce.speculative.execution=true Can dramatically bring in long tails on jobs
Middleware 2009 120

Hadoop Examples


Example: Standard Deviation

Takeaway: Changing algorithm to suit architecture yields the best implementation

Implementation 1
Two Map-Reduce stages First stage computes Mean Second stage computes standard deviation
Stage 1: Compute Mean

Map Input (x for i = 1 ..N ) Map Output (N , Mean(x )) Single Reducer Reduce Input (Group(Map Output)) Reduce Output (Mean(x ))
i m m 1..Nm 1..N
Middleware 2009 124

Stage 2: Compute Standard Deviation

Map Input (x for i = 1 ..N ) & Mean(x ) Map Output (Sum(x Mean(x)) for i =
i m 1..N i 2

1 ..Nm

Single Reducer Reduce Input (Group (Map Output)) & N Reduce Output ()
Middleware 2009 125

Standard Deviation

Algebraically equivalent Be careful about numerical accuracy, though

Implementation 2
Map Input (x for i = 1 ..N ) Map Output (N ,
i m m 2 [Sum(x 1..Nm),Mean(x1..Nm)])

Single Reducer Reduce Input (Group (Map Output)) Reduce Output ()

Input: A large text corpus Output: List(word , Top (word )) Two Stages: Generate all possible bigrams Find most frequent K bigrams for each
1 K 2


Middleware 2009


Bigrams: Stage 1 Map

Generate all possible Bigrams Map Input: Large text corpus Map computation In each sentence, or each word word Output (word , word ), (word , word ) Partition & Sort by (word , word )
1 2 1 2 2 1 1 2
Middleware 2009 130

while(<STDIN>) { chomp; $_ =~ s/[^a-zA-Z]+/ /g ; $_ =~ s/^\s+//g ; $_ =~ s/\s+$//g ; $_ =~ tr/A-Z/a-z/; my @words = split(/\s+/, $_); for (my $i = 0; $i < $#words - 1; ++$i) { print "$words[$i]:$words[$i+1]\n"; print "$words[$i+1]:$words[$i]\n"; } }


Bigrams: Stage 1 Reduce

Input: List(word , word ) sorted and
1 2


Output: List(word , [freq, word ]) Counting similar to Unigrams example

1 2
Middleware 2009 132

$_ = <STDIN>; chomp; my ($pw1, $pw2) = split(/:/, $_); $count = 1; while(<STDIN>) { chomp; my ($w1, $w2) = split(/:/, $_); if ($w1 eq $pw1 && $w2 eq $pw2) { $count++; } else { print "$pw1:$count:$pw2\n"; $pw1 = $w1; $pw2 = $w2; $count = 1; } } print "$pw1:$count:$pw2\n";

Bigrams: Stage 2 Map

Input: List(word , [freq,word ]) Output: List(word , [freq, word ]) Identity Mapper (/bin/cat) Partition by word Sort descending by (word , freq)
1 2 1 2 1 1
Middleware 2009 134

Bigrams: Stage 2 Reduce

Input: List(word , [freq,word ]) partitioned by word sorted descending by (word , freq) Output: Top (List(word , [freq, word ])) For each word, throw away after K records
1 2 1 1 K 1 2
Middleware 2009 135

$N = 5; $_ = <STDIN>; chomp; my ($pw1, $count, $pw2) = split(/:/, $_); $idx = 1; $out = "$pw1\t$pw2,$count;"; while(<STDIN>) { chomp; my ($w1, $c, $w2) = split(/:/, $_); if ($w1 eq $pw1) { if ($idx < $N) { $out .= "$w2,$c;"; $idx++; } } else { print "$out\n"; $pw1 = $w1; $idx = 1; $out = "$pw1\t$w2,$c;"; } } print "$out\n"; 136

By default, evenly distributes keys hashcode(key) % NumReducers Overriding partitioner Skew in map-outputs Restrictions on reduce outputs All URLs in a domain together
Middleware 2009 137

// JobConf.setPartitionerClass(className) public interface Partitioner <K, V> extends JobConfigurable { int getPartition(K key, V value, int maxPartitions); }


Fully Sorted Output

By contract, reducer gets input sorted on

Typically reducer output order is the same

as input order

How to make sure that Keys in part i are all

less than keys in part i+1 ?
139 Middleware 2009

Each output le (part le) is sorted

Fully Sorted Output

Use single reducer for small output Insight: Reducer input must be fully sorted Partitioner should provide fully sorted
reduce input

Sampling + Histogram equalization

Middleware 2009 140

Number of Maps
Number of Input Splits Number of HDFS blocks mapred.map.tasks Minimum Split Size (mapred.min.split.size) split_size = max(min(hdfs_block_size,
data_size/#maps), min_split_size)
141 Middleware 2009

Parameter Sweeps
External program processes data based on
command-line parameters

./prog params=0.1,0.3 < in.dat > out.dat Objective: Run an instance of ./prog for each
parameter combination

Number of Mappers = Number of different

parameter combinations
142 Middleware 2009

Parameter Sweeps
Input File: params.txt Each line contains one combination of

Input format is NLineInputFormat (N=1) Number of maps = Number of splits =

Number of lines in params.txt
Middleware 2009 143

Auxiliary Files
-le auxFile.dat Job submitter adds le to job.jar Unjarred on the task tracker Available to task as $cwd/auxFile.dat Not suitable for large / frequently used les
Middleware 2009 144

Auxiliary Files
Tasks need to access side les Read-only Dictionaries (such as for porn

Tasks themselves can fetch les from HDFS Not Always ! (Hint: Unresolved symbols)
Middleware 2009 145

Dynamically linked libraries

Distributed Cache
Specify side les via cacheFile If lot of such les needed Create a tar.gz archive Upload to HDFS Specify via cacheArchive
Middleware 2009 146

Distributed Cache
TaskTracker downloads these les once Untars archives Accessible in tasks $cwd before task starts Cached across multiple tasks Cleaned up upon exit
Middleware 2009 147

Datasets are streams of key-value pairs Could be split across multiple les in a
single directory

Joining Multiple Datasets

Join could be on Key, or any eld in Value Join could be inner, outer, left outer, cross
product etc

Join is a natural Reduce operation

Middleware 2009 148

A = (id, name), B = (name, address) A is in /path/to/A/part-* B is in /path/to/B/part-* Select A.name, B.address where A.name ==
Middleware 2009 149

Map in Join
Input: (Key ,Value ) from A or B map.input.le indicates A or B MAP_INPUT_FILE in Streaming Output: (Key , [Value , A|B]) Key is the Join Key
1 1 2 2 2
Middleware 2009 150

Reduce in Join
Input: Groups of [Value , A|B] for each Key Operation depends on which kind of join Inner join checks if key has values from
2 2

both A & B

Output: (Key , JoinFunction(Value ,))

2 2
Middleware 2009 151

MR Join Performance
Map Input = Total of A & B Map output = Total of A & B Shufe & Sort Reduce input = Total of A & B Reduce output = Size of Joined dataset Filter and Project in Map
Middleware 2009 152

Join Special Cases

Fragment-Replicate 100GB dataset with 100 MB dataset Equipartitioned Datasets Identically Keyed Equal Number of partitions Each partition locally sorted
Middleware 2009 153

Fragment larger dataset Specify as Map input Replicate smaller dataset Use Distributed Cache Map-Only computation No shufe / sort
Middleware 2009 154

Equipartitioned Join
Available since Hadoop 0.16 Datasets joined before input to mappers Input format: CompositeInputFormat mapred.join.expr Simpler to use in Java, but can be used in
Middleware 2009 155

mapred.join.expr = inner ( tbl ( ....SequenceFileInputFormat.class, "hdfs://namenode:8020/path/to/data/A" ), tbl ( ....SequenceFileInputFormat.class, "hdfs://namenode:8020/path/to/data/B" ) )


Questions ?

Apache Pig

What is Pig?
System for processing large semistructured data sets using Hadoop MapReduce platform

Pig Latin: High-level procedural language Pig Engine: Parser, Optimizer and
distributed query execution
Middleware 2009 159

Pig vs SQL

Pig is procedural Nested relational data model Schema is optional Scan-centric analytic workloads Limited query optimization

SQL is declarative Flat relational data model Schema is required OLTP + OLAP workloads Signicant opportunity for query optimization

Pig vs Hadoop
Increases programmer productivity Decreases duplication of effort Insulates against Hadoop complexity Version Upgrades JobConf conguration tuning Job Chains
Middleware 2009 161


Input: User proles, Page visits Find the top 5 most visited pages by users aged 18-25

In Native Hadoop

In Pig
Users = load users as (name, age); Filtered = filter Users by age >= 18 and age <= 25; Pages = load pages as (user, url); Joined = join Filtered by name, Pages by user; Grouped = group Joined by url; Summed = foreach Grouped generate group, COUNT(Joined) as clicks; Sorted = order Summed by clicks desc; Top5 = limit Sorted 5; store Top5 into top5sites;


Natural Fit


Flexibility & Control

Easy to plug-in user code Metadata is not mandatory Does not impose a data model Fine grained control Complex data types
Middleware 2009 167

Pig Data Types

Tuple: Ordered set of elds Field can be simple or complex type Nested relational model Bag: Collection of tuples Can contain duplicates Map: Set of (key, value) pairs
Middleware 2009 168

Simple data types

int : 42 long : 42L oat : 3.1415f double : 2.7182818 chararray : UTF-8 String bytearray : blob
Middleware 2009 169

A = LOAD data.txt AS (f1:int , f2:{t:(n1:int, n2:int)}, f3: map[] )

A = { ( 1, { (2, 3), (4, 6) }, [ yahoo#mail ] ) }


-- A.f1 or A.$0 -- A.f2 or A.$1 -- A.f3 or A.$2

Pig Unigrams
Input: Large text document Process: Load the le For each line, generate word tokens Group by word Count words in each group
Middleware 2009 171

myinput = load '/user/milindb/text.txt' USING TextLoader() as (myword:chararray);

{ (program program) (pig pig) (program pig) (hadoop pig) (latin latin) (pig latin) }


{ (program) (program) (pig) (pig) (program) (pig) (hadoop) (pig) (latin) (latin) (pig) (latin) }


grouped = GROUP words BY $0;

{ (pig, {(pig), (pig), (pig), (pig), (pig)}) (latin, {(latin), (latin), (latin)}) (hadoop, {(hadoop)}) (program, {(program), (program), (program)}) }

counts = FOREACH grouped GENERATE group, COUNT(words);

{ (pig, 5L) (latin, 3L) (hadoop, 1L) (program, 3L) }


store counts into /user/milindb/output using PigStorage();

pig latin hadoop program

5 3 1 3


Example: Log Processing

-- use a custom loader Logs = load /var/log/access_log using CommonLogLoader() as (addr, logname, user, time, method, uri, p, bytes); -- apply your own function Cleaned = foreach Logs generate addr, canonicalize(url) as url; Grouped = group Cleaned by url; -- run the result through a binary Analyzed = stream Grouped through urlanalyzer.py; store Analyzed into analyzedurls;


Schema on the y
-- declare your types Grades = load studentgrades as (name: chararray, age: int, gpa: double); Good = filter Grades by age > 18 and gpa > 3.0; -- ordering will be by type Sorted = order Good by gpa; store Sorted into smartgrownups;


Nested Data
Logs = load weblogs as (url, userid); Grouped = group Logs by url; -- Code inside {} will be applied to each -- value in turn. DisinctCount = foreach Grouped { Userid = Logs.userid; DistinctUsers = distinct Userid; generate group, COUNT(DistinctUsers); } store DistinctCount into distinctcount;


Pig Architecture

Pig Stages

Logical Plan
Directed Acyclic Graph Logical Operator as Node Data ow as edges Logical Operators One per Pig statement Type checking with Schema
Middleware 2009 182

Pig Statements
Load Read data from the le system Write data to the le system Write data to stdout



Pig Statements
Foreach..Generate Apply expression to each record and generate one or more records Apply predicate to each record and remove records where false Stream records through user-provided binary



Pig Statements
Group/CoGroup Collect records with the same key from one or more inputs Join two or more inputs based on a key Sort records based on a key



Physical Plan
Pig supports two back-ends Local Hadoop MapReduce 1:1 correspondence with most logical

Except Distinct, Group, Cogroup, Join etc

Middleware 2009 186

MapReduce Plan
Detect Map-Reduce boundaries Group, Cogroup, Order, Distinct Coalesce operators into Map and Reduce

Job.jar is created and submitted to Hadoop

Middleware 2009 187

Lazy Execution
Nothing really executes until you request

Store, Dump, Explain, Describe, Illustrate Advantages

In-memory pipelining Filter re-ordering across multiple commands


Split-wise parallelism on Map-side

By default, 1 reducer PARALLEL keyword group, cogroup, cross, join, distinct, order
Middleware 2009 189

Running Pig
$ pig grunt > A = load students as (name, age, gpa); grunt > B = filter A by gpa > 3.5; grunt > store B into good_students; grunt > dump A; (jessica thompson, 73, 1.63) (victor zipper, 23, 2.43) (rachel hernandez, 40, 3.60) grunt > describe A; A: (name, age, gpa )


Running Pig
Batch mode $ pig myscript.pig Local mode $ pig x local Java mode (embed pig statements in java) Keep pig.jar in the class path
Middleware 2009 191



Pig for SQL Programmers


SQL to Pig
...FROM MyTable...

A = LOAD MyTable USING PigStorage(\t) AS (col1:int, col2:int, col3:int);

SELECT col1 + col2, col3 ...

B = FOREACH A GENERATE col1 + col2, col3;

...WHERE col2 > 2

C = FILTER B by col2 > 2;

SQL to Pig
D = GROUP A BY (col1, col2) SELECT col1, col2, sum(col3) E = FOREACH D GENERATE FROM X GROUP BY col1, col2 FLATTEN(group), SUM(A.col3);

...HAVING sum(col3) > 5

F = FILTER E BY $2 > 5;

...ORDER BY col1

G = ORDER F BY $0;

SQL to Pig



SELECT col1, count(DISTINCT col2) FROM X GROUP BY col1

K = GROUP A BY col1; L = FOREACH K { M = DISTINCT A.col2; GENERATE FLATTEN(group), count(M); }

SQL to Pig
N = JOIN A by col1 INNER, B by col1 INNER; O = FOREACH N GENERATE A.col1, B.col3; SELECT A.col1, B. -- Or col3 FROM A JOIN B USING (col1) N = COGROUP A by col1 INNER, B by col1 INNER; O = FOREACH N GENERATE flatten(A), flatten(B); P = FOREACH O GENERATE A.col1, B.col3

Questions ?

