Sampling Based Range Partition Methods For Big Data Analytics




Milan Vojnovic
Microsoft Research
Fei Xu
Microsoft Corporation
Jingren Zhou
Microsoft Corporation
March 2012
Technical Report
MSR-TR-2012-18
Microsoft Research
Microsoft Corporation
One Microsoft Way
Redmond, WA 98052
http://www.research.microsoft.com
Abstract. Big data analytics requires partitioning datasets into
thousands of partitions according to a specific set of keys so that
different machines can process different partitions in parallel. Range
partition is one of the ways to partition the data, and it is needed
whenever global ordering is required. It partitions the data according
to a pre-defined set of exclusive and continuous ranges that cover the
entire domain of the partition key. Providing high-quality
(approximately equal-sized) partitions is a key problem for big data
analytics because job latency is determined by the most loaded node.
This problem is especially challenging because typically no statistics
about the key distribution over machines for an input dataset are
available at the beginning of a range partition. The system needs to
find a way to determine the partition boundaries that is both
cost-effective and accurate. This paper presents a weighted-sampling
based approach, implemented in Cosmos, the cloud infrastructure for big
data analytics used by Microsoft Online Service Division. The approach
has been used by many jobs daily and was found to be both efficient and
to provide the desired partition quality.
1. INTRODUCTION
Big data analytics has been gaining momentum and growing in importance
over the last few years [5, 6, 10, 12, 15, 1]. It involves large-scale
computations whose input data is often on the order of terabytes or
petabytes and which run in production data centers involving tens of
thousands of machines. A key factor in making such computations
efficient is to partition the data evenly across different machines, so
as to fully exploit parallelism: individual data partitions are
processed in parallel on different machines. However, we cannot always
partition the data arbitrarily. Different operators impose different
restrictions on the data partitioning. For example, a GROUP BY query
requires all the records with the same key to reside on the same node.
ORDER BY requires the output file to be fully sorted, which means the
data needs to be range partitioned. In distributed data analytics,
three partition methods are commonly used: (1) random partition, (2)
hash partition and (3) range partition. Because of the restrictions an
operator imposes, only some of these methods may be applicable to it.
Section 2 discusses in detail the relationship between commonly used
operators and the three partition methods.
Compared to range partition, the other two methods are relatively easy
to implement since they do not need to consider the relationship
between different key values. Range partition, on the other hand,
requires a set of key ranges to be pre-defined. It is especially
difficult to determine the range partition boundaries in a time and
communication cost efficient manner for massive scale data that is
input to a computation. This input data may be the result of an
intermediate step in a computation and may be of an arbitrary format.
In such cases, we may not have statistics about the data to be
partitioned, such as the number of distinct keys or the total number of
keys. Furthermore, the input data may be skewed, and statistics such as
the number of records per machine or the number of records per machine
per key would typically be unknown a priori, making the problem of data
partitioning even more difficult.
A typical architecture for solving the range partition problem is for a
coordinator node to retrieve some statistics from each site that
contains the input data. The coordinator then uses these statistics to
determine the partition boundaries. In general, effective range
partitioning is crucial for many scenarios in a distributed
environment, and how to choose partition boundaries is a challenge.
Sampling based methods for range partition are particularly appealing
in comparison with alternative approaches (e.g. using quantile
summaries) in view of their simplicity and efficiency (see the
discussion in the related work section). For example, TeraSort [13],
which won the data sorting challenge in 2008 [11], used a customized
sampling based partitioner with standard MapReduce sort running on
Hadoop [5, 6]. The importance of selecting an appropriate sample for
range partition was articulated and experimentally demonstrated in
[14].
Figure 1: System consists of a coordinator node and k sites. Each site
has access to a portion of input data.
We consider a standard distributed system model that consists of k
sites and a coordinator node (see Figure 1). The coordinator is a
designated node that needs to compute the partition boundaries using as
input some statistic about the dataset. A site may refer to a task
running on a machine in a distributed cluster of machines, which reads
a portion of input data. Tasks are usually assigned to machines
respecting data locality, so that typically a task is assigned to a
machine that has proximal access to data. Hence, our goal is to make
efficient use of the communication bandwidth between the coordinator
node and sites. Notice that the range partition problem bears an
intrinsic accuracy-cost trade-off: we would like to take a large number
of samples in order to accurately determine the boundaries of ranges;
at the same time, we want to limit the number of samples in order to
save communication bandwidth. The question that we study in this paper
is as follows:
Q) How should samples be taken from a set of distributed sites so that
the data range partition meets a prescribed accuracy guarantee?
We define the range partition problem studied in this paper more
specifically as follows. Suppose the input data is a multiset $\Omega$
of $n \geq 1$ data items that admit a total order, that is, for every
pair of data items $a, b \in \Omega$ we have either $a \leq b$ or
$a > b$. This accommodates as a special case a dataset where each item
$a \in \Omega$ corresponds to a real value in $\mathbb{R}$; note that
we allow values to be non-unique. The input data is partitioned across
$k$ sites such that each site $j \in [k] = \{1, 2, \ldots, k\}$ is
associated with a multiset of data items $\Omega_j$ and we have
$\Omega = \bigcup_{j \in [k]} \Omega_j$. Given an input parameter
$p = (p_1, p_2, \ldots, p_m)$ such that $0 \leq p_i \leq 1$ for every
$i \in [m]$ and $\sum_{i \in [m]} p_i = 1$, the goal is to partition
the data into $m$ ranges such that the number of data items mapped to
range $i$ is approximately $p_i n$. Our primary interest in this paper
is to produce a balanced range partition, so that $p_i = 1/m$ for every
$i \in [m]$, as this is the most important case for big data analytics
applications.¹ A range partition of $\Omega$ according to $p$ is said
to be $\epsilon$-accurate if, for a given $0 < \epsilon \leq 1$, the
number of data items that are mapped to range $i$, denoted by $Q_i$,
satisfies $Q_i \leq (1 + \epsilon) p_i n$ for every range $i \in [m]$.
This is a standard objective for range partition [2]. A sampling based
range partition amounts to the coordinator node collecting a sample of
data items from $\Omega$ and then computing the boundaries of ranges
using a statistic derived from this sample. The challenge lies in
choosing a sample size that is large enough so that the range
partitioning meets the accuracy guarantee with a prescribed probability
of success.
In this paper we characterize the required sample size to guarantee the
aforementioned quality of the data range partition. We consider a
simple sampling scheme which assumes a provision to take a random
sample of a given size from a distributed dataset. We first
characterize the sufficient sample size for the simple sampling scheme
without worrying about how to take such a sample (assuming this is
enabled by an underlying system component). Previous work assumes that
each record in the dataset to be partitioned has a unique key. We
provide results that allow non-unique key values. Taking a random
sample from a distributed dataset is a nontrivial task in the absence
of statistics such as the number of data items per site. TeraSort uses
a simple approximate approach by taking an equal number of samples from
each site independently. This approximation is acceptable if the inputs
at different sites are approximately equal in size, although the
TeraSort work does not answer the important question of how many
samples to take from each site so as to guarantee a prescribed
partition accuracy. This approach fails in many day-to-day jobs running
in Cosmos, where the distribution of the number of data items across
sites is commonly imbalanced. We discuss several common scenarios that
cause this imbalance and provide empirical evidence that such
imbalances are indeed common in practice, using some typical datasets
that are processed in Cosmos. We then introduce a more sophisticated
weighted sampling scheme that accounts for the imbalance of the number
of data items across sites. This sampling scheme takes an equal number
of samples from each site, but then merges these samples into a summary
sample in a way that ensures each site is represented in the summary
sample in proportion to the number of data items accessed through this
site. We then mathematically analyze how many samples are needed using
this approach in order to satisfy a prescribed accuracy guarantee.
¹ Our analysis allows for arbitrary range partition, which may be of
general interest. This would support environments where data has to be
range-partitioned across heterogeneous computing machines or where the
computational complexity of an underlying task depends on the section
of the input data range.
1.1 Summary of Our Results
Our contributions can be summarized in the following points:
1) Simple Sampling Scheme: We study a simple sampling scheme and
characterize a sufficient sample size for this scheme to guarantee a
given accuracy with a prescribed probability, where the input data may
have multiple records with the same partition key value. Previous
research work was restricted to the case of unique key values [2].
2) Weighted Sampling Scheme: We provide and analyze a practical
weighted sampling scheme that allows the input data to have repeated
partition key values and an arbitrary distribution of the number of
data items per site, given that the simple sampling scheme is difficult
to implement in practice. We characterize the sufficient sample size to
guarantee a given accuracy with a prescribed probability. This analysis
reveals which statistics about the dataset are sufficient to determine
the required sample size. In particular, we find that a key statistic
is a measure of imbalance of the number of data items across sites.
3) Data Analysis: We provide a data analysis of some typical datasets
processed in a production data center of a major online service
provider to evaluate the extent of data imbalance across sites, and
provide simulation results to evaluate the efficiency of the weighted
sampling scheme.
1.2 Outline of the Paper
The paper is structured as follows. In Section 2 we first give an
overview of the architecture of Cosmos and then discuss the
relationship between the commonly used operators and data partitioning
methods. In Section 3, we first introduce the simple sampling scheme
and then provide the sufficient sample size analysis. In Section 4, we
first discuss why the simple sampling scheme is not practical. We then
discuss the approach used in TeraSort and point out that it is also not
practical because the input data is not always balanced across
different sites for day-to-day jobs. We then discuss several typical
scenarios that cause imbalance of the data items across sites, and
introduce our weighted sampling scheme. Finally, we provide analysis
results on the sample size required by the weighted sampling scheme to
meet a prescribed accuracy guarantee for data partition with a given
probability. Section 5 provides data analysis results and an evaluation
of the required sample size for different values of input parameters
and data inputs. Related work is discussed in Section 6, and we
conclude in Section 7. All the proofs are deferred to the Appendix
(some to the Appendix of the companion online technical report [18]).
2. DATA PARTITIONING IN BIG DATA ANALYTICS
In this section, we first describe what a typical big data analytics
platform looks like by giving a brief description of Cosmos, the cloud
infrastructure for big data analytics used in Microsoft Online Service
Division. We then discuss the SCOPE language, a high-level SQL-like
descriptive language used by Cosmos for big data analytics. Then we
discuss different operators used in SCOPE and their relationship with
three well-known partition methods. Finally, we briefly discuss the
infrastructure described in the introduction section, which is used for
the range partition.
2.1 Cosmos and SCOPE
Cosmos is a large-scale, distributed storage and computing platform
developed by Microsoft, running on tens of thousands of commodity
servers. Cosmos provides a cost-effective infrastructure to reliably
store massive amounts of data, and to process and analyze the data in
an efficient, robust and scalable fashion.
Cosmos has three layers: (1) the Cosmos storage layer, (2) the Cosmos
execution layer and (3) the SCOPE layer. The Cosmos storage layer is a
distributed storage system that is designed and optimized for storing
petabytes of append-only sequential files. Each file is composed of a
sequence of extents. Each extent is typically a few hundred megabytes
in size. Extents are compressed, and replicated to different machines,
for cost efficiency and reliability.
The Cosmos execution layer uses Dryad [10] to execute a job. A job is
modeled as a directed acyclic graph (DAG). Each vertex in the graph is
a computation unit and each edge represents a data flow. The job is
executed by the Job Manager, which is a single coordinator for all the
vertices of the job. The Job Manager constructs the job graph and
schedules the vertices (computation units) on different machines in the
data center. It also monitors the running vertices and reruns vertices
when failures happen. We refer the interested reader to the Dryad paper
[10] for details.
SCOPE is the official (and only) language for users within Microsoft to
submit analytical jobs to Cosmos. Each day, thousands of SCOPE jobs run
over petabytes of data. SCOPE is a SQL-like descriptive scripting
language, but it is also extensible and allows arbitrary customized C#
types and functions. A SCOPE script is a sequence of SCOPE statements.
Most SCOPE statements take one or more rowsets as input and generate
one output rowset. The most commonly used statement is the SELECT
statement, which basically implements the SQL SELECT statement. It
supports filtering, projection, all sorts of joins, aggregates, group
by and order by. Sub-queries are not supported. SCOPE also provides
three statements, PROCESS, REDUCE, and COMBINE, that allow users to
write customized C# code to manipulate the rowset transformation.
PROCESS is a row-wise independent operator that is equivalent to Map in
MapReduce. REDUCE is the same as Reduce in MapReduce. COMBINE is
equivalent to a customized join. We refer the interested reader to the
SCOPE paper [1] for details.
2.2 Data Processing Operators and Data Partitioning
Like other data analytics platforms, big data analytics needs the usual
operators: projection, filtering, join, group by, sorting, etc. Given
the input data volume, it is important for a big data analytics
platform to be able to partition input data into thousands of
partitions, so that the partitions can be processed in parallel.
However, data cannot be partitioned arbitrarily, as different operators
impose different restrictions on how the system can partition the data.
We review some basic restrictions.
We first look at the MapReduce programming model. Map is basically an
operator that applies to each row independently. We call this type of
operator a row-wise independent operator. Row-wise independent
operators allow the system to process each row independently and
possibly in parallel. Therefore, regardless of how we partition the
data, the semantics of row-wise independent operators are preserved.
Reduce, however, is different. It requires that all the rows that have
the same key value be presented and processed together. Therefore, our
partition scheme is constrained: we cannot put rows with the same key
into different partitions. We call such an operator a key-wise
independent operator.
In SCOPE, operators such as filtering and projection are clearly
row-wise independent operators. Join and group by are key-wise
independent operators. Order by is different. Order by imposes a global
ordering according to some ordering key. But no single machine can sort
all the data, even using disk-based sorting, in a reasonable amount of
time. Thus, we require the data to be partitioned so that the
partitions themselves are ordered: all records in one partition must be
either smaller or larger than every record in another partition.
As mentioned in Section 1, random partition, hash partition and range
partition are three commonly used partition methods in big data
analytics. Random partition assigns records to partitions at random,
without any restrictions. Hash partition first hashes the record
partition key and then maps the hash value of the key to a partition.
There are multiple ways to do this mapping. A typical approach is
round-robin, that is, taking the hash key modulo the number of
partitions; the result is the partition id (0 is the first index). For
example, if the hash key is 38 and we need 5 partitions, the record is
mapped to partition 3. It is important to understand that hash
partition provides the key-wise independent guarantee: records with the
same key value must have the same hash value, so they are mapped to the
same partition. However, hash partition does not guarantee ordering
among the partitions.
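As a minimal sketch of the modulo mapping described above (the function name `hash_partition` is hypothetical and not part of SCOPE or Cosmos), the worked example with hash key 38 and 5 partitions can be written as:

```python
def hash_partition(hash_key: int, num_partitions: int) -> int:
    """Map a hashed key to a partition id in 0..num_partitions-1."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return hash_key % num_partitions


# The example from the text: hash key 38 with 5 partitions.
example_partition = hash_partition(38, 5)
```

Records with equal keys have equal hash values and therefore land in the same partition, which is the key-wise independent guarantee; nothing in this mapping relates the order of keys to the order of partition ids.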
Range partition partitions the data according to prescribed ranges.
Range partition provides both the key-wise independent guarantee and
partition-wise ordering. Therefore, anything that can be implemented
using hash partition can also be implemented using range partition. In
a big data analytics platform, if global sorting is a required feature,
then range partition must be implemented, whereas hash partition is not
required.
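The following sketch illustrates how a record can be mapped to a range once boundaries are known. It assumes a hypothetical convention in which `boundaries` is a sorted list of m-1 inclusive upper bounds and the last range is unbounded above; the source does not prescribe this convention.

```python
from bisect import bisect_left


def range_partition(key, boundaries):
    """Return the index of the range that holds `key`.

    Range i holds keys with boundaries[i-1] < key <= boundaries[i];
    keys above the last boundary fall in range len(boundaries).
    """
    return bisect_left(boundaries, key)


boundaries = [10, 20]  # three ranges: <=10, (10, 20], >20
partition_ids = [range_partition(k, boundaries) for k in (5, 10, 11, 20, 21)]
```

Equal keys always map to the same range, and a larger key never maps to a smaller range index, which gives both the key-wise independent guarantee and partition-wise ordering.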
3. SIMPLE SAMPLING-BASED RANGE PARTITION
This section first introduces the simple sampling scheme. Then we
discuss the sufficient sample size for this approach.
3.1 Simple Sampling Approach
In practice, the sample size is typically fixed arbitrarily and thus
does not necessarily meet an a priori specified guarantee. For example,
in TeraSort [13], the sampler used 100,000 keys to determine the reduce
boundaries, and it was observed that the distribution between reduces
was hardly perfect and would have benefited from more samples. It is
thus important to understand how many samples should be taken from a
dataset to meet an a priori specified accuracy guarantee. We will
consider the following simple sampling scheme.
Simple Sampling Scheme
Input: partition relative sizes $p = (p_1, p_2, \ldots, p_m)$
Parameter: $t$ := number of samples
1. SAMPLE: Coordinator collects a set $S$ of $t$ random samples from
$\Omega$
2. PARTITION: Coordinator determines boundaries of ranges using data
samples $S$
This is a natural and standard random sampling scheme that was
considered in prior work as early as [2]. The random sample is assumed
to be taken either with or without replacement, where the latter could
be realized by a reservoir sampling scheme [17] if the data is stored
on one site. The challenge lies in setting the sample size large enough
that the resulting data range partition meets a desired accuracy
guarantee for given relative partition sizes $p$ and input data which
may consist of data items with repeated data values.
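The two steps of the simple sampling scheme can be sketched as follows. This is an illustration only: it assumes the whole dataset is available as an in-memory list, samples with replacement, and picks boundaries as sample quantiles, which is one natural reading of the PARTITION step. The function name and the `rng` parameter are hypothetical.

```python
import random


def simple_sampling_boundaries(data, p, t, rng=random):
    """Pick len(p)-1 boundary keys from t uniform samples of `data`."""
    # SAMPLE: t uniform random samples, with replacement.
    sample = sorted(rng.choice(data) for _ in range(t))
    # PARTITION: boundary i is the sample quantile at p_1 + ... + p_i.
    boundaries, cumulative = [], 0.0
    for share in p[:-1]:
        cumulative += share
        index = min(t - 1, max(0, int(round(cumulative * t)) - 1))
        boundaries.append(sample[index])
    return boundaries
```

For example, `simple_sampling_boundaries(list(range(1000)), [0.25] * 4, 2000)` returns three keys that should lie near 250, 500 and 750; how near depends on the sample size t, which is the subject of the analysis that follows.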
3.2 Approximation Guarantee
We recall that, given as input relative partition sizes
$p = (p_1, p_2, \ldots, p_m)$ and the relative accuracy parameter
$\epsilon > 0$, a range partition of a dataset $\Omega$ is said to be
$\epsilon$-accurate if, denoting by $Q_i$ the number of data items of
$\Omega$ that fall in range $i$, we have $Q_i \leq (1 + \epsilon) p_i n$
for every range $i \in [m]$. We would like to guarantee that the latter
property holds with probability at least $1 - \delta$, where
$\delta \in (0, 1]$ is an input parameter. We denote by $\pi$ the
smallest relative partition size, i.e. $\pi = \min_{i \in [m]} p_i$.
Notice that for the case of a balanced partition, $\pi = 1/m$.
Let us also introduce some additional notation. Let $h(a)$ denote the
frequency of a data item $a \in \Omega$, i.e.
$h(a) = |\{b \in \Omega \mid b = a\}| / |\Omega|$. We define $\phi$ to
be an upper bound on the largest frequency of a data item, i.e.
$h(a) \leq \phi$ for every $a \in \Omega$. Notice that if all data
items are distinct, then $\phi = 1/n$.
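The accuracy criterion and the frequency bound defined above can be checked directly on small inputs. The sketch below assumes hypothetical helper names and the convention that `boundaries` holds inclusive upper bounds of all ranges except the last.

```python
from bisect import bisect_left
from collections import Counter


def range_sizes(data, boundaries):
    """Q_i: number of items falling in each of the len(boundaries)+1 ranges."""
    sizes = [0] * (len(boundaries) + 1)
    for item in data:
        sizes[bisect_left(boundaries, item)] += 1
    return sizes


def is_eps_accurate(data, boundaries, p, eps):
    """True if Q_i <= (1 + eps) * p_i * n holds for every range i."""
    n = len(data)
    sizes = range_sizes(data, boundaries)
    return all(q <= (1 + eps) * p_i * n for q, p_i in zip(sizes, p))


def max_frequency(data):
    """Largest relative frequency of any item; any bound at least this works."""
    return max(Counter(data).values()) / len(data)
```

With data `[1, 1, 2, 3, 4, 5, 6, 7]`, boundaries `[2, 5]` and a balanced target over three ranges, the range sizes are 3, 3 and 2, so the partition passes the check for a relative accuracy of 0.2 but not for 0.1.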
3.3 Sufficient Sample Size for Simple Sampling Scheme
We characterize the sufficient sample size for the simple sampling
scheme as follows.
Theorem 1. Suppose that sampling is random either with or without
replacement. Suppose that $\epsilon > \phi/\pi$ and that the sample
size $t$ satisfies
$$t \geq 2 \cdot \frac{1 - \pi}{\pi \epsilon^2} \cdot \frac{(1 + \phi/\pi)\,(1 - \phi/(1 - \pi))}{(1 - \phi/(\pi\epsilon))^2} \cdot \mathrm{polylog}(1/\delta, m, n)$$
where $\mathrm{polylog}(1/\delta, m, n) = \log(1/\delta) + \log(m) + \log(n)$.²
Then, the range partition is $\epsilon$-accurate with probability at
least $1 - \delta$.
² Hereinafter, for simplicity of exposition, we omit a factor in the
sufficient sample size which is $1 + O(\epsilon)$.
Proof is provided in Appendix A. Note that the theorem tells us that in
order to derive a sufficient sample size, we need the following
information about the input dataset: (1) an upper bound on the total
number of data items and (2) an upper bound on the frequency of the
most frequent data item. For the case of a balanced range partition, we
have the following corollary.
Corollary 1. For a balanced range partition over $m > 1$ ranges, an
$\epsilon$-accurate range partition can be computed with probability at
least $1 - \delta$, provided that $\epsilon > m\phi$ and the sample
size $t$ is such that
$$t \geq \frac{2m}{\epsilon^2} \cdot \frac{1 + m\phi}{(1 - m\phi/\epsilon)^2} \cdot \mathrm{polylog}(1/\delta, m, n).$$
These results are established by using the framework of large
deviations and provide a tight characterization of the probability of
error exponent (the factor that multiplies the polylog term in the
above expressions). For the special case of a balanced partition into
$m$ ranges and $m\phi = o(1)$,³ the bound in the corollary boils down
to
$$t \geq \frac{2m}{\epsilon^2} \cdot \mathrm{polylog}(1/\delta, m, n).$$
A result of this type for a balanced range partition was first asserted
in [2] and later extended to arbitrary range partition in [16]. The new
information in the above results is in allowing for non-unique data
values, which affects the sample size through the parameter $\phi$. The
communication complexity of the sampling scheme is
$\tilde{O}(m/\epsilon^2)$, where the $1/\epsilon^2$ scaling is folklore
known to hold for sampling based schemes. We further discuss related
work later in Section 6.
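To give a feel for the magnitudes involved, the sketch below evaluates the balanced-partition bound of Corollary 1 as read here: 2m/eps^2, times the correction (1 + m*phi) / (1 - m*phi/eps)^2, times log(1/delta) + log(m) + log(n). Natural logarithms and the function name are assumptions, and the omitted 1 + O(eps) factor is not included.

```python
import math


def balanced_sample_size(m, eps, delta, n, phi=0.0):
    """Sufficient sample size for a balanced partition into m ranges.

    Requires eps > m * phi; with phi = 0 this reduces to the simplified
    bound 2m / eps^2 * polylog(1/delta, m, n).
    """
    if not eps > m * phi:
        raise ValueError("the bound requires eps > m * phi")
    polylog = math.log(1 / delta) + math.log(m) + math.log(n)
    correction = (1 + m * phi) / (1 - m * phi / eps) ** 2
    return math.ceil(2 * m / eps ** 2 * correction * polylog)
```

For instance, m = 1000, eps = 0.1, delta = 0.01 and n = 10^9 with phi left at 0 gives roughly 6.4 million samples under these assumptions, and any positive phi only increases the result.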
Remark. In Appendix A, we provide a less explicit but tighter
characterization of the sufficient sample size than that in Theorem 1.
4. WEIGHTED SAMPLING-BASED RANGE PARTITION
In this section, we first discuss why the simple sampling scheme is not
practical in applications. We then introduce the weighted sampling
scheme and provide the sufficient sample size analysis for this scheme.
³ This holds, for example, when the data values are unique so that
$\phi = O(1/n)$ and $m/n = o(1)$, i.e. the number of partitions is much
smaller than the total number of data items.
4.1 Why is Simple Sampling Scheme not Practical?
The key assumption of the simple sampling scheme is that there is a
provision to take a random sample from the dataset. This is a
nontrivial task in distributed environments where the input data is
distributed across several sites. Any one-pass algorithm needs to merge
the samples taken from each site into a statistically correct random
sample, with or without replacement. The merge operation is costly. One
needs to draw from a hypergeometric distribution or a multinomial
distribution for sampling without replacement and with replacement,
respectively. This limits the size of the merged sample to the minimum
size of the samples from each site. That is, most of the samples are
discarded in order to make a statistically correct random sample. Other
approaches are no longer one-pass. Typical approaches need at least two
passes. In the first pass, each site sends its number of records to the
coordinator. The coordinator can then determine how many samples are
needed from each site by using a hypergeometric distribution or a
multinomial distribution, depending on whether sampling without
replacement or with replacement is used. These samples can then be
simply merged. A two-pass algorithm is generally too costly in this
environment.
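The allocation step of the two-pass approach can be sketched for the with-replacement case, where the per-site sample counts follow a multinomial distribution with probabilities proportional to the site sizes. The function name and the `rng` parameter are hypothetical.

```python
import random
from collections import Counter


def allocate_samples(site_counts, t, rng=random):
    """Draw how many of t with-replacement samples come from each site.

    Each of the t samples independently picks site i with probability
    n_i / n, so the result is one multinomial draw.
    """
    sites = range(len(site_counts))
    picks = rng.choices(sites, weights=site_counts, k=t)
    tally = Counter(picks)
    return [tally[i] for i in sites]
```

The coordinator needs every site's record count before it can call such a function, which is exactly the extra pass that makes the approach costly.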
TeraSort [13] uses an approximate approach by taking equal-size random
samples from each site and then merging these samples at the
coordinator directly. This approach is reasonable for TeraSort since
each site has the same input data volume, although the sufficient
sample size problem is not studied there and an arbitrary number is
picked as the sample size. However, this approach is not practical for
day-to-day big data analytics jobs, because the data input sizes across
sites are imbalanced and there may be a lack of a priori information
about this imbalance. In the next section, we examine the origins of
such data input size imbalance and then present a sampling scheme that
accounts for this imbalance.
4.2 What may Cause Input Sizes Imbalance?
We discuss several common data processing operations that may cause
imbalance of data input sizes across sites. This, together with the
data analysis in Section 5, serves as motivation to consider a more
sophisticated sampling scheme in the next section. We consider the
following common data processing operations.
JOIN. Consider the following basic join operation:
    SELECT
    FROM A INNER JOIN B ON A.KEY==B.KEY
    ORDER BY COL;
This requires the system to co-partition A and B on A.KEY and B.KEY,
respectively. For intermediate results, our system writes the output to
the local volume of the same machine. Then, the system needs to
partition the join result (using range partition) on COL. Even if we
can guarantee that A and B are evenly partitioned, there is no
guarantee that the join result on each node contains approximately the
same number of records. In fact, the number of records on different
nodes can be very different. This is one real example where we need to
partition a data input with imbalanced sizes across machines. In
distributed computing environments, there are other fundamental reasons
why a balanced input does not guarantee a balanced output. For example,
in these environments it is typical to write intermediate output
directly to local volumes, which are then used as input for subsequent
computations. Again, there is no reason that such intermediate result
sets would be of roughly the same size across different nodes.
REDUCE. Consider a reduce command that reduces on an integer key. The
reducer replicates the rows it sees according to the key value. For
example, if key=2 then all records are replicated twice. If key=100,
all records are replicated 100 times. Even if the number of records at
each node is identical, the output sizes may differ, depending only on
the key values.
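A toy version of such a reducer makes the effect concrete. The function name and the row labels are hypothetical; real SCOPE reducers are written in C#.

```python
def replicate_reducer(key, rows):
    """Emit every row of the group `key` times."""
    return [row for row in rows for _ in range(key)]


# Two nodes each receive three rows, but under different keys.
node_a_output = replicate_reducer(2, ["r1", "r2", "r3"])
node_b_output = replicate_reducer(100, ["r4", "r5", "r6"])
```

Both nodes start with the same number of input rows, yet the first produces 6 output rows and the second 300.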
Lookup Table. Suppose that we have a static lookup table. Each node
does the following: for any given input row, it looks at column X; if
the value of column X is in the lookup table, then the row is returned.
Otherwise, the row is filtered out. If the input was range partitioned
(or hash partitioned) on column X, then there is a good chance that
many nodes return very few rows and a few nodes return many rows.
UNPIVOT. Consider the following example of the unpivot operation, which
is widely used in processing of graph data:
    Column 1    Column 2
    1           2, 3
    2           3, 9, 8, 13
    ...
The output of the unpivot operation is (1,2), (1,3), (2,3), (2,9),
(2,8), (2,13), ... If the data range partition is with respect to
Column 1, then the output sizes may well be imbalanced because of the
variable number of elements in Column 2.
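The unpivot operation on the example rows above can be sketched as follows, with the table represented as hypothetical (key, list-of-values) pairs:

```python
def unpivot(rows):
    """Expand (key, [values]) rows into one (key, value) pair per value."""
    return [(key, value) for key, values in rows for value in values]


rows = [(1, [2, 3]), (2, [3, 9, 8, 13])]
pairs = unpivot(rows)
```

Key 1 contributes two output pairs and key 2 contributes four, so partitions that hold the same number of input rows can produce very different numbers of output rows.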
The above common data processing operations suggest that in practice
input data sizes across sites may well be imbalanced. Later, in Section
5, we provide evidence of this using some common datasets from an
operational data center.
4.3 Weighted Sampling Approach
In this section, we introduce a weighted sampling scheme that accounts
for imbalance of data input sizes across sites. The sampling scheme is
designed to create a summary sample such that a site with a larger
number of data items is represented by a larger number of elements in
this summary sample. This is done with the aim of reducing the bias due
to imbalance of the data input sizes across different sites.
Weighted Sampling Scheme
Input: partition relative sizes $p = (p_1, p_2, \ldots, p_m)$
Parameter: $t$ := total number of samples to take
1. SAMPLE: Coordinator node requests $t_1$ samples from each site. Each
site $i$ reports to the coordinator a multiset $S_i$ that is a random
sample of $t/k$ data items from $\Omega_i$ and the total number of
items $n_i = |\Omega_i|$.
2. MERGE: Coordinator node constructs a summary sample $S$ of data
items by initializing $S = \emptyset$ and then adding $n_i$ copies of
each item in $S_i$ for every site $i$.
3. PARTITION: Determine a set of keys that partition the summary sample
$S$ into $m$ ranges with each range $i$ containing a fraction $p_i$ of
data items in $S$.
Notice that the data input size accessed through each site is used only
after the sample is collected. This means that the scheme can be
implemented by making one pass through the data: each site may take
samples by using a sampling scheme (e.g. reservoir sampling [17]) that
makes one pass through the data, and once this pass is completed, the
site knows the total number of data items accessed through this site.
Using the above weighted sampling scheme, a summary sample is
constructed such that each site is represented by a number of sampled
items that is proportional to the number of data items accessed by this
site. Note that this mitigates the bias due to the imbalance of data
input sizes across sites, but it does not necessarily remove it
completely.⁴ Therefore, we need to study what the sufficient sample
size is in order to guarantee accuracy of the range partition.
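The three steps of the weighted sampling scheme can be sketched as below. This is an illustration under stated assumptions, not the SCOPE implementation: each site runs a standard reservoir sampler, and the coordinator attaches weight n_i to every item sampled at site i instead of materializing n_i copies, which yields the same quantiles. All function names and the `rng` parameter are hypothetical.

```python
import random


def reservoir_sample(stream, size, rng=random):
    """One-pass uniform sample without replacement, plus the item count."""
    reservoir, count = [], 0
    for item in stream:
        count += 1
        if len(reservoir) < size:
            reservoir.append(item)
        else:
            j = rng.randrange(count)  # uniform over 0..count-1
            if j < size:
                reservoir[j] = item
    return reservoir, count


def weighted_boundaries(site_reports, p):
    """MERGE and PARTITION on a list of (sample, n_i) site reports."""
    # MERGE: an item sampled at site i stands for n_i copies.
    weighted = sorted((item, n_i)
                      for sample, n_i in site_reports for item in sample)
    total = sum(weight for _, weight in weighted)
    # PARTITION: boundary i sits where the cumulative weight first
    # reaches (p_1 + ... + p_i) of the total weight.
    targets, cumulative = [], 0.0
    for share in p[:-1]:
        cumulative += share
        targets.append(cumulative * total)
    boundaries, running, nxt = [], 0, 0
    for item, weight in weighted:
        running += weight
        while nxt < len(targets) and running >= targets[nxt]:
            boundaries.append(item)
            nxt += 1
    return boundaries
```

With one site holding keys 0 to 8999 and another holding keys 9000 to 9999, equal-size samples from both sites merged without weights would place the median boundary near 9000; with the weights, the boundary should land near the true median of about 5000.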
4.4 Sufficient Sample Size for Weighted Sampling Scheme
We next characterize the sufficient sample size for the weighted
sampling scheme. We shall see that a single parameter that quantifies
the imbalance of data sizes across sites plays a key role in
determining the sufficient sample size. We introduce this parameter in
the following definition.
Definition 1 (input sizes imbalance). Given a number of sites
$k \geq 1$ and the number of data items per site specified by the
vector $\mathbf{n} = (n_1, n_2, \ldots, n_k)$, we define the input
imbalance statistic as follows:
$$\alpha(\mathbf{n}) = k^2 \sigma(\mathbf{n})^2 / 4$$
where $\sigma(\mathbf{n})^2$ is the variance parameter given by
$$\sigma(\mathbf{n})^2 = \frac{1}{k} \sum_{i=1}^{k} \left(\frac{n_i}{n}\right)^2 - \left(\frac{1}{k} \sum_{i=1}^{k} \frac{n_i}{n}\right)^2.$$
Notice that $\alpha(\mathbf{n}) = [k \sum_{i=1}^{k} (n_i/n)^2 - 1]/4$.
For a perfectly balanced number of data items per site, i.e.
$\mathbf{n} = (n/k, n/k, \ldots, n/k)$, we have
$\alpha(\mathbf{n}) = 0$. The other extreme is when all data items
reside exclusively at one site, i.e. $n_i = n$ for some $i \in [k]$,
and in this case $\alpha(\mathbf{n}) = (k - 1)/4$.
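The input imbalance statistic is cheap to compute once the per-site counts are known. The sketch below uses the closed form noted above, (k times the sum of squared site shares, minus 1) divided by 4; the function name is hypothetical.

```python
def input_imbalance(site_counts):
    """Input imbalance statistic from the per-site item counts n_i."""
    k = len(site_counts)
    n = sum(site_counts)
    sum_of_squared_shares = sum((n_i / n) ** 2 for n_i in site_counts)
    return (k * sum_of_squared_shares - 1) / 4
```

The statistic is 0 for perfectly balanced sites and (k - 1)/4 when a single site holds all the data, matching the two extremes described in the text.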
The following is the main theorem of this paper.
Theorem 2. Assume < . Then, the sucient sam-
ple size t to guarantee -accurate range partition with prob-
ability at least 1 is
t
2(1 +(n))

2
(1 /())
2
(1/, m, n, k)
where (1/, m, n, k) = log(1/) + log(m) +k log(n).
The proof is provided in the Appendix. It is based on a careful analysis of the probability of error exponent and extracting the most essential terms. The theorem tells us that in addition to the sufficient statistics identified for the simple sampling scheme, the additional statistic that is needed to determine a sufficient sample size is the input sizes imbalance statistic $\alpha(\mathbf{n})$. In Appendix B.3, we provide a normal approximation that provides the same result as in the theorem with $\beta = \log(m/\delta)$, which we suggest using in applications.
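To make the bound concrete, here is a small sketch that evaluates the sufficient sample size with the normal-approximation factor $\beta = \log(m/\delta)$ suggested above. It assumes the bound reads $t \ge 2(1 - \pi + \alpha)\beta / (\pi\epsilon^2(1 - \phi/(\pi\epsilon))^2)$, with $\pi$ the smallest relative range size and $\phi$ the upper bound on the frequency of the most frequent item; the function and parameter names are hypothetical.

```python
import math


def sufficient_sample_size(eps: float, delta: float, m: int, pi_min: float,
                           alpha: float = 0.0, phi: float = 0.0) -> int:
    """Sample size from the assumed form of the bound, with beta = log(m / delta)."""
    if not phi < eps * pi_min:
        raise ValueError("the bound assumes phi < eps * pi_min")
    beta = math.log(m / delta)
    numerator = 2.0 * (1.0 - pi_min + alpha)
    denominator = pi_min * eps ** 2 * (1.0 - phi / (pi_min * eps)) ** 2
    return math.ceil(numerator / denominator * beta)


# Balanced partition in m = 5 ranges (pi_min = 1/5), eps = 0.1, delta = 0.1.
t_balanced = sufficient_sample_size(0.1, 0.1, 5, 0.2)
# Same setting with an input sizes imbalance statistic of 0.03.
t_imbalanced = sufficient_sample_size(0.1, 0.1, 5, 0.2, alpha=0.03)
```

In this sketch the imbalance statistic enters only through the numerator, so a small imbalance raises the sample size by a modest factor.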
We next derive a corollary that characterizes sufficient sample size for data input whose sizes across sites are constant factor imbalanced, a simple concept that we introduce in the following definition.
Definition 2 (constant factor imbalance). A distribution of data items over sites is said to be $\rho$-constant factor imbalanced if the number of data items $n_i$ at each site $i$ is such that $n_i \le \rho n/k$.
Footnote 4: Under a uniform random sampling scheme, the number of samples taken across sites $(t_1, t_2, \ldots, t_k)$ is a multivariate random variable that has a multinomial distribution with parameter $(t, \mathbf{n}/n)$. Moreover, the weighted sampling scheme replicates samples, which may introduce correlations.
Figure 2: Weighted Sampling algorithm implementation in SCOPE.
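In other words, the maximum number of data items per site is at most a factor $\rho$ of the mean number of data items per site. We observe that for data input sizes $\mathbf{n}$ that are $\rho$-constant factor imbalanced, each share satisfies $n_i/n \le \rho/k$, so that $k \sum_{i=1}^{k} (n_i/n)^2 \le \rho^2$ and hence $\alpha(\mathbf{n}) \le (\rho^2 - 1)/4$, which yields the following corollary.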
Corollary 2. Assume $\phi < \epsilon\pi$ and that the distribution of data items over sites is $\rho$-constant factor imbalanced. Then, the sufficient sample size to guarantee $\epsilon$-accurate partition with probability at least $1 - \delta$ is
$$t \ge \frac{2(1 - \pi) + \frac{1}{2}(\rho^2 - 1)}{\pi \epsilon^2 (1 - \phi/(\pi\epsilon))^2} \, \beta(1/\delta, m, n, k).$$
The corollary tells us that for any distribution of data over sites such that the maximum number of data items at a site is at most a constant factor of the mean number of data items per site, the required sample size increases only by a constant factor compared with the case of perfectly balanced data input.
The above results provide an upper bound on the communication complexity. Notice that the algorithm uses a single round of communication between the coordinator node and sites. In the forward direction, $\tilde{O}(k)$ information is transferred, so it remains only to assess the information in the backward direction, from the sites to the coordinator. For a balanced range partition, the total communication is $\tilde{O}(m(1 + \alpha(\mathbf{n}))/\epsilon^2)$, and we note that in the case of constant factor imbalanced input sizes, the communication complexity is $\tilde{O}(m/\epsilon^2)$. This is again in line with the folklore that sampling based schemes require $\Omega(1/\epsilon^2)$ amount of communication.
4.5 How is Weighted Sampling Range Partition Implemented in SCOPE?
Dryad uses a DAG as its execution graph, where each vertex is a computation unit and each edge is a data flow. SCOPE generates a few specific vertices and edges for the range partition in the execution graph. Figure 2 describes the graph. We describe the algorithm as follows:
1. Each site that contains the input data takes a fixed-size sample, described as the sample collector in the figure. The site sends the total number of records together with the samples to a coordinator, discussed below. Notice that these vertices output two data channels, one for the samples to the coordinator, and one for the local partitioner, i.e. the computation vertices that do the local partitioning according to boundaries from the coordinator.
2. The coordinator collects all the samples according to the weighted-sampling scheme, and computes the range boundaries. The boundaries are sent to the local partitioners.
3. Each local partitioner partitions its local data according to the boundaries sent by the coordinator.
4. Finally, each downstream partition merger (one per partition) reads its portion of the data from every local partitioner and aggregates the inputs to continue its computation.
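The four steps above can be sketched in a single process as follows. This is only an illustration under stated assumptions, not the SCOPE implementation: all function names are hypothetical, and the coordinator here weights each sampled key by the number of records it represents (site record count divided by site sample size), which is one plausible reading of the weighted-sampling scheme.

```python
import bisect
import random
from typing import List, Sequence, Tuple

SiteSample = Tuple[List[float], int]  # (sampled keys, total records at the site)


def collect_sample(data: Sequence[float], size: int,
                   rng: random.Random) -> SiteSample:
    """Step 1: fixed-size sample plus the site's record count."""
    sample = list(data) if len(data) <= size else rng.sample(list(data), size)
    return sample, len(data)


def compute_boundaries(site_samples: Sequence[SiteSample], m: int) -> List[float]:
    """Step 2: weighted quantiles of the pooled samples (assumed weighting)."""
    weighted = []
    for sample, total in site_samples:
        if sample:
            weight = total / len(sample)  # records represented by each key
            weighted.extend((key, weight) for key in sample)
    weighted.sort()
    total_weight = sum(w for _, w in weighted)
    boundaries: List[float] = []
    cumulative = 0.0
    for key, weight in weighted:
        cumulative += weight
        # Emit the next boundary once cumulative weight passes i/m of the total.
        while (len(boundaries) < m - 1
               and cumulative >= (len(boundaries) + 1) * total_weight / m):
            boundaries.append(key)
    return boundaries


def partition_locally(data: Sequence[float],
                      boundaries: Sequence[float]) -> List[List[float]]:
    """Step 3: range i receives keys with b_{i-1} < key <= b_i."""
    parts: List[List[float]] = [[] for _ in range(len(boundaries) + 1)]
    for key in data:
        parts[bisect.bisect_left(boundaries, key)].append(key)
    return parts


rng = random.Random(7)
# Three hypothetical sites with imbalanced input sizes.
sites = [[rng.random() for _ in range(size)] for size in (6000, 3000, 1000)]
boundaries = compute_boundaries([collect_sample(s, 500, rng) for s in sites], m=4)
local_parts = [partition_locally(s, boundaries) for s in sites]
# Step 4: each merger reads its range from every local partitioner.
merged_sizes = [sum(len(parts[i]) for parts in local_parts) for i in range(4)]
```

With the weighting in place, the small site's 500 samples do not dominate the boundaries even though every site contributes the same number of samples.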
There are a few things that are worth mentioning. First, the job manager is able to figure out that sample collectors and local partitioners are reading the same data. Therefore, each local partitioner is scheduled on the same machine as its corresponding sample collector to minimize the overhead. Second, the aggregation at the coordinator side to collect all the samples is done by building an aggregation tree to improve the efficiency and reduce the cross-pod network traffic. Third, when the coordinator sends back the partition boundaries to each local partitioner, the job manager lets the coordinator communicate with only one vertex per pod; other vertices in the same pod get the data from the one that communicates with the coordinator. This way, we reduce the cross-pod network traffic. Finally, the local partitioner writes to a single file even though it partitions the data into multiple partitions. The partition mergers are able to figure out what offsets they need to read from each local partitioner using the information passed as part of the metadata to build the execution graph. This way, we reduce some overhead on the distributed file system, especially for the metadata service.
5. DATA ANALYSIS
In this section we evaluate the extent of data input sizes imbalance in some real-world datasets and compare the sufficient sample size with empirical estimates. For our analysis, we use some typical datasets that are processed in Cosmos in daily job processing. We also use some synthetic data to further evaluate tightness of the sample size characterizations.
We consider a set of datasets that we retrieved by executing SCOPE queries on some typical datasets over particular intervals of time, so as to keep the data sizes manageable for the purpose of our analysis while still being representative of typical datasets. The datasets are diverse with respect to the total number of records, the raw byte sizes and the number of sites through which the data is accessed. These datasets are also diverse with respect to the application domain; they pertain to some major online services supported and processed in the Cosmos platform on a day-to-day basis. A summary of the datasets is provided in Table 1.
Table 1: Datasets used.
Dataset     Records   Raw bytes   Sites
DataSet-1   62M       150G        262
DataSet-2   37M       25G         80
DataSet-3   13M       0.26G       1
DataSet-4   7M        1.2T        301
DataSet-5   106M      7T          5652
Figure 3: Distribution of the number of data items across sites. (Axes: sites (%) vs. cumulative items (%); curves for DataSet-1, DataSet-2, DataSet-4 and DataSet-5.)
5.1 Data Input Sizes Imbalance
We first evaluate the imbalance of the number of data items across sites. In Figure 3, for a given dataset, we present the percentage of data items covered by a given percentage of sites where sites are sorted in decreasing order with respect to the number of data items they are associated with. This representation clearly shows that the distribution of the number of data items across sites is imbalanced. If the data input sizes were perfectly balanced, then this would correspond to the dashed line in Figure 3. We observe that data input sizes may be significantly imbalanced.
We now examine the data input sizes imbalance statistic $\alpha(\mathbf{n})$, which in Section 4.4 we found to play a key role in determining the sufficient sample size for the weighted sampling scheme. In Figure 4, we present this statistic for various datasets that we study and observe that there exist cases for which this parameter can be in excess of 10%.
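The curve described for Figure 3 can be computed from per-site item counts as in the following sketch. The function name and the example counts are made up for illustration and do not correspond to the datasets in Table 1.

```python
from typing import List, Sequence, Tuple


def cumulative_item_share(sizes: Sequence[int]) -> List[Tuple[float, float]]:
    """Points (sites %, cumulative items %) with sites sorted by decreasing size."""
    ordered = sorted(sizes, reverse=True)
    n = sum(ordered)
    k = len(ordered)
    points = []
    running = 0
    for rank, size in enumerate(ordered, start=1):
        running += size
        points.append((100.0 * rank / k, 100.0 * running / n))
    return points


balanced_curve = cumulative_item_share([10, 10, 10, 10])  # lies on the diagonal
skewed_curve = cumulative_item_share([10, 70, 10, 10])    # bends above the diagonal
```

A perfectly balanced input gives points on the diagonal, which is the reference line mentioned in the text; the further the curve rises above it, the more imbalanced the input sizes.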
5.2 Sufficient Sample Size
We demonstrate correctness and accuracy of the weighted sampling scheme using both real and synthetic datasets. The use of real data enables us to demonstrate performance of the algorithm in an operational system. We also use synthetic data to demonstrate accuracy and tightness of the sufficient sample size as this allows us to have control over data input sizes imbalance.
We first demonstrate the accuracy of the range partition using DataSet-1 as input data to our implementation of the weighted sampling scheme in Cosmos. This range partition was run for the number of ranges determined such that there are 100,000 records per range and fixing the probability of error $\delta$ to 0.1, for various choices of the input parameter $\epsilon$. In Figure 5, we show the sufficient sample size and the observed relative error versus the input parameter $\epsilon$. These results demonstrate how closely the observed relative error matches the input parameter $\epsilon$ and how this can be achieved by sampling only a small percentage of the input data records.
Figure 4: The data input sizes imbalance statistic for different datasets. (Axes: DataSet (1, 2, 4, 5) vs. imbalance statistic (%).)
Figure 5: Range partition of DataSet-1: (top) sample size vs. $\epsilon$ and (bottom) relative error vs. $\epsilon$. (Axes: $\epsilon$ vs. sample size (%) in the top panel; $\epsilon$ vs. max relative deviation in the bottom panel.)
We further demonstrate the accuracy of the weighted sampling range partition using a synthetic input dataset in a simulator, which allows us to control the data input sizes imbalance. The input dataset is assumed to be distributed across sites so that there is a unique site that may access a larger number of data items than any other site, and all other sites access an equal portion of the input dataset. Concretely, without loss of generality, we assume that data input sizes are such that $n_1 \ge n_2 = \cdots = n_k$ with $n_1 = a n_2$, where $a \ge 1$ is an input parameter that we vary. The case $a = 1$ corresponds to a balanced input dataset. Given the total data input size $n$, we have $n_1 = na/(a + k - 1)$ and $n_i = n/(a + k - 1)$, for $i = 2, 3, \ldots, k$. Specifically, we consider the case of input data of $n = 100{,}000$ data items, partitioned across $k = 4$ sites, and balanced range partition in $m = 5$ ranges with the relative accuracy $\epsilon = 1/10$. The input data items are assumed to be of distinct values and assigned uniformly at random across sites subject to given data input sizes across sites. In Figure 6 we present estimated probability of error for given sample size along with 95% confidence intervals for varying values of parameter $a$. We observe that the probability of error clearly exhibits an exponential decrease with the sample size. In the figures, we also show our analytical sample sizes, derived in Theorem 2, for $\beta = \log(1/\delta)$ (dashed line) and $\beta = \log(m/\delta)$ (solid line). These results demonstrate the accuracy and tightness of the sufficient sample size characterization.
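For the synthetic setup above, the site shares $n_1/n = a/(a + k - 1)$ and $n_i/n = 1/(a + k - 1)$ determine the imbalance statistic exactly. The sketch below evaluates it with exact rational arithmetic for $k = 4$ and the three values of $a$ shown in Figure 6; the function names are hypothetical, and the statistic is computed as $[k \sum_i (n_i/n)^2 - 1]/4$.

```python
from fractions import Fraction
from typing import List


def synthetic_shares(k: int, a: int) -> List[Fraction]:
    """Shares n_i / n when site 1 holds a times as much as each other site."""
    denom = a + k - 1
    return [Fraction(a, denom)] + [Fraction(1, denom)] * (k - 1)


def imbalance(shares: List[Fraction]) -> Fraction:
    """Imbalance statistic [k * sum(share^2) - 1] / 4 on exact shares."""
    k = len(shares)
    return (k * sum(s * s for s in shares) - 1) / 4


for a in (1, 2, 4):
    print(a, imbalance(synthetic_shares(4, a)))
# prints:
# 1 0
# 2 3/100
# 4 27/196
```

Even for $a = 4$ the statistic stays below 0.14, so in this setup the factor $1 - \pi + \alpha(\mathbf{n})$ grows only mildly with the imbalance.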
6. RELATED WORK
Big data analytics has become an important area over the last few years. MapReduce [5, 6], introduced by researchers, allows users to process massive amounts of data using two simple primitives: Map and Reduce. Hadoop is an open source implementation of MapReduce. Dryad [10] is a more flexible approach that allows users to describe the computation as a data flow DAG, where each vertex is a computation unit and an edge indicates a data flow. These systems provide automatic parallelism and automatic fault tolerance and failure recovery mechanisms. Pig [12] and Hive [15] are two high-level descriptive languages on Hadoop for big data analytics. SCOPE [1] is a SQL-like descriptive language on Cosmos for big data analytics. These languages make big data analytics easy to perform for end users while maintaining the scalability and robustness of the underlying system.
One of the early references related to our work is that of random sampling for histogram construction by Chaudhuri et al. [2], which may be seen as a simple sampling scheme for determining a balanced range partition. Our analysis of the simple sampling scheme yields a similar type of bounds for the sufficient sample size, but allows for non-unique data values and arbitrary range partition.
The range partition problem may be seen as essentially equivalent to the problem of computing quantiles. For the quantile computation there are two versions of the problem: (1) one-shot, where the output of the computation is required at the end of processing of an input stream, and (2) continuous, where the output is required at each instant while passing through an input stream of data. Our work is on the one-shot version of the problem. Recent work on continuous quantile tracking [9, 19, 3] suggests that a practical algorithm may be derived for balanced range partition in $m$ ranges with expected total communication cost $\tilde{O}(m\sqrt{k}/\epsilon)$. Note, however, that the algorithms that we consider are different as they require only a single round of communication between the coordinator node and sites.
An alternative method for computing quantiles is to use a histogram-based approach where each site computes a quantile summary from its locally accessed data, and these summaries are then merged to create an aggregate quantile summary. By results of Greenwald and Khanna [7, 8], we know that using this approach we could compute a balanced range partition in $m$ ranges with total communication cost $\tilde{O}(mk/\epsilon)$. A drawback of this approach is that it assumes that the largest frequency of an item locally at each site is at most $2\epsilon$, where $\epsilon$ is the relative accuracy parameter. While this is a reasonable assumption for the case of unique data values, it may be a restrictive assumption in the case of non-unique values and where the data items are distributed across sites such that for some sites the local frequency of an item is large while the frequency of each item in the entire dataset is actually small. Another drawback is that the total communication cost would be larger whenever the number of sites $k$ is sufficiently larger than $1/\epsilon$, which may well be the case in big data analytics environments. Finally, the approach based on quantile summaries is more computationally expensive as each site needs to create a quantile summary, which requires sorting the locally accessed data.
Figure 6: Probability of error vs. sample size: (left) a = 1, (middle) a = 2, (right) a = 4. (Axes: sample size vs. probability of error, logarithmic scale.)
7. CONCLUSION
We have analyzed several sampling based methods for range partition of data for big data analytics processing tasks. We introduced a simple weighted sampling scheme that accounts for data input sizes imbalance, which we demonstrated to be prevalent in real production data centers. Our analysis provides tight estimates for the sufficient sample size to guarantee a prescribed accuracy of range partition. The weighted sampling scheme presented in this paper has been used daily in the Cosmos environment, and has proved itself to provide a desired accuracy at a low cost.
There are several interesting directions for future work. First, one may investigate how to design practical algorithms that are communication cost efficient for various types of queries and that allow for multiple rounds of communication. Furthermore, one may study how to integrate with a query optimizer so that the most effective sampling method is chosen given the prior knowledge that the optimizer has while the query is computed.
Acknowledgments
We would like to thank Wei Lin and Bing Shi for parts of
basic system infrastructure that they developed and we used
in our implementation and all the SCOPE team members
for their support to this project.
8. REFERENCES
[1] R. Chaiken, B. Jenkins, P.-A. Larson, B. Ramsey, D. Shakib, S. Weaver, and J. Zhou. Scope: easy and efficient parallel processing of massive data sets. Proc. of VLDB '08, 1:1265-1276, August 2008.
[2] S. Chaudhuri, R. Motwani, and V. Narasayya. Random sampling for histogram construction: how much is enough? In Proc. of ACM SIGMOD, volume 27, pages 436-447, June 1998.
[3] G. Cormode, M. Garofalakis, S. Muthukrishnan, and R. Rastogi. Holistic aggregates in a networked world: Distributed tracking of approximate quantiles. In Proc. of SIGMOD, June 2005.
[4] T. M. Cover and J. A. Thomas. Elements of Information Theory. Wiley-Interscience, 2nd edition, 2006.
[5] J. Dean and S. Ghemawat. Mapreduce: simplified data processing on large clusters. In Proc. of OSDI '04, pages 10-10, 2004.
[6] J. Dean and S. Ghemawat. Mapreduce: simplified data processing on large clusters. Commun. ACM, 51:107-113, January 2008.
[7] M. B. Greenwald and S. Khanna. Space-efficient online computation of quantile summaries. In Proc. of SIGMOD, pages 58-66, 2001.
[8] M. B. Greenwald and S. Khanna. Power-conserving computation of order-statistics over sensor networks. In Proc. of PODS, 2004.
[9] Z. Huang, K. Yi, and Q. Zhang. Randomized algorithms for tracking distributed count, frequencies, and ranks. In arXiv:1108.3413v1, Aug 2011.
[10] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. In Proc. of ACM SIGOPS/EuroSys '07, pages 59-72.
[11] C. Nyberg, M. Shah, and N. Govindaraju. Softbenchmark. http://softbenchmark.org, 2011.
[12] C. Olston, B. Reed, U. Srivastava, R. Kumar, and A. Tomkins. Pig latin: a not-so-foreign language for data processing. In Proc. of ACM SIGMOD '08, pages 1099-1110, 2008.
[13] O. O'Malley. Terabyte sort on apache hadoop. http://sortbenchmark.org/YahooHadoop.pdf, May 2008.
[14] O. O'Malley and A. C. Murthy. Winning a 60 second dash with a yellow elephant. http://sortbenchmark.org/Yahoo2009.pdf, 2009.
[15] A. Thusoo, J. Sarma, N. Jain, Z. Shao, P. Chakka, N. Zhang, S. Antony, H. Liu, and R. Murthy. Hive - a petabyte scale data warehouse using hadoop. In Proc. of IEEE ICDE, pages 996-1005, March 2010.
[16] D. Vasudevan and M. Vojnovic. Random sampling for data intensive computations. Technical Report MSR-TR-2009-08, Microsoft Research, April 2010.
[17] J. S. Vitter. Random sampling with a reservoir. ACM Trans. on Mathematical Software, 11:37-57, March 1985.
[18] M. Vojnovic, F. Xu, and J. Zhou. Sampling based range partition methods for big data analytics. Technical Report MSR-TR-2012-18, Microsoft Research, February 2012.
[19] K. Yi and Q. Zhang. Optimal tracking of distributed heavy hitters and quantiles. In Proc. of PODS, June 2009.
APPENDIX
A. SIMPLE SAMPLING SCHEME
We consider range partition of the dataset $\Omega$ with the following relative range sizes $\mathbf{p} = (p_1, p_2, \ldots, p_m)$. Let $S$ be a sample of $t$ samples obtained by uniform random sampling from $\Omega$ without replacement. The boundaries $\vec{b} = (b_1, b_2, \ldots, b_{m-1})$ of the ranges are determined using the sample $S$ such that range 1 contains items $a$ such that $a \le b_1$, range $i$ contains items $a$ such that $b_{i-1} < a \le b_i$, for $i \in \{2, 3, \ldots, m-1\}$, and range $m$ contains items $a$ such that $a > b_{m-1}$. Let $\hat{q}_i$ be the fraction of samples in $S$ that fall in range $i \in [m]$, and let $\hat{\mathbf{q}} = (\hat{q}_1, \hat{q}_2, \ldots, \hat{q}_m)$. The range boundaries $\vec{b}$ are determined such that the following inequalities hold
$$p_i \le \hat{q}_i \le p_i + \phi, \quad \text{for every } i \in [m]$$
where recall $\phi$ is an upper bound on the frequency of the most frequent data item.
Let $q_i$ denote the fraction of data items in $\Omega$ that fall in range $i \in [m]$. Notice that the sampling based scheme uses $\hat{q}_i$ as an estimate for $q_i$, $i \in [m]$. We shall use the notation $T_i = (b_{i-1}, b_i, \hat{q}_i)$, for $i \in [m]$.
We define the cumulative distribution function $F$ associated with the dataset $\Omega$ as follows
$$F(a) = \frac{|\{b \in \Omega \mid b \le a\}|}{|\Omega|}, \quad \text{for } a \in \Omega.$$
Recall that the range partition is said to be $\epsilon$-accurate with probability at least $1 - \delta$, if the following holds:
$$\delta_t := P\Big[\bigcup_{i=1}^{m} \{q_i > (1 + \epsilon) p_i\} \,\Big|\, S\Big] \le \delta. \quad (1)$$
Notice that we have
$$P\Big[\bigcup_{i=1}^{m} \{q_i > (1 + \epsilon) p_i\} \,\Big|\, S\Big] = P\Big[\bigcup_{i=1}^{m} \{q_i > (1 + \epsilon) p_i\} \,\Big|\, (\vec{b}, \hat{\mathbf{q}})\Big] \le \sum_{i=1}^{m} P[q_i > (1 + \epsilon) p_i \mid T_i]$$
where the first equality holds because under uniform random sampling, given a set of samples, the probability of the given event depends only on the width of the range and the fraction of samples in $S$ that fall in the given range, and the last inequality is by the union bound.
Note that under $T_i$, $q_i$ is a random variable that takes values from the set $Q(T_i)$ defined as follows
$$Q(T_i) = \Big\{\frac{t_i}{n}, \frac{t_i + 1}{n}, \ldots, F(b_i) - F(b_{i-1})\Big\}.$$
Let us define $f_{n,t}(x, y)$ as the probability that a uniform random sample of size $t$, drawn without replacement from a pool of $n$ items, draws $ty$ items from a designated subset of $nx$ items. This probability is equal to
$$f_{n,t}(x, y) = \frac{\binom{nx}{ty} \binom{n(1-x)}{t(1-y)}}{\binom{n}{t}}. \quad (2)$$
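The probability in (2) is the hypergeometric probability mass function, which can be evaluated directly with binomial coefficients. The following sketch takes integer counts (`marked` standing for $nx$ and `drawn_marked` for $ty$) rather than fractions to avoid rounding; the function name and argument names are chosen for this illustration.

```python
import math


def f_hypergeom(n: int, t: int, marked: int, drawn_marked: int) -> float:
    """Probability that a size-t sample without replacement from n items
    contains exactly `drawn_marked` of the `marked` designated items."""
    if drawn_marked < 0 or drawn_marked > t:
        return 0.0
    ways_marked = math.comb(marked, drawn_marked)
    ways_unmarked = math.comb(n - marked, t - drawn_marked)
    return ways_marked * ways_unmarked / math.comb(n, t)


# Pool of n = 20 items, 8 of them designated, sample of size t = 5.
pmf = [f_hypergeom(20, 5, 8, j) for j in range(6)]
```

Summing `pmf` over all possible counts gives 1, which is a convenient sanity check on the formula.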
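Note that
$$P[q_i > (1 + \epsilon) p_i \mid T_i] = \sum_{q_i \in Q(T_i):\, q_i \ge (1 + \epsilon) p_i} f_{n,t}(q_i, \hat{q}_i) \le \kappa(T_i) \, f_{n,t}(q_i^*, \hat{q}_i)$$
where $q_i^*$ is such that $q_i^* \ge (1 + \epsilon) p_i$ and
$$f_{n,t}(q_i^*, \hat{q}_i) = \max_{q \in [0,1]:\, q \ge (1 + \epsilon) p_i} f_{n,t}(q, \hat{q}_i)$$
and
$$\kappa(T_i) = \sum_{q \in Q(T_i):\, q \ge (1 + \epsilon) p_i} \frac{f_{n,t}(q, \hat{q}_i)}{f_{n,t}(q_i^*, \hat{q}_i)}.$$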
From (2), using simple calculus, we observe $-\log(f_{n,t}(x, y)) = \psi_{n,t}(x, y)$ with
$$\psi_{n,t}(x, y) = n \left[ \frac{t}{n} D(y \| x) + \Big(1 - \frac{t}{n}\Big) D\Big(\frac{x - \frac{t}{n} y}{1 - \frac{t}{n}} \,\Big\|\, x\Big) \right]$$
where $D(x \| y)$ is the Kullback-Leibler divergence between two Bernoulli distributions with parameters $x$ and $y$ in $[0, 1]$.
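Let us define $A_i = \{(q, \hat{q}) \in [0, 1]^2 : q > (1 + \epsilon) p_i, \ \hat{q} \le p_i + \phi\}$. We then observe
$$-\log P[q_i > (1 + \epsilon) p_i \mid T_i] \ge -\log(\kappa(T_i)) + \min_{(q_i, \hat{q}_i) \in A_i} \psi_{n,t}(\hat{q}_i, q_i).$$
The inequality (1) holds provided that for every $i \in [m]$ and $(q_i, \hat{q}_i) \in A_i$, the following holds
$$\psi_{n,t}(\hat{q}_i, q_i) \ge \log\Big(\frac{1}{\delta}\Big) + \log(m) + \max_{j \in [m]} \log(\kappa(T_j)).$$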
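Since
$$\psi_{n,t}(\hat{q}_i, q_i) = -\log(f_{n,t}(\hat{q}_i, q_i)) \ge t D(q_i \| \hat{q}_i) \quad (3)$$
it suffices that
$$t \ge \max_{i \in [m]} \max_{(q_i, \hat{q}_i) \in A_i} \frac{1}{D(q_i \| \hat{q}_i)} \, \beta = \frac{1}{\min_{i \in [m]} D((1 + \epsilon) p_i \| p_i + \phi)} \, \beta$$
$$= \max_{i \in [m]} \frac{2 (p_i + \phi)(1 - p_i)}{(\epsilon p_i - \phi)^2} \, \beta \, [1 + O(\epsilon)] = \frac{2 (\pi + \phi)(1 - \pi)}{(\epsilon \pi - \phi)^2} \, \beta \, [1 + O(\epsilon)]$$
where $\beta = \log(1/\delta) + \log(m) + \max_{i \in [m]} \log(\kappa(T_i))$.
It remains only to note that for every given $T_i$, $\log(\kappa(T_i)) \le \log(|Q(T_i)|) \le \log(n)$, for every $i \in [m]$.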
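A.1 A Tighter Sufficient Sample Size
In the preceding section, we used a crude bound for the term $\max_{i \in [m]} \log(\kappa(T_i))$. We now describe how one may derive a tighter bound which may be of interest in applications.
For every given value $\theta \ge 0$, let us define the set $B_\theta(T_i)$ as follows
$$B_\theta(T_i) = \{q \in Q(T_i) \mid D(q \| \hat{q}_i) - D(q_i^* \| \hat{q}_i) \ge \theta\}.$$
Then, note
$$\kappa(T_i) \le |Q(T_i) \setminus B_\theta(T_i)| + |B_\theta(T_i)| e^{-t\theta}.$$
Now, by convexity of the Kullback-Leibler divergence, note that we have
$$D(q \| \hat{q}_i) - D(q_i^* \| \hat{q}_i) \ge \lambda_i (q - q_i^*)$$
where
$$\lambda_i = \log\left(\frac{q_i^* (1 - \hat{q}_i)}{(1 - q_i^*) \hat{q}_i}\right).$$
We will use this for $q_i^* = (1 + \epsilon) p_i$ and $\hat{q}_i = p_i + \phi$ as these are the values that minimize the error exponent (see preceding section).
We observe that for the set defined as $\tilde{B}_\theta(T_i) := \{q \in Q(T_i) \mid \lambda_i (q - q_i^*) \ge \theta\}$, we have $\tilde{B}_\theta(T_i) \subseteq B_\theta(T_i)$. Thus,
$$\kappa(T_i) \le |Q(T_i) \setminus \tilde{B}_\theta(T_i)| + |\tilde{B}_\theta(T_i)| e^{-t\theta}.$$
Furthermore, note that
$$|Q(T_i) \setminus \tilde{B}_\theta(T_i)| \le \frac{\theta}{\lambda_i} n \quad \text{and} \quad |\tilde{B}_\theta(T_i)| \le n.$$
Therefore,
$$\kappa(T_i) \le n \Big(\frac{\theta}{\lambda_i} + e^{-t\theta}\Big).$$
By minimizing the right-hand side over $\theta \ge 0$, we obtain
$$\kappa(T_i) \le n \, \frac{\log(\lambda_i t) + 1}{\lambda_i t}.$$
Therefore,
$$\max_{i \in [m]} \log(\kappa(T_i)) \le \log(n) + \max_{i \in [m]} \log\left(\frac{\log(\lambda_i t) + 1}{\lambda_i t}\right).$$
Combining this with the analysis in the preceding section, we obtain the following sufficient condition for the sample size to guarantee $\epsilon$-accuracy with probability at least $1 - \delta$,
$$D t \ge \log(1/\delta) + \log(m) + \log(n) - \min_{i \in [m]} \log\left(\frac{\lambda_i t}{\log(\lambda_i t) + 1}\right)$$
where $D = \min_{i \in [m]} D((1 + \epsilon) p_i \| p_i + \phi)$.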
B. WEIGHTED SAMPLING SCHEME
We denote with $\omega_j$ the portion of data items that reside at site $j$, i.e. $\omega_j = |\Omega_j|/|\Omega|$, for $j \in [k]$, and, without loss of generality, we assume that sites are enumerated in decreasing order of $\omega_j$, i.e. $1 \ge \omega_1 \ge \omega_2 \ge \cdots \ge \omega_k > 0$.
We consider the range partition for given relative range sizes
p = (p
1
, p
2
, . . . , p
m
). Let R
i
(

b) denote the set of data items


from that fall in range i according to the range boundaries

b, which are computed using the sample of data items



S that
are input to the weighted sampling scheme. Let q
i
be the
fraction of data items from that fall in range i. Let q
i,j
denote the fraction of data items in that are in range i and
reside at site j, i.e. q
i,j
= |{a | a
j
, a R
i
}|/||, for
i [m] and j [k]. Similarly, let
i,j
denote the fraction
of samples in

S that are in range i and are from site j, i.e.

i,j
= |{a

S | a R
i
, a
j
}|/|

S|, for i [m] and


j [k]. Notice that under weighted sampling scheme, the
range boundaries are chosen such that the following holds
p
i

k

j=1

i,j
p
i
+, for every i [m].
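Now, the error probability $\delta_t$ is given as follows:
$$\delta_t = P\Big[\bigcup_{i=1}^{m} \{q_i > (1 + \epsilon) p_i\} \,\Big|\, S\Big].$$
Using the union bound, we have
$$\delta_t \le \sum_{i=1}^{m} P[q_i > (1 + \epsilon) p_i \mid S] \le m \max_{i \in [m]} P[q_i > (1 + \epsilon) p_i \mid S]. \quad (4)$$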
We x an arbitrary range i and consider the probability of
the error event {q
i
> (1+)p
i
}. Let
i
= (
i,1
,
i,2
, . . . ,
i,m
),
for every i [m].
Note that the following holds for every range i [m],
P[q
i
> (1 +)p
i
| S] = P[
k

j=1

j
q
i,j
> (1 +)p
i
| T
i
]
where we dene T
i
= (b
i1
, b
i
,
i
).
Let us denote with F
j
the cumulative distribution function
of data items that reside at site j, which is dened as follows
F
j
(a) =
|{b
j
| b a}|
|
j
|
, a
j
, j [k].
Notice that under T
i
, q
i,j
is a random variable that takes
values on the set Q
j
(T
i
) dened by
Q
j
(T
i
) = {
t
i,j
n
,
t
i,j
+ 1
n
, . . . ,
n
j
(F
j
(b
i
) F
j
(b
i1
))
n
}.
Let Q(T
i
) =
j[k]
Q
j
(T
i
). We observe
P[
k

j=1

j
q
i,j
> (1 +)p
i
| T
i
]
=

q
i
Q(T
i
):

k
j=1

j
q
i,j
>(1+)p
i

j[k]
f
n,t
(q
i,j
,
i,j
)
where f
n,t
(x, y) =
(
nx
ty
)(
n(1x)
t(1y)
)
(
n
t
)
. From (3), we have

j[k]
f
n,t
(q
i,j
,
i,j
) exp(t

j[k]
D(q
i,j
||
i,j
)).
Therefore,
P[
k

j=1

j
q
i,j
> (1 +)p
i
| T
i
] (5)
(T
i
) exp(t

j[k]
D(q

i,j
||
i,j
)) (6)
where q

i
maximizes exp(t

j[k]
D(q

i,j
||
i,j
)) over q
i

[0, 1]
k
such that

k
j=1

j
q
i,j
> (1 +)p
i
, and
(T
i
) =

q
i


Q(T
i
)
e
t

j[k]
[D(q
i,j
||
i,j
)D(q

i,j
||
i,j
)]
where

Q(T
i
) = {q
i
Q(T
i
) |

k
j=1

j
q
i,j
> (1 +)p
i
}.
B.1 Analysis of the Error Exponent
Let D
i
be the optimal value of the following optimization
problem:
minimize
k

j=1
D(q
i,j
||
i,j
)
over 0 q
i,j
1, j [k]
0
i,j
1, j [k]
subject to
k

j=1

j
q
i,j
(1 +)p
i
k

j=1

i,j
p
i
+.
This is a convex optimization problem and has a unique solution. The following result provides a tight characterization of the error exponent and follows by Sanov's theorem on the deviation of empirical measures [4].
Theorem 3. Assume < and let = min
i
D
i
. The
error probability
t
satises

1
t
log(
t
) , large t.
We will next provide a tight characterization of the error
exponent and on the way, we will characterize the optimal
solution q
i
and
i
. This will enable us to identify what the
worst-case distribution of data over sites is with respect to
the error exponent.
Since it suces to consider an arbitrary range i [m], in
order to simplify the notation, we will omit the subscript i
and, thus, write q
j
and
j
in lieu of q
i,j
and
i,j
, respec-
tively. Furthermore, we simplify the notation by denoting
A = p
i
(1 + ) and B = p
i
+ . With our new notation, the
optimization problem can be rewritten as follows
minimize
k

j=1
D(q
j
||
j
)
over 0 q
j
1, j [k]
0
j
1, j [k]
subject to
k

j=1

j
q
j
A (7)
k

j=1

j
B. (8)
We solve this problem by the method of Lagrange mul-
tipliers. Let 0 and 0 be the Lagrange multipliers
associated with the constraints (7) and (8), respectively, and
let
j
0 and
j
0 (resp.
j
0 and

j
0) denote the
Lagrange multipliers associated with constraints 0 q
j
and
q
j
1 (resp. 0
j
and
j
1), respectively.
The optimal solution is such that for some positive valued
, ,
j
,
j
,
j
and

j
, j [k], the following holds for every
j [k],

j
q
j

j
(1
j
)
=
j
+
j

j
(9)
q
j
1 q
j
=

j
1
j
2

j
+
j

j
. (10)
In addition, the following complementary slackness conditions hold, for every $j \in [k]$,
(
k

j=1

j
q
j
A) = 0
(B
k

j=1

j
) = 0

j
q
j
= 0

j

j
= 0

j
(1 q
j
) = 0

j
(1
j
) = 0.
The following lemma provides a characterization of the
optimal solution and is easy to conclude from the conditions
above, so the proof is omitted.
Lemma 1. For every j [k], q
j
= 0 if and only if
j
= 0
and similarly
j
= 1 if and only if
j
= 1. Otherwise, if
0 < q
j
< 1, then

j
q
j

j
(1
j
)
=
j
and
q
j
1 q
j
=

j
1
j
2

j
.
From (9) and (10), we have that for every j [k] such
that 0 < q
j
< 1, it holds

j
= 1
1

j
+
1
2

j
1
and
q
j
=
j
2

j
(2

j
1)
2

1
2

j
1
.
We next note another important property of the optimum
solution.
Lemma 2. There exists an integer [k] such that and
q
j
,
j
> 0, for j [], and q
j
=
j
= 0, otherwise.
The lemma tells us that the worst case is to have a strictly
positive fraction of the range in a site only if the fraction of
data items residing at the site is large enough.
Proof of the lemma. From (9) and (10), note that q
j
>
0 i

j
> 1 2

j
.
It is easily noted that this inequality holds i
j
> , for
some > 0. Thus, if the inequality holds for some
i
, then
it holds for every
j

i
. This completes the proof.
We next characterize the optimal values of the Lagrange
multipliers and and the value of the threshold . It
is easy to establish that conditions

k
j=1

j
= A and

k
j=1

j
q
j
= B can be, respectively, written as follows:
1

j=1

j
2

j
2

j
1

1

=
A

(11)
and

j=1

2
j
2

j
(2

j
1)
2

1

j=1

j
2

j
1
=
B

. (12)
Lemma 3. The threshold is equal to the largest integer
i [k] such that q
i
> 0, i.e. the largest integer i [k] such
that
2

i
2

i
1

i
>
1
i
_
i

j=1
2

j
2

j
1

j
A
_
.
Proof of the lemma. The lemma follows by Lemma 2
and noting that q
i
> 0 is equivalent to 1
i
< 2

i
and
then using the identities (11) and (12) to derive the assertion
above.
The above characterizations of the optimal solution enable us to identify the optimal error exponent, which is stated in the following theorem.
Theorem 4. Given dual optimal values > 0 and > 0,
the optimal error exponent is given by
=

k
_
1

j=1
log
_

j
2

j
1
_
+

j=1

j
B


_
.
The last theorem gives us an explicit characterization of
the error exponent as a function of the dual variables and
. We provide a more explicit characterization of the error
exponent that holds for the case of small parameter in the
following.
Theorem 5. Assume < . The error exponent satis-
es
=

k

2
(1 /())
2
2(

j=1

j
) +
1
2

+o(
2
).
The theorem reveals that for the case of small parameter,
the key parameter for the error exponent is the variance
of the distribution of the number of data items over sites.
The value of the parameter is given by Lemma 2 and is
noteworthy that for the case of small , is the largest
i [k] such that the following holds

i
>
1
i
_
i

j=1

j
2A
_
+O().
Indeed, this follows by combining (2) with the following fact

j
2

j
2

j
1
= 1 +
1
2

j
+O(
2
).
Remark For the case of perfectly balanced number of data
items per site, i.e.
j
= 1/k, for each site j [k], we have
= k,
2

= 0 and
=

2
(1 /())
2
2(1 )
+o(
2
)
which corresponds to the error exponent of sampling from a
dataset residing in a single site or multiple sites but where
the sample is taken by sampling without replacement across
the entire dataset (i.e. our simple sampling scheme).
Proof of Theorem 5. We make use of the following prop-
erty of the Kullback-Lieber divergence, for every j [k] such
that 0 < q
j
< 1,
D(q
j
||
j
) =
(q
j

j
)
2
2
j
(1
j
)
+O((q
j

j
)
3
).
Combining with (9), we have
D(q
j
||
j
) =

2

j
(q
j

j
) +O(
3
).
We observe
1
k

j=1
D(q
j
||
j
) =

2k
(AB) +O(
3
). (13)
Lemma 4. The following equality holds, for some v 0,
=
AB
A(

j=1

j
A) +
2
v
.
Furthermore, for the small case, it holds v =
1
4

+ o(1)
where

=
1

j=1

2
j
(
1

j=1

j
)
2
.
The lemma is established by the following arguments.
Subtracting (7) and (8), we obtain

__

j=1

j
+ 2

j=1

j
2

j
1
_
1

j=1

2
j
2

j
(2

j
1)
2
_
= AB.
By plugging and some elementary calculus, it can be shown that the last equality is equivalent to

_
a(

j=1

j
a) +
2
v(, , )
_
= AB
where v(, , ) = C
1
C
2
and
C
1
=
1

j=1

j
1

j=1

j
2

j
1

1

j=1

2
j
2

j
1
C
2
=
1

j=1
_

j
2

j
1
_
2

_
1

j=1

j
2

j
1
_
2
.
It can be readily checked that C
1
=
1
2

+O() and C
2
=
1
4

+O() which completes the proof of the lemma.


Using the last lemma and (13), we obtain
1
k

j=1
D(q
j
||
j
) =

2k
(AB)
2
A(

j=1

j
A) +
2
v
+O(
3
).
This completes the proof of Theorem 5.
While the previous characterization provides a tight estimate
of the error exponent for small values of the parameter $\varepsilon$,
it depends on the threshold parameter, which requires knowing
the distribution of the number of data items over sites.
It is of interest to obtain an estimate of the error exponent
that requires fewer parameters about the underlying distribution
of data over sites, which we present in the following
corollary.
Corollary 3. Assume < . Then the error exponent
satisfies


2
(1 /())
2
2(1 ) +
1
2
k
2

2
k
+o(
2
).
Proof of Corollary 3. It suffices to show that for every
set of values
1

2
. . .
k
> 0 and 0 a 1, it
holds

2k
(AB)
2
a(

j=1

j
A) +
1
4

1
2
(AB)
2
a(1 a) +
1
4
k
2

2
k
.
This relation is indeed equivalent to
1

a(

j=1

j
A) +
1
4


1
k
A(1 A) +
1
4
k
2
k
.
For the case = k, we have that the last relation holds with
equality. It thus remains to consider the case < k which
we do in the following by induction over i = [k ].
Base case i = 1. By definition of the threshold parameter and
the assumption that it is smaller than k, we have

k

1
k
(1 2A)
as k = , otherwise. We need to show that
1
k 1
A(
k1

j=1

j
A) +
1
4
(k 1)
2
k1

1
k
A(1 A) +
1
4
k
2
k
.
After some elementary algebra, we can show that this is
equivalent to:
4A(
1 A
k

k
) +k
k
(
2
k

k
)
1
k
0.
The left-hand side is increasing with
k
over [0, (1 2A)/k],
hence the inequality holds if it holds for
k
= (1 2A)/k.
Now, it is easy to check that for
k
= (1 2A)/k, the in-
equality holds with equality, which proves the base case.
Induction step 1 < i < k . We will show that
1
ki1
A(

ki1
j=1

j
A) +
1
4
(k i 1)
2
ki1

1
ki
A(

ki
j=1

j
A) +
1
4
(k i)
2
ki
.
(14)
By the induction hypothesis, the right-hand side is less than
or equal to
1
k
A(1A)+
1
4

2
k
, and thus provided that the last
above inequality holds, we have that our desired condition
holds for k i 1. Notice that

ki

1
k i
_
ki

j=1

j
2A
_
as, otherwise, we will have k i = .
By similar steps as for the base case, we can show that
(14) is equivalent to
4A(uA(mi)
ki
)(ki)
2

2
ki
+2(ki)u
ki
u
2
0
where u =

ki
j=1

j
. The left-hand side is increasing with

ki
over [0,
u2A
ki
] and achieves 0 for
ki
=
u2A
ki
. This
completes the proof.
B.2 Proof of Theorem 2
From (6) and the notation in the preceding section, we
have for every $i \in [k]$,
P[q
i
> (1 +)p
i
| T
i
] (T
i
) exp(t).
Therefore, combining with (4), we have that a sufficient sample
size is
t
1

[log(1/) + log(m) + max


i[m]
log((T
i
))]
Notice that, for every $i \in [m]$,
log((T
i
)) log(|Q(T
i
)|)
= log(

j[k]
|Q
j
(T
i
)|)
log(

j[k]
n
j
)
k log(n).
The last two relations, combined with Corollary 3, provide
a proof of Theorem 2.
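The sufficient sample size derived above can be evaluated numerically once the error exponent is known. The Python sketch below assumes the exponent is supplied by the caller (for instance from the bound of Corollary 3) and uses the bound k log(n) on the remaining logarithmic term. The function name, argument names and example values are hypothetical and only illustrate the arithmetic.

```python
import math


def sufficient_sample_size(exponent: float, delta: float, m: int, n: int, k: int) -> int:
    """Smallest integer t with t >= (log(1/delta) + log(m) + k*log(n)) / exponent.

    exponent: error exponent, assumed positive and supplied by the caller
    delta:    allowed failure probability, 0 < delta < 1
    m:        number of partitions (ranges)
    n:        total number of data items
    k:        number of sites
    """
    if exponent <= 0 or not 0 < delta < 1 or min(m, n, k) < 1:
        raise ValueError("expected exponent > 0, 0 < delta < 1 and m, n, k >= 1")
    log_terms = math.log(1 / delta) + math.log(m) + k * math.log(n)
    return math.ceil(log_terms / exponent)


if __name__ == "__main__":
    # Illustrative values only (assumptions, not figures from the paper).
    print(sufficient_sample_size(exponent=0.01, delta=0.05, m=1000, n=10**9, k=100))
```

In this form the term k log(n) dominates the other two logarithmic terms for large k, which is the dependence on k that the normal approximation of the next section avoids.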
B.3 A Normal Approximation
The result in Theorem 2 provides a sufficient sample size
to guarantee an $\varepsilon$-approximate range partition with probability
at least $1 - \delta$. This sufficient sample size is derived by using
a large deviations approach, which results in a factor,
depending on $1/\delta$, m, n and k, that is logarithmic in $1/\delta$ and
in the number of partitions m, and is linear in k log(n). We now
show that a normal approximation yields the same remaining
factor as in Theorem 2, but suggests a smaller factor that is
independent of k.
We observe that the following inequalities hold, for every
range $i \in [m]$,
P[
k

j=1

j
q
i,j
> (1 +)p
i
| S]
= P[
k

j=1

j
q
i,j

k

j=1

i,j
> (1 +)p
i

k

j=1

i,j
| S]
P[
k

j=1

j
q
i,j

k

j=1

i,j
> p
i
| S]
where the last inequality holds because

k
j=1

i,j
p
i
+.
Now, we may use the normal approximation for the random
variable

k
j=1

j
q
i,j

k
j=1

i,j
with mean zero and
variance

k
j=1

2
j

i,j
(1
i,j
)/(t/k). The above bound on
the probability of error thus may be approximated by

_
_
p
i

_

k
j=1

2
j

i,j
(1
i,j
)
_
t/k
_
_
where $\Phi(x)$ is the cumulative distribution function of a standard
normal random variable and $\bar{\Phi}(x) = 1 - \Phi(x)$. Now let M =
{(
i,1
, . . . ,
i,k
) [0, 1]
k
|

k
j=1

i,j
= 1} and note that
for every
i
= (
i,1
, . . . ,
i,k
) M,

_
_
p
i

_

k
j=1

2
j

i,j
(1
i,j
)
_
t/k
_
_
max

i
M

_
_
p
i

_

k
j=1

2
j

i,j
(1
i,j
)
_
t/k
_
_
=

_
_
p
i

_
max

i
M

k
j=1

2
j

i,j
(1
i,j
)
_
t/k
_
_
=

_
p
i

_
(p
i
+)(1 (p
i
+)) +(n)

t
_
Requiring that the last expression is less than or equal to
$\delta/m$ yields
t
(p
i
+)(1 (p
i
+)) +(n)
(p
i
)
2

1
(/m)
2
.
Since $\bar{\Phi}(x) \le \exp(-x^2/2)$ for large enough x ($x \ge 1/\sqrt{2\pi}$
suffices), we have $\bar{\Phi}^{-1}(x)^2 \le 2\log(1/x)$. Using this yields a
sufficient condition for the last inequality above, which reads
as
t
2[(p
i
+)(1 (p
i
+)) +(n)]
(p
i
)
2
log(m/).
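The two tail bounds used in this last step can be checked numerically. The Python sketch below relies on `statistics.NormalDist` from the standard library. The helper names and the sample values of m and of the failure probability are illustrative assumptions.

```python
import math
from statistics import NormalDist

STD_NORMAL = NormalDist()  # standard normal: mean 0, standard deviation 1


def upper_tail(x: float) -> float:
    """Complementary distribution function, 1 - Phi(x)."""
    return 1.0 - STD_NORMAL.cdf(x)


def upper_tail_inverse(y: float) -> float:
    """Inverse of the complementary distribution function, for 0 < y < 1."""
    return STD_NORMAL.inv_cdf(1.0 - y)


def log_factor(m: int, delta: float) -> float:
    """The factor 2 log(m / delta) that replaces the squared inverse tail."""
    return 2.0 * math.log(m / delta)


if __name__ == "__main__":
    for x in (0.4, 1.0, 2.0, 4.0):
        print(f"x={x}: tail={upper_tail(x):.3e} bound={math.exp(-x * x / 2):.3e}")
    m, delta = 1000, 0.05  # illustrative values (assumptions)
    squared_inverse = upper_tail_inverse(delta / m) ** 2
    print(f"squared inverse tail={squared_inverse:.2f} "
          f"<= 2 log(m/delta)={log_factor(m, delta):.2f}")
```

The comparison suggests that replacing the squared inverse tail by 2 log(m/δ) loses only a moderate constant, while keeping the factor independent of k.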
