Hadoop Map Reduce For Mobile Clouds PDF

Download as pdf or txt
Download as pdf or txt
You are on page 1of 14

This article has been accepted for publication in a future issue of this journal, but has not been

fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 1

Hadoop MapReduce for Mobile Clouds


Johnu George, Chien-An Chen, Radu Stoleru, Member, IEEE, Geoffrey G. Xie Member, IEEE

Abstract—The new generations of mobile devices have high processing power and storage, but they lag behind in terms of
software systems for big data storage and processing. Hadoop is a scalable platform that provides distributed storage and
computational capabilities on clusters of commodity hardware. Building Hadoop on a mobile network enables the devices to
run data intensive computing applications without direct knowledge of underlying distributed systems complexities. However,
these applications have severe energy and reliability constraints (e.g., caused by unexpected device failures or topology changes
in a dynamic network). As mobile devices are more susceptible to unauthorized access, when compared to traditional servers,
security is also a concern for sensitive data. Hence, it is paramount to consider reliability, energy efficiency and security for such
applications. The MDFS (Mobile Distributed File System) [1] addresses these issues for big data processing in mobile clouds. We
have developed the Hadoop MapReduce framework over MDFS and have studied its performance by varying input workloads in
a real heterogeneous mobile cluster. Our evaluation shows that the implementation addresses all constraints in processing large
amounts of data in mobile clouds. Thus, our system is a viable solution to meet the growing demands of data processing in a
mobile environment.

Index Terms—Mobile Computing, Hadoop MapReduce, Cloud Computing, Mobile Cloud, Energy-Efficient Computing, Fault-
Tolerant Computing

1 Introduction fails in the absence of external network connectivity,


as it is the case in military or disaster response oper-

W ith advances in technology, mobile devices are


slowly replacing traditional personal computers.
The new generations of mobile devices are power-
ations. This architecture is also avoided in emergency
response scenarios where there is limited connectivity to
cloud, leading to expensive data upload and download
ful with gigabytes of memory and multi-core proces-
operations. In such situations, wireless mobile ad-hoc
sors. These devices have high-end computing hardware
networks are typically deployed [5]. The limitations of
and complex software applications that generate large
the traditional cloud computing motivate us to study
amounts of data on the order of hundreds of megabytes.
the data processing problem in an infrastructureless and
This data can range from application raw data to im-
mobile environment in which the internet is unavailable
ages, audio, video or text files. With the rapid increase
and all jobs are performed on mobile devices. We
in the number of mobile devices, big data processing
assume that mobile devices in the vicinity are willing
on mobile devices has become a key emerging necessity
to share each other’s computational resources.
for providing capabilities similar to those provided by
There are many challenges in bringing big data ca-
traditional servers [2].
pabilities to the mobile environment: a) mobile devices
Current mobile applications that perform massive are resource-constrained in terms of memory, process-
computing tasks (big data processing) offload data and ing power and energy. Since most mobile devices are
tasks to data centers or powerful servers in the cloud [3]. battery powered, energy consumption during job exe-
There are several cloud services that offer computing cution must be minimized. Overall energy consumption
infrastructure to end users for processing large datasets. depends on the nodes selected for the job execution.
Hadoop MapReduce is a popular open source program- The nodes have to be selected based on each node’s
ming framework for cloud computing [4]. The frame- remaining energy, job retrieval time, and energy profile.
work splits the user job into smaller tasks and runs these As the jobs are retrieved wirelessly, shorter job retrieval
tasks in parallel on different nodes, thus reducing the time indicates lower transmission energy and shorter
overall execution time when compared with a sequential job completion time. Compared to the traditional cloud
execution on a single node. This architecture however, computing, transmission time is the bottleneck for the
job makespan and wireless transmission is the major
• Johnu George, Chien-An Chen, and Radu Stoleru are with source of the energy consumption; b) reliability of data
Department of Computer Science and Engineering, Texas
A&M University, College Station, TX 77840.
is a major challenge in dynamic networks with unpre-
E-mail:{stoleru,jaychen}@cse.tamu.edu, johnuge- dictable topology changes. Connection failures could
[email protected] cause mobile devices to go out of the network reach after
• Geoffrey G. Xie is with Department of Computer Science,
Naval Post Graduate School, Monterey, CA 93943
limited participation. Device failures may also happen
E-mail:[email protected] due to energy depletion or application specific failures.
Manuscript received 29 Aug. 2013; revised 24 Jan. 2014; revised Hence, a reliability model stronger than those used by
28 Mar. 2014 traditional static networks is essential; c) security is
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 2

also a major concern as the stored data often contains 2 Related Work & Background
sensitive user information [6] [7]. Traditional security There have been several research studies that at-
mechanisms tailored for static networks are inadequate tempted to bring MapReduce framework to the het-
for dynamic networks. Devices can be captured by erogeneous cluster of devices, due to its simplicity and
unauthorized users and data can be compromised easily powerful abstractions [11].
if necessary security measures are not provided. To ad- Marinelli [12] introduced the Hadoop based platform
dress the aforementioned issues of energy efficiency, reli- Hyrax for cloud computing on smartphones. Hadoop
ability and security of dynamic network topologies, the TaskTracker and DataNode processes were ported on
k-out-of-n computing framework was introduced [8] [9]. Android mobile phones, while a single instance of
An overview of the previous MDFS will be described NameNode and JobTracker were run in a traditional
in section 2.2. What remains as an open challenge is server. Porting Hadoop processes directly onto mobile
to bring the cloud computing framework to a k-out-of-n devices doesn’t mitigate the problems faced in the mo-
environment such that it solves the bottlenecks involved bile environment. As presented earlier, HDFS is not well
in processing and storage of big data in a mobile cloud. suited for dynamic network scenarios. There is a need
for a more lightweight file system which can adequately
The Hadoop MapReduce cloud computing framework address dynamic network topology concerns. Another
meets our processing requirements for several reasons: MapReduce framework based on Python, Misco [13]
1) in the MapReduce framework, as the tasks are run was implemented on Nokia mobile phones. It has a
in parallel, no single mobile device becomes a bottle- similar server-client model where the server keeps track
neck for overall system performance; 2) the MapReduce of various user jobs and assigns them to workers on
framework handles resource management, task schedul- demand. Yet another server-client model based MapRe-
ing and task execution in an efficient fault tolerant duce system was proposed over a cluster of mobile
manner. It also considers the available disk space and devices [14] where the mobile client implements MapRe-
memory of each node before tasks are assigned to duce logic to retrieve work and produce results from
any node; 3) Hadoop MapReduce has been extensively the master node. The above solutions, however, do not
tested and used by large number of organizations for solve the issues involved in data storage and processing
big data processing over many years. However, the of large datasets in the dynamic network.
default file system of Hadoop, HDFS (Hadoop Dis- P2P-MapReduce [15] describes a prototype imple-
tributed File System) [10] is tuned for static networks mentation of a MapReduce framework which uses a
and is unsuitable for mobile environments. HDFS is peer-to-peer model for parallel data processing in dy-
not suitable for dynamic network topologies because: namic cloud topologies. It describes mechanisms for
1) it ignores energy efficiency. Mobile devices have managing node and job failures in a decentralized man-
limited battery power and can easily fail due to energy ner.
depletion; 2) HDFS needs better reliability schemes for The previous research focused only on the parallel
data in the mobile environment. In HDFS, each file processing of tasks on mobile devices using the MapRe-
block is replicated to multiple devices considering heavy duce framework without addressing the real challenges
I/O bound jobs with strong requirements on backend that occur when these devices are deployed in the
network connections. Instead, we need lightweight pro- mobile environment. Huchton et al. [1] proposed a k-
cesses which react well to slow and varying network Resilient Mobile Distributed File System (MDFS) for
connections. Consequently, we considered k-out-of-n mobile devices targeted primarily for military opera-
based MDFS [8], instead of HDFS, as our underlying tions. Chen et al. [16] proposed a new resource al-
file system for the MapReduce framework. location scheme based on k-out-of-n framework and
implemented a more reliable and energy efficient Mobile
In this paper, we implement Hadoop MapReduce Distributed File System for Mobile Ad Hoc Networks
framework over MDFS and evaluate its performance (MANETs) with significant improvements in energy
on a general heterogeneous cluster of devices. We im- consumption over the traditional MDFS architecture.
plement the generic file system interface of Hadoop In the remaining part of this section we present
for MDFS which makes our system interoperable with background material on Hadoop and MDFS.
other Hadoop frameworks like HBase. There are no
changes required for existing HDFS applications to be
deployed over MDFS. To the best of our knowledge, 2.1 Hadoop Overview
this is the first work to bring Hadoop MapReduce The two primary components of Apache Hadoop are
framework for mobile cloud that truly addresses the the MapReduce framework and HDFS, as shown in
challenges of the dynamic network environment. Our Figure 1. MapReduce is a scalable parallel processing
system provides a distributed computing model for pro- framework that runs on HDFS. It refers to two distinct
cessing of large datasets in mobile environment while tasks that Hadoop jobs perform- the Map Task and
ensuring strong guarantees for energy efficiency, data the Reduce Task. The Map Task takes the input data
reliability and security. set and produces a set of intermediate <key,value>
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 3

