SEMESTER – VIII, CREDITS – 04
· Appraise the role of Business intelligence and its applications across industries
Module – 1: Hadoop Distributed File System Basics, Running Example Programs (10 Hours)
Module – 2: Essential Hadoop Tools, Hadoop YARN Applications, Managing Hadoop with Apache Ambari (10 Hours)
Module – 3
Module – 4
Module – 5: Text Mining, Naïve-Bayes Analysis, Support Vector Machines, Web Mining (10 Hours)
MODULE 1
HDFS COMPONENTS
The design of HDFS is based on two types of nodes: a NameNode and multiple DataNodes. In a basic
design, a single NameNode manages all the metadata needed to store and retrieve the actual data from
the DataNodes. No data is actually stored on the NameNode, however. For a minimal Hadoop
installation, there needs to be a single NameNode daemon and a single DataNode daemon running on
at least one machine (see the section “Installing Hadoop from Apache Sources” in Chapter 2,
“Installation Recipes”).
The design is a master/slave architecture in which the master (NameNode) manages the file system
namespace and regulates access to files by clients. File system namespace operations such as
opening, closing, and renaming files and directories are all managed by the NameNode. The
NameNode also determines the mapping of blocks to DataNodes and handles DataNode failures.
The slaves (DataNodes) are responsible for serving read and write requests from the file system to the
clients. The NameNode manages block creation, deletion, and replication.
An example of the client/NameNode/DataNode interaction is provided in Figure 3.1. When a client
writes data, it first communicates with the NameNode and requests to create a file. The NameNode
determines how many blocks are needed and provides the client with the DataNodes that will store
the data. As part of the storage process, the data blocks are replicated after they are written to the
assigned node. Depending on how many nodes are in the cluster, the NameNode will attempt to write
replicas of the data blocks on nodes that are in other separate racks (if possible). If there is only one
rack, then the replicated blocks are written to other servers in the same rack. After the DataNode
acknowledges that the file block replication is complete, the client closes the file and informs the
NameNode that the operation is complete. Note that the NameNode does not write any data directly
to the DataNodes. It does, however, give the client a limited amount of time to complete the
operation. If it does not complete in the time period, the operation is canceled.
Reading data happens in a similar fashion. The client requests a file from the NameNode, which
returns the best DataNodes from which to read the data. The client then accesses the data directly
from the DataNodes.
Thus, once the metadata has been delivered to the client, the NameNode steps back and lets the
conversation between the client and the DataNodes proceed. While data transfer is progressing, the
NameNode also monitors the DataNodes by listening for heartbeats sent from DataNodes. The lack of
a heartbeat signal indicates a potential node failure. In such a case, the NameNode will route around
the failed DataNode and begin re-replicating the now-missing blocks. Because the file system is
redundant, DataNodes can be taken offline (decommissioned) for maintenance by informing the
NameNode of the DataNodes to exclude from the HDFS pool.
The mappings between data blocks and the physical DataNodes are not kept in persistent storage on
the NameNode. For performance reasons, the NameNode stores all metadata in memory. Upon
startup, each DataNode provides a block report (which it keeps in persistent storage) to the
NameNode. The block reports are sent every 10 heartbeats. (The interval between reports is a
configurable property.) The reports enable the NameNode to keep an up-to-date account of all data
blocks in the cluster.
In almost all Hadoop deployments, there is a SecondaryNameNode. While not explicitly required by
a NameNode, it is highly recommended. The term “SecondaryNameNode” (now called
CheckPointNode) is somewhat misleading. It is not an active failover node and cannot replace the
primary NameNode in case of its failure. (See the section “NameNode High Availability” later in this
chapter for more explanation.)
The purpose of the SecondaryNameNode is to perform periodic checkpoints that evaluate the status
of the NameNode. Recall that the NameNode keeps all system metadata in memory for fast access. It
also has two disk files that track changes to the metadata:
An image of the file system state when the NameNode was started. This file begins
with fsimage_* and is used only at startup by the NameNode.
A series of modifications done to the file system after starting the NameNode. These files begin
with edits_* and reflect the changes made after the fsimage_* file was read.
The location of these files is set by the dfs.namenode.name.dir property in the hdfs-site.xml file.
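If needed, the configured location can be confirmed from the command line; a quick check using the hdfs getconf utility that ships with Hadoop 2:
$ hdfs getconf -confKey dfs.namenode.name.dir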
The SecondaryNameNode periodically downloads fsimage and edits files, joins them into a
new fsimage, and uploads the new fsimage file to the NameNode. Thus, when the NameNode
restarts, the fsimage file is reasonably up-to-date and requires only the edit logs to be applied since
the last checkpoint. If the SecondaryNameNode were not running, a restart of the NameNode could
take a prohibitively long time due to the number of changes to the file system.
Thus, the various roles in HDFS can be summarized as follows:
HDFS uses a master/slave model designed for large file reading/streaming.
The NameNode is a metadata server or “data traffic cop.”
HDFS provides a single namespace that is managed by the NameNode.
Data is redundantly stored on DataNodes; there is no data on the NameNode.
The SecondaryNameNode performs checkpoints of NameNode file system’s state but is not a
failover node.
Rack Awareness
Rack awareness deals with data locality. Recall that one of the main design goals of Hadoop
MapReduce is to move the computation to the data. Assuming that most data center networks do not
offer full bisection bandwidth, a typical Hadoop cluster will exhibit three levels of data locality:
1. Data resides on the local machine (best).
2. Data resides in the same rack (better).
3. Data resides in a different rack (good).
When the YARN scheduler is assigning MapReduce containers to work as mappers, it will try to
place the container first on the local machine, then on the same rack, and finally on another rack.
In addition, the NameNode tries to place replicated data blocks on multiple racks for improved fault
tolerance. In such a case, an entire rack failure will not cause data loss or stop HDFS from working.
Performance may be degraded, however.
HDFS can be made rack-aware by using a user-derived script that enables the master node to map the
network topology of the cluster. A default Hadoop installation assumes all the nodes belong to the
same (large) rack. In that case, there is no option 3.
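Rack awareness is enabled by pointing the net.topology.script.file.name property (in core-site.xml) at such a script. The following is only a minimal sketch with illustrative host names; a real script must return a rack path for every DataNode address the NameNode may pass to it:
#!/bin/bash
# Print one rack path per host name or IP address supplied by the NameNode.
for node in "$@"; do
  case "$node" in
    n0*|n1*) echo "/rack1" ;;   # illustrative hosts assumed to sit in rack 1
    *)       echo "/rack2" ;;   # everything else mapped to a second rack
  esac
done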
NameNode High Availability
With NameNode High Availability (HA), two NameNodes run in the cluster: an Active NameNode and a
Standby NameNode. To guarantee the file system state is preserved, both the Active and Standby
NameNodes receive block reports from the DataNodes. The Active node also sends all file system edits
to a quorum of JournalNodes. At least three physically separate JournalNode daemons are required, because edit log
modifications must be written to a majority of the JournalNodes. This design will enable the system
to tolerate the failure of a single JournalNode machine. The Standby node continuously reads the
edits from the JournalNodes to ensure its namespace is synchronized with that of the Active node. In
the event of an Active NameNode failure, the Standby node reads all remaining edits from the
JournalNodes before promoting itself to the Active state.
To prevent confusion between NameNodes, the JournalNodes allow only one NameNode to be a
writer at a time. During failover, the NameNode that is chosen to become active takes over the role of
writing to the JournalNodes. A SecondaryNameNode is not required in the HA configuration because
the Standby node also performs the tasks of the Secondary NameNode.
Apache ZooKeeper is used to monitor the NameNode health. Zookeeper is a highly available service
for maintaining small amounts of coordination data, notifying clients of changes in that data, and
monitoring clients for failures. HDFS failover relies on ZooKeeper for failure detection and for
Standby to Active NameNode election. The Zookeeper components are not depicted in Figure 3.3.
HDFS USER COMMANDS
The preferred way to interact with HDFS in Hadoop version 2 is through the hdfs command. The
installed version of HDFS can be reported as follows:
$ hdfs version
Hadoop 2.6.0.2.2.4.2-2
Subversion git@github.com:hortonworks/hadoop.git -r
22a563ebe448969d07902aed869ac13c652b2872
Compiled by jenkins on 2015-03-31T19:49Z
Compiled with protoc 2.5.0
From source with checksum b3481c2cdbe2d181f2621331926e267
This command was run using /usr/hdp/2.2.4.2-2/hadoop/hadoop-common-2.6.0.2.2.4.2-2.jar
HDFS provides a series of commands similar to those found in a standard POSIX file system. A list
of those commands can be obtained by issuing the following command. Several of these commands
will be highlighted here under the user account hdfs.
$ hdfs dfs
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] [-h] <path> ...]
[-cp [-f] [-p | -p[topax]] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] <path> ...]
[-expunge]
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] <src> <localdst>]
[-help [cmd ...]]
[-ls [-d] [-h] [-R] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] [-l] <localsrc> ... <dst>]
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] <src> ...]
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set
<acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>]
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touchz <path> ...]
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]
To list the files in the root HDFS directory, enter the following command:
$ hdfs dfs -ls /
Found 10 items
drwxrwxrwx - yarn hadoop 0 2015-04-29 16:52 /app-logs
drwxr-xr-x - hdfs hdfs 0 2015-04-21 14:28 /apps
drwxr-xr-x - hdfs hdfs 0 2015-05-14 10:53 /benchmarks
drwxr-xr-x - hdfs hdfs 0 2015-04-21 15:18 /hdp
drwxr-xr-x - mapred hdfs 0 2015-04-21 14:26 /mapred
drwxr-xr-x - hdfs hdfs 0 2015-04-21 14:26 /mr-history
drwxr-xr-x - hdfs hdfs 0 2015-04-21 14:27 /system
drwxrwxrwx - hdfs hdfs 0 2015-05-07 13:29 /tmp
drwxr-xr-x - hdfs hdfs 0 2015-04-27 16:00 /user
drwx-wx-wx - hdfs hdfs 0 2015-05-27 09:01 /var
To list files in your home directory, enter the following command:
$ hdfs dfs -ls
Found 13 items
drwx------ - hdfs hdfs 0 2015-05-27 20:00 .Trash
drwx------ - hdfs hdfs 0 2015-05-26 15:43 .staging
drwxr-xr-x - hdfs hdfs 0 2015-05-28 13:03 DistributedShell
drwxr-xr-x - hdfs hdfs 0 2015-05-14 09:19 TeraGen-50GB
drwxr-xr-x - hdfs hdfs 0 2015-05-14 10:11 TeraSort-50GB
drwxr-xr-x - hdfs hdfs 0 2015-05-24 20:06 bin
drwxr-xr-x - hdfs hdfs 0 2015-04-29 16:52 examples
drwxr-xr-x - hdfs hdfs 0 2015-04-27 16:00 flume-channel
drwxr-xr-x - hdfs hdfs 0 2015-04-29 14:33 oozie-4.1.0
drwxr-xr-x - hdfs hdfs 0 2015-04-30 10:35 oozie-examples
drwxr-xr-x - hdfs hdfs 0 2015-04-29 20:35 oozie-oozi
drwxr-xr-x - hdfs hdfs 0 2015-05-24 18:11 war-and-peace-input
drwxr-xr-x - hdfs hdfs 0 2015-05-25 15:22 war-and-peace-output
A file copied into HDFS (for example, with hdfs dfs -put test stuff) can be confirmed by listing the
target directory:
$ hdfs dfs -ls stuff
Found 1 items
-rw-r--r-- 2 hdfs hdfs 12857 2015-05-29 13:12 stuff/test
Note that when the fs.trash.interval option is set to a non-zero value in core-site.xml, all deleted files
are moved to the user’s .Trash directory. This can be avoided by including the -skipTrash option.
For example, the following command deletes the file and bypasses the trash directory:
$ hdfs dfs -rm -skipTrash stuff/test
Deleted stuff/test
An overall status report for HDFS can be generated with the hdfs dfsadmin -report command. When it
is run under a user account without HDFS administrator privileges, the request is denied:
$ hdfs dfsadmin -report
report: Access denied for user deadline. Superuser privilege is required
The Java HDFS client example (Listing 3.1) begins with the following package declaration and import
statements:
package org.myorg;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
The HadoopDFSFileReadWrite.java example in Listing 3.1 can be compiled on Linux systems using
the following steps. First, create a directory to hold the classes:
$ mkdir HDFSClient-classes
Next, compile the program using 'hadoop classpath' to ensure all the class paths are available:
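A compile command along the following lines should work; the source file name is the one given above, and the back-quoted hadoop classpath adds the installed Hadoop jars to the compile class path:
$ javac -cp `hadoop classpath` -d HDFSClient-classes HadoopDFSFileReadWrite.java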
A simple file copy from the local system to HDFS can be accomplished using the following
command:
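The exact invocation depends on how the classes were packaged and on the client's argument conventions; the following is only a sketch, assuming the compiled classes are bundled into a jar named HDFSClient.jar and that the program takes a local source file and an HDFS destination (the jar and file names here are illustrative):
$ jar -cvf HDFSClient.jar -C HDFSClient-classes/ .
$ hadoop jar ./HDFSClient.jar org.myorg.HadoopDFSFileReadWrite ./NOTES.txt /user/hdfs/NOTES.txt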
The file can then be seen in HDFS by using the hdfs dfs -ls command.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "hdfs.h"
The example can be built using the following steps. The following software environment is assumed:
Operating system: Linux
Platform: RHEL 6.6
Hortonworks HDP 2.2 with Hadoop Version: 2.6
The first step loads the Hadoop environment paths. In particular, the $HADOOP_LIB path is needed
for the compiler.
$ . /etc/hadoop/conf/hadoop-env.sh
The program is compiled using gcc and the following command line. In addition to $HADOOP_LIB,
the $JAVA_HOME path is assumed to be in the local environment. If the compiler issues errors or
warnings, confirm that all paths are correct for the Hadoop and Java environment.
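The exact compile line depends on the installation paths; the following is a sketch for a source file named hdfs-simple-test.c (the name matches the executable run below), linking against libhdfs and the embedded JVM:
$ gcc hdfs-simple-test.c -o hdfs-simple-test \
      -I$HADOOP_LIB/include -I$JAVA_HOME/include \
      -L$HADOOP_LIB/lib -L$JAVA_HOME/jre/lib/amd64/server \
      -lhdfs -ljvm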
The location of the run-time library path needs to be set with the following command:
$ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server:$HADOOP_LIB/lib
The Hadoop class path needs to be set with the following command. The -glob option is required
because Hadoop version 2 uses a wildcard syntax in the output of the hadoop classpath command.
Hadoop version 1 used the full path to every jar file without wildcards. Unfortunately, Java does not
expand the wildcards automatically when launching an embedded JVM via JNI, so older scripts may
not work. The -glob option expands the wildcards.
$ export CLASSPATH=`hadoop classpath -glob`
The program can now be run:
$ ./hdfs-simple-test
The new file contents can be inspected using the hdfs dfs -cat command.
RUNNING MAPREDUCE EXAMPLES
All Hadoop releases ship with MapReduce example applications packaged in a hadoop-mapreduce-examples
jar file. In the HDP 2.2 distribution used here, the examples are under /usr/hdp/2.2.4.2-2/hadoop-mapreduce;
in other versions, the examples may be in /usr/lib/hadoop-mapreduce/ or some other location. The
exact location of the example jar file can be found using the find command:
$ find / -name "hadoop-mapreduce-examples*.jar" -print
$ export HADOOP_EXAMPLES=/usr/hdp/2.2.4.2-2/hadoop-mapreduce
Once you define the examples path, you can run the Hadoop examples using the commands discussed
in the following sections.
Note
In previous versions of Hadoop, the command hadoop jar . . . was used to run MapReduce programs.
Newer versions provide the yarn command, which offers more capabilities. Both commands will
work for these examples.
The available examples can be listed by running the examples jar without an argument, as shown below.
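A typical invocation follows; the jar file name may include a version suffix in some installations:
$ yarn jar $HADOOP_EXAMPLES/hadoop-mapreduce-examples.jar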
To illustrate several features of Hadoop and the YARN ResourceManager service GUI,
the pi and terasort examples are presented next. To find help for running the other examples, enter the
example name without any arguments. Chapter 6, “MapReduce Programming,” covers one of the
other popular examples called wordcount.
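The command that produced the following output is not reproduced in this extract; based on the 16 maps and 1,000,000 samples per map reported below, it would have been of this form (jar name again may carry a version suffix):
$ yarn jar $HADOOP_EXAMPLES/hadoop-mapreduce-examples.jar pi 16 1000000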
If the program runs correctly, you should see output similar to the following. (Some of the Hadoop
INFO messages have been removed for clarity.)
Number of Maps = 16
Samples per Map = 1000000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Wrote input for Map #10
Wrote input for Map #11
Wrote input for Map #12
Wrote input for Map #13
Wrote input for Map #14
Wrote input for Map #15
Starting Job
...
15/05/13 20:10:30 INFO mapreduce.Job: map 0% reduce 0%
15/05/13 20:10:37 INFO mapreduce.Job: map 19% reduce 0%
15/05/13 20:10:39 INFO mapreduce.Job: map 50% reduce 0%
15/05/13 20:10:46 INFO mapreduce.Job: map 56% reduce 0%
15/05/13 20:10:47 INFO mapreduce.Job: map 94% reduce 0%
15/05/13 20:10:48 INFO mapreduce.Job: map 100% reduce 100%
15/05/13 20:10:48 INFO mapreduce.Job: Job job_1429912013449_0047 completed
successfully
15/05/13 20:10:48 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=358
FILE: Number of bytes written=1949395
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=4198
HDFS: Number of bytes written=215
HDFS: Number of read operations=67
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Launched map tasks=16
Launched reduce tasks=1
Data-local map tasks=16
Total time spent by all maps in occupied slots (ms)=158378
Total time spent by all reduces in occupied slots (ms)=8462
Total time spent by all map tasks (ms)=158378
Total time spent by all reduce tasks (ms)=8462
Total vcore-seconds taken by all map tasks=158378
Total vcore-seconds taken by all reduce tasks=8462
Total megabyte-seconds taken by all map tasks=243268608
Total megabyte-seconds taken by all reduce tasks=12997632
Map-Reduce Framework
Map input records=16
Map output records=32
Map output bytes=288
Map output materialized bytes=448
Input split bytes=2310
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=448
Reduce input records=32
Reduce output records=0
Spilled Records=64
Shuffled Maps=16
Failed Shuffles=0
Merged Map outputs=16
GC time elapsed (ms)=1842
CPU time spent (ms)=11420
Physical memory (bytes) snapshot=13405769728
Virtual memory (bytes) snapshot=33911930880
Total committed heap usage (bytes)=17026777088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1888
File Output Format Counters
Bytes Written=97
Job Finished in 23.718 seconds
Estimated value of Pi is 3.14159125000000000000
Notice that the MapReduce progress is shown in the same way as Hadoop version 1, but the
application statistics are different. Most of the statistics are self-explanatory. The one important item
to note is that the YARN MapReduce framework is used to run the program. (See Chapter 1,
“Background and Concepts,” and Chapter 8, “Hadoop YARN Applications,” for more information
about YARN frameworks.)
For those readers who have used or read about Hadoop version 1, if you look at the Cluster Metrics
table, you will see some new information. First, you will notice that the “Map/Reduce Task Capacity”
has been replaced by the number of running containers. If YARN is running a MapReduce job, these
containers can be used for both map and reduce tasks. Unlike in Hadoop version 1, the number of
mappers and reducers is not fixed. There are also memory metrics and links to node status. If you
click on the Nodes link (left menu under About), you can get a summary of the node activity and
state. For example, Figure 4.2 is a snapshot of the node activity while the pi application is running.
Notice the number of containers, which are used by the MapReduce framework as either mappers or
reducers.
Figure 4.2 Hadoop YARN ResourceManager nodes status window
Going back to the main Applications/Running window (Figure 4.1), if you click on
the application_14299... link, the Application status window in Figure 4.3 will appear. This window
provides an application overview and metrics, including the cluster node on which the
ApplicationMaster container is running.
Figure 4.3 Hadoop YARN application status for the pi example
Clicking the ApplicationMaster link next to “Tracking URL:” in Figure 4.3 leads to the window
shown in Figure 4.4. Note that the link to the application’s ApplicationMaster is also found in the last
column on the main Running Applications screen shown in Figure 4.1.
In the MapReduce Application window, you can see the details of the MapReduce application and the
overall progress of mappers and reducers. Instead of containers, the MapReduce application now
refers to maps and reducers. Clicking job_14299... brings up the window shown in Figure 4.5. This
window displays more detail about the number of pending, running, completed, and failed mappers
and reducers, including the elapsed time since the job started.
Figure 4.5 Hadoop YARN MapReduce job progress
The status of the job in Figure 4.5 will be updated as the job progresses (the window needs to be
refreshed manually). The ApplicationMaster collects and reports the progress of each mapper and
reducer task. When the job is finished, the window is updated to that shown in Figure 4.6. It reports
the overall run time and provides a breakdown of the timing of the key phases of the MapReduce job
(map, shuffle, merge, reduce).
Going back to the job summary page (Figure 4.6), you can also examine the logs for the
ApplicationMaster by clicking the “logs” link. To find information about the mappers and reducers,
click the numbers under the Failed, Killed, and Successful columns. In this example, there were 16
successful mappers and one successful reducer. All the numbers in these columns lead to more
information about individual map or reduce processes. For instance, clicking the “16” under
“Successful” in Figure 4.6 displays the table of map tasks in Figure 4.8. The metrics for the
Application Master container are displayed in table form. There is also a link to the log file for each
process (in this case, a map process). Viewing the logs requires that
the yarn.log-aggregation-enable property in the yarn-site.xml file be set to true. For more on changing
Hadoop settings, see Chapter 9, “Managing Hadoop with Apache Ambari.”
Figure 4.8 Hadoop YARN MapReduce logs available for browsing
If you return to the main cluster window (Figure 4.1), choose Applications/Finished, and then select
our application, you will see the summary page shown in Figure 4.9.
There are a few things to notice in the previous windows. First, because YARN manages
applications, all information reported by the ResourceManager concerns the resources provided and
the application type (in this case, MAPREDUCE). In Figure 4.1 and Figure 4.4, the YARN
ResourceManager refers to the pi example by its application-id (application_1429912013449_0044).
YARN has no data about the actual application other than the fact that it is a MapReduce job. Data
from the actual MapReduce job are provided by the MapReduce framework and referenced by a
job-id (job_1429912013449_0044) in Figure 4.6. Thus, two clearly different data streams are
combined in the web GUI: YARN applications and MapReduce framework jobs. If the framework
does not provide job information, then certain parts of the web GUI will not have anything to display.
Another interesting aspect of the previous windows is the dynamic nature of the mapper and reducer
tasks. These tasks are executed as YARN containers, and their number will change as the application
runs. Users may request specific numbers of mappers and reducers, but the ApplicationMaster uses
them in a dynamic fashion. As mappers complete, the ApplicationMaster will return the containers to
the ResourceManager and request a smaller number of reducer containers. This feature provides for
much better cluster utilization because mappers and reducers are dynamic—rather than
fixed—resources.
To report results, the time for the actual sort (terasort) is measured and the benchmark rate in
megabytes/second (MB/s) is calculated. For best performance, the actual terasort benchmark should
be run with a replication factor of 1. In addition, the default number of terasort reducer tasks is set to
1. Increasing the number of reducers often helps with benchmark performance. For example, the
following command will instruct terasort to use four reducer tasks:
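A sketch of such a command, reusing the TeraGen-50GB and TeraSort-50GB directories seen in the earlier listing (on older releases the property is named mapred.reduce.tasks):
$ yarn jar $HADOOP_EXAMPLES/hadoop-mapreduce-examples.jar terasort \
      -Dmapreduce.job.reduces=4 TeraGen-50GB TeraSort-50GB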
Also, do not forget to clean up the terasort data between runs (and after testing is finished). The
following command will perform the cleanup for the previous example:
$ hdfs dfs -rm -r -skipTrash TeraGen-50GB TeraSort-50GB
Example results (with the date and time prefix removed) show a large standard deviation, which is due
to the placement of tasks on the small four-node cluster.
MapReduce jobs can also be managed from the command line with the mapred job command, which
provides the following options:
$ mapred job
Usage: CLI <command> <args>
[-submit <job-file>]
[-status <job-id>]
[-counter <job-id> <group-name> <counter-name>]
[-kill <job-id>]
[-set-priority <job-id> <priority>]. Valid values for priorities
are: VERY_HIGH HIGH NORMAL LOW VERY_LOW
[-events <job-id> <from-event-#> <#-of-events>]
[-history <jobHistoryFile>]
[-list [all]]
[-list-active-trackers]
[-list-blacklisted-trackers]
[-list-attempt-ids <job-id> <task-type> <task-state>]. Valid values
for <task-type> are REDUCE MAP. Valid values for <task-state> are
running, completed
[-kill-task <task-attempt-id>]
[-fail-task <task-attempt-id>]
[-logs <job-id> <task-attempt-id>]
MAPREDUCE PARALLEL DATA FLOW
● Figure 5.1 Apache Hadoop parallel MapReduce data flow
● The input to the MapReduce application is the following file in HDFS with three lines of text.
The goal is to count the number of times each word is used.
● see spot run
run spot run
see the cat
● The first thing MapReduce will do is create the data splits. For simplicity, each line will be
one split. Since each split will require a map task, there are three mapper processes that count
the number of words in the split. On a cluster, the results of each map task are written to local
disk and not to HDFS. Next, similar keys need to be collected and sent to a reducer process.
The shuffle step requires data movement and can be expensive in terms of processing time.
Depending on the nature of the application, the amount of data that must be shuffled
throughout the cluster can vary from small to large.
● Once the data have been collected and sorted by key, the reduction step can begin (even if
only partial results are available). It is not necessary—and not normally recommended—to
have a reducer for each key–value pair as shown in Figure 5.1. In some cases, a single reducer
will provide adequate performance; in other cases, multiple reducers may be required to speed
up the reduce phase. The number of reducers is a tunable option for many applications. The
final step is to write the output to HDFS.
● As mentioned, a combiner step enables some pre-reduction of the map output data. For
instance, in the previous example, one map produced the following counts:
● (run,1)
(spot,1)
(run,1)
● As shown in Figure 5.2, the count for run can be combined into (run,2) before the shuffle.
This optimization can help minimize the amount of data transfer needed for the shuffle phase.
● Figure 5.2 Adding a combiner process to the map step in MapReduce
● The Hadoop YARN resource manager and the MapReduce framework determine the actual
placement of mappers and reducers. As mentioned earlier, the MapReduce framework will try
to place the map task as close to the data as possible. It will request the placement from the
YARN scheduler but may not get the best placement due to the load on the cluster. In general,
nodes can run both mapper and reducer tasks. Indeed, the dynamic nature of YARN enables
the work containers used by completed map tasks to be returned to the pool of available
resources.
● Figure 5.3 shows a simple three-node MapReduce process. Once the mapping is complete, the
same nodes begin the reduce process. The shuffle stage makes sure the necessary data are sent
to each reducer. Also note that there is no requirement that all the mappers complete at the
same time or that the mapper on a specific node be complete before a reducer is started.
Reducers can be set to start shuffling based on a threshold of percentage of mappers that have
finished.
● Figure 5.3 Process placement during MapReduce (Adapted from Yahoo Hadoop
Documentation)
● Finally, although the examples are simple in nature, the parallel MapReduce algorithm can be
scaled up to extremely large data sizes. For instance, the Hadoop word count sample
application (see Chapter 6, “MapReduce Programming”) can be run on the three lines given
earlier or on a 3TB file. The application requires no changes to account for the scale of the
problem—a feature that is one of the remarkable advantages of MapReduce processing.
● FAULT TOLERANCE AND SPECULATIVE EXECUTION
● One of the most interesting aspects of parallel MapReduce operation is the strict control of
data flow throughout the execution of the program. For example, mapper processes do not
exchange data with other mapper processes, and data can only go from mappers to
reducers—not the other direction. The confined data flow enables MapReduce to operate in a
fault-tolerant fashion.
● The design of MapReduce makes it possible to easily recover from the failure of one or many
map processes. For example, should a server fail, the map tasks that were running on that
machine could easily be restarted on another working server because there is no dependence
on any other map task. In functional language terms, the map tasks “do not share state” with
other mappers. Of course, the application will run more slowly because work needs to be
redone, but it will complete.
● In a similar fashion, failed reducers can be restarted. However, there may be additional work
that has to be redone in such a case. Recall that a completed reduce task writes results to
HDFS. If a node fails after this point, the data should still be available due to the redundancy
in HDFS. If reduce tasks remain to be completed on a down node, the MapReduce
ApplicationMaster will need to restart the reducer tasks. If the mapper output is not available
for the newly restarted reducer, then these map tasks will need to be restarted. This process is
totally transparent to the user and provides a fault-tolerant system to run applications.
● Speculative Execution
● One of the challenges with many large clusters is the inability to predict or manage
unexpected system bottlenecks or failures. In theory, it is possible to control and monitor
resources so that network traffic and processor load can be evenly balanced; in practice,
however, this problem represents a difficult challenge for large systems. Thus, it is possible
that a congested network, slow disk controller, failing disk, high processor load, or some other
similar problem might lead to slow performance without anyone noticing.
● When one part of a MapReduce process runs slowly, it ultimately slows down everything else
because the application cannot complete until all processes are finished. The nature of the
parallel MapReduce model provides an interesting solution to this problem. Recall that input
data are immutable in the MapReduce process. Therefore, it is possible to start a copy of a
running map process without disturbing any other running mapper processes. For example,
suppose that as most of the map tasks are coming to a close, the ApplicationMaster notices
that some are still running and schedules redundant copies of the remaining jobs on less busy
or free servers. Should the secondary processes finish first, the other first processes are then
terminated (or vice versa). This process is known as speculative execution. The same
approach can be applied to reducer processes that seem to be taking a long time. Speculative
execution can reduce cluster efficiency because redundant resources are assigned to
applications that seem to have a slow spot. It can also be turned off and on in
the mapred-site.xml configuration file (see Chapter 9, “Managing Hadoop with Apache
Ambari”).
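● Speculative execution can typically also be overridden per job through the standard Hadoop 2 configuration properties; a sketch using the pi example introduced earlier (the property names shown are the Hadoop 2 forms):
$ yarn jar $HADOOP_EXAMPLES/hadoop-mapreduce-examples.jar pi \
      -Dmapreduce.map.speculative=false \
      -Dmapreduce.reduce.speculative=false \
      16 1000000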
● Hadoop MapReduce Hardware
● The capability of Hadoop MapReduce and HDFS to tolerate server—or even whole
rack—failures can influence hardware designs. The use of commodity (typically
x86_64) servers for Hadoop clusters has made low-cost, high-availability implementations of
Hadoop possible for many data centers. Indeed, the Apache Hadoop philosophy seems to
assume servers will always fail and takes steps to keep failure from stopping application
progress on a cluster.
● The use of server nodes for both storage (HDFS) and processing (mappers, reducers) is
somewhat different from the traditional separation of these two tasks in the data center. It is
possible to build Hadoop systems and separate the roles (discrete storage and processing
nodes). However, a majority of Hadoop systems use the general approach where servers enact
both roles. Another interesting feature of dynamic MapReduce execution is the capability to
tolerate dissimilar servers. That is, old and new hardware can be used together. Of course,
large disparities in performance will limit the faster systems, but the dynamic nature of
MapReduce execution will still work effectively on such systems.
● SUMMARY AND ADDITIONAL RESOURCES
● The Apache Hadoop MapReduce framework is a powerful yet simple computation model that
can be scaled from one to thousands of processors. The functional nature of MapReduce
enables scalable operation without the need to modify the user’s application. In essence, the
programmer can focus on the application requirements and not the parallel execution
methodology.
● Parallel MapReduce data flow is easily understood by examining the various component steps
and identifying how key–value pairs traverse the cluster. The Hadoop MapReduce design also
makes possible transparent fault tolerance and possible optimizations through speculative
execution. Further information on Apache Hadoop MapReduce can be found from the
following sources:
● https://developer.yahoo.com/hadoop/tutorial/module4.html (based on Hadoop version 1, but
still a good MapReduce background)
● http://en.wikipedia.org/wiki/MapReduce
● http://research.google.com/pubs/pub36249.html
In This Chapter:
The classic Java WordCount program for Hadoop is compiled and run.
A Python WordCount application using the Hadoop streaming interface is introduced.
The Hadoop Pipes interface is used to run a C++ version of WordCount.
An example of MapReduce chaining is presented using the Hadoop Grep example.
Strategies for MapReduce debugging are presented.
At the base level, Hadoop provides a platform for Java-based MapReduce programming. These
applications run natively on most Hadoop installations. To offer more variability, a streaming
interface is provided that enables almost any programming language to take advantage of the Hadoop
MapReduce engine. In addition, a pipes C++ interface is provided that can work directly with the
MapReduce components. This chapter provides programming examples of these interfaces and
presents some debugging strategies.
The Java WordCount program begins with the following import statements:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
WordCount is a simple application that counts the number of occurrences of each word in a given
input set. The example will work with all installation methods presented in Chapter 2, “Installation
Recipes” (i.e., HDP Sandbox, pseudo-distributed, full cluster, or cloud).
As discussed in Chapter 5, the MapReduce framework operates exclusively on key–value pairs; that
is, the framework views the input to the job as a set of key–value pairs and produces a set of
key–value pairs of different types. The MapReduce job proceeds as follows:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3>
(output)
The mapper implementation, via the map method, processes one line at a time as provided by the
specified TextInputFormat class. It then splits the line into tokens separated by whitespaces using
the StringTokenizer and emits a key–value pair of <word, 1>. The relevant code section is as follows:
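The following is the map method from the standard WordCount example distributed with Hadoop; the one and word fields are assumed to be declared in the enclosing TokenizerMapper class as an IntWritable set to 1 and a Text object, respectively:
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  // Split the input line into tokens and emit a <word, 1> pair for each token
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}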
Given two input files with contents Hello World Bye World and Hello Hadoop Goodbye Hadoop, the
WordCount mapper will produce two maps:
< Hello, 1> < World, 1> < Bye, 1> < World, 1>
< Hello, 1> < Hadoop, 1> < Goodbye, 1> < Hadoop, 1>
WordCount sets a mapper
job.setMapperClass(TokenizerMapper.class);
a combiner
job.setCombinerClass(IntSumReducer.class);
and a reducer
job.setReducerClass(IntSumReducer.class);
Hence, the output of each map is passed through the local combiner (which sums the values in the
same way as the reducer) for local aggregation and then sends the data on to the final reducer. Thus,
the output of each map above is pre-reduced by the combiner as follows:
< Bye, 1> < Hello, 1> < World, 2>
< Goodbye, 1> < Hadoop, 2> < Hello, 1>
The reducer implementation, via the reduce method, simply sums the values, which are the
occurrence counts for each key. The relevant code section is as follows:
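The following is the reduce method from the standard WordCount example; the result field is assumed to be an IntWritable declared in the enclosing IntSumReducer class:
public void reduce(Text key, Iterable<IntWritable> values, Context context
                   ) throws IOException, InterruptedException {
  // Sum the occurrence counts gathered for this key and emit <word, total>
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}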
The source code for WordCount.java is available from the book download page (see Appendix A,
“Book Webpage and Code Download”). To compile and run the program from the command line,
perform the following steps:
1. Make a local wordcount_classes directory.
$ mkdir wordcount_classes
2. Compile the WordCount.java program using the 'hadoop classpath' command to include all the
available Hadoop class paths.
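A sketch of the compile step, followed by the packaging and run steps implied by the output discussion below; the class is assumed to be named WordCount with no package statement, and the jar name is illustrative:
$ javac -cp `hadoop classpath` -d wordcount_classes WordCount.java
$ jar -cvf wordcount.jar -C wordcount_classes/ .
$ yarn jar wordcount.jar WordCount war-and-peace-input war-and-peace-output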
In addition, the war-and-peace-output directory should contain the _SUCCESS flag file and a
part-r-00000 results file. The actual file name may be slightly different depending on your Hadoop
version.
The complete list of word counts can be copied from HDFS to the working directory with the
following command:
$ hdfs dfs -get war-and-peace-output/part-r-00000 .
If the WordCount program is run again using the same outputs, it will fail when it tries to overwrite
the war-and-peace-output directory. The output directory and all contents can be removed with the
following command:
$ hdfs dfs -rm -r -skipTrash war-and-peace-output
The streaming WordCount example uses two Python scripts. The mapper.py script, which reads lines
from stdin and emits a <word, 1> pair for each word, begins as follows:
#!/usr/bin/env python
import sys
The reducer.py script, which sums the counts for each word, begins with the following lines:
#!/usr/bin/env python
current_word = None
current_count = 0
word = None
The operation of the mapper.py script can be observed by running the command as shown in the
following:
$ echo "foo foo quux labs foo bar quux" | ./mapper.py
Piping the results of the map into the sort command can create a simulated shuffle phase:
$ echo "foo foo quux labs foo bar quux" | ./mapper.py|sort -k1,1
bar 1
foo 1
foo 1
foo 1
labs 1
quux 1
quux 1
Finally, the full MapReduce process can be simulated by adding the reducer.py script to the following
command pipeline:
$ echo "foo foo quux labs foo bar quux" | ./mapper.py|sort -k1,1|./reducer.py
bar 1
foo 3
labs 1
quux 2
To run this application using a Hadoop installation, create, if needed, a directory and move
the war-and-peace.txt input file into HDFS:
$ hdfs dfs -mkdir war-and-peace-input
$ hdfs dfs -put war-and-peace.txt war-and-peace-input
Make sure the output directory is removed from any previous test runs:
$ hdfs dfs -rm -r -skipTrash war-and-peace-output
Locate the hadoop-streaming.jar file in your distribution. The location may vary, and it may contain a
version tag. In this example, the Hortonworks HDP 2.2 distribution was used. The following
command line will use the mapper.py and reducer.py to do a word count on the input file.
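A typical streaming invocation is sketched below; the jar path follows the HDP 2.2 layout used elsewhere in this chapter and will differ on other distributions:
$ hadoop jar /usr/hdp/2.2.4.2-2/hadoop-mapreduce/hadoop-streaming.jar \
      -file ./mapper.py -mapper ./mapper.py \
      -file ./reducer.py -reducer ./reducer.py \
      -input war-and-peace-input \
      -output war-and-peace-output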
The output will be the familiar _SUCCESS and part-00000 files in the war-and-peace-output directory.
The actual file name may be slightly different depending on your Hadoop version. Also note that the
Python scripts used in this example could be Bash, Perl, Tcl, Awk, compiled C code, or any language
that can read and write from stdin and stdout.
Although the streaming interface is rather simple, it does have some disadvantages over using Java
directly. In particular, not all applications are string and character based, and it would be awkward to
try to use stdin and stdout as a way to transmit binary data. Another disadvantage is that many tuning
parameters available through the full Java Hadoop API are not available in streaming.
Listing 6.4 wordcount.cpp and Example of Hadoop Pipes Interface Using C++
#include <algorithm>
#include <limits>
#include <string>
#include "stdint.h" // <--- to prevent uint64_t errors!
#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"
The wordcount.cpp source is available from the book download page (see Appendix A) or
from http://wiki.apache.org/hadoop/C++WordCount. The location of the Hadoop include files and
libraries may need to be specified when compiling the code. If $HADOOP_HOME is defined, the
following options should provide the correct path. Check to make sure the paths are correct for your
installation.
-L$HADOOP_HOME/lib/native/ -I$HADOOP_HOME/include
Additionally, the original source code may need to be changed depending on where the include files
are located (i.e., some distributions may not use the hadoop prefix). In Listing 6.4, the following lines
(from the original program) had the hadoop prefix removed:
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
The program can be compiled with the following line (adjusted for include file and library locations).
In this example, use of Hortonworks HDP 2.2 is assumed.
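A compile line along these lines is typical; the library and include paths match the options shown above, the pipes and utils libraries ship with the Hadoop native libraries, and on some systems OpenSSL (-lcrypto) must also be linked:
$ g++ wordcount.cpp -o wordcount \
      -L$HADOOP_HOME/lib/native/ -I$HADOOP_HOME/include \
      -lhadooppipes -lhadooputils -lpthread -lcrypto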
If needed, create the war-and-peace-input directory and move the file into HDFS:
$ hdfs dfs -mkdir war-and-peace-input
$ hdfs dfs -put war-and-peace.txt war-and-peace-input
As mentioned, the executable must be placed into HDFS so YARN can find the program. Also, the
output directory must be removed before running the program:
$ hdfs dfs -mkdir -p bin
$ hdfs dfs -put wordcount bin
$ hdfs dfs -rm -r -skipTrash war-and-peace-output
To run the program, enter the following line (shown in multiple lines for clarity). The lines specifying
the recordreader and recordwriter indicate that the default Java text versions should be used. Also
note that the location of the program in HDFS must be specified.
$ mapred pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input war-and-peace-input \
-output war-and-peace-output \
-program bin/wordcount
When run, the program will produce the familiar output (_SUCCESS and part-00000) in
the war-and-peace-output directory. The part-00000 file should be identical to the Java WordCount
version.
The Grep.java example, which demonstrates MapReduce chaining, ships with the Hadoop examples. Its
source can be extracted from the examples sources jar:
$ jar xf hadoop-mapreduce-examples-2.6.0-sources.jar
package org.apache.hadoop.examples;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.map.RegexMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
In the preceding code, each mapper of the first job takes a line as input and matches the user-provided
regular expression against the line. The RegexMapper class is used to perform this task and extracts
text matching using the given regular expression. The matching strings are output as <matching
string, 1> pairs. As in the previous WordCount example, each reducer sums up the number of
matching strings and employs a combiner to do local sums. The actual reducer uses
the LongSumReducer class that outputs the sum of long values per reducer input key.
The second job takes the output of the first job as its input. The mapper is an inverse map that
reverses (or swaps) its input <key, value> pairs into <value, key>. There is no reduction step, so
the IdentityReducer class is used by default. All input is simply passed to the output. (Note: There is
also an IdentityMapper class.) The number of reducers is set to 1, so the output is stored in one file
and it is sorted by count in descending order. The output text file contains a count and a string per
line.
The example also demonstrates how to pass a command-line parameter to a mapper or a reducer.
The following discussion describes how to compile and run the Grep.java example. The steps are
similar to the previous WordCount example:
1. Create a directory for the application classes as follows:
$ mkdir Grep_classes
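The remaining build-and-run steps mirror the WordCount example; the following is a sketch, assuming the extracted source keeps the org.apache.hadoop.examples package shown earlier and using an arbitrary word that appears in the War and Peace text as the regular expression:
$ javac -cp `hadoop classpath` -d Grep_classes Grep.java
$ jar -cvf Grep.jar -C Grep_classes/ .
$ yarn jar Grep.jar org.apache.hadoop.examples.Grep \
      war-and-peace-input war-and-peace-output Kutuzov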
As always, make sure the output directory has been removed by issuing the following command:
$ hdfs dfs -rm -r -skipTrash war-and-peace-output
DEBUGGING MAPREDUCE
The best advice for debugging parallel MapReduce applications is this: Don’t. The key word here
is parallel. Debugging on a distributed system is hard and should be avoided at all costs.
The best approach is to make sure applications run on a simpler system (i.e., the HDP Sandbox or the
pseudo-distributed single-machine install) with smaller data sets. Errors on these systems are much
easier to locate and track. In addition, unit testing applications before running at scale is important. If
applications can run successfully on a single system with a subset of real data, then running in
parallel should be a simple task because the MapReduce algorithm is transparently scalable. Note that
many higher-level tools (e.g., Pig and Hive) enable local mode development for this reason. Should
errors occur at scale, the issue can be tracked from the log file (see the section “Hadoop Log
Management”) and may stem from a systems issue rather than a program artifact.
When investigating program behavior at scale, the best approach is to use the application logs to
inspect the actual MapReduce progress. The time-tested debug print statements are also visible in the
logs.
If log aggregation is not enabled, the logs will be placed locally on the cluster nodes on which the
mapper or reducer ran. The location of the unaggregated local logs is given by
the yarn.nodemanager.log-dirs property in the yarn-site.xml file. Without log aggregation, the cluster
nodes used by the job must be noted, and then the log files must be obtained directly from the nodes.
Log aggregation is highly recommended.
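Enabling aggregation manually is a two-step process; the directory created below must match the yarn.nodemanager.remote-app-log-dir value set in step 2, and the yarn:hadoop ownership is a common convention that may differ on your cluster.
1. Create the remote log directory in HDFS (run as the HDFS superuser, usually hdfs):
$ hdfs dfs -mkdir -p /yarn/logs
$ hdfs dfs -chown -R yarn:hadoop /yarn/logs
$ hdfs dfs -chmod -R g+rw /yarn/logs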
2. Add the following properties in the yarn-site.xml (on all nodes) and restart all YARN services on
all nodes (the ResourceManager, NodeManagers, and JobHistoryServer).
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/yarn/logs</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
Aggregated logs for a completed application can then be viewed from the command line with the yarn
logs command:
$ yarn logs
Retrieve logs for completed YARN applications.
usage: yarn logs -applicationId <application ID> [OPTIONS]
After the pi example completes, note the applicationId, which can be found either from the
application output or by using the yarn application command. The applicationId will start
with application_ and appear under the Application-Id column.
Next, run the following command to produce a dump of all the logs for that application. Note that the
output can be long and is best saved to a file.
$ yarn logs -applicationId application_1432667013445_0001 > AppOut
The AppOut file can be inspected using a text editor. Note that for each container, stdout, stderr,
and syslog are provided (the same as the GUI version in Figure 6.1). The list of actual containers can
be found by using the following command:
$ grep -B 1 ===== AppOut
[...]
Container: container_1432667013445_0001_01_000008 on limulus_45454
====================================================================
--
Container: container_1432667013445_0001_01_000010 on limulus_45454
====================================================================
--
Container: container_1432667013445_0001_01_000001 on n0_45454
===============================================================
--
Container: container_1432667013445_0001_01_000023 on n1_45454
===============================================================
[...]
A specific container can be examined by using the containerId and the nodeAddress from the
preceding output. For example, container_1432667013445_0001_01_000023 can be examined by
entering the command following this paragraph. Note that the node name (n1) and port number are
written as n1_45454 in the command output. To get the nodeAddress, simply replace the _ with
a : (i.e., -nodeAddress n1:45454). Thus, the results for a single container can be found by entering
this line:
$ yarn logs -applicationId application_1432667013445_0001 -containerId container_1432667013445_0001_01_000023 -nodeAddress n1:45454