Sailfish: A Framework For Large Scale Data Processing

Sriram Rao∗ (Microsoft Corp.), Raghu Ramakrishnan∗ (Microsoft Corp.), Adam Silberstein∗ (LinkedIn),
Mike Ovsiannikov (Quantcast Corp.), Damian Reeves (Quantcast Corp.)

∗ Work done at Yahoo! Labs

Abstract

In this paper, we present Sailfish, a new Map-Reduce framework for large scale data processing. The Sailfish design is centered around aggregating intermediate data, specifically data produced by map tasks and consumed later by reduce tasks, to improve performance by batching disk I/O. We introduce an abstraction called I-files for supporting data aggregation, and describe how we implemented it as an extension of the distributed filesystem, to efficiently batch data written by multiple writers and read by multiple readers. Sailfish adapts the Map-Reduce layer in Hadoop to use I-files for transporting data from map tasks to reduce tasks. We present experimental results demonstrating that Sailfish improves performance of standard Hadoop; in particular, we show 20% to 5 times faster performance on a representative mix of real jobs and datasets at Yahoo!. We also demonstrate that the Sailfish design enables auto-tuning functionality that handles changes in data volume and skewed distributions effectively, thereby addressing an important practical drawback of Hadoop, which in contrast relies on programmers to configure system parameters appropriately for each job, for each input dataset. Our Sailfish implementation and the other software components developed as part of this paper have been released as open source.

Categories and Subject Descriptors

C.2.4 [Computer-Communication Networks]: Distributed Systems–Distributed applications

General Terms

Design, Experimentation, Performance

1 Introduction

In recent years data intensive computing has become ubiquitous at Internet companies of all sizes, and the trend is extending to all kinds of enterprises. Data intensive computing applications (viz., machine learning models for computational advertising, click log processing, etc.) that process several tens of terabytes of data are common. Such applications are executed on large clusters of commodity hardware by using parallel dataflow graph frameworks such as Map-Reduce [10], Dryad [15], Hadoop [2], Hive [28], and Pig [20]. Programmers use these frameworks on in-house clusters as well as in public cloud settings (such as Amazon’s EC2/S3) to build applications in which they structure the computation as a sequence of steps, some of which are blocking. The framework in turn simplifies distributed programming on large clusters by providing software infrastructure to deal with issues such as task scheduling, fault-tolerance, coordination, and data transfer between computation steps. We refer to the data that is transferred between steps as intermediate data.

In the cluster workloads at Yahoo!, we find that a small fraction of the daily workload (about 5% of the jobs) uses well over 90% of the cluster’s resources. That is, these jobs (1) involve thousands of tasks that are run on many machines in the cluster, (2) run for several hours processing multiple terabytes of input, and (3) generate intermediate data that is at least as big as the input. Therefore, we find that the overall cluster performance tends to be substantially impacted by these jobs and, in particular, by how the computational framework handles intermediate data at scale.

At large data volumes, transporting intermediate data between computation steps typically involves writing data to disk, reading it back later, and often a network transfer. This is because tasks from successive computation steps are usually not co-scheduled and may also be scheduled on different machines to exploit parallelism. For computations in which tens of terabytes of data have to be transported across cluster machines, the associated overheads can become a significant factor in the job’s overall run-time. Surprisingly, the overheads involved in the handling of intermediate data and its impact on performance at scale have not been well studied in the literature. This is the focus of our paper.

The main contributions of this paper are as follows:

• We demonstrate the importance of optimizing the transport of intermediate data in distributed dataflow systems such as Hadoop (Section 2).

• We argue that batching data for disk I/O (or aggregating data for disk I/O) should be a core design principle in handling intermediate data at scale (Section 3).

• Based on this design principle, we develop I-files as an abstraction to support batching of data written by multiple writers (Section 4). We develop an I-file implementation by extending distributed filesystems that were previously designed for data intensive computing and are already deployed
3.2 Benefits of Aggregation for Addressing Skew

While data aggregation lowers the disk overheads in the reduce phase, as noted in Section 2.1 disk overheads can affect the map phase as well if the map output does not fit in memory and must be spilled to disk before it is merged. Since a reduce task cannot begin execution until all map tasks generate their output file, the sort overhead incurred by even a single map task can substantially increase job completion time. Such a slow-down occurs, for instance, if there is skew in map output: some map tasks may incur a multi-pass external merge to produce their final output file while the remaining tasks may produce it via a single pass in-memory sort. While DISC frameworks employ speculative execution to work around “straggler” tasks, note that such a mechanism has limited benefit in a setting where the slow-down in task completion is due to the external sort.

To mitigate these effects and also to handle skew, we decouple the sorting of intermediate data from map task execution. That is, the intermediate data is first aggregated, and then sorted outside the context of map tasks. Therefore, sorting of intermediate data is now a separate phase of execution which can be optimized by the DISC framework. This decoupling relieves the programmer from having to tune framework parameters related to map-side sorting of the intermediate data. For instance, specifically for Hadoop, our approach eliminates 4 parameters related to map-side sorting (see Table 2(a)) that a programmer has to tune.

Furthermore, we can take advantage of the aggregation of map outputs to build an index, as noted above, which enables us to auto-tune the subsequent reduce phase effectively. This alleviates the need for programmers to select the number of reduce tasks (as required, for example, by Hadoop). Using the index, based on the distribution of keys across the intermediate data files, the number of reduce tasks as well as a task’s key-range assignment can be determined dynamically in a data dependent manner. The index also allows each reduce task to efficiently retrieve (only) its input.

Figure 3 shows how Sailfish uses I-files for transporting the intermediate data between map tasks and reduce tasks. Briefly, in our design, the output of map tasks (which consists of key/value pairs) is first partitioned by key and then aggregated (in the rest of the paper, in the context of Sailfish and I-files, we use aggregation to mean batching data for disk I/O) on a per-partition basis using I-files. This intermediate data is sorted and augmented with an index to support key-based retrieval. Finally, the reduce phase of execution is automatically parallelized in a data-driven manner. We elaborate on this design in the next two sections.

Since I-files are a key building block in the Sailfish design, in what follows (Section 4) we first describe the design and implementation of the I-file abstraction. Next, in Section 5, we describe the Sailfish system and how it uses I-files to implement a Map-Reduce framework.

4 I-files for Aggregating Intermediate Data

We extended the Kosmos distributed filesystem (KFS) [4], an alternative to HDFS, to implement the I-file abstraction. We chose KFS purely for implementation convenience (see Section 4.1) since it already implements some of the features for constructing I-files. Our ideas are general and applicable to HDFS as well. In what follows, we first provide an overview of KFS and then describe how we extended KFS to implement I-files.

4.1 Background: KFS Overview

KFS is designed to support high throughput access to large files in clusters built using commodity hardware. KFS’s design is similar in spirit to Google’s GFS [13] and Hadoop’s HDFS [3]. Blocks of a file (referred to as chunks) are striped across nodes and replicated for fault-tolerance. Chunks can be variable in size but with a fixed maximal value (which by default is 128MB). KFS consists of three components: (1) a single metadata server (metaserver) that provides a global namespace, (2) a set of chunkservers that run on each of the cluster machines and store chunks as files on the machine’s local disk(s), and (3) a client library which should be linked with applications to access files stored in KFS. Finally, KFS is integrated with Hadoop for storing the input/output in MapReduce computations and has been deployed in practical settings.

4.2 Adapting KFS to Support I-files

An I-file is like any other KFS file, with a few exceptions. First, the chunks of an I-file are written to using an atomic record append primitive and hence, by definition, are append-only. Second, in our implementation, once a chunk is closed for writing, it is immutable (i.e., stable). We leverage the immutable property of stable chunks to sort the records stored in a chunk (see Section 4.2.3).

When a client appends a record to an I-file, this translates to an append operation on a chunk of that I-file. The chunkserver storing that chunk plays the role of the aggregator task that we outlined above (see Section 3.1). Next, to allow data aggregation in I-files to be done in a distributed manner, we (1) restrict the number of concurrent writers to a given chunk of an I-file, and (2) allow multiple chunks of an I-file to be concurrently appended to. This approach has multiple advantages. First, by allowing a chunk to be written to by multiple writers, data for an I-file can be packed into fewer chunks. Second, since chunks are striped across nodes, data aggregation for the chunks of an I-file is handled by multiple chunkservers, which allows us to avoid hot-spots.

In the following subsections, we present the APIs for supporting I-files, describe how we implement the key operations of appending records and retrieving records by key, and finally, discuss some efficiency considerations.

4.2.1 I-file APIs

Conceptually, the APIs we have developed for I-files to support record-based I/O are the following:

• create_ifile(filename): An application uses this API to create an I-file. The call returns a file descriptor for use with subsequent append operations.

• record_append(fd, <key, value>): A writer uses this API to append records to an I-file. (Our implementation of this API was released as part of the KFS open-source software distribution in version 0.5.)

• scan(fd, buffer, lower_key, upper_key): A reader uses this API to retrieve records from an I-file that are in the specified key range. The KFS client library implements the functionality of retrieving the matching records from all the chunks of the I-file.

4.2.2 Appending Records To An I-file

Appending a record to an I-file translates to an append operation on a chunk of that file. The steps involved in appending a record to an I-file chunk are as follows:
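As an aside, a minimal sketch of how a map-side writer and a reduce-side reader might drive the record-based APIs of Section 4.2.1 is given below. The sketch is ours and purely illustrative: the ifile_client object, its open/close calls, and the file naming convention are hypothetical stand-ins, not the actual KFS client library interface.

    # Illustrative Python-style sketch only; ifile_client and its methods are
    # hypothetical stand-ins for the record-based APIs of Section 4.2.1.

    def write_partition(ifile_client, partition_id, records):
        # One I-file per partition; the naming convention here is invented.
        fd = ifile_client.create_ifile("partition-%d.ifile" % partition_id)
        for key, value in records:
            # Each call becomes an atomic record append on some chunk of the
            # I-file; the chunkserver holding that chunk acts as the aggregator.
            ifile_client.record_append(fd, (key, value))
        ifile_client.close(fd)   # hypothetical; not one of the three APIs above

    def read_key_range(ifile_client, fd, lower_key, upper_key):
        # The client library fans the scan out to every chunk of the I-file and
        # returns only the records whose keys fall in [lower_key, upper_key].
        buffer = []
        ifile_client.scan(fd, buffer, lower_key, upper_key)
        return buffer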
task contacts the workbuilder to obtain the work assignment and thereby recovers the lost execution.

5.2.5 Recovering Lost Map Task Output

Whenever a chunk of an I-file is lost (e.g., a chunk of an I-file is lost due to disk failure), the records in that chunk are irretrievably lost. Since chunks of an I-file were generated by multiple map tasks appending data via the record append API, the lost data will need to be regenerated by re-executing the appropriate map tasks. To regenerate the lost data, additional bookkeeping information to track the identity of map tasks that wrote to a given I-file chunk has to be maintained. In our implementation, the workbuilder maintains this bookkeeping information and uses it to appropriately trigger re-execution: First, when a map task completes execution, the iappender notifies the workbuilder about the set of chunks it wrote its output to. Second, whenever a chunk is lost, the workbuilder notifies the Hadoop JobTracker to re-run the map tasks that wrote to that chunk. In our implementation, a lost chunk is detected when an imerger is unable to retrieve data from the chunk. The imerger notifies the workbuilder, which then triggers re-execution.

5.3 Disk Seek Analysis

To derive the number of disk seeks involved in the map phase with Sailfish, note that the map output is committed to disk by the chunkservers and then subsequently read back, sorted, and written back to disk by the chunksorter. The number of seeks is effectively data dependent: Let the number of I-files be i, and the number of chunks in an I-file be c. Now, (1) a lower bound on the number of disk seeks incurred by the chunkservers for writing out the data is i ∗ c, and (2) since the chunksorters perform sequential I/O, the minimum number of seeks incurred by the sorters is 2 ∗ i ∗ c. Hence, a lower bound on the number of seeks is 3 ∗ i ∗ c.

To derive the number of disk seeks involved in the reduce phase with Sailfish, observe that each reduce task retrieves its input from a single I-file and in the worst case must access every chunk of that I-file. With R reducers and c chunks per I-file, the number of disk seeks is proportional to c ∗ R. Note that, in contrast to Stock Hadoop in which disk overheads are also dependent on the number of map tasks, the disk overheads with Sailfish are independent of the number of map tasks, but are dependent on the data volume. Therefore, batching intermediate data into as few chunks as possible is critical for Sailfish.

5.4 Miscellaneous Issues: Topology Aware I-files

The I-file abstraction provides the flexibility of choosing where (i.e., local versus remote) to aggregate map output. Fault-tolerance considerations influence this choice. Recall that a data loss which involves a single chunk of an I-file requires all the map tasks that appended to that chunk to be re-executed. With local aggregation, map tasks (from a given job that run on a machine) append their output to I-files whose chunks are stored locally; thus if a disk fails, only map tasks on this machine may be affected. With per-rack aggregation, map tasks write to chunks on the same rack; thus, if a disk fails, only map tasks on that rack may be affected. With global I-files, map tasks can write to any chunk; if a disk fails, all map tasks are potentially affected.

While the local approach is best with respect to fault-tolerance, unfortunately, it did not scale on our cluster (which has 4 drives per node) for two reasons. First, the number of files that can be concurrently written to while still obtaining reasonable disk subsystem performance is relatively low on a single machine (viz., about 32 files in our cluster). This causes the number of I-files per job to be small (i.e., 32). Second, for a given volume of data, fewer I-files means that a larger number of reduce tasks will need to retrieve their input from each I-file; effectively this increases the number of scans on each chunk of the I-file, which lowers disk throughput. We settled on per-rack aggregation since it provides reasonable fault-containment while allowing for a large number of I-files to be concurrently written (viz., 512 files in our cluster). An evaluation of the various approaches for aggregation based on different cluster/node configurations is outside the scope of this paper.

6 Experimental Evaluation

We deployed our Sailfish prototype in a 150-node cluster in our lab and used it to drive a two-part experimental evaluation.

• The first part of the study involves using a synthetic benchmark to (1) evaluate the effectiveness of I-files in aggregating intermediate data and (2) study the system effects of the Sailfish dataflow path (see Section 6.2).

• The second part of the study involves using Sailfish to run a representative mix of real Map-Reduce jobs with their actual input datasets (see Section 6.3).

In summarizing our results, we find that job completion times with Sailfish are in general faster when compared to the same job run with Stock Hadoop (see Figure 8 and Table 3/Figure 11). There are four aspects to the performance gains:

• I-files enable better batching of intermediate data (see Section 6.2.3). As a result, this leads to higher disk throughput during the reduce phase (see Figure 10) and in turn, translates to a faster reduce phase.

• Due to batching of intermediate data, Sailfish provides better scale when compared to Stock Hadoop (see Figure 8).

• Dynamically planning the execution of the reduce phase enables Sailfish to exploit the parallelism in a data dependent manner (see Table 3 and Section 6.3.2). This approach possibly simplifies program tuning.

• Map-phase execution with Sailfish is in general slower when compared to Stock Hadoop. This is because records are written to the RAM of a remote machine as opposed to local RAM with Stock Hadoop. However, whenever there is a skew in map output, the sorting of the map output can cause the map phase of Stock Hadoop to be slower when compared to Sailfish: with Stock Hadoop, due to the skew, some tasks are able to sort the data entirely in RAM while others incur the overheads of a multi-pass external sort. In contrast, with Sailfish, since map output is aggregated and then sorted, the decoupling allows Sailfish to better parallelize the sorting of map output and thereby better handle the skew (see Figure 12).

In what follows, we describe the details of our setup in Section 6.1 and then present the results of our evaluation.

6.1 Cluster Setup

Our experimental cluster has 150 machines organized in 5 racks with 30 machines/rack. Each machine has 2 quad-core Intel Xeon E5420 processors, 16GB RAM, a 1Gbps network interface card, and four 750GB drives configured as a JBOD, and runs RHEL 5.6. The connectivity between any pair of nodes in the cluster is 1Gbps.

We run Hadoop version 0.20.2, KFS (version 0.5) with modifications for the key-based variants defined in Section 4.2.1, and the
other Sailfish components. On each machine we run an instance of a Hadoop TaskTracker, a KFS chunkserver, and 4 KFS chunksorter daemon processes (one sorter process per drive). The disks on each machine are used by all the software components.

Parameter Settings: We configure Stock Hadoop using published best practices [19] along with settings from Yahoo! clusters for the Hadoop map-side sort parameters. Table 2(a) shows the parameters we used. Due to the differences in intermediate data handling, the parameter settings for Sailfish (shown in Table 2(b)) are different from Stock Hadoop. The total memory budget imposed by either system is similar. Finally, during the experiments none of the nodes in the cluster incurred swapping.

Table 2: Parameter settings

(a) Stock Hadoop
    Map tasks per node: 6
    Reduce tasks per node: 6
    Memory per map/reduce task: 1.5GB
    Map-side sort parameters: io.sort.mb = 512, io.sort.factor = 100,
        io.sort.record.percent = 0.2, io.sort.spill.percent = 0.95

(b) Sailfish
    Map tasks per node: 6
    Reduce tasks per node: 6
    Memory per map/reduce task: 512MB
    Memory per iappender: 1GB
    Memory per imerger: 1GB

Sailfish Notes: For Sailfish, we use the rack-aware variant of I-files described in Section 5.4. In the experiments, we limit the number of concurrent appenders per chunk of an I-file to 128, enforced by having each iappender reserve 1MB of logical space before it appends records to a chunk. We set the number of I-files to be 512 (the largest possible value given our system configuration). Choosing a large value makes Sailfish performance less sensitive to the specific choice. Furthermore, this setting relieves our users from choosing the number of I-files for their specific job. We configure each of the chunksorter daemons to use 256MB RAM. Finally, for the merge involved in generating reducer input, if the imerger determines that the reducer input exceeds the amount of RAM, it does an external merge. (Our implementation for merging records is similar to that of Stock Hadoop’s.)

6.2 Evaluation With Synthetic Benchmark

In this part of the study, we evaluate Sailfish for handling intermediate data at scale (viz., for data volumes ranging from 1TB to 64TB). We then discuss aspects of the Sailfish dataflow path as it relates to (1) packing intermediate data in chunks, (2) overheads imposed by the chunksorter daemon, and (3) system effects of aggregating map output on a rack-wide basis. We begin by describing our synthetic benchmark program and then present the results.

6.2.1 Benchmark Description

To highlight the overheads of transporting intermediate data in isolation, we implemented a synthetic MapReduce job in which, intentionally, there is no job input/output. Our program, Benchmark, performs a partitioned sort: (1) each map task generates a configurable number of records (namely, strings with a 10-byte key and a 90-byte value over the ASCII character set), (2) the records are hash-partitioned, sorted, and merged and then provided as input to the reduce task, and (3) each reduce task validates its input records and discards them. Our Benchmark is very similar to the Daytona Sort benchmark program that is used in data sorting competitions [7]. Finally, with Benchmark, there is no skew: (1) all map tasks generate an equal amount of data such that the keys are uniformly random and (2) all reduce tasks process roughly the same number of keys.

6.2.2 Handling intermediate data at scale

For scale, we ran Benchmark while varying the volume of intermediate data generated by the map tasks from 1TB to 64TB. For both Stock Hadoop and Sailfish, we configure the number of mapper tasks such that each mapper generates 1GB of output. For the reduce phase, (1) with Stock Hadoop we provide a value for the number of reduce tasks and (2) with Sailfish we configure the workbuilder process to assign each reduce task approximately 1GB of data. In the experiments, the number of map/reduce tasks varied from 1024 (for handling 1TB of data) to 65536 (for handling 64TB of data).

Figure 8 shows the results of our experiments. A key takeaway from this graph is that the performance of Sailfish for handling intermediate data scales linearly even up to large volumes of data (viz., 64TB). On the other hand, the job run-time with Stock Hadoop grows non-linearly as the volume of intermediate data to be transported begins to exceed 16TB.

The following discussion focuses on the system characteristics during the reduce phase of execution. We defer the discussion of the map phase of execution to Section 6.2.5.

Recall that, in this set of experiments, the amount of input data to a reduce task is approximately 1GB. Based on the parameter settings, the reducer input fits entirely in RAM. Furthermore, in both systems, a reducer retrieves its input from the multiple sources concurrently: with Stock Hadoop, a reduce task obtains its input from multiple mapper machines (viz., 30 by default) in parallel; with Sailfish, an imerger issues concurrent reads to all the chunks of the I-file. However, the difference between the two systems is in the efficiency with which the reduce task obtains its input, namely, the amount of data read per seek, which effectively determines the disk throughput that can be achieved.

For Stock Hadoop, Section 2.2 details why data retrieved per I/O shrinks and why this hurts its performance: the amount of data a reducer pulls from a mapper, on average, is (1GB/R). For Sailfish, since the number of I-files is fixed (i.e., 512), there is an increase in both the number of chunks in an I-file as well as the number of reduce tasks assigned to a given I-file. While the amount of data consumed by a reduce task is fixed (namely, 1GB), this data is spread over almost all the chunks of the I-file. Consequently, the amount of data retrieved per I/O by a reduce task from a single I-file chunk begins to decrease. However, due to better batching (see Section 6.2.3), the amount of data read per I/O with Sailfish is an order of magnitude higher when compared to Stock Hadoop (see Figure 9). The difference in the amount of data read per seek translates to higher disk read throughput for Sailfish in the reduce phase, leading to better job performance. We highlight this effect next.

Figure 10 shows the disk throughput obtained with Stock Hadoop as well as Sailfish for runs of Benchmark in which the volume of intermediate data is 16TB. Given our 1GB limit of data for each map or reduce task, this job involved executing 16384 mappers and 16384 reducers. For Stock Hadoop, the average amount of data retrieved by a reducer from a map task is about 70KB. For Sailfish, the average amount of data retrieved by a reducer from an I-file chunk is about 1.5MB. With fewer seeks and a higher amount of data read per seek, the disk read throughput obtained by Sailfish on a single machine averages to about 35MB/s. On the other hand, with Stock Hadoop, due to more seeks and less data read per seek, the observed disk throughput averages to about 20MB/s. As a result, this effect causes the reduce phase in Stock Hadoop to be substantially longer when compared to Sailfish’s reduce phase for the same job (viz., 3.5 hours when compared to 1.75 hours).
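The gap between the two systems can also be seen with a rough back-of-envelope estimate. The short sketch below is our own illustration (it is not the paper's measurement code) and simply plugs in the settings stated above: 512 I-files, 128MB chunks, and roughly 1GB of intermediate data per map and reduce task. The Stock Hadoop estimate (~64KB per pull) is close to the measured ~70KB; the Sailfish estimate (~4MB per chunk) is an upper bound on a single retrieval, and the measured ~1.5MB is lower, presumably because a reducer's share of a chunk is fetched over more than one read. Either way, the estimates are an order of magnitude apart, which is the effect visible in Figure 9.

    # Back-of-envelope estimate for the 16TB Benchmark run (our illustration;
    # parameter values are taken from Sections 6.1 and 6.2.2).
    GB = 1024 ** 3
    TB = 1024 ** 4

    intermediate_data = 16 * TB
    data_per_task = 1 * GB      # ~1GB produced per mapper, ~1GB consumed per reducer
    num_reducers = intermediate_data // data_per_task           # 16384

    # Stock Hadoop: a reducer pulls its 1/R share of each mapper's 1GB output.
    stock_pull = data_per_task / num_reducers                   # ~64KB (measured: ~70KB)

    # Sailfish: 512 I-files, 128MB chunks; a reducer's ~1GB input is spread
    # over the chunks of a single I-file.
    num_ifiles = 512
    chunk_size = 128 * 1024 * 1024
    chunks_per_ifile = (intermediate_data / num_ifiles) / chunk_size    # ~256
    sailfish_read = data_per_task / chunks_per_ifile            # ~4MB upper bound (measured: ~1.5MB)

    print("Stock Hadoop per pull:   %.0f KB" % (stock_pull / 1024))
    print("Sailfish per chunk read: %.1f MB" % (sailfish_read / (1024 * 1024)))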
Figure 8: Variation in job run-time with the volume of intermediate data. Note that the axes are log-scale.

Figure 9: Data read per retrieval by a reduce task with Stock Hadoop and Sailfish.

Figure 10: Disk read throughput in the reduce phase with Sailfish is higher and hence the reduce phase is faster (int. data size = 16TB).

Note that our implementation of Sailfish can be tuned further. For instance, rather than sorting individual chunks when they become stable, multiple chunks can be locally aggregated and sorted. This optimization increases the length of the “sorted runs”, which can lead to better scale/performance. That is, when compared to Figure 9, this optimization can increase the data read per retrieval at larger values of intermediate data size.

Finally, there is also the issue of implementation differences between the two systems when transporting intermediate data. Based on the above results (coupled with the observation that there is no data skew), much of the gains in Sailfish are due to better disk I/O in the reduce phase. Therefore, unless the I/O sizes in both systems are comparable, we do not believe the implementation differences have a significant impact on performance.

6.2.3 Effectiveness of I-files

For a given I-file, our atomic record append implementation tries to minimize the number of chunks required for storing the intermediate data. The experimental results validated the implementation. The chunk allocation policy ensured that the number of chunks per I-file was close to optimal (i.e., size of I-file / KFS chunksize). Furthermore, except for the last chunk of an I-file, the remaining chunks were over 99% full. This is key to our strategy of maximizing batching: a metric discussed in Section 4.2.4 was the number of files used to store intermediate data.

6.2.4 Chunk sorting overheads

For I-files, whenever a chunk becomes stable, the chunkserver utilizes the chunksorter daemon to sort the records in the chunk. The chunksorter daemon is I/O intensive and performs largely sequential I/O: First, it spends approximately 2-4 seconds loading a 128MB chunk of data into RAM. Second, it spends approximately 1-2 seconds sorting the keys using a radix trie algorithm. Finally, it spends approximately 2-4 seconds writing the sorted data back to disk. (Since writing out the sorter output file is a large sequential I/O, as a performance optimization, our implementation uses the posix_fallocate() API for contiguous disk space allocation.)

6.2.5 Impact of network-based aggregation

The improvements in the reduce phase of execution with Sailfish come at the expense of an additional network transfer. During the map phase, in Stock Hadoop, a map task writes its output to the local filesystem’s buffer cache (which writes the data to disk asynchronously). With Sailfish, the map output is committed to RAM on remote machines (and the chunkserver aggregates and writes to disk asynchronously). This additional network transfer causes the map phase of execution to be longer by about 10%.

From a practical standpoint, aggregating map output on a per-rack basis (see Section 5.4) minimizes the impact of the additional traffic on the network for two reasons. First, clusters for data intensive computing are configured with higher intra-rack connectivity when compared to inter-rack capacity. For instance, in Yahoo!’s clusters, the connectivity is 1Gbps between any pair of intra-rack nodes when compared to 200Mbps between inter-rack nodes. Second, due to locality optimizations in the Hadoop job scheduler (i.e., schedule tasks where the data is stored), the intra-rack capacity is relatively unused and Sailfish leverages the unused capacity. Finally, network capacity within the datacenter is slated to substantially increase over the next few years. Clusters with 10Gbps inter-node connectivity (on a small scale) have been deployed [25]; larger clusters with such connectivity will be commonplace. We expect Sailfish to be deployed in such settings, where maximizing the achievable disk subsystem bandwidth and in turn effectively utilizing the available network bandwidth becomes paramount.

6.3 Evaluation With Actual Jobs

For this part of the study we first construct a job mix by developing a simple taxonomy for classifying Map-Reduce jobs in general. Our taxonomy is based on an analysis of jobs we see in Yahoo!’s clusters (see Section 6.3.1). Using this taxonomy, we handpicked a representative mix of jobs to drive an evaluation using actual datasets and jobs (from data mining, data analytics pipelines) run in production clusters at Yahoo!. We then present the results of our evaluation.

6.3.1 Constructing a Job Mix

Based on conversations with our users as well as an analysis of our cluster workloads, we use the following taxonomy for classifying MapReduce jobs in general:

1. Skew in map output: Data compression is commonly used in practice, and users organize their data so as to obtain high compression ratios. As a result, the number of input records processed by various map tasks can be substantially different, and this impacts the size of the output of the task.

2. Skew in reduce input: These are jobs for which some partitions get more data than others. The causes for skew include poor choice of partitioning function, low entropy in keys, etc.

3. Incremental computation: Incremental computation is a commonly used paradigm in data intensive computing environments. A typical use case is creating a sliding window
over a dataset. For instance, for behavioral targeting, N-day models of user behavior are created by a key-based join of a 1-day model with the previous N-day model.

4. Big data: These are data mining jobs that process vast amounts of data, e.g., jobs that process a day of server logs (where the daily log volume is about 5TB in size). With these jobs, the output is proportional to the input (i.e., for each input record, the job generates an output record of proportional size).

5. Data explosion: These are jobs for which the output of the map step is a multiple of the input size. For instance, to analyze the effectiveness of an online ad-campaign by geo-location (e.g., impressions by (1) country, (2) state, (3) city), the map task emits multiple records for each input record.

6. Data reduction: These are jobs in which the computation involves a data reduction step which causes the intermediate data size (and job output) to be a fraction of the job input. For example, there are jobs that compute statistics over the data by processing a few TB of input but producing only a few GB of output.

Table 3 shows the jobs that we handpicked for our evaluation. We note that several of these are Pig scripts containing joins and co-grouping, and produce large amounts of intermediate data. Of these jobs, BehaviorModel and ClickAttribution are CPU and data intensive, while the rest are data intensive. Finally, note that in all of these jobs, with the exception of LogCount, there is no reduction in the intermediate data size when compared to the job input’s size.

6.3.2 Evaluation With Representative Jobs

Hadoop best practices [19] recommend using compression to minimize the amount of I/O when handling intermediate data. Hence, for this set of experiments, for handling intermediate data we enabled LZO-based compression with Stock Hadoop and extended our Sailfish implementation to support an LZO codec.

Table 3 shows the data volumes for the various jobs as well as the number of map/reduce tasks. Note that multiple waves of map/reduce tasks per job is common.

For this set of experiments, the workbuilder was configured to assign up to 2GB of data per reduce task (independent of the job). This value represents a trade-off between fault-tolerance (i.e., the amount of computation that has to be re-done when a reducer fails) versus performance (i.e., a large value implies fewer reducers, possibly improving disk performance due to larger sequential I/Os). As part of follow-on work [8], we are exploring ways of eliminating this parameter. This would then allow the reduce phase of execution to be adapted completely dynamically based on the available cluster resources (viz., CPUs).

Figure 11: Time spent in the Map and Reduce phases of execution for the various MapReduce jobs. At scale, Sailfish (S) outperforms Stock Hadoop (H) between 20% to a factor of 5.

Figure 11 shows the results of running the various jobs using Stock Hadoop as well as Sailfish. Our results show that as the volume of intermediate data scales, job completion times with Sailfish are between 20% to 5x faster when compared to the same job run with Stock Hadoop. There are three aspects to the gains:

• Using I-files for aggregation: In terms of the reduce phase of computation, except for the LogProc and LogRead jobs in which the volume of intermediate data is relatively low (see Table 3), for the remaining jobs there is a substantial speedup with Sailfish. The speedup in the reduce phase is due to the better batching of intermediate data in I-files, similar to what we observed with Benchmark.

• Decoupling sorting from map task execution: From our job mix, we found that skew in map output impacted the LogProc and NdayModel jobs: (1) in the LogProc job, a few of the map tasks generated as much as 30GB of data, and (2) in the NdayModel job, which involves a JOIN of an N-day dataset with a 1-day dataset, about half the map tasks that processed files from the N-day dataset generated about 10GB of data while the remaining tasks generated 450MB of data. Figure 12 shows the distribution of map task completion times for the NdayModel job. While the skew affects map task completion times in both Stock Hadoop and Sailfish, the impact on Stock Hadoop due to the sorting overheads incurred by map tasks is much higher. This result validates one of our design choices: decoupling the sort of map output from map task execution. In these experiments, particularly for the LogProc job, such a separation yielded up to a 5x improvement in application run-time.

• Making reduce phase dynamic: Dynamically determining the number of reduce tasks and their work assignment in a data dependent manner helps in skew handling as well as in automatically exploiting the parallelism in the reduce phase. We illustrate these effects using the LogRead job in which there is a skew in the intermediate data (particularly, as Figure 13 shows, partitions 0-200 had more data than the rest: 4.5GB vs 0.5GB). As shown in Table 3, Sailfish used more reduce tasks than Stock Hadoop (800 compared to 512), and proportionately more reducers were assigned to those partitions (i.e., as shown in Figure 14, with 2GB of data per reduce task, I-file 0 to I-file 200 were assigned 3 reducers per I-file while the remaining I-files were assigned 1 reducer apiece). As a result, by better exploiting the available parallelism, the reduce phase in Sailfish is much faster compared to Stock Hadoop. Our approach realizes these benefits in a seamless manner without re-partitioning the intermediate data and simplifies program tuning.

Finally, to study the effect of change in data volume, we ran the ClickAttribution job using Sailfish where we increased the input data size (from 25% to 100%). We found that the workbuilder daemon automatically caused the number of reduce tasks to increase proportionately (i.e., from 4096 to 8192) in a data dependent manner.
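The data-dependent planning just described can be captured in a simple sketch. The code below is our own simplified illustration of the policy used in these experiments (roughly one reduce task per 2GB of a partition's data); it is not the actual workbuilder implementation, which works from the per-chunk index and splits each I-file's key range among its assigned reducers.

    import math

    GB = 1024 ** 3

    def plan_reduce_phase(ifile_sizes, data_per_reducer=2 * GB):
        # Simplified illustration: heavily loaded partitions get proportionately
        # more reduce tasks; lightly loaded ones get a single task.
        return [(ifile_id, max(1, math.ceil(size / data_per_reducer)))
                for ifile_id, size in enumerate(ifile_sizes)]

    # LogRead-like skew: I-files 0-200 hold ~4.5GB each, the rest ~0.5GB each.
    sizes = [int(4.5 * GB)] * 201 + [int(0.5 * GB)] * 311
    plan = plan_reduce_phase(sizes)
    # Skewed I-files get 3 reducers apiece and the rest 1 apiece; the actual
    # total chosen by the workbuilder (800 for LogRead) differs from this
    # simple per-I-file estimate because assignment is by key ranges.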
Job Name         | Job Characteristics  | Operators                | Input size | Int. data size | # of mappers | # of reducers (Stock Hadoop / Sailfish) | Run time (Stock Hadoop / Sailfish)
LogCount         | Data reduction       | COUNT                    | 1.1TB      | 0.04TB         | 400          | 512 / 512                               | 0:11 / 0:14
LogProc          | Skew in map output   | GROUP BY                 | 1.1TB      | 1.1TB          | 400          | 1024 / 1024                             | 3:27 / 0:37
LogRead          | Skew in reduce input | GROUP BY                 | 1.1TB      | 1.1TB          | 400          | 512 / 800                               | 0:58 / 0:40
NdayModel        | Incr. computation    | JOIN                     | 3.54TB     | 3.54TB         | 2000         | 4096 / 4096                             | 2:18 / 0:42
BehaviorModel    | Big data job         | COGROUP                  | 3.6TB      | 9.47TB         | 4000         | 4096 / 5120                             | 4:55 / 3:15
ClickAttribution | Big data job         | COGROUP, FILTER          | 6.8TB      | 8.2TB          | 21259        | 4096 / 4096                             | 6:00 / 5:00
SegmentExploder  | Data explosion       | COGROUP, FLATTEN, FILTER | 14.1TB     | 25.2TB         | 42092        | 16384 / 13824                           | 13:20 / 8:48

Table 3: Characteristics of the jobs used in the experiments. The data sizes are post-compression. The job run times reported in this table are end-to-end (i.e., from start to finish). As data volumes scale, Sailfish outperforms Stock Hadoop between 20% to a factor of 5. See Figure 11 for a break-down in the time spent in the Map/Reduce phases of the job.
Figure 12: Distribution of map task run time for the NdayModel job. Skew in map output sizes affects task completion times for both Stock Hadoop and Sailfish, but the impact for Stock Hadoop is much higher.

Figure 13: Distribution of the size of the intermediate data files in the LogRead job. For this job there is a skew in the distribution of data across partitions (i.e., skew in reducer input).

Figure 14: For the LogRead job, Sailfish handles skew in intermediate data by assigning reduce tasks in proportion to the data in the I-file (see Figure 13).

6.3.3 Impact of data loss in I-files

In our setting the intermediate data is not replicated: Stock Hadoop does not implement it and, for a fair comparison, we did not enable replication for I-files. Hence, in the event of data loss (viz., caused by a disk failure), the lost data has to be regenerated via recomputation. When a disk fails, the required recomputations on the two systems are:

• Stock Hadoop: Since map tasks store their output on the local disks by arbitrarily choosing a drive, the expected number of recomputes is: (# of tasks run on a node) / (# of drives on a node).

• Sailfish: With 512 I-files and 30 machines per rack, with per-rack I-files, a map task running on a node will write to all the chunkservers in the rack. Since a chunkserver on a node arbitrarily chooses a drive to store a chunk file, the expected number of recomputes is: (# of tasks run on a rack) / (# of drives on a node).

Though these recompute tasks can be run in parallel, their effect on job runtimes is data dependent. For jobs where there is a skew in map output, the cost of recompute with Stock Hadoop is much higher than Sailfish. For instance, for the LogProc job, in which over 90% of the job run-time is in the map phase of computation, recomputes can cause the overall job run-time to nearly double: The run-time with Stock Hadoop increases from 2:18 to 4:06, while with Sailfish it increases from 0:42 to 1:08. For the other jobs where there is no skew in map output, regenerating the lost data requires about 1 to 2 waves of map task execution, which causes a 10-20% increase in job runtime in either system.

7 Related Work

The performance of the Hadoop Map-Reduce framework has been studied recently [16, 22, 27]. In [22], they find that Hadoop job performance is affected by factors such as lack of a schema which impacts selection queries, lack of an index which affects join performance, and data parsing overheads. Reference [12] shows how to improve Hadoop performance by augmenting data with a schema as well as an index and then using the augmented information to substantially speed up select/join operators. These mechanisms are applicable with Sailfish as well. In [16], they identify additional factors that affect Hadoop performance such as I/O mode for accessing data from disk, and task scheduling. The paper also describes ways to tune these factors to improve performance by 2.5 to 3.5x. The same paper also notes that another potential source of performance improvement is in modifying how the intermediate data is handled. Our work addresses this aspect and our results demonstrate substantial gains.

TritonSort [24, 25] was developed for doing large-scale data sorting. Recently, in an effort that parallels our Sailfish work, the TritonSort architecture has been extended with a Map-Reduce implementation called ThemisMR [23]. While an experimental comparison of the two systems is outside the scope of this paper, we briefly describe the ThemisMR design and then contrast it with Sailfish. The design goals of ThemisMR are similar to some of our objectives in building Sailfish: (1) ThemisMR focuses on optimizing disk subsystem performance when handling intermediate data at scale and (2) ThemisMR tries to mitigate skew in reducer input. With ThemisMR, a Map-Reduce computation consists of two distinct phases, namely map and shuffle followed by sort and
reduce. For handling intermediate data, the first phase involves large sequential writes, while the second phase involves large sequential reads. Through careful buffer management their design ensures that intermediate data touches disk exactly twice, using I/Os that are long/sequential, thereby maximizing disk subsystem performance. Next, for mitigating skew in reducer input, ThemisMR contains an optional sampling phase which is used to determine partition boundaries. Finally, ThemisMR considers a point in the design space where cluster sizes are small (on the order of 30-100 nodes), in which component failures are rare, and hence forgoes fault-tolerance (i.e., the entire job must be re-run whenever there is a failure). However, for large clusters consisting of 100’s to 1000’s of nodes, it is well-known that failures are not uncommon [9]. For large clusters, the ThemisMR paper [23] notes that requiring entire jobs to be re-run whenever there is a failure can adversely impact performance. Contrasting the two systems, (1) Sailfish provides fault-tolerance while still improving application performance and (2) Sailfish tries to mitigate skew in reducer input without an explicit sampling step. In addition, within each of the two phases of ThemisMR, to avoid (unnecessary) spilling of data to disk (e.g., under memory pressure) their design relies on a memory manager that forces data generation to appropriately stall. However, as noted in their paper, carefully choosing memory management policies and tuning them to maximize performance (such as by avoiding deadlocks and by minimizing stalls) is non-trivial.

Dealing with skew in the context of Map-Reduce has been studied by [18, 20, 23, 28]. In these systems, a job is executed twice: The first execution samples the input dataset to determine the split points, which are then used to drive the actual execution over the complete dataset. The objective here is to minimize the skew in intermediate data (i.e., skew in the reducer input). With Sailfish, by gathering statistics over the data at run-time, we try to achieve the same objective without requiring an explicit sampling step.

An alternate approach for handling skew in reducer input is to adaptively change the map-output partitioning function [29] for Hadoop jobs. In their work, the number of partitions is an input parameter and is fixed a priori; then, by sampling the output of a small number of map tasks, they mitigate skew by dynamically constructing the partitioning function (i.e., split points) such that the partitions will be balanced. Their methodology is similar to Sailfish in that they mitigate skew by sampling the intermediate data. However, since they assign one reduce task per partition and the number of partitions is fixed a priori, this parameter has to be carefully chosen. In particular, as the volume of intermediate data scales and jobs are run with a larger number of partitions, while their techniques may mitigate skew, the performance gains are limited by Hadoop’s intermediate data handling mechanisms.

Augmenting datasets with an index, particularly after the dataset has been generated, is known to be expensive. For instance, in [12], they find that the one-time cost involved in building an index over a 2TB input dataset using 100 nodes takes over 10 hours. In contrast, with Sailfish the augmentation of an index to an I-file chunk is done as part of intermediate data generation and hence incurs little overhead.

Starfish [14] uses job profiling techniques to help tune Hadoop parameters, including those related to handling of intermediate data (i.e., the map-side sort parameters and the number of reduce tasks). With Starfish, the computation has to be run once to obtain the job profile and it then suggests input parameter values for subsequent runs of the same job. The dynamic data-driven approach to parameter tuning in Sailfish achieves the same gains without having to run the job once to determine the job profile. Further, as our analysis and results show, the gains achievable by tuning Hadoop are inherently limited by Hadoop’s mechanisms for handling intermediate data; Sailfish improves performance further by better batching of disk I/O.

8 Summary and Future Work

We presented Sailfish, an alternate Map-Reduce framework built around the principle of aggregating intermediate data for improved disk I/O. To enable aggregation, we developed I-files as an abstraction, implemented as an extension of the distributed filesystem. Our Sailfish prototype runs standard Hadoop jobs, with no changes to application code, but uses I-files to transport intermediate data (i.e., the output of the map step). We demonstrated both improved performance and less dependence on user-tuning.

As part of on-going research, there are several avenues of work that we are currently exploring. First, by adding a feedback loop to the reduce phase of Sailfish it becomes possible to re-partition the work assigned to a reduce task at a key-boundary [8]. Such dynamic re-partitioning enables elasticity for the reduce phase, thereby improving utilization in multi-tenanted clusters. Second, for mitigating the impact of failures, we are evaluating mechanisms for replicating intermediate data, thereby minimizing the number of recomputes. Third, I-files provide new opportunities for debugging, particularly the reduce phase of a MapReduce job, saving valuable programmer time.

The Sailfish design is geared towards computations in which the volume of intermediate data is large. As we noted in Section 1, for a vast majority of the jobs in the cluster, the volume of intermediate data is small. For such jobs alternate implementations for handling intermediate data may afford better performance. Though the current versions of the Hadoop framework force all jobs to use the same intermediate data handling mechanism, the next generation of the Hadoop framework (namely, YARN [1]) relaxes this restriction. The YARN architecture includes hooks for customizing intermediate data handling, including a per-job application master that coordinates job execution. Incorporating many of the core ideas from this paper into an application master and task execution layer is the focus of ongoing work [5].

Sailfish is currently deployed in our lab and is being evaluated by our colleagues at Yahoo!. We have released Sailfish and the other software components developed as part of this paper as open source [6].

9 Acknowledgements

The authors would like to thank Tyson Condie for providing detailed feedback and insightful suggestions on early drafts. We thank Chris Douglas, Carlo Curino, and the anonymous reviewers for helpful comments and feedback. We also thank Igor Gashinsky for providing us the cluster used in our experiments.

10 References

[1] Apache Hadoop NextGen MapReduce (YARN). http://hadoop.apache.org/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/YARN.html.
[2] Apache Hadoop Project. http://hadoop.apache.org/.
[3] HDFS. http://hadoop.apache.org/hdfs.
[4] KFS. http://code.google.com/p/kosmosfs/.
[5] Preemption and restart of mapreduce tasks. http://issues.apache.org/jira/browse/MAPREDUCE-4585.
[6] Sailfish. http://code.google.com/p/sailfish/.
[7] Sort benchmark home page. http://sortbenchmark.org/.
[8] G. Ananthanarayanan, C. Douglas, R. Ramakrishnan, S. Rao, and I. Stoica. True Elasticity in Multi-Tenant Clusters through Amoeba. In ACM Symposium on Cloud Computing, SoCC’12, October 2012.
[9] J. Dean. Software engineering advice from building large-scale distributed systems. http://research.google.com/people/jeff/stanford-295-talk.pdf.
[10] J. Dean and S. Ghemawat. Mapreduce: simplified data processing on large clusters. In OSDI’04: Proceedings of the 6th conference on Symposium on Operating Systems Design & Implementation, 2004.
[11] J. Dean and S. Ghemawat. Mapreduce: A flexible data processing tool. Communications of the ACM, 53(1):72–77, January 2010.
[12] J. Dittrich, J.-A. Quiané-Ruiz, A. Jindal, Y. Kargin, V. Setty, and J. Schad. Hadoop++: Making a yellow elephant run like a cheetah (without it even noticing). Proc. VLDB Endow., 3(1), 2010.
[13] S. Ghemawat, H. Gobioff, and S. T. Leung. The Google file system. In Proceedings of the nineteenth ACM symposium on Operating systems principles, volume 37 of SOSP ’03, pages 29–43, New York, NY, USA, Oct. 2003.
[14] H. Herodotou, H. Lim, G. Luo, N. Borisov, L. Dong, F. B. Cetin, and S. Babu. Starfish: A self-tuning system for big data analytics. Systems Research, pages 261–272, 2011.
[15] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In EuroSys ’07: Proceedings of the 2nd ACM SIGOPS/EuroSys European Conference on Computer Systems 2007, pages 59–72, 2007.
[16] D. Jiang, B. C. Ooi, L. Shi, and S. Wu. The performance of mapreduce: an in-depth study. Proc. VLDB Endow., 3(1), Sept. 2010.
[17] D. E. Knuth. Sorting and Searching, volume 3 of The Art of Computer Programming. Addison-Wesley Professional, second edition, May 1998.
[18] Y. Kwon, M. Balazinska, B. Howe, and J. Rolia. Skew-resistant parallel processing of feature-extracting scientific user-defined functions. In Proceedings of the 1st ACM symposium on Cloud computing, SoCC ’10, pages 75–86, New York, NY, USA, 2010. ACM.
[19] A. Murthy. Apache hadoop: Best practices and anti-patterns. http://developer.yahoo.com/blogs/hadoop/posts/2010/08/apache_hadoop_best_practices_a/.
[20] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig latin: a not-so-foreign language for data processing. In SIGMOD ’08: Proceedings of the 2008 ACM SIGMOD international conference on Management of data, pages 1099–1110, 2008.
[21] J. Ousterhout et al. The case for ramclouds: Scalable high-performance storage entirely in dram. SIGOPS Operating Systems Review, 43(4):92–105, December 2009.
[22] A. Pavlo, E. Paulson, A. Rasin, D. J. Abadi, D. J. DeWitt, S. Madden, and M. Stonebraker. A comparison of approaches to large-scale data analysis. In Proceedings of the 35th SIGMOD international conference on Management of data, SIGMOD ’09, pages 165–178, New York, NY, USA, 2009. ACM.
[23] A. Rasmussen, M. Conley, R. Kapoor, V. The Lam, G. Porter, and A. Vahdat. ThemisMR: An I/O Efficient MapReduce. Technical Report CS2012-0983, Department of Computer Science and Engineering, University of California at San Diego, July 2012.
[24] A. Rasmussen, M. Conley, G. Porter, and A. Vahdat. Tritonsort 2011. http://sortbenchmark.org/2011_06_tritonsort.pdf.
[25] A. Rasmussen, G. Porter, M. Conley, H. V. Madhyastha, R. N. Mysore, A. Pucher, and A. Vahdat. Tritonsort: a balanced large-scale sorting system. In Proceedings of the 8th USENIX conference on Networked systems design and implementation, NSDI’11, Berkeley, CA, USA, 2011.
[26] S. Rao, R. Ramakrishnan, A. Silberstein, M. Ovsiannikov, and D. Reeves. Sailfish: A framework for large scale data processing. Technical Report YL-2012-002, Yahoo! Labs.
[27] M. Stonebraker, D. Abadi, D. J. DeWitt, S. Madden, E. Paulson, A. Pavlo, and A. Rasin. Mapreduce and parallel dbmss: friends or foes? Commun. ACM, 53(1):64–71, Jan. 2010.
[28] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, S. Anthony, H. Liu, P. Wyckoff, and R. Murthy. Hive: a warehousing solution over a map-reduce framework. Proc. VLDB Endow., 2(2), 2009.
[29] R. Vernica, A. Balmin, K. S. Beyer, and V. Ercegovac. Adaptive MapReduce using Situation-Aware Mappers. In International Conference on Extending Database Technology (EDBT), 2012.
