UNIT 4 Notes by ARUN JHAPATE


UNIT-4

Creating the components of Hadoop Map Reduce jobs


Hadoop components and Daemons

Every framework needs two important components:


 Storage: the place where code, data, executables, etc. are stored.
 Compute: the logic by which code is executed and data is acted upon.

Two main components of Hadoop framework are also on the same lines:

1. HDFS: the storage layer, in which all the data is stored. It is the distributed file system on
which Hadoop runs its various MapReduce jobs.
2. MapReduce: the compute layer, the programming model on which Hadoop's processing is based.
MapReduce is the fundamental algorithm behind the success of Hadoop, and it provides very fast processing.

There are two layers in Hadoop, the HDFS layer and the MapReduce layer, and five daemons that run on
Hadoop across these two layers. Daemons are processes that run in the background.

1) Namenode – runs on the master node for HDFS.

2) Datanode – runs on the slave nodes for HDFS.

3) Resource Manager – runs on the YARN master node for MapReduce.

4) Node Manager – runs on the YARN slave nodes for MapReduce.

5) Secondary Namenode – a backup/helper for the Namenode; it usually runs on a separate system (other
than the master and slave nodes, though it can also be configured on a slave node).

These five daemons must run for Hadoop to be functional.


HDFS provides the storage layer and MapReduce provides the computation layer in Hadoop. There is one
Namenode and several Datanodes on the storage layer (HDFS). Similarly, there is one Resource Manager and
several Node Managers on the computation layer (MapReduce).
The Namenode (HDFS) and Resource Manager (MapReduce) run on the master, while the Datanodes (HDFS) and
Node Managers (MapReduce) run on the slaves.

Job Tracker
o It is a service within the Hadoop system.
o It acts like a scheduler.
o Client applications are sent to the JobTracker.
o It talks to the Namenode and locates the TaskTracker near the data (remember the data has
already been populated).

CS- 503 (A) Data Analytics Notes By –ARUN KUMAR JHAPATE, CSE, SIRT, BHOPAL, MP
o The Job Tracker moves the work to the chosen Task Tracker node.
o The Task Tracker monitors the execution of the task and updates the Job Tracker through heartbeats. Any
failure of a task is detected through a missing heartbeat.
o Intermediate merging on the nodes is also taken care of by the Job Tracker.

TaskTracker
◗ It accepts tasks (Map, Reduce, Shuffle, etc.) from the Job Tracker.
◗ Each Task Tracker has a number of slots for the tasks; these are execution slots available on
the machine, or on machines in the same rack.
◗ It spawns a separate JVM for the execution of the tasks.
◗ It indicates the number of available slots through the heartbeat message to the JobTracker.

The Execution Framework


◗ A MapReduce program, referred to as a job, consists of code for mappers, reducers, and others,
packaged together with configuration parameters (such as I/O locations).
◗ The developer submits the job to the submission node of a cluster (in Hadoop, this is called the
job tracker).
◗ The execution framework (sometimes called the "runtime") takes care of everything else: it
transparently handles all other aspects of distributed code execution, on clusters ranging from a
single node to a few thousand nodes.

Responsibilities of the Execution Framework


◗ Scheduling
◦ Each MapReduce job is divided into smaller units called tasks.
◦ Essentially, the key space is shared among the mappers.
◦ A queue is maintained in case the number of tasks exceeds the number of available mappers, reducers, etc.
◦ Coordination among multiple jobs and users.
◗ Data/code co-location
◗ Synchronization
◗ Error and fault handling
◗ Partitioners, Combiners

JobTracker

The JobTracker is the service within Hadoop that farms out MapReduce tasks to specific nodes in the
cluster, ideally the nodes that have the data, or at least are in the same rack.

1. Client applications submit jobs to the Job tracker.


2. The JobTracker talks to the NameNode to determine the location of the data
3. The JobTracker locates TaskTracker nodes with available slots at or near the data
4. The JobTracker submits the work to the chosen TaskTracker nodes.
5. The TaskTracker nodes are monitored. If they do not submit heartbeat signals often enough, they are
deemed to have failed and the work is scheduled on a different TaskTracker.
6. A TaskTracker will notify the JobTracker when a task fails. The JobTracker decides what to do then: it
may resubmit the job elsewhere, it may mark that specific record as something to avoid, and it may
even blacklist the TaskTracker as unreliable.
7. When the work is completed, the JobTracker updates its status.
8. Client applications can poll the JobTracker for information.
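The heartbeat bookkeeping in steps 5 and 6 can be sketched in a few lines of Python. This is a simplified single-process model, not Hadoop's actual implementation; the class name and the timeout value are ours:

```python
# Simplified model of JobTracker-style failure detection: TaskTrackers
# report heartbeats, and any tracker that stays silent for longer than
# the timeout is deemed failed, so its work can be rescheduled.

class HeartbeatMonitor:
    def __init__(self, timeout=30):
        self.timeout = timeout      # seconds of silence before failure
        self.last_seen = {}         # tracker id -> time of last heartbeat

    def heartbeat(self, tracker, now):
        self.last_seen[tracker] = now

    def failed_trackers(self, now):
        return [t for t, seen in self.last_seen.items()
                if now - seen > self.timeout]

monitor = HeartbeatMonitor(timeout=30)
monitor.heartbeat("tracker-1", now=0)
monitor.heartbeat("tracker-2", now=0)
monitor.heartbeat("tracker-1", now=25)   # tracker-2 stays silent
print(monitor.failed_trackers(now=40))   # ['tracker-2']
```

A real JobTracker would also reschedule the failed tracker's tasks elsewhere; here we only show the detection side.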

The JobTracker is a single point of failure for the Hadoop MapReduce service. If it goes down, all running jobs
are halted.

JobTracker and TaskTracker
JobTracker and TaskTracker are two essential processes involved in MapReduce execution in MRv1 (Hadoop
version 1). Both processes are deprecated in MRv2 (Hadoop version 2) and replaced by the
Resource Manager, Application Master, and Node Manager daemons.

Job Tracker –
1. JobTracker process runs on a separate node and not usually on a DataNode.
2. JobTracker is an essential Daemon for MapReduce execution in MRv1. It is replaced by
Resource Manager/Application Master in MRv2.
3. JobTracker receives the requests for MapReduce execution from the client.
4. JobTracker talks to the NameNode to determine the location of the data.
5. JobTracker finds the best TaskTracker nodes to execute tasks based on the data locality
(proximity of the data) and the available slots to execute a task on a given node.
6. JobTracker monitors the individual TaskTrackers and submits the overall status of the
job back to the client.
7. JobTracker process is critical to the Hadoop cluster in terms of MapReduce execution.
8. When the JobTracker is down, HDFS will still be functional, but MapReduce execution cannot
be started and the existing MapReduce jobs will be halted.
Task Tracker –
1. TaskTracker runs on DataNodes; typically, one runs on every DataNode.
2. TaskTracker is replaced by Node Manager in MRv2.
3. Mapper and Reducer tasks are executed on DataNodes administered by TaskTrackers.
4. TaskTrackers will be assigned Mapper and Reducer tasks to execute by JobTracker.
5. TaskTracker will be in constant communication with the JobTracker signalling the progress of the
task in execution.
6. TaskTracker failure is not considered fatal. When a TaskTracker becomes unresponsive,
JobTracker will assign the task executed by the TaskTracker to another node.

Google File System
Many datasets are too large to fit on a single machine. Unstructured data may not be easy
to insert into a database. Distributed file systems store data across a large number of
servers. The Google File System (GFS) is a distributed file system used by Google in the
early 2000s. It is designed to run on a large number of cheap servers.

The purpose behind GFS was the ability to store and access large files, and by large I
mean files that can’t be stored on a single hard drive. The idea is to divide these files into
manageable chunks of 64 MB and store these chunks on multiple nodes, having a
mapping between these chunks also stored inside the file system.
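The chunk arithmetic described here can be sketched in Python. This is an illustrative model only; in real GFS the master stores the mapping from (filename, chunk number) to chunk handles and replica locations:

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, the GFS chunk size

def chunk_index(file_size, chunk_size=CHUNK_SIZE):
    """Return (chunk_number, byte_offset) pairs for a file of the given size.

    The master would map each (filename, chunk_number) entry to a chunk
    handle and to the chunk servers that hold its replicas.
    """
    count = (file_size + chunk_size - 1) // chunk_size  # ceiling division
    return [(i, i * chunk_size) for i in range(count)]

# A 200 MB file needs 4 chunks: three full 64 MB chunks plus a partial one.
print(len(chunk_index(200 * 1024 * 1024)))  # 4
```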

GFS assumes that it runs on many inexpensive commodity components that can often fail;
therefore, it must constantly perform failure monitoring and recovery. It can store many
large files simultaneously and allows for two kinds of reads to them: small random reads
and large streaming reads. Instead of rewriting files, GFS is optimized towards appending
data to existing files in the system.

The GFS master node stores the index of files, while GFS chunk servers store the actual
chunks in the filesystems on multiple Linux nodes. The chunks that are stored in the GFS
are replicated, so the system can tolerate chunk server failures. Data corruption is also
detected using checksums, and GFS tries to compensate for these events as soon as
possible.
Here’s a brief history of the Google File System:


 2003: Google File System paper was released.

 2004: MapReduce framework was released. It is a programming model and an
associated implementation for processing and generating big data sets with a
parallel, distributed algorithm on a cluster.

 2006: Hadoop, which provides a software framework for distributed storage and
processing of big data using the MapReduce programming model, was created.
All the modules in Hadoop are designed with a fundamental assumption that
hardware failures are common occurrences and should be automatically handled
by the framework.

 2007: HBase, an open-source, non-relational, distributed database modeled after
Google’s Bigtable and written in Java, was born. It is developed as part of the
Apache Hadoop project and runs on top of HDFS.

 2008: Hadoop wins the TeraSort contest. TeraSort is a popular benchmark that
measures the amount of time to sort one terabyte of randomly distributed data on
a given computer system.

 2009: Spark, an open-source distributed general-purpose cluster-computing
framework, was built. It provides an interface for programming entire clusters
with implicit data parallelism and fault tolerance.

 2010: Hive, a data warehouse software project built on top of Apache Hadoop for
providing data query and analysis, was created. It gives a SQL-like interface to
query data stored in various databases and file systems that integrate with
Hadoop.

Hadoop Distributed File System


The Hadoop Distributed File System (HDFS) is a distributed file system designed to run
on commodity hardware. It has many similarities with existing distributed file systems.
However, the differences from other distributed file systems are significant. HDFS is
highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides
high throughput access to application data and is suitable for applications that have large
data sets. In fact, HDFS deployments of thousands of nodes exist.


In HDFS, files are divided into blocks, and file access follows multi-reader, single-writer
semantics. To meet the fault-tolerance requirement, multiple replicas of a block are stored
on different DataNodes. The number of replicas is called the replication factor. When a
new file block is created, or an existing file is opened for append, the HDFS write
operation creates a pipeline of DataNodes to receive and store the replicas.
(The replication factor generally determines the number of DataNodes in the pipeline.)
Subsequent writes to that block go through the pipeline. For reading operations the client
chooses one of the DataNodes holding copies of the block and requests a data transfer
from it.
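A toy model of the block splitting and write pipeline might look like this in Python. The node names and the naive placement policy are ours; real HDFS uses a rack-aware placement policy:

```python
def split_into_blocks(data: bytes, block_size: int):
    """Divide a file's bytes into fixed-size blocks, as HDFS does."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

def build_pipeline(datanodes, replication_factor=3):
    """Choose one DataNode per replica to form the write pipeline.
    (This toy version just takes the first nodes in the list.)"""
    if replication_factor > len(datanodes):
        raise ValueError("not enough DataNodes for the replication factor")
    return datanodes[:replication_factor]

def write_block(block, pipeline, storage):
    """The client sends the block to the first node; each node stores its
    replica and forwards the block to the next node downstream."""
    for node in pipeline:
        storage.setdefault(node, []).append(block)

storage = {}
nodes = ["dn1", "dn2", "dn3", "dn4"]
blocks = split_into_blocks(b"0123456789", block_size=4)   # 3 blocks
for block in blocks:
    write_block(block, build_pipeline(nodes, replication_factor=3), storage)

print(len(blocks))       # 3
print(sorted(storage))   # ['dn1', 'dn2', 'dn3'] - each holds every replica
```

Note how the replication factor alone determines the length of the pipeline, matching the parenthetical above.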

MapReduce
MapReduce is a programming model that consists of
writing map and reduce functions. Map accepts key/value pairs and produces a sequence
of key/value pairs. Then, the data is shuffled to group keys together. After that, reduce
accepts the values with the same key and produces a new key/value pair.


During the execution, the Map tasks are assigned to machines based on input data. Then
those Map tasks produce their output. Next, the mapper output is shuffled and sorted.
Then, the Reduce tasks are scheduled and run. The Reduce output is finally stored to disk.
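These phases can be simulated in a few lines of plain Python, using word count as the job. This is a conceptual sketch of the model, not the Hadoop API; the function names are ours:

```python
from collections import defaultdict

def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in the input."""
    for line in lines:
        for word in line.strip().split():
            yield (word, 1)

def shuffle_phase(pairs):
    """Shuffle/sort: group all emitted values by key before reducing."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Reduce: sum the counts for each word."""
    return {word: sum(counts) for word, counts in groups.items()}

lines = ["big data big compute", "big data"]
result = reduce_phase(shuffle_phase(map_phase(lines)))
print(result)  # {'big': 3, 'data': 2, 'compute': 1}
```

The real Hadoop streaming mapper and reducer shown in the next section implement the same idea, but communicate through STDIN/STDOUT instead of in-memory data structures.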

MapReduce in Python
Let’s walk through some code. The following program is from Michael Noll’s tutorial on
writing a Hadoop MapReduce program in Python.

The code below is the Map function. It will read data from STDIN, split it into words and
output a list of lines mapping words to their (intermediate) counts to STDOUT. The Map
script will not compute an (intermediate) sum of a word’s occurrences, though. Instead, it
will immediately output a <word> 1 tuple, even though a specific word might occur
multiple times in the input. In our case, we let the subsequent Reduce step do the final sum
count.
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))


The code below is the Reduce function. It will read the results from the map step from
STDIN and sum the occurrences of each word to a final count, and then output its results
to STDOUT.
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts the map
    # output by key (here: by word) before it reaches the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word, if needed
if current_word == word:
    print('%s\t%s' % (current_word, current_count))

MapReduce Example
Here is a real-world use case of MapReduce:

Facebook has a list of friends (note that friends are a bi-directional thing on Facebook. If
I’m your friend, you’re mine). They also have lots of disk space and they serve hundreds
of millions of requests every day. They’ve decided to pre-compute calculations when they
can to reduce the processing time of requests. One common processing request is the
“You and Joe have 230 friends in common” feature. When you visit someone’s profile,
you see a list of friends that you have in common. This list doesn’t change frequently, so
it’d be wasteful to recalculate it every time you visited the profile (sure, you could use a
decent caching strategy, but then I wouldn’t be able to continue writing about MapReduce
for this problem). We’re going to use MapReduce so that we can calculate everyone’s
common friends once a day and store those results. Later on, it’s just a quick lookup.
We’ve got lots of disk; it’s cheap.

Assume the friends are stored as Person->[List of Friends], our friends list is then:
A -> B C D
B -> A C D E
C -> A B D E
D -> A B C E
E -> B C D

Each line will be an argument to a mapper. For every friend in the list of friends, the
mapper will output a key-value pair. The key will be a friend along with the person. The
value will be the list of friends. The key will be sorted so that the friends are in order,
causing all pairs of friends to go to the same reducer. This is hard to explain with text, so
let’s just do it and see if you can see the pattern. After all the mappers are done running,
you’ll have a list like this:
For map(A -> B C D):
(A B) -> B C D
(A C) -> B C D
(A D) -> B C D

For map(B -> A C D E): (Note that A comes before B in the key)
(A B) -> A C D E
(B C) -> A C D E
(B D) -> A C D E
(B E) -> A C D E

For map(C -> A B D E):
(A C) -> A B D E
(B C) -> A B D E
(C D) -> A B D E
(C E) -> A B D E

For map(D -> A B C E):
(A D) -> A B C E
(B D) -> A B C E
(C D) -> A B C E
(D E) -> A B C E

And finally for map(E -> B C D):
(B E) -> B C D
(C E) -> B C D
(D E) -> B C D

Before we send these key-value pairs to the reducers, we group them by their

keys and get:
(A B) -> (A C D E) (B C D)
(A C) -> (A B D E) (B C D)
(A D) -> (A B C E) (B C D)
(B C) -> (A B D E) (A C D E)
(B D) -> (A B C E) (A C D E)
(B E) -> (A C D E) (B C D)
(C D) -> (A B C E) (A B D E)
(C E) -> (A B D E) (B C D)
(D E) -> (A B C E) (B C D)


Each line will be passed as an argument to a reducer. The reduce function will simply
intersect the lists of values and output the same key with the result of the intersection. For
example, reduce((A B) -> (A C D E) (B C D)) will output (A B) : (C D) and means that
friends A and B have C and D as common friends.

The result after reduction is:


(A B) -> (C D)
(A C) -> (B D)
(A D) -> (B C)
(B C) -> (A D E)
(B D) -> (A C E)
(B E) -> (C D)
(C D) -> (A B E)
(C E) -> (B D)
(D E) -> (B C)

Now when D visits B’s profile, we can quickly look up (B D) and see that they have three
friends in common, (A C E).
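The whole job can be written as a runnable single-process simulation in Python (the function names are ours, not Hadoop's; the shuffle is a simple group-by-key):

```python
from collections import defaultdict

def mapper(person, friends):
    """Emit (sorted friend pair, friend list) for each friend of `person`."""
    for friend in friends:
        key = tuple(sorted((person, friend)))
        yield key, set(friends)

def reducer(key, friend_lists):
    """Intersect the grouped friend lists to find the common friends."""
    return key, sorted(set.intersection(*friend_lists))

graph = {
    "A": ["B", "C", "D"],
    "B": ["A", "C", "D", "E"],
    "C": ["A", "B", "D", "E"],
    "D": ["A", "B", "C", "E"],
    "E": ["B", "C", "D"],
}

# Map, then group by key (the shuffle), then reduce.
grouped = defaultdict(list)
for person, friends in graph.items():
    for key, value in mapper(person, friends):
        grouped[key].append(value)

results = dict(reducer(k, v) for k, v in grouped.items())
print(results[("B", "D")])  # ['A', 'C', 'E'], matching the worked example
```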

MapReduce in MongoDB
We can also use map-reduce in MongoDB via the mapReduce database command.

In a map-reduce operation, MongoDB applies the map phase to each input document
(i.e. the documents in the collection that match the query condition). The map function
emits key-value pairs. For those keys that have multiple values, MongoDB applies
the reduce phase, which collects and condenses the aggregated data. MongoDB then
stores the results in a collection. Optionally, the output of the reduce function may pass
through a finalize function to further condense or process the results of the aggregation.
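Since the real mapReduce command takes JavaScript functions, here is a hypothetical pure-Python sketch of the same semantics over an in-memory list of documents (the collection, field names, and functions are invented for illustration):

```python
from collections import defaultdict

# Hypothetical orders collection; in MongoDB these would be BSON documents.
orders = [
    {"cust_id": "A123", "amount": 500, "status": "valid"},
    {"cust_id": "A123", "amount": 250, "status": "valid"},
    {"cust_id": "B212", "amount": 200, "status": "valid"},
    {"cust_id": "C345", "amount": 300, "status": "cancelled"},
]

def map_fn(doc):
    yield doc["cust_id"], doc["amount"]       # emit(key, value)

def reduce_fn(key, values):
    return sum(values)                        # condense multiple values

def finalize_fn(key, reduced):
    return {"total": reduced}                 # optional post-processing

query = lambda doc: doc["status"] == "valid"  # only matching documents map

groups = defaultdict(list)
for doc in filter(query, orders):
    for key, value in map_fn(doc):
        groups[key].append(value)

# As in MongoDB, reduce only runs for keys with multiple values;
# single values pass straight through to finalize.
out = {k: finalize_fn(k, reduce_fn(k, v) if len(v) > 1 else v[0])
       for k, v in groups.items()}
print(out)  # {'A123': {'total': 750}, 'B212': {'total': 200}}
```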

All map-reduce functions in MongoDB are JavaScript and run within the mongod process.
Map-reduce operations take the documents of a single collection as the input and can
perform any arbitrary sorting and limiting before beginning the map
stage. mapReduce can return the results of a map-reduce operation as a document or may
write the results to collections. The input and the output collections may be sharded.


Apache Spark
As we discussed above, an algorithm sometimes cannot be executed in a single
MapReduce job. To resolve that, MapReduce jobs can be chained together to provide a
solution. This often happens with algorithms that iterate until convergence (such as
k-means, PageRank, etc.). But the big disadvantage is that the Reduce output must be
read from disk again with each new job, and in some cases the input data is read from
disk many times. Thus, there is no easy way to share the work done between iterations.

In Apache Spark, the computation model is much richer than just MapReduce.
Transformations on input data can be written lazily and batched together. Intermediate
results can be cached and reused in future calculations. There is a series of
lazy transformations which are followed by actions that force evaluation of all
transformations. Notably, each step in the Spark model produces a resilient distributed
dataset (RDD). Intermediate results can be cached in memory or on disk, optionally serialized.

For each RDD, we keep a lineage, which is the sequence of operations that created it.
Spark can then recompute any data that is lost, without storing it to disk. We can still
decide to keep a copy in memory or on disk for performance.
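A toy model of lazy transformations with lineage might look like this (our own minimal class, not the actual Spark RDD API):

```python
class ToyRDD:
    """A tiny stand-in for an RDD: it records its lineage and only
    computes when an action forces evaluation."""

    def __init__(self, source, lineage=()):
        self._source = source
        self.lineage = lineage          # the operations that created it

    def _materialize(self):
        return self._source() if callable(self._source) else self._source

    def map(self, fn):                  # transformation: lazy, returns a new RDD
        return ToyRDD(lambda: [fn(x) for x in self._materialize()],
                      self.lineage + (f"map({fn.__name__})",))

    def filter(self, fn):               # transformation: lazy
        return ToyRDD(lambda: [x for x in self._materialize() if fn(x)],
                      self.lineage + (f"filter({fn.__name__})",))

    def collect(self):                  # action: forces evaluation
        return self._materialize()

def double(x): return 2 * x
def is_even(x): return x % 2 == 0

rdd = ToyRDD([1, 2, 3, 4]).map(double).filter(is_even)
print(rdd.lineage)    # ('map(double)', 'filter(is_even)') - nothing computed yet
print(rdd.collect())  # [2, 4, 6, 8] - evaluation happens only now
```

Because each ToyRDD carries its lineage and a recipe for recomputing itself, a lost partition could in principle be rebuilt by replaying the lineage, which is exactly the fault-tolerance idea behind real RDDs.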

Let’s delve into the Spark model through an example. Say we have a program that reads
in a .csv data file and does some data wrangling.

Such a program would perform transformations of the data. Spark transformations
include these function calls: distinct(), filter(fn), intersection(other), join(other), map(fn),
union(other).


It would also perform actions on the data. Spark actions include these
function calls: collect(), count(), first(), take(n), reduce(fn), foreach(fn), saveAsTextFile().

We can represent this Spark model as a task directed acyclic graph (DAG), with the
transformations on one branch and the actions on another.

Let’s say we want to implement k-means clustering in Spark. The process goes like
this:

1. Load the data and “persist” it.

2. Randomly sample for initial clusters.


3. Broadcast the initial clusters.

4. Loop over the data and find the new centroids.

5. Repeat from step 3 until the algorithm converges.

Unlike MapReduce, we can keep the input in memory and load it once. The
broadcasting in step 3 means that we can quickly send the centers to all machines. The
cluster assignments do not need to be written to disk every time.
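The five steps can be sketched as a single-machine loop in plain Python. In real Spark, the points would be a persisted RDD and the centers a broadcast variable; here local variables play those roles:

```python
import random

def kmeans(points, k, iters=20, seed=42):
    random.seed(seed)
    # Steps 1-2: the data stays in memory; randomly sample initial centers.
    centers = random.sample(points, k)
    for _ in range(iters):
        # Step 3: "broadcast" the current centers (here, a local variable).
        # Step 4: loop over the data, assigning each point to its nearest center.
        clusters = {i: [] for i in range(k)}
        for p in points:
            nearest = min(range(k), key=lambda i: abs(p - centers[i]))
            clusters[nearest].append(p)
        # Step 5: recompute the centroids and repeat until convergence.
        new_centers = [sum(c) / len(c) if c else centers[i]
                       for i, c in clusters.items()]
        if new_centers == centers:
            break
        centers = new_centers
    return sorted(centers)

# Two obvious one-dimensional clusters, around 1 and around 100.
print(kmeans([0, 1, 2, 99, 100, 101], k=2))  # [1.0, 100.0]
```

In Spark, the assignment in step 4 would be a map over the points and the averaging a reduce by cluster id, with no per-iteration disk writes.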

Spark applications run as independent sets of processes on a cluster, coordinated by the
SparkContext object in your main program (called the driver program).

Specifically, to run on a cluster, the SparkContext can connect to several types of cluster
managers (either Spark’s own standalone cluster manager, Mesos or YARN), which
allocate resources across applications. Once connected, Spark acquires executors on nodes
in the cluster, which are processes that run computations and store data for your
application. Next, it sends your application code (defined by JAR or Python files passed to
SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.

There are several useful things to note about the Spark architecture:

1. Each application gets its own executor processes, which stay up for the duration
of the whole application and run tasks in multiple threads. This has the benefit of
isolating applications from each other, on both the scheduling side (each driver
schedules its own tasks) and executor side (tasks from different applications run
in different JVMs). However, it also means that data cannot be shared across
different Spark applications (instances of SparkContext) without writing it to an
external storage system.


2. Spark is agnostic to the underlying cluster manager. As long as it can acquire
executor processes, and these communicate with each other, it is relatively easy
to run it even on a cluster manager that also supports other applications (e.g.
Mesos/YARN).

3. The driver program must listen for and accept incoming connections from its
executors throughout its lifetime. As such, the driver program must be network
addressable from the worker nodes.

4. Because the driver schedules tasks on the cluster, it should be run close to the
worker nodes, preferably on the same local area network. If you’d like to send
requests to the cluster remotely, it’s better to open an RPC to the driver and have
it submit operations from nearby than to run a driver far away from the worker
nodes.

The system currently supports several cluster managers:

 Standalone — a simple cluster manager included with Spark that makes it easy to
set up a cluster.

 Apache Mesos — a general cluster manager that can also run Hadoop MapReduce
and service applications.

 Hadoop YARN — the resource manager in Hadoop 2.

 Kubernetes — an open-source system for automating deployment, scaling, and
management of containerized applications.

Overall, Apache Spark is much more flexible since we can also run distributed SQL
queries. It also contains many libraries for machine learning, stream processing, etc.
Furthermore, Spark can connect to a number of different data sources.


Other Apache Platforms


Apache Hive is data warehouse software built on top of Hadoop. It converts SQL
queries to MapReduce, Spark, etc. The data here is stored in files on HDFS.

Apache Flink is another system designed for distributed analytics like Apache Spark. It
executes everything as a stream. Iterative computations can be written natively with cycles
in the data flow. It has a very similar architecture to Spark, including (1) a client that
optimizes and constructs data flow graph, (2) a job manager that receives jobs, and (3) a
task manager that executes jobs.


Apache Pig is another platform for analyzing large datasets. It provides the Pig Latin
language, which is easy to write but runs as MapReduce. Pig Latin code is shorter and
faster to develop than the equivalent Java code.

Typical Pig Latin programs include jobs such as counting the words in a corpus or
aggregating the page visits per user.

Apache Pig is installed locally and can send jobs to any Hadoop cluster. It is slower than
Spark, but doesn’t require any software on the Hadoop cluster. We can write user-defined
functions for more complex operations (intersection, union, etc.).

Apache HBase is quite similar to Apache Cassandra, a wide column store. It is
essentially a large sorted map that we can update. It uses Apache Zookeeper to ensure
consistent updates to the data.

Next, I want to mention Apache Calcite, a framework that can parse and optimize SQL
queries and process data. It powers query optimization in Flink, Hive, Druid, and others,
and it provides many of the pieces needed to implement a database engine.

More importantly, Calcite can connect to a variety of database systems, including Spark,
Druid, Elasticsearch, Cassandra, MongoDB, and Java JDBC.


Compared with a conventional client-server database architecture, the Calcite
architecture takes away the client, server, and parser, and lets the optimizer do the
heavy work of processing metadata. The Calcite optimizer uses more than 100 rewrite
rules to optimize queries. Queries use relational algebra but can operate on
non-relational data. Calcite aims to find the lowest-cost way to execute a query.

And that’s the end of this short post on distributed data processing!
