A Dissertation Report on
Mrs. Geetha J
Assistant Professor,
Department of Computer Science and Engineering
Ramaiah Institute of Technology, Bangalore
CERTIFICATE
This is to certify that the project work titled “Application of Zipf’s Law for the
Optimization of Hadoop Shuffle Phase and Comparison of Performance of Hadoop Job
Execution based on Shuffle Parameters” is a bonafide work carried out by Aasia Afreen
(1MS14CS002), Chayanika Bhandary (1MS14CS031) & Jyothi Kumari R
(1MS14CS049) in partial fulfillment of the requirements for the award of the degree of Bachelor of Engineering in Computer Science and Engineering during the year 2018. The project report has been approved as it satisfies the academic requirements with respect to the project work prescribed for the Bachelor of Engineering degree. To the best of our understanding, the work submitted in this report has not been submitted, in part or in full, for the award of the said degree.
External Examiners
DECLARATION
We the students of final semester BE, Department of Computer Science and Engineering,
Ramaiah Institute of Technology, Bangalore, hereby declare that the project entitled
“Application of Zipf’s Law for the Optimization of Hadoop Shuffle Phase and
Comparison of Performance of Hadoop Job Execution based on Shuffle Parameters”,
has been completed and written by us under the guidance of Mrs. Geetha J, Dept. of CSE, Bangalore, in partial fulfillment of the requirements for the award of the degree of Bachelor of Engineering, and has not formed the basis for the award of any other degree or diploma certificate.
Place: Bangalore
Date: 05-05-2018
(1MS14CS002 AASIA AFREEN)
(1MS14CS031 CHAYANIKA BHANDARY)
(1MS14CS049 JYOTHI KUMARI R)
ACKNOWLEDGEMENT
First and foremost, our utmost gratitude goes to Mrs. Geetha J, Dept. of CSE, MSRIT, whose
sincerity and encouragement we will never forget. She has been our inspiration as we
overcame all the obstacles in the completion of this project work.
Dr. Anita Kanavalli, Head of the Department of Computer Science and Engineering, showed kind concern and consideration regarding the project work, and we would like to thank her for her continued support.
We would like to thank our beloved principal Dr. N.V.R Naidu for his support and
encouragement.
This work would not have been possible without the guidance and help of several individuals
who in one way or another contributed their valuable assistance in preparation and
completion of this study.
We would like to express our sincere thanks to all the teaching and non-teaching faculty of the CSE Department and our dear friends who helped in every way while preparing this report.
ABSTRACT
Hadoop is an open source distributed processing framework that manages data processing
and storage for big data applications running in clustered systems. It is at the center of a
growing ecosystem of big data technologies that are primarily used to support advanced
analytics initiatives, including predictive analytics, data mining and machine learning
applications. Hadoop can handle various forms of structured and unstructured data, giving
users more flexibility for collecting, processing and analyzing data than relational databases
and data warehouses provide. Hadoop uses MapReduce for parallel processing, which consists of two major phases: the Map phase and the Reduce phase. There is also an intermediate phase that often goes unnoticed: the shuffle phase. Analyses have found that roughly one-third of the MapReduce processing time is consumed by the shuffle phase, so an effort to optimize Hadoop performance by fine-tuning the shuffle phase is well justified. In this project, we attempt a feasibility test to check whether a statistical law, Zipf's Law, can be applied to make smart decisions while spilling, in order to improve the shuffle phase performance of the Hadoop framework. We also test the law against different datasets of varying sizes to check whether it is biased against some types of datasets. In addition, we compare the time required for Hadoop job execution while altering the different shuffle phase parameters (specifically io.sort.factor, io.sort.mb and io.sort.spill.percent) and create predictive models to estimate the Hadoop job execution time as these shuffle phase parameters are varied.
LIST OF FIGURES
1. List of activities in the schedule
2. Gantt chart (Part 1)
3. Gantt chart (Part 2)
4. Existing Architecture
5. Proposed Architecture
6. Sequence Diagram for Zipf’s Law Optimization
7. Sequence Diagram for Parameter Tuning
8. Map Reduce Architecture
9. Proposed Algorithm
10. Zipf’s Law plot for Pride & Prejudice Dataset
11. Zipf’s Law plot for Don Quixote Dataset
12. Zipf’s Law plot for Jane Eyre – Autobiography Dataset
13. Zipf’s Law plot for War & Peace Dataset
14. Failed Hadoop Job Execution
15. Successful Hadoop Job Execution
16. Parameter plots: io.sort.mb (MB) vs time (ms) with io.sort.spill.percent = 60%
17. Parameter plots: io.sort.mb (MB) vs time (ms) with io.sort.spill.percent = 80%
18. Parameter plots: io.sort.spill.percent vs time (ms) with io.sort.mb = 100 MB
19. Parameter plots: io.sort.spill.percent vs time (ms) with io.sort.mb = 140 MB
20. Correlation Matrix
21. Colored correlation matrix
Contents
Declaration
Acknowledgements
Abstract
List of Figures
1 INTRODUCTION
1.1 General Introduction
1.2 Problem Statement
1.3 Objectives of the project
1.4 Project deliverables
1.5 Current Scope
1.6 Future Scope
2 PROJECT ORGANIZATION
2.1 Software Process Models
2.2 Roles and Responsibilities
3 LITERATURE SURVEY
3.1 Introduction
3.2 Related Works with the citation of the References
3.3 Conclusion of Survey
6 DESIGN
6.1 Introduction
6.2 Architecture Design
6.3 Graphical User Interface
6.4 Sequence Diagram
6.5 Conclusion
7 IMPLEMENTATION
7.1 Tools Introduction
7.2 Technology Introduction
7.3 Overall view of the project in terms of implementation
7.4 Explanation of the Algorithm and how it has been implemented
7.5 Information about the implementation of Modules
7.6 Conclusion
8 TESTING
8.1 Introduction
8.2 Testing Tools and Environment
8.3 Test cases
1. INTRODUCTION
1.1 General Introduction
Matt Aslett defined Big Data as follows: “Big Data is now almost universally understood to refer to the realization of greater business intelligence by storing, processing, and analyzing data that was previously ignored due to the limitations of traditional data management technologies.” Today, “Big Data” is more than just a buzzword. It is of real concern, with data growing at an enormous rate and with the increasing demand for processing the data so generated. In such a scenario, Big Data platforms like Hadoop come to the rescue. Tuning Hadoop performance appears inevitable for satisfying present needs and the ever-increasing demands of tomorrow.
We also attempt to test Zipf's law for bias against different datasets of varying sizes; the law can then be used for the optimization of the Shuffle phase of the MapReduce paradigm.
The Hadoop framework has over 190 configuration parameters, and some of them have significant effects on the performance of a Hadoop job. Manually setting optimum values for these parameters is a challenging and time-consuming task. Apart from tuning Hadoop parameters, we also see that optimizing the Hadoop spill and shuffle phases presents immense scope for performance improvement. A sketch of how such parameters can be set when submitting a job is shown below.
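As an illustration of what such tuning looks like in practice, the sketch below launches a WordCount job with explicit shuffle settings from Python and records the wall-clock time. The jar path, the input/output paths and the use of the bundled examples jar are assumptions made for this sketch; recent Hadoop releases rename these properties to mapreduce.task.io.sort.mb, mapreduce.map.sort.spill.percent and mapreduce.task.io.sort.factor.

```python
# Illustrative sketch (not the project's exact script): launch a WordCount job
# with explicit shuffle-phase settings and record the wall-clock time.
# Paths and the examples jar location are assumptions for this sketch.
import subprocess
import time

def run_wordcount(io_sort_mb, spill_percent, sort_factor,
                  input_path="/data/wiki_dump.txt", output_path="/out/wc"):
    cmd = [
        "hadoop", "jar", "hadoop-mapreduce-examples.jar", "wordcount",
        # Older parameter names, as used in this report.
        "-D", f"io.sort.mb={io_sort_mb}",
        "-D", f"io.sort.spill.percent={spill_percent}",   # fraction, e.g. 0.80 for 80%
        "-D", f"io.sort.factor={sort_factor}",
        input_path, output_path,
    ]
    start = time.time()
    subprocess.run(cmd, check=True)
    return time.time() - start   # execution time in seconds

# Example: one data point for the tuning experiments
# elapsed = run_wordcount(io_sort_mb=100, spill_percent=0.80, sort_factor=10)
```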
Since humongous amounts of data are being produced every day, Big Data is here to stay. More and more people and organizations will resort to Big Data technologies to solve their data problems.
2. PROJECT ORGANIZATION
2.1 Software Process Models
The software process model used in our project is the Waterfall model. Since the requirements are fixed and all the procedures are sequential, the Waterfall model suits our case best. We first fixed our problem statement, and then our software and hardware requirements. We finalized our datasets and then started working on our implementation. Testing followed the completion of the implementation. In the end, the entire project process was documented.
3. LITERATURE SURVEY
3.1 Introduction
Below we present a literature review of the relentless efforts of researchers all over the world to optimize the MapReduce paradigm, especially the Shuffle phase.
In the first paper surveyed, the author makes a few assumptions: in CPU-heavy applications, fewer tasks should run in parallel to avoid over-utilization of the CPU, while a larger number of tasks should be spawned overall to distribute the load evenly among the CPUs. In CPU-light applications, a larger number of tasks should be scheduled in parallel, and fewer tasks should be spawned overall to reduce the per-task overhead.
The experiment consisted of two parts: determining whether a task is CPU-heavy or CPU-light by running the job on a single-node cluster, and verifying the theoretical assumptions on a multi-node cluster. The number of splits, which is automatically decided by the Hadoop Map stage, can be monitored using the UNIX file split or cat utilities. The test cases used for the experiment are Grep, PiEstimator and MRIF. Among these three, Grep and PiEstimator were found to be CPU-light tasks, with CPU utilizations of 35% and 4% respectively, while MRIF, a CPU-heavy task, has 88% CPU utilization.
The theories verified in this paper are that, for CPU-light applications, the number of map tasks should be less than or equal to the number of CPUs, and if many map tasks must be run, as many as possible should run in parallel. For CPU-heavy tasks, at least as many map tasks as there are CPUs should be run, and the number of map tasks spawned in parallel should be less than or equal to two.
In the paper Optimizing Hadoop for the Cluster [2], the author, Hansen, claims that the default configuration of Hadoop is not always the most efficient. For best results, the number of mappers should equal the input size divided by the block size, and the number of reducers should equal 1.75 * (number of nodes) * (maximum number of reducers that can run in parallel). The experimental results show that, for the WordCount application, the new configuration performed 2.5 times faster than the default Hadoop configuration and 1.7 times faster than Cloudera's configuration. A small sketch of this heuristic is given below.
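The sizing heuristic from [2] can be expressed in a few lines; the default block size and per-node reducer slot count below are illustrative assumptions, not values from the paper.

```python
# Sketch of the sizing heuristic described in [2]; block size and the
# per-node reducer slot count below are illustrative assumptions.
def suggested_task_counts(input_size_bytes, num_nodes,
                          block_size_bytes=128 * 1024 * 1024,
                          max_parallel_reducers_per_node=2):
    # Number of mappers ~ input size / block size
    num_mappers = max(1, -(-input_size_bytes // block_size_bytes))  # ceiling division
    # Number of reducers ~ 1.75 * nodes * max reducers runnable in parallel
    num_reducers = int(1.75 * num_nodes * max_parallel_reducers_per_node)
    return num_mappers, num_reducers

# e.g. a 10 GB input on a 4-node cluster:
# suggested_task_counts(10 * 1024**3, 4)  ->  (80, 14)
```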
Karthik Kambatla, Abhinav Pathak and Himabindu Pucha, in their paper Towards Optimizing Hadoop Provisioning in the Cloud [3], say that there are no hard and fast configuration rules applicable to all kinds of jobs. They suggest that, to make proper use of the resources, a resource-consumption profile history of the application is necessary. The steps to generate this profile history are: find the resource consumption by running the task against a smaller number of nodes and a smaller dataset, and then match the resource-consumption signature with the signatures of previous tasks stored in a database. The closest signature is then assigned to the task, as sketched below.
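A much-simplified illustration of the signature-matching idea in [3] is given below; the profile format and the use of Euclidean distance are assumptions made for this sketch.

```python
# Simplified illustration of the consumption-signature matching idea in [3].
# The profile format (CPU, memory, disk, network utilisation) and the use of
# Euclidean distance are assumptions made for this sketch.
import math

def closest_signature(new_profile, history):
    """history: dict mapping job name -> resource signature (list of floats)."""
    def distance(a, b):
        return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
    return min(history, key=lambda job: distance(history[job], new_profile))

# profile = [cpu %, mem %, disk MB/s, net MB/s] measured on a small test run
# best_match = closest_signature([72.0, 40.0, 15.0, 5.0],
#                                {"wordcount": [35.0, 30.0, 20.0, 8.0],
#                                 "mrif":      [88.0, 45.0, 10.0, 4.0]})
```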
The paper Hadoop MapReduce Shuffle Phase Management [4], by Narendar Kumar, proposes that the time required for the shuffle phase can be reduced by using an SDN-enabled structure. Having an SDN controller helps control the network traffic and enables re-routing traffic through alternative routes, thus reducing network congestion in the shuffle phase.
For benchmarking, the authors used Hadoop TeraSort. Datasets of different sizes (2 GB, 10 GB, 20 GB and 50 GB) were run on a cluster of 16 nodes having an SDN-based architecture, including an SDN controller.
In the white paper Advanced Hadoop Tuning and Optimizations [5], Sanjay Sharma proposes that tuning Hadoop configuration parameters is a black-box art and suggests that users tune as many parameters as possible for better results. Among the many suggestions provided, the most important ones include: mapred.compress.map.output should be set to TRUE for large clusters and large jobs; mapred.map/reduce.tasks.speculative.execution should be set to FALSE if the average execution time is greater than 1 hour; mapred.tasktracker.map/reduce.tasks.maximum should be set in the range [(cores per node) / 2, 2 * (cores per node)] for large clusters; and the block size should be made larger for large datasets to reduce the number of map tasks spawned. These suggestions are summarized in the sketch below.
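The suggestions quoted from [5] can be summarized in a small helper; the parameter names are the older (Hadoop 1.x era) names used in the white paper, and the concrete values chosen here are only examples.

```python
# Illustrative helper that encodes the suggestions quoted from [5];
# the chosen values are examples, not prescriptions.
def suggested_config(cores_per_node, avg_task_runtime_hours, large_cluster=True):
    max_tasks_low, max_tasks_high = cores_per_node // 2, 2 * cores_per_node
    return {
        "mapred.compress.map.output": large_cluster,                 # TRUE for large clusters/jobs
        "mapred.map.tasks.speculative.execution": avg_task_runtime_hours <= 1,
        "mapred.reduce.tasks.speculative.execution": avg_task_runtime_hours <= 1,
        # pick values inside the suggested range [(cores/2), 2*cores]
        "mapred.tasktracker.map.tasks.maximum": max_tasks_high,
        "mapred.tasktracker.reduce.tasks.maximum": max_tasks_low,
    }

# suggested_config(cores_per_node=8, avg_task_runtime_hours=2)
```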
In the paper Speculative Execution Strategy Based on Node Classification and Hierarchy Index Mechanism for Heterogeneous Hadoop Systems [6], the authors propose to employ node classification along with a different hierarchical storage structure that keeps the completion times of the map and reduce phases of a backup task to help in scheduling. Scheduling is based on an intuitive assumption: a backup task is selected to be scheduled only if its execution time is at least 50% less than the remaining execution time of the currently running task. The authors claim that the new scheduling technique runs the WordCount problem 12 times faster than the original Hadoop. They also claim that hierarchical indexing of the stored data reduces the search time complexity: if the data were stored as a list, the complexity would be O(k), where k is the number of stored items, but with the hierarchical structure each of the n nodes holds roughly k/n items and a binary index over the nodes costs log2(n), so the complexity becomes log2(n) + k/n, which is effectively O(log2(n)).
In the paper Reducing MapReduce Abstraction Costs for Text-Centric Applications [8], the authors propose two optimization techniques to improve MapReduce performance for text-centric tasks. The first approach, Frequency Buffering, makes use of Zipf's law to maintain an in-memory buffer that keeps track of frequently appearing keys during the shuffle phase. To find the frequent keys, the algorithm proposed by Metwally et al. is used for profiling; it uses a table of frequency counts together with a linked list of the keys that occur that many times. After profiling, the top k keys are predicted, reducing sort costs before the combine and reduce phases. With this optimization, 40% of the abstraction costs are removed for WordCount, 30% for InvertedIndex, and 45% for WordPOSTag. A simplified sketch of the profiling idea is given below.
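A much-simplified sketch of the profiling idea is shown below; the actual structure of Metwally et al. bounds its memory with a counter table and a frequency-ordered linked list, whereas a plain Counter is used here purely for illustration.

```python
# Much-simplified profiling sketch in the spirit of Frequency Buffering [8]:
# keep an in-memory record of the (approximately) most frequent keys seen so
# far so they can be treated specially during the shuffle. The real algorithm
# (Metwally et al.) uses a bounded counter structure; a plain Counter is used
# here only to illustrate the idea.
from collections import Counter

class FrequencyBuffer:
    def __init__(self, top_k=1000):
        self.top_k = top_k
        self.counts = Counter()

    def observe(self, key):
        self.counts[key] += 1

    def frequent_keys(self):
        return {key for key, _ in self.counts.most_common(self.top_k)}

# During the map phase, observe() each emitted key; at spill time,
# frequent_keys() gives the set of keys worth keeping in memory.
```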
The second approach presented by the authors is the Spill Matcher, which makes use of two types of threads (support threads and map threads). Here, the support threads sort the data when a spill occurs, followed by a combine() step that writes the output to an intermediate local file. The map threads merge-sort the spilled tuples from the different intermediate local files, after which combine() runs again before the reduce phase. To minimize the wait times between these threads, a bulk-service queuing model can be used, which relies on two assumptions: the producer threads fill the spill buffer as a Poisson process, and the execution times of the consumer threads that process the spills are identically distributed random variables. With this approach, the wait time for WordCount reduces by 90%, for InvertedIndex by 89%, for AccessLogSum by 77%, and for AccessLogJoin by 83%. WordPOSTag has negligible wait time in its slowest thread, and the Spill Matcher is less effective for PageRank, cutting the wait time between the two threads by only 42%.
Native Hadoop uses a Java-based network transport stack on top of the JVM for its merge and shuffle phases, which proves to be a bottleneck in the Hadoop framework. In the paper JVM-Bypass for Efficient Hadoop Shuffling [9], the authors implement a portable plug-in library for JVM bypassing that lets the existing Hadoop framework leverage both the TCP/IP protocol and the RDMA protocol. The lack of RDMA support was an inherent problem in Hadoop, and this proposal addresses it.
Yanfei Guo, Jia Rao, Dazhao Cheng, and Xiaobo Zhou, in their paper iShuffle: Improving Hadoop Performance with Shuffle-on-Write [10], propose to separate the shuffle phase from the reduce phase and make it job-independent. iShuffle claims to solve the problem of skewed input data to the reducers, as it predicts the sizes of the partitions to be fed to the reducers and also balances the distribution of the map outputs across the nodes. iShuffle is designed to have three components: a shuffler, a shuffle manager, and a task scheduler. For iShuffle to function properly with minimal changes to the existing architecture, the shuffler is designed as an independent component in the TaskTracker which takes the input from the combiner and shuffles the data. It performs shuffling each time data is written to the local disks by map tasks, i.e., shuffle-on-write.
It is seen that a lot of research has been undertaken in the field of tuning Hadoop parameters, but it still remains a black box with no hard and fast rules established. This provides immense opportunity to explore further in this regard.
Zipf's law also presents itself as a means of optimizing the Hadoop shuffle phase. If Zipf's law can be tested for bias against different datasets and proves to have no bias against any particular dataset, then the Hadoop shuffle phase can be designed in accordance with this statistical law, and such a concept can also be extended to other popular Big Data platforms like Apache Spark.
In this project we use Big Data tools like Apache Hadoop, along with data analytics, to optimize the shuffle phase in addition to verifying the Zipfian distribution. The code is written in Java (Hadoop word count and sort) and in Python (sorting and graph plotting). For parameter tuning, Python Flask has been used.
6. DESIGN
6.1 Introduction
For implementing our project, we have used Apache Hadoop Big Data platform. We have
also used Python for sorting and getting our final results.
Existing Architecture:
In Hadoop MapReduce, after the Map phase and before the beginning of the Reduce phase is
a handoff process, known as shuffle and sort. Here, data from the mapper tasks is prepared
and moved to the nodes where the reducer tasks will be run. When the mapper task is
complete, the results are sorted by key, partitioned if there are multiple reducers, and then
written to disk. During the reduce-side shuffle, the intermediate results are merged into an ordered file, after which the shuffle phase stops. During the shuffle phase, the intermediate map output is stored in a circular buffer with limited storage capacity (100 MB by default), and when the buffer is 80% full (this value is also configurable), the data is spilled onto the disk. Before the reduce phase starts, all the map output is expected to be on the disk, from where the reducer threads fetch the input for each reducer. This leads to a mandatory spill to the disk at the end of the shuffle phase.
● We see that the spilling phase is a major hindrance to better performance: if the data is spilled more than once to the disk, three extra I/O operations are needed to sort by key. These are writing to the disk the first time, then reading from and writing to the disk again to perform the sort. Since disk access time is 6-7 times the buffer access time, spilling to the disk more than once should be avoided to ameliorate the performance of this phase. It is also important to understand that if the spill thread is slower than the rate at which the map output is produced, the circular buffer fills up and the map task has to be blocked until there is again free space in the buffer.
● In case the intermediate results are geographically distributed across different data centers, data transfer is done through HTTP connections. This process is highly time-consuming, as the bandwidth and topology affect remote file transfer.
Proposed Architecture:
In this project, we propose a new design for the existing Hadoop architecture that uses Zipf's Law to gain better performance in the shuffle phase. To make use of the frequency-rank relationship given by the Zipfian distribution, one needs prior knowledge of the text-centric application; if not, the data needs to be preprocessed to gain insight into the frequencies of the keys appearing in it. Formulating an efficient algorithm for this preprocessing is beyond the scope of the project.
Using prior knowledge of the data and the frequency of appearance of its keys, smart decisions can be made about which keys to spill onto the disk when the circular buffer is nearing saturation: the relatively infrequent keys are spilled.
For the GUI, we have used Python Flask. The user gives four Hadoop shuffle phase tuning parameter values as input, namely dataset size, io.sort.mb, io.sort.spill.percent and io.sort.factor. The output, the execution time predicted by the three prediction models (Regression, SVM and KNN), is displayed along with the accuracy of each model. Depending on the accuracy, the user can select the desired execution time from these three models. A sketch of such an endpoint is shown below.
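A minimal sketch of such an endpoint is given below; the form field names, the model file names and the way accuracies are stored are assumptions made for this sketch, not the project's exact code.

```python
# Minimal sketch of the parameter-tuning GUI backend; form field names and
# the way the trained models are loaded are assumptions for this sketch.
from flask import Flask, request, jsonify
import joblib  # models assumed to have been saved earlier with joblib.dump()

app = Flask(__name__)
models = {name: joblib.load(f"{name}.pkl") for name in ("regression", "svm", "knn")}
accuracy = {"regression": 0.0, "svm": 0.0, "knn": 0.0}  # placeholder test accuracies

@app.route("/predict", methods=["POST"])
def predict():
    features = [[
        float(request.form["dataset_size_mb"]),
        float(request.form["io_sort_mb"]),
        float(request.form["io_sort_spill_percent"]),
        float(request.form["io_sort_factor"]),
    ]]
    return jsonify({
        name: {"predicted_time_ms": float(model.predict(features)[0]),
               "accuracy": accuracy[name]}
        for name, model in models.items()
    })

# if __name__ == "__main__":
#     app.run(debug=True)
```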
6.5 Conclusion
Thus, we propose an efficient design that ensures the Shuffle phase of the Hadoop MapReduce paradigm is optimized by making use of Zipf's Law for faster execution, and that also makes use of tuning parameters to improve job execution.
7. IMPLEMENTATION
7.1 Tools Introduction
The tools used in this project are SciPy, Python, Hadoop and Matplotlib.
The main technology we emphasize in our project is Apache Hadoop and its MapReduce paradigm. Though the Hadoop MapReduce framework has been wholeheartedly accepted in the industry, its efficiency has been questioned time and again, and a lot of research is in progress to assess and improve the performance of the framework as a whole as well as of its components. Here we briefly describe the components of the MapReduce architecture.
The mapper maps the <key, value> input to intermediate <key, value> pairs, and these are put into the circular buffer. The transformed intermediate output is not necessarily of the same form as the input. In the spilling phase, the map output is stored in an in-memory buffer, and when the buffer is almost full (the threshold is usually set to 80%), spilling to the local disk is started by a background thread. Spilling happens at least once, i.e. when the entire map task is completed, because all the partitions, one for each reducer, must be available on the disk.
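The project's WordCount is implemented in Java; purely as an illustration of the <key, value> flow described above, an equivalent pair of Hadoop Streaming scripts in Python might look like the following.

```python
# Illustration only: the project's WordCount is written in Java. An equivalent
# Hadoop Streaming mapper and reducer in Python read stdin and write stdout.
import sys

def mapper():
    for line in sys.stdin:
        for word in line.split():
            print(f"{word}\t1")            # emit intermediate <word, 1>

def reducer():
    current, total = None, 0
    for line in sys.stdin:                 # input arrives sorted by key
        word, count = line.rstrip("\n").split("\t")
        if word != current:
            if current is not None:
                print(f"{current}\t{total}")
            current, total = word, 0
        total += int(count)
    if current is not None:
        print(f"{current}\t{total}")

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "map":
        mapper()
    else:
        reducer()
```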
By running Hadoop jobs with different shuffle parameter settings, we have generated datasets that contain the Hadoop parameters and the execution time as different columns. These datasets are used to train ML models such as multiple regression and SVMs, which later predict the Hadoop job execution time given the parameter values provided by the user through the GUI. Thus, we can approximately predict the job execution time depending on the tuning performed on the different shuffle phase parameters. A sketch of this training step follows.
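A sketch of this training step is shown below, assuming the recorded runs are stored in a CSV file with the column names used here (both the file name and the column names are assumptions).

```python
# Sketch of the model-training step, assuming the generated data points are in
# a CSV with the columns named below (file name and column names are assumptions).
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor

data = pd.read_csv("tuning_runs.csv")     # hypothetical file of recorded runs
X = data[["dataset_size_mb", "io_sort_mb", "io_sort_spill_percent", "io_sort_factor"]]
y = data["execution_time_ms"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

models = {
    "regression": LinearRegression(),
    "svm": SVR(kernel="rbf"),
    "knn": KNeighborsRegressor(n_neighbors=5),
}
for name, model in models.items():
    model.fit(X_train, y_train)
    print(name, "R^2 on held-out runs:", model.score(X_test, y_test))
```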
For our approach, where we suggest applying Zipf's Law using a hash table during the shuffle phase, the proposed algorithm assumes for simplicity that there is only a single spill from the circular buffer to the disk. The frequency-rank relationship of all the keys is presumed to be known (pre-processing of the input data is required). The number of keys to be kept in the buffer is chosen heuristically and is assumed to be 5% of the total keys available. It is also known that the buffer access time is much lower than the disk access time. The algorithm, under these assumptions, shows mathematically that the job execution time in a traditional system is much higher than with the approach proposed in Figure 9.
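As an illustrative sketch only (the actual proposed algorithm is the one given in Figure 9), the spill-selection idea under the stated assumptions can be expressed as follows.

```python
# Illustrative sketch only: the actual proposed algorithm is given in Figure 9.
# Under the stated assumptions (a single spill, frequency ranks known from
# pre-processing, roughly 5% of keys retained), spill selection reduces to
# keeping the highest-ranked keys in the buffer and writing the rest to disk.
def select_spill(buffer, rank_by_key, keep_fraction=0.05):
    """buffer: dict key -> list of values; rank_by_key: 1 = most frequent."""
    keys_by_rank = sorted(buffer, key=lambda k: rank_by_key.get(k, float("inf")))
    keep_count = max(1, int(len(keys_by_rank) * keep_fraction))
    kept = {k: buffer[k] for k in keys_by_rank[:keep_count]}
    spilled = {k: buffer[k] for k in keys_by_rank[keep_count:]}
    return kept, spilled   # 'spilled' would be sorted and written to local disk
```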
7.6 Conclusion
From the obtained results, it is seen that Zipf's law is followed by all the datasets to a large extent, and the correlation matrix shows the relationship between the different parameters, including the execution time itself. The machine learning models used can help us predict the Hadoop job execution time when a set of parameters is given.
8. TESTING
8.1 Introduction
Zipf's law has to be tested for each dataset individually to ensure that the proposed distribution is not biased towards a particular dataset. Even though testing is usually automated in industry, manual testing suffices for small projects like this one. A similar approach is used for parameter tuning.
We have used manual testing as well as graph-based testing for the verification of Zipf's law. Expected results are computed using the Zipfian distribution, and the obtained results are tested against these pre-computed expected results. Hence, visual testing suffices.
The datasets used for the verification of Zipf's Law were obtained from Project Gutenberg. Four datasets of different sizes were chosen, namely Pride and Prejudice (727 KB), Don Quixote (2.4 MB), War and Peace (3.4 MB) and Jane Eyre – Autobiography (1.2 MB).
For parameter tuning, the datasets used were taken from the Wikimedia Data Dumps. The three datasets are of sizes 1.2 GB, 832.4 MB and 2.1 GB respectively.
We are striving to predict the (approximate) job execution time given the Hadoop tuning parameters and the dataset size using different machine learning techniques. (This prediction may not be exactly correct, because the time also depends on the MapReduce business logic, but our effort lies in approximation, so that users have some idea about tuning and do not use Hadoop like a black box.) We also propose to suggest the most optimal tuning parameters for a dataset of a given size.
We see that the datasets used approximately follow Zipf’s Law, and thus this law could be
used to optimize the Shuffle phase of Hadoop MapReduce. Later this optimization could
even be extended to Apache Spark.
10. REFERENCES
11. APPENDIX
The following are the graphs plotted for each of the datasets taken. We can see here that our
output (plotted in red) closely follows the theoretical output (plotted in blue).
Figure 10: Pride & Prejudice Dataset
Figure 11: Don Quixote Dataset
Figure 12: Jane Eyre – Autobiography Dataset
Figure 13: War & Peace Dataset
The datasets are fed as input to the WordCount MapReduce job on the Hadoop cluster, and the output so obtained is sorted in descending order of word frequency. The highest frequency is used as the reference for calculating the expected values in accordance with Zipf's Law. Both the actual results and the expected values are plotted on a graph to visually assess the deviation of the obtained results from the theoretical values.
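A sketch of this verification step is shown below, assuming the sorted WordCount output is available as a list of (word, count) pairs; the plotting details are a presentational choice for this sketch, not the project's exact script.

```python
# Sketch of the verification step, assuming the WordCount output has already
# been sorted by descending frequency into a list of (word, count) pairs.
import matplotlib.pyplot as plt

def plot_zipf(sorted_counts, title):
    actual = [count for _, count in sorted_counts]
    top = actual[0]
    # Zipf's law: expected frequency of the word at rank r is roughly top / r
    expected = [top / rank for rank in range(1, len(actual) + 1)]
    ranks = range(1, len(actual) + 1)
    plt.plot(ranks, expected, "b-", label="expected (Zipf)")   # theoretical, in blue
    plt.plot(ranks, actual, "r-", label="actual")              # observed, in red
    plt.xlabel("rank")
    plt.ylabel("frequency")
    plt.title(title)
    plt.legend()
    plt.show()

# plot_zipf(sorted_word_counts, "Pride & Prejudice")
```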
The screenshots below were taken at the time of running the Hadoop jobs after tuning the
parameters to create different data points according to the dataset.
As highlighted in the figure above, the first screenshot was taken when the Hadoop job failed; “Spill Failed” is the error message shown.
When the job is successful, the failed shuffle count shown is zero, the shuffled maps are indicated, and, as highlighted in the figure below, the time taken for execution can be noted.
We’ve plotted many graphs, out of which a few are displayed here. The following graphs are
a plot of job execution time vs io.sort.mb, where the parameter io.sort.spill.percent is varied
with values 60% & 80% respectively.
Figure 16: io.sort.mb (MB) vs time (ms) with io.sort.spill.percent = 60%
Figure 17: io.sort.mb (MB) vs time (ms) with io.sort.spill.percent = 80%
The following graphs plot job execution time vs io.sort.spill.percent, where the parameter io.sort.mb is varied with values 100 MB & 140 MB respectively.
The following is the correlation matrix that was created, which shows the correlation between the different parameters, including the execution time. A sketch of how both the plain and the colored matrices can be produced is given below.
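A sketch of the correlation-matrix step is shown below, assuming the same (hypothetical) CSV of recorded runs used for model training.

```python
# Sketch of the correlation-matrix step, assuming the same CSV of recorded
# runs used for model training.
import pandas as pd
import matplotlib.pyplot as plt

data = pd.read_csv("tuning_runs.csv")     # hypothetical file of recorded runs
corr = data.corr()                        # pairwise correlation incl. execution time
print(corr)

# Coloured version of the same matrix
plt.matshow(corr, cmap="coolwarm")
plt.xticks(range(len(corr.columns)), corr.columns, rotation=90)
plt.yticks(range(len(corr.columns)), corr.columns)
plt.colorbar()
plt.show()
```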
The colored correlation matrix shows the correlation in a visually appealing way. Thus, these are the results that have been obtained for our project.