Adaptive Dynamic Data Placement Algorithm

Received (2016-07-16)
Accepted (2016-12-11)

Abstract — ... large-scale data-intensive applications. The current Hadoop and the existing rack-aware data placement strategy of the Hadoop distributed file system assume a homogeneous Hadoop cluster in which each node has the same computing capacity and each node is assigned the same workload. Default Hadoop does not consider the load state of each node when distributing input data blocks, which may cause inappropriate overhead and reduce Hadoop performance; in practice, such a data placement policy can noticeably reduce MapReduce performance and may increase energy dissipation in heterogeneous environments. This paper proposes a resource-aware adaptive dynamic data placement algorithm (ADDP). With the ADDP algorithm, we can resolve the unbalanced node workload problem based on node load status. The proposed method can dynamically adapt and balance the data stored on each node based on node load status in a heterogeneous Hadoop cluster. Experimental results show that data transfer overhead decreases in comparison with the DDP and traditional Hadoop algorithms. Moreover, the proposed method can decrease the execution time and improve the system's throughput by increasing resource utilization.

Index Terms — Hadoop, MapReduce, Resource-aware, Data placement, Heterogeneous

1- Department of Computer Engineering, Islamic Azad University South Tehran Branch, Tehran, Iran (Avishan.[email protected]).
2- Department of Computer Engineering, Science and Research Branch, Islamic Azad University, Tehran, Iran.

I. INTRODUCTION

IN recent years, the World Wide Web has been adopted as a very useful platform for developing data-intensive applications, since the communication paradigm of the Web is sufficiently open and powerful. Search engines, webmail, data mining and social network services are currently indispensable data-intensive applications. These applications need data ranging from a few gigabytes to several terabytes or even petabytes.

Google leverages the MapReduce model to process approximately twenty petabytes of data per day in a parallel programming model [1]. Hadoop MapReduce is an attractive model for parallel data processing in high-performance cluster computing environments. The MapReduce model of Hadoop was primarily developed by Yahoo [2]. Hadoop is used by Yahoo servers, where hundreds of terabytes of data are generated on at least 10,000 cores [3]. Facebook uses Hadoop to process more than 15 terabytes of data per day. In addition to Yahoo and Facebook, Amazon and Last.fm employ Hadoop to manage massive amounts of data [1].

The scalability of MapReduce has proven to be high, because in the MapReduce programming model a job is divided into a series of small tasks that run on multiple machines in a large-scale cluster [4]. MapReduce allows a programmer with no specific knowledge of distributed programming to create MapReduce functions that run in parallel across multiple nodes in the cluster. MapReduce automatically handles the gathering of results across the multiple nodes and returns a single result or set of results to the server [4]. More importantly, the MapReduce
18 Journal of Advances in Computer Engineering and Technology, 2(4) 2016
platform can offer fault tolerance. The MapReduce model can automatically handle failures through its fault tolerance mechanisms: when a node fails, MapReduce moves the tasks that were running on the failed node to be rerun on another node [5].

In the Hadoop architecture, data locality is one of the important factors affecting the performance of Hadoop applications. However, in a heterogeneous environment, the data required for performing a task are often non-local, which affects the performance of the Hadoop platform [4]. The data placement decision of the Hadoop distributed file system (HDFS) is very important for data locality, which is a determining factor for MapReduce performance and a primary criterion for task scheduling in the MapReduce model. The existing HDFS rack-aware data placement strategy and replication scheme work well with the MapReduce framework in homogeneous Hadoop clusters [6], but in practice such a data placement policy can noticeably reduce performance in a heterogeneous environment and may increase the overhead of transferring unprocessed data from slow nodes to fast nodes [7].

The rest of this paper is organized as follows. In Section II, the Hadoop system architecture, the MapReduce model, HDFS, and the motivation for this study are reported. Section III presents the ADDP algorithm, its mathematical formulas, variable descriptions and scenarios. Experiments and performance analysis are presented in Section IV. Section V concludes this paper by summarizing its main contributions and commenting on future directions of our work.

II. RELATED WORK AND MOTIVATION

1. Hadoop

Hadoop is a successful and well-known implementation of the MapReduce model, which is open-source and supported by the Apache Software Foundation.

Hadoop consists of two main components: the MapReduce programming model and the Hadoop Distributed File System (HDFS) [4], in which MapReduce is responsible for parallel processing and HDFS is responsible for data management; that is, MapReduce and HDFS manage parallel processing of jobs and data, respectively. The JobTracker module in MapReduce partitions a job into tasks, and HDFS partitions the input data into blocks and assigns them to every node in the cluster. Hadoop is based on a distributed architecture, meaning that Hadoop MapReduce adopts a master/slave architecture, in which a master node controls a group of slave nodes on which the Map and Reduce functions run in parallel. Slaves are the nodes that process the tasks that the master assigns to them. In the MapReduce model, the master is called JobTracker and each slave is called TaskTracker; in HDFS, the master is called NameNode and each slave is called DataNode. The master is responsible for distributing data blocks and assigning task slots to every node in the Hadoop cluster. Default Hadoop assumes that node computing capacity and storage capacity are the same across the cluster. In such a homogeneous environment, the data placement strategy of Hadoop can boost the efficiency of the MapReduce model, but in a heterogeneous environment such data placement has many problems [1].

2. MapReduce

MapReduce is a parallel programming model used in clusters that have numerous nodes and use their computing resources to manage large amounts of data in parallel. MapReduce was proposed by Google in 2004. In the MapReduce model, an application that should be processed is called a "job". Hadoop divides the input of a MapReduce job into pieces processed by "map tasks" and "reduce tasks", in which the map tasks run the map function and the reduce tasks run the reduce function. The map function processes the input tasks and data assigned by the master node and produces intermediate (key, value) pairs. Based on the (key, value) pairs generated by the map function, the reduce function then merges, sorts, and returns the result. The MapReduce model is based on the "master/slave" concept. It distributes a large amount of input data to many processing nodes to perform parallel processing, which reduces the execution time and improves the performance. The input data are divided into data blocks of the same size; these blocks are then assigned to nodes that perform the same map function in parallel. After the map function is performed, the generated output is a set of intermediate (key, value) pairs. The nodes that perform the reduce function obtain these intermediate data, and finally generate the output
data [8]. The MapReduce model was conceived with the principle that "moving computation is much cheaper than moving data" [5].

3. HDFS

HDFS is based on the Google File System and is used with the MapReduce model. It consists of a NameNode module in the MasterNode and many DataNode modules in the SlaveNodes. The NameNode is responsible for the management and storage of the entire file system and its file information (such as namespaces and metadata). The NameNode partitions the input files written to HDFS into data blocks of the same size, with a default size of 64 MB. HDFS allocates these data blocks to every DataNode. The DataNodes are responsible for storing and processing these data blocks and sending the results to the NameNode. Hadoop is fault-tolerant and makes three replicas of each data block of the files stored on HDFS. HDFS's replica placement strategy is to put one replica of the block on one node in the local rack, another on a different node in the same rack, and the third on a node in some other rack. When failure happens to a node, these replicas become very important and are processed instead of the lost data blocks [1].

4. Background and motivation

The Hadoop default data placement strategy assumes that the computing capacity and storage capacity of each node in the cluster are the same, and each node is assigned the same workload. This data placement strategy can boost the efficiency of the MapReduce model in a homogeneous environment, but in a heterogeneous environment it has many problems. In a heterogeneous environment, the difference in the nodes' computing capacities may cause load imbalance. The reason is that different computing capacities between nodes cause different task execution times, so the faster nodes finish processing their local data blocks before the slower nodes do. At this point, the master assigns the unperformed tasks to the idle faster nodes, but these nodes do not own the data needed for processing. The required data must be transferred from the slow nodes to the idle faster nodes through the network. Waiting for this data transmission increases the task execution time and causes the entire job execution time to become extended. A large amount of moved data affects Hadoop performance. To improve the performance of Hadoop in heterogeneous clusters, this paper aims to minimize data movement between slow and fast nodes. This goal can be achieved by a data placement scheme that distributes and stores data across multiple heterogeneous nodes based on their computing capacities. Data movement can be reduced if each node is assigned a workload based on the node's data processing speed and the node's system load [4, 7].

Some task scheduling strategies have been proposed in the Hadoop framework in recent years. Reference [9] proposed an Adaptive Task Scheduling Strategy Based on Dynamic Workload Adjustment (ATSDWA). Each TaskTracker collects its own load information and reports it to the JobTracker periodically, so TaskTrackers can adapt to changes of load at runtime, obtaining tasks in accordance with their computing abilities. Reference [4] proposed a data placement algorithm (DDP) that distributes input data blocks based on each node's computing capacity in a heterogeneous Hadoop cluster. Reference [10] proposed a resource-aware scheduling algorithm that classifies the work and node workload into I/O-bound jobs and CPU-bound jobs; each workload is assigned to a group of nodes, and the algorithm selects appropriate tasks to run according to the workload of the node. Reference [11] explored an extensional MapReduce task scheduling algorithm for deadline constraints (MTSD) for Hadoop platforms, which allows the user to specify a job's deadline and finish it before the deadline. Reference [6] proposed a novel data placement strategy (SLDP) for heterogeneous Hadoop clusters. That algorithm changes traditional Hadoop data block replication based on data hotness: SLDP adopts a heterogeneity-aware algorithm to first divide the various nodes into several virtual storage tiers, and then places data blocks across the nodes in each storage tier circuitously according to the hotness of the data.

III. ADDP

1. Main Idea

The computing capacity of each node in a heterogeneous cluster is different, so the load of each node changes dynamically. Therefore,
an adaptive dynamic data placement algorithm (ADDP) is presented in this paper, which uses the type and volume load of jobs to adjust the distribution of input data blocks. The proposed algorithm consists of two main phases. In the first round, the NameNode distributes data blocks based on each node's computing capacity ratio in the Ratio table. In the next rounds, each node's load parameters (average CPU utilization, average memory utilization) are monitored and registered in the node's "History table"; then the NameNode calculates for each node the number of data blocks most compatible with its load status, by comparing each node's load parameters with the cluster load parameters in the Load-Distribution-Patterns table. This table has a load volume formula for each load state of a node, and these formulas determine the workload that best matches the node's load situation. The workload calculated for each node is stored in a Cluster-History table and is distributed to the nodes in the next rounds.

Fig. 1. Shows how the NameNode deploys data blocks on data nodes.

In the algorithm, there are two tables: the "Ratio table" and the "Load-Distribution-Patterns table". The Ratio table stores the computing capacity ratios of each node for different job types. The Load-Distribution-Patterns table stores the load parameters, defined as the average CPU utilization (AvgCpuUsage) and average memory utilization (AvgMemUsage) of the whole cluster in different load states, in order to compare each node's load parameters with the cluster load parameters. In the cluster, we assume three main states: the overloading state is defined as "overload", the underloading state is defined as "underload" and the normal loading state is defined as "normalload". There are also some sub-load states based on the cluster load situation; these sub-states refine the underload state. Every row in the table belongs to a load state, and there is a load volume formula for each row. A node's load parameters are compared with every row; if they fall within a row of the table, that row's formula calculates the data load volume appropriate for the node's load state, so as to move the node toward the normalload state. The load volume formulas show how much workload should be added to the current node's workload to make it more compatible with the node's load state, so that the nodes use resources more efficiently. The percentage of added workload is given by the λ factor. The node's next volume load average (VLAi+1) is equal to the previous volume load average (VLAi) plus a percentage of the current load average. This percentage factor differs from one row to another and depends on the node load state. The percentage factors are defined in the Definition-Lambda-Factor table (Table 2).

TABLE 1
LOAD-DISTRIBUTION-PATTERNS

Load volume formula | Average Cpu Usage | Average Memory Usage | Load state
VLA_{i+1} = VLA_i + λ_{1i}(VLA_i) | α1 ≤ CpuUsage ≤ α2 | β1 ≤ MemoryUsage ≤ β2 | Underload

TABLE 2
DEFINITION-LAMBDA-FACTOR

Lambda definition | Load State
λ11 | Very Underload
λ12 | Underload
λ13 | Underload near to Normal
λ2 | NormalLoad
λ21 | Optimize-NormalLoad
λ3 | Overload

Every load volume formula in the Load-Distribution-Patterns table tries to calculate a workload that is more compatible with the node's load situation. So, in general, we have six load levels, which are explained in the next part.

If a node's state is "Very underload", its lambda factor in the load volume formula is λ11, so the workload assigned to the node for the next round is its current workload plus at least 50% of the current workload.
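The update rule VLA_{i+1} = VLA_i + λ(VLA_i) from Table 1 can be sketched as follows. The exact λ constants below are illustrative assumptions read off the "at least" percentages in this section; the paper only fixes lower bounds:

```python
# Sketch of the Load-Distribution-Patterns update rule:
# VLA_{i+1} = VLA_i + lambda * VLA_i, with lambda chosen by load state.
# These constants are illustrative; the text gives only lower bounds
# (e.g. "at least 50%" for Very Underload, "at least 10%" shed for Overload).
LAMBDA = {
    "very_underload": 0.50,         # lambda_11
    "underload": 0.33,              # lambda_12
    "underload_near_normal": 0.20,  # lambda_13
    "normal": 0.0,                  # lambda_2: usually left unchanged
    "overload": -0.10,              # lambda_3: workload is reduced
}

def next_volume_load_average(vla: float, state: str) -> float:
    """Return VLA_{i+1} = VLA_i + lambda(state) * VLA_i."""
    return vla + LAMBDA[state] * vla
```

For example, a node in the "very_underload" state with a current volume load average of 100 would be assigned 150 for the next round, while an overloaded node would drop to about 90.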
Journal of Advances in Computer Engineering and Technology, 2(4) 2016 21
If a node's state is "Underload", its lambda factor in the load volume formula is λ12, so the workload assigned to the node for the next round is its current workload plus at least 33% of the current workload.

If a node's state is "Underload near to NormalLoad", its lambda factor in the load volume formula is λ13, so the workload assigned to the node for the next round is its current workload plus at least 20% of the current workload.

If a node's state is "NormalLoad", its lambda factor in the load volume formula is λ2. When a node's load state is normal, most of the time there is no need to add workload to the node's current workload; however, the cluster administrator can sometimes add some more workload to the node's current workload to optimize the node's resource utilization. In this situation the lambda factor is λ21, and its percentage is based on the administrator's opinion.

If a node's state is "Overload", its lambda factor in the load volume formula is λ3, so the workload assigned to the node for the next round is its current workload minus at least 10% of the current workload.

2. Mathematical Formulation

For building the Ratio table, formulas (1) to (4) are needed, and for building the Load-Distribution-Patterns table, formulas (5) to (8) are needed.

T_{avg} = \frac{\sum_{i=1}^{N} TaskExeTime(i)}{NumberOfTasks}    (1)

NodeComputingCapacity = T_t(x) = \frac{T_{avg}(x)}{NumberOfTaskSlots}    (2)

NodeComputingCapacityRatio = R_t(x) = \frac{\min_{x} T_t(x)}{T_t(x)}    (3)

BlockNumber(x) = TotalBlockNumber \times \frac{R_t(x)}{\sum_{x=1}^{n} R_t(x)}    (4)

CpuUsage = \frac{(user_1 + sys_1 + nice_1) - (user_2 + sys_2 + nice_2)}{Total_1 - Total_2}    (5)

Total_1 = user_1 + sys_1 + nice_1 + idle_1 + IOwait_1 + irq_1 + softirq_1    (6)

Total_2 = user_2 + sys_2 + nice_2 + idle_2 + IOwait_2 + irq_2 + softirq_2    (7)

MemoryUsage = \frac{TotalMemory - FreeMemory - Buffers - Cached}{TotalMemory}    (8)

3. Variable Description

In the mathematical formulation above, T_{avg}(i) denotes the average execution time to complete a batch of tasks on node i, and T_t(i) is the average time required to complete one task on node i [4].

To obtain real-time CpuUsage information, we can use the related parameters in the file /proc/stat of a Linux system. Seven items can be extracted from /proc/stat: user-mode time (user), low-priority user-mode time (nice), system-mode time (sys), idle task time (idle), hard disk I/O waiting time (iowait), hardware interrupt time (irq), and software interrupt time (softirq). The file /proc/stat keeps track of a variety of statistics about the system since it was restarted; the time unit is called a "jiffy" (1/100 of a second on x86 systems). Thus, CpuUsage is calculated from the difference of the values between two sample points, as in (5).

The memory utilization (MemUsage) reflects the state of the memory in real time. The relevant parameters are taken from the file /proc/meminfo of the Linux system. Four useful items are extracted from /proc/meminfo: total memory size (MemTotal), free memory (MemFree), block-device buffers (Buffers), and file cache (Cached). MemUsage can be calculated by (8) [9].
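The calculations in (5)-(8) can be sketched as pure functions over two /proc/stat samples and one /proc/meminfo snapshot. The parsing of the files themselves is omitted; the field names follow the kernel files, and sample 1 is taken later than sample 2, matching the sign convention in (5):

```python
def cpu_usage(s1: dict, s2: dict) -> float:
    """CpuUsage per formula (5): busy-time delta over total-time delta.

    s1 and s2 are /proc/stat samples (jiffies since boot) with keys
    user, nice, sys, idle, iowait, irq, softirq; s1 is the later sample.
    """
    busy = lambda s: s["user"] + s["sys"] + s["nice"]
    total = lambda s: sum(
        s[k] for k in ("user", "sys", "nice", "idle", "iowait", "irq", "softirq")
    )
    return (busy(s1) - busy(s2)) / (total(s1) - total(s2))

def mem_usage(m: dict) -> float:
    """MemUsage per formula (8), from /proc/meminfo fields (kB)."""
    return (m["MemTotal"] - m["MemFree"] - m["Buffers"] - m["Cached"]) / m["MemTotal"]
```

For instance, if between the two samples the busy jiffies grew by 80 while the total grew by 250, CpuUsage is 0.32.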
Algorithm 1

Input: job type, input data volume
...
10. End
...
16. AvgMemUsage = Σ(x=1..n) AvgMemUsage(x) / NumberOfNodes
17. end
...
23. If all nodes' AvgCpuUsage and AvgMemUsage are utilized based on the Load-Distribution-Patterns Table do
24.     Set the Utilized flag = True
25.     Store the utilized flag in the utilized field of the Cluster-History Table
26. else
27.     Set the Utilized flag = False
28.     Store the utilized flag in the utilized field of the Cluster-History Table
29. end
30. Else
31.     If the input data volume exists in the Cluster-History Table do
32.         Distribute the input data volume based on the Block Numbers values that exist in the Cluster-History Table
33.         Check the utilized flag in the utilized field of the Cluster-History Table
34.         If the input data volume is utilized based on the utilized field in the Cluster-History Table do
35.             Print "The Cluster Is Utilized" and finish
36.         else Go to 14
37.         end
38.     Else Go to 9
39.     End
40. End
41. End Of Algorithm 1
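The Cluster-History bookkeeping in statements 23-35 of Algorithm 1 can be sketched roughly as follows. The dictionary-based tables and helper names are illustrative simplifications, not the paper's implementation:

```python
# Simplified sketch of Algorithm 1's Cluster-History bookkeeping
# (statements 23-35). Table structures and names are illustrative.
cluster_history = {}  # (job_type, volume) -> {"blocks": ..., "utilized": bool}

def record_round(job_type, volume, blocks, all_nodes_normal):
    # Statements 23-28: the utilized flag is True only when every node
    # ended the round in the normalload state.
    cluster_history[(job_type, volume)] = {
        "blocks": blocks,
        "utilized": all_nodes_normal,
    }

def distribute(job_type, volume):
    # Statements 30-35: reuse a stored distribution when one exists.
    entry = cluster_history.get((job_type, volume))
    if entry is None:
        return None  # no record: fall back to the Ratio table (scenarios 1/2)
    if entry["utilized"]:
        print("The Cluster Is Utilized")
    return entry["blocks"]
```

Once a (job type, input volume) pair has been tuned to a normal load state, later submissions of the same pair are served directly from the stored block numbers.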
Algorithm 3

1. CpuUsage = ((user1 + sys1 + nice1) − (user2 + sys2 + nice2)) / (Total1 − Total2)
2. Total1 = user1 + sys1 + nice1 + idle1 + IOwait1 + irq1 + softirq1
   Total2 = user2 + sys2 + nice2 + idle2 + IOwait2 + irq2 + softirq2
...
4. End
5. Calculate Cluster AverageCpuUsage (LoadParameter) = Σ(x=1..n) AvgCpuUsage(x) / NumberOfNodes
6. Calculate Cluster AverageMemoryUsage (LoadParameter) = Σ(x=1..n) AvgMemUsage(x) / NumberOfNodes
7. Fill the Load-Distribution-Patterns Table with the LoadParameters
8. End Of Algorithm 3
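The averaging in steps 5-6 of Algorithm 3 amounts to a mean over the per-node load parameters; a minimal sketch:

```python
def cluster_load_parameters(per_node):
    """Steps 5-6 of Algorithm 3: average the per-node load parameters.

    per_node maps a node name to (AvgCpuUsage, AvgMemUsage).
    Returns the cluster-wide (AverageCpuUsage, AverageMemoryUsage).
    """
    n = len(per_node)
    avg_cpu = sum(cpu for cpu, _ in per_node.values()) / n
    avg_mem = sum(mem for _, mem in per_node.values()) / n
    return avg_cpu, avg_mem
```

The resulting pair is what step 7 writes into the Load-Distribution-Patterns table as the cluster's LoadParameters.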
When a new job is submitted to the cluster and there is no information about that job in the NameNode, in the first round the NameNode distributes the input data blocks based on the values in the Ratio table. In the next rounds, the whole cluster is monitored by the monitoring module.

4. Scenarios

In the monitoring phase, in general, the NameNode monitors each node's load state and compares these states with the values in the Load-Distribution-Patterns table until the new workload most compatible with each node's load state has been calculated. For every node, these calculated workloads are registered in the Cluster-History table and distributed to each node in the next rounds.

In general, based on the workflow for every job submitted to the cluster, there are three scenarios (three situations), described in the next subsections. The first scenario happens when a new type of job is submitted to the cluster and there is no information about the job type and its input data volume in the cluster. The second scenario happens when the type of job is not new, but its data volume is new. The third scenario happens when neither the type of the submitted job nor its input data volume is new for the cluster.

4.1 Scenario 1 (Statements 1 to 16):

When a new job is submitted to the cluster and data are written into HDFS, the NameNode first checks the Ratio table. These data are used to determine whether this type of job has been performed before. If there is no record of the job type in the Ratio table, this type of job is new and there is no information about it in the NameNode, so for distributing the input data blocks the NameNode needs to make a record of the job type in the Ratio table and records of the job type and its data volume in the Cluster-History table. Then the NameNode builds the Load-Distribution-Patterns table for the job type. After distributing the input data blocks based on the information in the Ratio table, the monitoring phase starts.

4.2 Scenario 2 (Statements 17 to 29):

If the submitted job type already has a record in the Ratio table, there is a record of the job type in the Cluster-History table and there is a Load-Distribution-Patterns table for the job type. The NameNode then checks the job input volume in the Cluster-History table. If the input volume of the submitted job is not in the table, it means that there is no distribution pattern for the input data in the Cluster-History table. As a result, the newly written data are allocated to each node in accordance with the computing capacities in the Ratio table. After assigning the input data blocks, the NameNode monitors and compares each node's load state with the values in the Load-Distribution-Patterns table until the workload most compatible with the node's load situation has been calculated by the load formulas in the Load-Distribution-Patterns table. This workload is registered for each node in the Cluster-History table and is distributed to the nodes when the same job with the same data input is submitted to the cluster again.

4.3 Scenario 3 (Statements 30 to 35):

If there are records of the submitted job type and its input data load volume in the Ratio table and the Cluster-History table, the NameNode has all the information needed for distributing the input data blocks to each node. The NameNode distributes the input data blocks based on the information registered in the Cluster-History table. If all nodes in the cluster are in the normal load situation, the utilized field for that job with its input load volume in the Cluster-History table is set to True (T); otherwise it is set to False (F). These histories in the Cluster-History table help the NameNode distribute input data blocks without any further effort when a job with the same workload is submitted to the cluster, because all the information for distributing the input data blocks is already registered in the Cluster-History table.

IV. EXPERIMENTAL RESULT

This section presents the experimental environment and the experimental results for the proposed algorithm.
TABLE 3
EACH NODE SPECIFICATION

Machine | Operating system | Memory (GB) | Number of Cores | Disk (GB)
Master | Windows 7 | 6 | 4 | 930
Slave1 | Ubuntu Linux 15.0 | 2 | 1 | 19.8
Slave2 | Ubuntu Linux 15.0 | 3 | 2 | 19.8
Slave3 | Ubuntu Linux 15.0 | 6 | 4 | 583.4

TABLE 5
RATIO TABLE EXAMPLE

Job Type | Input Data | Slave1 | Slave2 | Slave3
WordCount | α | α × 1/(1+2+4) | α × 2/(1+2+4) | α × 4/(1+2+4)
WordCount (parametric, each node's workload) | β | β | 2β | 4β
WordCount | 350 MB | 50 MB | 100 MB | 200 MB
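Using formulas (3) and (4), the example in Table 5 can be reproduced with a short sketch. The per-task times below are illustrative, chosen so that the capacity ratios come out 1 : 2 : 4 as in the testbed:

```python
def capacity_ratios(task_time):
    """Formula (3): R_t(x) = min_x T_t(x) / T_t(x)."""
    fastest = min(task_time.values())
    return {node: fastest / t for node, t in task_time.items()}

def distribute_blocks(total, ratios):
    """Formula (4): each node's share is proportional to its ratio."""
    total_ratio = sum(ratios.values())
    return {node: total * r / total_ratio for node, r in ratios.items()}

# Illustrative per-task times (seconds): Slave3 is 4x faster than Slave1.
ratios = capacity_ratios({"Slave1": 4.0, "Slave2": 2.0, "Slave3": 1.0})
shares = distribute_blocks(350, ratios)  # 350 MB of input, as in Table 5
```

With these times the shares are 50, 100 and 200 MB for Slave1, Slave2 and Slave3, matching the last row of Table 5.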
Fig. 3. Execution time of each slave in normal load state (Slave1: 50 MB, Slave2: 100 MB, Slave3: 200 MB).

Fig. 7. Execution time of each slave for ADDP in overload state (Round 2) (slave1: 50 MB, slave2: 90 MB, slave3: 210 MB).

Fig. 4. Execution time of each slave for DDP in overload state (Round 1) (slave1: 50 MB, slave2: 100 MB, slave3: 200 MB).

Fig. 8. Execution time of each slave for DDP in overload state (Round 3) (slave1: 50 MB, slave2: 100 MB, slave3: 200 MB).
Fig. 9. Execution time of each slave for ADDP in overload state (Round 3) (slave1: 50 MB, slave2: 81 MB, slave3: 219 MB).

Experiment 2:

In experiment 2, a comparison is made between the DDP algorithm, the ADDP algorithm and Hadoop-1.2.1 when an overload state happens in the cluster. Fig. 12 shows the cluster in the overload state in the Hadoop-1.2.1 framework. Fig. 13 shows the execution time of the whole cluster in the Hadoop framework, the DDP algorithm and the presented ADDP algorithm when slave2 is overloaded. As the results show, the Hadoop framework and the DDP algorithm cannot recognize the overloading state of the nodes and cannot handle the underload and overload states.

Fig. 10. Execution time of each slave for DDP in overload state (Round 4) (slave1: 50 MB, slave2: 100 MB, slave3: 200 MB).

Fig. 12. Execution time of each slave for Hadoop in overload state (Slave1: 116 MB, Slave2: 116 MB, Slave3: 116 MB).

Fig. 11. Execution time of each slave for ADDP in overload state (Round 4) (slave1: 50 MB, slave2: 73 MB, slave3: 227 MB).

In fact, the whole-cluster execution times of the presented ADDP algorithm are reduced in each round, whereas the execution times of the DDP algorithm are the same in all rounds (Fig. 4, Fig. 6, Fig. 8 and Fig. 10).

Fig. 13. Comparison between the execution time of the whole cluster for the Hadoop, DDP and ADDP algorithms, in each round in overload state.

The DDP algorithm allocates data to each node in accordance with the node's computing capacity, which is determined by its hardware, so it does not work well in overload and underload states. In contrast, the presented ADDP algorithm not

V. CONCLUSION

This paper proposes an adaptive dynamic data placement algorithm (ADDP) that uses the data locality of map tasks to allocate data blocks. This algorithm belongs to the resource-aware scheduling