UNIT4 Notes
Module IV
MapReduce workflows, Unit tests with MRUnit, test data and local tests, anatomy of
MapReduce job run, classic Map-reduce, YARN, failures in classic Map-reduce and
YARN, job scheduling, shuffle and sort, task execution, MapReduce types, input formats,
output Formats.
4.1 MapReduce workflows
What is MapReduce workflow?
MapReduce is the process of taking a list of objects and running an operation over each
object in the list (i.e., map) to either produce a new list or calculate a single value (i.e.,
reduce).
The input data that needs to be processed using MapReduce is stored in HDFS. The
processing can be done on a single file or a directory that has multiple files.
The input format defines the input specification and how the input files would be split
and read.
The input split logically represents the data to be processed by an individual mapper.
RecordReader communicates with the input split and converts the data into key-value
pairs suitable to be read by the mapper.
The mapper works on the key-value pairs and gives an intermediate output, which goes
for further processing.
Combiner is a mini reducer that performs mini aggregation on the key-value pairs
generated by the mapper.
The partitioner decides to which reducer each intermediate key-value pair (from the mapper or combiner) is sent.
The output of the partitioner is shuffled and sorted. This output is fed as input to the reducer.
The reducer aggregates all the intermediate values for each intermediate key into a list and produces the final output key-value pairs.
The RecordWriter writes these output key-value pairs from reducer to the output files.
The output data gets stored in HDFS.
JobControl
When there is more than one job in a MapReduce workflow, the question arises: how do you
manage the jobs so they are executed in order? There are several approaches, and the main
consideration is whether you have a linear chain of jobs or a more complex directed acyclic
graph (DAG) of jobs.
For a linear chain, the simplest approach is to run each job one after another, waiting until a job
completes successfully before running the next:
JobClient.runJob(conf1);
JobClient.runJob(conf2);
If a job fails, the runJob() method will throw an IOException, so later jobs in the pipeline don’t
get executed. Depending on your application, you might want to catch the exception and clean
up any intermediate data that was produced by any previous jobs.
The approach is similar with the new MapReduce API, except you need to examine the boolean
return value of the waitForCompletion() method on Job: true means the job succeeded, and false
means it failed.
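A minimal sketch of chaining two jobs with the new API, assuming job1 and job2 are already fully configured Job instances (the class and method names below are illustrative, not part of Hadoop):
import org.apache.hadoop.mapreduce.Job;

public class LinearChainDriver {
    // Runs two already-configured jobs one after the other; stops if the first fails.
    static int runChain(Job job1, Job job2) throws Exception {
        if (!job1.waitForCompletion(true)) {
            return 1; // first job failed, so the second is not run
        }
        return job2.waitForCompletion(true) ? 0 : 1;
    }
}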
For anything more complex than a linear chain, there are libraries that can help arrange your
workflow (although they are also suited to linear chains, or even one-off jobs). The simplest is in
the org.apache.hadoop.mapreduce.jobcontrol package: the JobControl class. (There is an
equivalent class in the org.apache.hadoop.mapred.jobcontrol package, too.) An instance of
JobControl represents a graph of jobs to be run. You add the job configurations, then tell the
JobControl instance the dependencies between jobs. You run the JobControl in a thread, and it
runs the jobs in dependency order. You can poll for progress, and when the jobs have finished,
you can query for all the jobs’ statuses and the associated errors for any failures. If a job fails,
JobControl won’t run its dependencies.
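The following is a rough sketch of driving two dependent jobs with JobControl. It assumes the new-API classes ControlledJob and JobControl (found under org.apache.hadoop.mapreduce.lib.jobcontrol in recent Hadoop releases) and two pre-configured Job objects; treat it as an outline rather than a definitive recipe.
import java.util.List;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class WorkflowRunner {
    // job2 runs only after job1 has completed successfully.
    static void runWorkflow(Job job1, Job job2) throws Exception {
        ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
        cJob1.setJob(job1);
        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
        cJob2.setJob(job2);
        cJob2.addDependingJob(cJob1);      // declare the dependency

        JobControl control = new JobControl("two-job-workflow");
        control.addJob(cJob1);
        control.addJob(cJob2);

        new Thread(control).start();       // run the JobControl in a thread
        while (!control.allFinished()) {
            Thread.sleep(1000);            // poll for progress
        }
        List<ControlledJob> failed = control.getFailedJobList();
        System.out.println("Failed jobs: " + failed.size());
        control.stop();
    }
}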
Apache Oozie
Apache Oozie is a system for running workflows of dependent jobs. It is composed of two main
parts: a workflow engine that stores and runs workflows composed of different types of Hadoop
jobs (MapReduce, Pig, Hive, and so on), and a coordinator engine that runs workflow jobs based
on predefined schedules and data availability. Oozie has been designed to scale, and it can
manage the timely execution of thousands of workflows in a Hadoop cluster, each composed of
possibly dozens of constituent jobs.
Oozie makes rerunning failed workflows more tractable, since no time is wasted running
successful parts of a workflow. Anyone who has managed a complex batch system knows how
difficult it can be to catch up from jobs missed due to downtime or failure, and will appreciate
this feature. (Furthermore, coordinator applications representing a single data pipeline may be
packaged into a bundle and run together as a unit.)
Unlike JobControl, which runs on the client machine submitting the jobs, Oozie runs as a service
in the cluster, and clients submit workflow definitions for immediate or later execution. In Oozie
parlance, a workflow is a DAG of action nodes and control-flow nodes.
4.2 Writing a Unit Test with MRUnit
The map and reduce functions in MapReduce are easy to test in isolation, which is a
consequence of their functional style. MRUnit is a testing library that makes it easy to pass
known inputs to a mapper or a reducer and check that the outputs are as expected. MRUnit is
used in conjunction with a standard test execution framework, such as JUnit, so you can run the
tests for MapReduce jobs in your normal development environment.
What is MRUnit?
MRUnit is a Java library that provides a testing framework for Hadoop MapReduce code. It
allows you to write unit tests for your MapReduce code and run them locally, without the need
for a Hadoop cluster. MRUnit is built on top of JUnit, a popular unit testing framework for Java,
and provides a set of APIs that you can use to test your MapReduce code. MRUnit can test both
the map and reduce functions of your code, as well as the overall job configuration.
Why test MapReduce code?
1. Correctness: MapReduce code can be complex and difficult to debug. Testing your code
can help you catch errors early and ensure that your code works as expected.
2. Performance: MapReduce jobs can take a long time to run, especially when processing
large datasets. Testing your code can help you identify performance bottlenecks and
optimize your code for faster processing.
3. Scalability: Hadoop is designed to scale horizontally, which means that your code must
be able to handle datasets of varying sizes. Testing your code can help you ensure that it
can handle large datasets without crashing or slowing down.
Setting up MRUnit
To use MRUnit, you need to add the MRUnit dependency to your project. You can do this by
adding the following code to your Maven pom.xml file:
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
Once you have added the dependency, you can start writing unit tests for your MapReduce code.
To write MRUnit tests, you create a test class that uses the driver classes MRUnit provides, such
as org.apache.hadoop.mrunit.mapreduce.MapReduceDriver (and MapDriver and ReduceDriver for
testing a mapper or reducer in isolation). These drivers provide methods that allow you to set up
your input data, run your MapReduce code, and verify the output. For example, a word count
test might look like this:
public class WordCountTest {
    @Test
    public void testMapReduce() throws IOException {
        // WordCountTest and testMapReduce are illustrative names
        MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver =
                new MapReduceDriver<>();
        driver.withMapper(new WordCountMapper())
              .withReducer(new WordCountReducer())
              .withInput(new LongWritable(1), new Text("hello world"))
              .withOutput(new Text("hello"), new IntWritable(1))
              .withOutput(new Text("world"), new IntWritable(1))
              .runTest();
    }
}
In this example, we create a MapReduceDriver object and configure it with a WordCountMapper
and a WordCountReducer. We then set up our input data and expected output, and run the test
using the runTest() method.
The withInput() method is used to set up the input data for the MapReduce job. In this case, we
are providing a single input record with the value “hello world”.
The withOutput() method is used to set up the expected output for the MapReduce job. In this
case, we expect the output to contain two records: one with the word “hello” and a count of 1,
and one with the word “world” and a count of 1.
Once we have set up our input and output data, we run the test using the runTest() method. This
method runs the MapReduce job and verifies that the output matches the expected output.
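Because MRUnit also provides MapDriver and ReduceDriver, the mapper can be tested in isolation as well. A short sketch, assuming a WordCountMapper declared as Mapper<LongWritable, Text, Text, IntWritable> (the test class and method names are illustrative):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordCountMapperTest {
    // The mapper alone should emit (hello, 1) and (world, 1) for the line "hello world".
    @Test
    public void mapperEmitsOnePerWord() throws IOException {
        new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new WordCountMapper())
                .withInput(new LongWritable(1), new Text("hello world"))
                .withOutput(new Text("hello"), new IntWritable(1))
                .withOutput(new Text("world"), new IntWritable(1))
                .runTest();
    }
}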
Test Data and Local Tests
Now we can run this application against some local files. Hadoop comes with a local job runner,
a cut-down version of the MapReduce execution engine for running MapReduce jobs in a single
JVM. It's designed for testing and is very convenient for use in an IDE, since you can run it in a
debugger to step through the code in your mapper and reducer.
Equivalently, we could use the -fs and -jt options provided by GenericOptionsParser:
% hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output
This command executes MaxTemperatureDriver using input from the local input/ncdc/micro
directory, producing output in the local output directory. Note that although we've set -fs so we
use the local filesystem (file:///), the local job runner will actually work fine against any
filesystem, including HDFS (and it can be handy to do this if you have a few files that are on HDFS).
We can examine the output on the local filesystem:
% cat output/part-r-00000
1949 111
1950 22
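The local job runner can also be selected programmatically, which is convenient for writing a test around the whole driver. A sketch under the assumption that MaxTemperatureDriver extends Configured and implements Tool, as in the example this section refers to:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MaxTemperatureDriverLocalTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");            // use the local filesystem
        conf.set("mapreduce.framework.name", "local");   // use the local job runner

        Path input = new Path("input/ncdc/micro");
        Path output = new Path("output");
        FileSystem.getLocal(conf).delete(output, true);  // the job fails if output already exists

        MaxTemperatureDriver driver = new MaxTemperatureDriver(); // assumed to be a Tool
        driver.setConf(conf);
        int exitCode = driver.run(new String[] { input.toString(), output.toString() });
        System.exit(exitCode);
    }
}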
4.3 Anatomy of a MapReduce Job Run
You can run a MapReduce job with a single method call: submit() on a Job object (you can also
call waitForCompletion(), which submits the job if it hasn't been submitted already, then waits
for it to finish). This method call conceals a great deal of processing behind the scenes. This
section uncovers the steps Hadoop takes to run a job.
Once a MapReduce job is submitted, the system takes it through a series of lifecycle phases. At
the highest level, there are five independent entities:
• The client, which submits the MapReduce job.
• The YARN resource manager, which coordinates the allocation of compute resources on the
cluster.
• The YARN node managers, which launch and monitor the compute containers on
machines in the cluster.
• The MapReduce application master, which coordinates the tasks running the MapReduce job.
The application master and the MapReduce tasks run in containers that are scheduled by the
resource manager and managed by the node managers.
• The distributed filesystem (normally HDFS), which is used for sharing job files between the
other entities.
Job Submission:
The submit() method on Job creates an internal JobSubmitter instance and calls
submitJobInternal() on it.
Having submitted the job, waitForCompletion() polls the job's progress once per second
and reports the progress to the console if it has changed since the last report.
When the job completes successfully, the job counters are displayed. Otherwise, the error
that caused the job to fail is logged to the console.
The job submission process implemented by JobSubmitter does the following:
Asks the resource manager for a new application ID, used for the MapReduce job ID.
Checks the output specification of the job. For example, if the output directory has not
been specified or it already exists, the job is not submitted and an error is thrown to the
MapReduce program.
Computes the input splits for the job. If the splits cannot be computed (because the input
paths don't exist, for example), the job is not submitted and an error is thrown to the
MapReduce program.
Copies the resources needed to run the job, including the job JAR file, the configuration
file, and the computed input splits, to the shared filesystem in a directory named after the
job ID.
Submits the job by calling submitApplication() on the resource manager. (A minimal
client-side driver illustrating job submission is sketched below.)
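For reference, the client side of this process is just a driver like the one sketched below, which configures a Job and calls waitForCompletion(). The WordCountDriver, WordCountMapper, and WordCountReducer names are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);       // the job JAR copied to the shared filesystem
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // used when computing input splits
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // must not already exist
        // waitForCompletion() submits the job and then polls its progress once per second
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}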
Job Initialization:
When the resource manager receives a call to its submitApplication() method, it hands off
the request to the YARN scheduler.
The scheduler allocates a container, and the resource manager then launches the
application master’s process there, under the node manager’s management.
The application master for MapReduce jobs is a Java application whose main class is
MRAppMaster.
It initializes the job by creating a number of bookkeeping objects to keep track of the
job’s progress, as it will receive progress and completion reports from the tasks.
It retrieves the input splits computed in the client from the shared
filesystem.
It then creates a map task object for each split, as well as a number of reduce task objects
determined by the mapreduce.job.reduces property (set by the setNumReduceTasks()
method on Job).
Task Assignment:
If the job is small, the application master may choose to run its tasks in the same JVM as
itself; such a job is said to run as an uber task. If the job does not qualify for running as an
uber task, then the application master requests containers for all the map and reduce tasks in
the job from the resource manager.
Requests for map tasks are made first and with a higher priority than those for reduce tasks,
since all the map tasks must complete before the sort phase of the reduce can start.
Requests for reduce tasks are not made until 5% of the map tasks have completed. (The
related configuration properties are sketched below.)
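These per-job settings can be expressed through the Configuration API; a sketch with illustrative values (in practice they are often set in mapred-site.xml, and exact defaults depend on the Hadoop version):
import org.apache.hadoop.conf.Configuration;

public class SmallJobTuning {
    static void configure(Configuration conf) {
        // Let small jobs run as a single "uber" task in the application master's JVM.
        conf.setBoolean("mapreduce.job.ubertask.enable", true);
        // Do not request reduce containers until 50% of the map tasks have completed
        // (the default threshold is 5%, i.e. 0.05).
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.50f);
    }
}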
Task Execution:
Once a task has been assigned resources for a container on a particular node by the
resource manager’s scheduler, the application master starts the container by contacting
the node manager.
The task is executed by a Java application whose main class is YarnChild. Before it can
run the task, it localizes the resources that the task needs, including the job configuration
and JAR file, and any files from the distributed cache.
Finally, it runs the map or reduce task.
Streaming:
Streaming runs special map and reduce tasks for the purpose of launching the user-supplied
executable and communicating with it.
The Streaming task communicates with the process (which may be written in any
language) using standard input and output streams.
During execution of the task, the Java process passes input key-value pairs to the external
process, which runs them through the user-defined map or reduce function and passes the
output key-value pairs back to the Java process.
From the node manager’s point of view, it is as if the child process
ran the map or reduce code itself.
Progress and Status Updates:
MapReduce jobs are long-running batch jobs, taking anything from tens of seconds to hours
to run.
A job and each of its tasks have a status, which includes such things as the state of the job
or task (e.g., running, successfully completed, failed), the progress of maps and reduces,
the values of the job's counters, and a status message or description (which may be set by
user code).
When a task is running, it keeps track of its progress (i.e., the proportion of the task
completed).
For map tasks, this is the proportion of the input that has been processed.
For reduce tasks, it's a little more complex, but the system can still estimate the
proportion of the reduce input processed.
It does this by dividing the total progress into three parts, corresponding to the three phases of
the shuffle.
As the map or reduce task runs, the child process communicates with its parent
application master through the umbilical interface.
The task reports its progress and status (including counters) back to its application
master, which has an aggregate view of the job, every three seconds over the umbilical
interface.
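User code can feed this reporting channel through the task context, for example by updating counters and the status message. A minimal sketch (the mapper and counter names are illustrative):
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProgressAwareMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().isEmpty()) {
            // Counters are aggregated by the application master and shown with the job status.
            context.getCounter("Quality", "EMPTY_RECORDS").increment(1);
            return;
        }
        context.setStatus("Processing offset " + key.get());   // free-text status description
        context.write(value, NullWritable.get());
    }
}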
Job Completion:
When the application master receives a notification that the last task for a job is complete,
it changes the status for the job to Successful. Then, when the Job polls for status, it
learns that the job has completed successfully, so it prints a message to tell the user and
then returns from the waitForCompletion() method.
Finally, on job completion, the application master and the task containers clean up their
working state, and the OutputCommitter's commitJob() method is called.
Job information is archived by the job history server to enable later interrogation by users
if desired.
Task Failure
Consider first the case of the task failing. The most common occurrence of this failure is when
user code in the map or reduce task throws a runtime exception. If this happens, the task JVM
reports the error back to its parent application master before it exits. The error ultimately makes
it into the user logs. The application master marks the task attempt as failed, and frees up the
container so its resources are available for another task.
For Streaming tasks, if the Streaming process exits with a nonzero exit code, it is marked as
failed. This behavior is governed by the stream.non.zero.exit.is.failure property (the default is
true).
Another failure mode is the sudden exit of the task JVM—perhaps there is a JVM bug that
causes the JVM to exit for a particular set of circumstances exposed by the MapReduce user
code. In this case, the node manager notices that the process has exited and informs the
application master so it can mark the attempt as failed.
4.5 YARN
YARN (Yet Another Resource Negotiator) is the resource management layer of Hadoop,
introduced in Hadoop 2. YARN allows different data processing engines, such as graph
processing, interactive processing, stream processing, and batch processing, to run and process
data stored in HDFS (Hadoop Distributed File System), thus making the system much more
efficient. Through its various components, it can dynamically allocate resources and schedule
the application processing. For large-volume data processing, it is necessary to manage the
available resources properly so that every application can leverage them.
YARN Architecture: The main components of YARN are as follows.
Resource Manager: It is the master daemon of YARN and is responsible for resource
assignment and management among all the applications. Whenever it receives a
processing request, it forwards it to the corresponding node manager and allocates
resources for the completion of the request accordingly. It has two major components:
o Scheduler: It performs scheduling based on the allocated application and available
resources. It is a pure scheduler, meaning it does not perform other tasks such as
monitoring or tracking, and it does not guarantee a restart if a task fails. The YARN
scheduler supports plugins such as the Capacity Scheduler and the Fair Scheduler to partition
the cluster resources.
o Application Manager: It is responsible for accepting the application and negotiating the
first container from the resource manager. It also restarts the Application Master container if
it fails.
Node Manager: It takes care of an individual node in the Hadoop cluster and manages the
applications and workflow on that particular node. Its primary job is to keep up to date with the
Resource Manager. It registers with the Resource Manager and sends heartbeats with the
health status of the node. It monitors resource usage, performs log management, and also
kills containers based on directions from the resource manager. It is also responsible for
creating the container process and starting it at the request of the Application Master.
Application Master: An application is a single job submitted to a framework. The
application master is responsible for negotiating resources with the resource manager,
tracking the status and monitoring progress of a single application. The application
master requests the container from the node manager by sending a Container Launch
Context (CLC), which includes everything an application needs to run. Once the
application is started, it sends the health report to the resource manager from time to time.
Container: It is a collection of physical resources such as RAM, CPU cores, and disk on
a single node. Containers are invoked by a Container Launch Context (CLC), which is a
record that contains information such as environment variables, security tokens,
dependencies, etc.
Application workflow in Hadoop YARN:
The client submits an application to the Resource Manager, which allocates a container for the
Application Master and launches it through a Node Manager. The Application Master registers
itself with the Resource Manager, negotiates further containers, and asks the Node Managers to
launch them; the application code then executes in those containers while the client monitors the
application's status, and on completion the Application Master deregisters from the Resource
Manager.
4.6 Failures in MapReduce and YARN
In the real world, user code is buggy, processes crash, and machines fail. One of the major
benefits of using Hadoop is its ability to handle such failures and allow your job to complete
successfully. We need to consider the failure of any of the following entities: the task, the
application master, the node manager, and the resource manager.
Task Failure
The most common failure modes, a runtime exception in user code or a sudden exit of the task
JVM, are handled as described under Task Failure above: the failure is reported to the
application master, which marks the task attempt as failed and frees up the container so its
resources are available for another task.
Hanging tasks are dealt with differently. The application master notices that it hasn’t received a
progress update for a while and proceeds to mark the task as failed. The task JVM process will
be killed automatically after this period. The timeout period after which tasks are considered
failed is normally 10 minutes and can be configured on a per-job basis (or a cluster basis) by
setting the mapreduce.task.timeout property to a value in milliseconds.
Setting the timeout to a value of zero disables the timeout, so long-running tasks are never
marked as failed. In this case, a hanging task will never free up its container, and over time there
may be cluster slowdown as a result. This approach should therefore be avoided, and making
sure that a task is reporting progress periodically should suffice.
Application Master Failure
Just like MapReduce tasks are given several attempts to succeed (in the face of hardware or
network failures), applications in YARN are retried in the event of failure. The maximum
number of attempts to run a MapReduce application master is controlled by the
mapreduce.am.max-attempts property. The default value is 2, so if a MapReduce application
master fails twice it will not be tried again and the job will fail.
YARN imposes a limit for the maximum number of attempts for any YARN application master
running on the cluster, and individual applications may not exceed this limit. The limit is set by
yarn.resourcemanager.am.max-attempts and defaults to 2, so if you want to increase the number
of MapReduce application master attempts, you will have to increase the YARN setting on the
cluster, too.
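The failure-related knobs mentioned above can be set per job through the Configuration API; a sketch with illustrative values (cluster-wide limits such as yarn.resourcemanager.am.max-attempts are normally set by administrators in yarn-site.xml rather than in job code):
import org.apache.hadoop.conf.Configuration;

public class FailureTuning {
    static void configure(Configuration conf) {
        conf.setLong("mapreduce.task.timeout", 10 * 60 * 1000); // mark a task failed after 10 minutes without progress
        conf.setInt("mapreduce.map.maxattempts", 4);            // attempts per map task
        conf.setInt("mapreduce.reduce.maxattempts", 4);         // attempts per reduce task
        conf.setInt("mapreduce.am.max-attempts", 2);            // MapReduce application master attempts
    }
}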
The way recovery works is as follows. An application master sends periodic heartbeats to the
resource manager, and in the event of application master failure, the resource manager will detect
the failure and start a new instance of the master running in a new container which is managed
by a node manager.
Node Manager Failure
If a node manager fails by crashing or running very slowly, it will stop sending heartbeats to the
resource manager (or send them very infrequently). The resource manager will notice a node
manager that has stopped sending heartbeats if it hasn’t received one for 10 minutes (this is
configured, in milliseconds, via the yarn.resourcemanager.nm.liveness-monitor.expiry-interval-
ms property) and remove it from its pool of nodes to schedule containers on.
Any task or application master running on the failed node manager will be recovered using the
mechanisms described in the previous two sections. In addition, the application master arranges
for map tasks that were run and completed successfully on the failed node manager to be rerun if
they belong to incomplete jobs, since their intermediate output residing on the failed node
manager’s local filesystem may not be accessible to the reduce task.
Resource Manager Failure
Failure of the resource manager is serious, because without it, neither jobs nor task containers
can be launched. In the default configuration, the resource manager is a single point of failure,
since in the (unlikely) event of machine failure, all running jobs fail—and can’t be recovered.
To achieve high availability (HA), it is necessary to run a pair of resource managers in an active-
standby configuration. If the active resource manager fails, then the standby can take over
without a significant interruption to the client.
Information about all the running applications is stored in a highly available state store (backed
by ZooKeeper or HDFS), so that the standby can recover the core state of the failed active
resource manager.
4.7 Job Scheduling in YARN
In Hadoop, we can receive multiple jobs from different clients to perform. The MapReduce
framework is used to perform multiple tasks in parallel in a typical Hadoop cluster to process
large datasets at a fast rate. The MapReduce framework itself was responsible for scheduling and
monitoring the tasks given by different clients, but this method of scheduling jobs was used prior
to Hadoop 2; from Hadoop 2 onwards, scheduling is handled by YARN.
The Scheduler and the Applications Manager are the two major components of the Resource
Manager. The Scheduler in YARN is dedicated purely to scheduling jobs; it does not track the
status of applications. It schedules jobs on the basis of the resources they require.
A job queue is nothing but the collection of various tasks that we have received from our
various clients. The tasks are available in the queue, and we need to schedule these tasks on the
basis of our requirements.
1. FIFO Scheduler
As the name suggests FIFO i.e. First In First Out, so the tasks or application that comes first will
be served first. This is the default Scheduler we use in Hadoop. The tasks are placed in a queue
and the tasks are performed in their submission order. In this method, once the job is scheduled,
no intervention is allowed. So sometimes the high-priority process has to wait for a long time
since the priority of the task does not matter in this method.
Advantage:
Simple to understand; it is the default scheduler and needs no configuration.
Disadvantage:
Not suitable for shared clusters, since priority is not considered and a short or high-priority job
may have to wait behind long-running jobs.
2. Capacity Scheduler
In the Capacity Scheduler we have multiple job queues for scheduling our tasks. The Capacity
Scheduler allows multiple occupants to share a large Hadoop cluster. For each job queue, the
Capacity Scheduler provides some slots or cluster resources for performing job operations. Each
job queue has its own slots to perform its tasks. If only one queue has tasks to perform, its tasks
can also use the free slots of the other queues; when new tasks enter another queue, the slots
borrowed from it are given back so that it can run its own jobs.
The Capacity Scheduler also provides a level of abstraction to know which occupant is utilizing
more cluster resources or slots, so that a single user or application does not take a disproportionate
or unnecessary share of slots in the cluster. The Capacity Scheduler mainly contains three types
of queues, root, parent, and leaf, which represent the cluster, an organization or subgroup, and
the point of application submission, respectively.
Advantage:
Best for working with Multiple clients or priority jobs in a Hadoop cluster
Maximizes throughput in the Hadoop cluster
Disadvantage:
More complex
Not easy to configure for everyone
3. Fair Scheduler
The Fair Scheduler is very similar to the Capacity Scheduler, and the priority of the job is kept
in consideration. With the help of the Fair Scheduler, YARN applications can share the resources
of a large Hadoop cluster, and these resources are maintained dynamically, so there is no need
for prior capacity planning. The resources are distributed in such a manner that all applications
within a cluster get a fair share of resources over time. The Fair Scheduler makes scheduling
decisions on the basis of memory, but it can be configured to schedule on CPU as well.
As noted, it is similar to the Capacity Scheduler, but the major thing to notice is that in the Fair
Scheduler, whenever a high-priority job arises in the same queue, it is processed in parallel by
taking over some portion of the already dedicated slots.
Advantages:
Gives a fair share of the cluster to every application, so short jobs can finish in a reasonable time
even while long jobs are running.
Resources are maintained dynamically, so there is no need for prior capacity planning.
4.8 Shuffle and Sort
In Hadoop, the process by which the intermediate output from the mappers is transferred to the
reducers is called shuffling. Each reducer gets one or more keys and their associated values,
depending on the number of reducers. The intermediate key-value pairs generated by the mapper
are sorted automatically by key. This section discusses shuffling and sorting in Hadoop
MapReduce in detail.
Here we will learn what sorting and shuffling are in Hadoop, the purpose of the shuffle and sort
phase in MapReduce, how the MapReduce shuffle works, and how the MapReduce sort works.
What is Shuffling and Sorting in Hadoop MapReduce?
Before we start with shuffle and sort in MapReduce, let us recall the other phases of MapReduce:
the mapper, the reducer, the combiner, the partitioner, and the input format.
Shuffle phase in Hadoop transfers the map output from Mapper to a Reducer in MapReduce.
Sort phase in MapReduce covers the merging and sorting of map outputs. Data from the mapper
are grouped by the key, split among reducers and sorted by the key. Every reducer obtains all
values associated with the same key. Shuffle and sort phase in Hadoop occur simultaneously and
are done by the MapReduce framework.
Shuffling in MapReduce
The process of transferring data from the mappers to the reducers is known as shuffling, i.e., the
process by which the system performs the sort and transfers the map output to the reducer as
input. The MapReduce shuffle phase is necessary for the reducers; without it, they would not
have any input (or any input from every mapper). As shuffling can start even before the map
phase has finished, it saves time and completes the job in less time.
Sorting in MapReduce
The keys generated by the mapper are automatically sorted by the MapReduce framework, i.e.,
before the reducer starts, all intermediate key-value pairs generated by the mapper get sorted by
key and not by value. Values passed to each reducer are not sorted; they can be in any order.
Sorting in Hadoop helps reducer to easily distinguish when a new reduce task should start. This
saves time for the reducer. Reducer starts a new reduce task when the next key in the sorted input
data is different than the previous. Each reduce task takes key-value pairs as input and generates
key-value pair as output.
Note that shuffling and sorting in Hadoop MapReduce is not performed at all if you specify zero
reducers (setNumReduceTasks(0)). Then, the MapReduce job stops at the map phase, and the
map phase does not include any kind of sorting (so even the map phase is faster).
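For completeness, a map-only job is produced simply by setting the number of reduce tasks to zero; a sketch:
import org.apache.hadoop.mapreduce.Job;

public class MapOnlyJob {
    // With zero reducers there is no shuffle or sort: the map output is written
    // directly by the output format, and the job stops after the map phase.
    static void makeMapOnly(Job job) {
        job.setNumReduceTasks(0);
    }
}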
Secondary Sorting in MapReduce
If we want to sort reducer’s values, then the secondary sorting technique is used as it enables us
to sort the values (in ascending or descending order) passed to each reducer.
Speculative Execution
The MapReduce model is to break jobs into tasks and run the tasks in parallel to make the overall
job execution time smaller than it would be if the tasks ran sequentially. This makes the job
execution time sensitive to slow-running tasks, as it takes only one slow
task to make the whole job take significantly longer than it would have done otherwise. When a
job consists of hundreds or thousands of tasks, the possibility of a few straggling tasks is very
real.
Tasks may be slow for various reasons, including hardware degradation or software
misconfiguration, but the causes may be hard to detect because the tasks still complete
successfully, albeit after a longer time than expected. Hadoop doesn’t try to diagnose and fix
slow-running tasks; instead, it tries to detect when a task is running slower than expected and
launches another equivalent task as a backup. This is termed speculative execution of tasks.
Speculative execution is an optimization, and not a feature to make jobs run more reliably. If
there are bugs that sometimes cause a task to hang or slow down, relying on speculative
execution to avoid these problems is unwise and won’t work reliably, since the same bugs are
likely to affect the speculative task. You should fix the bug so that the task doesn’t hang or slow
down.
Speculative execution is turned on by default. It can be enabled or disabled independently for
map tasks and reduce tasks, on a cluster-wide basis, or on a per-job basis. The relevant properties
are mapreduce.map.speculative and mapreduce.reduce.speculative (both true by default).
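A per-job sketch of these settings through the Configuration API (the values are illustrative):
import org.apache.hadoop.conf.Configuration;

public class SpeculationSettings {
    // Example: keep speculative execution for map tasks but turn it off for reduce tasks.
    static void configure(Configuration conf) {
        conf.setBoolean("mapreduce.map.speculative", true);
        conf.setBoolean("mapreduce.reduce.speculative", false);
    }
}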
4.10 MapReduce Types, Input Formats, and Output Formats
MapReduce Types
Mapping is the core technique of processing a list of data elements that come in pairs of keys and
values. The map function applies to individual elements defined as key-value pairs of a list and
produces a new list. The general idea of the map and reduce functions of Hadoop can be
illustrated as follows:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
The input parameters of the key and value pair, represented by K1 and V1 respectively,
are different from the output pair type: K2 and V2. The reduce function accepts the same
format output by the map, but the type of the output of the reduce operation is again
different: K3 and V3. The Java API for this is as follows:
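An outline of the corresponding new-API classes in org.apache.hadoop.mapreduce, with bodies elided; K1/V1, K2/V2, and K3/V3 map onto the type parameters shown here:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // called once for each input key-value pair
    protected void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        // ...
    }
}

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // called once for each key, with all of that key's values
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
            throws IOException, InterruptedException {
        // ...
    }
}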
Note that the combine function has the same form as the reduce function, except that its output
types are the intermediate key and value types (K2 and V2), so that the combined output can be
fed to the reduce function.
The partition function operates on the intermediate key-value types. It controls the partitioning of
the keys of the intermediate map outputs. The partition is typically derived from the key using a
hash function. The total number of partitions is the same as the number of reduce tasks for the
job. The partition is determined only by the key, ignoring the value.
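A minimal custom partitioner that derives the partition from the key's hash, matching the description above (the class name and the key/value types are illustrative; it would be registered with job.setPartitionerClass()):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {
    // The partition depends only on the key, and the number of partitions
    // equals the number of reduce tasks for the job.
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}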
Input Formats
Hadoop has to accept and process a variety of formats, from text files to databases. A chunk of
input, called input split, is processed by a single map. Each split is further divided into logical
records given to the map to process in key-value pair. In the context of database, the split means
reading a range of tuples from an SQL table, as done by the DBInputFormat and producing
LongWritables containing record numbers as keys and DBWritables as values. The Java API for
input splits is as follows:
public interface InputSplit extends Writable {
long getLength() throws IOException;
String[] getLocations() throws IOException;
}
The InputSplit represents the data to be processed by a Mapper. It returns the length in
bytes and has a reference to the input data. It presents a byte-oriented view on the input
and is the responsibility of the RecordReader of the job to process this and present a
record-oriented view. In most cases, we do not deal with InputSplit directly, because splits
are created by an InputFormat. It is the responsibility of the InputFormat to create the
input splits and divide them into records.
The JobClient invokes the getSplits() method, passing the desired number of splits as a hint; the
actual number of splits computed may differ from the number given. Once the splits are
calculated, they are sent to the jobtracker, which schedules map tasks on the tasktrackers using
the splits' storage locations. The tasktracker then invokes the getRecordReader() method on the
InputFormat to get a RecordReader for its split.
The FileInputFormat is the base class for the file data source. It has the responsibility to identify
the files that are to be included as the job input and the definition for generating the split.
Hadoop also includes processing of unstructured data that often comes in textual format. The
TextInputFormat is the default InputFormat for such data.
The SequenceFileInputFormat handles binary input, reading sequences of binary key-value pairs.
Similarly, DBInputFormat provides the capability to read data from a relational database using
JDBC.
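Input formats are selected on the Job; a sketch (the helper class and method names are illustrative, while the Hadoop classes are real):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputSetup {
    // TextInputFormat is the default: keys are byte offsets (LongWritable) and
    // values are the lines of the file (Text).
    static void configure(Job job, String inputDir) throws Exception {
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputDir));
        // Alternatives include SequenceFileInputFormat for binary key-value files
        // and DBInputFormat for JDBC sources.
    }
}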
Output Formats
The output format classes are similar to their corresponding input format classes and work in the
reverse direction.
For example, TextOutputFormat is the default output format. It writes records as plain text
files; its keys and values may be of any type, since it transforms them into strings by invoking the
toString() method. Each key-value pair is separated by a tab character, although this can be
customized by setting the separator property of the text output format.
The output format for relational databases is DBOutputFormat, which sends the reduce output to
a SQL table. For HBase, TableOutputFormat enables a MapReduce program to write its output to
an HBase table.
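Output formats are configured on the Job in the same way; a sketch (the separator property name is given as commonly documented and should be checked against your Hadoop version):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputSetup {
    static void configure(Job job, String outputDir) {
        job.setOutputFormatClass(TextOutputFormat.class);  // default: plain text, tab-separated
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Change the key-value separator from a tab to a comma (assumed property name).
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
    }
}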