CS19741-Cloud Computing-Unit 3 Notes


UNIT III DISTRIBUTED PROGRAMMING MODEL 9

Design of HDFS, Concepts and Java Interface, Dataflow of File read & File write, Map Reduce, Input
splitting, map and reduce functions, Specifying input and output parameters, Configuring and Running a
Job. Hadoop Vs Spark.
Case Study: Design and Implementation of Hive, Pig, HBase.

HADOOP
Hadoop is an open source distributed processing framework that manages data processing and
storage for big data applications in scalable clusters of computer servers.

It's at the center of big data technologies that are primarily used to support advanced analytics
initiatives, including predictive analytics, data mining and machine learning.

Hadoop systems can handle various forms of structured and unstructured data, giving users more flexibility for collecting, processing, analyzing and managing data than relational databases and data warehouses provide.

Hadoop's ability to process and store different types of data makes it a good fit for big data environments.

Such environments typically involve not only large amounts of data but also a mix of data types:

• Structured transaction data: structured data is organized into entities that have a defined format.
• Semi-structured data: though there may be a schema, it is often ignored.
• Unstructured information: data that does not have any particular internal structure; for example, plain text or image data, such as internet clickstream records, web server and mobile application logs, social media posts, customer emails and sensor data from the internet of things (IoT).

Hadoop works well on unstructured or semi-structured data because it is designed to interpret the data at processing time (so-called schema-on-read).

Data Storage and Analysis

Although disk storage capacities have grown enormously, transfer (access) speeds have not kept pace, so it takes a long time to read all the data on a single drive, and writing is even slower.

Raid (redundant array of independent disks) is a way of storing the same data in different places
(thus, redundantly) on multiple hard disks.
RAID (redundant array of independent disks) is a data storage virtualization technology that
combines multiple physical disk drive components into a single logical unit for the purposes of data
redundancy, performance improvement, or both.

HDFS clusters do not benefit from using RAID (Redundant Array of Independent Disks) for datanode storage (although RAID is recommended for the namenode's disks, to protect against corruption of its metadata). The redundancy that RAID provides is not needed, since HDFS handles it by replication between nodes.

The Hadoop Distributed File System (HDFS) is the primary storage system used by Hadoop applications. RAID is therefore not needed for datanodes, though it may still be used for some master-node processes such as the Hive Metastore database.

Processing Techniques:
Batch Processing: Processing of previously collected jobs in a single batch.

The component to provide online access was HBase, a key-value store that uses HDFS for its
underlying storage. HBase provides both online read/write access of individual rows and batch operations
for reading and writing data in bulk, making it a
good solution for building applications on.

Other Different processing patterns:


Interactive SQL, Iterative processing, Stream processing, and Search

                 Traditional RDBMS              MapReduce
Data size        Gigabytes                      Petabytes
Access           Interactive                    Batch
Updates          Read and write many times      Write once, read many times
Transactions     ACID                           None
Structure        Schema-on-write                Schema-on-read
Integrity        High                           Low
Scaling          Nonlinear                      Linear

HDFS - Hadoop Distributed File System (HDFS)


The real enabler for new processing models in Hadoop was the introduction of YARN (which
stands for Yet Another Resource Negotiator) in Hadoop 2. YARN is a cluster resource management
system, which allows any distributed program (not just MapReduce) to run on data in a Hadoop cluster.

Hadoop Related Subprojects


• Pig: High-level language for data analysis
• HBase: Table storage for semi-structured data
• Zookeeper: Coordinating distributed applications
• Hive: SQL-like Query language and Metastore
• Mahout: Machine learning

Goals of HDFS
• Very Large Distributed File System
– 10K nodes, 100 million files, 10PB
• Assumes Commodity Hardware
– Files are replicated to handle hardware failure,
– Detect failures and recover from them
• Optimized for Batch Processing
– Data locations exposed so that computations can move to where data resides
– Provides very high aggregate bandwidth
Distributed File System
• Single Namespace for entire cluster
• Data Coherency
– Write-once-read-many access model
– Client can only append to existing files
• Files are broken up into blocks
– Typically 64MB block size
– Each block replicated on multiple DataNodes
• Intelligent Client
– Client can find location of blocks
– Client accesses data directly from DataNode
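
For example, a client can ask the filesystem where a file's blocks live before scheduling work near them. A minimal sketch using the FileSystem API; the URI and file path here are illustrative, not part of the original notes:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockLocations {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), conf); // assumed namenode URI
        Path file = new Path("/user/tom/quangle.txt");                          // hypothetical file
        FileStatus status = fs.getFileStatus(file);
        // Ask the namenode which datanodes hold each block of the file
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            System.out.println("offset " + block.getOffset()
                    + " hosts: " + String.join(",", block.getHosts()));
        }
    }
}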

HDFS Architecture

Functions of a NameNode
• Manages File System Namespace
– Maps a file name to a set of blocks
– Maps a block to the DataNodes where it resides
• Cluster Configuration Management
• Replication Engine for Blocks

NameNode Metadata
• Metadata in Memory
– The entire metadata is in main memory
– No demand paging of metadata
• Types of metadata
– List of files
– List of Blocks for each file
– List of DataNodes for each block
– File attributes, e.g. creation time, replication factor
• A Transaction Log
– Records file creations, file deletions etc

DataNode
• A Block Server
– Stores data in the local file system (e.g. ext3)
– Stores metadata of a block (e.g. CRC)
– Serves data and metadata to Clients
• Block Report
– Periodically sends a report of all existing blocks to the NameNode
• Facilitates Pipelining of Data
– Forwards data to other specified DataNodes
Block Placement
• Current Strategy
– One replica on local node
– Second replica on a remote rack
– Third replica on same remote rack
– Additional replicas are randomly placed
• Clients read from nearest replicas
• Would like to make this policy pluggable

Heartbeats
• DataNodes send heartbeat to the NameNode: Once every 3 seconds
• NameNode uses heartbeats to detect DataNode failure

Replication Engine
• NameNode detects DataNode failures
– Chooses new DataNodes for new replicas
– Balances disk usage
– Balances communication traffic to DataNodes

Data Correctness
• Use Checksums to validate data: Use CRC32
• File Creation
– Client computes checksum per 512 bytes
– DataNode stores the checksum
• File access
– Client retrieves the data and checksum from DataNode
– If Validation fails, Client tries other replicas
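
As an illustration of the checksum idea (this is not HDFS's internal code), the sketch below computes a CRC32 value for every 512-byte chunk of a local file, the same granularity HDFS uses by default:

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.zip.CRC32;

public class ChunkChecksums {
    public static void main(String[] args) throws Exception {
        try (InputStream in = new FileInputStream(args[0])) {
            byte[] chunk = new byte[512];   // bytes per checksum, matching HDFS's default
            long chunkIndex = 0;
            int n;
            while ((n = in.read(chunk)) > 0) {
                CRC32 crc = new CRC32();
                crc.update(chunk, 0, n);    // checksum covers only the bytes actually read
                System.out.println("chunk " + chunkIndex++ + " crc32=" + crc.getValue());
            }
        }
    }
}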

NameNode Failure
• A single point of failure
• Transaction Log stored in multiple directories
– A directory on the local file system
– A directory on a remote file system (NFS/CIFS)
• Need to develop a real HA solution

Data Pipelining
• Client retrieves a list of DataNodes on which to place replicas of a block
• Client writes block to the first DataNode
• The first DataNode forwards the data to the next node in the Pipeline
• When all replicas are written, the Client moves on to write the next block in file

Rebalancer
• Goal: % disk full on DataNodes should be similar
– Usually run when new DataNodes are added
– Cluster is online when Rebalancer is active
– Rebalancer is throttled to avoid network congestion
– Command line tool

Secondary NameNode
• Copies FsImage and Transaction Log from Namenode to a temporary directory
• Merges FSImage and Transaction Log into a new FSImage in temporary directory
• Uploads new FSImage to the NameNode: Transaction Log on NameNode is purged
Java Interface, Dataflow of File read & File write,

Java Interface
The Hadoop FileSystem class: the API for interacting with one of Hadoop‘s filesystems.
Although we focus mainly on the HDFS implementation, DistributedFileSystem, in general you should
strive to write your code against the FileSystem abstract class, to retain portability across filesystems. This
is very useful when testing your program, for example, because you can rapidly run tests using data stored
on the local filesystem. One of the simplest ways to read a file from a Hadoop filesystem is by using a java.net.URL object to open a stream to read the data from.
Commands for HDFS User:
% hadoop dfs -mkdir /foodir
% hadoop dfs -cat /foodir/myfile.txt
% hadoop dfs -rm /foodir/myfile.txt
Commands for HDFS Administrator
% hadoop dfsadmin -report
% hadoop dfsadmin -decommission datanodename

Web Interface: http://host:port/dfshealth.jsp

Create a directory
To see how it is displayed in the listing:
% hadoop fs -mkdir books
% hadoop fs -ls .
Found 2 items
drwxr-xr-x - tom supergroup 0 2014-10-04 13:22 books
-rw-r-----   1 tom supergroup        119 2014-10-04 13:21 quangle.txt
Example mode: rw- r-- ---  (owner, group, others)
Copying a file from the local filesystem to HDFS:
% hadoop fs -copyFromLocal input/docs/quangle.txt \
hdfs://localhost/user/tom/quangle.txt
% hadoop fs -ls .
quangle.txt

Reading Data from a Hadoop URL


InputStream in = null;
try {
    in = new URL("hdfs://host/path").openStream();
    // process in
} finally {
    IOUtils.closeStream(in);
}

Displaying files from a Hadoop filesystem on standard output using a URLStreamHandler


public class URLCat {
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
IOUtils class
IOUtils class that comes with Hadoop for closing the stream in the finally clause, and also for
copying bytes between the input stream and the output stream (System.out, in this case). The last two
arguments to the copyBytes() method are the buffer size used for copying and whether to close the streams
when the copy is complete.

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

Reading Data Using the FileSystem API


FileSystem is a general filesystem API, so the first step is to retrieve an instance for the
filesystem we want to use—HDFS, in this case. There are several static factory methods for getting a
FileSystem instance:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user) throws IOException
A Configuration object encapsulates a client or server‘s configuration, which is set using
configuration files read from the classpath, such as etc/hadoop/core-site.xml.

To retrieve a local filesystem instance:


For this, you can use the convenience method getLocal():
public static LocalFileSystem getLocal(Configuration conf) throws IOException
With a FileSystem instance in hand, we invoke an open() method to get the input stream for a file:
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

Displaying files from a Hadoop filesystem on standard output by using the FileSystem directly
public class FileSystemCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}
The program runs as follows:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
Displaying files from a Hadoop filesystem on standard output twice, by using seek()
public class FileSystemDoubleCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // go back to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}

Here‘s the result of running it on a small file:


% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

Copying a local file to a Hadoop filesystem


public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = args[0];
        String dst = args[1];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
}
Typical usage:
% hadoop FileCopyWithProgress input/docs/1400-8.txt
hdfs://localhost/user/tom/1400-8.txt
MapReduce
MapReduce—and the other processing models in Hadoop—scales linearly with the size of the data.
Data is partitioned, and the functional primitives (like map and reduce) can work in parallel on separate
partitions. This means that if you double the size of the input data, a job will run twice as slowly. But if you
also double the size of the cluster, a job will run as fast as the original one. Hadoop can run MapReduce
programs written in various languages. MapReduce programs are inherently parallel, thus putting very
large-scale data analysis into the hands of anyone with enough machines at their disposal.

The classic tool for processing line-oriented data is awk


A program for finding the maximum recorded temperature by year from NCDC weather records
#!/usr/bin/env bash
for year in all/*
do
echo -ne `basename $year .gz`"\t"
gunzip -c $year | \
awk '{ temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
END { print max }'
done

MapReduce - What?
• MapReduce is a programming model for efficient distributed computing
• It works like a Unix pipeline
– cat input | grep | sort | uniq -c | cat > output
– Input | Map | Shuffle & Sort | Reduce | Output
• Efficiency from
– Streaming through data, reducing seeks
– Pipelining
• A good fit for a lot of applications
– Log processing
– Web index building

MapReduce - Features
• Fine grained Map and Reduce tasks
– Improved load balancing
– Faster recovery from failed tasks
• Automatic re-execution on failure
– In a large cluster, some nodes are always slow or flaky
– Framework re-executes failed tasks
• Locality optimizations
– With large data, bandwidth to data is a problem
– Map-Reduce + HDFS is a very effective solution
– Map-Reduce queries HDFS for locations of input data
– Map tasks are scheduled close to the inputs when possible
MapReduce – Dataflow

How many Maps and Reduces


• Maps
– Usually as many as the number of HDFS blocks being processed, this is the default
– Else the number of maps can be specified as a hint
– The number of maps can also be controlled by specifying the minimum split size
– The actual sizes of the map inputs are computed by:
max(min(block_size, data/#maps), min_split_size)
• Reduces
– Usually 0.95 * num_nodes * mapred.tasktracker.tasks.maximum,
unless the amount of data being processed is small
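
A hedged sketch of how these knobs are typically set from a driver using the newer org.apache.hadoop.mapreduce API (the job name, input path and values below are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuning {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-tuning");   // illustrative job name
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Raise the minimum split size to 128 MB, so each map task reads at least that much
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
        // Fix the number of reduce tasks explicitly instead of relying on the default
        job.setNumReduceTasks(10);
        // ... mapper, reducer and output classes must still be set before submitting ...
    }
}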

Input splitting, map and reduce functions, Specifying input and output parameters,
MapReduce:

MapReduce is a programming framework that allows us to perform distributed and parallel processing
on large data sets in a distributed environment.
• MapReduce consists of two distinct tasks – Map and Reduce.
• As the name MapReduce suggests, the reducer phase takes place after the mapper phase has been completed.
• So, the first is the map job, where a block of data is read and processed to produce key-value pairs as intermediate outputs.
• The output of a Mapper or map job (key-value pairs) is input to the Reducer.
• The reducer receives the key-value pairs from multiple map jobs.
• Then, the reducer aggregates those intermediate data tuples (intermediate key-value pairs) into a smaller set of tuples or key-value pairs, which is the final output.

Let us understand more about MapReduce and its components. MapReduce majorly has the following
three Classes. They are,

Mapper Class
The first stage in data processing using MapReduce is the Mapper Class. Here, the RecordReader processes each input record and generates the respective key-value pair. Hadoop's Mapper stores this intermediate data on the local disk.
• Input Split: It is the logical representation of data. It represents a block of work that contains a single map task in the MapReduce program.
• RecordReader: It interacts with the Input Split and converts the obtained data into key-value pairs.

Reducer Class
The Intermediate output generated from the mapper is fed to the reducer which processes it and
generates the final output which is then saved in the HDFS.

Driver Class
The major component in a MapReduce job is the Driver Class. It is responsible for setting up a MapReduce job to run in Hadoop. We specify the names of the Mapper and Reducer classes along with the data types and the job name (a driver sketch is given after the Word Count Reducer below).

Advantages of MapReduce
The two biggest advantages of MapReduce are:
1. Parallel Processing:
In MapReduce, we are dividing the job among multiple nodes and each node works with a part of
the job simultaneously. So, MapReduce is based on Divide and Conquer paradigm which helps us to
process the data using different machines. As the data is processed by multiple machines in parallel instead of by a single machine, the time taken to process the data is reduced by a tremendous amount.
(Figure: Traditional Way vs. MapReduce Way)

2. Data Locality:
Instead of moving data to the processing unit, we are moving the processing unit to the data in the
MapReduce Framework. In the traditional system, we used to bring data to the processing unit and
process it. But, as the data grew and became very huge, bringing this huge amount of data to the
processing unit posed the following issues:
• Moving huge data to the processing unit is costly and deteriorates network performance.
• Processing takes time, as the data is processed by a single unit which becomes the bottleneck.
• The master node can get over-burdened and may fail.
Now, MapReduce allows us to overcome the above issues by bringing the processing unit to the data. The data is distributed among multiple nodes, and each node processes the part of the data residing on it. This allows us to have the following advantages:
• It is very cost-effective to move the processing unit to the data.
• The processing time is reduced as all the nodes work with their part of the data in parallel.
• Every node gets a part of the data to process, so there is no chance of a node getting overburdened.
Hadoop MapReduce Example: Word Count

• Mapper
– Input: value: lines of text of input
– Output: key: word, value: 1
• Reducer
– Input: key: word, value: set of counts
– Output: key: word, value: sum
• Launching program
– Defines this job
– Submits job to cluster

Let us understand, how a MapReduce works by taking an example where I have a text file called
example.txt whose contents are as follows:

Dear, Bear, River, Car, Car, River, Deer, Car and Bear

Now, suppose we have to perform a word count on example.txt using MapReduce. So, we will be finding the unique words and the number of occurrences of those unique words.

• First, we divide the input into three splits as shown in the figure. This will distribute the work among all the map nodes.
• Then, we tokenize the words in each of the mappers and give a hardcoded value (1) to each of the tokens or words. The rationale behind giving a hardcoded value equal to 1 is that every word, in itself, will occur once.
• Now, a list of key-value pairs will be created where the key is the individual word and the value is one. So, for the first line (Dear Bear River) we have 3 key-value pairs – Dear, 1; Bear, 1; River, 1. The mapping process remains the same on all the nodes.
• After the mapper phase, a partition process takes place where sorting and shuffling happen so that all the tuples with the same key are sent to the corresponding reducer.
• So, after the sorting and shuffling phase, each reducer will have a unique key and a list of values corresponding to that key. For example, Bear, [1,1]; Car, [1,1,1]; etc.
• Now, each reducer counts the values present in its list of values. As shown in the figure, the reducer gets the list [1,1] for the key Bear. It then counts the number of ones in the list and gives the final output as – Bear, 2.
• Finally, all the output key-value pairs are collected and written to the output file.
Word Count Mapper
public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

Word Count Reducer


public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

Job represents a MapReduce job configuration.


Job is the primary interface for a user to describe a MapReduce job to the Hadoop framework for
execution. The framework tries to faithfully execute the job as described by Job, however:
• Some configuration parameters may have been marked as final by administrators (see Final Parameters) and hence cannot be altered.
• While some job parameters are straightforward to set (e.g. Job.setNumReduceTasks(int)), other parameters interact subtly with the rest of the framework and/or job configuration and are more complex to set (e.g. Configuration.set(JobContext.NUM_MAPS, int)).

Job is typically used to specify the Mapper, combiner (if any), Partitioner, Reducer, InputFormat and OutputFormat implementations. FileInputFormat indicates the set of input files
(FileInputFormat.setInputPaths(Job, Path…) / FileInputFormat.addInputPath(Job, Path), or
FileInputFormat.setInputPaths(Job, String…) / FileInputFormat.addInputPaths(Job, String)), and
FileOutputFormat.setOutputPath(Path) indicates where the output files should be written.

Optionally, Job is used to specify other advanced facets of the job such as the Comparator to be
used, files to be put in the DistributedCache, whether intermediate and/or job outputs are to be compressed
(and how), whether job tasks can be executed in a speculative manner
(setMapSpeculativeExecution(boolean) / setReduceSpeculativeExecution(boolean)), the maximum number of attempts per task (setMaxMapAttempts(int) / setMaxReduceAttempts(int)), etc.
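
A hedged sketch of how some of these advanced facets are set on a Job; the job name and the chosen values are illustrative, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class AdvancedJobConfig {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "advanced-config"); // illustrative name
        job.setSpeculativeExecution(false);    // disable speculative execution for all tasks
        job.setMapSpeculativeExecution(true);  // ...but allow it for map tasks only
        job.setMaxMapAttempts(4);              // retry a failing map task up to 4 times
        job.setMaxReduceAttempts(4);           // retry a failing reduce task up to 4 times
        // Arbitrary application parameters can be set and read back via the Configuration
        job.getConfiguration().set("my.app.param", "value");
    }
}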

Of course, users can use Configuration.set(String, String) / Configuration.get(String) to set/get


arbitrary parameters needed by applications. However, use the DistributedCache for large amounts of
(read-only) data.

Configured Parameters:The following properties are localized in the job configuration for each task’s
execution:
Name Type Description
mapreduce.job.id String The job id
mapreduce.job.jar String job.jar location in job directory
mapreduce.job.local.dir String The job specific shared scratch space
mapreduce.task.id String The task id
mapreduce.task.attempt.id String The task attempt id
mapreduce.task.is.map boolean Is this a map task
mapreduce.task.partition int The id of the task within the job
mapreduce.map.input.file String The filename that the map is reading from
mapreduce.map.input.start long The offset of the start of the map input split
mapreduce.map.input.length long The number of bytes in the map input split
mapreduce.task.output.dir String The task‘s temporary output directory

JobConfs:
• Jobs are controlled by configuring JobConf
• JobConfs are maps from attribute names to string values
• The framework defines attributes to control how the job is executed
– conf.set("mapred.job.name", "MyApp");
• Applications can add arbitrary values to the JobConf
– conf.set("my.string", "foo");
– conf.setInt("my.integer", 12);
• JobConf is available to all tasks
Hadoop Vs Spark:

"Parallel computing is the simultaneous use of more than one processor to solve a problem."
"Distributed computing is the simultaneous use of more than one computer to solve a problem."

Hadoop is designed to handle batch processing efficiently whereas Spark is designed to handle
real-time data efficiently. Hadoop is a high latency computing framework, which does not have an
interactive mode whereas Spark is a low latency computing and can process data interactively.
1. HDFS – Hadoop Distributed File System. This is the file system that manages the storage of
large sets of data across a Hadoop cluster. HDFS can handle both structured and unstructured data.
The storage hardware can range from any consumer-grade HDDs to enterprise drives.
2. MapReduce. The processing component of the Hadoop ecosystem. It assigns the data fragments
from the HDFS to separate map tasks in the cluster. MapReduce processes the chunks in parallel
to combine the pieces into the desired result.
3. YARN. Yet Another Resource Negotiator. Responsible for managing computing resources and job
scheduling.
4. Hadoop Common. The set of common libraries and utilities that other modules depend on.
Another name for this module is Hadoop core, as it provides support for all other Hadoop
components.

Big Data Processing


Today, we have many free solutions for big data processing. Many companies also offer
specialized enterprise features to complement the open-source platforms.
The trend started in 1999 with the development of Apache Lucene. The framework soon became
open-source and led to the creation of Hadoop. Two of the most popular big data processing frameworks in
use today are open source – Apache Hadoop and Apache Spark.

What is Hadoop?
Apache Hadoop is a platform that handles large datasets in a distributed fashion. The framework
uses MapReduce to split the data into blocks and assign the chunks to nodes across a cluster. MapReduce
then processes the data in parallel on each node to produce a unique output.
Every machine in a cluster both stores and processes data. Hadoop stores the data to disks
using HDFS. The software offers seamless scalability options. You can start with as low as one machine
and then expand to thousands, adding any type of enterprise or commodity hardware.
The Hadoop ecosystem is highly fault-tolerant. Hadoop does not depend on hardware to achieve
high availability. At its core, Hadoop is built to look for failures at the application layer. By replicating
data across a cluster, when a piece of hardware fails, the framework can build the missing parts from
another location. The nature of Hadoop makes it accessible to everyone who needs it. The open-source
community is large and paved the path to accessible big data processing.

What is Spark?
Apache Spark is an open-source tool. This framework can run in a standalone mode or on a cloud
or cluster manager such as Apache Mesos, and other platforms. It is designed for fast performance and
uses RAM for caching and processing data.
Spark performs different types of big data workloads. This includes MapReduce-like batch
processing, as well as real-time stream processing, machine learning, graph computation, and interactive
queries. With easy to use high-level APIs, Spark can integrate with many different libraries,
including PyTorch and TensorFlow.
The Spark engine was created to improve the efficiency of MapReduce and keep its benefits. Even
though Spark does not have its file system, it can access data on many different storage solutions. The data
structure that Spark uses is called Resilient Distributed Dataset, or RDD.
There are five main components of Apache Spark:
1. Apache Spark Core. The basis of the whole project. Spark Core is responsible for necessary
functions such as scheduling, task dispatching, input and output operations, fault recovery, etc.
Other functionalities are built on top of it.
2. Spark Streaming. This component enables the processing of live data streams. Data can originate
from many different sources, including Kafka, Kinesis, Flume, etc.
3. Spark SQL. Spark uses this component to gather information about the structured data and how
the data is processed.
4. Machine Learning Library (MLlib). This library consists of many machine learning algorithms.
MLlib‘s goal is scalability and making machine learning more accessible.
5. GraphX. A set of APIs used for facilitating graph analytics tasks.

Differences Between Hadoop and Spark


The main differences and similarities between the two frameworks fall under categories such as cost, performance, security, and ease of use.

Category: Performance
Hadoop: Slower performance; uses disks for storage and depends on disk read and write speed.
Spark: Fast in-memory performance with reduced disk reading and writing operations.

Category: Cost
Hadoop: An open-source platform, less expensive to run. Uses affordable consumer hardware. Easier to find trained Hadoop professionals.
Spark: An open-source platform, but relies on memory for computation, which considerably increases running costs.

Category: Data Processing
Hadoop: Best for batch processing. Uses MapReduce to split a large dataset across a cluster for parallel analysis.
Spark: Suitable for iterative and live-stream data analysis. Works with RDDs and DAGs to run operations.

Category: Fault Tolerance
Hadoop: A highly fault-tolerant system. Replicates the data across the nodes and uses them in case of an issue.
Spark: Tracks the RDD block creation process, and then it can rebuild a dataset when a partition fails. Spark can also use a DAG to rebuild data across nodes.

Category: Scalability
Hadoop: Easily scalable by adding nodes and disks for storage. Supports tens of thousands of nodes without a known limit.
Spark: A bit more challenging to scale because it relies on RAM for computations. Supports thousands of nodes in a cluster.

Category: Security
Hadoop: Extremely secure. Supports LDAP, ACLs, Kerberos, SLAs, etc.
Spark: Not secure. By default, the security is turned off. Relies on integration with Hadoop to achieve the necessary security level.

Category: Ease of Use and Language Support
Hadoop: More difficult to use, with fewer supported languages. Uses Java or Python for MapReduce apps.
Spark: More user friendly. Allows interactive shell mode. APIs can be written in Java, Scala, R, Python, Spark SQL.

Category: Machine Learning
Hadoop: Slower than Spark. Data fragments can be too large and create bottlenecks. Mahout is the main library.
Spark: Much faster with in-memory processing. Uses MLlib for computations.

Category: Scheduling and Resource Management
Hadoop: Uses external solutions. YARN is the most common option for resource management. Oozie is available for workflow scheduling.
Spark: Has built-in tools for resource allocation, scheduling, and monitoring.
Design and Implementation of Hive
• Developed at Facebook
• Used for majority of Facebook jobs
• "Relational database" built on Hadoop
– Maintains list of table schemas
– SQL-like query language (HiveQL)
– Can call Hadoop Streaming scripts from HiveQL
– Supports table partitioning, clustering, complex data types, some optimizations
Hive Design and Architecture

The major components of Hive and their interactions with Hadoop (architecture diagram omitted) are as follows:
• UI – The user interface for users to submit queries and other operations to the system. As of 2011 the system had a command line interface, and a web-based GUI was being developed.
• Driver – The component which receives the queries. This component implements the notion of session handles and provides execute and fetch APIs modeled on JDBC/ODBC interfaces.
• Compiler – The component that parses the query, does semantic analysis on the different query blocks and query expressions, and eventually generates an execution plan with the help of the table and partition metadata looked up from the metastore.
• Metastore – The component that stores all the structure information of the various tables and partitions in the warehouse, including column and column type information, the serializers and deserializers necessary to read and write data, and the corresponding HDFS files where the data is stored.
• Execution Engine – The component which executes the execution plan created by the compiler. The plan is a DAG of stages. The execution engine manages the dependencies between these different stages of the plan and executes these stages on the appropriate system components.
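
As a client-side illustration of the UI/Driver path described above, Hive queries can also be submitted from Java over JDBC to HiveServer2. A minimal sketch, assuming a HiveServer2 instance on localhost:10000, a user "tom", and the page_views table used in the examples below (all assumptions, not part of the original notes):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver; connection URL and credentials are assumptions
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "tom", "");
        Statement stmt = con.createStatement();
        // The compiler turns this into an execution plan run by the execution engine
        ResultSet rs = stmt.executeQuery(
                "SELECT page_url, count(*) FROM page_views GROUP BY page_url");
        while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
        }
        con.close();
    }
}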

Hive Storage Format: Items to be considered while choosing a file format for storage include:
• Support for columnar storage
• Splitability
• Compression
• Schema evolution
• Indexing capabilities

Data in Hive is organized into:


• Tables – These are analogous to tables in relational databases. Tables can be filtered, projected, joined and unioned. Additionally, all the data of a table is stored in a directory in HDFS. Hive also supports the notion of external tables, wherein a table can be created on pre-existing files or directories in HDFS by providing the appropriate location to the table creation DDL. The rows in a table are organized into typed columns, similar to relational databases.
• Partitions – Each table can have one or more partition keys which determine how the data is stored; for example, a table T with a date partition column ds has files with data for a particular date stored in the <table location>/ds=<date> directory in HDFS. Partitions allow the system to prune the data to be inspected based on query predicates; for example, a query that is interested in rows from T that satisfy the predicate T.ds = '2008-09-01' would only have to look at files in the <table location>/ds=2008-09-01/ directory in HDFS.
• Buckets – Data in each partition may in turn be divided into buckets based on the hash of a column in the table. Each bucket is stored as a file in the partition directory. Bucketing allows the system to efficiently evaluate queries that depend on a sample of data (these are queries that use the SAMPLE clause on the table).
Creating a Hive Table
CREATE TABLE page_views(viewTime INT, userid BIGINT,
    page_url STRING, referrer_url STRING,
    ip STRING COMMENT 'User IP address')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
STORED AS SEQUENCEFILE;
• Partitioning breaks table into separate files for each (dt, country) pair
Ex: /hive/page_view/dt=2008-06-08,country=USA
/hive/page_view/dt=2008-06-08,country=CA

A Simple Query
• Find all page views coming from xyz.com during March 2008:
SELECT page_views.*
FROM page_views
WHERE page_views.date >= '2008-03-01'
  AND page_views.date <= '2008-03-31'
  AND page_views.referrer_url like '%xyz.com';
• Hive only reads the partitions matching 2008-03-*, instead of scanning the entire table

Aggregation and Joins


• Count users who visited each page, by gender:
SELECT pv.page_url, u.gender, COUNT(DISTINCT u.id)
FROM page_views pv JOIN user u ON (pv.userid = u.id)
WHERE pv.date = '2008-03-03'
GROUP BY pv.page_url, u.gender;

Using a Hadoop Streaming Mapper Script


SELECT TRANSFORM(page_views.userid, page_views.date)
USING 'map_script.py'
AS dt, uid
CLUSTER BY dt
FROM page_views;
What is Apache Pig?
Pig is a scripting language for exploring huge data sets, of size gigabytes or terabytes, very easily. Pig provides an engine for executing data flows in parallel on Hadoop. Pig is made up of two things mainly:
• Pig Latin: the language for expressing data flows.
• Pig Engine: the execution environment that converts Pig Latin operators or transformations into a series of MapReduce jobs and runs Pig Latin programs. It has two modes:
• Local Mode: local execution in a single JVM; all files are installed and run using the local host and file system.
• MapReduce Mode: distributed execution on a Hadoop cluster; it is the default mode.
Pig started at Yahoo! Research and at one point ran about 30% of Yahoo!'s jobs.

Apache Pig History:


The word "Pig" is named after a domestic animal; it is not an acronym. This entertaining nomenclature led to some silly names in the Pig project, like Pig Latin for its language and Grunt for its interactive shell.
Apache Pig is a top-level project in the Apache Software Foundation; it was started by Yahoo! researchers and later contributed to the Apache open-source community in 2010.

Pig Latin Features:


• Pig Latin includes operators for many of the traditional data operations (join, sort, filter, etc.).
• Pig Latin is extensible, so users can develop their own functions for reading, processing, and writing data.
• A Pig Latin script is made up of a series of operations, or transformations, that are applied to the input data to produce output.
• Pig Latin programs can be executed either in interactive mode through the Grunt shell or in batch mode via Pig Latin scripts.
• Expresses sequences of MapReduce jobs.
• Data model: nested "bags" of items.
• Provides relational (SQL) operators (JOIN, GROUP BY, etc.).
• Easy to plug in Java functions.

Pig Limitations:
• Pig does not support random reads or queries in the order of tens of milliseconds.
• Pig does not support random writes to update small portions of data; all writes are bulk, streaming writes, just like MapReduce.
• Low-latency queries are not supported in Pig, thus it is not suitable for OLAP and OLTP.
Pig Architecture: The architecture diagram conveys that:
1. Pig Latin scripts or Pig commands from the Grunt shell are submitted to the Pig Engine.
2. The Pig Engine parses, compiles, optimizes, and fires MapReduce statements.
3. MapReduce accesses HDFS and returns the results.
One of the common use cases of Pig is data pipelines. A common example is web companies bringing in logs from their web servers, cleansing the data, and pre-computing common aggregates before loading it into their data warehouse. In this case, the data is loaded onto the grid, and then Pig is used to clean out records from bots and records with corrupt data. It is also used to join web event data against user databases so that user cookies can be connected with known user information.
What is the need for Pig when we already have Mapreduce?
Mapreduce is a low-level data set processing paradigm, whereas Pig provides a high level of abstraction for processing large data sets. Though both Mapreduce and Pig are used for processing data sets, and Pig transformations are converted into a series of Mapreduce jobs, below are the major differences between the Mapreduce processing framework and the Pig framework.
Pig Latin provides all of the standard data-processing operations, such as join, filter, group
by, order by, union, etc. MapReduce provides the group by operation directly, but order by, filter,
projection, join are not provided and must be written by the user.
Mapreduce: It is too low-level and rigid, and leads to a great deal of custom user code that is hard to maintain and reuse.
Pig: High-level programming.

Mapreduce: The development cycle is very long. Writing mappers and reducers, compiling and packaging the code, submitting jobs, and retrieving the results is a time-consuming process.
Pig: In Pig, there is no need for compiling or packaging of code. Pig operators are converted into map or reduce tasks internally.

Mapreduce: To extract a small portion of data from a large dataset, Mapreduce is preferable.
Pig: Pig is not suitable for extracting small portions of data in a large dataset, since it is set up to scan the whole dataset, or at least large portions of it.

Mapreduce: Not easily extendable; we need to write functions starting from scratch.
Pig: UDFs tend to be more reusable than the libraries developed for writing MapReduce programs.

Mapreduce: We need MapReduce when we need very deep-level and fine-grained control over the way we want to process our data.
Pig: Sometimes it is not very convenient to express what we need exactly in terms of Pig and Hive queries.

Mapreduce: Performing data set joins is very difficult.
Pig: Joins are simple to achieve in Pig.
Difference Between Hive and Pig:
Hive can be treated as a competitor for Pig in some cases, and Hive also operates on HDFS similarly to Pig, but there are some significant differences. HiveQL is a query language based on SQL, but Pig Latin is not a query language; it is a data-flow scripting language.
Since Pig Latin is procedural, it fits very naturally in the pipeline paradigm. HiveQL, on the other hand, is declarative.

Example Problem
Suppose you have user data in a file,
website data in another, and you need to find the top
5 most visited pages by users aged 18-25.

Users = load 'users' as (name, age);
Filtered = filter Users by age >= 18 and age <= 25;
Pages = load 'pages' as (user, url);
Joined = join Filtered by name, Pages by user;
Grouped = group Joined by url;
Summed = foreach Grouped generate group, COUNT(Joined) as clicks;
Sorted = order Summed by clicks desc;
Top5 = limit Sorted 5;
store Top5 into 'top5sites';
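
The same pipeline can also be driven from Java by embedding Pig with the PigServer class. A minimal sketch, assuming Pig is on the classpath and that the 'users' and 'pages' inputs exist on the local filesystem (LOCAL mode here; MAPREDUCE mode would read them from HDFS):

import java.util.Iterator;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class Top5Sites {
    public static void main(String[] args) throws Exception {
        PigServer pig = new PigServer(ExecType.LOCAL);   // LOCAL mode for a quick test
        pig.registerQuery("Users = load 'users' as (name, age);");
        pig.registerQuery("Filtered = filter Users by age >= 18 and age <= 25;");
        pig.registerQuery("Pages = load 'pages' as (user, url);");
        pig.registerQuery("Joined = join Filtered by name, Pages by user;");
        pig.registerQuery("Grouped = group Joined by url;");
        pig.registerQuery("Summed = foreach Grouped generate group, COUNT(Joined) as clicks;");
        pig.registerQuery("Sorted = order Summed by clicks desc;");
        pig.registerQuery("Top5 = limit Sorted 5;");
        // Iterate over the result instead of storing it to a file
        Iterator<Tuple> it = pig.openIterator("Top5");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}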

HBase
HBase is a column-oriented non-relational database management system that runs on top
of Hadoop Distributed File System (HDFS). HBase provides a fault-tolerant way of storing sparse data
sets, which are common in many big data use cases. It is well suited for real-time data processing or
random read/write access to large volumes of data.
Unlike relational database systems, HBase does not support a structured query language like SQL;
in fact, HBase isn‘t a relational data store at all. HBase applications are written in Java™ much like a
typical Apache MapReduce application. HBase does support writing applications in Apache Avro, REST
and Thrift.
An HBase system is designed to scale linearly. It comprises a set of standard tables with rows and
columns, much like a traditional database. Each table must have an element defined as a primary key, and
all access attempts to HBase tables must use this primary key.
Avro, as a component, supports a rich set of primitive data types including: numeric, binary data and
strings; and a number of complex types including arrays, maps, enumerations and records. A sort order can
also be defined for the data.
HBase relies on ZooKeeper for high-performance coordination. ZooKeeper is built into HBase,
but if you‘re running a production cluster, it‘s suggested that you have a dedicated ZooKeeper cluster
that‘s integrated with your HBase cluster.
HBase works well with Hive, a query engine for batch processing of big data, to enable fault-
tolerant big data applications.

HBase - What?
• Modeled on Google‘s Bigtable
• Row/column store
• Billions of rows / millions of columns
• Column-oriented - nulls are free
• Untyped - stores byte[]

HBase Unique Features


• HBase is built for low latency operations
• HBase is used extensively for random read and write operations
• HBase stores a large amount of data in terms of tables
• Provides linear and modular scalability over the cluster environment
• Strictly consistent read and write operations
• Automatic and configurable sharding of tables
• Automatic failover support between Region Servers
• Convenient base classes for backing Hadoop MapReduce jobs in HBase tables
• Easy-to-use Java API for client access
• Block cache and Bloom filters for real-time queries
• Query predicate push down via server-side filters

The important HBase architecture topics covered here are:
• HBase Data Model
• HBase Architecture and its Components
• HBase Write Mechanism
• HBase Read Mechanism
• HBase Performance Optimization Mechanisms
HBase Data Model
HBase is a column-oriented NoSQL database. Although it looks similar to a relational database, which contains rows and columns, it is not a relational database. Relational databases are row-oriented while HBase is column-oriented. The difference between column-oriented and row-oriented databases is explained below.

Row-oriented vs column-oriented Databases:


Row-oriented databases store table records in a sequence of rows. Whereas column-oriented
databases store table records in a sequence of columns, i.e. the entries in a column are stored in
contiguous locations on disks.

To better understand this, let us take an example and consider a table with two rows of car data. If this table is stored in a row-oriented database, it will store the records as shown below:

1, Paul Walker, US, 231, Gallardo
2, Vin Diesel, Brazil, 520, Mustang

In row-oriented databases, data is stored on the basis of rows or tuples, as you can see above. A column-oriented database stores the same data as:

1, 2, Paul Walker, Vin Diesel, US, Brazil, 231, 520, Gallardo, Mustang

In a column-oriented database, all the values of a column are stored together: the first-column values are stored together, then the second-column values, and data in the other columns is stored in a similar manner.
Meta Table
The META table is a special HBase catalog table. It maintains a list of all the Region Servers in the HBase storage system. The META table holds entries in the form of keys and values: the key represents the start key of a region and its id, whereas the value contains the location (path) of the Region Server hosting that region.

HBase Architecture and its Important Components
HBase Architecture Diagram (omitted). The HBase architecture consists mainly of the following components:
• HMaster
• HRegionServer
• HRegions
• ZooKeeper
• HDFS
HMaster:
HMaster is the implementation of a Master server in the HBase architecture. It acts as a monitoring agent to monitor all Region Server instances present in the cluster and acts as an interface for all metadata changes. In a distributed cluster environment, the Master runs on a NameNode. The Master runs several background threads.

The following are important roles performed by HMaster in HBase:
• Plays a vital role in terms of performance and maintaining nodes in the cluster.
• HMaster provides admin performance and distributes services to different region servers.
• HMaster assigns regions to region servers.
• HMaster has features like controlling load balancing and failover to handle the load over nodes present in the cluster.
• When a client wants to change any schema or perform any metadata operations, HMaster takes responsibility for these operations.
Some of the methods exposed by the HMaster interface are primarily metadata-oriented methods:
• Table (createTable, removeTable, enable, disable)
• ColumnFamily (add Column, modify Column)
• Region (move, assign)
The client communicates in a bi-directional way with both HMaster and ZooKeeper. For read and write operations, it directly contacts HRegion servers. HMaster assigns regions to region servers and, in turn, checks the health status of region servers.
In the entire architecture, we have multiple region servers. The HLog present in each region server stores all the log files.

HBase Region Servers:

When a Region Server receives write and read requests from the client, it assigns the request to a specific region, where the actual column family resides. The client can directly contact HRegion servers; there is no need for mandatory HMaster permission for the client to communicate with HRegion servers. The client requires HMaster's help only when operations related to metadata and schema changes are required.
HRegionServer is the Region Server implementation. It is responsible for serving and managing regions, i.e. the data present in a distributed cluster. The region servers run on the DataNodes present in the Hadoop cluster.
HMaster can get into contact with multiple HRegion servers, which perform the following functions:
• Hosting and managing regions
• Splitting regions automatically
• Handling read and write requests
• Communicating with the client directly
Region Server Components
HBase Regions:
HRegions are the basic building elements of an HBase cluster; they hold the distributed portions of tables and are comprised of column families. A region contains multiple stores, one for each column family, and each store consists mainly of two components: the MemStore and HFiles.
ZooKeeper:
In HBase, ZooKeeper is a centralized monitoring server which maintains configuration information and provides distributed synchronization. Distributed synchronization here means providing coordination services between nodes so that distributed applications can run across the cluster. If the client wants to communicate with regions, the client has to approach ZooKeeper first.
ZooKeeper is an open-source project, and it provides many important services.
Services provided by ZooKeeper:
• Maintains configuration information
• Provides distributed synchronization
• Establishes client communication with region servers
• Provides ephemeral nodes, which represent different region servers
• Master servers use the ephemeral nodes for discovering available servers in the cluster
• Tracks server failures and network partitions
The Master and the HBase slave nodes (region servers) register themselves with ZooKeeper. The client needs access to the ZooKeeper (ZK) quorum configuration to connect with the master and region servers. During a failure of nodes present in the HBase cluster, the ZK quorum will trigger error messages, and it starts to repair the failed nodes.

HDFS: HDFS is in contact with the HBase components and stores a large amount of data in a distributed manner.
HBase read and write data operations from the client into an HFile can be summarized as follows (diagram omitted):
Step 1) The client wants to write data, and in turn first communicates with the Region Server and then with the region.
Step 2) The region contacts the MemStore associated with the column family.
Step 3) The data is first stored in the MemStore, where it is sorted, and after that it is flushed into an HFile. The main reason for using the MemStore is to store data sorted by row key before writing it to the distributed file system. The MemStore is placed in the Region Server's main memory, while HFiles are written into HDFS.
Step 4) The client wants to read data from the regions.
Step 5) In turn, the client can have direct access to the MemStore, and it can request the data.
Step 6) The client approaches the HFiles to get the data. The data is fetched and retrieved by the client.
The MemStore holds in-memory modifications to the store. The hierarchy of objects in HBase regions, from top to bottom, is shown in the table below.
Table     – The HBase table present in the HBase cluster
Region    – The HRegions for the presented tables
Store     – One store per ColumnFamily for each region of the table
MemStore  – One MemStore per store, per region of the table; it sorts data before flushing into HFiles, and write and read performance increase because of this sorting
StoreFile – StoreFiles for each store for each region of the table
Block     – Blocks present inside StoreFiles


HBase vs. HDFS: HBase runs on top of HDFS and Hadoop. Some key differences between HDFS and HBase are in terms of data operations and processing:

HBase: Low latency operations.
HDFS: High latency operations.

HBase: Random reads and writes.
HDFS: Write once, read many times.

HBase: Accessed through shell commands, or a client API in Java, REST, Avro or Thrift.
HDFS: Primarily accessed through MR (MapReduce) jobs.

HBase: Both storage and processing can be performed.
HDFS: It is only for storage.

Some typical IT industrial applications use HBase operations along with Hadoop; examples include stock exchange data and online banking data operations, for which HBase is a well-suited processing method.

HBase - Data Model


Row key      Time stamp   Column family animal:         Column family repairs:
                          animal:type    animal:size    repairs:cost
enclosure1   t2           zebra                         1000 EUR
             t1           lion           big
enclosure2   …            …              …              …

HBase - Data Storage


Column family animal:
(enclosure1, t2, animal:type) zebra
(enclosure1, t1, animal:size) big
(enclosure1, t1, animal:type) lion
Column family repairs:
(enclosure1, t1, repairs:cost) 1000 EUR
HBase - Code

HTable table = …
Text row = new Text("enclosure1");
Text col1 = new Text("animal:type");
Text col2 = new Text("animal:size");

BatchUpdate update = new BatchUpdate(row);
update.put(col1, "lion".getBytes("UTF-8"));
update.put(col2, "big".getBytes("UTF-8"));
table.commit(update);

update = new BatchUpdate(row);
update.put(col1, "zebra".getBytes("UTF-8"));
table.commit(update);

HBase - Querying
• Retrieve a cell
Cell = table.getRow("enclosure1").getColumn("animal:type").getValue();
• Retrieve a row
RowResult = table.getRow("enclosure1");
• Scan through a range of rows
Scanner s = table.getScanner(new String[] { "animal:type" });
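
The code above uses a very old HBase client API (HTable/BatchUpdate). With the current client API, the same zoo example would look roughly like the sketch below; the table name "zoo" and the cluster settings picked up from hbase-site.xml are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ZooExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();   // reads hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("zoo"))) {   // assumed table name
            // Write a row: column family "animal", qualifiers "type" and "size"
            Put put = new Put(Bytes.toBytes("enclosure1"));
            put.addColumn(Bytes.toBytes("animal"), Bytes.toBytes("type"), Bytes.toBytes("lion"));
            put.addColumn(Bytes.toBytes("animal"), Bytes.toBytes("size"), Bytes.toBytes("big"));
            table.put(put);

            // Read the cell back
            Get get = new Get(Bytes.toBytes("enclosure1"));
            Result result = table.get(get);
            byte[] type = result.getValue(Bytes.toBytes("animal"), Bytes.toBytes("type"));
            System.out.println(Bytes.toString(type));
        }
    }
}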

Assignment 4:
Case Study: Map Reduce techniques:
Try it by yourself, this work will help during your placement time
Specifying input and output parameters,
Input splitting,
Mapping and
Reducing functions,
Design and Implementation of Hive, Pig, HBase.
Configuring and Running a Job using hadoop, .

Consider your own students data set. The data set also comes from different formats (Facebook, Twitter, Oracle database, notepad, Excel, etc.). The data set consists of different sets of marks/grades, students from different locations (city/village), categories, various family backgrounds, family education, different modes of education (native language / specific type of language), students from different streams of study (school / polytechnic / college / university, where the education may be +2 / Diploma / Engineering / Arts / U.G. / P.G. / Ph.D.), present work and earnings, certificate courses and placement records. From this data set, analyse which type of students are performing well in the outside world / social sector. The analysis is based on the age group of 25-30.

Assume the data set is located in various locations and in various forms, i.e. the records are stored in a distributed fashion. For example, school students' records are stored in different school database servers, and college students' academic records are available in the university records.

Work:
1. Set up your own Hadoop platform and use different types of tools (Spark, Hive, HBase, Pig, based on your requirements). A minimum of two tools should be used in your work.
2. Consider the different types of data formats adopted by different sectors.
3. Create your own database.
4. Design your own architecture for analysis purposes.
Output:
1. Screen shots of the setup
2. The steps involved while setting up
3. Configuration
4. Analysis report / your prediction

Work:
1. Reference Contents and installation from various web sites, &
https://www.tutorialspoint.com
2. Hands on Try yourself: Configuring and Running a Job. Hadoop Vs Spark.
Interview Based Questions
1) What is MapReduce?
It is a framework or a programming model that is used for processing large data sets over clusters
of computers using distributed programming.

2) What are ‘maps’ and ‘reduces’?


'Maps' and 'Reduces' are two phases of solving a query in HDFS. 'Map' is responsible for reading data from the input location and, based on the input type, generating a key-value pair, that is, an intermediate output on the local machine. 'Reducer' is responsible for processing the intermediate output received from the mapper and generating the final output.

3) What are the four basic parameters of a mapper?


The four basic parameters of a mapper are LongWritable, Text, Text and IntWritable. The first two represent the input parameters and the second two represent the intermediate output parameters.

4) What are the four basic parameters of a reducer?


The four basic parameters of a reducer are Text, IntWritable, Text, IntWritable.The first two
represent intermediate output parameters and the second two represent final output parameters.

5) What do the master class and the output class do?


Master is defined to update the Master or the job tracker and the output class is defined to write
data onto the output location.

6) What is the input type/format in MapReduce by default?


By default, the input type in MapReduce is 'text'.

7) Is it mandatory to set input and output type/format in MapReduce?


No, it is not mandatory to set the input and output type/format in MapReduce. By default, the
cluster takes the input and the output type as ‗text‘.

8) What does the text input format do?


In the text input format, each line creates a line object that is keyed by its byte offset within the file. The key is this offset (a LongWritable) and the value is the whole line of text. This is how the data gets processed by a mapper: the mapper receives the key as a 'LongWritable' parameter and the value as a 'Text' parameter.

9) What does job conf class do?


MapReduce needs to logically separate the different jobs running on the same cluster. The JobConf class helps to make job-level settings, such as declaring a job in the real environment. It is recommended that the job name be descriptive and represent the type of job being executed.

10) What does conf.setMapper Class do?


conf.setMapperClass() sets the mapper class and everything related to the map task, such as reading the data and generating key-value pairs from the mapper.
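
A minimal driver sketch using the older JobConf API that questions 9 and 10 refer to. MyOldApiMapper and MyOldApiReducer are hypothetical classes assumed to implement the old org.apache.hadoop.mapred Mapper and Reducer interfaces; they are not defined in these notes.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class OldApiDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(OldApiDriver.class);
        conf.setJobName("word-count");                  // descriptive job name (question 9)

        conf.setMapperClass(MyOldApiMapper.class);      // question 10: set the mapper class
        conf.setReducerClass(MyOldApiReducer.class);

        conf.setOutputKeyClass(Text.class);             // final output key/value types
        conf.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);                         // submit the job and wait for completion
    }
}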

11) What do sorting and shuffling do?


Sorting and shuffling are responsible for producing, for each unique key, the list of its values. Grouping identical keys together is known as sorting, and the process by which the intermediate output of the mappers is sorted and transferred to the reducers is known as shuffling.
12) What does a split do?
Before data is transferred from its location on disk to the map method, there is a phase called the split. A split pulls a block of data from HDFS into the framework; the split itself does not write anything but reads data from the block and passes it to the mapper. By default, splitting is taken care of by the framework, the split size is equal to the block size, and it is used to divide the input into a set of splits.

13) How can we change the split size if our commodity hardware has less storage space?
If our commodity hardware has less storage space, we can change the split size by writing a custom splitter. Hadoop provides this customization feature, which can be invoked from the main method.
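
Besides writing a custom splitter, a commonly used way (in the newer mapreduce API) to influence the split size is through FileInputFormat's minimum/maximum split size settings. A sketch, assuming a 32 MB cap is what is wanted:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Cap splits at 32 MB so that more, smaller splits (and therefore more mappers) are produced.
        FileInputFormat.setMaxInputSplitSize(job, 32L * 1024 * 1024);
        FileInputFormat.setMinInputSplitSize(job, 1L);
    }
}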

14) What does a MapReduce partitioner do?


A MapReduce partitioner makes sure that all the values of a single key go to the same reducer, which allows an even distribution of the map output over the reducers. It redirects the mapper output to the reducers by determining which reducer is responsible for a particular key.
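
A minimal sketch of a custom partitioner, assuming Text keys and IntWritable values as in the earlier word-count example; it mirrors what the default HashPartitioner does.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Route all records for the same word to the same reducer.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask off the sign bit before taking the modulus so the result is non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would then be registered on the job with job.setPartitionerClass(WordPartitioner.class).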

15) How is Hadoop different from other data processing tools?


In Hadoop, based upon your requirements, you can increase or decrease the number of mappers
without bothering about the volume of data to be processed. This is the beauty of parallel processing in
contrast to the other data processing tools available.

16) Can we rename the output file?


Yes, we can rename the output file by implementing a multiple output format class (for example, MultipleOutputs).

17) Why we cannot do aggregation (addition) in a mapper? Why we require reducer for that?
We cannot do aggregation (addition) in a mapper because sorting is not done in the mapper; sorting happens only on the reducer side. Moreover, a mapper is initialized per input split, so while aggregating we would lose the values seen by previous mapper instances: each new mapper works only on its own split and has no track of the values from the rows handled by other mappers.

18) What is Streaming?


Streaming is a feature of the Hadoop framework that allows us to write MapReduce programs in any programming language that can accept standard input and produce standard output; it could be Perl, Python or Ruby and need not be Java. However, deeper customization of MapReduce can only be done using Java and not any other programming language.

19) What is a Combiner?


A combiner is a mini reducer that performs a local reduce task. It receives the input from the mapper on a particular node and sends its output to the reducer. Combiners help to enhance the efficiency of MapReduce by reducing the amount of data that has to be sent to the reducers.
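
A combiner is registered on the job just like a mapper or reducer. The sketch below assumes the hypothetical WordCountReducer from the earlier sketch can double as a combiner, which is valid only because summing counts is commutative and associative (see also question 35).

import org.apache.hadoop.mapreduce.Job;

public class CombinerExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Reuse the reducer as a combiner; the local aggregation shrinks the
        // intermediate data before it is shuffled to the reducers.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
    }
}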

20) What is the difference between an HDFS Block and Input Split?
HDFS Block is the physical division of the data and Input Split is the logical division of the data.

21) What happens in a TextInputFormat?


In TextInputFormat, each line in the text file is a record. The key is the byte offset of the line and the value is the content of the line.
For instance, key: LongWritable, value: Text.
22) What do you know about KeyValueTextInputFormat?
In KeyValueTextInputFormat, each line in the text file is a record. The first separator character divides each line: everything before the separator is the key and everything after the separator is the value.
For instance, key: Text, value: Text.
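
A sketch of selecting KeyValueTextInputFormat and overriding the default tab separator with a comma; the property name shown is the one used by recent Hadoop 2.x/3.x releases and should be checked against the version in use.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KeyValueInputExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use a comma instead of the default tab as the key/value separator.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
    }
}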

23) What do you know about SequenceFileInputFormat?


SequenceFileInputFormat is an input format for reading sequence files. Key and value types are user defined. It is a specific compressed binary file format which is optimized for passing data from the output of one MapReduce job to the input of another MapReduce job.

24) What do you know about NLineInputFormat?


NLineInputFormat treats 'N' lines of input as one split, so each mapper receives exactly N lines.
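
A minimal sketch of using NLineInputFormat so that each mapper receives exactly 10 lines:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setInputFormatClass(NLineInputFormat.class);
        // Each split (and therefore each mapper) receives exactly 10 input lines.
        NLineInputFormat.setNumLinesPerSplit(job, 10);
    }
}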

25) What is a JobTracker in Hadoop? How many instances of JobTracker run on a Hadoop
Cluster?
JobTracker is the daemon service for submitting and tracking MapReduce jobs in Hadoop. Only one JobTracker process runs on any Hadoop cluster. The JobTracker runs in its own JVM process and, in a typical production cluster, on a separate machine. Each slave node is configured with the JobTracker node's location. The JobTracker is a single point of failure for the Hadoop MapReduce service: if it goes down, all running jobs are halted. The JobTracker performs the following actions (from the Hadoop wiki):
Client applications submit jobs to the JobTracker.
The JobTracker talks to the NameNode to determine the location of the data.
The JobTracker locates TaskTracker nodes with available slots at or near the data.
The JobTracker submits the work to the chosen TaskTracker nodes.
The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are deemed to have failed and the work is scheduled on a different TaskTracker.
A TaskTracker notifies the JobTracker when a task fails. The JobTracker then decides what to do: it may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may even blacklist the TaskTracker as unreliable.
When the work is completed, the JobTracker updates its status.
Client applications can poll the JobTracker for information.

26) How JobTracker schedules a task?


The TaskTrackers send heartbeat messages to the JobTracker periodically to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated. When the JobTracker tries to find somewhere to schedule a task within the MapReduce operations, it first looks for an empty slot on the same server that hosts the DataNode containing the data, and if none is found, it looks for an empty slot on a machine in the same rack.

27) What is a Task Tracker in Hadoop? How many instances of TaskTracker run on a Hadoop
Cluster?
A TaskTracker is a slave-node daemon in the cluster that accepts tasks (map, reduce and shuffle operations) from a JobTracker. Only one TaskTracker process runs on any Hadoop slave node, and it runs in its own JVM process. Every TaskTracker is configured with a set of slots, which indicate the number of tasks it can accept. The TaskTracker starts a separate JVM process to do the actual work (called a task instance); this ensures that a process failure does not take down the TaskTracker. The TaskTracker monitors these task instances, capturing the output and exit codes. When a task instance finishes, successfully or not, the TaskTracker notifies the JobTracker. The TaskTrackers also send heartbeat messages to the JobTracker periodically to reassure the JobTracker that they are still alive. These messages also inform the JobTracker of the number of available slots, so the JobTracker can stay up to date with where in the cluster work can be delegated.

28) What is a Task instance in Hadoop? Where does it run?


Task instances are the actual MapReduce tasks that run on each slave node. The TaskTracker starts a separate JVM process for the actual work (the task instance); this ensures that a process failure does not take down the TaskTracker. Each task instance runs in its own JVM process, and there can be multiple task instance processes running on a slave node, based on the number of slots configured on the TaskTracker. By default, a new JVM process is spawned for each task instance.

29) How many Daemon processes run on a Hadoop system?


Hadoop comprises five separate daemons, each running in its own JVM. The following three daemons run on master nodes: NameNode, which stores and maintains the metadata for HDFS; Secondary NameNode, which performs housekeeping functions for the NameNode; and JobTracker, which manages MapReduce jobs and distributes individual tasks to the machines running the TaskTracker. The following two daemons run on each slave node: DataNode, which stores the actual HDFS data blocks, and TaskTracker, which is responsible for instantiating and monitoring individual map and reduce tasks.

30) What is configuration of a typical slave node on Hadoop cluster? How many JVMs run on a
slave node?
A single instance of the TaskTracker runs on each slave node, as a separate JVM process. A single instance of the DataNode daemon also runs on each slave node, again as a separate JVM process. One or more task instances run on each slave node, each as a separate JVM process; the number of task instances can be controlled by configuration. Typically, a high-end machine is configured to run more task instances.

31) What is the difference between HDFS and NAS ?


The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems, but the differences are significant. The differences between HDFS and NAS are as follows:
In HDFS, data blocks are distributed across the local drives of all machines in the cluster, whereas in NAS data is stored on dedicated hardware.
HDFS is designed to work with the MapReduce system, since computation is moved to the data. NAS is not suitable for MapReduce since the data is stored separately from the computation.
HDFS runs on a cluster of machines and provides redundancy using a replication protocol, whereas NAS is provided by a single machine and therefore does not provide data redundancy.

32) How NameNode Handles data node failures?


The NameNode periodically receives a heartbeat and a block report from each of the DataNodes in the cluster. Receipt of a heartbeat implies that the DataNode is functioning properly, and a block report contains a list of all the blocks on that DataNode. When the NameNode notices that it has not received a heartbeat from a DataNode after a certain amount of time, that DataNode is marked as dead. Since its blocks are now under-replicated, the system begins replicating the blocks that were stored on the dead DataNode. The NameNode orchestrates the replication of data blocks from one DataNode to another; the replication data transfer happens directly between DataNodes and the data never passes through the NameNode.
Does the MapReduce programming model provide a way for reducers to communicate with each other? In a MapReduce job, can a reducer communicate with another reducer?
No, the MapReduce programming model does not allow reducers to communicate with each other. Reducers run in isolation.

33) Can I set the number of reducers to zero?


Yes, setting the number of reducers to zero is a valid configuration in Hadoop. When you set the number of reducers to zero, no reducers are executed and the output of each mapper is stored in a separate file on HDFS. (This is different from the case where the number of reducers is greater than zero, in which the mappers' output, the intermediate data, is written to the local file system, not HDFS, of each mapper's slave node.)
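
A map-only job is configured simply by setting the number of reduce tasks to zero, as in this minimal sketch:

import org.apache.hadoop.mapreduce.Job;

public class MapOnlyJobExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // With zero reducers the job becomes map-only: each mapper's output is
        // written directly to HDFS instead of being shuffled to reducers.
        job.setNumReduceTasks(0);
    }
}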

34) Where is the mapper output (intermediate key-value data) stored?


The mapper output (intermediate data) is stored on the local file system (not HDFS) of each individual mapper node. This is typically a temporary directory location that can be set up in the configuration by the Hadoop administrator. The intermediate data is cleaned up after the Hadoop job completes.

35) What are combiners? When should I use a combiner in my MapReduce Job?
Combiners are used to increase the efficiency of a MapReduce program. They aggregate the intermediate map output locally on individual mapper nodes and can greatly reduce the amount of data that needs to be transferred to the reducers. You can use your reducer code as a combiner if the operation performed is commutative and associative. However, the execution of a combiner is not guaranteed: Hadoop may or may not execute it, and if required it may execute it more than once. Therefore your MapReduce jobs should never depend on the combiner being executed.

36) What is Writable & WritableComparable interface?


org.apache.hadoop.io.Writable is a Java interface. Any key or value type in the Hadoop Map-Reduce
framework implements this interface. Implementations typically implement a static read(DataInput)
method which constructs a new instance, calls readFields(DataInput) and returns the instance.
org.apache.hadoop.io.WritableComparable is a Java interface. Any type which is to be used as a key in
the Hadoop Map-Reduce framework should implement this interface. WritableComparable objects can
be compared to each other using Comparators.

37) What is the Hadoop MapReduce API contract for a key and value Class?
The Key must implement the org.apache.hadoop.io.WritableComparable interface.
The value must implement the org.apache.hadoop.io.Writable interface.
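
A minimal sketch of a custom key type implementing WritableComparable; the class YearTemperaturePair and its fields are illustrative and not part of the Hadoop API.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A (year, temperature) pair that can be serialized by the framework and
// compared when keys are sorted.
public class YearTemperaturePair implements WritableComparable<YearTemperaturePair> {
    private int year;
    private int temperature;

    public YearTemperaturePair() { }                 // no-arg constructor required by the framework

    public YearTemperaturePair(int year, int temperature) {
        this.year = year;
        this.temperature = temperature;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        temperature = in.readInt();
    }

    @Override
    public int compareTo(YearTemperaturePair other) {
        int cmp = Integer.compare(year, other.year);
        return cmp != 0 ? cmp : Integer.compare(temperature, other.temperature);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof YearTemperaturePair)) return false;
        YearTemperaturePair p = (YearTemperaturePair) o;
        return year == p.year && temperature == p.temperature;
    }

    @Override
    public int hashCode() {
        return 31 * year + temperature;
    }
}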

38) What is a IdentityMapper and IdentityReducer in MapReduce ?


org.apache.hadoop.mapred.lib.IdentityMapper implements the identity function, mapping inputs directly to outputs. If the MapReduce programmer does not set the mapper class using JobConf.setMapperClass, then IdentityMapper.class is used as the default. org.apache.hadoop.mapred.lib.IdentityReducer performs no reduction, writing all input values directly to the output. If the MapReduce programmer does not set the reducer class using JobConf.setReducerClass, then IdentityReducer.class is used as the default.

39) What is the meaning of speculative execution in Hadoop? Why is it important?


Speculative execution is a way of coping with variations in individual machine performance. In large clusters with hundreds or thousands of machines, some machines may not perform as fast as the others, and a whole job can be delayed by a single slow machine. To avoid this, speculative execution in Hadoop can run multiple copies of the same map or reduce task on different slave nodes; the results from the first copy to finish are used.

40) When are the reducers started in a MapReduce job?


In a MapReduce job, reducers do not start executing the reduce method until all map tasks have completed. Reducers start copying intermediate key-value pairs from the mappers as soon as they are available, but the programmer-defined reduce method is called only after all the mappers have finished.
If reducers do not start before all mappers finish, then why does the progress on a MapReduce job show something like Map(50%) Reduce(10%)? Why is reducer progress displayed when the mappers have not finished yet?
Reducers start copying intermediate key-value pairs from the mappers as soon as they are available. The progress calculation also takes into account the data transfer done by the reduce process, so reduce progress starts showing up as soon as any intermediate key-value pair from a mapper is available to be transferred to a reducer. Even though the reduce progress is updated, the programmer-defined reduce method is called only after all the mappers have finished.

41) What is HDFS ? How it is different from traditional file systems?


HDFS, the Hadoop Distributed File System, is responsible for storing huge data on the cluster.
This is a distributed file system designed to run on commodity hardware. It has many similarities with
existing distributed file systems. However, the differences from other distributed file systems are
significant.
HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware.
HDFS provides high throughput access to application data and is suitable for applications that
have large data sets.
HDFS is designed to support very large files. Applications that are compatible with HDFS are
those that deal with large data sets. These applications write their data only once but they read it one or
more times and require these reads to be satisfied at streaming speeds. HDFS supports write-once-
read-many semantics on files.

42) What is HDFS Block size? How is it different from traditional file system block size?
In HDFS, data is split into blocks and distributed across multiple nodes in the cluster. Each block is typically 64 MB or 128 MB in size, and each block is replicated multiple times; the default is to replicate each block three times, with the replicas stored on different nodes. HDFS uses the local file system to store each HDFS block as a separate file. HDFS block sizes cannot be compared with traditional file system block sizes: they are much larger (megabytes rather than kilobytes).
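
Block size (and replication) can be set cluster-wide through configuration or per file at creation time. A sketch using the Java FileSystem API; the path /tmp/example.dat and the chosen sizes are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("dfs.blocksize", "134217728");        // 128 MB block size for new files
        FileSystem fs = FileSystem.get(conf);

        // Block size and replication can also be chosen per file at creation time.
        FSDataOutputStream out = fs.create(new Path("/tmp/example.dat"),
                true,                                  // overwrite if it exists
                4096,                                  // buffer size
                (short) 3,                             // replication factor
                256L * 1024 * 1024);                   // 256 MB block size for this file
        out.writeUTF("hello hdfs");
        out.close();
    }
}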

43) What is a NameNode? How many instances of NameNode run on a Hadoop Cluster?
The NameNode is the centerpiece of an HDFS file system. It keeps the directory tree of all files in the file system and tracks where across the cluster the file data is kept; it does not store the data of these files itself. Only one NameNode process runs on any Hadoop cluster. The NameNode runs in its own JVM process and, in a typical production cluster, on a separate machine. The NameNode is a single point of failure for the HDFS cluster: when the NameNode goes down, the file system goes offline. Client applications talk to the NameNode whenever they wish to locate a file or when they want to add/copy/move/delete a file. The NameNode responds to successful requests by returning a list of relevant DataNode servers where the data lives.

44) What is a DataNode? How many instances of DataNode run on a Hadoop Cluster?
A DataNode stores data in the Hadoop Distributed File System (HDFS). Only one DataNode process runs on any Hadoop slave node, and it runs in its own JVM process. On startup, a DataNode connects to the NameNode. DataNode instances can talk to each other, mostly while replicating data.

45) How the Client communicates with HDFS?


Client communication with HDFS happens using the Hadoop HDFS API. Client applications talk to the NameNode whenever they wish to locate a file or when they want to add/copy/move/delete a file on HDFS. The NameNode responds to successful requests by returning a list of relevant DataNode servers where the data lives. Once the NameNode has provided the location of the data, client applications can talk directly to a DataNode.
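
A minimal client-side sketch of this interaction using the Java FileSystem API: open() asks the NameNode for block locations, and the bytes are then read directly from the DataNodes. The file path is taken from the command line.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();       // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);

        // open() contacts the NameNode for block locations; the actual bytes
        // are then streamed directly from the DataNodes.
        Path path = new Path(args[0]);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}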

46) How the HDFS Blocks are replicated?


HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file: an application can specify the number of replicas of a file, and the replication factor can be specified at file creation time and changed later. Files in HDFS are write-once and have strictly one writer at any time. The NameNode makes all decisions regarding the replication of blocks. HDFS uses a rack-aware replica placement policy: in the default configuration there are three copies of each data block, with two copies stored on DataNodes in one rack and the third copy on a DataNode in a different rack.
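
Changing the replication factor of an existing file later, as mentioned above, can be done through the FileSystem API; a minimal sketch (the target factor of 2 is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetReplicationExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Lower the replication factor of an existing file from the default 3 to 2;
        // the NameNode schedules removal of the surplus replicas.
        fs.setReplication(new Path(args[0]), (short) 2);
    }
}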
