Unit2 HDFS
Filesystems that manage the storage across a network of machines are called distributed filesystems. Since
they are network based, all the complications of network programming kick in, thus making distributed
filesystems more complex than regular disk filesystems. For example, one of the biggest challenges is
making the filesystem tolerate node failure without suffering data loss.
Design of HDFS
HDFS is a filesystem designed for storing very large files with streaming data access patterns, running on
clusters of commodity hardware.
3. Commodity hardware
Hadoop doesn’t require expensive, highly reliable hardware. It is designed to run on clusters of commodity
hardware (commonly available hardware that can be obtained from multiple vendors) for which the chance
of node failure across the cluster is high, at least for large clusters. HDFS is designed to carry on working
without a noticeable interruption to the user in the face of such failure.
HDFS Concepts
1. Blocks
A disk has a block size, which is the minimum amount of data that it can read or write.
Filesystems for a single disk build on this by dealing with data in blocks, which are an integral multiple of
the disk block size.
Filesystem blocks are typically a few kilobytes in size, whereas disk blocks are normally 512 bytes.
This is generally transparent to the filesystem user who is simply reading or writing a file of
whatever length.
However, there are tools to perform filesystem maintenance, such as df and fsck, that operate on the
filesystem block level.
HDFS, too, has the concept of a block, but it is a much larger unit—128 MB by default.
Files in HDFS are broken into block-sized chunks, which are stored as independent units.
Unlike a filesystem for a single disk, a file in HDFS that is smaller than a single block does not occupy a
full block’s worth of underlying storage.
Running % hdfs fsck / -files -blocks will list the blocks that make up each file in the filesystem.
2. Namenodes and Datanodes
An HDFS cluster has two types of nodes: a namenode (the master), which manages the filesystem namespace
and the metadata for all the files and directories, and a number of datanodes (workers).
Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by
clients or the namenode), and they report back to the namenode periodically with lists of blocks that they
are storing. Without the namenode, the filesystem cannot be used. In fact, if the machine running the
namenode were obliterated, all the files on the filesystem would be lost since there would be no way of
knowing how to reconstruct the files from the blocks on the datanodes. For this reason, it is important to
make the namenode resilient to failure, and Hadoop provides two mechanisms for this. The first way is to
back up the files that make up the persistent state of the filesystem metadata. Hadoop can be configured so
that the namenode writes its persistent state to multiple filesystems. These writes are synchronous and
atomic. The usual configuration choice is to write to local disk as well as a remote NFS mount.
It is also possible to run a secondary namenode, which despite its name does not act as a namenode. Its
main role is to periodically merge the namespace image with the edit log to prevent the edit log from
becoming too large. The secondary namenode usually runs on a separate physical machine because it
requires plenty of CPU and as much memory as the namenode to perform the merge. It keeps a copy of the
merged namespace image, which can be used in the event of the namenode failing. However, the state
of the secondary namenode lags that of the primary, so in the event of total failure of the primary, data loss
is almost certain. The usual course of action in this case is to copy the namenode’s metadata files that are
on NFS to the secondary and run it as the new primary.
3. Block Caching
Normally a datanode reads blocks from disk, but for frequently accessed files the blocks may be explicitly
cached in the datanode’s memory, in an off-heap block cache.
By default, a block is cached in only one datanode’s memory, although the number is configurable
on a per-file basis.
Job schedulers (for MapReduce, Spark, and other frameworks) can take advantage of cached blocks by
running tasks on the datanode where a block is cached, for increased read performance. A small lookup table
used in a join is a good candidate for caching, for example.
Users or applications instruct the namenode which files to cache (and for how long) by adding a cache
directive to a cache pool. Cache pools are an administrative grouping for managing cache permissions and
resource usage.
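Cache pools and directives are managed from the command line with the hdfs cacheadmin tool; the following is an illustrative sketch against a hypothetical cluster (the pool name, path, and TTL are invented for the example):

```
# Create an administrative cache pool, then cache a small lookup table in it.
% hdfs cacheadmin -addPool lookup-pool
% hdfs cacheadmin -addDirective -path /tables/lookup -pool lookup-pool -ttl 7d

# List the directives currently in effect.
% hdfs cacheadmin -listDirectives
```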
4. HDFS Federation
https://data-flair.training/blogs/hadoop-hdfs-federation-tutorial/
HDFS Federation, introduced in Hadoop 2, addresses the following limitations of the earlier single-NameNode architecture:
1. Due to the tight coupling of the namespace and the storage layer, an alternate implementation of the NameNode is difficult. This
limits the usage of block storage directly by the other services.
2. With a single NameNode, the cluster can have only as many DataNodes as that one NameNode can handle.
3. Filesystem operations are also limited by the number of tasks that the NameNode can handle at a time. Thus, the
performance of the cluster depends on the NameNode throughput.
4. Also, because of a single namespace, there is no isolation among the tenant organizations that use the cluster.
In HDFS Federation, every NameNode is independent of the others and requires no coordination with them.
Each DataNode registers with all the NameNodes in the cluster and stores blocks for all the block pools in the cluster.
DataNodes also periodically send heartbeats and block reports to all the NameNodes in the cluster and handle the
instructions from the NameNodes.
[Figure: architecture design of the HDFS Federation]
In the HDFS Federation architecture there are multiple NameNodes, represented as NN1, NN2, …, NNn.
NS1, NS2, and so on are the multiple namespaces managed by their respective NameNodes (NS1 by NN1, NS2 by NN2, and
so on).
Each namespace has its own block pool (NS1 has Pool 1, NS2 has Pool 2, and so on).
Each DataNode stores blocks for all the block pools in the cluster.
For example, DataNode1 stores the blocks from Pool 1, Pool 2, Pool 3, etc.
Let us now understand the block pool and namespace volume in detail.
Block pool
A block pool in the HDFS Federation architecture is the collection of blocks belonging to a single namespace. The HDFS
Federation architecture has a collection of block pools, and each block pool is managed independently of the others. This allows a
namespace to generate block IDs for new blocks without any coordination with the other namespaces.
Namespace Volume
A namespace together with its block pool is termed a namespace volume. The HDFS Federation architecture is thus a collection of
namespace volumes, each of which is a self-contained unit of management. On deleting a NameNode or namespace, the
corresponding block pool present on the DataNodes also gets deleted. On upgrading the cluster, each namespace volume gets
upgraded as a unit.
5. High Availability
High availability refers to the availability of the system or data in the wake of component failure in the
system.
The high availability feature in Hadoop ensures the availability of the Hadoop cluster without any
downtime, even in unfavorable conditions like NameNode failure, DataNode failure, machine crash,
etc.
If the machine running the active NameNode crashes, data remains accessible via another NameNode,
allowing a fast failover to the new machine.
Thus, data is available and accessible to the user even if the NameNode itself goes down.
Before going to NameNode High Availability architecture, one should know the reason for introducing such architecture.
Prior to Hadoop 2.0, the NameNode was the single point of failure in a Hadoop cluster. This is because:
1. Each cluster consisted of only one NameNode. If the NameNode failed, the whole cluster went down, and it became
available again only when the NameNode was restarted or brought up on a separate machine. This limited
availability in two ways:
The cluster was unavailable after a machine crash until an operator restarted the NameNode.
Planned maintenance events, such as software or hardware upgrades on the NameNode, resulted in downtime of the
Hadoop cluster.
2. The time taken by NameNode to start from cold on large clusters with many files can be 30 minutes or more. This long
recovery time is a problem.
To overcome these problems Hadoop High Availability architecture was introduced in Hadoop 2.
The passive node is the standby node that acts as a slave node, holding the same data as the active node. It maintains enough state
to provide a fast failover.
There are two issues in maintaining the consistency of the HDFS high availability cluster. They are:
The active node and the passive node should always be in sync with each other and must have the same metadata. This
allows us to restore the Hadoop cluster to the same namespace where it crashed.
Only one NameNode in the same cluster must be active at a time. If two NameNodes are active at a time, the cluster
gets divided into smaller clusters, each one believing it is the only active cluster. This is known as the “split-brain
scenario,” which leads to data loss or other incorrect results. Fencing is a process that ensures that only one NameNode
is active at a time.
Implementation of NameNode High Availability architecture
With two or more NameNodes running in the same cluster, only one active at a time, we can configure and manage an HA
HDFS cluster using either of two ways of sharing edit-log state: quorum-based storage (the Quorum Journal Manager) or
shared storage on an NFS filer. In the quorum-based approach:
The active node and the passive nodes communicate with a group of separate lightweight daemons called “JournalNodes”
to keep in sync with each other.
The active node writes the edit log modification to the majority of JournalNodes.
There are generally three JournalNode daemons that allow the system to tolerate the failure of a single machine.
The system can tolerate at most (N − 1) / 2 failures when running with N JournalNodes.
One should therefore run an odd number of JNs: moving from an odd count to the next even count does not increase the
number of failures the system tolerates.
The standby nodes continuously watch the JNs for edit log changes, read the changes in the edit logs, and apply
them to their own namespace.
If the active NameNode fails, the standby will ensure that it has read all the edits from JournalNodes before promoting itself
to the Active state. It ensures that the NameNode state is properly synchronized before a failure occurs.
For providing fast failover, the Standby node must have up-to-date information regarding the location of blocks in the
cluster. To achieve this, DataNodes have the IPs of all NameNodes and send block information and heartbeats to all.
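The JournalNode-based (QJM) arrangement described above is typically wired up in hdfs-site.xml; a minimal sketch, in which the nameservice name mycluster and the hosts nn1, nn2, and jn1–jn3 are placeholders:

```xml
<!-- hdfs-site.xml (fragment) -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<!-- The shared edits directory: a quorum of three JournalNodes. -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster</value>
</property>
```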
Fencing Of NameNode:
In order to prevent the HA cluster from the “split-brain scenario,” as discussed earlier, only one NameNode should be active
at a time.
During any namespace modification performed by the active NameNode, it logs the modification record to an edit log
file stored in the shared storage device.
The standby nodes constantly watch this directory for edits. When edits occur, the Standby node applies them to its own
namespace.
In the case of failure, the standby node must ensure that it has read all edits from the shared directory before promoting itself
to the active state. This makes the namespace state, fully synchronized before a failover occurs.
The DataNodes send heartbeat messages and block locations to all the NameNodes, which keeps the standby nodes up to
date about the location of blocks in the cluster.
We can fence the previously active name node with one of the fencing techniques called STONITH or “shoot the other node
in the head”. It uses a specialized power distribution unit to forcibly power down the NameNode machine.
There are two properties that we set in the pseudo-distributed configuration:
1. The first is fs.defaultFS, set to hdfs://localhost/, which is used to set a default filesystem for
Hadoop. We have used an hdfs URI to configure Hadoop to use HDFS by default. The HDFS
daemons will use this property to determine the host and port for the HDFS namenode. We’ll be
running it on localhost, on the default HDFS port, 8020.
2. We set the second property, dfs.replication, to 1 so that HDFS doesn’t replicate file
system blocks by the default factor of three. When running with a single datanode, HDFS
can’t replicate blocks to three datanodes, so it would perpetually warn about blocks being
under-replicated. This setting solves that problem.
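The two properties above live in core-site.xml and hdfs-site.xml respectively; a minimal pseudo-distributed sketch:

```xml
<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost/</value>
  </property>
</configuration>

<!-- hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
```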
% hadoop fs -copyFromLocal quangle.txt /user/tom/quangle.txt
This command invokes Hadoop’s filesystem shell command fs, which supports a number of subcommands
—in this case, we are running -copyFromLocal. The local file quangle.txt is copied to the file
/user/tom/quangle.txt on the HDFS instance running on localhost.
3. Let’s copy the file back to the local filesystem and check whether it’s the same:
% hadoop fs -copyToLocal quangle.txt quangle.copy.txt
Comparing the MD5 digests of the two files shows that they are the same, so the file survived its trip to HDFS and is back intact.
The information returned by hadoop fs -ls is very similar to that returned by the Unix command ls -l, with a
few minor differences. The first column shows the file mode. The second column is the
replication factor of the file (something a traditional Unix filesystem does not have).
Remember we set the default replication factor in the site-wide configuration to be 1, which
is why we see the same value here. The entry in this column is empty for directories because
the concept of replication does not apply to them—directories are treated as metadata and
stored by the namenode, not the datanodes. The third and fourth columns show the file owner
and group. The fifth column is the size of the file in bytes, or zero for directories. The sixth
and seventh columns are the last modified date and time. Finally, the eighth column is the
name of the file or directory.
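As an illustration, a hypothetical listing with one directory and one file might look like this (the names, owner, size, and dates are invented for the example):

```
% hadoop fs -ls .
Found 2 items
drwxr-xr-x   - tom supergroup          0 2014-10-04 13:22 books
-rw-r--r--   1 tom supergroup        119 2014-10-04 13:21 quangle.txt
```

Note the empty replication column ("-") for the directory, and the replication factor of 1 for the file, matching the site-wide setting.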
There are three types of permission: the read permission ( r), the write permission (w), and the execute
permission (x). The read permission is required to read files or list the contents of a directory. The write
permission is required to write a file or, for a directory, to create or delete files or directories in it. The execute
permission is ignored for a file because you can’t execute a file on HDFS (unlike POSIX), and for a directory
this permission is required to access its children.
There is a concept of a superuser, which is the identity of the namenode process. Permissions checks are not
performed for the superuser.
2. By exposing its filesystem interface as a Java API, Hadoop makes it awkward for non-Java applications to
access HDFS.
3. The HTTP REST API exposed by the WebHDFS protocol makes it easier for other languages to interact
with HDFS.
4. Note that the HTTP interface is slower than the native Java client, so should be avoided for very large
data transfers if possible.
5. There are two ways of accessing HDFS over HTTP: directly, where the HDFS daemons serve HTTP
requests to clients; and via a proxy (or proxies), which accesses HDFS on the client’s behalf using the usual
DistributedFileSystem API. The two ways are illustrated in Figure 3-1. Both use the WebHDFS protocol.
CASE 1:
In the first case, the embedded web servers in the namenode and datanodes act as WebHDFS
endpoints. (WebHDFS is enabled by default, since dfs.webhdfs.enabled is set to true.) File metadata
operations are handled by the namenode, while file read (and write) operations are sent first to
the namenode, which sends an HTTP redirect to the client indicating the datanode to stream file
data from (or to).
CASE 2:
The second way of accessing HDFS over HTTP relies on one or more standalone proxy servers. (The
proxies are stateless, so they can run behind a standard load balancer.) All traffic to the cluster
passes through the proxy, so the client never accesses the namenode or datanode directly. This
allows for stricter firewall and bandwidth-limiting policies to be put in place. It’s common to use a
proxy for transfers between Hadoop clusters located in different data centers, or when accessing a
Hadoop cluster running in the cloud from an external network.
The HttpFS proxy exposes the same HTTP (and HTTPS) interface as WebHDFS, so clients can
access both using webhdfs (or swebhdfs) URIs. The HttpFS proxy is started independently of the
namenode and datanode daemons, using the httpfs.sh script, and by default listens on a different port
number (14000).
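Both cases can be exercised with a plain HTTP client such as curl; the following is an illustrative session (the host, the port 9870 — older releases used 50070 — and the file path are assumptions for the example):

```
# Read a file via the namenode's WebHDFS endpoint; the namenode answers
# with an HTTP redirect to a datanode, which curl -L follows.
% curl -L "http://localhost:9870/webhdfs/v1/user/tom/quangle.txt?op=OPEN"

# Metadata operations, such as listing a directory, are answered by the
# namenode directly, with no redirect.
% curl "http://localhost:9870/webhdfs/v1/user/tom?op=LISTSTATUS"
```

Against an HttpFS proxy, the same URIs work with the port changed to 14000.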
B. C
Hadoop provides a C library called libhdfs that mirrors the Java FileSystem interface (despite its name, it can be
used to access any Hadoop filesystem, not just HDFS). It works using the Java Native Interface (JNI) to call a Java
filesystem client. There is also a libwebhdfs library that uses the WebHDFS interface described in the previous section.
The C API is very similar to the Java one, but it typically lags the Java one, so some newer features may
not be supported. You can find the header file, hdfs.h, in the include directory of the Apache Hadoop
binary tarball distribution.
The Apache Hadoop binary tarball comes with prebuilt libhdfs binaries for 64-bit Linux, but for other
platforms you will need to build them yourself by following the BUILDING.txt instructions at the top
level of the source tree.
C. NFS
It is possible to mount HDFS on a local client’s filesystem using Hadoop’s NFSv3 gateway.
You can then use Unix utilities (such as ls and cat) to interact with the filesystem,
upload files, and in general use POSIX libraries to access the filesystem from any programming
language.
Appending to a file works, but random modifications of a file do not, since HDFS can only write to
the end of a file.
D. FUSE
Filesystem in Userspace (FUSE) allows filesystems that are implemented in user space to be
integrated as Unix filesystems. Hadoop’s Fuse-DFS contrib module allows HDFS (or any Hadoop
filesystem) to be mounted as a standard local filesystem. Fuse-DFS is implemented in C using libhdfs
as the interface to HDFS. At the time of writing, the Hadoop NFS gateway is the more robust solution
to mounting HDFS, so should be preferred over Fuse-DFS.
Data Flow
Data Ingest with Flume and Sqoop and Hadoop Archives
Hadoop I/O:
3. Data Integrity:
Data can become corrupted, because every I/O operation on the disk or the network carries with it a
small chance of introducing errors when reading or writing data.
The usual way of detecting corrupted data is by computing a checksum for the data when it first enters the system, and
again whenever it is transmitted across a channel that is unreliable and hence capable of corrupting the data. The data is
deemed to be corrupt if the newly generated checksum doesn’t exactly match the original. A
commonly used error-detecting code is CRC-32 (32-bit cyclic redundancy check), which computes a
32-bit integer checksum for input of any size. CRC-32 is used for checksumming in Hadoop’s
ChecksumFileSystem, while HDFS uses a more efficient variant called CRC-32C.
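The compute-then-verify cycle can be sketched with the JDK's own implementations of these codes (java.util.zip.CRC32 and, since Java 9, CRC32C); this is an illustration of the idea, not Hadoop's actual checksumming code:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

public class ChecksumDemo {
    // Compute a 32-bit checksum over the given bytes.
    static long checksum(Checksum c, byte[] data) {
        c.reset();
        c.update(data, 0, data.length);
        return c.getValue();
    }

    // Data is deemed corrupt if the recomputed checksum differs from the original.
    static boolean isCorrupt(byte[] data, long originalChecksum) {
        return checksum(new CRC32(), data) != originalChecksum;
    }

    public static void main(String[] args) {
        byte[] data = "some block of data".getBytes(StandardCharsets.UTF_8);
        long original = checksum(new CRC32(), data);

        // Unchanged data verifies cleanly...
        System.out.println("intact: " + !isCorrupt(data, original));

        // ...while a single flipped bit is detected.
        data[0] ^= 0x01;
        System.out.println("corruption detected: " + isCorrupt(data, original));

        // CRC-32C uses a different polynomial, so its value generally
        // differs from CRC-32 for the same input.
        System.out.println("CRC32C: " + checksum(new CRC32C(), data));
    }
}
```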
Compression
1. File compression brings two major benefits: it reduces the space needed to store files, and it
speeds up data transfer across the network or to or from disk.
LZO files are splittable if they have been indexed in a preprocessing step
The “Splittable” column indicates whether the compression format supports splitting (that is,
whether you can seek to any point in the stream and start reading from some point further on).
Splittable compression formats are especially suitable for MapReduce.
2. All compression algorithms exhibit a space/time trade-off: faster compression and decompression
speeds usually come at the expense of smaller space savings.
3. The tools listed in the above table typically give some control over this trade-off at compression time
by offering nine different options: -1 means optimize for speed, and -9 means optimize for space.
4. For example, the following command creates a compressed file file.gz using the fastest
compression method: % gzip -1 file
gzip is a general-purpose compressor and sits in the middle of the space/time trade-off.
bzip2 compresses more effectively than gzip, but is slower.
bzip2’s decompression speed is faster than its compression speed, but it is still slower than
the other formats.
LZO, LZ4, and Snappy, on the other hand, all optimize for speed and are around an order of
magnitude faster than gzip, but compress less effectively.
Snappy and LZ4 are also significantly faster than LZO for decompression.
Codecs
1. A codec is the implementation of a compression-decompression algorithm.
CompressionCodec has two methods that allow you to easily compress or decompress data.
To compress data being written to an output stream, call createOutputStream(OutputStream out) to
obtain a CompressionOutputStream, to which you write uncompressed data.
To decompress data being read from an input stream, call createInputStream(InputStream in) to
obtain a CompressionInputStream, which allows you to read uncompressed data from the
underlying stream.
The application expects the fully qualified name of the CompressionCodec implementation as the
first command-line argument.
We use ReflectionUtils to construct a new instance of the codec, then obtain a compression wrapper
around System.out.
Then we call the utility method copyBytes() on IOUtils to copy the input to the output, which is
compressed by the CompressionOutputStream.
Finally, we call finish() on CompressionOutputStream, which tells the compressor to finish writing to
the compressed stream, but doesn’t close the stream.
We can try it out with the following command line, which compresses the string “Text” using the
StreamCompressor program with the GzipCodec, then decompresses it from standard input using
gunzip:
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -
If you are reading a compressed file, normally you can infer which codec to use by looking at its
filename extension. A file ending in .gz can be read with GzipCodec.
Once the codec has been found, it is used to strip off the file suffix to form the output filename (via
the removeSuffix() static method of CompressionCodecFactory).
In this way, a file named file.gz is decompressed to file by invoking the program as follows:
% hadoop FileDecompressor file.gz
Each codec knows its default filename extension, thus permitting CompressionCodecFactory to
search through the registered codecs to find a match for the given extension (if any).
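The compress-through-a-wrapper-stream pattern that the codec API follows can be sketched with the JDK's gzip streams standing in for the Hadoop wrappers (GZIPOutputStream playing the role of CompressionOutputStream, GZIPInputStream that of CompressionInputStream); this is an illustration of the pattern, not the Hadoop API itself:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compress by writing through a wrapper stream; finish() flushes the
    // remaining compressed data, like CompressionOutputStream.finish().
    static byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(data);
            gz.finish();
        }
        return buf.toByteArray();
    }

    // Decompress by reading through a wrapper stream, which yields
    // uncompressed data from the underlying compressed stream.
    static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gz =
                new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "Text".getBytes(StandardCharsets.UTF_8);
        byte[] restored = decompress(compress(original));
        System.out.println(new String(restored, StandardCharsets.UTF_8)); // prints "Text"
    }
}
```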
Consider an uncompressed file stored in HDFS whose size is 1 GB. With an HDFS block size of 128
MB, the file will be stored as eight blocks, and a MapReduce job using this file as input will create
eight input splits, each processed independently as input to a separate map task.
Imagine now that the file is a gzip-compressed file whose compressed size is 1 GB. As before, HDFS
will store the file as eight blocks. However, creating a split for each block won’t work, because it is
impossible to start reading at an arbitrary point in the gzip stream and therefore impossible for a
map task to read its split independently of the others.
The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of
compressed blocks.
The problem is that the start of each block is not distinguished in any way that would allow a
reader positioned at an arbitrary point in the stream to advance to the beginning of the next
block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.
In this case, MapReduce will do the right thing and not try to split the gzipped file, since it knows
that the input is gzip-compressed (by looking at the filename extension) and that gzip does not
support splitting. This will work, but at the expense of locality: a single map will process the eight
HDFS blocks, most of which will not be local to the map. Also, with fewer maps, the job is less
granular and so may take longer to run.
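The block and split counts above follow from simple arithmetic; a sketch (a real MapReduce input-split calculation also honors configured minimum and maximum split sizes and record boundaries):

```java
public class SplitCount {
    // Number of HDFS blocks needed: the ceiling of fileSize / blockSize.
    static long numBlocks(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    // A splittable input yields roughly one split per block; a
    // non-splittable file (such as gzip) must be one single split.
    static long numSplits(long fileSize, long blockSize, boolean splittable) {
        return splittable ? numBlocks(fileSize, blockSize) : 1;
    }

    public static void main(String[] args) {
        long GB = 1024L * 1024 * 1024;
        long MB = 1024L * 1024;
        System.out.println(numBlocks(1 * GB, 128 * MB));        // 8 blocks
        System.out.println(numSplits(1 * GB, 128 * MB, true));  // 8 splits (uncompressed)
        System.out.println(numSplits(1 * GB, 128 * MB, false)); // 1 split (gzip)
    }
}
```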
Serialization
Serialization is the process of turning structured objects into a byte stream for transmission over a
network or for writing to persistent storage. Deserialization is the reverse process of turning a byte
stream back into a series of structured objects.
The byte stream created is platform independent. So, the object serialized on one platform can be deserialized on a
different platform.
In Hadoop, interprocess communication between nodes in the system is implemented using remote
procedure calls (RPCs). The RPC protocol uses serialization to render the message into a binary
stream to be sent to the remote node, which then deserializes the binary stream into the original
message. In general, it is desirable that an RPC serialization format is:
Compact
A compact format makes the best use of network bandwidth, which is the scarcest resource in a
data center.
Fast
Interprocess communication forms the backbone for a distributed system, so it is essential that
there is as little performance overhead as possible for the serialization and deserialization process.
Extensible
Protocols change over time to meet new requirements, so it should be straightforward to evolve the
protocol in a controlled manner for clients and servers. For example, it should be possible to add a
new argument to a method call and have the new servers accept messages in the old format
(without the new argument) from old clients.
Interoperable
For some systems, it is desirable to be able to support clients that are written in different languages
to the server, so the format needs to be designed to make this possible.
Serialization translates data structures into a stream of bytes that can be transmitted over the
network or stored in a database regardless of the system architecture; deserialization converts the
stream back to the original structured form.
Simply storing information as a raw stream of bytes is not enough on its own; serialization does this
in a way that does not depend on the architecture.
For example, a CSV file may contain a comma (,) within the data itself, so deserialization can produce
wrong output. If the data is stored in a self-describing form such as XML, it can
easily be deserialized.
The Writable interface defines two methods for deserialization and serialization:
1. void readFields(DataInput in)
2. void write(DataOutput out)
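The pattern can be sketched with only JDK streams: a class that serializes itself via write(DataOutput) and restores itself via readFields(DataInput), mirroring the Writable contract without depending on the Hadoop libraries (the real interface is org.apache.hadoop.io.Writable):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// A Writable-style pair of ints, invented for this illustration.
public class IntPairWritable {
    private int first;
    private int second;

    public IntPairWritable() {}  // Writables need a no-arg constructor for deserialization
    public IntPairWritable(int first, int second) { this.first = first; this.second = second; }

    // Serialize this object's fields to the byte stream.
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    // Restore this object's fields from the byte stream, in the same order.
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    public int getFirst() { return first; }
    public int getSecond() { return second; }

    public static void main(String[] args) throws IOException {
        // Serialize to an in-memory byte stream...
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new IntPairWritable(42, 7).write(new DataOutputStream(buf));

        // ...and deserialize the same bytes back into a fresh object.
        IntPairWritable copy = new IntPairWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.getFirst() + "," + copy.getSecond()); // prints "42,7"
    }
}
```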
Writable Classes
Hadoop comes with a large selection of Writable classes in the org.apache.hadoop.io
package.
Text
Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent
of java.lang.String.
Writable collections
There are six Writable collection types in the org.apache.hadoop.io package: ArrayWritable,
ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable, SortedMapWritable, and EnumSetWritable.
Avro
Apache Avro is a language-neutral data serialization system. The project was created
by Doug Cutting (the creator of Hadoop) to address the major downside of Hadoop
Writables: lack of language portability.