MapReduce
Program M Map Task
Client Node
R Reduce Task
Job Client
A B File Blocks
Submit Job

Assign Task 1 Assign Task 2 Encrypted AES


Hadoop JobTracker Hadoop TaskTracker Hadoop TaskTracker Plain File
M R M R Erasure Coding Secret Sharing
HDFS Client HDFS Client HDFS Client

Encrypted AES
Name Node
Block A
File.txt
Block B Data Node Data Node
Block Datanodes A B A B
A 1,2
Block Datanodes Fig. 2. Existing MDFS architecture
B 1,2

Metadata Data Read/ Data Read/ at regular intervals through heartbeat messages which
Operations Write Write
Network contain the information regarding their stored blocks.
NameNode builds its metadata from these block reports
Fig. 1. Hadoop architecture and always stays in sync with the DataNodes in the
pairs which are sorted and partitioned per reducer. The cluster.
map output is then passed to Reducers to produce the When the HDFS client initiates the file read opera-
final output. The user applications implement mapper tion, it fetches the list of blocks and their corresponding
and reducer interfaces to provide the map and reduce DataNode locations from NameNode. The locations are
functions. In the MapReduce framework, computation ordered by their distance from the reader. It then tries
is always moved closer to nodes where data is located, to read the content of the block directly from the first
instead of moving data to the compute node. In the location. If this read operation fails, it chooses the
ideal case, the compute node is also the storage node next location in the sequence. As the client retrieves
minimizing the network congestion and thereby maxi- data directly from the DataNodes, the network traffic
mizing the overall throughput. is distributed across all the DataNodes in the HDFS
Two important modules in MapReduce are the cluster.
JobTracker and the TaskTracker. JobTracker is the When the HDFS client is writing data to a file, it
MapReduce master daemon that accepts the user jobs initiates a pipelined write to a list of DataNodes which
and splits them into multiple tasks. It then assigns are retrieved from the NameNode. The NameNode
these tasks to MapReduce slave nodes in the cluster chooses the list of DataNodes based on the pluggable
called the TaskTrackers. TaskTrackers are the process- block placement strategy. Each DataNode receives data
ing nodes in the cluster that run the tasks- Map and from its predecessor in the pipeline and forwards it to
Reduce. The JobTracker is responsible for scheduling its successor. The DataNodes report to the NameNode
tasks on the TaskTrackers and re-executing the failed once the block is received.
tasks. TaskTrackers report to JobTracker at regular
intervals through heartbeat messages which carry the
information regarding the status of running tasks and 2.2 MDFS Overview
the number of available slots. The traditional MDFS was primarily targeted for mil-
HDFS is a reliable, fault tolerant distributed file itary operations where front line troops are provided
system designed to store very large datasets. Its key with mobile devices. A collection of mobile devices
features include load balancing for maximum efficiency, form a mobile ad-hoc network where each node can
configurable block replication strategies for data protec- enter or move out of the network freely. MDFS is
tion, recovery mechanisms for fault tolerance and auto built on a k-out-of-n framework which provides strong
scalability. In HDFS, each file is split into blocks and guarantees for data security and reliability. k-out-of-n
each block is replicated to several devices across the enabled MDFS finds n storage nodes such that total
cluster. expected transmission cost to k closest storage nodes
The two modules in HDFS layer are NameNode and is minimal. Instead of relying on conventional schemes
DataNode. NameNode is the file system master daemon which encrypt the stored data per device, MDFS uses
that holds the metadata information about the stored a group secret sharing scheme.
files. It stores the inode records of files and directories As shown in Figure 2, every file is encrypted using
which contain various attributes like name, size, per- a secret key and partitioned into n1 file fragments
missions and last modified time. DataNodes are the file using erasure encoding (Reed Solomon algorithm). Un-
system slave nodes which are the storage nodes in the like conventional schemes, the secret key is not stored
cluster. They store the file blocks and serve read/write locally. The key is split into n2 fragments using Shamir’s
requests from the client. The NameNode maps a file to secret key sharing algorithm. File creation is complete
the list of its blocks and the blocks to the list of DataN- when all the key and file fragments are distributed
odes that store them. DataNodes report to NameNode across the cluster. For file retrieval, a node has to
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 4

retrieve at least k1 (<n1 ) file fragments and k2 (<n2 ) an empty slot on any node that contains the block. If
key fragments to reconstruct the original file. no slots are available, it looks for an empty slot on a
MDFS architecture provides high security by en- different node but in the same rack. In MDFS, as no
suring that data cannot be decrypted unless an au- node in the network has a complete block for processing,
thorized user obtains k2 distinct key fragments. It the challenge is to determine the best locations for each
also ensures resiliency by allowing the authorized users task execution. Section 4.5 proposes an energy-aware
to reconstruct the data even after losing n1 -k1 frag- scheduling algorithm that overrides the default one.
ments of data. Reliability of the file increases when 5. The MapReduce and HDFS components are rack
the ratio k1 /n1 decreases, but it also incurs higher aware. They use network topology for obtaining the
data redundancy. The data fragments are placed on a rack awareness knowledge. As discussed, if the node
set of selected storage nodes considering each node’s that contains the block is not available for task exe-
failure probability and its distance to the potential cution, the default task scheduling algorithm selects a
clients. A node’s failure probability is estimated based different node in the same rack using the rack awareness
on the remaining energy, network connectivity, and knowledge. This scheme leverages the single hop and
application-dependent factors. Data fragments are then high bandwidth of in-rack switching. The challenge is
allocated to the network such that the expected data to define rack awareness in context of mobile ad-hoc
transferring energy for all clients to retrieve/recover the network as the topology is likely to change at any
file is minimized. time during the job execution. This challenge is also
MDFS has a fully distributed directory service in addressed by the new scheduling algorithm described
which each device maintains information regarding the in Section 4.5.
list of available files and their corresponding key and
file fragments. Each node in the network periodically 4 System Design
synchronizes the directory with other nodes ensuring In this section, we present the details of our proposed
that the directories of all devices are always updated. architectures, system components and the interactions
among the components that occur during file system
3 Challenges operations.
This section describes the challenges involved in the
implementation of MapReduce framework over MDFS. 4.1 Requirements
1. Traditional MDFS architecture only supports a For the design of our system, the following requirements
flat hierarchy. All files are stored at the same level in had to be met:
the file system without the use of folders or directories. • Since the Hadoop JobTracker is a single entity
But the MapReduce framework relies on fully qualified common to all nodes across the cluster, there
path names for all operations. The fully qualified path should be at least one node in the cluster which
is added to MDFS, as described in Section 4.4.6. always operates within the network range and
2. The capabilities of traditional MDFS are very remains alive throughout the job execution phase.
limited. It supports only a few functionalities such The system must tolerate node failures.
as read(), write() and list(). A user calls the write() • Data stored across the cluster may be sensitive.
function to store a file across the nodes in the network Unauthorized access to sensitive information must
and the read() function to read the contents of a file be prohibited.
from the network. The list() function provides the full • The system is tuned and designed for handling
listing of the available files in the network. large amounts of data in the order of hundreds of
However, MapReduce framework needs a fairly megabytes, but it must also support small files.
generic file system that implements wide range of func- • Though we primarily focus on mobile devices,
tions. It has to be compatible with available HDFS the system must support heterogeneous cluster
applications without any code modification or extra of devices which can be a combination of tradi-
configuration. The implementation detail is described tional personal computers, servers, laptops, mobile
in Section 4.4. phones and tablets depending on the working en-
3. MapReduce framework needs streaming access vironment of the user.
to their data, but MDFS reads and writes are not • Like HDFS, the system must support sequential
streaming operations. How the data is streamed during writes. Bytes written by a client may not be visible
read/write operations are described in Section 4.4. immediately to other clients in the network unless
4. During the job initialization phase of Hadoop, the file is closed or flush operation is called. Append
JobTracker queries the NameNode to retrieve the in- mode must be supported to append the data to
formation of all the blocks of the input file (blocks and an already existing file. Flush operation guarantees
list of DataNodes that store them) for selecting the that bytes up to that given point in the stream are
best nodes for task execution. JobTracker prioritizes persisted and changes are visible to all other clients
data locality for TaskTracker selection. It first looks for in the system.
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 5

