Unit 2: HDFS


Filesystems that manage the storage across a network of machines are called distributed filesystems. Since they are network-based, all the complications of network programming kick in, thus making distributed filesystems more complex than regular disk filesystems. For example, one of the biggest challenges is making the filesystem tolerate node failure without suffering data loss.

Design of HDFS

HDFS is a filesystem designed for storing very large files with streaming data access patterns, running on
clusters of commodity hardware.

1. Very large files


“Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.

2. Streaming data access


HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-
times pattern. A dataset is typically generated or copied from source, and then various analyses are
performed on that dataset over time. Each analysis will involve a large proportion, if not all, of the dataset,
so the time to read the whole dataset is more important than the latency in reading the first record.

3. Commodity hardware
Hadoop doesn’t require expensive, highly reliable hardware. It is designed to run on clusters of commodity
hardware (commonly available hardware that can be obtained from multiple vendors) for which the chance
of node failure across the cluster is high, at least for large clusters. HDFS is designed to carry on working
without a noticeable interruption to the user in the face of such failure.

Areas where HDFS is not a good fit today:

1. Low-latency data access


Applications that require low-latency access to data, in the tens of milliseconds range, will not work well
with HDFS. Remember, HDFS is optimized for delivering a high throughput of data, and this may be at
the expense of latency. HBase is currently a better choice for low-latency access.

2. Lots of small files


Because the namenode holds filesystem metadata in memory, the limit to the number of files in a
filesystem is governed by the amount of memory on the namenode. As a rule of thumb, each file,
directory, and block takes about 150 bytes. So, for example, if you had one million files, each taking one
block, that is roughly two million objects (one million file entries plus one million blocks), and you would
need at least 300 MB of memory. Although storing millions of files is feasible, billions is beyond the
capability of current hardware.

3. Multiple writers, arbitrary file modifications


Files in HDFS may be written to by a single writer. Writes are always made at the end of the file, in
append-only fashion. There is no support for multiple writers or for modifications at arbitrary offsets in the
file. (These might be supported in the future, but they are likely to be relatively inefficient.)

HDFS Concepts
1. Blocks
A disk has a block size, which is the minimum amount of data that it can read or write.
Filesystems for a single disk build on this by dealing with data in blocks, which are an integral multiple of
the disk block size.
Filesystem blocks are typically a few kilobytes in size, whereas disk blocks are normally 512 bytes.
This is generally transparent to the filesystem user who is simply reading or writing a file of
whatever length.
However, there are tools to perform filesystem maintenance, such as df and fsck, that operate on the
filesystem block level.

HDFS, too, has the concept of a block, but it is a much larger unit—128 MB by default.
Files in HDFS are broken into block-sized chunks, which are stored as independent units.

Unlike a filesystem for a single disk, a file in HDFS that is smaller than a single block does not occupy a
full block’s worth of underlying storage.

Why do we need the block abstraction:

 Files can be bigger than individual disks.


 Filesystem metadata does not need to be associated with each and every block.
 Simplifies storage management - Easy to figure out the number of blocks which can be stored on each disk.
 Fault tolerance and storage replication can be easily done on a per-block basis.

% hdfs fsck / -files -blocks

will list the blocks that make up each file in the filesystem.
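The same block-level information is also available programmatically. Below is a minimal sketch, assuming a namenode at hdfs://localhost/ and the hypothetical file /user/tom/quangle.txt, that asks the namenode for the block locations of a single file through the Java FileSystem API and prints the offset, length, and hosts of each block:

import java.net.URI;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListBlocks {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), conf);
    Path file = new Path("/user/tom/quangle.txt"); // hypothetical file
    FileStatus status = fs.getFileStatus(file);
    // Ask the namenode for the block locations covering the whole file
    BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation block : blocks) {
      System.out.printf("offset=%d length=%d hosts=%s%n",
          block.getOffset(), block.getLength(), Arrays.toString(block.getHosts()));
    }
  }
}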

2. Namenodes and Datanodes


The namenode manages the filesystem namespace. It maintains the filesystem tree and the metadata for all
the files and directories in the tree. This information is stored persistently on the local disk in the form of
two files: the namespace image and the edit log. The namenode also knows the datanodes on which all the
blocks for a given file are located; however, it does not store block locations persistently, because this
information is reconstructed from the datanodes when the system starts. A client accesses the filesystem on
behalf of the user by communicating with the namenode and datanodes, so the user code does not need to
know about the namenode and datanodes to function.

Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by
clients or the namenode), and they report back to the namenode periodically with lists of blocks that they
are storing. Without the namenode, the filesystem cannot be used. In fact, if the machine running the
namenode were obliterated, all the files on the filesystem would be lost since there would be no way of
knowing how to reconstruct the files from the blocks on the datanodes. For this reason, it is important to
make the namenode resilient to failure, and Hadoop provides two mechanisms for this. The first way is to
back up the files that make up the persistent state of the filesystem metadata. Hadoop can be configured so
that the namenode writes its persistent state to multiple filesystems. These writes are synchronous and
atomic. The usual configuration choice is to write to local disk as well as a remote NFS mount.

It is also possible to run a secondary namenode, which despite its name does not act as a namenode. Its
main role is to periodically merge the namespace image with the edit log to prevent the edit log from
becoming too large. The secondary namenode usually runs on a separate physical machine because it
requires plenty of CPU and as much memory as the namenode to perform the merge. It keeps a copy of the
merged namespace image, which can be used in the event of the namenode failing. However, the state
of the secondary namenode lags that of the primary, so in the event of total failure of the primary, data loss
is almost certain. The usual course of action in this case is to copy the namenode’s metadata files that are
on NFS to the secondary and run it as the new primary.

3. Block Caching
Normally a datanode reads blocks from disk, but for frequently accessed files the blocks may be explicitly
cached in the datanode’s memory, in an off-heap block cache.
By default, a block is cached in only one datanode’s memory, although the number is configurable
on a per-file basis.

Job schedulers (for MapReduce, Spark, and other frameworks) can take advantage of cached blocks by
running tasks on the datanode where a block is cached, for increased read performance. A small lookup
table used in a join is a good candidate for caching, for example.

Users or applications instruct the namenode which files to cache (and for how long) by adding a cache
directive to a cache pool. Cache pools are an administrative grouping for managing cache permissions and
resource usage.
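As a rough sketch of how this looks from code (assuming the HDFS centralized cache management API; the pool name and path below are made up), a cache pool and a cache directive can be added through DistributedFileSystem:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

public class CacheLookupTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    // Cache pool: an administrative grouping for cache permissions and resource usage
    dfs.addCachePool(new CachePoolInfo("lookup-pool"));           // hypothetical pool name
    // Cache directive: ask the namenode to cache this file's blocks in datanode memory
    long directiveId = dfs.addCacheDirective(
        new CacheDirectiveInfo.Builder()
            .setPath(new Path("/tables/lookup"))                  // hypothetical path
            .setPool("lookup-pool")
            .build());
    System.out.println("Added cache directive " + directiveId);
  }
}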

4. HDFS Federation
https://data-flair.training/blogs/hadoop-hdfs-federation-tutorial/

HDFS Federation addresses the following limitations of the single-NameNode architecture:

1. Due to the tight coupling of the namespace and the storage layer, an alternate implementation of the NameNode is difficult. This also limits other services from using the block storage directly.
2. Because there is a single NameNode, the cluster can have only as many DataNodes as that one NameNode can handle.
3. Filesystem operations are limited by the number of tasks the NameNode can handle at a time, so the performance of the cluster depends on the NameNode throughput.
4. Because there is a single namespace, there is no isolation among the tenant organizations using the cluster.

In the HDFS Federation architecture, there are multiple NameNodes and DataNodes.

Each NameNode has its own namespace and block pool.
All the NameNodes use the DataNodes as common storage.

Every NameNode is independent of the others and does not require any coordination with them.

Each DataNode registers with all the NameNodes in the cluster and stores blocks for all the block pools in the cluster.

DataNodes also periodically send heartbeats and block reports to all the NameNodes in the cluster and handle instructions from the NameNodes.
(Figure: HDFS Federation architecture)

In the HDFS Federation architecture, there are multiple NameNodes, represented as NN1, NN2, …, NNn.

NS1, NS2, and so on are the multiple namespaces managed by their respective NameNode (NS1 by NN1, NS2 by NN2, and
so on).

Each namespace has its own block pool (NS1 has Pool1, NS2 has Pool2, and so on).

Each DataNode stores blocks for all the block pools in the cluster.

For example, DataNode 1 stores blocks from Pool 1, Pool 2, Pool 3, and so on.

Let us now understand the block pool and namespace volume in detail.

Block pool
A block pool in the HDFS Federation architecture is the collection of blocks belonging to a single namespace. The architecture therefore contains a collection of block pools, and each block pool is managed independently of the others. This allows a namespace to generate block IDs for new blocks without any coordination with the other namespaces.

Namespace Volume

A namespace together with its block pool is termed a namespace volume. The HDFS Federation architecture is thus a collection of namespace volumes, each of which is a self-contained unit of management. When a NameNode or namespace is deleted, the corresponding block pool on the DataNodes is deleted as well. When the cluster is upgraded, each namespace volume is upgraded as a unit.

5. HDFS High Availability


Hadoop HDFS follows a master-slave architecture in which the NameNode is the master node and
maintains the filesystem tree, so HDFS cannot be used without the NameNode. The NameNode therefore
becomes a single point of failure. The HDFS high availability feature addresses this issue.

High availability refers to the availability of system or data in the wake of component failure in the
system.
The high availability feature in Hadoop ensures the availability of the Hadoop cluster without any
downtime, even in unfavorable conditions like NameNode failure, DataNode failure, machine crash,
etc.

It means that if a machine crashes, the data is still accessible through another node.

1. Availability if DataNode fails

 In HDFS, replicas of files are stored on different nodes.
 DataNodes continuously send heartbeat messages to the NameNode, every 3 seconds by default.
 If the NameNode does not receive a heartbeat from a DataNode within a specified time (10 minutes by default), it considers that DataNode to be dead.
 The NameNode then identifies the blocks that were stored on the dead DataNode and instructs the DataNodes holding copies of that data to replicate it onto other DataNodes.
 Whenever a user requests data, the NameNode provides the address of the closest DataNode containing it. If that DataNode fails, the NameNode redirects the user to another DataNode holding a copy of the same data, so the read proceeds without any downtime. Thus the cluster remains available to users even if some DataNodes fail.

2. Availability if NameNode fails


NameNode is the only node that knows the list of files and directories in a Hadoop cluster. “The filesystem cannot be used
without NameNode”.
The addition of the High Availability feature in Hadoop 2 provides a fast failover to the Hadoop cluster. The Hadoop HA
cluster consists of two NameNodes (or more after Hadoop 3) running in a cluster in an active/passive configuration with a
hot standby. So, if an active node fails, then a passive node becomes the active NameNode, takes the responsibility, and
serves the client request.

This allows a fast failover to a new machine even if the active machine crashes.
Thus, data remains available and accessible to the user even if the NameNode itself goes down.

Let us now study the NameNode High Availability in detail.

Before going to NameNode High Availability architecture, one should know the reason for introducing such architecture.

Prior to Hadoop 2.0, the NameNode was the single point of failure in a Hadoop cluster. This is because:
1. Each cluster had only one NameNode. If the NameNode failed, the whole cluster went down, and it became available again only when the NameNode was restarted or brought up on a separate machine. This limited availability in two ways:
 The cluster was unavailable after a machine crash until an operator restarted the NameNode.
 Planned maintenance events such as software or hardware upgrades on the NameNode resulted in downtime for the Hadoop cluster.
2. The time taken by the NameNode to start from cold on large clusters with many files can be 30 minutes or more. This long recovery time is a problem.
To overcome these problems, the High Availability architecture was introduced in Hadoop 2.

The passive node is the standby node, acting as a slave and holding the same metadata as the active node. It maintains enough state to provide a fast failover.

There are two issues in maintaining the consistency of the HDFS high availability cluster:

 The active node and the passive node must always be in sync with each other and have the same metadata. This allows the Hadoop cluster to be restored to the exact namespace state at which it crashed.
 Only one NameNode in a cluster must be active at a time. If two NameNodes are active at once, the cluster effectively splits into two smaller clusters, each believing it is the only active one. This is known as the “split-brain scenario,” and it leads to data loss or other incorrect results. Fencing is the process that ensures that only one NameNode is active at a time.
Implementation of NameNode High Availability architecture
With two or more NameNodes running in the same cluster, only one of them active at a time, we can configure and manage an HA HDFS cluster in either of two ways:

1. Using Quorum Journal Nodes


The quorum journal manager (QJM) is a dedicated HDFS implementation built for this purpose.

The active and standby nodes communicate with a group of separate lightweight daemons called “JournalNodes” in order to stay in sync with each other.

The active node writes the edit log modification to the majority of JournalNodes.
There are generally three JournalNode daemons that allow the system to tolerate the failure of a single machine.

The system can tolerate at most (N − 1) / 2 failures when running with N JournalNodes; for example, three JournalNodes tolerate one failure, and five tolerate two.

One should therefore run an odd number of JNs; adding a node to make an even number does not increase the number of failures the system can tolerate.

The active NameNode updates the edit log in the JNs.

The standby nodes continuously watch the JNs for edit log changes, read those changes, and apply them to their own namespace.

If the active NameNode fails, the standby will ensure that it has read all the edits from the JournalNodes before promoting itself
to the active state. This ensures that the namespace state is fully synchronized before a failover occurs.

For providing fast failover, the Standby node must have up-to-date information regarding the location of blocks in the
cluster. To achieve this, DataNodes have the IPs of all NameNodes and send block information and heartbeats to all.

Fencing Of NameNode:
In order to prevent the HA cluster from the “split-brain scenario,” as discussed earlier, only one NameNode should be active
at a time.

Fencing is a process that ensures this property in a cluster.

In the Quorum JournalNodes implementation:

 The JournalNodes allow only a single NameNode to be a writer at a time.
 During failover, the standby that is to become active takes over the role of writing to the JournalNodes, preventing the other NameNode from continuing in the active state.
 The new active node can then safely proceed with the failover.

2. Using Shared Storage
In this implementation, the active node and the standby nodes have access to a directory on a shared storage device in order
to keep their states synchronized with each other.

Whenever the active NameNode performs a namespace modification, it logs the modification record to an edit log file stored on the shared storage device.

The standby nodes constantly watch this directory for edits. When edits occur, the Standby node applies them to its own
namespace.

In the case of a failure, the standby node must ensure that it has read all edits from the shared directory before promoting itself to the active state. This makes the namespace state fully synchronized before a failover occurs.

The DataNodes send heartbeat messages and block locations to all the NameNodes, which gives the standby nodes up-to-date information about the location of blocks in the cluster.

Fencing of NameNode:


To prevent the HA cluster from a split-brain scenario, the administrator must configure at least one fencing method, which ensures that only one NameNode is active at a time.
Fencing methods include killing the NameNode process and revoking its access to the shared storage directory.

The previously active NameNode can also be fenced with a technique called STONITH (“shoot the other node in the head”), which uses a specialized power distribution unit to forcibly power down the NameNode machine.

Command Line Interface


We are going to run HDFS on one machine, so first follow the instructions for setting up Hadoop in
pseudo distributed mode.

There are two properties that we set in the pseudo distributed configuration:

1. The first is fs.defaultFS, set to hdfs://localhost/, which is used to set a default filesystem for
Hadoop. We have used an hdfs URI to configure Hadoop to use HDFS by default. The HDFS
daemons will use this property to determine the host and port for the HDFS namenode. We’ll be
running it on localhost, on the default HDFS port, 8020.

2. We set the second property, dfs.replication, to 1 so that HDFS doesn’t replicate file
system blocks by the default factor of three. When running with a single datanode, HDFS
can’t replicate blocks to three datanodes, so it would perpetually warn about blocks being
under-replicated. This setting solves that problem.
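These two properties normally live in the configuration files used for pseudo-distributed mode. As a minimal sketch of what they mean, the same values can also be set programmatically on a Hadoop Configuration object (for illustration only; in a real setup they belong in the configuration files):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class PseudoDistributedConf {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost/"); // default filesystem: HDFS namenode on localhost (port 8020)
    conf.setInt("dfs.replication", 1);             // a single datanode cannot hold three replicas
    FileSystem fs = FileSystem.get(conf);
    System.out.println("Default filesystem: " + fs.getUri());
  }
}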

1. Basic File system operations

1. Run hadoop fs -help to get detailed help on every command.


2. % hadoop fs -copyFromLocal input/docs/quangle.txt \
hdfs://localhost/user/tom/quangle.txt

This command invokes Hadoop’s filesystem shell command fs, which supports a number of subcommands
—in this case, we are running -copyFromLocal. The local file quangle.txt is copied to the file
/user/tom/quangle.txt on the HDFS instance running on localhost.

% hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt

3. Let’s copy the file back to the local filesystem and check whether it’s the same:
% hadoop fs -copyToLocal quangle.txt quangle.copy.txt

4. % md5 input/docs/quangle.txt quangle.copy.txt


MD5 (input/docs/quangle.txt) = e7891a2627cf263a079fb0f18256ffb2
MD5 (quangle.copy.txt) = e7891a2627cf263a079fb0f18256ffb2

The MD5 digests are the same, showing that the file survived its trip to HDFS and is back intact. (A Java sketch of the same round trip appears after this list.)

5. % hadoop fs -mkdir books


% hadoop fs -ls .

The information returned is very similar to that returned by the Unix command ls -l, with a
few minor differences. The first column shows the file mode. The second column is the
replication factor of the file (something a traditional Unix filesystem does not have).
Remember we set the default replication factor in the site-wide configuration to be 1, which
is why we see the same value here. The entry in this column is empty for directories because
the concept of replication does not apply to them—directories are treated as metadata and
stored by the namenode, not the datanodes. The third and fourth columns show the file owner
and group. The fifth column is the size of the file in bytes, or zero for directories. The sixth
and seventh columns are the last modified date and time. Finally, the eighth column is the
name of the file or directory.
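The -copyFromLocal and -copyToLocal subcommands used in items 2 and 3 above have direct equivalents in the Java FileSystem API. A minimal sketch of the same round trip, assuming the same paths and a namenode at hdfs://localhost/:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), conf);
    // Equivalent of: hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt
    fs.copyFromLocalFile(new Path("input/docs/quangle.txt"),
                         new Path("/user/tom/quangle.txt"));
    // Equivalent of: hadoop fs -copyToLocal quangle.txt quangle.copy.txt
    fs.copyToLocalFile(new Path("/user/tom/quangle.txt"),
                       new Path("quangle.copy.txt"));
  }
}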

2. File Permission in HDFS

There are three types of permission: the read permission ( r), the write permission (w), and the execute
permission (x). The read permission is required to read files or list the contents of a directory. The write
permission is required to write a file or, for a directory, to create or delete files or directories in it. The execute
permission is ignored for a file because you can’t execute a file on HDFS (unlike POSIX), and for a directory
this permission is required to access its children.

There is a concept of a superuser, which is the identity of the namenode process. Permissions checks are not
performed for the superuser.
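A minimal sketch of inspecting and changing these permissions through the Java API (the directory name is hypothetical, and the example assumes a namenode at hdfs://localhost/):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class ShowPermissions {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), new Configuration());
    Path dir = new Path("/user/tom/books");                 // hypothetical directory
    FileStatus status = fs.getFileStatus(dir);
    // Mode, owner, and group, as shown in the first, third, and fourth columns of 'hadoop fs -ls'
    System.out.println(status.getPermission() + " " + status.getOwner() + " " + status.getGroup());
    // Equivalent of: hadoop fs -chmod 750 /user/tom/books
    fs.setPermission(dir, new FsPermission((short) 0750));  // rwxr-x---
  }
}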

Hadoop File system interfaces

1. Hadoop File System


2. Interfaces
A. HTTP:
1. Hadoop is written in Java, so most Hadoop filesystem interactions are mediated through the Java API.
The filesystem shell, for example, is a Java application that uses the Java FileSystem class to provide
filesystem operations.

2. By exposing its filesystem interface as a Java API, Hadoop makes it awkward for non-Java applications to
access HDFS.
3. The HTTP REST API exposed by the WebHDFS protocol makes it easier for other languages to interact
with HDFS.
4. Note that the HTTP interface is slower than the native Java client, so it should be avoided for very large
data transfers if possible.
5. There are two ways of accessing HDFS over HTTP: directly, where the HDFS daemons serve HTTP
requests to clients; and via a proxy (or proxies), which accesses HDFS on the client’s behalf using the usual
DistributedFileSystem API. Both ways use the WebHDFS protocol.

CASE 1:
In the first case, the embedded web servers in the namenode and datanodes act as WebHDFS
endpoints. (WebHDFS is enabled by default, since dfs.webhdfs.enabled is set to true.) File metadata
operations are handled by the namenode, while file read (and write) operations are sent first to
the namenode, which sends an HTTP redirect to the client indicating the datanode to stream file
data from (or to).

CASE 2:
The second way of accessing HDFS over HTTP relies on one or more standalone proxy servers. (The
proxies are stateless, so they can run behind a standard load balancer.) All traffic to the cluster
passes through the proxy, so the client never accesses the namenode or datanode directly. This
allows for stricter firewall and bandwidth-limiting policies to be put in place. It’s common to use a
proxy for transfers between Hadoop clusters located in different data centers, or when accessing a
Hadoop cluster running in the cloud from an external network.

The HttpFS proxy exposes the same HTTP (and HTTPS) interface as WebHDFS, so clients can
access both using webhdfs (or swebhdfs) URIs. The HttpFS proxy is started independently of the
namenode and datanode daemons, using the httpfs.sh script, and by default listens on a different port
number (14000).
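As an illustration of the first (direct) way of accessing HDFS over HTTP, the sketch below reads a file through the WebHDFS REST API with plain Java. It assumes a Hadoop 3 namenode whose web server listens on the default port 9870 (older releases used 50070) and reuses the hypothetical file path from earlier examples:

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsRead {
  public static void main(String[] args) throws Exception {
    // op=OPEN: the namenode replies with a redirect to a datanode, which streams the file data
    URL url = new URL("http://localhost:9870/webhdfs/v1/user/tom/quangle.txt?op=OPEN");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setInstanceFollowRedirects(true); // follow the redirect to the datanode
    try (InputStream in = conn.getInputStream()) {
      in.transferTo(System.out); // requires Java 9 or later
    }
  }
}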

B. C
Hadoop provides a C library called libhdfs that mirrors the Java FileSystem interface (despite its name, it
can be used to access any Hadoop filesystem). It works by using the Java Native Interface (JNI) to call a
Java filesystem client. There is also a libwebhdfs library that uses the WebHDFS interface described in the
previous section.
The C API is very similar to the Java one, but it typically lags behind it, so some newer features may
not be supported. You can find the header file, hdfs.h, in the include directory of the Apache Hadoop
binary tarball distribution.
The Apache Hadoop binary tarball comes with prebuilt libhdfs binaries for 64-bit Linux, but for other
platforms you will need to build them yourself by following the BUILDING.txt instructions at the top
level of the source tree.

C. NFS
It is possible to mount HDFS on a local client’s filesystem using Hadoop’s NFSv3 gateway.
You can then use Unix utilities (such as ls and cat) to interact with the filesystem,
upload files, and in general use POSIX libraries to access the filesystem from any programming
language.
Appending to a file works, but random modifications of a file do not, since HDFS can only write to
the end of a file.

D. FUSE
Filesystem in Userspace (FUSE) allows filesystems that are implemented in user space to be
integrated as Unix filesystems. Hadoop’s Fuse-DFS contrib module allows HDFS (or any Hadoop
filesystem) to be mounted as a standard local filesystem. Fuse-DFS is implemented in C using libhdfs
as the interface to HDFS. At the time of writing, the Hadoop NFS gateway is the more robust solution
to mounting HDFS, so should be preferred over Fuse-DFS.

E. The Java Interface

Data Flow
Data Ingest with Flume and Sqoop and Hadoop Archives

Hadoop I/O:
3. Data Integrity:
The chance of data corruption is significant when data volumes are large, because every I/O operation on
the disk or the network carries with it a small chance of introducing errors when reading or writing data.

Detecting corrupted data:

Corruption is detected by computing a checksum for the data when it first enters the system, and again
whenever it is transmitted across a channel that is unreliable and hence capable of corrupting the data. The
data is deemed to be corrupt if the newly generated checksum doesn’t exactly match the original. A
commonly used error-detecting code is CRC-32 (32-bit cyclic redundancy check), which computes a
32-bit integer checksum for input of any size. CRC-32 is used for checksumming in Hadoop’s
ChecksumFileSystem, while HDFS uses a more efficient variant called CRC-32C.
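The general idea can be illustrated with the CRC-32 implementation that ships with the JDK. This is only a sketch of checksumming in general, not of Hadoop's internal code, which computes and verifies checksums transparently for each chunk of data it reads and writes:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumDemo {
  public static void main(String[] args) {
    byte[] data = "some block of data".getBytes(StandardCharsets.UTF_8);

    CRC32 crc = new CRC32();
    crc.update(data);
    long storedChecksum = crc.getValue(); // computed when the data first enters the system

    // ... the data is later transmitted across an unreliable channel ...

    CRC32 verify = new CRC32();
    verify.update(data);                  // recomputed on the received bytes
    boolean corrupt = verify.getValue() != storedChecksum;
    System.out.println(corrupt ? "data is corrupt" : "checksums match");
  }
}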

Compression
1. File compression brings two major benefits: it reduces the space needed to store files, and it
speeds up data transfer across the network or to or from disk.
LZO files are splittable if they have been indexed in a preprocessing step

A compression format is “splittable” if you can seek to any point in the stream and start reading from some
point further on. Splittable compression formats are especially suitable for MapReduce.

2. All compression algorithms exhibit a space/time trade-off: faster compression and decompression
speeds usually come at the expense of smaller space savings.

3. The compression tools typically give some control over this trade-off at compression time
by offering nine different options: -1 means optimize for speed, and -9 means optimize for space.

4. For example, the following command creates a compressed file file.gz using the fastest
compression method: % gzip -1 file

 gzip is a general-purpose compressor and sits in the middle of the space/time trade-off.
 bzip2 compresses more effectively than gzip, but is slower.
 bzip2’s decompression speed is faster than its compression speed, but it is still slower than the other formats.
 LZO, LZ4, and Snappy, on the other hand, all optimize for speed and are around an order of magnitude faster than gzip, but compress less effectively.
 Snappy and LZ4 are also significantly faster than LZO for decompression.

Codecs
1. A codec is the implementation of a compression-decompression algorithm.

2. In Hadoop, a codec is represented by an implementation of the CompressionCodec interface. For
example, GzipCodec encapsulates the compression and decompression algorithm for gzip.

Compressing and decompressing streams with CompressionCodec

CompressionCodec has two methods that allow you to easily compress or decompress data.

To compress data being written to an output stream, use the createOutputStream(OutputStream


out) method to create a CompressionOutputStream to which you write your uncompressed data to
have it written in compressed form to the underlying stream.

To decompress data being read from an input stream call createInputStream(InputStream in) to
obtain a CompressionInputStream, which allows you to read uncompressed data from the
underlying stream.

The application expects the fully qualified name of the CompressionCodec implementation as the
first command-line argument.

We use ReflectionUtils to construct a new instance of the codec, then obtain a compression wrapper
around System.out.

Then we call the utility method copyBytes() on IOUtils to copy the input to the output, which is
compressed by the CompressionOutputStream.
Finally, we call finish() on CompressionOutputStream, which tells the compressor to finish writing to
the compressed stream, but doesn’t close the stream.
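The description above corresponds to a small program along the following lines (a sketch of the StreamCompressor example: it compresses standard input with the codec named on the command line and writes the result to standard output):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];          // e.g. org.apache.hadoop.io.compress.GzipCodec
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec =
        (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    // Wrap System.out so that everything written to it comes out compressed
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish(); // flush the compressor, but leave the underlying stream open
  }
}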

We can try it out with the following command line, which compresses the string “Text” using the
StreamCompressor program with the GzipCodec, then decompresses it from standard input using
gunzip:

% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
  | gunzip -
Text

Inferring CompressionCodecs using CompressionCodecFactory

If you are reading a compressed file, normally you can infer which codec to use by looking at its
filename extension. A file ending in .gz can be read with GzipCodec.

CompressionCodecFactory provides a way of mapping a filename extension to a CompressionCodec


using its getCodec() method, which takes a Path object for the file in question

Once the codec has been found, it is used to strip off the file suffix to form the output filename (via
the removeSuffix() static method of CompressionCodecFactory).

In this way, a file named file.gz is decompressed to file by invoking the program as follows:
% hadoop FileDecompressor file.gz

Each codec knows its default filename extension, thus permitting CompressionCodecFactory to
search through the registered codecs to find a match for the given extension (if any).
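Putting these pieces together, the FileDecompressor program invoked above can be sketched as follows (the logic follows the description in this section; treat it as an illustration rather than the canonical source):

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class FileDecompressor {
  public static void main(String[] args) throws Exception {
    String uri = args[0];                     // e.g. file.gz
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path inputPath = new Path(uri);

    // Infer the codec from the filename extension (.gz maps to GzipCodec, and so on)
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inputPath);
    if (codec == null) {
      System.err.println("No codec found for " + uri);
      System.exit(1);
    }

    // Strip the suffix to form the output filename: file.gz becomes file
    String outputUri =
        CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

    InputStream in = null;
    OutputStream out = null;
    try {
      in = codec.createInputStream(fs.open(inputPath));
      out = fs.create(new Path(outputUri));
      IOUtils.copyBytes(in, out, conf);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}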

Compression and Input Splits


When considering how to compress data that will be processed by MapReduce, it is important to
understand whether the compression format supports splitting.

Consider an uncompressed file stored in HDFS whose size is 1 GB. With an HDFS block size of 128
MB, the file will be stored as eight blocks, and a MapReduce job using this file as input will create
eight input splits, each processed independently as input to a separate map task.

Imagine now that the file is a gzip-compressed file whose compressed size is 1 GB. As before, HDFS
will store the file as eight blocks. However, creating a split for each block won’t work, because it is
impossible to start reading at an arbitrary point in the gzip stream and therefore impossible for a
map task to read its split independently of the others.

The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of
compressed blocks.

The problem is that the start of each block is not distinguished in any way that would allow a
reader positioned at an arbitrary point in the stream to advance to the beginning of the next
block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.

In this case, MapReduce will do the right thing and not try to split the gzipped file, since it knows
that the input is gzip-compressed (by looking at the filename extension) and that gzip does not
support splitting. This will work, but at the expense of locality: a single map will process the eight
HDFS blocks, most of which will not be local to the map. Also, with fewer maps, the job is less
granular and so may take longer to run.

Serialization

Serialization is the process of turning structured objects into a byte stream for transmission over a
network or for writing to persistent storage. Deserialization is the reverse process of turning a byte
stream back into a series of structured objects.
The byte stream created is platform independent. So, the object serialized on one platform can be deserialized on a
different platform.

To make a Java object serializable, we implement the java.io.Serializable interface. The ObjectOutputStream class contains the writeObject() method for serializing an object:

public final void writeObject(Object obj)
throws IOException

The ObjectInputStream class contains the readObject() method for deserializing an object:

public final Object readObject()
throws IOException,
ClassNotFoundException
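A minimal sketch of this round trip with plain Java serialization (the Person class is made up purely for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
  // Hypothetical serializable class used only for illustration
  static class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    String name;
    int age;
    Person(String name, int age) { this.name = name; this.age = age; }
  }

  public static void main(String[] args) throws Exception {
    // Serialize: object to platform-independent byte stream
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(new Person("tom", 30));
    }

    // Deserialize: byte stream back to an object (possibly on a different platform)
    try (ObjectInputStream in = new ObjectInputStream(
             new ByteArrayInputStream(bytes.toByteArray()))) {
      Person p = (Person) in.readObject();
      System.out.println(p.name + " " + p.age);
    }
  }
}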

In Hadoop, interprocess communication between nodes in the system is implemented using remote
procedure calls (RPCs). The RPC protocol uses serialization to render the message into a binary
stream to be sent to the remote node, which then deserializes the binary stream into the original
message. In general, it is desirable that an RPC serialization format is:

Compact

A compact format makes the best use of network bandwidth, which is the most scarce resource in a
data center.

Fast
Interprocess communication forms the backbone for a distributed system, so it is essential that
there is as little performance overhead as possible for the serialization and deserialization process.

Extensible

Protocols change over time to meet new requirements, so it should be straightforward to evolve the
protocol in a controlled manner for clients and servers. For example, it should be possible to add a
new argument to a method call and have the new servers accept messages in the old format
(without the new argument) from old clients.

Interoperable

For some systems, it is desirable to be able to support clients that are written in a different language
from the server, so the format needs to be designed to make this possible.

How do we enable data serialization?

 Data serialization is the process of converting structured data into a form that can be converted back to the original structure later.
 Serialization translates data structures into a stream of data, which can be transmitted over the
network or stored in a database regardless of the system architecture.
 Isn’t simply storing information in binary form, as a stream of bytes, the right approach?
 Serialization does the same, but isn’t dependent on the architecture.

Consider a CSV file that contains a comma (,) inside a data value: on deserialization, wrong output
may occur. If the metadata is instead stored in a self-describing form such as XML, the data can
easily be deserialized.

Why Data Serialization for Storage Formats?

 To process records faster (time-bound processing).
 To maintain a proper data format when transmitting data to an endpoint that has no schema support.
 Data stored without a defined structure or format can cause complex errors when it has to be processed in the future.
 Serialization offers data validation over transmission.

The Writable Interface


The Writable interface defines two methods: one for writing its state to a DataOutput
binary stream, and one for reading its state from a DataInput binary stream:

1. void readFields(DataInput in): deserializes the fields of the given object from the input stream.

2. void write(DataOutput out): serializes the fields of the given object to the output stream.
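A small helper class shows these two methods in action, serializing a Writable to a byte array and reading it back (a sketch using IntWritable from org.apache.hadoop.io):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class WritableDemo {
  // Serialize a Writable into a byte array using its write() method
  static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
  }

  // Populate a Writable from a byte array using its readFields() method
  static void deserialize(Writable writable, byte[] bytes) throws IOException {
    DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
    writable.readFields(dataIn);
    dataIn.close();
  }

  public static void main(String[] args) throws IOException {
    IntWritable original = new IntWritable(163);
    byte[] bytes = serialize(original);   // an IntWritable serializes to 4 bytes
    System.out.println("serialized length = " + bytes.length);

    IntWritable copy = new IntWritable();
    deserialize(copy, bytes);
    System.out.println("round-trip value = " + copy.get());
  }
}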

Writable Classes
Hadoop comes with a large selection of Writable classes in the org.apache.hadoop.io
package.

Text
Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent
of java.lang.String.

Writable collections
There are six Writable collection types in the org.apache.hadoop.io package: ArrayWritable,
ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable, SortedMapWritable, and EnumSetWritable.

Avro
Apache Avro is a language-neutral data serialization system. The project was created
by Doug Cutting (the creator of Hadoop) to address the major downside of Hadoop
Writables: lack of language portability.

File-Based Data Structures


For some applications, you need a specialized data structure to hold your data. For
doing MapReduce-based processing, putting each blob of binary data into its own file
doesn’t scale, so Hadoop developed a number of higher-level containers for these
situations.
