
Journal of Advances in Computer Engineering and Technology, 2(4) 2016

Adaptive Dynamic Data Placement Algorithm for Hadoop in Heterogeneous Environments

Avishan Sharafi¹, Ali Rezaee²

Received (2016-07-16)
Accepted (2016-12-11)

Abstract - The Hadoop MapReduce framework is an important distributed processing model for large-scale data-intensive applications. The current Hadoop, and the existing HDFS rack-aware data placement strategy for MapReduce in homogeneous Hadoop clusters, assume that each node in a cluster has the same computing capacity and that the same workload is assigned to each node. Default Hadoop does not consider the load state of each node when distributing input data blocks, which may cause unnecessary overhead and reduce Hadoop performance; in practice, such a data placement policy can noticeably reduce MapReduce performance and may increase energy dissipation in heterogeneous environments. This paper proposes a resource-aware adaptive dynamic data placement algorithm (ADDP). With the ADDP algorithm, we can resolve the unbalanced node workload problem based on node load status. The proposed method can dynamically adapt and balance the data stored on each node based on its load status in a heterogeneous Hadoop cluster. Experimental results show that data transfer overhead decreases in comparison with the DDP and traditional Hadoop algorithms. Moreover, the proposed method can decrease the execution time and improve the system's throughput by increasing resource utilization.

Index Terms - Hadoop, MapReduce, Resource-aware, Data placement, Heterogeneous

I. INTRODUCTION

In recent years, the World Wide Web has been adopted as a very useful platform for developing data-intensive applications, since the communication paradigm of the Web is sufficiently open and powerful. Search engines, webmail, data mining and social network services are currently indispensable data-intensive applications. These applications need data ranging from a few gigabytes to several terabytes or even petabytes.

Google leverages the MapReduce model to process approximately twenty petabytes of data per day in a parallel programming model [1]. Hadoop MapReduce is an attractive model for parallel data processing in high-performance cluster computing environments. Hadoop MapReduce was primarily developed by Yahoo [2]. Hadoop is used by Yahoo servers, where hundreds of terabytes of data are generated on at least 10,000 cores [3]. Facebook uses Hadoop to process more than 15 terabytes of data per day. In addition to Yahoo and Facebook, Amazon and Last.fm employ Hadoop to manage massive amounts of data [1].

The scalability of MapReduce has proven to be high, because in the MapReduce programming model a job is divided into a series of small tasks that run on multiple machines in a large-scale cluster [4]. MapReduce allows a programmer with no specific knowledge of distributed programming to create his/her MapReduce functions running in parallel across multiple nodes in the cluster. MapReduce automatically handles the gathering of results across the multiple nodes and returns a single result or set of results to the server [4].

1- Department of Computer Engineering, Islamic Azad University South Tehran Branch, Tehran, Iran. (Avishan.[email protected])
2- Department of Computer Engineering, Science and Research Branch, Islamic Azad University, Tehran, Iran.

More importantly, the MapReduce platform offers fault tolerance. The MapReduce model can automatically handle failures through its fault-tolerance mechanisms: when a node fails, MapReduce moves the tasks that were running on the failed node to be rerun on another node [5].

In the Hadoop architecture, data locality is one of the important factors affecting the performance of Hadoop applications. However, in a heterogeneous environment, the data required for performing a task is often nonlocal, which affects the performance of the Hadoop platform [4]. The data placement decision of the Hadoop distributed file system (HDFS) is very important for data locality, which is a determining factor for MapReduce performance and a primary criterion for task scheduling in the MapReduce model. The existing HDFS rack-aware data placement strategy and replication scheme work well with the MapReduce framework in homogeneous Hadoop clusters [6], but in practice such a data placement policy can noticeably reduce performance in heterogeneous environments and may increase the overhead of transferring unprocessed data from slow nodes to fast nodes [7].

The rest of this paper is organized as follows. In Section II, the Hadoop system architecture, the MapReduce model, HDFS, and the motivation for this study are presented. Section III presents the ADDP algorithm, its mathematical formulas, variable descriptions and scenarios. Experiments and performance analysis are presented in Section IV. Section V concludes this paper by summarizing its main contributions and commenting on future directions of our work.

II. RELATED WORK AND MOTIVATION

1. Hadoop

Hadoop is a successful and well-known implementation of the MapReduce model, which is open-source and supported by the Apache Software Foundation. Hadoop consists of two main components: the MapReduce programming model and the Hadoop Distributed File System (HDFS) [4], in which MapReduce is responsible for parallel processing and HDFS is responsible for data management. In the Hadoop system, MapReduce and HDFS manage parallel processing of jobs and management of data, respectively. The JobTracker module in MapReduce partitions a job into tasks, and HDFS partitions the input data into blocks and assigns them to every node in the cluster. Hadoop is based on a distributed architecture, meaning that Hadoop MapReduce adopts a master/slave architecture, in which a master node controls a group of slave nodes on which the Map and Reduce functions run in parallel. Slaves are nodes that process the tasks that the master assigns to them. In the MapReduce model, the master is called JobTracker, and each slave is called TaskTracker. In HDFS, the master is called NameNode, and each slave is called DataNode. The master is responsible for distributing data blocks and assigning task slots to every node in the Hadoop cluster. Default Hadoop assumes that every node's computing capacity and storage capacity are the same in the cluster. In such a homogeneous environment, the data placement strategy of Hadoop can boost the efficiency of the MapReduce model, but in a heterogeneous environment such data placement has many problems [1].

2. MapReduce

MapReduce is a parallel programming model used in clusters that have numerous nodes and use their computing resources to manage large amounts of data in parallel. MapReduce was proposed by Google in 2004. In the MapReduce model, the work an application should process is called a "job". Hadoop divides the input of a MapReduce job into pieces handled by "map tasks" and "reduce tasks", in which the map tasks run the map function and the reduce tasks run the reduce function. The map function processes the input tasks and data assigned by the master node and produces intermediate (key, value) pairs. Based on the (key, value) pairs generated by the map function, the reduce function then merges, sorts, and returns the result. The MapReduce model is based on the master/slave concept. It distributes a large amount of input data to many processing nodes to perform parallel processing, which reduces the execution time and improves the performance. The input data are divided into many data blocks of the same size; these blocks are then assigned to nodes that perform the same map function in parallel. After the map function is performed, the generated output is a set of intermediate (key, value) pairs. The nodes that perform the reduce function obtain these intermediate data and finally generate the output data [8]. The MapReduce model was conceived with the principle that "moving computation is much cheaper than moving data" [5].
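To make this flow concrete, the following self-contained Python sketch mimics a WordCount-style job in the style described above. It is a toy model of the programming model, not Hadoop's actual API: map emits intermediate (key, value) pairs, a shuffle step groups them by key, and reduce merges each group.

```python
from collections import defaultdict

def map_fn(_, line):
    # Emit an intermediate (key, value) pair for every word in the line.
    for word in line.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # Merge all intermediate values for one key into the final result.
    return (word, sum(counts))

def run_job(lines):
    # Shuffle phase: group intermediate pairs by key, as the framework would.
    groups = defaultdict(list)
    for i, line in enumerate(lines):
        for key, value in map_fn(i, line):
            groups[key].append(value)
    # Reduce phase: one call per distinct key.
    return [reduce_fn(k, v) for k, v in sorted(groups.items())]

print(run_job(["moving computation is cheap", "moving data is not cheap"]))
```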

3. HDFS

HDFS is based on the Google File System and is used with the MapReduce model. It consists of a NameNode module in the master node and many DataNode modules in the slave nodes. The NameNode is responsible for the management and storage of the entire file system and file information (such as namespaces and metadata). The NameNode partitions the input files written into HDFS into many data blocks of the same size (64 MB by default). HDFS allocates these data blocks to every DataNode. The DataNodes are responsible for storing and processing these data blocks and sending the results to the NameNode. Hadoop is fault-tolerant and makes three replicas of each data block for the files stored on HDFS. HDFS's replica placement strategy puts one replica of the block on one node in the local rack, another on a different node in the same rack, and the third on a node in some other rack. When a node fails, these replicas become very important and are processed in place of the lost data blocks [1].

4. Background and motivation

The Hadoop default data placement strategy assumes that the computing capacity and storage capacity of each node in the cluster are the same, and each node is assigned the same workload. This data placement strategy can boost the efficiency of the MapReduce model in a homogeneous environment, but in a heterogeneous environment it has many problems. In a heterogeneous environment, the difference in node computing capacities may cause load imbalance. The reason is that different computing capacities between nodes lead to different task execution times, so the faster nodes finish processing their local data blocks earlier than the slower nodes do. At this point, the master assigns the unperformed tasks to the idle faster nodes, but these nodes do not own the data needed for processing, so the required data must be transferred from slow nodes to idle faster nodes through the network. Because of the wait for data transmission, the task execution time increases, which causes the entire job execution time to become extended. Moving a large volume of data affects Hadoop performance. To improve the performance of Hadoop in heterogeneous clusters, this paper aims to minimize data movement between slow and fast nodes. This goal can be achieved by a data placement scheme that distributes and stores data across multiple heterogeneous nodes based on their computing capacities. Data movement can be reduced if each node is assigned a workload based on its data processing speed and its system load [4, 7].

Several task scheduling strategies have been proposed for the Hadoop framework in recent years. Reference [9] proposed an Adaptive Task Scheduling Strategy Based on Dynamic Workload Adjustment (ATSDWA). Each TaskTracker collects its own load information and reports it to the JobTracker periodically, so TaskTrackers can adapt to changes of load at runtime, obtaining tasks in accordance with their computing abilities. Reference [4] proposed a dynamic data placement algorithm (DDP) which distributes input data blocks based on each node's computing capacity in a heterogeneous Hadoop cluster. Reference [10] proposed a resource-aware scheduling algorithm that classifies jobs and node workloads into I/O-bound and CPU-bound classes; each workload is assigned to a group of nodes, and the algorithm selects appropriate tasks to run according to the workload of the node. Reference [11] explored an extensional MapReduce task scheduling algorithm for deadline constraints (MTSD) for Hadoop platforms, which allows the user to specify a job's deadline and finish it before the deadline. Reference [6] proposed a novel data placement strategy (SLDP) for heterogeneous Hadoop clusters. That algorithm changes traditional Hadoop data block replication based on data hotness. SLDP adopts a heterogeneity-aware algorithm to first divide the various nodes into several virtual storage tiers, and then places data blocks across the nodes in each storage tier according to the hotness of the data.

III. ADDP

1. Main Idea

The computing capacity of each node in a heterogeneous cluster is different, so the load of each node changes dynamically. Therefore, this paper presents an adaptive dynamic data placement algorithm (ADDP) which uses the type and volume load of jobs to adjust the distribution of input data blocks. The proposed algorithm consists of two main phases. In the first round, the NameNode distributes data blocks based on each node's computing capacity ratio in the Ratio table. In the next rounds, each node's load parameters (average CPU utilization and average memory utilization) are monitored and registered in the node's "History table"; the NameNode then calculates the number of data blocks appropriate for each node's load status by comparing each node's load parameters with the cluster load parameters in the Load-Distribution-Patterns table. This table has a load volume formula for each load state of a node, and these formulas determine the workload that is most compatible with the node's load situation. The workload calculated for each node is stored in a Cluster-History table and is distributed to the nodes in the next rounds.

Fig. 1. How the NameNode deploys data blocks on DataNodes

In the algorithm, there are two tables: the "Ratio table" and the "Load-Distribution-Patterns table". The Ratio table stores the computing capacity ratio of each node for different job types. The Load-Distribution-Patterns table stores the load parameters, defined as the average CPU utilization (AvgCpuUsage) and average memory utilization (AvgMemUsage) of the whole cluster, in different load states, so that each node's load parameters can be compared with the cluster load parameters. In the cluster, we assume three main states: the overloading state, defined as "overload"; the underloading state, defined as "underload"; and the normal loading state, defined as "normalload". There are also some sub-states based on the cluster load situation; these sub-states refine the underload state. Every row in the table belongs to a load state, and there is a load volume formula for each row. A node's load parameters are compared with every row; if they fall in a row of the table, that row's formula calculates the data load volume appropriate for the node's load state, so as to change the node's load state and bring it toward normalload. The load volume formulas express how much workload should be added to the current node's workload so that the node uses its resources more efficiently. The percentage of added workload is given by the λ factor. The node's next volume load average (VLA_{i+1}) is equal to the previous volume load average (VLA_i) plus a percentage of the current load average. This percentage factor differs from one row to another and depends on the node load state. The percentage factors are defined in the Definition-Lambda-Factor table (Table 2).

TABLE 1
LOAD-DISTRIBUTION-PATTERNS

Load volume formula              | Average CPU usage     | Average memory usage     | Load state
VLA_{i+1} = VLA_i + λ1i(VLA_i)   | α1 ≤ CpuUsage ≤ α2    | β1 ≤ MemoryUsage ≤ β2    | Underload
VLA_{i+1} = VLA_i + λ2i(VLA_i)   | α2 ≤ CpuUsage ≤ α3    | β2 ≤ MemoryUsage ≤ β3    | Normalload
VLA_{i+1} = VLA_i − λ3(VLA_i)    | α3 ≤ CpuUsage ≤ α4    | β3 ≤ MemoryUsage ≤ β4    | Overload

TABLE 2
DEFINITION-LAMBDA-FACTOR

Lambda definition | Load state
λ11               | Very Underload
λ12               | Underload
λ13               | Underload near to Normal
λ2                | NormalLoad
λ21               | Optimize-NormalLoad
λ3                | Overload

Every load volume formula in the Load-Distribution-Patterns table calculates a workload that is more compatible with the node's load situation. So in general we have six load levels, which are explained next.

If a node's state is "Very Underload", the lambda factor in its load volume formula is λ11, which is at least 0.5; the workload assigned to the node for the next round is its current workload plus at least 50% of that workload.

If a node state is” Underload”, lambda factor Total2 = user2 + sys2 + nice2 + idle2 + IOwait2 + irq2 + softirq2 (7)
for it in the load volume formula is ; so node’s
workload which will be assigned to current node’s Total Memory + Free Memory + Buffers + Cache
workload for the next round is at least 33% of MemoryUsage = (8)
Total Memory
node current workload plus current workload.
If a node state is” Underload near to
NormalLoad”, lambda factor for it in the load 3. Variable Description
volume formula is ; so node’s workload which
will be assigned to current node’s workload for In the mentioned mathematical formulation,
the next round is at least 20% of node current Tavg(i) denotes the average execution time to
workload plus current workload. complete a batch of tasks in the node(i) and Tt(i)
If a node state is” NormalLoad”, lambda shows the average time required to complete one
factor for it in the load volume formula is .When task for the node (I) [4].
node’s load state is in the normal situation, most In order to get the real-time information of
of the time there is no need to add workload to CpuUsage, we can use related parameters in
node current workload, but sometimes cluster the file /proc/stat of Linux system to calculate
administrator can add some more workload to CpuUsage. Seven pieces of items can be extracted
the node current workload to optimize node from file /proc/stat: user-mode time (user), low-
resource utilization. In this situation the lambda priority user-mode time (nice), system mode time
factore will be and the percentage of this factor (sys), idle task-mode time (idle), hard disk I/O
is based on administrator opinion. If a node state preparing time (iowait), hardware interrupting
is ” Overload”, lambda factor for it in the load time (irq), and software interrupting time (softirq).
volume formula is ; so node’s workload which File /proc/stat keeps track of a variety of different
will be assigned to current node’s workload is statistics about the system since it was restarted.
at least 10% of node current workload minus The time unit is called “Jiffy” (1/100 of Figure
current workload. axis labels are often a source of a second for×86
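To make the update rule concrete, the sketch below applies the Table 1 formulas with the minimum λ values quoted above (0.5, 0.33, 0.2 and 0.1). Treating these as exact constants is an illustrative assumption, since the text only gives lower bounds and leaves λ21 to the administrator.

```python
# Illustrative sketch of the Table 1 / Table 2 update rules.
# The lambda values follow the percentages quoted in the text;
# a negative value encodes the subtraction used for overload.

LAMBDA = {
    "very_underload": 0.50,          # λ11: add at least 50%
    "underload": 0.33,               # λ12: add at least 33%
    "underload_near_normal": 0.20,   # λ13: add at least 20%
    "normalload": 0.00,              # λ2: usually leave as-is
    "overload": -0.10,               # λ3: remove at least 10%
}

def next_volume_load(vla_i, load_state):
    """VLA_{i+1} = VLA_i ± λ·VLA_i, per the Load-Distribution-Patterns table."""
    return vla_i + LAMBDA[load_state] * vla_i

# Example: an overloaded node holding 100 MB drops to 90 MB next round.
print(next_volume_load(100.0, "overload"))    # 90.0
print(next_volume_load(100.0, "underload"))   # 133.0
```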
2. Mathematical Formulation

For making the Ratio table, formulations (1) to (4) are needed, and for making the Load-Distribution-Patterns table, formulations (5) to (8) are needed.

T_avg = (Σ_{i=1}^{N} TaskExeTime(i)) / NumberOfTasks   (1)

NodeComputingCapacity = T_t(x) = T_avg(x) / NumberOfTaskSlots   (2)

NodeComputingCapacityRatio = R_t(x) = T_t(x) / min_x T_t(x)   (3)

BlockNumber = TotalBlockNumber × R_t(x) / Σ_{t=1}^{n} R_t(x)   (4)

CpuUsage = [(user1 + sys1 + nice1) − (user2 + sys2 + nice2)] / (Total1 − Total2)   (5)

Total1 = user1 + sys1 + nice1 + idle1 + IOwait1 + irq1 + softirq1   (6)

Total2 = user2 + sys2 + nice2 + idle2 + IOwait2 + irq2 + softirq2   (7)

MemoryUsage = (TotalMemory − FreeMemory − Buffers − Cached) / TotalMemory   (8)

3. Variable Description

In the above mathematical formulation, T_avg(i) denotes the average execution time to complete a batch of tasks on node i, and T_t(i) denotes the average time required to complete one task on node i [4].

In order to get real-time information on CpuUsage, we can use the related parameters in the Linux file /proc/stat to calculate CpuUsage. Seven items can be extracted from /proc/stat: user-mode time (user), low-priority user-mode time (nice), system-mode time (sys), idle task time (idle), hard disk I/O preparation time (iowait), hardware interrupt time (irq), and software interrupt time (softirq). /proc/stat keeps track of a variety of statistics about the system since it was restarted. The time unit is called a "jiffy" (1/100 of a second on x86 systems). Thus, CpuUsage is calculated from the difference of the values between two sample points.

The memory utilization (MemUsage) reflects the state of memory in real time. The relevant parameters are taken from the Linux file /proc/meminfo to calculate MemUsage. Four useful items are extracted from /proc/meminfo: total memory size (MemTotal), free memory (MemFree), block-device buffers (Buffers), and file cache (Cached). MemUsage can be calculated by (8) [9].
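The sampling just described can be sketched in a few lines of Python. The sketch follows formulas (5) to (8) directly, reads the standard Linux /proc/stat and /proc/meminfo layouts, and computes the busy share from the user, sys and nice deltas as in (5).

```python
import time

def read_cpu_counters():
    # First line of /proc/stat: "cpu user nice sys idle iowait irq softirq ..."
    with open("/proc/stat") as f:
        fields = [int(v) for v in f.readline().split()[1:8]]
    user, nice, sys_, idle, iowait, irq, softirq = fields
    total = user + nice + sys_ + idle + iowait + irq + softirq   # (6), (7)
    return user + sys_ + nice, total

def cpu_usage(interval=1.0):
    # (5): difference of (user+sys+nice) over the difference of totals.
    busy1, total1 = read_cpu_counters()
    time.sleep(interval)
    busy2, total2 = read_cpu_counters()
    return (busy2 - busy1) / (total2 - total1)

def mem_usage():
    # (8): fraction of memory that is not free, buffered, or cached.
    info = {}
    with open("/proc/meminfo") as f:
        for line in f:
            key, value = line.split(":")
            info[key] = int(value.split()[0])   # values are reported in kB
    used = info["MemTotal"] - info["MemFree"] - info["Buffers"] - info["Cached"]
    return used / info["MemTotal"]

print(f"CpuUsage={cpu_usage():.2%}  MemUsage={mem_usage():.2%}")
```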

The ADDP workflow (Fig. 2) and pseudocode are presented below.

Fig. 2. Workflow of ADDP algorithm



Algorithm 1: Adaptive Dynamic Data Placement Algorithm (ADDP)

1.  Find the number of the cluster's nodes and the number of each node's cores
2.  Find the Job Type in the Ratio Table
3.  If the Job Type doesn't exist in the Ratio Table do
4.    Add the Job Type and the job input volume to the Cluster-History Table, and add the Job Type to the Ratio Table
5.    Distribute test data and test tasks on each of the cluster's nodes
6.    Make a record for the job in the Ratio Table (see Algorithm 2)
7.    Make the Load-Distribution-Patterns Table (see Algorithm 3)
8.    for each node in the cluster do
9.      BlockNumber = TotalBlockNumber × [R_t(x) / Σ_{t=1}^{n} R_t(x)]
10.   end
11.   for each node in the cluster do
12.     Distribute the calculated data block numbers
13.   end
14.   for each node in the cluster do
15.     AvgCpuUsage = Σ_{x=1}^{n} AvgCpuUsage[x] / NumberOfNodes
16.     AvgMemUsage = Σ_{x=1}^{n} AvgMemUsage[x] / NumberOfNodes
17.   end
18.   for each node in the cluster do
19.     Determine the node's LoadState by comparing the node's AvgCpuUsage and AvgMemUsage with the Load-Distribution-Patterns Table
20.     Calculate the node's new volume load based on its LoadState, using the Load-Distribution-Patterns Table's formulas
21.     Store the node's new volume load in the Cluster-History Table
22.   end
23.   If all nodes' AvgCpuUsage and AvgMemUsage are utilized based on the Load-Distribution-Patterns Table do
24.     Set the Utilized flag = True
25.     Store the Utilized flag in the Utilized field of the Cluster-History Table
26.   else
27.     Set the Utilized flag = False
28.     Store the Utilized flag in the Utilized field of the Cluster-History Table
29.   end
30. Else
31.   If the input data volume exists in the Cluster-History Table do
32.     Distribute the input data volume based on the block-number values in the Cluster-History Table
33.     Check the Utilized flag in the Utilized field of the Cluster-History Table
34.     if the input data volume is utilized based on the Utilized field of the Cluster-History Table do
35.       Print "The Cluster Is Utilized" and finish
36.     else Go to 14
37.     end
38.   Else Go to 9
39.   End
40. End
41. End of Algorithm 1
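To summarize the dispatch logic, the sketch below condenses statements 2-3 and 30-38 into plain Python. The dict-based Ratio and Cluster-History tables, the node names, and the profiling stub are assumptions for illustration; the monitoring phase (statements 14-29) is left out.

```python
# Condensed sketch of Algorithm 1's dispatch logic; tables are plain dicts.

def profile_with_test_tasks(job_type):
    # Stand-in for Algorithm 2; a real system would run test tasks here.
    return {"slave1": 1, "slave2": 2, "slave3": 4}

def blocks_from_ratios(total_mb, ratios):
    # Statement 9: each node's share is R_t(x) / sum of all ratios.
    total_ratio = sum(ratios.values())
    return {node: total_mb * r / total_ratio for node, r in ratios.items()}

def addp_distribute(job_type, volume_mb, ratio_table, cluster_history):
    key = (job_type, volume_mb)
    if key in cluster_history:            # scenario 3: reuse the stored plan
        return cluster_history[key]
    if job_type not in ratio_table:       # scenario 1: profile a new job type
        ratio_table[job_type] = profile_with_test_tasks(job_type)
    # Scenarios 1 and 2: start from capacity ratios; the monitoring phase
    # would then refine this split and update the history.
    plan = blocks_from_ratios(volume_mb, ratio_table[job_type])
    cluster_history[key] = plan
    return plan

ratio_table = {"WordCount": {"slave1": 1, "slave2": 2, "slave3": 4}}
history = {}
print(addp_distribute("WordCount", 350, ratio_table, history))
# {'slave1': 50.0, 'slave2': 100.0, 'slave3': 200.0}
```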

Algorithm for making the Ratio Table:

Algorithm 2: Make Ratio-Table

1. for each node do
2.   Distribute TestTasks
3.   Calculate the node's TotalExeTime: Ttotal = Σ_{i=1}^{N} TaskExeTime(i)
4.   Calculate the node's AverageExeTime: T_avg(x) = Ttotal / NumberOfTasks
5.   Calculate the node's ComputingCapacity: T_t(x) = T_avg(x) / NumberOfTaskSlots
6.   Calculate the node's ComputingCapacityRatio: R_t(x) = T_t(x) / min_x T_t(x)
7. end
8. Fill the Computing-Capacity-Ratio Table with the R_t ratios
9. Add the JobType to the Computing-Capacity-Ratio Table (Ratio Table)
10. End of Algorithm 2
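A Python sketch of Algorithm 2 follows. One caveat: taken literally, formula (3) divides by the minimum per-slot time, which would give slow nodes the largest values, whereas Table 4 gives the fastest node the largest ratio. The sketch therefore treats the ratio as relative speed (slowest per-slot time divided by each node's per-slot time), which reproduces the 1 : 2 : 4 pattern; that reading is an interpretation, and the sample execution times are invented.

```python
# Sketch of Algorithm 2: derive Ratio-Table entries from test-task runs.
# Sample execution times (seconds) and slot counts are invented.

def make_ratio_entry(test_times, task_slots):
    per_slot = {}
    for node, times in test_times.items():
        t_avg = sum(times) / len(times)            # (1) average task time
        per_slot[node] = t_avg / task_slots[node]  # (2) per-slot time T_t
    slowest = max(per_slot.values())
    # Relative speed: slowest node -> 1, a node twice as fast -> 2, etc.
    return {node: slowest / t for node, t in per_slot.items()}

test_times = {"slave1": [40.0, 40.0], "slave2": [40.0, 40.0], "slave3": [40.0, 40.0]}
task_slots = {"slave1": 1, "slave2": 2, "slave3": 4}
print(make_ratio_entry(test_times, task_slots))
# {'slave1': 1.0, 'slave2': 2.0, 'slave3': 4.0}
```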

Algorithm for making the Load-Distribution-Patterns Table:

Algorithm 3: Make Load-Distribution-Patterns Table

1. for each node in the cluster do
2.   Calculate the node's average CPU usage (AvgCpuUsage):
     CpuUsage = [(user1 + sys1 + nice1) − (user2 + sys2 + nice2)] / (Total1 − Total2)
     Total1 = user1 + sys1 + nice1 + idle1 + IOwait1 + irq1 + softirq1
     Total2 = user2 + sys2 + nice2 + idle2 + IOwait2 + irq2 + softirq2
3.   Calculate the node's average memory usage (AvgMemUsage):
     MemoryUsage = (TotalMemory − FreeMemory − Buffers − Cached) / TotalMemory
4. end
5. Calculate the cluster AverageCpuUsage (load parameter): Σ_{x=1}^{n} AvgCpuUsage(x) / NumberOfNodes
6. Calculate the cluster AverageMemoryUsage (load parameter): Σ_{x=1}^{n} AvgMemUsage(x) / NumberOfNodes
7. Fill the Load-Distribution-Patterns Table with the load parameters
8. End of Algorithm 3
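The cluster-wide averages computed by Algorithm 3 become the α/β thresholds of Table 1. A small sketch of how a node could then be classified is given below; the concrete threshold values are invented placeholders, since the paper leaves α1..α4 and β1..β4 parametric.

```python
# Classify a node against Table 1's parametric thresholds.
# The alpha/beta boundaries below are invented placeholders.

ALPHA = [0.00, 0.30, 0.60, 1.00]   # α1..α4: CPU-usage boundaries
BETA  = [0.00, 0.30, 0.60, 1.00]   # β1..β4: memory-usage boundaries
STATES = ["Underload", "Normalload", "Overload"]

def load_state(avg_cpu, avg_mem):
    # Find the Table 1 row whose CPU and memory bands both contain the node.
    for i, state in enumerate(STATES):
        if ALPHA[i] <= avg_cpu <= ALPHA[i + 1] and BETA[i] <= avg_mem <= BETA[i + 1]:
            return state
    return "Normalload"  # fallback when the two bands disagree (assumption)

print(load_state(0.85, 0.70))   # Overload
print(load_state(0.20, 0.10))   # Underload
```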

When a new job is submitted to the cluster and there is no information about that job in the NameNode, in the first round the NameNode distributes the input data blocks based on the values in the Ratio table. In the next rounds, the whole cluster is monitored by the monitoring module.

4. Scenarios

In the monitoring phase, in general, the NameNode monitors each node's load state and compares these states with the values in the Load-Distribution-Patterns table until a new workload that is more compatible with the node's load state is calculated. For every node these calculated workloads are registered in the Cluster-History table and are distributed to each node in the next rounds.

In general, based on the workflow, for every job submitted to the cluster there are three scenarios (three situations), described in the next subsections. The first scenario happens when a new type of job is submitted to the cluster and there is no information about the job type or its input data volume in the cluster. The second scenario happens when the type of job is not new, but its data volume is new. The third scenario happens when neither the type of the submitted job nor its input data volume is new to the cluster.

4.1 Scenario 1 (Statements 1 to 16):

When a new job is submitted to the cluster and data are written into HDFS, the NameNode first checks the Ratio table. These data are used to determine whether this type of job has been performed before. If there is no record of the job type in the Ratio table, this type of job is new and there is no information about it in the NameNode, so to distribute the input data blocks, the NameNode needs to make a record of the job type in the Ratio table and make records of the job type and its data volume in the Cluster-History table. Then the NameNode makes the Load-Distribution-Patterns table for the job type. After the input data blocks are distributed based on the information in the Ratio table, the monitoring phase starts.

4.2 Scenario 2 (Statements 18 to 29):

If the Ratio table has a record of the submitted job, the type of job has been performed before. Thus there is a record for the job in the Cluster-History table and there is a Load-Distribution-Patterns table for the job type. The NameNode then checks the job input volume in the Cluster-History table. If the input volume of the submitted job is not in the Cluster-History table, there is no distribution pattern for the input data. As a result, the newly written data are allocated to each node in accordance with the computing capacities in the Ratio table. After assigning the input data blocks, the NameNode monitors and compares each node's load state with the values in the Load-Distribution-Patterns table until the workload most compatible with the node's load situation is calculated by the load formulas in that table. This workload is registered for each node in the Cluster-History table and is distributed to the nodes when the same job with the same input data is submitted to the cluster again.

4.3 Scenario 3 (Statements 30 to 35):

If there are records of the submitted job type and its input load volume in the Ratio table and the Cluster-History table, the NameNode has all the information needed for distributing the input data blocks to each node, and it distributes them based on the information registered in the Cluster-History table. If all nodes in the cluster are in the normal load situation, the Utilized field for that job with its input load volume in the Cluster-History table is set to True (T); otherwise it is set to False (F). These histories in the Cluster-History table help the NameNode distribute input data blocks without any further effort when a job with the same workload is submitted to the cluster, because all the information for distributing the input data blocks is already registered in the Cluster-History table.

IV. EXPERIMENTAL RESULT

This section presents the experimental environment and the experimental results for the proposed algorithm.

TABLE 3
EACH NODE SPECIFICATION

Machine | Operating system  | Memory (GB) | Number of cores | Disk (GB)
Master  | Windows 7         | 6           | 4               | 930
Slave1  | Ubuntu Linux 15.0 | 2           | 1               | 19.8
Slave2  | Ubuntu Linux 15.0 | 3           | 2               | 19.8
Slave3  | Ubuntu Linux 15.0 | 6           | 4               | 583.4

TABLE 4
RATIO TABLE

Job Type  | Slave1 | Slave2 | Slave3
WordCount | 1      | 2      | 4

TABLE 5
RATIO TABLE EXAMPLE (WORDCOUNT)

Input data                   | Slave1        | Slave2        | Slave3
α (parametric)               | α × 1/(1+2+4) | α × 2/(1+2+4) | α × 4/(1+2+4)
Each node's workload         | β             | 2β            | 4β
350 MB (example)             | 50 MB         | 100 MB        | 200 MB

A testbed was designed for testing and comparing the presented algorithm with the DDP algorithm and the Hadoop framework. WordCount is the job type run to evaluate the performance of the proposed algorithm in a heterogeneous Hadoop cluster. WordCount is a MapReduce application running on a Hadoop cluster that counts the words in the input file.

The experimental environment is shown in Table 3. We use an Intel Core i5-4210U 1.70 GHz for slave1, an Intel Core i5-4210U 1.70 GHz for slave2, and an Intel Core i7-4790 3.60 GHz for slave3. We use VirtualBox 4.1.14 to create the computing nodes for slave1 and slave2. In order to achieve the effect of a heterogeneous environment, the capacities of the nodes are not the same: different numbers of CPUs and amounts of memory were set on the nodes. In total, four machines were created: one master and three slaves. The master machine has 4 CPUs, 6 GB of memory, and a 930 GB disk; the virtual machine for slave1 has 1 CPU, 2 GB of memory, and a 19 GB disk; the virtual machine for slave2 has 2 CPUs, 3 GB of memory, and a 19 GB disk; the machine for slave3 has 4 CPUs, 6 GB of memory, and a 538 GB disk. Table 3 presents the specifications of each node. All of the slave machines run the Ubuntu 15.0 LTS operating system, and the master machine runs Windows 7.

Table 4 shows the ratios for the WordCount job in the Ratio table. Table 5 is built from the ratios in the Ratio table and shows that if the input data volume is 350 MB, slave1 is assigned 50 MB, slave2 is assigned 100 MB, and slave3 is assigned 200 MB. In the proposed algorithm, the number of tasks that run on each node is based on the node's number of cores. Slave1 has one core, so it runs only 1 task in each round. Slave2 has two cores, so it runs 2 tasks in each round simultaneously. Slave3 has four cores, so it runs 4 tasks in each round simultaneously. Each node therefore processes a different share of the input data: 50 MB, 100 MB and 200 MB for slave1, slave2 and slave3, respectively.

Experiment 1:

In experiment 1, a comparison is made between the DDP algorithm and the ADDP algorithm when an overload state happens in the cluster. Fig. 3 shows the normal execution times of the three slaves of the cluster when the workloads in the normal load state are 50, 100 and 200 MB for slaves 1 to 3, respectively.

Slave2 in the cluster is overloaded (Fig. 4), because it takes 240 s to finish its job (more than its normal execution time). The execution time of WordCount is measured for each node in all rounds of the DDP algorithm and the ADDP algorithm in this situation, and the results are shown in Fig. 4 to Fig. 11.

Both algorithms distribute data blocks in the first round based on the computing capacity ratios (Fig. 4, Fig. 5). In round 2, the DDP algorithm again distributes data blocks based on computing capacity, but the presented ADDP algorithm distributes data blocks based on the values registered in the Cluster-History table. The NameNode assigns data blocks based on the values calculated by the Load-Distribution-Patterns table formulas.

Slave2 is overloaded, so 10% of slave2's workload must be added to the workload of slave3, which is underloaded. As a result, in round 2 the nodes' workloads become 50 MB, 90 MB and 210 MB, and the execution times are 33 s, 190 s and 61 s for slave1, slave2 and slave3, respectively (Fig. 7).

The execution time of 190 s for slave2 is still too long, so another 10% of slave2's workload must be added to the workload of slave3. As a result, in round 3 the nodes' workloads become 50 MB, 81 MB and 219 MB, and the execution times are 33 s, 141 s and 73 s for slave1, slave2 and slave3, respectively (Fig. 9).

The execution time of slave2 is still too long, so in the same way 10% of slave2's workload is added to the workload of slave3 in round 4. Thus, in round 4 the nodes' workloads become 50 MB, 73 MB and 227 MB, and the execution times are 33 s, 91 s and 80 s for slave1, slave2 and slave3, respectively (Fig. 11).

After four rounds, the cluster with a 350 MB input data volume is balanced, and the average execution time of the whole cluster is 68 seconds, whereas the average execution time of the whole cluster under the DDP algorithm is 108.66 seconds.

Fig. 3. Execution time of each slave in normal load state

Fig. 4. Execution time of each slave for DDP in overload state (Round 1)

Fig. 5. Execution time of each slave for ADDP in overload state (Round 1)

Fig. 6. Execution time of each slave for DDP in overload state (Round 2)

Fig. 7. Execution time of each slave for ADDP in overload state (Round 2)

Fig. 8. Execution time of each slave for DDP in overload state (Round 3)
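The round-by-round adjustment described above is easy to reproduce. The sketch below is an illustrative simulation, with the 10% shift hard-coded rather than derived from live monitoring; it moves 10% of the overloaded node's workload to the underloaded node each round and reproduces the reported 100 → 90 → 81 → 73 MB sequence for slave2.

```python
# Illustrative simulation of ADDP's rebalancing in experiment 1.
# The 10% shift is hard-coded; a real run would derive it from monitoring.

workloads = {"slave1": 50.0, "slave2": 100.0, "slave3": 200.0}  # round-1 split

for round_no in range(2, 5):
    moved = 0.10 * workloads["slave2"]    # λ3 = 0.1 for the overloaded node
    workloads["slave2"] -= moved          # shrink the overloaded node
    workloads["slave3"] += moved          # grow the underloaded node
    print(round_no, {k: round(v, 1) for k, v in workloads.items()})

# round 2: slave2 = 90.0, slave3 = 210.0
# round 3: slave2 = 81.0, slave3 = 219.0
# round 4: slave2 = 72.9, slave3 = 227.1  (the paper reports 73 / 227 MB)
```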

Fig. 9. Execution time of each slave for ADDP in overload state (Round 3)

Fig. 10. Execution time of each slave for DDP in overload state (Round 4)

Fig. 11. Execution time of each slave for ADDP in overload state (Round 4)

In fact, the whole-cluster execution time of the presented ADDP algorithm is reduced in each round, whereas the execution time of the DDP algorithm is the same in all rounds (Fig. 4, Fig. 6, Fig. 8 and Fig. 10). The DDP algorithm allocates data to each node in accordance with the node's computing capacity, which is determined by its hardware, so it does not work well in overload and underload states. In contrast, the presented ADDP algorithm not only considers computing capacity in assigning data, but also monitors and considers the load state of the nodes when assigning data blocks.

Experiment 2:

In experiment 2, a comparison is made between the DDP algorithm, the ADDP algorithm and Hadoop 1.2.1 when an overload state happens in the cluster. Fig. 12 shows the cluster in the overload state in the Hadoop 1.2.1 framework. Fig. 13 shows the execution time of the whole cluster in the Hadoop framework, the DDP algorithm, and the presented ADDP algorithm when slave2 is overloaded. As the results show, the Hadoop framework and the DDP algorithm cannot detect an overload state in the nodes and cannot handle underload and overload states in the cluster, but ADDP can make the corresponding adjustment to achieve the optimal state, realize self-regulation, and decrease the execution time in each round.

Fig. 12. Execution time of each slave for Hadoop in overload state

Fig. 13. Comparison between the execution time of the whole cluster for the Hadoop, DDP and ADDP algorithms in each round in overload state

V. CONCLUSION

This paper proposes an adaptive dynamic data placement algorithm (ADDP) that exploits the data locality of map tasks to allocate data blocks. This algorithm belongs to the class of resource-aware scheduling algorithms. In a heterogeneous environment, the difference in node computing capacities may cause load imbalance and create the need to spend additional overhead transferring unprocessed data from slow nodes to fast nodes. To improve the performance of Hadoop in heterogeneous clusters, we aim to minimize data movement between slow and fast nodes. This goal can be achieved by a data placement scheme that distributes and stores data across multiple heterogeneous nodes based on their computing capacities and workloads. The proposed ADDP mechanism distributes fragments of an input file to heterogeneous nodes based on their computing capacities, and then calculates the appropriate workload for each node based on its load parameters, thereby improving data locality and reducing the additional overhead so as to enhance Hadoop performance. The presented algorithm improves the performance of heterogeneous Hadoop clusters and significantly benefits both the DataNodes and the NameNode.

REFERENCES

[1] G. Turkington, 2013. Hadoop Beginner's Guide: Packt Publishing Ltd.
[2] A. Holmes, 2012. Hadoop in Practice: Manning Publications Co.
[3] R. D. Schneider, 2012. Hadoop for Dummies, Special Edition: John Wiley & Sons Canada.
[4] C.-W. Lee, K.-Y. Hsieh, S.-Y. Hsieh, and H.-C. Hsiao, 2014. A dynamic data placement strategy for Hadoop in heterogeneous environments, Big Data Research, 1, pp. 14-22.
[5] A. Hadoop, "Welcome to Apache Hadoop," http://hadoop.apache.org, 2014.
[6] R. Xiong, J. Luo, and F. Dong, 2015. Optimizing data placement in heterogeneous Hadoop clusters, Cluster Computing, 18, pp. 1465-1480.
[7] J. Xie, S. Yin, X. Ruan, Z. Ding, Y. Tian, J. Majors, et al., 2010. Improving MapReduce performance through data placement in heterogeneous Hadoop clusters, in Parallel & Distributed Processing, Workshops and PhD Forum (IPDPSW), IEEE International Symposium on, pp. 1-9.
[8] K. Singh and R. Kaur, 2014. Hadoop: addressing challenges of big data, in Advance Computing Conference (IACC), pp. 686-689. IEEE.
[9] X. Xu, L. Cao, and X. Wang, 2014. Adaptive task scheduling strategy based on dynamic workload adjustment for heterogeneous Hadoop clusters.
[10] P. Xu, H. Wang, and M. Tian, 2014. New scheduling algorithm in Hadoop based on resource aware, in Practical Applications of Intelligent Systems, ed: Springer, pp. 1011-1020.
[11] Z. Tang, J. Zhou, K. Li, and R. Li, 2012. MTSD: A task scheduling algorithm for MapReduce based on deadline constraints, in Parallel and Distributed Processing Symposium Workshops & PhD Forum (IPDPSW), IEEE 26th International.