• Like HDFS, the system must support streaming The client connects to the RPC server and talks to
reads. It must also support random reads where a it using the MDFS Name Protocol. The MDFS client
user reads bytes starting from an arbitrary offset and MDFS Name Server are completely unaware of the
in the file. fragment distribution which is handled by the Data
Server. We kept the namespace management and data
4.2 System Components management totally independent for better scalability
and design simplicity.
In the traditional MDFS architecture, a file to be stored 4.2.2.2 Data Server: The MDFS Data Server is
is encrypted and split into n fragments such that any k a lightweight MDFS daemon instantiated on each node
(<n) fragments are sufficient to reconstruct the original in the cluster. It coordinates with other MDFS Data
file. In this architecture, parallel file processing is not Server daemons to handle MDFS communication tasks
possible as even a single byte of the file cannot be like neighbor discovery, file creation, file retrieval and
read without retrieving the required number of frag- file deletion. On startup, it starts a local RPC server
ments. Moreover, MapReduce framework assumes that listening on the port defined by mdfs.dataservice.rpc-
the input file is split into blocks which are distributed port in the configuration file. When the user invokes
across the cluster. Hence, we propose the notion of any file system operation, the MDFS client connects to
blocks, which was missing in the traditional MDFS the local Data Server at the specified port and talks
architecture. In our approach, the files are split into to it using the MDFS Data Protocol. Unlike Hadoop
blocks based on the block size. These blocks are then DataNode, the Data Server has to be instantiated on
split into fragments that are stored across the cluster. all nodes in the network where data flow operations
Each block is a normal Unix file with configurable block (reads and writes) are invoked. This is because the Data
size. Block size has a direct impact on performance as Server prepares the data for these operations and they
it affects the read and write sizes. are always executed in the local file system of the client.
The file system functionality of each cluster node is The architecture is explained in detail in the subsequent
split across three layers- MDFS Client, Data processing sections.
layer and Network communication layer.
4.2.3 Network communication layer
4.2.1 MDFS Client This layer handles the communication between the
User applications invoke file system operations using nodes in the network. It exchanges control and data
the MDFS client, a built-in library that implements the packets for various file operations. This layer abstracts
MDFS file system interface. The MDFS client provides the network interactions and hides the complexities
file system abstraction to upper layers. The user does involved in routing packets to various nodes in case of
not need to be aware of file metadata or the storage dynamic topologies like in MANETs.
locations of file fragments. Instead, the user references 4.2.3.1 Fragment Mapper: The Fragment Map-
each file by paths in the namespace using the MDFS per stores information of file and key fragments which
client. Files and directories can be created, deleted, include the fragment identifiers and the location of
moved and renamed like in traditional file systems. fragments. It stores the mapping of a block to its list
All file system commands take path arguments in URI of key and file fragments.
format (scheme://authority/path). The scheme decides 4.2.3.2 Communication Server: The Communi-
the file system to be instantiated. For MDFS, the cation Server interacts with every other node and is
scheme is mdfs and the authority is the Name Server responsible for energy-efficient packets routing. It must
address. support broadcast and unicast, the two basic com-
munication modes required by MDFS. To allow more
4.2.2 Data processing layer flexible routing mechanisms in different environments,
Data Processing layer manages the data and control it is implemented as a pluggable component which can
flow of file system operations. The functionality of this be extended to support multiple routing protocols.
layer is split across two daemons- Name Server and 4.2.3.3 Topology Discovery & Maintenance
Data Server. Framework: This component stores the network
4.2.2.1 Name Server: MDFS Name Server is a topology information and the failure probabilities
lightweight MDFS daemon that stores the hierarchical of participating nodes. When the network topology
file organization or the namespace of the file system. changes, this framework detects the change through a
All file system metadata including the mapping of a file distributed topology monitoring algorithm and updates
to its list of blocks is also stored in the MDFS Name the Fragment Mapper. All the nodes in the network
Server. The Name Server has the same functionality are thus promptly updated about network topology
as Hadoop NameNode. The Name Server is always up- changes.
dated with any change in the file system namespace. On There are two types of system metadata. The file
startup, it starts a global RPC server at a port defined system namespace which includes the mapping of file
by mdfs.nameservice.rpc-port in the configuration file. to blocks is stored in the Name Server while mapping of
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 6

Client Node MapReduce M Map Task


Program R Reduce Task messages. Any new node entering the network receives
Job Client A
C
B
D File Fragments these broadcast messages and creates a local cache for
Submit Job
further operations. In the hardware implementation,
Hadoop JobTracker Assign Hadoop TaskTracker
Task M R the updates are broadcasted using UDP packets. We
MDFS Client MDFS Client
assume all functional nodes can successfully receive
Data Server
the broadcast messages. The more comprehensive and
Data Server
Name Server
A C
Name Server
B D
robust distributed directory service is left as future
Fragment Fragment
Block 1 Mapper Block 1 Mapper work.
File.txt File.txt
Block 1 Frag A Commun. Block 1 Frag A Commun.
Block 2 Frag B
Frag C
Server Block 2 Frag B
Frag C
Server This architecture has no single point of failure and no
Block 2 Frag D Block 2
Frag D
constraint is imposed on the network topology. Each
node can operate independently, as each node stores
Metadata Data Read/ Fragment Metadata Data Read/ Fragment
Operations Write Operations Operations Write Operations a separate copy of the namespace and fragment map-
Network
ping. The load is evenly distributed across the cluster
Fig. 3. Distributed Architecture of MDFS in terms of metadata storage when compared to the
centralized architecture. However, network bandwidth
block to fragments is stored in the Fragment Mapper. It is wasted due to the messages broadcast by each node
was our design decision to separate Fragment Mapper for updating the local cache of every other node in the
functionality from the Name Server. There are two network. As the number of nodes involved in processing
reasons 1) The memory usage of Fragment Mapper can increases, this problem becomes more severe, leading to
grow tremendously based on the configured value of n higher response time for each user operation.
for each file. For example, consider a system with n set
to 10. For a 1 MB file with 4MB block size, only one 4.3.2 Master-Slave Architecture
block is required but 10 fragments are created by k-out-
of-n framework. Hence, there are 10 fragment entries In this architecture depicted in Figure 4, the Name
in the Fragment Mapper and only 1 block entry in Server and the Fragment Mapper are singleton in-
the Name Server for this particular file. Since memory stances across the complete cluster. These daemons can
requirements of Name Server and Fragment Mapper be run in any of the nodes in the cluster. The node
are different, this design gives flexibility to run them that runs these daemons is called the master node.
independently in different modes. 2) Fragment Mapper MDFS stores metadata on the master node similar to
is invoked only during network operations (read/writes) other distributed systems like HDFS, GFS [17] and
while the Name Server is accessed for every file system PVFS [18].
operation. Since the Name Server is a light weight The centralized architecture has many advantages. 1)
daemon that handles only the file system namespace, Since a single node in the cluster stores the complete
the directory operations are fast. metadata, there is no wastage of the device memory
by storing same metadata in all nodes when compared
to the distributed approach. 2) When a file is created,
4.3 System Architecture
modified or deleted, there is no need to broadcast any
We propose two approaches for our MDFS architecture- message across the network to inform other nodes for
a Distributed architecture where there is no central updating their metadata. This saves overall network
entity to manage the cluster and a Master-slave archi- bandwidth and reduces transmission cost. Lesser trans-
tecture, as in HDFS. Although the tradeoffs between mission cost leads to higher energy efficiency of the sys-
the distributed architecture and the centralized archi- tem. 3) Since our system is assumed to have at least one
tecture in a distributed system are well-studied, this node that always operates within the network range,
paper is the first to implement and compare Hadoop the Name Server and the Fragment Mapper can be run
framework on these two architectures. Some interesting in the same node that hosts the Hadoop JobTracker.
observations are also discovered, as described in sec- It can be a static server or any participating mobile
tion 6.6. The user can configure the architecture during device. Thus this approach doesn’t violate any of our
the cluster startup based on the working environment. initial assumptions.
It has to be configured during the cluster startup and The major disadvantage of the centralized approach
cannot be changed later. is that the master node is a single point of failure.
However, this problem can be solved by configuring
4.3.1 Distributed Architecture a standby node in the configuration file. The standby
In this architecture, depicted in Figure 3, every par- node is updated by the master node whenever there is
ticipating node runs a Name Server and a Fragment a change in the file system metadata. The master node
Mapper. After every file system operation, the update signals success to client operations only when metadata
is broadcasted in the network so that the local caches change is reflected in both master and standby nodes.
of all nodes are synchronized. Moreover, each node pe- Hence, data structures of the master and standby node
riodically syncs with other nodes by sending broadcast always remain in sync ensuring smooth failover.
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 7

