(17CS82) 8th Semester CSE: Big Data Analytics
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Features of HDFS
• Distributed and Parallel Computation
• Highly Scalable
• Replication
• Fault Tolerance
• Streaming Data Access
• Portability
Features of HDFS
• Distributed and Parallel Computation - This is one of the most important features
of HDFS and is what makes Hadoop so powerful. Data is divided into multiple blocks
and stored across the DataNodes, where it can be processed in parallel.
Features of HDFS
• Distributed and Parallel Computation.
• Let’s understand this concept with an example. Suppose it takes 45
minutes to process a 1 TB file on a single machine.
• So, how much time will it take to process the same 1 TB file when
you have 10 machines in a Hadoop cluster with a similar
configuration: 45 minutes or 4.5 minutes? 4.5 minutes, right!
• What happened here? Each of the nodes is working with a part of
the 1 TB file in parallel.
• Therefore, the work which was taking 45 minutes before, gets
finished in just 4.5 minutes now as the work got divided over ten
machines.
Features of HDFS
• Highly Scalable - HDFS is highly scalable, as it can scale to hundreds of nodes in a single
cluster.
Features of HDFS
• There are two types of scaling: vertical and horizontal.
• In vertical scaling (scale up), you increase the hardware capacity of your
system.
• In other words, you procure more RAM or CPU and add it to your existing
system to make it more robust and powerful.
• But there are challenges associated with vertical scaling (scaling up):
– There is an upper limit on hardware capacity, so you can’t keep on increasing the
RAM or CPU of a single machine.
– In vertical scaling, you stop your machine first. Then you increase the
RAM or CPU to make it a more robust hardware stack. After you have
increased your hardware capacity, you restart the machine. This down
time when you are stopping your system becomes a challenge.
Features of HDFS
• In case of horizontal scaling (scale out), you add more nodes to
existing cluster instead of increasing the hardware capacity of
individual machines.
• And most importantly, you can add more machines on the go i.e.
Without stopping the system.
• Therefore, scaling out involves no downtime; the cluster keeps
running while new nodes are added.
• At the end of the day, you will have more machines working in
parallel to meet your requirements.
Features of HDFS
• Replication - Due to unfavorable conditions, a node containing the
data may fail. To overcome such problems, HDFS always
maintains copies of the data on different machines.
• Fault tolerance - In HDFS, fault tolerance signifies the robustness of the
system in the event of failure. HDFS is highly fault-tolerant: if any
machine fails, another machine containing a copy of that data
automatically takes over.
• Portable - HDFS is designed in such a way that it can easily be ported from
one platform to another.
• Streaming Data Access - The write-once/read-many design is intended to
facilitate streaming reads.
Components and Architecture
Hadoop Distributed File System (HDFS)
HDFS Components
• The design of HDFS is based on two types of nodes: a
NameNode and multiple DataNodes.
• No data is actually stored on the NameNode.
• A single NameNode manages all the metadata needed to
store and retrieve the actual data from the DataNodes.
• For a minimal Hadoop installation, there needs to be a
single NameNode daemon and a single DataNode daemon
running on at least one machine
HDFS Components
• The design of HDFS follows a master/slave architecture
• Master Node is NameNode and Slave Node is DataNode.
• The master node (NameNode) manages the file system
namespace and regulates access to files by clients.
• File system namespace operations such as opening, closing, and
renaming files and directories are all managed by the
NameNode.
• The NameNode also determines the mapping of blocks to
DataNodes and handles DataNode failures.
HDFS Components
• The slaves (DataNodes) are responsible for serving read and
write requests from the file system to the clients. The
NameNode manages block creation, deletion, and replication.
• When a client wants to write data, it first communicates with
the NameNode and requests to create a file.
• The NameNode determines how many blocks are needed
and provides the client with the DataNodes that will store
the data.
• As part of the storage process, the data blocks are
replicated after they are written to the assigned node.
HDFS Components
• Reading data happens in a similar fashion.
• The client requests a file from the NameNode, which returns the best
DataNodes from which to read the data. The client then accesses the data
directly from the DataNodes.
• Thus, once the metadata has been delivered to the client, the NameNode
steps back and lets the conversation between the client and the
DataNodes proceed.
• While data transfer is progressing, the NameNode also monitors the
DataNodes by listening for heartbeats sent from DataNodes.
• The lack of a heartbeat signal indicates a potential node failure. In such a
case, the NameNode will route around the failed DataNode and begin re-
replicating the now-missing blocks.
HDFS Components
• The mappings between data blocks and the physical DataNodes
are not kept in persistent storage on the NameNode.
• For performance reasons, the NameNode stores all metadata in
memory.
• Upon startup, each DataNode provides a block report to the
NameNode.
• The block reports are sent every 10 heartbeats. (The interval
between reports is a configurable property.)
• The reports enable the NameNode to keep an up-to-date
account of all data blocks in the cluster.
HDFS Components
• Safe Mode - When the NameNode starts, it enters a read-only Safe Mode while
block reports are collected; administrators can inspect or change this state with
the hdfs dfsadmin -safemode command, as sketched below.
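A minimal sketch of the relevant hdfs dfsadmin commands (hdfs dfsadmin -report, which summarizes DataNode and block status, is shown as well; output formats vary by release):

$ hdfs dfsadmin -report          # capacity, DataNode status, and block summary known to the NameNode
$ hdfs dfsadmin -safemode get    # query the current Safe Mode state
$ hdfs dfsadmin -safemode enter  # manually place HDFS in read-only Safe Mode
$ hdfs dfsadmin -safemode leave  # return to normal read/write operation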
Rack Awareness
• Rack awareness deals with data locality.
• Assume that there are six DataNodes in the Hadoop cluster.
Rack Awareness
• A typical Hadoop cluster will exhibit three levels of data
locality:
–Data resides on the local machine (best).
• Best performance but suffers from single point of failure
–Data resides in the same rack (better).
• Better performance but suffers from single point of failure
–Data resides in a different rack (good).
• Good performance
Rack Awareness
• The NameNode tries to place replicated data blocks on
multiple racks for improved fault tolerance.
• In such a case, an entire rack failure will not cause data
loss or stop HDFS from working.
• However performance may be degraded.
• A default Hadoop installation assumes all the nodes
belong to the same (large) rack.
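• Rack locations are not detected automatically; administrators typically supply a topology script and point Hadoop at it with the net.topology.script.file.name property in core-site.xml. The sketch below is a minimal example of such a script; the subnets and rack names are made up for illustration, and the property name should be checked against your Hadoop version.

#!/bin/bash
# topology.sh - Hadoop passes one or more IP addresses/hostnames as arguments
# and expects one rack path to be printed per argument.
for node in "$@"; do
  case $node in
    10.1.1.*) echo "/rack1" ;;        # assumed subnet for rack 1
    10.1.2.*) echo "/rack2" ;;        # assumed subnet for rack 2
    *)        echo "/default-rack" ;; # anything unknown goes to the default rack
  esac
done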
NameNode High Availability
• With early Hadoop installations, the NameNode was a single
point of failure that could bring down the entire Hadoop cluster.
• NameNode hardware often employed redundant power supplies
and storage to guard against such problems, but it was still
susceptible to other failures.
• The solution was to implement NameNode High Availability (HA)
as a means to provide true failover service.
• A NameNode High Availability (HA) Hadoop cluster has two (or more)
separate NameNode machines.
• Each machine is configured with exactly the same software.
NameNode High Availability
• One of the NameNode machines is in the Active state, and the
other is in the Standby state.
• Like a single NameNode cluster, the Active NameNode is
responsible for all client HDFS operations in the cluster.
• The Standby NameNode maintains enough state to provide a fast
failover (if required).
• To guarantee the file system state is preserved, both the Active
and Standby NameNodes receive block reports from the
DataNodes.
NameNode High Availability
• The Active node also sends all file system edits to a quorum of JournalNodes.
• At least three physically separate JournalNode daemons are required,
because edit log modifications must be written to a majority of the
JournalNodes.
• This design enables the system to tolerate the failure of a single JournalNode
machine.
• The Standby node continuously reads the edits from the Journal Nodes to
ensure its namespace is synchronized with that of the Active node.
• In the event of an Active NameNode failure, the Standby node reads all
remaining edits from the JournalNodes before promoting itself to the
Active state.
NameNode High Availability
• To prevent confusion between NameNodes, the JournalNodes
allow only one NameNode to be a writer at a time. During
failover, the NameNode that is chosen to become active takes
over the role of writing to the JournalNodes.
• A SecondaryNameNode is not required in the HA configuration
because the Standby node also performs the tasks of the
SecondaryNameNode.
• Apache ZooKeeper is used to monitor the NameNode health.
• HDFS failover relies on ZooKeeper for failure detection and for
Standby-to-Active NameNode election.
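• On a running HA cluster, the state of each NameNode can be checked or switched with the hdfs haadmin tool. A short sketch, assuming the two NameNodes were configured with the service IDs nn1 and nn2 (the IDs come from your hdfs-site.xml):

$ hdfs haadmin -getServiceState nn1   # prints "active" or "standby"
$ hdfs haadmin -getServiceState nn2
$ hdfs haadmin -failover nn1 nn2      # manual failover (used when automatic failover is not configured)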
HDFS NameNode Federation
• Older versions of HDFS provided a single namespace for the entire cluster managed
by a single NameNode.
• Thus, the resources of a single NameNode determined the size of the namespace.
• Federation addresses this limitation by adding support for multiple NameNodes /
namespaces to the HDFS file system.
• The key benefits are as follows:
– Namespace scalability. HDFS cluster storage scales horizontally without placing a
burden on the NameNode.
– Better performance. Adding more NameNodes to the cluster scales the file
system read/write operations throughput by separating the total namespace.
– System isolation. Multiple NameNodes enable different categories of
applications to be distinguished, and users can be isolated to different
namespaces.
HDFS Checkpoints and Backups
• The NameNode stores the metadata information of the
HDFS file system in a file called fsimage.
• File system modifications are written to an edits log file,
and at startup the NameNode merges the edits into a new
fsimage.
• The SecondaryNameNode or CheckpointNode periodically
fetches edits from the NameNode, merges them, and
returns an updated fsimage to the NameNode.
HDFS Checkpoints and Backups
• An HDFS BackupNode is similar, but also maintains an up-to-
date copy of the file system namespace both in memory and on
disk.
• Unlike a CheckpointNode, the BackupNode does not need to
download the fsimage and edits files from the active
NameNode because it already has an up-to-date namespace
state in memory.
• A NameNode supports one BackupNode at a time.
• No CheckpointNodes may be registered if a Backup node is in
use.
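• A manual checkpoint and an off-cluster copy of the metadata can also be produced with hdfs dfsadmin; a sketch (saveNamespace requires the NameNode to be in Safe Mode, and the local target directory here is illustrative):

$ hdfs dfsadmin -safemode enter
$ hdfs dfsadmin -saveNamespace              # merge the edits log into a new fsimage
$ hdfs dfsadmin -safemode leave
$ hdfs dfsadmin -fetchImage /tmp/nn-backup  # download the latest fsimage to a local directory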
HDFS Snapshots
• HDFS snapshots are similar to backups, but are created with the hdfs dfs -createSnapshot
command after an administrator has enabled snapshots on the directory with
hdfs dfsadmin -allowSnapshot (see the example after this list).
• HDFS snapshots are read-only point-in-time copies of the file system.
• They offer the following features:
– Snapshots can be taken of a sub-tree of the file system or the entire file
system.
– Snapshots can be used for data backup, protection against user errors, and
disaster recovery.
– Snapshot creation is instantaneous.
– Blocks on the DataNodes are not copied, because the snapshot files record
the block list and the file size. There is no data copying, although it appears to
the user that there are duplicate files
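• A typical snapshot workflow is sketched below; the directory and snapshot names are illustrative:

$ hdfs dfsadmin -allowSnapshot /Demo              # administrator marks the directory as snapshottable
$ hdfs dfs -createSnapshot /Demo before-cleanup   # create a named, read-only snapshot
$ hdfs dfs -ls /Demo/.snapshot                    # snapshots appear under the hidden .snapshot directory
$ hdfs dfs -deleteSnapshot /Demo before-cleanup   # remove the snapshot when no longer needed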
HDFS User Commands
Hadoop Distributed File System (HDFS)
HDFS USER COMMANDS
• The preferred way to interact with HDFS in Hadoop version 2 is
through the hdfs command
• Previously, in version 1 and subsequently in many Hadoop
examples, the hadoop dfs command was used to manage files
in HDFS.
• The hadoop dfs command will still work in version 2, but its use
will cause a message to be displayed indicating that the use
of hadoop dfs is deprecated
HDFS User Commands
General HDFS commands
• hdfs [--config confdir] COMMAND
• hdfs version
– Reports the Hadoop version, for example: Hadoop 2.6.0.2.2.4.3-2
• hdfs dfs
– Lists all of the dfs file system subcommands
General HDFS Commands
(Listing: options that are available for the hdfs command)
HDFS User Commands
Lists files in HDFS
• hdfs dfs -ls /
– Lists the files in the root HDFS directory
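• Other day-to-day file operations follow the same pattern; a short sketch with illustrative paths:

$ hdfs dfs -mkdir /user/hdfs/demo                       # create a directory in HDFS
$ hdfs dfs -put localfile.txt /user/hdfs/demo           # copy a local file into HDFS
$ hdfs dfs -cat /user/hdfs/demo/localfile.txt           # print the contents of an HDFS file
$ hdfs dfs -get /user/hdfs/demo/localfile.txt copy.txt  # copy an HDFS file back to the local file system
$ hdfs dfs -rm -r /user/hdfs/demo                       # remove a file or directory tree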
MapReduce Model
• Consider two people counting the words in their own halves of a text. In the
map phase, each person produces a set of <word, count> pairs for their portion:
– First Person:
<Hello, 1>
<World, 2>
<Hadoop, 1>
– Second Person:
<Hello, 1>
<Hadoop, 2>
<Goodbye, 1>
MapReduce Model
• The reduce phase happens when everyone is done counting: the reducer sums
the totals for each word as each person reports their counts.
<Hello, 2>
<World, 2>
<Hadoop, 3>
<Goodbye, 1>
MapReduce Parallel Data Flow
1. Input Splits
– HDFS distributes and replicates data over multiple
servers called DataNodes.
– The default data chunk or block size is 64 MB.
– Thus, a 150MB file would be broken into 3 blocks and
written to different machines in the cluster.
– The data are also replicated on multiple machines
(typically three machines).
MapReduce Parallel Data Flow
2. Map Step
– The mapping process is where the parallel nature of Hadoop comes into play.
– For large amounts of data, many mappers can be operating at the same time.
– The user provides the specific mapping process.
– MapReduce will try to execute the mapper on the machines where the block
resides.
– Because the file is replicated in HDFS, the least busy node with the data will be
chosen.
– If all nodes holding the data are too busy, MapReduce will try to pick a node that is
closest to the node that hosts the data block (a characteristic called
rack awareness).
– The last choice is any node in the cluster that has access to HDFS.
MapReduce Parallel Data Flow
3. Combiner step.
– It is possible to provide an optimization or pre-reduction as part of the map stage
where key-value pairs are combined prior to the next stage. The combiner stage
is optional.
– Suppose the text at one mapper is:
• Hello World Hadoop World
– Output of the Mapper:
<Hello, 1>
<World, 1>
<Hadoop, 1>
<World, 1>
– Output of the Combiner:
<Hello, 1>
<World, 2>
<Hadoop, 1>
MapReduce Parallel Data Flow
4. Shuffle step.
– Before the parallel reduction stage can complete, all similar
keys must be combined and counted by the same reducer
process.
– Therefore, the results of the map stage must be collected as key-value
pairs and shuffled to the same reducer process. If only a
single reducer process is used, the Shuffle stage is not needed.
MapReduce Parallel Data Flow
5. Reduce Step.
– The final step is the actual reduction.
– In this stage, the data reduction is performed as per the
programmer's design.
– The reduce step is also optional.
– The results are written to HDFS. Each reducer will write an
output file.
– For example, a MapReduce job running two reducers will
create output files called part-00000 and part-00001.
MapReduce Programming
Mapper and Reducer Scripts (Shell)

Mapper script (emits "<Name>, 1" for each occurrence of Ram or Sita):

#!/bin/bash
# mapper.sh - read text from stdin, emit a count of 1 for each matching token
while read line; do
    for token in $line; do
        if [ "$token" = "Ram" ]; then
            echo "Ram, 1"
        elif [ "$token" = "Sita" ]; then
            echo "Sita, 1"
        fi
    done
done

Reducer script (sums the counts emitted by the mapper):

#!/bin/bash
# reducer.sh - accumulate the per-name counts arriving on stdin
Rcount=0
Scount=0
while read line; do
    if [ "$line" = "Ram, 1" ]; then
        Rcount=$((Rcount + 1))
    elif [ "$line" = "Sita, 1" ]; then
        Scount=$((Scount + 1))
    fi
done
echo "Ram, $Rcount"
echo "Sita, $Scount"
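These scripts can be run on the cluster through the Hadoop streaming interface. A sketch, assuming the scripts are saved as mapper.sh and reducer.sh, the HDFS input and output paths are placeholders, and the streaming jar is in the usual Apache Hadoop 2 location (the path varies between distributions):

$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.sh,reducer.sh \
    -mapper mapper.sh \
    -reducer reducer.sh \
    -input input-dir \
    -output output-dir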
To compile and run the Java WordCount program (WordCount.java) from the command line,
perform the following steps:
1. Make a local wordcount_classes directory.
$ mkdir wordcount_classes
2. Compile the WordCount program using the `hadoop classpath` command to include all the
available Hadoop class paths.
$ javac -cp `hadoop classpath` -d wordcount_classes WordCount.java
3. The jar file can be created using the following command:
$ jar -cvf wordcount.jar -C wordcount_classes/ .
4. To run the example, create an input directory in HDFS and place a text file in the new directory.
Any large text file will do; this example uses a file called input.txt (for instance, the
war-and-peace.txt file available from the book download page; see Appendix A):
$ hdfs dfs -mkdir /Demo
$ hdfs dfs -put input.txt /Demo
5. Run the WordCount application using the following command:
$ hadoop jar wordcount.jar WordCount /Demo /output
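The results can then be inspected directly from HDFS; for example (the exact part-file name depends on the number of reducers and the MapReduce API in use):

$ hdfs dfs -ls /output
$ hdfs dfs -cat /output/part-r-00000 | head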
Debugging MapReduce Applications
Hadoop Distributed File System (HDFS)
Debugging Parallel MapReduce
• The best advice for debugging parallel MapReduce applications is this:
Don't.
• The key word here is parallel.
• Debugging on a distributed system is hard and should be avoided.
• The best approach is to make sure applications run on a simpler
system (i.e., the pseudo-distributed single-machine install) with
smaller data sets.
• When investigating program behavior at scale, the best approach is to
use the application logs to inspect the actual MapReduce progress.
• The time-tested debug print statements are also visible in the logs.
Listing, Killing, and Job Status
• Jobs can be managed using the mapred job command.
• The most important options are -list, -kill, and -status:
– mapred job -list
– mapred job -kill <JobId>
– mapred job -status <JobId>
• In addition, the yarn application command can be used to control
all applications running on the cluster.
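• For example (using an illustrative application ID):

$ yarn application -list                                   # show running applications
$ yarn application -status application_1432667013445_0001  # detailed status of one application
$ yarn application -kill application_1432667013445_0001    # kill the application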
Hadoop Log Management
• The MapReduce logs provide a comprehensive listing of both mappers and
reducers.
• The actual log output consists of three files - stdout, stderr, and syslog for the
application.
• There are two modes for log storage.
• The first (and best) method is to use log aggregation.
• In this mode, logs are aggregated in HDFS and can be displayed in the
YARN ResourceManager user interface or examined with the yarn logs
command.
• Second, if log aggregation is not enabled, the logs are placed locally on the
cluster nodes on which the mapper or reducer ran. The location of the
unaggregated local logs is given by the yarn.nodemanager.log-dirs property in
the yarn-site.xml file.
Enabling YARN Log Aggregation
• To manually enable log aggregation, follow these steps:
• As the HDFS superuser administrator (usually user hdfs), create the
following directory in HDFS:
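A minimal sketch of this step (the yarn:hadoop ownership shown here is a common convention; adjust the owner and group to match your installation):

$ hdfs dfs -mkdir -p /yarn/logs
$ hdfs dfs -chown -R yarn:hadoop /yarn/logs
$ hdfs dfs -chmod -R g+rw /yarn/logs

• Then add the following properties to the yarn-site.xml file on all nodes and restart the YARN services: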
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/yarn/logs</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
Web Interface Log View
• The most convenient way to view logs is to use the
YARN ResourceManager web user interface.
• In the figure, the contents of stdout, stderr, and syslog are
displayed on a single page.
• If log aggregation is not enabled, a message stating that the logs
are not available will be displayed.
• The following URL is used to launch the web interface:
http://localhost:8088/
Hadoop Log Management
(Figure: YARN ResourceManager web UI log view showing stdout, stderr, and syslog for an application)
Command-Line Log Viewing
• MapReduce logs can also be viewed from the command line.
• The yarn logs command enables the logs to be easily viewed together without
having to hunt for individual log files on the cluster nodes.
• As before, log aggregation is required for use.
• The options to yarn logs are as follows:
$ yarn logs
Retrieve logs for completed YARN applications.
usage: yarn logs -applicationId <application ID> [OPTIONS]
• General options are:
• -appOwner <Application Owner>
• -containerId <Container ID>
• -nodeAddress <Node Address>
Command-Line Log Viewing
• For example, after running the WordCount example program, note the
application ID that is reported when the job is submitted (it can also be found
with yarn application -list).
• Next, run the following command to produce a dump of all the logs for
that application. Note that the output can be long and is best saved to a
file.
$ yarn logs -applicationId application_1432667013445_0001 > AppOut
Note:
• If you run the program again, it will fail because the output directory
(/war-and-peace-output) already exists.
• Hadoop will not overwrite existing output files or directories! Remove the
directory before rerunning, as shown below.
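For example:

$ hdfs dfs -rm -r -skipTrash war-and-peace-output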
Hadoop MapReduce
WordCount Program using Streaming
Interface in Python
WordCount Program using Streaming Interface in Python
• Content of Input text file:
• foo foo linux labs foo bar linux
#!/usr/bin/env python
import sys

# Mapper: read lines from standard input and emit <word, 1> pairs
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
WordCount Program using Streaming Interface in Python
Reducer Program
#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# Reducer: input arrives sorted by key, so all counts for a word are adjacent
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    count = int(count)
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# emit the count for the last word
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
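• Before submitting to the cluster, the mapper and reducer can be tested locally with a shell pipeline that mimics the sort step performed by the streaming framework (this assumes the scripts are saved as mapper.py and reducer.py, are executable, and that the default python interpreter accepts the Python 2 print syntax used above):

$ echo "foo foo linux labs foo bar linux" | ./mapper.py | sort -k1,1 | ./reducer.py
bar     1
foo     3
labs    1
linux   2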
Steps to execute WordCount Program using Streaming
Interface in Python
1. Create a directory and move the file into HDFS
hdfs dfs -mkdir war-and-peace-input
hdfs dfs -put war-and-peace.txt war-and-peace-input
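2. Run the job through the Hadoop streaming interface, pointing it at the executable mapper and reducer scripts (a sketch; the streaming jar path shown is the usual Apache Hadoop 2 location and varies between distributions):

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input war-and-peace-input \
    -output war-and-peace-output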
• Output of the Mapper (for the foo/bar example input above):
<foo, 1>
<foo, 1>
<linux, 1>
<labs, 1>
<foo, 1>
<bar, 1>
<linux, 1>
• Output of the Reducer:
<bar, 1>
<foo, 3>
<labs, 1>
<linux, 2>
WordCount Program in C++ using Pipes interface
#include <string>
#include "stdint.h" // <--- to prevent uint64_t errors!
#include "Pipes.hh“
#include "StringUtils.hh“
The compiled executable (placed in HDFS as bin/wordcount) is then launched with the mapred pipes command:
mapred pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input war-and-peace.txt \
-output war-and-peace-output \
-program bin/wordcount