Sailfish - A Framework For Large Scale Data Processing
• We argue that batching data for disk I/O (or aggregating data for disk I/O) should be a core design principle in handling intermediate data at scale (Section 3).

• Based on this design principle, we develop I-files as an abstraction to support batching of data written by multiple writers (Section 4). We develop an I-file implementation by extending distributed filesystems that were previously designed for data intensive computing and are already deployed.
3.2 Benefits of Aggregation for Addressing Skew

While data aggregation lowers the disk overheads in the reduce phase, as noted in Section 2.1 disk overheads can affect the map phase as well if the map output does not fit in memory and must be spilled to disk before it is merged. Since a reduce task cannot begin execution until all map tasks generate their output file, the sort overhead incurred by even a single map task can substantially increase job completion time. Such a slow-down occurs, for instance, if there is skew in map output: some map tasks may incur a multi-pass external merge to produce their final output file while the remaining tasks may produce it via a single pass in-memory sort. While DISC frameworks employ speculative execution to work around "straggler" tasks, note that such a mechanism has limited benefit in a setting where the slow-down in task completion is due to the external sort.

To mitigate these effects and also to handle skew, we decouple the sorting of intermediate data from map task execution. That is, the intermediate data is first aggregated, and then sorted outside the context of map tasks. Therefore, sorting of intermediate data is now a separate phase of execution which can be optimized by the DISC framework. This decoupling relieves the programmer from having to tune framework parameters related to map-side sorting of the intermediate data. For instance, specifically for Hadoop, our approach eliminates 4 parameters related to map-side sorting (see Table 2(a)) that a programmer has to tune.

Furthermore, we can take advantage of the aggregation of map outputs to build an index, as noted above, which enables us to auto-tune the subsequent reduce phase effectively. This alleviates the need for programmers to select the number of reduce tasks (as required, for example, by Hadoop). Using the index, based on the distribution of keys across the intermediate data files, the number of reduce tasks as well as a task's key-range assignment can be determined dynamically in a data dependent manner. The index also allows each reduce task to efficiently retrieve (only) its input.

Figure 3 shows how Sailfish uses I-files for transporting the intermediate data between map tasks and reduce tasks. Briefly, in our design, the output of map tasks (which consists of key/value pairs) is first partitioned by key and then aggregated on a per-partition basis using I-files. This intermediate data is sorted and augmented with an index to support key-based retrieval. Finally, the reduce phase of execution is automatically parallelized in a data-driven manner. We elaborate on this design in the next two sections.

Since I-files are a key building block in the Sailfish design, in what follows (Section 4) we first describe the design and implementation of the I-file abstraction. Next, in Section 5, we describe the Sailfish system, and how it uses I-files to implement a Map-Reduce framework.
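To make the data-dependent planning concrete, here is a minimal Python sketch (not Sailfish code) of how an index recording the total bytes stored per key could be scanned to choose reduce-task key ranges of roughly a target size; the function name plan_key_ranges, the mock key sizes, and the 2GB target are illustrative assumptions.

```python
# Illustrative sketch (not Sailfish's actual code): given an index that
# records the total bytes stored for each key of the sorted intermediate
# data, choose the number of reduce tasks and their key ranges so that
# each task receives roughly `target_bytes` of input.

from typing import Dict, List, Tuple

def plan_key_ranges(index: Dict[str, int],
                    target_bytes: int = 2 << 30) -> List[Tuple[str, str]]:
    """Return (first_key, last_key) ranges, each covering ~target_bytes."""
    ranges = []
    first_key, acc = None, 0
    for key in sorted(index):                 # index: key -> total bytes for key
        if first_key is None:
            first_key = key
        acc += index[key]
        if acc >= target_bytes:
            ranges.append((first_key, key))   # close the current range
            first_key, acc = None, 0
    if first_key is not None:                 # leftover keys form the last range
        ranges.append((first_key, max(index)))
    return ranges

# Example: skewed key sizes; heavy keys end up grouped into narrower ranges.
sizes = {"k%04d" % i: (50 << 20 if i < 100 else 5 << 20) for i in range(2000)}
print(len(plan_key_ranges(sizes)))            # number of reduce tasks, data dependent
```

In Sailfish the sorted intermediate data is augmented with such an index (Section 4.2.3); the sketch only illustrates why an index over key sizes suffices to pick both the number of reduce tasks and their key-range assignments.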
4.1 Background: KFS Overview

KFS is designed to support high throughput access to large files in clusters built using commodity hardware. KFS's design is similar in spirit to Google's GFS [13] and Hadoop's HDFS [3]. Blocks of a file (referred to as chunks) are striped across nodes and replicated for fault-tolerance. Chunks can be variable in size but with a fixed maximal value (which by default is 128MB). KFS consists of three components: (1) a single metadata server (metaserver) that provides a global namespace, (2) a set of chunkservers that run on each of the cluster machines and store chunks as files on the machine's local disk(s), and (3) a client library which is linked with applications to access files stored in KFS. Finally, KFS is integrated with Hadoop for storing the input/output of MapReduce computations and has been deployed in practical settings.

4.2 Adapting KFS to Support I-files

An I-file is like any other KFS file, with a few exceptions. First, the chunks of an I-file are written to using an atomic record append primitive and hence, by definition, are append-only. Second, in our implementation, once a chunk is closed for writing, it is immutable (i.e., stable). We leverage the immutable property of stable chunks to sort the records stored in a chunk (see Section 4.2.3).

When a client appends a record to an I-file, this translates to an append operation on a chunk of that I-file. The chunkserver storing that chunk plays the role of the aggregator task that we outlined above (see Section 3.1). Next, to allow data aggregation in I-files to be done in a distributed manner, we (1) restrict the number of concurrent writers to a given chunk of an I-file, and (2) allow multiple chunks of an I-file to be concurrently appended to. This approach has multiple advantages. First, by allowing a chunk to be written to by multiple writers, data for an I-file can be packed into fewer chunks. Second, since chunks are striped across nodes, data aggregation for the chunks of an I-file is handled by multiple chunkservers, which allows us to avoid hot-spots.

In the following subsections, we present the APIs for supporting I-files, describe how we implement the key operations of appending records and retrieving records by key, and finally discuss some efficiency considerations.

4.2.1 I-file APIs

Conceptually, the APIs we have developed for I-files to support record-based I/O are the following (a usage sketch follows the list):

• create_ifile(filename): An application uses this API to create an I-file. The call returns a file descriptor for use with subsequent append operations.

• record_append(fd, <key, value>): A writer uses this API to append records to an I-file.
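To show the intended usage pattern of these calls, the following Python sketch has a map task partition its key/value output and append each record to the I-file for its partition. The IFileClient class is a hypothetical in-memory stand-in; only the names create_ifile and record_append come from the conceptual APIs above, and the real client is the KFS client library.

```python
# Hypothetical harness around the conceptual I-file APIs (create_ifile,
# record_append). It sketches the usage pattern only; it is not the KFS
# client library described in Section 4.1.

class IFileClient:
    """Stand-in for the real client library (illustrative only)."""
    def __init__(self):
        self._files = {}                      # fd -> list of (key, value) records

    def create_ifile(self, filename):
        fd = len(self._files)                 # returns a file descriptor
        self._files[fd] = []
        return fd

    def record_append(self, fd, key, value):
        # In KFS this becomes an atomic append to one chunk of the I-file,
        # performed by the chunkserver that stores the chunk.
        self._files[fd].append((key, value))

# A map task partitions its output by key and appends on a per-partition basis.
NUM_PARTITIONS = 4
client = IFileClient()
fds = [client.create_ifile("ifile.%d" % p) for p in range(NUM_PARTITIONS)]

for key, value in [("apple", 1), ("banana", 2), ("cherry", 3)]:
    partition = hash(key) % NUM_PARTITIONS    # partition by key
    client.record_append(fds[partition], key, value)
```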
Figure 8: Variation in job run-time with the volume of intermediate data. Note that the axes are log-scale.

Figure 9: Data read per retrieval by a reduce task with Stock Hadoop and Sailfish.

Figure 10: Disk read throughput in the reduce phase with Sailfish is higher and hence the reduce phase is faster (int. data size = 16TB).
Note that our implementation of Sailfish can be tuned further. For instance, rather than sorting individual chunks when they become stable, multiple chunks can be locally aggregated and sorted. This optimization increases the length of the "sorted runs", which can lead to better scale/performance. That is, when compared to Figure 9, this optimization can increase the data read per retrieval at larger values of intermediate data size.

Finally, there is also the issue of implementation differences between the two systems when transporting intermediate data. Based on the above results (coupled with the observation that there is no data skew), much of the gains in Sailfish are due to better disk I/O in the reduce phase. Therefore, unless the I/O sizes in both systems are comparable, we do not believe the implementation differences have a significant impact on performance.

6.2.3 Effectiveness of I-files

For a given I-file, our atomic record append implementation tries to minimize the number of chunks required for storing the intermediate data. The experimental results validated the implementation. The chunk allocation policy ensured that the number of chunks per I-file was close to optimal (i.e., size of I-file / KFS chunksize). Furthermore, except for the last chunk of an I-file, the remaining chunks were over 99% full. This is key to our strategy of maximizing batching: a metric discussed in Section 4.2.4 was the number of files used to store intermediate data.

6.2.4 Chunk sorting overheads

For I-files, whenever a chunk becomes stable, the chunkserver utilizes the chunksorter daemon to sort the records in the chunk. The chunksorter daemon is I/O intensive and performs largely sequential I/O: First, it spends approximately 2-4 seconds loading a 128MB chunk of data into RAM. Second, it spends approximately 1-2 seconds sorting the keys using a radix trie algorithm. Finally, it spends approximately 2-4 seconds writing the sorted data back to disk. (Since writing out the sorter output file is a large sequential I/O, as a performance optimization, our implementation uses the posix_fallocate() API for contiguous disk space allocation.)
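The following Python sketch mirrors the chunksorter's three steps in simplified form: load a stable chunk's records, sort them by key, and write the sorted records back along with a per-key index. The record framing (pickle), the file names, and the use of Python's built-in sort in place of a radix trie are assumptions made for illustration; they are not the daemon's actual implementation.

```python
# Simplified stand-in for the chunksorter daemon: sort the records of one
# stable chunk and emit a per-key index. Record framing and the in-memory
# sort are illustrative; the real daemon sorts keys with a radix trie and
# preallocates the output file with posix_fallocate().

import pickle
from collections import defaultdict

def sort_stable_chunk(chunk_path: str, sorted_path: str, index_path: str) -> None:
    with open(chunk_path, "rb") as f:
        records = pickle.load(f)              # list of (key, value) pairs

    records.sort(key=lambda kv: kv[0])        # sort the whole chunk by key

    index = defaultdict(int)                  # key -> total bytes for that key
    for key, value in records:
        index[key] += len(pickle.dumps((key, value)))

    with open(sorted_path, "wb") as f:        # one large sequential write
        pickle.dump(records, f)
    with open(index_path, "wb") as f:
        pickle.dump(dict(index), f)

if __name__ == "__main__":
    with open("chunk.bin", "wb") as f:        # toy three-record "chunk"
        pickle.dump([("b", 2), ("a", 1), ("c", 3)], f)
    sort_stable_chunk("chunk.bin", "chunk.sorted", "chunk.idx")
```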
6.2.5 Impact of network-based aggregation

The improvements in the reduce phase of execution with Sailfish come at the expense of an additional network transfer. During the map phase, in Stock Hadoop, a map task writes its output to the local filesystem's buffer cache (which writes the data to disk asynchronously). With Sailfish, the map output is committed to RAM on remote machines (and the chunkserver aggregates and writes to disk asynchronously). This additional network transfer causes the map phase of execution to be higher by about 10%.

From a practical standpoint, aggregating map output on a per-rack basis (see Section 5.4) minimizes the impact of the additional traffic on the network for two reasons. First, clusters for data intensive computing are configured with higher intra-rack connectivity when compared to inter-rack capacity. For instance, in Yahoo!'s clusters, the connectivity is 1Gbps between any pair of intra-rack nodes when compared to 200Mbps between inter-rack nodes. Second, due to locality optimizations in the Hadoop job scheduler (i.e., schedule tasks where the data is stored), the intra-rack capacity is relatively unused and Sailfish leverages the unused capacity.

Finally, network capacity within the datacenter is slated to substantially increase over the next few years. Clusters with 10Gbps inter-node connectivity (on a small scale) have been deployed [25]; larger clusters with such connectivity will be commonplace. We expect Sailfish to be deployed in such settings, where maximizing the achievable disk subsystem bandwidth and in turn effectively utilizing the available network bandwidth becomes paramount.

6.3 Evaluation With Actual Jobs

For this part of the study we first construct a job mix by developing a simple taxonomy for classifying Map-Reduce jobs in general. Our taxonomy is based on an analysis of jobs we see in Yahoo!'s clusters (see Section 6.3.1). Using this taxonomy, we handpicked a representative mix of jobs to drive an evaluation using actual datasets and jobs (from data mining and data analytics pipelines) run in production clusters at Yahoo!. We then present the results of our evaluation.

6.3.1 Constructing a Job Mix

Based on conversations with our users as well as an analysis of our cluster workloads, we use the following taxonomy for classifying MapReduce jobs in general:

1. Skew in map output: Data compression is commonly used in practice, and users organize their data so as to obtain high compression ratios. As a result, the number of input records processed by various map tasks can be substantially different, and this impacts the size of the output of the task.

2. Skew in reduce input: These are jobs for which some partitions get more data than others. The causes for skew include poor choice of partitioning function, low entropy in keys, etc.
3. Incremental computation: Incremental computation is a commonly used paradigm in data intensive computing environments. A typical use case is creating a sliding window over a dataset. For instance, for behavioral targeting, N-day models of user behavior are created by a key-based join of a 1-day model with the previous N-day model.

4. Big data: These are data mining jobs that process vast amounts of data, e.g., jobs that process a day of server logs (where the daily log volume is about 5TB in size). With these jobs, the output is proportional to the input (i.e., for each input record, the job generates an output record of proportional size).

5. Data explosion: These are jobs for which the output of the map step is a multiple of the input size. For instance, to analyze the effectiveness of an online ad-campaign by geo-location (e.g., impressions by (1) country, (2) state, (3) city), the map task emits multiple records for each input record.

6. Data reduction: These are jobs in which the computation involves a data reduction step which causes the intermediate data size (and job output) to be a fraction of the job input. For example, there are jobs that compute statistics over the data by processing a few TB of input but producing only a few GB of output.

Table 3 shows the jobs that we handpicked for our evaluation. We note that several of these are Pig scripts containing joins and co-grouping, and produce large amounts of intermediate data. Of these jobs, BehaviorModel and ClickAttribution are CPU and data intensive, while the rest are data intensive. Finally, note that in all of these jobs, with the exception of LogCount, there is no reduction in the intermediate data size when compared to the job input's size.

6.3.2 Evaluation With Representative Jobs

Hadoop best practices [19] recommend using compression to minimize the amount of I/O when handling intermediate data. Hence, for this set of experiments, for handling intermediate data we enabled LZO-based compression with Stock Hadoop and extended our Sailfish implementation to support an LZO codec.

Table 3 shows the data volumes for the various jobs as well as the number of map/reduce tasks. Note that multiple waves of map/reduce tasks per job is common.

For this set of experiments, the workbuilder was configured to assign up to 2GB of data per reduce task (independent of the job). This value represents a trade-off between fault-tolerance (i.e., the amount of computation that has to be re-done when a reducer fails) and performance (i.e., a large value implies fewer reducers, possibly improving disk performance due to larger sequential I/Os). As part of follow-on work [8], we are exploring ways of eliminating this parameter. This would then allow the reduce phase of execution to be adapted completely dynamically based on the available cluster resources (viz., CPUs).

Figure 11: Time spent in the Map and Reduce phases of execution for the various MapReduce jobs (y-axis: time in minutes; jobs: LogCount, LogProc, LogRead, NdayModel, BehaviorModel, ClickAttribution, SegmentExploder). At scale, Sailfish (S) outperforms Stock Hadoop (H) by between 20% and a factor of 5.

Figure 11 shows the results of running the various jobs using Stock Hadoop as well as Sailfish. Our results show that as the volume of intermediate data scales, job completion times with Sailfish are between 20% and 5x faster when compared to the same job run with Stock Hadoop. There are three aspects to the gains:

• Using I-files for aggregation: In terms of the reduce phase of computation, except for the LogProc and LogRead jobs, in which the volume of intermediate data is relatively low (see Table 3), there is a substantial speedup with Sailfish for the remaining jobs. The speedup in the reduce phase is due to the better batching of intermediate data in I-files, similar to what we observed with Benchmark.

• Decoupling sorting from map task execution: From our job mix, we found that skew in map output impacted the LogProc and NdayModel jobs: (1) in the LogProc job, a few of the map tasks generated as much as 30GB of data, and (2) in the NdayModel job, which involves a JOIN of an N-day dataset with a 1-day dataset, about half the map tasks that processed files from the N-day dataset generated about 10GB of data while the remaining tasks generated 450MB of data. Figure 12 shows the distribution of map task completion times for the NdayModel job. While the skew affects map task completion times in both Stock Hadoop and Sailfish, the impact on Stock Hadoop due to the sorting overheads incurred by map tasks is much higher. This result validates one of our design choices: decoupling the sort of map output from map task execution. In these experiments, particularly for the LogProc job, such a separation yielded up to a 5x improvement in application run-time.

• Making reduce phase dynamic: Dynamically determining the number of reduce tasks and their work assignment in a data dependent manner helps in skew handling as well as in automatically exploiting the parallelism in the reduce phase. We illustrate these effects using the LogRead job, in which there is a skew in the intermediate data (particularly, as Figure 13 shows, partitions 0-200 had more data than the rest: 4.5GB vs 0.5GB). As shown in Table 3, Sailfish used more reduce tasks than Stock Hadoop (800 compared to 512), and proportionately more reducers were assigned to those partitions (i.e., as shown in Figure 14, with 2GB of data per reduce task, I-file0 to I-file200 were assigned 3 reducers per I-file while the remaining I-files were assigned 1 reducer apiece). As a result, by better exploiting the available parallelism, the reduce phase in Sailfish is much faster compared to Stock Hadoop. Our approach realizes these benefits in a seamless manner without re-partitioning the intermediate data and simplifies program tuning.

Finally, to study the effect of a change in data volume, we ran the ClickAttribution job using Sailfish where we increased the input data size (from 25% to 100%). We found that the workbuilder daemon automatically caused the number of reduce tasks to increase proportionately (i.e., from 4096 to 8192) in a data dependent manner.
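The proportional assignment described above amounts to a small calculation: each I-file receives roughly its size divided by the per-task cap, rounded up. The Python sketch below illustrates it with mock sizes patterned after the LogRead example; the function name and numbers are illustrative, not workbuilder internals.

```python
# Illustrative sketch of the data-dependent reducer assignment: each I-file
# gets ceil(size / per_task_cap) reduce tasks. Names and numbers below are
# mock values mirroring the LogRead example, not the workbuilder's code.

import math

def reducers_per_ifile(ifile_sizes_gb, per_task_cap_gb=2.0):
    return [max(1, math.ceil(size / per_task_cap_gb)) for size in ifile_sizes_gb]

# LogRead-like skew: partitions 0-200 hold ~4.5GB each, the rest ~0.5GB each.
sizes = [4.5] * 201 + [0.5] * 199
assignment = reducers_per_ifile(sizes)

print(assignment[0], assignment[-1])   # 3 reducers for heavy I-files, 1 for light
print(sum(assignment))                 # total reduce tasks, determined by the data
```

With these mock sizes, the heavy I-files get 3 reducers apiece and the light ones get 1, which matches the assignment reported for LogRead in Figure 14.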
Job Name | Job Characteristics | Operators | Input size | Int. data size | # of mappers | # of reducers (Stock Hadoop / Sailfish) | Run time (Stock Hadoop / Sailfish)
LogCount | Data reduction | COUNT | 1.1TB | 0.04TB | 400 | 512 / 512 | 0:11 / 0:14
LogProc | Skew in map output | GROUP BY | 1.1TB | 1.1TB | 400 | 1024 / 1024 | 3:27 / 0:37
LogRead | Skew in reduce input | GROUP BY | 1.1TB | 1.1TB | 400 | 512 / 800 | 0:58 / 0:40
NdayModel | Incr. computation | JOIN | 3.54TB | 3.54TB | 2000 | 4096 / 4096 | 2:18 / 0:42
BehaviorModel | Big data job | COGROUP | 3.6TB | 9.47TB | 4000 | 4096 / 5120 | 4:55 / 3:15
ClickAttribution | Big data job | COGROUP, FILTER | 6.8TB | 8.2TB | 21259 | 4096 / 4096 | 6:00 / 5:00
SegmentExploder | Data explosion | COGROUP, FLATTEN, FILTER | 14.1TB | 25.2TB | 42092 | 16384 / 13824 | 13:20 / 8:48

Table 3: Characteristics of the jobs used in the experiments. The data sizes are post-compression. The job run times reported in this table are end-to-end (i.e., from start to finish). As data volumes scale, Sailfish outperforms Stock Hadoop by between 20% and a factor of 5. See Figure 11 for a break-down of the time spent in the Map/Reduce phases of each job.
Figure 12: Distribution of map task run time for the NdayModel job (x-axis: map task #; y-axis: task run-time in minutes, for Stock Hadoop and Sailfish). Skew in map output sizes affects task completion times for both Stock Hadoop and Sailfish, but the impact for Stock Hadoop is much higher.

Figure 13: Distribution of the size of the intermediate data files in the LogRead job (x-axis: partition #; y-axis: data size in MB). For this job there is a skew in the distribution of data across partitions (i.e., skew in reducer input).

Figure 14: For the LogRead job, Sailfish handles skew in intermediate data by assigning reduce tasks in proportion to the data in the I-file (x-axis: partition #; y-axis: number of tasks assigned to an I-file); see Figure 13.