MapReduce M Map Task


Client Node
Program R Reduce Task the block to be read is first reconstructed locally. For
Job Client A
C
B
D
File Fragments security purpose, the retrieved key/data fragments and
Submit Job
Assign the decoded blocks are deleted as soon as the plain data
Task
Hadoop JobTracker Hadoop TaskTracker M R is loaded into the taskTracker’s memory. Although there
MDFS Client MDFS Client
is a short period of time that a plain data block may be
compromised, we leave the more secure data processing
Name Server
Data Server
A C
Data Server
B D problem as future work.
Block 1
The overall transmission cost during the read opera-
Fragment Mapper
File.txt
Block 2 Frag A,
tion varies across nodes based on the location of frag-
Block 1 Commun. Commun.
Frag B Server Server ments and the reader location. As the read operation is
Frag C,
Block 2 Frag D handled locally, random reads are supported in MDFS
where the user can seek to any position in the file.
Metadata Fragment Data Read/ Data Read/
Operations Operations Write Write Figure 5 illustrates the control flow of a read operation
Network
through these numbered steps.
Fig. 4. Centralized Architecture of MDFS Step 1: The user issues a read request for file blocks
of length L at a byte offset O.
Steps 2-3: As in HDFS, the MDFS client queries the
Control Flow Name Server to return all blocks of the file that span
Data Flow
the byte offset range from O to O+L. The Name Server
1
2 Name Server MDFS Client MDFS Client searches the local cache for the mapping from the file
3
MDFS CLient
to the list of blocks. It returns the list of blocks that
4
20
Data Server
contain the requested bytes.
Data Server Data Server Step 4: For each block in the list returned by the
5 19
8
File Retrieval
Name Server, the client issues a retrieval request to the
16-18
11
Module Data Server. Each file system operation is identified by
12 6 7 a specific opcode in the request.
15
Fragment
Commu.
Server
Commu.
Server
Step 5: The Data Server identifies the opcode and
Commu. Mapper
Server Topology
Discovery/
instantiates the File Retriever module to handle the
Maintenance
Unit
k-out-of-n
framework block retrieval.
9 Steps 6-7: The Data Server requests the Fragment
10
13
Mapper to provide information regarding the key and
14
file fragments of the file. The Fragment Mapper replies
with the identity of the fragments and the locations of
Fig. 5. Data Flow of a read operation the fragments in the networks.
Steps 8-15: The Data Server requests the Communi-
The master node can be loaded when large number of cation Server to fetch the required number of fragments
mobile devices are involved in processing. There are sev- from the locations which are previously returned by the
eral distributed systems like Ceph [19] and Lustre [20] Fragment Mapper. Fragments are fetched in parallel
that support more than one instance of metadata server and stored in the local file system of the requesting
for managing the file system metadata evenly. Multi- client. After fetching each request, the Communication
ple metadata servers are deployed to avoid scalability Server acknowledges the Data Server with the location
bottlenecks of a single metadata server. MDFS can where the fragments are stored in the local file system.
now efficiently handle hundreds of megabytes with a Step 16: The above operations are repeated for
single metadata server and there is no need for multiple fetching the key fragments. These details are not in-
metadata servers in our environment. For rest of the cluded in the diagram for brevity. The secret key is
discussion, we use centralized approach for simplicity. constructed from the key fragments.
Step 17: Once the required file fragments are down-
4.4 System Operations loaded into the local file system, they are decoded and
This section describes how the file read, file write, file then decrypted using the secret key to get the original
append, file delete, file rename, and directory operations block.
are performed in a centralized architecture. Step 18: The key and file fragments which were
downloaded into the local file system during the re-
4.4.1 File Read Operation trieval process are deleted for security reasons.
HDFS read design is not applicable in MDFS. For any Step 19: The Data Server acknowledges the client
block read operation, the required number of fragments with the location of the block in the local file system.
has to be retrieved and then combined and decrypted Step 20: The MDFS client reads the requested
to recover the original block. Unlike HDFS, an MDFS number of bytes of the block. Steps 4-19 are repeated
block read operation is always local to the reader as if there are multiple blocks to be read. Once the read
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 8

B is the user configured block size. The last block might


Control Flow
Data Flow
not be complete depending on the file length. The user
1 request can also be a streaming write where the user
MDFS CLient
2
3
Name Server MDFS Client MDFS Client
writes to the file system byte by byte. Once the block
20
boundary is reached or when the file is closed, the block
4 Data Server
Data Server
is written to the network. In both scenarios, the data
Data Server
5 19 to be written is assumed to be present in the local file
14
File Creator
6-7 system.
11 Module

