Anatomy of a MapReduce Job Run
Some slides are taken from a CMU PPT presentation.
Hadoop 2.X MapReduce Components (Entities)
How Hadoop Runs a MapReduce Job
Job Submission
• JobClient.runJob() (old API)
• Creates a new JobClient instance
• Calls submitJob() on it
• Job.submit() (new API)
• Creates an internal JobSubmitter instance and calls submitJobInternal() on it (step 1 in the figure).
• Having submitted the job, waitForCompletion() polls the job’s progress once per second and
reports the progress to the console if it has changed since the last report.
• The job submission process implemented by JobSubmitter does five things.
Five Things Done by the Job Submission Process
1. Asks the resource manager for a new application ID, used for the
MapReduce job ID (step 2).
2. Checks the output specification of the job.
3. Computes the input splits for the job.
4. Copies the resources needed to run the job, including the job JAR
file, the configuration file, and the computed input splits, to the
shared filesystem in a directory named after the job ID (step 3).
5. Submits the job by calling submitApplication() on the resource
manager (step 4).
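A minimal client-side driver sketch tying the above together: waitForCompletion() triggers submit(), which carries out the five steps just listed. The class name, the use of the default (identity) map and reduce classes, and the command-line paths are assumptions for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "submit-example");
        job.setJarByClass(SubmitExample.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion() calls submit(), which creates a JobSubmitter and
        // invokes submitJobInternal(); it then polls progress once per second
        // and prints it to the console.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}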
Input Splits
• Relation between input splits and HDFS blocks (figure)
Job Initialization
1. When the resource manager receives a call to its submitApplication()
method, it hands off the request to the YARN scheduler.
2. The scheduler allocates a container, and the resource manager then
launches the application master’s process there, under the node manager’s
management (steps 5a and 5b).
3. The application master for MapReduce jobs is a Java application whose main
class is MRAppMaster. It initializes the job by creating a number of
bookkeeping objects to keep track of the job’s progress, as it will receive
progress and completion reports from the tasks (step 6).
4. Next, it retrieves the input splits computed in the client from the shared
filesystem (step 7). It then creates a map task object for each split, as well as
a number of reduce task objects. Tasks are given IDs at this point.
Task Assignment
1. The application master requests containers for all the map and
reduce tasks in the job from the resource manager (step 8).
2. Reduce tasks can run anywhere in the cluster, but requests for map
tasks have data locality constraints that the scheduler tries to
honor.
3. Requests for reduce tasks are not made until 5% of map tasks have
completed.
4. Requests also specify memory requirements and CPUs for tasks. By
default, each map and reduce task is allocated 1,024 MB of
memory and one virtual core.
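These defaults correspond to per-job configuration properties; a minimal sketch of overriding them, assuming the Hadoop 2.x property names (the values chosen here are illustrative):

import org.apache.hadoop.conf.Configuration;

public class ContainerSizing {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Memory allocated per map/reduce task container (default 1,024 MB each).
        conf.setInt("mapreduce.map.memory.mb", 2048);
        conf.setInt("mapreduce.reduce.memory.mb", 3072);
        // Virtual cores per task container (default 1 each).
        conf.setInt("mapreduce.map.cpu.vcores", 1);
        conf.setInt("mapreduce.reduce.cpu.vcores", 2);
        return conf;
    }
}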
Task Execution
1. Once a task has been assigned resources for a container on a particular
node by the resource manager’s scheduler, the application master starts
the container by contacting the node manager (steps 9a and 9b).
2. The task is executed by a Java application whose main class is YarnChild.
3. Before it can run the task, it localizes the resources that the task needs,
including the job configuration and JAR file, and any files from the
distributed cache (step 10).
4. Finally, it runs the map or reduce task (step 11).
5. The YarnChild runs in a dedicated JVM, so that any bugs in the user-defined
map and reduce functions (or even in YarnChild) don’t affect the node
manager—by causing it to crash or hang, for example.
Progress Measure
• The following operations constitute task progress in Hadoop (a mapper sketch follows the list):
1. Reading an input record (in a mapper or reducer)
2. Writing an output record (in a mapper or reducer)
3. Setting the status description (via Reporter’s or
TaskAttemptContext’s setStatus() method)
4. Incrementing a counter (using Reporter’s incrCounter() method
or Counter’s increment() method)
5. Calling Reporter’s or TaskAttemptContext’s progress() method
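A minimal mapper sketch showing three of the operations listed above; the counter group name and status text are illustrative.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProgressAwareMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Reading the record and writing output already count as progress;
        // so do the three calls below.
        context.getCounter("ParsedRecords", "lines").increment(1); // counter increment
        context.setStatus("processing offset " + key.get());       // status description
        context.progress();                                        // explicit progress call
        context.write(value, new LongWritable(1));
    }
}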
Progress and Status Updates
1. When a task is running, it keeps track of its progress (i.e., the proportion of the task
completed). Progress is not always measurable.
2. For map tasks, this is the proportion of the input that has been processed. For reduce
tasks, it’s a little more complex, but the system can still estimate the proportion of
the reduce input processed.
3. Tasks also have a set of counters that count various events as the task runs. The
counters are either built into the framework, such as the number of map output
records written, or defined by users.
4. The task reports its progress and status (including counters) back to its application
master, which has an aggregate view of the job.
5. During the course of the job, the client receives the latest status by polling the
application master every second.
Figure: propagation of status updates through the MapReduce system
Job Completion
1. When the application master receives a notification that the last
task for a job is complete, it changes the status for the job to
“successful.”
2. When the Job polls for status, it learns that the job has completed
successfully, so it prints a message to tell the user and then returns
from the waitForCompletion() method.
3. Job statistics and counters are printed to the console at this point.
4. Finally, on job completion, the application master and the task
containers clean up their working state.
5. Job information is archived by the job history server to enable later
interrogation by users if desired.
Failures in Hadoop
• In the real world,
1. User code is buggy,
2. Processes crash, and
3. Machines fail.
• One of the major benefits of using Hadoop is its ability to handle failures.
• Various entities that may fail in Hadoop:
1. Task Failure
2. Application Master Failure
3. Node Manager Failure
4. Resource Manager Failure
Task Failure
• User code in the map or reduce task throws a runtime exception
The task JVM reports the error back to its parent application master before it exits. The
error ultimately makes it into the user logs. The application master marks the task
attempt as failed, and frees up the container so its resources are available for another
task.
• Sudden exit of the task JVM—perhaps there is a JVM bug
The node manager notices that the process has exited and informs the application
master so it can mark the attempt as failed.
• Hanging tasks are dealt with differently
The application master notices that it hasn’t received a progress update for a while and
proceeds to mark the task as failed.
• When the application master is notified of a task attempt that has failed, it will reschedule
execution of the task. The application master will try to avoid rescheduling the task on a
node manager where it has previously failed.
Application Master Failure
• An application master sends periodic heartbeats to the resource manager, and
in the event of application master failure, the resource manager will detect the
failure and start a new instance of the master running in a new container.
• In the case of the MapReduce application master, it will use the job history to
recover the state of the tasks that were already run by the (failed) application
so they don’t have to be rerun. Recovery is enabled by default.
• The MapReduce client polls the application master for progress reports, so if
the application master fails, the client needs to locate the new instance.
In that case, the client will experience a timeout when it issues a status update,
at which point it will go back to the resource manager to ask for the new
application master’s address.
Node Manager Failure
• If a node manager fails by crashing or running very slowly, it will stop sending
heartbeats to the resource manager.
The resource manager will notice a node manager that has stopped sending heartbeats
if it hasn’t received one for 10 minutes and remove it from its pool of nodes to schedule
containers on.
Any task or application master running on the failed node manager will be recovered
using the mechanisms described under “Task Failure” and “Application Master Failure”
sections respectively.
The application master arranges for map tasks (which were scheduled on failed nodes)
to be rerun if they belong to incomplete jobs.
Node managers may be blacklisted if the number of failures for the application is high,
even if the node manager itself has not failed. Blacklisting is done by the application
master.
Resource Manager Failure
• The resource manager is a single point of failure.
• To achieve high availability (HA), it is necessary to run a pair of resource managers in an active-
standby configuration. If the active resource manager fails, then the standby can take over
without a significant interruption to the client.
• Information about all the running applications is stored in a highly available state store (backed
by ZooKeeper or HDFS), so that the standby can recover the core state of the failed active
resource manager.
• Node manager information can be reconstructed by the new resource manager as the node
managers send their first heartbeats.
• When the new resource manager starts, it reads the application information from the state
store, then restarts the application masters for all the applications running on the cluster.
• The transition of a resource manager from standby to active is handled by a failover controller.
• Clients and node managers must be configured to handle resource manager failover.
Hadoop MapReduce: A Closer Look
Figure: the MapReduce data flow on two nodes. On each node, files are loaded from the local HDFS store, split by the InputFormat, read by RecordReaders, processed by the map and reduce phases, and written back to the local HDFS store via the OutputFormat.
Input Files
Input files are where the data for a MapReduce task is initially stored. They can be:
• Line-based log files
• Binary files
• Multi-line input records
• Or something else entirely
InputFormat
How the input files are split up and read is defined by the InputFormat.
InputFormat is a class that does the following:
• Selects the files that should be used for input
• Defines the InputSplits that break up a file
• Provides a factory for RecordReader objects that read the file
InputFormat Types
Several InputFormats are provided with Hadoop; common examples include:
InputFormat               Description
TextInputFormat           Default; key is the byte offset of the line, value is the line contents
KeyValueTextInputFormat   Parses each line into a key/value pair, split on the tab character
SequenceFileInputFormat   Reads Hadoop's binary SequenceFile format
Input Splits
An input split describes a unit of work that comprises a single map task in a MapReduce program.
By dividing the file into splits, we allow several map tasks to operate on a single file in parallel.
If the file is very large, this can improve performance significantly through parallelism.
RecordReader
The RecordReader class actually loads data from its source and converts it into (K, V) pairs suitable
for reading by Mappers.
Reducer
The Reducer performs the user-defined work of the second phase of the MapReduce program.
Combiners and Partitioners
Combiner Example
Partitioner
Each mapper may emit (K, V) pairs to any partition.
Therefore, the map nodes must all agree on where to send the different pieces of intermediate data.
The partitioner class determines which partition a given (K, V) pair will go to (see the sketch below).
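A minimal custom partitioner sketch; the class name and the first-letter scheme are illustrative (Hadoop's default is HashPartitioner, which hashes the whole key).

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Every map task applies the same rule, so all (K, V) pairs whose keys
        // start with the same letter end up in the same reduce partition.
        if (key.getLength() == 0) {
            return 0;
        }
        return (Character.toLowerCase(key.charAt(0)) & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be plugged into a job with job.setPartitionerClass(FirstLetterPartitioner.class).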
Sort
Each Reducer is responsible for reducing the values associated with (several) intermediate keys.
The set of intermediate keys on a single node is automatically sorted by MapReduce before they are
presented to the Reducer.
OutputFormat
OutputFormat               Description
TextOutputFormat           Default; writes lines in "key \t value" format
SequenceFileOutputFormat   Writes binary files suitable for reading into subsequent MapReduce jobs
NullOutputFormat           Generates no output files
Shuffle and Sort: The Map Side
• MapReduce makes the guarantee that the input to every reducer is sorted by
key.
• The process by which the system performs the sort, and transfers the map
outputs to the reducers as inputs, is known as the shuffle.
• When the map function starts producing output, it is not simply written to
disk. It takes advantage of buffering by writing to main memory and doing
some presorting for efficiency reasons.
• Each map task has a circular memory buffer that it writes the output to. The
buffer is 100 MB by default.
• When the contents of the buffer reach a certain threshold size (default value
0.80, or 80%), a background thread starts to spill the contents to disk.
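A minimal sketch, assuming the Hadoop 2.x property names, of tuning the buffer size and spill threshold described above; the values are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpillTuning {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // Grow the circular in-memory buffer from the 100 MB default to 200 MB.
        conf.setInt("mapreduce.task.io.sort.mb", 200);
        // Keep the default spill threshold of 80% of the buffer.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
        return Job.getInstance(conf, "spill-tuning-example");
    }
}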
Shuffle and Sort in MapReduce
Shuffle and Sort: The Map Side
• Spills are written in round-robin fashion to the specified directories.
• Before it writes to disk, the thread first divides the data into partitions corresponding
to the reducers that they will ultimately be sent to.
• Within each partition, the background thread performs an in-memory sort by key,
and if there is a combiner function, it is run on the output of the sort.
• Running the combiner function makes for a more compact map output, so there is
less data to write to local disk and to transfer to the reducer.
• Each time the memory buffer reaches the spill threshold, a new spill file is created, so
after the map task has written its last output record, there could be several spill files.
• Before the task is finished, the spill files are merged into a single partitioned and
sorted output file. If there are at least three spill files, the combiner is run again
before the output file is written.
Shuffle and Sort: The Reduce Side
• The map output file is sitting on the local disk of the machine that ran the map
task, but now it is needed by the machine that is about to run the reduce task
for the partition.
• Moreover, the reduce task needs the map output for its particular partition
from several map tasks across the cluster.
• The map tasks may finish at different times, so the reduce task starts copying
their outputs as soon as each completes. This is known as the copy phase of the
reduce task.
• The reduce task has a small number of copier threads, by default 5, so that it
can fetch map outputs in parallel.
• A thread in the reducer periodically asks the application master for map output hosts until
it has retrieved them all.
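The number of copier threads is controlled by a per-job property; a minimal sketch, assuming the Hadoop 2.x property name (the value is illustrative):

import org.apache.hadoop.conf.Configuration;

public class ShuffleCopyTuning {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Number of parallel fetch threads per reduce task (default 5).
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
        return conf;
    }
}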
Shuffle and Sort: The Reduce Side
• Map outputs are copied to the reduce task JVM’s memory if they are small enough;
otherwise, they are copied to disk.
• When the in-memory buffer reaches a threshold size or reaches a threshold number
of map outputs, it is merged and spilled to disk. If a combiner is specified, it will be run
during the merge to reduce the amount of data written to disk.
• As the copies accumulate on disk, a background thread merges them into larger,
sorted files.
• When all the map outputs have been copied, the reduce task moves into the sort
phase, which merges the map outputs, maintaining their sort ordering. This is done in
rounds.
• During the reduce phase, the reduce function is invoked for each key in the sorted
output. The output of this phase is written directly to the output filesystem, typically
HDFS.
Speculative Execution
• The MapReduce model is to break jobs into tasks and run the tasks in parallel to make
the overall job execution time smaller than it would be if the tasks ran sequentially.
• A MapReduce job is dominated by its slowest task.
• Hadoop doesn’t try to diagnose and fix slow-running tasks (stragglers); instead, it tries
to detect when a task is running slower than expected and launches another
equivalent task as a backup. This is termed speculative execution of tasks.
• Only one speculative copy of a straggler is run at a time.
• Whichever copy (among the two copies) of a task commits first, it becomes the
definitive copy, and the other copy is killed.
• Speculative execution is an optimization, and not a feature to make jobs run more
reliably.
• Speculative execution is turned on by default.
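A minimal sketch, assuming the Hadoop 2.x property names, of turning speculative execution off for a job's map and reduce tasks:

import org.apache.hadoop.conf.Configuration;

public class SpeculationToggle {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Speculative execution is on by default; these properties disable it
        // for map tasks and reduce tasks respectively.
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        return conf;
    }
}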
Output Committers
• Hadoop MapReduce uses a commit protocol to ensure that
jobs and tasks either succeed or fail cleanly.
• The behavior is implemented by the OutputCommitter in use
for the job.
• In the old MapReduce API, the OutputCommitter is set by
calling setOutputCommitter() on JobConf or by setting
mapred.output.committer.class in the configuration.
• In the new MapReduce API, the OutputCommitter is
determined by the OutputFormat, via its
getOutputCommitter() method.
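A minimal sketch of the new-API path described above, where the framework obtains the committer from the OutputFormat; the subclass and the logging are illustrative, and FileOutputFormat's default committer is a FileOutputCommitter.

import java.io.IOException;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LoggingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        // The framework calls this to obtain the committer that sets up,
        // commits, or aborts task and job output.
        OutputCommitter committer = super.getOutputCommitter(context);
        System.out.println("Using committer: " + committer.getClass().getName());
        return committer;
    }
}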
Apache Pig
Unit III
Outline of Today’s Talk
• What is Pig?
• Why do we need Apache Pig?
• Features of Pig
• Pig Architecture
• Pig Terms
• Example Scripts
What is Pig?
• Developed by Yahoo! and a top-level Apache project.
• Pig is a high-level programming language useful for analyzing
large data sets.
• It is an alternative to MapReduce programming.
• Pig is generally used with Hadoop. We can perform all the
data manipulation operations in Hadoop using Apache Pig.
• Apache Pig is an abstraction over MapReduce. It is built on
top of Hadoop.
• The Pig programming language is designed to work on any
kind of data.
• Pig provides a high-level language known as Pig Latin for data
analysis.
Cont...
• Programmers write scripts in the Pig Latin
language, which are internally converted
to MapReduce tasks. Apache Pig has a
component known as the Pig Engine that accepts
Pig Latin scripts as input and converts
those scripts into MapReduce jobs.
Why do we need Apache Pig?
• Pig Latin helps programmers perform MapReduce
tasks without having to write complex
Java code.
• Apache Pig uses a multi-query approach, thereby
reducing the length of the code.
• Pig (like MapReduce) is oriented around the
batch processing of data. If you need to process
gigabytes or terabytes of data, Pig is a good
choice. But it expects to read all the records of a
file and write all of its output sequentially.
Contd...
• Apache Pig provides many built-in operators
to support data operations like joins, filters,
ordering, etc. In addition, it also provides
nested data types like tuples, bags, and maps
that are missing from MapReduce.
Features of Pig
• Extensibility: Using the existing operators,
users can develop their own functions to read,
process, and write data.
• UDFs: Pig provides the facility to create user-
defined functions in other programming
languages such as Java and to invoke or embed
them in Pig scripts.
• Handles all kinds of data: Apache Pig analyzes
all kinds of data, both structured as well as
unstructured. It stores the results in HDFS.
Contd...,
• Rich set of operators: It provides many
operators such as join, sort, filter, etc.
• Ease of programming: Pig Latin is similar to
SQL and it is easy to write a Pig script if you
are good at SQL.
• Optimization opportunities: The tasks in
Apache Pig optimize their execution
automatically, so the programmers need to
focus only on semantics of the language.
Pig Architecture
Contd...
• Grunt shell: An interactive shell to write and execute Pig Latin
scripts.
• Parser: Takes input from the Grunt shell, performs type
checking and syntax checking, and constructs a DAG (directed
acyclic graph) for the Pig Latin script. The parser generates a
logical plan in the form of a DAG.
• Optimizer: The logical plan is given as input to the optimizer,
which applies optimizations such as projection and pushdown
and produces an optimized logical plan.
• Compiler: The compiler takes the optimized logical plan and
compiles it into a series of MapReduce jobs.
• Execution Engine: Executes the MapReduce jobs, fetching
data from HDFS.
Pig Terms
• All data in Pig is one of four types:
– An Atom is a simple data value, stored as a string but
usable as either a string or a number
– A Tuple is a data record consisting of a sequence of
"fields"
• Each field is a piece of data of any type (atom, tuple, or bag)
– A Bag is a set of tuples (also referred to as a 'Relation')
• Conceptually, a kind of table
– A Map is a map from keys that are string literals to
values that can be any data type
• Conceptually, a hash map
Execution Modes
• Local
-- Executes in a single JVM
-- Works exclusively with the local file system
-- Great for development, experimentation, and prototyping
$ pig -x local
• Hadoop Mode
-- Also known as MapReduce mode
-- Pig renders Pig Latin into MapReduce jobs and executes
them on the cluster.
-- Can execute against a pseudo-distributed or fully distributed
Hadoop installation
$ pig -x mapreduce
Comparison of Pig with Databases
Running Pig
• Script
– Execute commands in a file
$ pig scriptFile.pig
• Grunt
– Interactive shell for executing Pig commands
– Started when a script file is NOT provided
– Can execute scripts from Grunt via the run or exec commands
• Embedded
– Execute Pig commands using the PigServer class
– Just like using JDBC to execute SQL
– Can have programmatic access to Grunt via the PigRunner
class (a sketch of embedded mode follows)
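A minimal sketch of embedded mode using the PigServer class; the queries reuse the sample dataset path shown later in these slides, and the output path is illustrative.

import java.io.IOException;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class EmbeddedPigExample {
    public static void main(String[] args) throws IOException {
        // Local mode for experimentation; use ExecType.MAPREDUCE on a cluster.
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.registerQuery("records = LOAD 'input/ncdc/micro-tab/sample.txt' "
                + "AS (year:chararray, temperature:int, quality:int);");
        pig.registerQuery("grouped_records = GROUP records BY year;");
        // store() triggers execution, much as executing a statement does in JDBC.
        pig.store("grouped_records", "output/grouped-by-year");
    }
}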
Grunt
• Grunt has line-editing facilities like those found in GNU Readline (used in
the bash shell and many other command-line applications).
• For instance, the Ctrl-E key combination will move the cursor to the end of
the line.
• Grunt remembers command history too and you can recall lines in the
history buffer using Ctrl-P or Ctrl-N (for previous and next), or
equivalently, the up or down cursor keys.
• Another handy feature is Grunt’s completion mechanism, which will try to
complete Pig Latin keywords and functions when you press the Tab key.
• For example, consider the following incomplete line:
grunt> a = foreach b ge
If you press the Tab key at this point, ge will expand to generate, a Pig Latin
keyword:
grunt> a = foreach b generate
Contd...,
• You can customize the completion tokens by creating a file named
autocomplete and placing it on Pig’s classpath (such as in the conf
directory in Pig’s install directory) or in the directory you invoked Grunt
from.
• When you’ve finished your Grunt session, you can exit with the quit
command, or the equivalent shortcut \q.
Pig Data Model
• The data model of Pig Latin is fully nested and it allows complex non-
atomic datatypes such as map and tuple.
• Types
Pig’s data types can be divided into two categories: scalar types, which
contain a single value, and complex types, which contain other types.
Scalar Types
• Pig’s scalar types are simple types that appear in most programming
languages. With the exception of bytearray, they are all represented in Pig
interfaces by java.lang classes, making them easy to work with in UDFs:
int, long, float, double, chararray, and bytearray.
Complex Types
Pig has three complex data types: maps, tuples, and bags. All of these
types can contain data of any type, including other complex types.
• Map
A map in Pig is a chararray-to-data-element mapping, where the element
can be any Pig type, including a complex type. The chararray is called a
key and is used as an index to find the element, referred to as the value.
• Tuple
A tuple is an ordered collection of fields (see "Pig Terms").
• Bag
A bag is an unordered collection of tuples. Bag constants are constructed
using braces, with tuples in the bag separated by commas. For example,
{('bob', 55), ('sally', 52), ('john', 25)} constructs a bag with three tuples,
each with two fields.
Pig Latin
• The language used to analyze data in Hadoop using Pig is known as Pig
Latin.
• It is a high-level data processing language which provides a rich set of data
types and operators to perform various operations on the data.
• To perform a particular task, programmers need to write a Pig script using
the Pig Latin language and execute it using any of the execution
mechanisms (Grunt shell, UDFs, embedded).
• After execution, these scripts go through a series of transformations
applied by the Pig framework to produce the desired output.
• Structure
• Statements
• Expressions
• Functions
Structure
• A Pig Latin program consists of a collection of statements. A statement can be thought of as
an operation or a command.
grouped_records = GROUP records BY year;
• Statements are usually terminated with a semicolon and can be split across multiple lines for
readability.
• records = LOAD 'input/ncdc/micro-tab/sample.txt'
AS (year:chararray, temperature:int, quality:int);
• Pig Latin has two forms of comments.
• Double hyphens are used for single-line comments.
• Everything from the first hyphen to the end of the line is ignored by the Pig Latin Interpreter:
-- My program
DUMP A; -- What's in A?
• Multi-line Comments:
/*
* Description of my program spanning
* multiple lines.
*/
A = LOAD 'input/pig/join/A';
B = LOAD 'input/pig/join/B';
Statements
• As a Pig Latin program is executed, each statement is parsed in turn.
• If there are syntax errors or other (semantic) problems, such as undefined
aliases, the interpreter will halt and display an error message.
• The interpreter builds a logical plan for every relational operation, which
forms the core of a Pig Latin program.
• The logical plan for the statement is added to the logical plan for the
program so far, and then the interpreter moves on to the next statement.
Expressions
• An expression is something that is evaluated to yield a value.
• Expressions can be used in Pig as a part of a statement containing a
relational operator. Pig has a rich variety of expressions, many of which
will be familiar from other programming languages.
• Examples:
Category: Constant
Expression: Literal
Description: Constant value
Example: 1.0, 'a'
Schemas
• A relation in Pig may have an associated schema, which gives the fields in
the relation names and types.
• Example :
• grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>> AS (year:int, temperature:int, quality:int);
User-Defined Functions (UDFs)
• Apache Pig provides extensive support for user-defined functions (UDFs)
as a way to specify custom processing.
• Pig UDFs can currently be written in several languages: Java, Python,
JavaScript, and Ruby.
• The most extensive support is provided for Java functions.
• Java UDFs can be invoked in multiple ways.
• The simplest UDF can just extend EvalFunc, which requires only the exec
function to be implemented.
• What is a Piggybank?
• Piggybank is a collection of user-contributed UDFs that is released along
with Pig. Piggybank UDFs are not included in the Pig JAR, so you have to
register them manually in your script. You can also write your own UDFs or
use those written by other users.
Contd..,
• Eval Functions
• The UDF class extends the EvalFunc class, which is the base class for all eval functions.
• All evaluation functions extend the Java class org.apache.pig.EvalFunc.

package myudfs;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class UPPER extends EvalFunc<String> {
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            String str = (String) input.get(0);
            return str.toUpperCase();
        } catch (Exception e) {
            throw new IOException("Caught exception processing input row ", e);
        }
    }
}
Contd..,
• Using the UDF:
a) Registering the JAR file:
REGISTER path;
b) Defining an alias:
DEFINE up myudfs.UPPER();
c) Using the UDF:
grunt> Upper_case = FOREACH emp_data GENERATE up(name);
Data Processing Operators
• Loading and Storing Data:
Load: The Apache Pig LOAD operator is used to load data from the file system.
Example:
grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>> AS (year:chararray, temperature:int, quality:int);
Store: We can store the loaded data in the file system using the STORE operator.
Example (the output directory name is illustrative):
grunt> STORE records INTO 'output/stored-records';
• Filtering Data:
Once you have some data loaded into a relation, often the next step is to
filter it to remove the data that you are not interested in. By filtering early
in the processing pipeline, you minimize the amount of data flowing
through the system, which can improve efficiency.
FOREACH...GENERATE: The FOREACH...GENERATE operator has
a nested form to support more complex processing.
Example:
grunt> B = FOREACH A GENERATE $0, $2+1, 'Constant';
grunt> DUMP B;
• STREAM: The STREAM operator allows you to transform data in a
relation using an external program or script.
Contd..,