Hadoop Tutorial
Agenda
Introduction
Hadoop Distributed File System
Map-Reduce
Pig
Q&A
Middleware 2009
About Me
Lead, Yahoo! Grid Solutions Team, since June 2005
Contributor to Hadoop since January 2006
Trained 1000+ Hadoop users at Yahoo! & elsewhere
Hadoop At Yahoo!
Sample Applications
Data analysis is the inner loop of Web 2.0: Data -> Information -> Value
Log processing: reporting, buzz
Search index
Machine learning: spam filters
Competitive intelligence
Search Assist
Insight: Related concepts appear close together in text corpus
Input: Web pages
1 billion pages, 10 KB each
10 TB of input data
Output: List(word, List(related words))
Search Assist
// Input: List(URL, Text)
foreach URL in Input :
    Words = Tokenize(Text(URL));
    foreach word in Words :
        Insert (word, Next(word, Words)) in Pairs;
        Insert (word, Previous(word, Words)) in Pairs;
// Result: Pairs = List(word, RelatedWord)
GroupedPairs = Group Pairs by word;
// Result: List(word, List(RelatedWords))
foreach word in GroupedPairs :
    Count RelatedWords in GroupedPairs(word);
// Result: CountedPairs = List(word, List(RelatedWords, count))
foreach word in CountedPairs :
    Sort Pairs(word, *) descending by count;
    choose Top 5 Pairs;
// Result: List(word, Top5(RelatedWords))
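The Search Assist pseudocode can be sketched in Java for a single machine. This is a minimal illustration, not Yahoo!'s implementation. It assumes whitespace tokenization and lower-casing, and it breaks ties alphabetically among equally frequent related words, since the slides do not say how ties are handled. SearchAssist and topRelated are hypothetical names. On a cluster, the "Group Pairs by word" step is what the MapReduce shuffle provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SearchAssist {
    /** For each word, returns up to k related words, most frequent neighbours first. */
    static Map<String, List<String>> topRelated(List<String> pages, int k) {
        // "Group Pairs by word": word -> (related word -> count)
        Map<String, Map<String, Integer>> counts = new TreeMap<>();
        for (String text : pages) {
            String[] words = text.toLowerCase().trim().split("\\s+");
            for (int i = 0; i < words.length; i++) {
                if (words[i].isEmpty()) continue;                                   // blank page
                if (i + 1 < words.length) addPair(counts, words[i], words[i + 1]);  // Next(word)
                if (i > 0) addPair(counts, words[i], words[i - 1]);                 // Previous(word)
            }
        }
        // Sort each word's related words by descending count and keep the top k
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Integer>> e : counts.entrySet()) {
            List<Map.Entry<String, Integer>> related = new ArrayList<>(e.getValue().entrySet());
            related.sort((a, b) -> a.getValue().equals(b.getValue())
                    ? a.getKey().compareTo(b.getKey())               // tie: alphabetical (assumption)
                    : Integer.compare(b.getValue(), a.getValue()));  // higher count first
            List<String> top = new ArrayList<>();
            for (int i = 0; i < Math.min(k, related.size()); i++) top.add(related.get(i).getKey());
            result.put(e.getKey(), top);
        }
        return result;
    }

    private static void addPair(Map<String, Map<String, Integer>> counts, String word, String related) {
        counts.computeIfAbsent(word, w -> new TreeMap<>()).merge(related, 1, Integer::sum);
    }
}
```

For example, over the pages "apache hadoop mapreduce", "apache hadoop pig" and "hadoop streaming", the top two related words for "hadoop" are "apache" (seen twice) and then "mapreduce" (the alphabetical first of the words seen once).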
Performance
101 random accesses for each user
Assume 1 ms per random access: ~100 ms per user
300 MM users x 100 ms = 3 x 10^7 s, roughly 350 days on a single machine
MapReduce Paradigm
Map
Output_List = Map (Input_List)
Square (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) = (1, 4, 9, 16, 25, 36, 49, 64, 81, 100)
Reduce
Output_Element = Reduce (Input_List)
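Map and Reduce correspond to the map and reduce operations on Java streams. The following minimal sketch uses the Square example from the Map slide. The Reduce slide gives no example, so summation is an assumed illustration. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ListPrimitives {
    /** Map: applies a function to every element independently (here, Square). */
    static List<Integer> square(List<Integer> input) {
        return input.stream().map(x -> x * x).collect(Collectors.toList());
    }

    /** Reduce: folds an entire list into one output element (here, Sum). */
    static int sum(List<Integer> input) {
        return input.stream().reduce(0, Integer::sum);
    }
}
```

Applying sum to the squares of 1..10 gives 385. Because square handles each element on its own, its work can be split across machines. Sum must see the whole list, which is why Reduce is sequential unless the input is first grouped into several lists.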
Parallelism
Map is inherently parallel
Each list element processed independently
Reduce is inherently sequential
Unless processing multiple lists
Grouping to produce multiple lists
Output = { (apache, hadoop) (hadoop, mapreduce) (hadoop, streaming) (hadoop, pig) (apache, pig) (hadoop, DFS) (streaming, commandline) (hadoop, java) (DFS, namenode) (datanode, block) (replication, default)... }
Output = { (hadoop, apache, 7) (hadoop, DFS, 3) (hadoop, streaming, 4) (hadoop, mapreduce, 9) ... }
Apache Hadoop
January 2006: Subproject of Lucene
January 2008: Top-level Apache project
Latest Version: 0.21
Stable Version: 0.20.x
Major contributors: Yahoo!, Facebook, Powerset
Apache Hadoop
Reliable, performant distributed file system
MapReduce programming framework
Sub-projects: HBase, Hive, Pig, ZooKeeper, Chukwa, Avro
Failure is not an option, it's a rule!
1000 nodes, MTBF < 1 day
4000 disks, 8000 cores, 25 switches, 1000 NICs, 2000 DIMMs (16 TB RAM)
Hadoop Goals
HDFS
Data is organized into files and directories
Files are divided into uniform-sized blocks (default 64 MB) and distributed across cluster nodes
Computation can be migrated to the data
HDFS
Blocks are replicated (default 3) to handle hardware failure
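The block and replication defaults (64 MB blocks, 3 replicas) lend themselves to a small worked example. This sketch assumes, as HDFS does, that the last block of a file holds only the remaining bytes rather than a full 64 MB. The class and method names are hypothetical.

```java
final class HdfsBlockMath {
    static final long MB = 1L << 20;

    /** Blocks needed for a file: ceiling(fileSize / blockSize); the last block may be partial. */
    static long blockCount(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    /** Block replicas the cluster stores for the file. */
    static long replicaCount(long fileSize, long blockSize, int replication) {
        return blockCount(fileSize, blockSize) * replication;
    }

    /** Raw disk used: partial blocks only take their actual size, so this is fileSize * replication. */
    static long rawBytes(long fileSize, int replication) {
        return fileSize * replication;
    }
}
```

For instance, a 1 GB file occupies 16 blocks and 48 block replicas. A 100 MB file occupies 2 blocks (64 MB + 36 MB), 6 replicas and 300 MB of raw disk.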
HDFS
Master-Worker Architecture
Single NameNode
Many (thousands of) DataNodes
Namenode
Mapping of datanode to list of blocks
Monitor datanode health
Replicate missing blocks
Keeps ALL namespace in memory
60M objects (File/Block) in 16GB
Datanodes
Handle block storage on multiple volumes & block integrity
Clients access the blocks directly from data nodes
Periodically send heartbeats and block reports to Namenode
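The Namenode and Datanode duties above (heartbeats, block reports, health monitoring, re-replication) can be illustrated with a toy model. This is a hypothetical sketch, not Hadoop's NameNode code. The timeout value, class name and data structures are assumptions chosen for clarity.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model: tracks datanode heartbeats and finds blocks that lost replicas. */
final class ToyNameNode {
    private final long heartbeatTimeoutMs;                                   // assumed policy
    private final Map<String, Long> lastHeartbeat = new HashMap<>();         // datanode -> time
    private final Map<String, Set<String>> blockLocations = new HashMap<>(); // block -> datanodes

    ToyNameNode(long heartbeatTimeoutMs) { this.heartbeatTimeoutMs = heartbeatTimeoutMs; }

    /** A heartbeat carrying a block report (the blocks this datanode stores). */
    void heartbeat(String datanode, long nowMs, Collection<String> blockReport) {
        lastHeartbeat.put(datanode, nowMs);
        for (String block : blockReport) {
            blockLocations.computeIfAbsent(block, b -> new HashSet<>()).add(datanode);
        }
    }

    /** Datanodes silent for longer than the timeout are treated as dead. */
    Set<String> deadNodes(long nowMs) {
        Set<String> dead = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMs - e.getValue() > heartbeatTimeoutMs) dead.add(e.getKey());
        }
        return dead;
    }

    /** Blocks with fewer live replicas than the target, mapped to how many copies are missing. */
    Map<String, Integer> underReplicated(long nowMs, int targetReplication) {
        Set<String> dead = deadNodes(nowMs);
        Map<String, Integer> missing = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : blockLocations.entrySet()) {
            int live = 0;
            for (String dn : e.getValue()) if (!dead.contains(dn)) live++;
            if (live < targetReplication) missing.put(e.getKey(), targetReplication - live);
        }
        return missing;
    }
}
```

If dn2 stops sending heartbeats while dn1 and dn3 keep reporting, dn2 is eventually declared dead. Every block it held then shows up as under-replicated, which is the signal to schedule new copies.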
HDFS Architecture
Replication
A file's replication factor can be changed dynamically (default 3)
Accessing HDFS
hadoop fs [-fs <local | file system URI>] [-conf <configuration file>]
    [-D <property=value>] [-ls <path>] [-lsr <path>] [-du <path>]
    [-dus <path>] [-mv <src> <dst>] [-cp <src> <dst>] [-rm <src>]
    [-rmr <src>] [-put <localsrc> ... <dst>]
    [-copyFromLocal <localsrc> ... <dst>]
    [-moveFromLocal <localsrc> ... <dst>]
    [-get [-ignoreCrc] [-crc] <src> <localdst>]
    [-getmerge <src> <localdst> [addnl]] [-cat <src>]
    [-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
    [-moveToLocal <src> <localdst>] [-mkdir <path>] [-report]
    [-setrep [-R] [-w] <rep> <path/file>] [-touchz <path>]
    [-test -[ezd] <path>] [-stat [format] <path>] [-tail [-f] <path>]
    [-text <path>]
    [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
    [-chown [-R] [OWNER][:[GROUP]] PATH...]
    [-chgrp [-R] GROUP PATH...]
    [-count[-q] <path>] [-help [cmd]]
libHDFS
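#include "hdfs.h"
#include <fcntl.h>   /* O_RDONLY, O_WRONLY, O_CREAT */
#include <stdio.h>

int main(void) {
    char buffer[] = "Hello, HDFS!";
    /* "default" with port 0 connects to the configured file system (fs.default.name) */
    hdfsFS fs = hdfsConnectNewInstance("default", 0);
    if (!fs) { fprintf(stderr, "Failed to connect to HDFS\n"); return 1; }

    hdfsFile writeFile = hdfsOpenFile(fs, "/tmp/test.txt", O_WRONLY|O_CREAT, 0, 0, 0);
    tSize num_written = hdfsWrite(fs, writeFile, (void*)buffer, sizeof(buffer));
    hdfsCloseFile(fs, writeFile);

    hdfsFile readFile = hdfsOpenFile(fs, "/tmp/test.txt", O_RDONLY, 0, 0, 0);
    tSize num_read = hdfsRead(fs, readFile, (void*)buffer, sizeof(buffer));
    hdfsCloseFile(fs, readFile);

    printf("wrote %d bytes, read %d bytes\n", (int)num_written, (int)num_read);
    hdfsDisconnect(fs);
    return 0;
}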
Installing Hadoop
Check requirements
Java 1.6+
bash (Cygwin on Windows)
Download Hadoop release
Change configuration
Launch daemons
Download Hadoop
$ wget http://www.apache.org/dist/hadoop/core/hadoop-0.18.3/hadoop-0.18.3.tar.gz
$ tar zxvf hadoop-0.18.3.tar.gz
$ cd hadoop-0.18.3
$ ls -cF conf
commons-logging.properties  hadoop-site.xml
configuration.xsl           log4j.properties
hadoop-default.xml          masters
hadoop-env.sh               slaves
hadoop-metrics.properties   sslinfo.xml.example
Set Environment
# Modify conf/hadoop-env.sh
$ export JAVA_HOME=....
$ export HADOOP_HOME=....
$ export HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves
$ export HADOOP_CONF_DIR=${HADOOP_HOME}/conf

# Enable password-less ssh
# Assuming $HOME is shared across all nodes
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
Make Directories
# On Namenode, create metadata storage and tmp space
$ mkdir -p /home/hadoop/dfs/name
$ mkdir -p /tmp/hadoop

# Create slaves file
$ cat > conf/slaves
slave00
slave01
slave02
...
^D

# Create data directories on each slave
$ bin/slaves.sh "mkdir -p /tmp/hadoop"
$ bin/slaves.sh "mkdir -p /home/hadoop/dfs/data"
Start Daemons
# Modify hadoop-site.xml with appropriate
# fs.default.name, mapred.job.tracker, etc.
$ mv ~/myconf.xml conf/hadoop-site.xml

# On Namenode
$ bin/hadoop namenode -format

# Start all daemons
$ bin/start-all.sh

# Done !
Check Namenode
Cluster Summary
Browse Filesystem
Questions ?
Hadoop MapReduce
Think MR
Record = (Key, Value)
Key: Comparable, Serializable
Value: Serializable
Input, Map, Shuffle, Reduce, Output
Seems Familiar ?
cat /var/log/auth.log* | \
    grep "session opened" | cut -d ' ' -f10 | \
    sort | \
    uniq -c > \
    ~/userlist
Map
Input: (Key1, Value1)
Output: List(Key2, Value2)
Projections, Filtering, Transformation
Shuffle
Input: List(Key2, Value2)
Output: Sort(Partition(List(Key2, List(Value2))))
Provided by Hadoop
Reduce
Input: List(Key2, List(Value2))
Output: List(Key3, Value3)
Aggregation
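The Map, Shuffle and Reduce signatures can be simulated on one machine. This minimal sketch uses hypothetical names, not Hadoop's API. It assumes a single partition, so Partition is the identity and the shuffle reduces to grouping values by key in a sorted map. Word count serves as the example job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class LocalMapReduce {
    /** Map: (K1, V1) -> List(K2, V2) */
    interface Mapper<K1, V1, K2, V2> {
        List<Map.Entry<K2, V2>> map(K1 key, V1 value);
    }

    /** Reduce: (K2, List(V2)) -> List(K3, V3) */
    interface Reducer<K2, V2, K3, V3> {
        List<Map.Entry<K3, V3>> reduce(K2 key, List<V2> values);
    }

    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            List<Map.Entry<K1, V1>> input,
            Mapper<K1, V1, K2, V2> mapper,
            Reducer<K2, V2, K3, V3> reducer) {
        // Map each record independently; the "shuffle" groups values by key in sorted key order
        SortedMap<K2, List<V2>> grouped = new TreeMap<>();
        for (Map.Entry<K1, V1> record : input) {
            for (Map.Entry<K2, V2> kv : mapper.map(record.getKey(), record.getValue())) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // Reduce: one call per distinct key
        List<Map.Entry<K3, V3>> output = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : grouped.entrySet()) {
            output.addAll(reducer.reduce(group.getKey(), group.getValue()));
        }
        return output;
    }

    /** Word count as a job: input records are (line offset, line), as with TextInputFormat. */
    static List<Map.Entry<String, Integer>> wordCount(List<String> lines) {
        List<Map.Entry<Long, String>> input = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            input.add(Map.entry(offset, line));
            offset += line.length() + 1; // +1 for the newline
        }
        return run(input,
                (Long off, String line) -> {
                    List<Map.Entry<String, Integer>> out = new ArrayList<>();
                    for (String w : line.split("\\s+")) if (!w.isEmpty()) out.add(Map.entry(w, 1));
                    return out;
                },
                (String word, List<Integer> counts) -> {
                    int sum = 0;
                    for (int c : counts) sum += c;
                    List<Map.Entry<String, Integer>> out = new ArrayList<>();
                    out.add(Map.entry(word, sum));
                    return out;
                });
    }
}
```

With several reducers, a partition function would first split the map output by key, and each reducer would run the same grouping on its own share.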
Example: Unigrams
Input: Huge text corpus
Wikipedia Articles (40GB uncompressed)
Output: List of words sorted in descending order of frequency
Unigrams
$ cat ~/wikipedia.txt | \
    sed -e 's/ /\n/g' | grep . | \
    sort | \
    uniq -c > \
    ~/frequencies.txt

# Second pass: map and reduce are both identity (cat); the sort does the work
$ cat ~/frequencies.txt | \
    sort -n -k1,1 -r > \
    ~/unigrams.txt
MR for Unigrams
mapper (filename, file-contents):
    for each word in file-contents:
        emit (word, 1)

reducer (word, values):
    sum = 0
    for each value in values:
        sum = sum + value
    emit (word, sum)
MR for Unigrams
mapper (word, frequency):
    emit (frequency, word)

reducer (frequency, words):
    for each word in words:
        emit (word, frequency)
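The second Unigrams stage relies on the shuffle to sort by frequency. The following single-machine sketch uses hypothetical names. Because the default key sort is ascending, the stand-in for the shuffle sorts in reverse order. On a cluster this typically calls for a custom output key comparator (JobConf.setOutputKeyComparatorClass in the old API) and a single reducer to get one global order. Ordering words of equal frequency alphabetically is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class UnigramSort {
    /** Inverts (word, frequency) to (frequency, word), sorts by frequency descending, re-emits (word, frequency). */
    static List<Map.Entry<String, Integer>> sortByFrequency(Map<String, Integer> frequencies) {
        // Shuffle stand-in: keys kept in descending order
        SortedMap<Integer, List<String>> byFrequency =
                new TreeMap<Integer, List<String>>(Comparator.reverseOrder());
        for (Map.Entry<String, Integer> e : frequencies.entrySet()) {
            // mapper(word, frequency): emit(frequency, word)
            byFrequency.computeIfAbsent(e.getValue(), f -> new ArrayList<>()).add(e.getKey());
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : byFrequency.entrySet()) {
            List<String> words = group.getValue();
            Collections.sort(words); // deterministic order within one frequency (assumption)
            // reducer(frequency, words): emit(word, frequency) for each word
            for (String w : words) out.add(Map.entry(w, group.getKey()));
        }
        return out;
    }
}
```

Given the frequencies pig=5, latin=3, hadoop=1, program=3, the output order is pig, latin, program, hadoop.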
Dataflow
MR Dataflow
Unigrams: Driver
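import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Method of the WordCount class; MapClass and Reduce are its Mapper and Reducer
public void run(String inputPath, String outputPath) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    // Output types must match what the reducer emits: (word, count)
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setReducerClass(Reduce.class);

    FileInputFormat.addInputPath(conf, new Path(inputPath));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));

    JobClient.runJob(conf);
}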
MapReduce Pipeline
Pipeline Details
Configuration
Unified mechanism for configuring daemons
Runtime environment for Jobs/Tasks
Defaults: *-default.xml
Site-specific: *-site.xml
final parameters
Example
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>head.server.node.com:9001</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://head.server.node.com:9000</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx512m</value>
    <final>true</final>
  </property>
  ....
</configuration>
InputFormats
Format                       Key Type          Value Type
TextInputFormat (default)    File offset       Text line
KeyValueTextInputFormat      Text (up to \t)   Remaining text
SequenceFileInputFormat      User-defined      User-defined
OutputFormats
Format                       Description
TextOutputFormat (default)   Key \t Value \n
Hadoop Streaming
Hadoop is written in Java
Java MapReduce code is native
What about non-Java programmers?
Perl, Python, Shell, R
grep, sed, awk, uniq as Mappers/Reducers
Text Input and Output
Hadoop Streaming
Thin Java wrappers for Map & Reduce Tasks
Forks actual Mapper & Reducer
IPC via stdin, stdout, stderr
Key.toString() \t Value.toString() \n
Slower than Java programs
Allows for quick prototyping / debugging
Hadoop Streaming
$ bin/hadoop jar hadoop-streaming.jar \
    -input in-files -output out-dir \
    -mapper mapper.sh -reducer reducer.sh

# mapper.sh
sed -e 's/ /\n/g' | grep .

# reducer.sh
uniq -c | awk '{print $2 "\t" $1}'
Hadoop Pipes
Library for C/C++
Key & Value are std::string (binary)
Communication through Unix pipes
High numerical performance
Legacy C/C++ code (needs modification)
Pipes Program
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(
      HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>());
}
Pipes Mapper
class WordCountMap: public HadoopPipes::Mapper {
public:
  WordCountMap(HadoopPipes::TaskContext& context){}
  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for(unsigned int i=0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
  }
};
Pipes Reducer
class WordCountReduce: public HadoopPipes::Reducer {
public:
  WordCountReduce(HadoopPipes::TaskContext& context){}
  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
  }
};
Running Pipes
# upload executable to HDFS
$ bin/hadoop fs -put wordcount /examples/bin

# Specify configuration
$ vi /tmp/word.xml
...
<!-- Set the binary path on DFS -->
<property>
  <name>hadoop.pipes.executable</name>
  <value>/examples/bin/wordcount</value>
</property>
...

# Execute job
$ bin/hadoop pipes -conf /tmp/word.xml \
    -input in-dir -output out-dir
MR Architecture
Job Submission
Initialization
Scheduling
Execution
Map Task
Sort Buffer
Reduce Task
Questions ?
Running a Job
[milindb@gateway ~]$ hadoop jar \
    $HADOOP_HOME/hadoop-examples.jar wordcount \
    /data/newsarchive/20080923 /tmp/newsout
input.FileInputFormat: Total input paths to process : 4
mapred.JobClient: Running job: job_200904270516_5709
mapred.JobClient:  map 0% reduce 0%
mapred.JobClient:  map 3% reduce 0%
mapred.JobClient:  map 7% reduce 0%
....
mapred.JobClient:  map 100% reduce 21%
mapred.JobClient:  map 100% reduce 31%
mapred.JobClient:  map 100% reduce 33%
mapred.JobClient:  map 100% reduce 66%
mapred.JobClient:  map 100% reduce 100%
mapred.JobClient: Job complete: job_200904270516_5709
Running a Job
mapred.JobClient: Counters: 18
mapred.JobClient:   Job Counters
mapred.JobClient:     Launched reduce tasks=1
mapred.JobClient:     Rack-local map tasks=10
mapred.JobClient:     Launched map tasks=25
mapred.JobClient:     Data-local map tasks=1
mapred.JobClient:   FileSystemCounters
mapred.JobClient:     FILE_BYTES_READ=491145085
mapred.JobClient:     HDFS_BYTES_READ=3068106537
mapred.JobClient:     FILE_BYTES_WRITTEN=724733409
mapred.JobClient:     HDFS_BYTES_WRITTEN=377464307
Running a Job
mapred.JobClient:   Map-Reduce Framework
mapred.JobClient:     Combine output records=73828180
mapred.JobClient:     Map input records=36079096
mapred.JobClient:     Reduce shuffle bytes=233587524
mapred.JobClient:     Spilled Records=78177976
mapred.JobClient:     Map output bytes=4278663275
mapred.JobClient:     Combine input records=371084796
mapred.JobClient:     Map output records=313041519
mapred.JobClient:     Reduce input records=15784903
JobTracker WebUI
JobTracker Status
Jobs Status
Job Details
Job Counters
Job Progress
All Tasks
Task Details
Task Counters
Task Logs
Debugging
Run job with the Local Runner
Set mapred.job.tracker to local
Runs application in a single thread
Run job on a small data set on a 1-node cluster
Debugging
Set keep.failed.task.files to keep files from failed tasks
Example
Bob wants to count records in AdServer logs (several hundred GB)
Used Identity Mapper & single counting reducer
What is he doing wrong?
This happened, really!
MapReduce Performance
Reduce intermediate data size
map outputs + reduce inputs
Maximize map input transfer rate
Pipelined writes from reduce
Opportunity to load balance
Shuffle
Often the most expensive component
M * R transfers over the network
Sort map outputs (intermediate data)
Merge reduce inputs
Improving Shuffle
Avoid shuffling/sorting if possible
Minimize redundant transfers
Compress intermediate data
Avoid Shuffle
Set mapred.reduce.tasks to zero
Known as map-only computations
Filters, Projections, Transformations
Number of output files = number of input splits = number of input blocks
Combiners
When Maps produce many repeated keys
Combiner: Local aggregation after Map & before Reduce
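The following sketch shows why combiners help, using word count (hypothetical names, no Hadoop dependency). The combiner applies the reduce function locally to each map's output. That is only safe because summation is associative and commutative. Hadoop may run a combiner zero or more times, so the job must produce the same answer either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CombinerDemo {
    /** Map output for one input split: (word, 1) for every token. */
    static List<Map.Entry<String, Integer>> mapSplit(String split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : split.split("\\s+")) if (!w.isEmpty()) out.add(Map.entry(w, 1));
        return out;
    }

    /** Combiner: sums values per key on the map side, before the shuffle. */
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> partial = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) partial.merge(kv.getKey(), kv.getValue(), Integer::sum);
        return new ArrayList<>(partial.entrySet());
    }

    /** Number of records sent across the network to the reducers. */
    static int shuffledRecords(List<String> splits, boolean useCombiner) {
        int n = 0;
        for (String split : splits) {
            List<Map.Entry<String, Integer>> out = mapSplit(split);
            n += useCombiner ? combine(out).size() : out.size();
        }
        return n;
    }
}
```

With the splits "pig pig pig latin" and "pig latin latin", 7 records are shuffled without the combiner and 4 with it. The reducers still compute pig = 4 and latin = 3.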
Compression
Often yields huge performance gains
Set mapred.output.compress to true to compress job output
Load Imbalance
Inherent in application
Imbalance in input splits
Imbalance in computations
Imbalance in partitions
Heterogeneous hardware
Degradation over time
Speculative Execution
Runs multiple instances of slow tasks
Instance that finishes first, succeeds
mapred.map.speculative.execution=true
mapred.reduce.speculative.execution=true
Can dramatically bring in long tails on jobs
Hadoop Examples
Implementation 1
Two Map-Reduce stages
First stage computes Mean
Second stage computes standard deviation
Single Reducer
Reduce Input (Group (Map Output))
Reduce Output (...)
Standard Deviation
Algebraically equivalent
Be careful about numerical accuracy, though
Implementation 2
Map Input (x_i for i = 1..N_m)
Map Output (N_m, [Sum(x_i^2 for i = 1..N_m), Mean(x_1..N_m)])
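The two implementations can be compared numerically. Implementation 1 computes the mean, then averages (x_i - mean)^2 in a second pass. Implementation 2 combines per-map (N_m, Sum(x_i^2), Mean) triples and uses Var = Sum(x_i^2)/N - Mean^2. This sketch assumes the population variance (dividing by N), which the slides do not specify. The names are hypothetical.

```java
final class StdDev {
    /** Per-map partial result, as in Implementation 2: {N_m, Sum(x_i^2), Mean(x_i)}. */
    static double[] mapPartial(double[] xs) {
        double sum = 0, sumSq = 0;
        for (double x : xs) { sum += x; sumSq += x * x; }
        return new double[] { xs.length, sumSq, sum / xs.length };
    }

    /** Single reducer for Implementation 2: Var = Sum(x^2)/N - Mean^2 over all partials. */
    static double onePass(double[][] partials) {
        double n = 0, sumSq = 0, weightedMean = 0;
        for (double[] p : partials) { n += p[0]; sumSq += p[1]; weightedMean += p[0] * p[2]; }
        double mean = weightedMean / n;
        // Clamp at zero: cancellation can push the difference slightly negative
        return Math.sqrt(Math.max(0.0, sumSq / n - mean * mean));
    }

    /** Implementation 1: compute the mean first, then a second pass over (x - mean)^2. */
    static double twoPass(double[] xs) {
        double sum = 0;
        for (double x : xs) sum += x;
        double mean = sum / xs.length;
        double sq = 0;
        for (double x : xs) sq += (x - mean) * (x - mean);
        return Math.sqrt(sq / xs.length);
    }
}
```

For {2, 4, 4, 4, 5, 5, 7, 9}, both versions return 2.0, even when the one-pass version gets the data as two partials. For values near 1e9 such as {1e9+4, 1e9+7, 1e9+13, 1e9+16}, the two-pass version returns sqrt(22.5). The one-pass formula subtracts two nearly equal numbers around 1e18 and loses the answer entirely. This is the accuracy problem the slide warns about.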
NGrams
Bigrams
Input: A large text corpus
Output: List(word1, Top_K(word2))
Two stages:
Generate all possible bigrams
Find the most frequent K bigrams for each word
pairs.pl
while (<STDIN>) {
    chomp;
    $_ =~ s/[^a-zA-Z]+/ /g;
    $_ =~ s/^\s+//g;
    $_ =~ s/\s+$//g;
    $_ =~ tr/A-Z/a-z/;
    my @words = split(/\s+/, $_);
    # $#words is the last index, so stopping at it still emits the final pair
    for (my $i = 0; $i < $#words; ++$i) {
        print "$words[$i]:$words[$i+1]\n";
        print "$words[$i+1]:$words[$i]\n";
    }
}
count.pl
# Input must be sorted so that identical w1:w2 pairs are adjacent
$_ = <STDIN>;
chomp;
my ($pw1, $pw2) = split(/:/, $_);
$count = 1;
while (<STDIN>) {
    chomp;
    my ($w1, $w2) = split(/:/, $_);
    if ($w1 eq $pw1 && $w2 eq $pw2) {
        $count++;
    } else {
        print "$pw1:$count:$pw2\n";
        $pw1 = $w1;
        $pw2 = $w2;
        $count = 1;
    }
}
print "$pw1:$count:$pw2\n";
firstN.pl
# Assumes input w1:count:w2 sorted by w1, then by count descending
$N = 5;
$_ = <STDIN>;
chomp;
my ($pw1, $count, $pw2) = split(/:/, $_);
$idx = 1;
$out = "$pw1\t$pw2,$count;";
while (<STDIN>) {
    chomp;
    my ($w1, $c, $w2) = split(/:/, $_);
    if ($w1 eq $pw1) {
        if ($idx < $N) {
            $out .= "$w2,$c;";
            $idx++;
        }
    } else {
        print "$out\n";
        $pw1 = $w1;
        $idx = 1;
        $out = "$pw1\t$w2,$c;";
    }
}
print "$out\n";
Partitioner
By default, evenly distributes keys
hashcode(key) % NumReducers
Overriding partitioner
Skew in map-outputs
Restrictions on reduce outputs
All URLs in a domain together
Partitioner
// JobConf.setPartitionerClass(className)
public interface Partitioner<K, V> extends JobConfigurable {
    int getPartition(K key, V value, int maxPartitions);
}
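The default hash partitioning, and an override that keeps URLs together, can be sketched as follows. SimplePartitioner only mirrors the shape of getPartition above so that the example runs without Hadoop. It is not the Hadoop interface, and grouping by host name is a simplification of "all URLs in a domain". Masking the sign bit matters because Java's % operator returns a negative value for a negative hashCode.

```java
import java.net.URI;

/** Same shape as getPartition above; a standalone stand-in, not Hadoop's interface. */
interface SimplePartitioner<K, V> {
    int getPartition(K key, V value, int numPartitions);
}

/** Default-style partitioning: hash of the key with the sign bit cleared. */
final class HashPartitionerSketch<K, V> implements SimplePartitioner<K, V> {
    public int getPartition(K key, V value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

/** Override: every URL with the same host lands in the same partition. */
final class HostPartitionerSketch<V> implements SimplePartitioner<String, V> {
    public int getPartition(String url, V value, int numPartitions) {
        String host = URI.create(url).getHost();
        if (host == null) host = url; // URLs without a host fall back to the whole key
        return (host.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

For a key whose hashCode is -5 and 4 reducers, -5 % 4 is -1 in Java, which is not a valid partition. The masked form yields 3.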
Number of Maps
Number of Input Splits
Number of HDFS blocks
mapred.map.tasks
Minimum Split Size (mapred.min.split.size)
split_size = max(min(hdfs_block_size, data_size/#maps), min_split_size)
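The split-size formula can be checked with a few numbers. This minimal sketch implements the formula exactly as written on the slide. Hadoop's FileInputFormat applies it per file and allows a small slack on the last split, so the map count here is only an approximation. The names are hypothetical.

```java
final class SplitSizing {
    /** split_size = max(min(hdfs_block_size, data_size / #maps), min_split_size) */
    static long splitSize(long blockSize, long dataSize, int requestedMaps, long minSplitSize) {
        long goalSize = dataSize / Math.max(1, requestedMaps);
        return Math.max(Math.min(blockSize, goalSize), minSplitSize);
    }

    /** Approximate number of map tasks: one per split (ceiling division, ignoring file boundaries). */
    static long numMaps(long dataSize, long splitSize) {
        return (dataSize + splitSize - 1) / splitSize;
    }
}
```

With 64 MB blocks and 10 GB of input, asking for 10 maps still gives 64 MB splits (160 maps). Asking for 1024 maps shrinks splits to 10 MB. A 128 MB minimum split size raises them to 128 MB (80 maps). So mapred.map.tasks can only raise the number of maps above the block count; reducing it requires a larger minimum split size.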
Parameter Sweeps
External program processes data based on command-line parameters
./prog params=0.1,0.3 < in.dat > out.dat
Objective: Run an instance of ./prog for each parameter combination
Parameter Sweeps
Input File: params.txt
Each line contains one combination of parameters
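The params.txt input for a sweep can be generated with one line per parameter combination. This sketch assumes the params=a,b line format from the command line above and a grid of candidate values for each parameter. The class name and the line format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ParamSweep {
    /** One "params=v1,v2,..." line per combination (Cartesian product of the value lists). */
    static List<String> lines(List<List<String>> valuesPerParam) {
        List<String> combos = new ArrayList<>();
        combos.add("");
        boolean first = true;
        for (List<String> choices : valuesPerParam) {
            List<String> next = new ArrayList<>();
            for (String prefix : combos) {
                for (String v : choices) next.add(first ? v : prefix + "," + v);
            }
            combos = next;
            first = false;
        }
        List<String> out = new ArrayList<>();
        for (String c : combos) out.add("params=" + c);
        return out;
    }
}
```

Two parameters with values {0.1, 0.2} and {0.3, 0.4} yield four lines, from params=0.1,0.3 to params=0.2,0.4. Each line would then drive one run of ./prog.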
Auxiliary Files
-file auxFile.dat
Job submitter adds file to job.jar
Unjarred on the task tracker
Available to task as $cwd/auxFile.dat
Not suitable for large / frequently used files
Auxiliary Files
Tasks need to access side files
Read-only dictionaries (such as for porn filtering)
Tasks themselves can fetch files from HDFS
Not always! (Hint: unresolved symbols)
Distributed Cache
Specify side files via -cacheFile
If a lot of such files are needed:
Create a tar.gz archive
Upload to HDFS
Specify via -cacheArchive
Distributed Cache
TaskTracker downloads these files once
Untars archives
Accessible in the task's $cwd before the task starts
Cached across multiple tasks
Cleaned up upon exit
Datasets are streams of key-value pairs
Could be split across multiple files in a single directory
Join could be on Key, or any field in Value
Join could be inner, outer, left outer, cross product etc.
Example
A = (id, name), B = (name, address)
A is in /path/to/A/part-*
B is in /path/to/B/part-*
Select A.name, B.address where A.name == B.name
Map in Join
Input: (Key1, Value1) from A or B
map.input.file indicates A or B
MAP_INPUT_FILE in Streaming
Output: (Key2, [Value2, A|B])
Key2 is the Join Key
Reduce in Join
Input: Groups of [Value2, A|B] for each Key2
Operation depends on which kind of join
Inner join checks if key has values from both A & B
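The map and reduce steps of the join example (A = (id, name), B = (name, address)) can be simulated on one machine. In this minimal sketch with hypothetical names, map re-keys every record by name, tags it with its source, and projects away the fields that are not needed. A sorted map stands in for the shuffle. Reduce performs an inner join by crossing the A values and B values of each key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

final class ReduceSideJoin {
    /** A map output value tagged with its source dataset, like [Value2, A|B]. */
    static final class Tagged {
        final char source;
        final String value;
        Tagged(char source, String value) { this.source = source; this.value = value; }
    }

    /** Inner join on name; emits "name \t address" lines. */
    static List<String> innerJoin(List<String[]> a, List<String[]> b) {
        // Map: key each record by the join key, tag with its source, keep only needed fields
        SortedMap<String, List<Tagged>> groups = new TreeMap<>();
        for (String[] r : a) groups.computeIfAbsent(r[1], k -> new ArrayList<>()).add(new Tagged('A', r[1]));
        for (String[] r : b) groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(new Tagged('B', r[1]));
        // Reduce: one call per join key
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> g : groups.entrySet()) {
            List<String> fromA = new ArrayList<>(), fromB = new ArrayList<>();
            for (Tagged t : g.getValue()) (t.source == 'A' ? fromA : fromB).add(t.value);
            // Inner join: a key contributes output only if it has values from both A and B
            for (String name : fromA) for (String address : fromB) out.add(name + "\t" + address);
        }
        return out;
    }
}
```

A key that appears in only one dataset (carol in A, dave in B) produces no output. An outer join would instead emit it with an empty partner.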
MR Join Performance
Map Input = Total of A & B
Map output = Total of A & B
Shuffle & Sort
Reduce input = Total of A & B
Reduce output = Size of Joined dataset
Filter and Project in Map
Fragment-Replicate
Fragment larger dataset
Specify as Map input
Replicate smaller dataset
Use Distributed Cache
Map-Only computation
No shuffle / sort
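Fragment-Replicate avoids the shuffle entirely. This sketch uses hypothetical names and assumes the smaller dataset (B) fits in each map task's memory. Every map task loads B into a hash table and streams its own fragment of A past it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FragmentReplicateJoin {
    /** Built once per map task from the replicated small dataset B = (name, address). */
    static Map<String, List<String>> buildTable(List<String[]> small) {
        Map<String, List<String>> table = new HashMap<>();
        for (String[] r : small) table.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        return table;
    }

    /** Map-only: joins one fragment of A = (id, name) against the table; no shuffle or sort. */
    static List<String> mapFragment(List<String[]> fragment, Map<String, List<String>> table) {
        List<String> out = new ArrayList<>();
        for (String[] r : fragment) {
            for (String address : table.getOrDefault(r[1], List.of())) out.add(r[1] + "\t" + address);
        }
        return out;
    }
}
```

Each fragment is joined independently, so the output has one file per map task, as with any map-only job.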
Equipartitioned Join
Available since Hadoop 0.16
Datasets joined before input to mappers
Input format: CompositeInputFormat
mapred.join.expr
Simpler to use in Java, but can be used in Streaming
Example
mapred.join.expr = inner (
    tbl ( ....SequenceFileInputFormat.class, "hdfs://namenode:8020/path/to/data/A" ),
    tbl ( ....SequenceFileInputFormat.class, "hdfs://namenode:8020/path/to/data/B" )
)
Questions ?
Apache Pig
What is Pig?
System for processing large semistructured data sets using Hadoop MapReduce platform
Pig Latin: High-level procedural language
Pig Engine: Parser, Optimizer and distributed query execution
Pig vs SQL
Pig                                  SQL
Procedural                           Declarative
Nested relational data model         Flat relational data model
Schema is optional                   Schema is required
Scan-centric analytic workloads      OLTP + OLAP workloads
Limited query optimization           Significant opportunity for query optimization
Pig vs Hadoop
Increases programmer productivity
Decreases duplication of effort
Insulates against Hadoop complexity
Version upgrades
JobConf configuration tuning
Job chains
Example
Input: User profiles, Page visits
Find the top 5 most visited pages by users aged 18-25
In Native Hadoop
In Pig
Users = load 'users' as (name, age);
Filtered = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Joined = join Filtered by name, Pages by user;
Grouped = group Joined by url;
Summed = foreach Grouped generate group, COUNT(Joined) as clicks;
Sorted = order Summed by clicks desc;
Top5 = limit Sorted 5;
store Top5 into 'top5sites';
Natural Fit
Comparison
Expressions
A = LOAD 'data.txt' AS (f1:int, f2:{t:(n1:int, n2:int)}, f3:map[]);
Pig Unigrams
Input: Large text document
Process:
Load the file
For each line, generate word tokens
Group by word
Count words in each group
Load
myinput = load '/user/milindb/text.txt' USING TextLoader() as (myword:chararray);
{ (program program) (pig pig) (program pig) (hadoop pig) (latin latin) (pig latin) }
Tokenize
words = FOREACH myinput GENERATE FLATTEN(TOKENIZE(*));
{ (program) (program) (pig) (pig) (program) (pig) (hadoop) (pig) (latin) (latin) (pig) (latin) }
Group
grouped = GROUP words BY $0;
{ (pig, {(pig), (pig), (pig), (pig), (pig)}) (latin, {(latin), (latin), (latin)}) (hadoop, {(hadoop)}) (program, {(program), (program), (program)}) }
Count
counts = FOREACH grouped GENERATE group, COUNT(words);
Store
store counts into '/user/milindb/output' using PigStorage();
pig 5
latin 3
hadoop 1
program 3
Schema on the fly
-- declare your types
Grades = load 'studentgrades' as (name: chararray, age: int, gpa: double);
Good = filter Grades by age > 18 and gpa > 3.0;
-- ordering will be by type
Sorted = order Good by gpa;
store Sorted into 'smartgrownups';
Nested Data
Logs = load 'weblogs' as (url, userid);
Grouped = group Logs by url;
-- Code inside {} will be applied to each
-- value in turn.
DistinctCount = foreach Grouped {
    Userid = Logs.userid;
    DistinctUsers = distinct Userid;
    generate group, COUNT(DistinctUsers);
}
store DistinctCount into 'distinctcount';
Pig Architecture
Pig Stages
Logical Plan
Directed Acyclic Graph
Logical Operator as Node
Data flow as edges
Logical Operators
One per Pig statement
Type checking with Schema
Pig Statements
Load: Read data from the file system
Store: Write data to the file system
Dump: Write data to stdout
Pig Statements
Foreach..Generate: Apply expression to each record and generate one or more records
Filter: Apply predicate to each record and remove records where false
Stream..through: Stream records through user-provided binary
Pig Statements
Group/CoGroup: Collect records with the same key from one or more inputs
Join: Join two or more inputs based on a key
Order..by: Sort records based on a key
Physical Plan
Pig supports two back-ends
Local
Hadoop MapReduce
1:1 correspondence with most logical operators
MapReduce Plan
Detect Map-Reduce boundaries
Group, Cogroup, Order, Distinct
Coalesce operators into Map and Reduce stages
Lazy Execution
Nothing really executes until you request output
Parallelism
Split-wise parallelism on Map-side operators
By default, 1 reducer
PARALLEL keyword
group, cogroup, cross, join, distinct, order
Running Pig
$ pig
grunt> A = load 'students' as (name, age, gpa);
grunt> B = filter A by gpa > 3.5;
grunt> store B into 'good_students';
grunt> dump A;
(jessica thompson, 73, 1.63)
(victor zipper, 23, 2.43)
(rachel hernandez, 40, 3.60)
grunt> describe A;
A: (name, age, gpa)
Running Pig
Batch mode
$ pig myscript.pig
Local mode
$ pig -x local
Java mode (embed pig statements in java)
Keep pig.jar in the class path
PigPen
SQL to Pig
SQL: ...FROM MyTable...
Pig: A = LOAD 'MyTable' USING PigStorage('\t') AS (col1:int, col2:int, col3:int);
SQL to Pig
SQL: SELECT col1, col2, sum(col3) FROM X GROUP BY col1, col2
Pig: D = GROUP A BY (col1, col2);
     E = FOREACH D GENERATE FLATTEN(group), SUM(A.col3);
SQL: ...HAVING sum(col3) > 5
Pig: F = FILTER E BY $2 > 5;
SQL: ...ORDER BY col1
Pig: G = ORDER F BY $0;
SQL to Pig
SQL: SELECT A.col1, B.col3 FROM A JOIN B USING (col1)
Pig: N = JOIN A BY col1, B BY col1;
     O = FOREACH N GENERATE A::col1, B::col3;
     -- Or
     N = COGROUP A BY col1 INNER, B BY col1 INNER;
     O = FOREACH N GENERATE flatten(A), flatten(B);
     P = FOREACH O GENERATE A::col1, B::col3;
Questions ?