8 9
Step 2: Similar to HDFS block allocation scheme,
18
15 10 Commu. Commu. for each block to be written, the MDFS client requests
K-out-of-n Server Server
Commu.
Server Topology
framework the Name Server to allocate a new block Id which is
Discovery/
Maintenance Fragment a unique identifier for each block. As all the identifiers
Unit Mapper
13
are generated by a single Name Server in a centralized
12
architecture, there will not be any identifier. However,
17
in the distributed architecture, an appropriate hashing
16
function is required to generate the unique global iden-
Fig. 6. Data Flow of a write operation tifier. In our implementation, the absolute path of each
file is used as the hash key.
operation is completed, the block is deleted for security
reasons to restore the original state of the cluster. Step 3: The Name Server returns a new block id
If many clients are accessing the same file, the mo- based on the allocation algorithm and adds the block
bile nodes that store the fragments may become the identifier in its local cache. The mapping of file to list
hot spots. This problem can be fixed by enabling file of blocks is stored in the Name Server.
caching. Caching is disabled by default and each node Steps 4-5: The MDFS client issues a creation request
deletes the file fragments after the file retrieval. If to the Data Server which contains a specific opcode
caching is enabled, the reader node caches the file in the request message. The Data Server identifies the
fragments in its local file system so that it does not fetch opcode and instantiates the File Creator module to
the fragments from the network during the subsequent handle the block creation.
read operations.
Step 6: The block stored in the local file system is
If the file fragments are available locally, the reader
encrypted using the secret key. The encrypted block is
client verifies the length of cached file with the actual
partitioned into n fragments using erasure encoding.
length stored in Name Server. This avoids the problem
of reading the outdated version of the file. If some Step 7: The key is also split into fragments using
data has been appended to the file after caching, file Shamir’s secret key sharing algorithm.
fragments are re-fetched from the network overwriting Steps 8-9: The Data Server requests the k-out-of-n
the existing ones. Fragment availability increases due framework to provide n storage nodes such that total
to caching which leads to fair distribution of load in expected transmission cost from any node to k closest
the cluster without consuming extra energy. However, storage nodes is minimal.
caching affects system security due to higher availability
Step 10: The Data Server requests the Fragment
of file fragments in the network.
Mapper to add the fragment information of each file
which includes the fragment identifier with the new
4.4.2 File Write Operation
locations returned by the k-out-of-n framework. If the
HDFS write design is not applicable for MDFS as data network topology changes after the initial computation,
cannot be written unless the block is decrypted and k-out-of-n framework recomputes the storage nodes
decoded. Hence in the MDFS architecture, when write for every file stored in the network and updates the
operation is called, bytes are appended to the current Fragment Mapper. This ensures that Fragment Mapper
block till the block boundary is reached or the file is is always in sync with the current network topology.
closed. The block is then encrypted, split into fragments
and redistributed across the cluster. Steps 11-18: The file fragments are distributed
Our MDFS architecture does not support random in parallel across the cluster. The key fragments are
writes. Random writes make the design more compli- also stored in the same manner. These details are not
cated when writes span across multiple blocks. This included in the diagram for brevity.
feature is not considered in the present design as it is Steps 19-20: Once the file and key fragments are
not required for the MapReduce framework. Figure 6 distributed across the cluster, the Data Server informs
illustrates the control flow of a write operation through the client that the file has been successfully written
these numbered steps to the nodes. For security purposes, the original block
Step 1: The user issues a write request for a file of stored in the local file system of the writer is deleted
length L. The file is split into blocks of size [L/B] where after the write operation as it is no longer needed.
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 9

4.4.3 File Append Operation Algorithm 1: Task Scheduling


MDFS supports Append operation which was intro- Input: Sb , Fb , d, k
duced in Hadoop 0.19. If a user needs to write to an Output: X, Ci⋆ // index b in Ci⋆ (b) is omitted
existing file, the file has to be open in append mode. Ci⋆ = 0
If the user appends data to the file, bytes are added to X ←− 1 × N array initialized to 0
the last block of the file. Hence, for block append mode, D ←− 1 × N array
the last block is read into the local file system of the for j=1 to N do
writer and the file pointer is updated appropriately to D[j].node=j
the last written byte. Then, writes are executed in a D[j].cost=(Sb /k) × Fb (j) × dij
similar way as described in the previous section. if D[j].cost == 0 then
D[j].cost=N 2 // Just assign a big number
4.4.4 File Delete Operation end
For a file to be deleted, all file fragments of every block end
of the file have to be deleted. When the user issues a D ←− Sort D in increasing order by D.cost
file delete request, the MDFS client queries the Name for i=1 to k do
Server for all the blocks of the file. It then requests the X[D[i].node]=1
Data Server to delete these blocks from the network. Ci⋆ += D[i].cost
The Data Server gathers information about the file end
fragments from the Fragment Mapper and requests return X, Ci⋆
the Communication Server to send delete requests to
all the locations returned by the Fragment Mapper. from the corresponding nodes, adding latency. In a
Once the delete request has been successfully executed, mobile environment, higher network traffic leads to
the corresponding entry in the Fragment Mapper is increased energy consumption which is a major concern.
removed. In case of the distributed architecture, the Therefore, fetching data for non-local data processing
update has to be broadcast to the network so that the results in higher energy consumption and increased
entry is deleted from all nodes in the network. transmission cost. Hence, it is important to bring Map
Task processing nearer to the nodes that store the data
4.4.5 File Rename Operation for minimum latency and maximum energy efficiency.
The File Rename operation requires only an update There are many challenges in bringing data locality to
in the namespace where the file is referenced with the MDFS. Unlike native Hadoop, no single node running
new path name instead of the old path. When the user MDFS has a complete data block; each node has at
issues a file rename request, the MDFS client requests most one fragment of a block due to security reasons.
the Name Server to update its namespace. The Name Consequently, the default MapReduce scheduling al-
Server updates the current inode structure of the file gorithm that allocates processing tasks closer to data
based on the renamed path. blocks does not apply. When MDFS performs a read
operation to retrieve a file, it finds the k fragments
4.4.6 Directory Create/Delete/Rename Operations
that can be retrieved with the lowest data transferring
When the user issues the file commands to create, delete energy. Specifically, the k fragments that are closest
or rename any directory, the MDFS client requests the to the file requester in terms of the hop-count are
Name Server to update the namespace. The namespace retrieved. As a result, knowing the network topology
keeps a mapping of each file to its parent directory (from the topology maintenance component in MDFS)
where the topmost level is the root directory (’/’). All and the locations of each fragment (from the fragments
paths from the root node to the leaf nodes are unique. mapper), we could estimate the total hop-count for each
Recursive operations are also allowed for delete and node to retrieve the closest k fragments of the block.
rename operations. Smaller total hop-count indicates lower transmission
4.5 Energy-aware task scheduling time, lower transmission energy, and shorter job com-
Hadoop Mapreduce framework relies on data locality pletion time. Although this estimation adds a slight
for boosting overall system throughput. Computation overhead, and is repeated again when MDFS actually
is moved closer to the nodes where the data resides. retrieves/reads the data block, we leave the engineering
JobTracker first tries to assign tasks to TaskTrackers optimization as the future work.
in which the data is locally present (local). If this is We now describe how to find the minimal cost (hop-
not possible (no free map slots or if tasks have already count) for fetching a block from a taskTracker. Al-
failed in the specific node), it then tries to assign tasks gorithm 1 illustrates the main change made to the
to other TaskTrackers in the same rack (non-local). default Hadoop MapReduce scheduler such that the
This reduces the cross-switch network traffic thereby data transferring energy is taken into account when
reducing the overall execution time of Map tasks. In scheduling a task. The improved scheduler uses the
case of non-local task processing, data has to be fetched current network condition (topology and nodes’ failure
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 10

probability) to estimate the task retrieval energy and is being executed. Once the write has completed, the
assign tasks. Only nodes that are currently functional new data is visible across the cluster immediately. In
and available may be selected. Ci⋆ (b) is defined as the all circumstances, MDFS provides strong consistency
minimal cost of fetching block b at node i. Let Fb be a guarantee for reads such that all concurrent reader
1 × N binary vector where each element Fb (j) indicates clients will read the same data irrespective of their
whether node j contains a fragment of block b (note that
P locations.
N
j=1 Fb (j) = n ∀b); Sb is the size of block b; di,j is
the distance (hop-count) between node i and node j; 4.7 Failure Tolerance
the pair-wise node distance can be estimated efficiently
This section describes how MDFS uses k-out-of-n en-
using all pair shorted distance algorithm X is a 1 × N
coding technique and snapshot operation to improve the
binary decision variable where Xj = 1 indicates that
data reliability and prevent node failures.
node j sends a data fragment to node i.
X
N X
N 4.7.1 k-out-of-n reliability
Ci⋆ (b) = min (Sb /k)Fb (j)di,j Xj , s.t. Xj = k In HDFS, each block is replicated a specific number of
j=1 j=1 times for fault tolerance, which is determined by the
Ci⋆ (b) can be solved by Algorithm 1, which minimizes replication factor configured by the user. In MDFS, the
the communication cost for node i to retrieve k data k-out-of-n framework ensures data reliability where k
fragments of block b. Once Ci⋆ (b) for each node is solved, and n parameters determine the level of fault tolerance.
the processing task for block b is then assigned to node These parameters are per file configurable which are
p where p = arg mini Ci⋆ (b). The time complexity of specified at the file creation time. Only k nodes are
Algorithm 1 is N logN due to the sorting procedure, and required to retrieve the complete file, ensuring data
because it is performed once for each of the N node, the reliability.
time complexity for assigning a block to a taskTracker
is N 2 logN . Considering the size of our network (≤ 100) 4.7.2 Snapshot operation
and the processing power of the modern mobile devices, A snapshot operation creates a backup image of current
the computational overhead of the scheduling algorithm state which includes in-memory data structures. During
is minimal. safe shutdown of the Name Server and Fragment Map-
per, a snapshot operation is automatically invoked to
4.6 Consistency Model save the state on the disk. On restart, the saved image
is used to rebuild the system state. Snapshot operations
Like HDFS, MDFS also follows single writer and multi-
are particularly useful when a user is experimenting
ple reader model. An application can add data to MDFS
with changes that need to be rolled back easily in
by creating a new file and writing data to it (Create
the future. When client requests a snapshot operation,
Mode). The data once written cannot be modified or
the Name Server enters a special maintenance state
removed except when the file is reopened for append
called safe mode. No client operations are allowed when
(Append Mode). In both write modes, data is always
the Name Server is in safe mode. The Name Server
added to the end of the file. MDFS provides the support
leaves safe mode automatically once backup is created.
for overwriting the entire file but not from any arbitrary
The data server is not backup as it mainly handles
offset in the file.
MDFS communication tasks like neighbor discovery, file
If an MDFS client opens a file in Create or Append
creation, file retrieval. These information varies with
mode, the Name Server acquires a write lock on the
time, so it is unnecessary to include in the snapshot.
corresponding file path so that no other client can open
the same file for write. The writer client periodically
notifies the Name Server through heartbeat messages 4.8 Diagnostic Tools
to renew the lock. To prevent the starvation of other MDFS Shell is a handy and powerful debugging tool to
writer clients, the Name Server releases the lock after execute all available file system commands. It is invoked
a user configured time limit if the client fails to renew by hadoop mdfs <command><command args>. Each
the lock. The lock is also released when the file is closed command has file path URI and other command specific
by the client. arguments. MDFS shell can simulate complex cluster
A file can have concurrent reader clients even if it is operations like concurrent reads and writes, device fail-
locked for a write. When a file is opened for a read, the ures, device reboots etc. All MDFS specific parameters
Name Server acquires a read lock on the corresponding can be changed at run time using MDFS shell. MDFS
file path to protect it from deletion from other clients. shell is particularly useful in testing new features and
As the writes are always executed in the local file analyzing its impact on overall performance of the
system, the data is not written to the network unless the system. All file system operations are logged in a user
file is closed or the block boundary is reached. So, the specific folder for debugging purposes and performance
changes made to the last block of the file may not be analysis. If any issue is encountered, the operation logs
visible to the reader clients while the write operation can be used to reproduce the issue and diagnose it.
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 11

