BDA Unit 4 Notes
Hive and Impala tables in HDFS can be created using four different Hadoop file formats:
Text files
Sequence File
Avro data files
Parquet file format
Let’s learn about each of these Hadoop file formats in detail.
1. Text files
A text file is the most basic and a human-readable file. It can be read or written in any
programming language and is mostly delimited by comma or tab.
The text file format consumes more space when a numeric value needs to be stored as a string. It
is also difficult to represent binary data such as an image.
2. Sequence File
The sequence file format can be used to store an image in binary format. Sequence files store key-value pairs in a binary container format and are more efficient than text files. However, sequence files are not human-readable.
3. Avro data files
The Avro file format has efficient storage due to optimized binary encoding. It is widely supported both inside and outside the Hadoop ecosystem.
The Avro file format is ideal for long-term storage of important data. It can be read from and written to in many languages such as Java, Scala, and so on. Schema metadata can be embedded in the file to ensure that it will always be readable. Schema evolution can accommodate changes. The Avro file format is considered the best choice for general-purpose storage in Hadoop.
4. Parquet file format
Parquet is a columnar file format: it stores the values of each column together, which gives good compression and lets queries read only the columns they need. Parquet is widely supported across the Hadoop ecosystem, including Hive, Impala, and Spark.
Analyzing the Data with Hadoop
To take advantage of the parallel processing that Hadoop provides, we need to express our query as a MapReduce job.
MapReduce works by breaking the processing into two phases: the map phase and the reduce
phase. Each phase has key-value pairs as input and output, the types of which may be chosen by
the programmer.
The programmer also specifies two functions: the map function and the reduce function.
The input to our map phase is the raw NCDC data. To visualize the way the map works, consider
the following sample lines of input data (some unused columns have been dropped to fit the
page, indicated by ellipses):
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
These lines are presented to the map function as the key-value pairs:
(0,0067011990999991950051507004...9999999N9+00001+99999999999...)
(106,0043011990999991950051512004...9999999N9+00221+99999999999...)
(212,0043011990999991950051518004...9999999N9-00111+99999999999...)
(318,0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
The keys are the line offsets within the file, which we ignore in our map function. The map function merely extracts the year and the air temperature (embedded in each record at fixed positions), and emits them as its output (the temperature values have been interpreted as integers):
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing sorts and groups the key-value pairs by key. So, continuing the example, our reduce function sees the following input:
(1949, [111, 78])
(1950, [0, 22, −11])
Each year appears with a list of all its air temperature readings. All the reduce function has to do now is iterate through the list and pick the maximum reading:
(1949, 111)
(1950, 22)
This is the final output: the maximum global temperature recorded in each year.
Java MapReduce:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') {
      // strip the leading plus sign before parsing
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    // Emit (year, temperature) only for valid, non-missing readings.
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Iterate over all readings for this year and keep the maximum.
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
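For completeness, the mapper and reducer need a driver that configures and submits the job. The following is a minimal sketch of the usual driver pattern (the class name MaxTemperature and the two command-line path arguments are illustrative, not part of the notes above):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }
    Job job = Job.getInstance();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));    // directory of NCDC input files
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory (must not already exist)

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}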
HADOOP STREAMING
Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to
create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
For example, when an executable is specified for mappers, each mapper task will launch the executable as a separate process when the mapper is initialized. As the mapper task runs, it converts its inputs into lines and feeds the lines to the stdin of the process. In the meantime, the mapper collects the line-oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. If there is no tab character in the line, then the entire line is considered the key and the value is null. However, this can be customized, as discussed later.
When an executable is specified for reducers, each reducer task will launch the executable as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the stdin of the process. In the meantime, the reducer collects the line-oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. However, this can be customized, as discussed later.
This is the basis for the communication protocol between the Map/Reduce framework and the
streaming mapper/reducer.
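As an illustration of this contract, below is a minimal sketch of a streaming mapper written as an ordinary stand-alone program (shown in Java only for illustration; in practice any executable or script that reads stdin and writes stdout can be used). It reads NCDC records from standard input and writes year and temperature separated by a tab, following the line-oriented key/value convention described above; the field offsets are the ones used in the earlier Java mapper.
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StreamingMaxTemperatureMapper {
  public static void main(String[] args) throws Exception {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String line;
    while ((line = in.readLine()) != null) {
      if (line.length() < 93) {
        continue;                                   // skip malformed records
      }
      String year = line.substring(15, 19);
      String temp = line.substring(87, 92);         // signed, e.g. "+0022" or "-0011"
      if (!temp.equals("+9999")) {                  // 9999 marks a missing reading
        // key<TAB>value on stdout is what the streaming framework expects
        System.out.println(year + "\t" + Integer.parseInt(temp.replace("+", "")));
      }
    }
  }
}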
You can also supply a Java class as the mapper and/or the reducer.
Note: Be sure to place the generic options before the streaming options, otherwise the command will fail. For an example, see Making Archives Available to Tasks.
Some commonly used streaming options are:
-file filename (optional): Make the mapper, reducer, or combiner executable available locally on the compute nodes.
-inputformat JavaClassName (optional): The class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default.
-outputformat JavaClassName (optional): The class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default.
-lazyOutput (optional): Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write).
HADOOP PIPES
Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce. Unlike Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function.
DESIGN OF HDFS
HDFS is a filesystem designed for storing very large files with streaming data access patterns,
running on clusters of commodity hardware.
Very large files: “Very large” in this context means files that are hundreds of megabytes,
gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of
data.
Streaming data access : HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. A dataset is typically generated or copied from source, and then various analyses are performed on that dataset over time.
Commodity hardware : Hadoop doesn’t require expensive, highly reliable hardware to run on.
It’s designed to run on clusters of commodity hardware (commonly available hardware from
multiple vendors) for which the chance of node failure across the cluster is high, at least
for large clusters. HDFS is designed to carry on working without a noticeable interruption to the
user in the face of such failure.
Low-latency data access : Applications that require low-latency access to data, in the tens of
milliseconds range, will not work well with HDFS.
Lots of small files : Since the namenode holds filesystem metadata in memory, the limit to the
number of files in a filesystem is governed by the amount of memory on the namenode.
Multiple writers, arbitrary file modifications: Files in HDFS may be written to by a single
writer. Writes are always made at the end of the file. There is no support for multiple writers, or
for modifications at arbitrary offsets in the file.
HDFS CONCEPTS
Blocks:
HDFS has the concept of a block, but it is a much larger unit—64 MB by default. Files in HDFS
are broken into block-sized chunks, which are stored as independent units.
Having a block abstraction for a distributed filesystem brings several benefits.
First:
A file can be larger than any single disk in the network. Nothing requires the blocks of a file to be stored on the same disk, so they can take advantage of any of the disks in the cluster.
Second:
Making the unit of abstraction a block rather than a file simplifies the storage subsystem. The storage subsystem deals with blocks, simplifying storage management (since blocks are a fixed size, it is easy to calculate how many can be stored on a given disk) and eliminating metadata concerns.
Third:
Blocks fit well with replication for providing fault tolerance and availability. To insure against corrupted blocks and disk and machine failure, each block is replicated to a small number of physically separate machines (typically three).
HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks.
By making a block large enough, the time to transfer the data from the disk can be made to be
significantly larger than the time to seek to the start of the block. Thus the time to transfer a large
file made of multiple blocks operates at the disk transfer rate.
A quick calculation shows that if the seek time is around 10 ms, and the transfer rate is 100
MB/s, then to make the seek time 1% of the transfer time, we need to make the block size around
100 MB. The default is actually 64 MB, although many HDFS installations use 128 MB blocks.
This figure will continue to be revised upward as transfer speeds grow with new generations of
disk drives.
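Spelling out the arithmetic with the figures quoted above (10 ms seek time, 100 MB/s transfer rate; these are the quoted figures, not measurements):
\[
t_{transfer} = \frac{t_{seek}}{0.01} = \frac{10\ \text{ms}}{0.01} = 1\ \text{s},
\qquad
\text{block size} = 100\ \text{MB/s} \times 1\ \text{s} = 100\ \text{MB}
\]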
Namenodes and Datanodes:
An HDFS cluster has two types of node operating in a master-worker pattern: a namenode (the
master) and a number of datanodes (workers). The namenode manages the filesystem
namespace. It maintains the filesystem tree and the metadata for all the files and directories in the tree.
This information is stored persistently on the local disk in the form of two files: the namespace
image and the edit log. The namenode also knows the datanodes on which all the blocks for a
given file are located.
JAVA INTERFACE
In this section, we dig into the Hadoop FileSystem class: the API for interacting with one of
Hadoop’s filesystems.
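As a brief sketch of using this API (the class name FileSystemCat is illustrative, and the file URI is taken from the command line), a file stored in HDFS can be read by obtaining a FileSystem instance for the file's URI and opening an input stream on its Path:
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileSystemCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];                             // e.g. an hdfs:// URI passed on the command line
    Configuration conf = new Configuration();         // picks up core-site.xml, hdfs-site.xml, etc.
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));                    // returns an FSDataInputStream
      IOUtils.copyBytes(in, System.out, 4096, false); // copy the file's bytes to stdout
    } finally {
      IOUtils.closeStream(in);
    }
  }
}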
DATA FLOW
A basic data flow of the Hadoop system can be divided into four phases:
1. Capture Big Data : The sources can be extensive lists that are structured, semi-
structured, and unstructured, some streaming, real-time data sources, sensors, devices,
machine-captured data, and many other sources. For data capturing and storage, we have
different data integrators such as Flume, Sqoop, Storm, and so on in the Hadoop
ecosystem, depending on the type of data.
2. Process and Structure: We will be cleansing, filtering, and transforming the data by
using a MapReduce-based framework or some other frameworks which can perform
distributed programming in the Hadoop ecosystem. The frameworks available currently
are MapReduce, Hive, Pig, Spark and so on.
3. Distribute Results: The processed data can be used by the BI and analytics system or the
big data analytics system for performing analysis or visualization.
4. Feedback and Retain: The data analyzed can be fed back to Hadoop and used for
improvements.
HADOOP I/O
Like any I/O subsystem, Hadoop comes with a set of I/O primitives. These primitives, although generic in nature, apply to the Hadoop I/O system as well, with some special connotation to it, of course. Hadoop deals with multi-terabyte datasets, so a special consideration of these primitives gives an idea of how Hadoop handles data input and output. This section quickly skims over these primitives to give a perspective on the Hadoop input/output system.
DATA INTEGRITY
Data integrity means that data should remain accurate and consistent across all of its storage, processing, and retrieval operations. To ensure that no data is lost or corrupted during persistence and processing, Hadoop maintains stringent data integrity constraints. Every read/write operation, whether on disk or over the network, is prone to errors, and the volume of data that Hadoop handles only aggravates the situation.
The usual way to detect corrupt data is through checksums. A checksum is computed when data first enters the system and is sent across the channel during the retrieval process. The retrieving end computes the checksum again and matches it with the received one. If they match exactly, the data is deemed to be error-free; otherwise it contains an error. But what if the checksum sent is itself corrupt? This is highly unlikely because the checksum is much smaller than the data, but it is not impossible. Using the right kind of hardware, such as ECC memory, can help alleviate the situation.
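The idea can be illustrated with a few lines of Java using the standard CRC-32 implementation (a toy illustration of checksum-based corruption detection, not Hadoop's internal code):
import java.util.zip.CRC32;

public class ChecksumDemo {
  static long checksum(byte[] data) {
    CRC32 crc = new CRC32();
    crc.update(data);
    return crc.getValue();          // 4-byte CRC-32 value
  }

  public static void main(String[] args) {
    byte[] original = "hadoop block data".getBytes();
    long sent = checksum(original); // computed when the data enters the system

    byte[] received = original.clone();
    received[0] ^= 1;               // simulate a corrupted bit in transit

    long recomputed = checksum(received);
    System.out.println(sent == recomputed ? "data is error free" : "corruption detected");
  }
}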
Hadoop takes checksumming further and creates a distinct checksum for every 512 bytes (the default) of data. Because a CRC-32 checksum is only 4 bytes, the storage overhead is not an issue. All data that enters the system is verified by the datanodes before being forwarded for storage or further processing. Data sent to the datanode pipeline is verified through checksums, and any corruption found is immediately reported to the client with a ChecksumException. A client read from a datanode goes through the same drill. The datanodes maintain a log of checksum verifications to keep track of verified blocks. The log is updated by the datanode upon receiving a block verification success signal from the client. These statistics help in keeping bad disks at bay.
Apart from this, a periodic verification on the block store is made with the help
of DataBlockScanner running along with the datanode thread in the background. This protects
data from corruption in the physical storage media.
Hadoop also maintains replicas of data, which are used to recover from corruption. Once a client detects an error while reading a block, it reports the bad block and the datanode it was read from to the namenode before throwing a ChecksumException. The namenode then marks that copy of the block as corrupt and redirects further reads of the block to its replicas. A new replica is created from a good copy so that the replication factor is restored, and the corrupt copy is removed from the system.
For every file created in the Hadoop LocalFileSystem, a hidden file named .<filename>.crc is created in the same directory. This file maintains the checksum of each chunk of data (512 bytes) in the file. This metadata allows the LocalFileSystem to detect read errors and throw a ChecksumException.
COMPRESSION
Keeping in mind the volume of data Hadoop deals with, compression is not a luxury but a requirement. File compression brings obvious benefits that Hadoop makes good use of: it economizes storage requirements and speeds up data transmission over the network and to and from disk. Hadoop uses many common compression tools, techniques, and algorithms, many of which are popular and have long been used for file compression. For example, gzip, bzip2, LZO, and zip are often used.
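As a sketch of how these codecs are used programmatically (the choice of GzipCodec and the class name StreamCompressor are assumptions for illustration), data read from standard input can be compressed onto standard output like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
  public static void main(String[] args) throws Exception {
    String codecClassname = "org.apache.hadoop.io.compress.GzipCodec";
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec =
        (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    // Wrap stdout in a compressing stream and copy stdin through it.
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish(); // flush the compressed data without closing the underlying stream
  }
}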
SERIALIZATION
Serialization is the process of translating data structures or object state into a binary or textual form in order to transport the data over a network or to store it on persistent storage. Once the data has been transported over the network or retrieved from persistent storage, it needs to be deserialized again. Serialization is also termed marshalling, and deserialization unmarshalling.
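As a small sketch of serialization in Hadoop's own terms (the helper method and the sample value 163 are only for illustration), a Writable such as IntWritable can be turned into its binary form by writing it to a DataOutputStream:
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public class WritableDemo {
  static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);      // Writable.write() encodes the value in binary form
    dataOut.close();
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    IntWritable value = new IntWritable(163);
    byte[] bytes = serialize(value);
    System.out.println("serialized length = " + bytes.length); // 4 bytes for an int
  }
}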
Serialization in Hadoop
Generally in distributed systems like Hadoop, the concept of serialization is used
for Interprocess Communication and Persistent Storage.
Interprocess Communication
Hadoop provides a couple of high-level containers that elaborate the specialized data structures used to hold special types of data. For example, to maintain a binary log, the SequenceFile container provides a data structure to persist binary key-value pairs. We can then use a key such as a timestamp, represented by a LongWritable, and a value that is a Writable, which refers to the logged quantity.
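A minimal sketch of that usage (the output path numbers.seq and the sample values are illustrative), writing timestamp keys and Text values into a SequenceFile:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("numbers.seq");              // illustrative output path
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(conf,
          SequenceFile.Writer.file(path),
          SequenceFile.Writer.keyClass(LongWritable.class),
          SequenceFile.Writer.valueClass(Text.class));
      for (int i = 0; i < 5; i++) {
        writer.append(new LongWritable(System.currentTimeMillis()), // timestamp key
                      new Text("log entry " + i));                  // logged quantity
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}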
I/O Compression:
In the Hadoop framework, where large data sets are stored and processed, you will
need storage for large files.
These files are divided into blocks and those blocks are stored in different nodes across
the cluster so lots of I/O and network data transfer is also involved.
In order to reduce the storage requirements and to reduce the time spent in network transfer, you can use data compression in the Hadoop framework.
Using data compression in Hadoop, you can compress files at various steps; at all of these steps it helps to reduce the amount of storage used and the quantity of data transferred.
You can compress the input file itself.
That will help you reduce storage space in HDFS.
You can also configure the output of a MapReduce job to be compressed in Hadoop.
That helps in reducing storage space if you are archiving output or sending it to some other application for further processing.
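For example, compressing the final output of a MapReduce job only needs two settings on the Job; a sketch assuming the gzip codec and a hypothetical helper class:
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedOutputConfig {
  public static void configure(Job job) {
    // Ask the framework to compress the final job output with gzip.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  }
}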
AVRO
Apache Avro is a language-neutral data serialization system. It was developed by Doug Cutting,
the father of Hadoop. Since Hadoop writable classes lack language portability, Avro becomes
quite helpful, as it deals with data formats that can be processed by multiple languages. Avro is a
preferred tool to serialize data in Hadoop.
Avro has a schema-based system. A language-independent schema is associated with its read and write operations. Avro serializes data that has a built-in schema into a compact binary format, which can be deserialized by any application.
Avro uses JSON format to declare the data structures. Presently, it supports languages
such as Java, C, C++, C#, Python, and Ruby.
Avro Schemas
Avro depends heavily on its schema. Data can be written without code generation, because the schema is stored along with the Avro data in a file for any further processing. Serialization is fast and the resulting serialized data is smaller in size.
In RPC, the client and the server exchange schemas during the connection. This exchange helps in resolving same-named fields, missing fields, extra fields, and so on.
Avro schemas are defined with JSON that simplifies its implementation in languages with JSON
libraries.
Like Avro, there are other serialization mechanisms in Hadoop such as Sequence Files, Protocol
Buffers, and Thrift.
Thrift and Protocol Buffers are the libraries most comparable to Avro. Avro differs from these frameworks in the following ways −
Avro supports both dynamic and static types as per the requirement. Protocol Buffers and
Thrift use Interface Definition Languages (IDLs) to specify schemas and their types.
These IDLs are used to generate code for serialization and deserialization.
Avro is built into the Hadoop ecosystem, while Thrift and Protocol Buffers are not.
Unlike Thrift and Protocol Buffers, Avro's schema definition is in JSON and not in any proprietary IDL.
Features of Avro
Listed below are some of the prominent features of Avro −
Avro is a language-neutral data serialization system that can be processed by many languages (currently Java, C, C++, C#, Python, and Ruby).
Avro creates a binary, compressible, and splittable format, which makes it efficient as input to MapReduce jobs.
Avro files are self-describing: the schema used to write a file is stored in the file's metadata, so the file can always be read later.
Avro provides rich data structures, such as records, enums, arrays, and maps.
Let us see the working of the Avro file format step by step:
In general, we first generate a schema, and that schema is defined according to the data.
The data can then be serialized using the serialization API provided by Avro, which is found in the org.apache.avro.specific package.
The data is reconstructed (deserialized) using the corresponding API, also found in the org.apache.avro.specific package.
Avro files in a landing zone can be read into our program for further processing in one of two ways: first, by creating and compiling a class that corresponds to the schema, and second, with the help of the Parsers library, which lets us read the schema directly (see the sketch after the example schema below).
Related systems can simply obtain the table schema from the Avro files themselves, so the schema does not have to be stored separately.
As the schema evolves, the changes can be handled at the source.
An example Avro schema (in JSON) for a student record:
{
  "namespace": "fileformat",
  "type": "record",
  "name": "student",
  "fields": [
    {
      "name": "sid",
      "doc": "student id. may be it is stid, esa, yga.",
      "type": [
        {
          "name": "stid",
          "type": "long",
          "doc": "College stu id."
        }
      ]
    }
  ]
}
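As a sketch of the second approach mentioned above (reading the schema with the Parsers library, with no generated class), the following uses a simplified variant of the schema above in which sid is a plain long; the output file name student.avro is illustrative:
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroWriteDemo {
  public static void main(String[] args) throws Exception {
    String schemaJson =
        "{\"type\":\"record\",\"name\":\"student\",\"namespace\":\"fileformat\","
      + "\"fields\":[{\"name\":\"sid\",\"type\":\"long\"}]}";
    Schema schema = new Schema.Parser().parse(schemaJson);   // read the schema with the Parser

    GenericRecord record = new GenericData.Record(schema);   // no generated class needed
    record.put("sid", 12345L);

    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    writer.create(schema, new File("student.avro"));         // the schema is embedded in the file
    writer.append(record);
    writer.close();
  }
}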
Command Avro file format
The ‘sqoop’ command can be used to store data in the Avro file format, since Apache Sqoop supports Avro data files. The ‘sqoop’ command has some parameters that we have to add:
--as-avrodatafile – used for importing data as Avro data files.
--compression-codec – specifies the Hadoop compression codec to be used.
The ‘sqoop’ command has its template for an import: we specify --bindir and make the connection with the help of the template.
We may also have to specify the JDBC driver class manually, as well as the connection manager class.
If we want to delete the target directory, we can do it by using the AWS CLI.
The ‘sqoop’ command can be used to move data into either Hadoop or AWS; for querying the data we have to create tables on top of the physical files.
If the data is moved into Hadoop, then we create Hive tables, and if the data is moved into AWS, then we create either Hive tables or tables in Amazon.
Hadoop Integration
Big Data Management can connect to clusters that run different Hadoop distributions. Hadoop is
an open-source software framework that enables distributed processing of large data sets across
clusters of machines. You might also need to use third-party software clients to set up and
manage your Hadoop cluster.
Big Data Management can connect to the supported data source in the Hadoop environment,
such as HDFS, HBase, or Hive, and push job processing to the Hadoop cluster. To enable high
performance access to files across the cluster, you can connect to an HDFS source. You can also
connect to a Hive source, which is a data warehouse that connects to HDFS.
It can also connect to NoSQL databases such as HBase, which is a database comprising key-
value pairs on Hadoop that performs operations in real-time. The Data Integration Service can
push mapping jobs to the Spark or Blaze engine, and it can push profile jobs to the Blaze engine
in the Hadoop environment.
Big Data Management supports more than one version of some Hadoop distributions. By default,
the cluster configuration wizard populates the latest supported version.