5 System Implementation 400 400

Hadoop Job Time (sec)

Hadoop Job Time (sec)


TeraSort of 50 MB DataSet TeraSort of 50 MB DataSet
390
380 380
We have used Apache Hadoop stable release 1.2.1 [21] 370
360 360
for our implementation. Our MDFS framework consists 350
340 340
of 18,365 lines of Java code, exported as a single jar file. 330
320 320
The MDFS code does not have any dependency on the 310
300 300
Hadoop code base. Similar to DistributedFileSystem 0 2 4 6 8 10 4 5 6 7 8 9 10
Block Size (MB) Cluster Size
class of HDFS, MDFS provides MobileDistributedFS
(a) (b)
class that implements FileSystem, the abstract base
class of Hadoop for backwards compatibility of all Fig. 7. Effect of (a) Block size (b) Cluster size on Job
present HDFS applications. The user invokes this object Completion Time
to interact with the file system. In order to switch from
3) Cluster Size. Each experiment was repeated 15 times
HDFS to MDFS, the Hadoop user only needs to add
and average values were computed. The parameters k
the location of jar file to the HADOOP CLASSPATH
and n are set to 3 and 10, respectively for all runs.
variable and change the file system scheme to ‘mdfs’.
Each node is configured to run 1 Map task and 1
The parameter ‘mdfs.standAloneConf’determines the
Reduce task per job, controlled by the parameters
MDFS architecture to be instantiated. If it is set
‘mapred.tasktracker.map.tasks.maximum’and
to false, all the servers are started locally as in
‘mapred.tasktracker.reduce.tasks.maximum’respectively.
the distributed architecture. If it is set to true, the
As this paper is the first work that addresses the
user needs to additionally configure the parameter
challenges in processing of large datasets in mobile
‘mdfs.nameservice.rpc-address’to specify the location of
environment, we do not have any solutions to
Name Server. In the present implementation, the Frag-
compare against. MDFS suffers the overhead of data
ment Mapper is started in the same node as the Name
encoding/decoding and data encryption/decryption,
Server. Since no changes are required in the existing
but MDFS achieves better reliability and energy-
code base for MDFS integration, the user can upgrade
efficiency. Furthermore, MDFS uses Android phones as
to a different Hadoop release without any conflict.
storage nodes and performs read/write operations on
these phones, but HDFS data node is not ported on
6 Performance Evaluation Android devices. As a result, it is difficult to compare
In this section, we present performance results and the performance between HDFS and MDFS directly.
identify bottlenecks involved in processing large input
datasets. To measure the performance of MDFS on
mobile devices, we ran Hadoop benchmarks on a hetero-
geneous 10 node mobile wireless cluster consisting of 1 6.1 Effect of Block Size on Job Completion Time
personal desktop computer (Intel Core 2 Duo 3 GHz
processor, 4 GB memory), 10 netbooks (Intel Atom The parameter ‘dfs.block.size’ in the configuration file
1.60 GHz processor, 1 GB memory, Wi-Fi 802.11 b/g determines the default value of block size. It can be
interface) and 3 HTC Evo 4G smartphones running overridden by the client during file creation if needed.
Android 2.3 OS (Scorpion 1Ghz processor, 512 MB Figure 7(a) shows the effect of block size on job com-
RAM, Wi-Fi 802.11 b/g interface). As TaskTracker pletion time. For our test cluster setup, we found that
daemons are not ported to the Android environment the optimal value of block size for a 50MB dataset is
yet, smartphones are used only for data storage, and 4 MB. The results show that the performance degrades
not for data processing. Note that although the Hadoop when the block size is reduced or increased further.
framework is not yet completely ported to Android A larger block size will reduce the number of blocks
smartphones in our experiment, which will be our future and thereby limit the amount of possible parallelism
work, the results obtained from the netbooks should in the cluster. By default, each Map task processes
be very similar to the results on real smartphones as one block of data at a time. There has to be sufficient
modern smartphones are equipped with more powerful number of tasks in the system such that they can be run
CPU, larger memory, and higher communication band- in parallel for maximum throughput. If the block size
width than the netbooks we used. is small, there will be more Map tasks processing lesser
We used TeraSort, a well-known benchmarking tool amount of data. This would lead to more read and write
that is included in the Apache Hadoop distribution. requests across the network proving to be costly in a
Our benchmark run consists of generating a random mobile environment. Figure 7(a) shows that processing
input data set using TeraGen and then sorting the time is 70% smaller than the network transmission
generated data using TeraSort. We considered the time for TeraSort benchmark. So, tasks have to be
following metrics: 1) Job completion time of TeraSort; sufficiently long enough to compensate the overhead in
2) MDFS Read/Writes Throughput; and 3) Network task setup and data transfer for maximum throughput.
bandwidth overhead. We are interested in the following For real world clusters, the optimal value of block size
parameters: 1) Size of input dataset; 2) Block Size; and will be experimentally obtained.
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 12

MDFS Throughput (MB/s)

MDFS Throughput (MB/s)


400 3 3

Hadoop Job Time (sec)


Terasort of 100 MB DataSet (sec)
1
380
Job Completion rate

3 node failed 2.5


0.8 360 2 node failed 2.5
2
340 1 node failed
0.6
2 1.5
320
0.4 300 1
1.5
280 MDFS Write 0.5 MDFS Write
0.2
HDFS (1,3)
260 MDFS Read MDFS Read
MDFS (3,9) 1 0
0 4 8 12 16 20 24 1 2 3 4 5 6 7
0 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8 9
Number of node failures Number of Iterations Size of Input Dataset (MB) Size of Input Dataset (MB)

(a) (b) (a) (b)


Fig. 8. Effect of (a) Comparison of job completion rate Fig. 9. MDFS Read/Write Throughput of (a) Large files
between HDFS and MDFS (b) Job time vs. Number of (b) Small files
failures. 500 200

Hadoop Job Time (sec)

Hadoop Job Time (sec)


TeraSort Network Time (sec)
180
TeraGen I/O Time (sec)
400 160
6.2 Effect of Cluster Size on Job Completion Time 140
300
120
The cluster size determines the level of possible paral- 200
100

lelization in the cluster. As the cluster size increases, 80


60
100
more tasks can be run in parallel, thus reducing the 40
0 20
job completion time. Figure 8(b) shows the effect of 20 40 60 80 100 10 20 30 40 50
Size of Input Dataset (MB) Size of Input Dataset (MB)
cluster size on job completion time. For larger files,
(a) (b)
there are several map tasks that can be operated in
parallel depending on the configured block size. So Fig. 10. (a) Job Completion time v.s. input dataset size
the performance is improved significantly with increase (b) Processing time vs. Transmission time
in cluster size as in the figure. For smaller files, the 100 MB data. Node failures are induced by turning
performance is not affected much by the cluster size, as off the wireless interface during the processing stage.
the performance gain obtained as part of parallelism is This emulates real world situations wherein devices
comparable to the additional cost incurred in the task get disconnected from the network due to hardware or
setup. connection failures. In Figure 7(b), one, two and three
simultaneous node failures are induced in iterations
6.3 Effect of node failures on MDFS and HDFS 3, 5 and 8 respectively and original state is restored
In this section, we compare the fault-tolerance capabil- in the succeeding iteration. The job completion time
ity between MDFS and HDFS. We consider a simple is increased by 10% for each failure but the system
failure model in which a task fails with its processor successfully recovered from these failures.
node and a taskTracker can not be restarted once it In the MDFS layer, the k-out-of-n framework pro-
fails (crash failure). There are total 12 nodes in the vides data reliability. If a node containing fragments is
network. In HDFS, each data block is replicated to 3 not available, the k-out-of-n framework chooses another
different nodes, and HDFS can tolerate to lose at most node for the data retrieval. Since k and n are set to
2 data nodes; in MDFS, each data block is encoded and 3 and 10 respectively, the system can tolerate up to
stored to 9 different nodes (k = 3, n = 9), and MDFS 7 node failures before the data becomes unavailable.
can tolerate to lose up to 6 data nodes. The reason we If any task fails due to unexpected conditions, Task-
set parameter (k, n) = (3, 9) in this experiment (rather Trackers notify the JobTracker about the task status.
using the same n = 10 in the previous experiments) is JobTracker is responsible for re-executing the failed
that (k, n) = (3, 9) has the same data redundancy as the tasks on some other machine. JobTracker also considers
default HDFS 3-Replication scheme. This experiment a task to be failed if the assigned TaskTracker does not
is independent to the previous experiments in which report the failure in configured timeout interval.
n = 10. Note that although HDFS can tolerate to lose
at most 2 data nodes, it does not mean that the job 6.5 Effect of Input Data Size
would fail if more than 2 nodes fail; if the failed node
Figure 9(a) and Figure 9(b) show the effect of input
does not carry the data block of the current job, it does
dataset size on MDFS throughput. The experiment
not affect the taskTracker; as a result, we see completion
measures the average read and write throughput for dif-
rate gradually drops from 1 after more than 3 nodes fail.
ferent file sizes. The block size is set to 4 MB. The result
Figure 8(a) shows that MDFS clearly achieves better
shows that the system is less efficient with small files
fault-tolerance when 3 or more nodes fail.
due to the overhead in setup of creation and retrieval
tasks. Maximum throughput is observed for file sizes
6.4 Effects of Node Failure Rate on Job Completion that are multiples of block size. This will reduce the to-
Time tal number of subtasks needed to read/write the whole
Our system is designed to tolerate failures. Figure 7(b) file, decreasing the overall overhead. In Figure 9(b), the
shows the reliability of our system in case of node throughput gradually increases when the input dataset
failures. The benchmark is run for 10 iterations for size is increased from 1 MB to 4 MB because more data
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 13

400
can be transferred in a single block read/write request.

# of Broadcast Packets
Distributed Architecture 400

# of Broadcast Packets
350 Distributed Architecture
Centralized Architecture 350 Centralized Architecture
However, when input dataset size is increased further, 300
250
300
250
one additional request is required for extra data and 200
200

thus throughput drops suddenly. The results show that 150


100
150
100
maximum MDFS throughput is around 2.83 MB/s for 50
50

reads and 2.12 MB/s for writes for file sizes that are 0
0 10 20 30 40 50
0
4 5 6 7 8 9 10
multiples of block size. Size of Input Dataset (MB) Cluster Size

Figure 10 shows the effect of input dataset size on (a) (b)


job completion time. The experiment measures the job Fig. 11. Effect of (a) Input dataset size on Network
completion time for different file sizes ranging from 5 bandwidth overhead in Centralized and Distributed Archi-
MB to 100MB. Files generated in mobile devices are tecture (b) Cluster size
unlikely to exceed 100 MB. However, MDFS does not 1600
have any hard limit on input dataset size and it can take TeraGen-EnergyUnaware

Hadoop Job Time (sec)


1400 TeraGen-EnergyAware
any input size allowed in the standard Hadoop release. TeraSort-EnergyUnaware
The result shows that the job completion time varies in 1200 TeraSort-EnergyAware

less than linear time with input dataset size. For larger 1000
datasets, there is a sufficient number of tasks that can
800
be executed in parallel across the cluster resulting in
better node utilization and improved performance. 600

400
6.6 Centralized versus Distributed Architecture
200
The major difference between the distributed solution
0
and the centralized solution is that nodes in distributed 0 20 40 60 80 100
solution need to continuously synchronize their Name Size of Input Dataset (MB)
Server and Fragment Mapper. Due to the synchro-
nization, the distributed solution needs to broadcast Fig. 12. New task scheduling algorithm vs. Random
directory information after a file is created or updated. significant. Hence, a centralized approach is preferred
The broadcast messages directly impact the perfor- in large clusters. However, data reliability is guaranteed
mance difference between the centralized architecture by k-out-of-n framework in both architectures.
and the distributed architecture. When a MapReduce
job has more read operations, distributed architecture 6.7 Energy-aware task scheduling v.s. Random task
might perform better as all metadata information can scheduling
be queried locally rather than contacting the centralized As mentioned in Section 4.5, our energy-aware task
server; when a MapReduce task has more write opera- scheduling assigns tasks to taskTrackers considering the
tions, centralized architecture might perform better due locations of data fragments. The default task scheduling
to lesser broadcast messages. algorithm in Map-Reduce component is ineffective in
Figure 11(a) compares the number of broadcast mes- mobile ad-hoc network as the network topology in a
sages sent during file creation for different input dataset traditional data center is completely different from a
sizes. The block size is set to 4 MB. As input dataset size mobile network. Figure 12 compares the job completion
increases, the number of file blocks also increases. In a time between our energy-ware scheduling algorithm and
distributed architecture, each block allocation in Name a random task scheduling. The default Map-Reduce
Server and subsequent fragment information update in task scheduling in a mobile ad-hoc network is essen-
Fragment Mapper needs to be broadcast to all other tially a random task allocation. In both TeraGen and
nodes in the cluster so that their individual caches re- TeraSort experiments, our scheduling algorithm effec-
main in sync with each other. Large usage of bandwidth tively reduces the job completion time by more than
makes broadcasting a costly operation in wireless net- 100%. Lower job completion time indicates lower data
works. This effect is much worse when the cluster size retrieval time and lower data retrieval energy of each
grows. Figure 11(b) compares the number of broadcast taskTracker.
messages sent during file creation for varying cluster
sizes. The updates are not broadcast in a centralized 7 Roadmap for Mobile Hadoop 2.0
approach as the Name Server and Fragment Mappers Porting both the jobTracker and the taskTracker dae-
are singleton instances. mons to the Android environment is our ongoing work.
The results prove that the distributed architecture In future, we plan to integrate the energy-efficient
is ideal for medium sized clusters with independent and fault-tolerant k-out-of-n processing framework pro-
devices and no central server. The overhead due to posed in [9] into the Mobile Hadoop to provide better
broadcasting is minimal if the cluster is not large. For energy-efficiency and fault-tolerance in a dynamic net-
large clusters, the communication cost required to keep work. We will also look into the heterogeneous prop-
the metadata synchronized across all nodes becomes erties of the hardware and prioritize between various
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.
This article has been accepted for publication in a future issue of this journal, but has not been fully edited. Content may change prior to final publication. Citation information: DOI 10.1109/TCC.2016.2603474,
IEEE Transactions on Cloud Computing

IEEE TRANSACTIONS ON CLOUD COMPUTING, VOL. 3, NO. 1, JANUARY 2014 14

devices based on the device specifications. For example, [12] E. E. Marinelli, “Hyrax: Cloud computing on mobile devices
if static devices are available in the network, they can using mapreduce,” Master’s thesis, School of Computer Sci-
ence Carnegie Mellon University, 2009.
be prioritized over other mobile nodes in the cluster [13] T. Kakantousis, I. Boutsis, V. Kalogeraki, D. Gunopulos,
for data storage as they are less likely to fail; if a G. Gasparis, and A. Dou, “Misco: A system for data analysis
more powerful node like a laptop is present, it can be applications on networks of smartphones using mapreduce,”
in Proc. Mobile Data Management, 2012.
assigned more tasks than a smartphone. How to balance [14] P. Elespuru, S. Shakya, and S. Mishra, “Mapreduce system
the processing load as well as the communication load over heterogeneous mobile devices,” in Software Technologies
of each node is a practical question that needs to be for Embedded and Ubiquitous Systems, 2009.
[15] F. Marozzo, D. Talia, and P. Trunfio, “P2p-mapreduce:
addressed. To mitigate the single point of failure issue in Parallel data processing in dynamic cloud environments,” J.
the centralized architecture, we plan to develop a hybrid Comput. Syst. Sci., 2012.
model where the Name Server and Fragment Mapper [16] C. A. Chen, M. Won, R. Stoleru, and G. G. Xie, “Energy-
efficient fault-tolerant data storage and processing in dy-
run concurrently on multiple master nodes. The hybrid namic networks,” in Proc. of MobiHoc, 2013.
model can reduce the load of each master node, tolerate [17] S. Ghemawat, H. Gobioff, and S.-T. Leung, “The google file
failures, and improve the RPC execution time. system,” SIGOPS Oper. Syst. Rev., 2003.
[18] P. H. Carns, W. B. Ligon, III, R. B. Ross, and R. Thakur,
“Pvfs: A parallel file system for linux clusters,” in Proc. of
8 Conclusions Annual Linux Showcase & Conference, 2000.
[19] S. A. Weil, S. A. Brandt, E. L. Miller, D. D. E. Long,
The Hadoop MapReduce framework over MDFS and C. Maltzahn, “Ceph: A scalable, high-performance dis-
demonstrates the capabilities of mobile devices to cap- tributed file system,” in Proc. of OSDI, 2006.
[20] “Lustre file system,” http://www.lustre.org.
italize on the steady growth of big data in the mobile [21] “Hadoop 1.2.1 release,” http://hadoop.apache.org/docs/r1.
environment. Our system addresses all the constraints 2.1/releasenotes.html.
of data processing in mobile cloud - energy efficiency,
data reliability and security. The evaluation results Johnu George received his B.Tech degree
show that our system is capable for big data analytics in Computer Science from National Institute
of Technology Calicut, India in 2009. He
of unstructured data like media files, text and sensor was a research engineer at Tejas Networks,
data. Our performance results look very promising for Bangalore during 2009-2012. He completed
the deployment of our system in real world clusters for his M.S. degree in Computer Science from
Texas A&M University in 2014. His research
big data analytic of unstructured data like media files, interests include distributed processing and
text and sensor data. big data technologies for mobile cloud.

Acknowledgment
Chien-An Chen received the BS and MS
This work was supported by Naval Postgraduate School degree in Electrical Engineering from Univer-
under Grant No. N00244-12-1-0035 and NSF under sity of California, Los Angeles in 2009 and
Grants #1127449, #1145858 and #0923203. 2010 respectively. He is currently working
toward the PhD degree in Computer Science
and Engineering at Texas A&M University.
References His research interests are mobile computing,
energy-efficient wireless network, and cloud
[1] S. Huchton, G. Xie, and R. Beverly, “Building and evaluating computing on mobile devices.
a k-resilient mobile distributed file system resistant to device
compromise,” in Proc. MILCOM, 2011.
[2] G. Huerta-Canepa and D. Lee, “A virtual cloud computing
provider for mobile devices,” in Proc. of MobiSys, 2010. Radu Stoleru is currently an associate pro-
[3] K. Kumar and Y.-H. Lu, “Cloud computing for mobile users: fessor in the Department of Computer Sci-
Can offloading computation save energy?” Computer, 2010. ence and Engineering at Texas A&M Uni-
[4] “Apache hadoop,” http://hadoop.apache.org/. versity. Dr. Stoleru’s research interests are
[5] S. George, Z. Wei, H. Chenji, W. Myounggyu, Y. O. Lee, in deeply embedded wireless sensor systems,
A. Pazarloglou, R. Stoleru, and P. Barooah, “Distressnet: a distributed systems, embedded computing,
wireless ad hoc and sensor network architecture for situation and computer networking. He is the recipient
management in disaster response,” Comm. Mag., IEEE, of the NSF CAREER Award in 2013. He has
2010. authored or co-authored over 80 conference
[6] J.-P. Hubaux, L. Buttyán, and S. Capkun, “The quest for and journal papers with over 3,000 citations.
security in mobile ad hoc networks,” in Proc. of MobiHoc,
2001.
[7] H. Yang, H. Luo, F. Ye, S. Lu, and L. Zhang, “Security in Geoffery G. Xie received the BS degree
mobile ad hoc networks: challenges and solutions,” Wireless in computer science from Fudan University,
Communications, IEEE, 2004. China, and the PhD degree in computer
[8] C. A. Chen, M. Won, R. Stoleru, and G. Xie, “Resource sciences from the University of Texas, Austin.
allocation for energy efficient k-out-of-n system in mobile ad He is a professor in the Computer Science
hoc networks,” in Proc. ICCCN, 2013. Department at the US Naval Postgraduate
[9] C. Chen, M. Won, R. Stoleru, and G. Xie, “Energy-efficient School. He has published more than 60 ar-
fault-tolerant data storage and processing in dynamic net- ticles in various areas of networking. His
work,” in MobiHoc, 2013. current research interests include network
[10] K. Shvachko, H. Kuang, S. Radia, and R. Chansler, “The analysis, routing design and theories, un-
hadoop distributed file system,” in Proc. of MSST, 2010. derwater acoustic networks, and abstraction
[11] J. Dean and S. Ghemawat, “Mapreduce: Simplified data driven design and analysis of enterprise networks.
processing on large clusters,” Commun. ACM, 2008.
2168-7161 (c) 2016 IEEE. Personal use is permitted, but republication/redistribution requires IEEE permission. See http://www.ieee.org/publications_standards/publications/rights/index.html for more
information.

You might also like