Hadoop-MapReduce Lab
Map | Reduce | Driver
Sriram Balasubramanian
2016
Problem statement
Let's understand the problem through a sample text file content:
Hello everyone this is a sample dataset. You need to print the word count of particular words in this dataset.
Your MapReduce program should process this text file and should provide output as follows:
Output: a (word, count) pair for each word in the dataset.
package com.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");
job.setJarByClass(WordCount.class);   //driver class name assumed
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
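The Mapper and Reducer class bodies did not survive in this copy. A minimal sketch consistent with the driver configuration above (Text keys, IntWritable counts; the class names Map and Reduce are taken from the driver) would be:

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Splitting the line into words on whitespace and emitting (word, 1) for each
StringTokenizer tokenizer = new StringTokenizer(value.toString());
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//Summing the counts received for each word
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}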
Input file:
inputword.txt
hello welcome
welcome to big data
data is good
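Running the WordCount job over inputword.txt should produce (keys appear in sorted order):
big 1
data 2
good 1
hello 1
is 1
to 1
welcome 2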
Debugging
If Eclipse flags unresolved types, press Ctrl+Shift+O to organize imports and select the Hadoop classes when prompted:
org.apache.hadoop.io.Text
org.apache.hadoop.mapreduce.Job
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
org.apache.hadoop.mapreduce.lib.input.TextInputFormat
Apply your MapReduce programming knowledge and write a MapReduce program to process two text files. You need to
calculate the size of each word and count the number of words of that size in the text file.
The dataset for this problem is the text file alphabets available in your LMS.
Problem statement
Let's understand the problem through a sample text file content:
Hello everyone this is a sample dataset. Calculate the word size and count the number of words of that size in
this text file.
Your MapReduce program should process this text file and should provide output as follows:
Sample Output
Solution
package com.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//Mapper
/**
* @method map
 * <p>This method takes the input as text data type and splits the input into words.
 * The length of each word is then determined and a (word length, word) key-value pair
 * is formed and passed to the reducer.
* @method_arguments key, value, output, reporter
* @return void
*/
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object,
org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Converting the record (single line) to String and storing it in a String variable line
String line = value.toString();
//iterating through all the words available in that line and forming the key value pair
//StringTokenizer breaks the record (line) into tokens, using whitespace as the delimiter
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
String word = tokenizer.nextToken();
//Emitting (word length, word) as the intermediate key value pair
context.write(new IntWritable(word.length()), new Text(word));
}
}
/**
* @method reduce
* <p>This method takes the input as key and list of values pair from mapper, it does aggregation
* based on keys and produces the final output.
* @method_arguments key, values, output, reporter
* @return void
*/
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator,
org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
/*
 * Iterates through all the values available with a key, counts them, and emits
 * the key together with that count.
 */
int sum = 0;
for(Text x : values)
{
sum++;
}
//Emitting the word size and the number of words of that size
context.write(key, new IntWritable(sum));
}
//Driver
/**
* @method main
* <p>This method is used for setting all the configuration properties.
* It acts as a driver for map reduce code.
* @return void
* @method_arguments args
* @throws Exception
*/
//Reading the default configuration of the cluster from the configuration XML files
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordsizewordcount");
job.setJarByClass(WordSizeWordCount.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
//Defining the output key class for the final output i.e. from reducer
job.setOutputKeyClass(IntWritable.class);
//Defining the output value class for the final output i.e. from reducer
job.setOutputValueClass(IntWritable.class);
//Defining input Format class which is responsible to parse the dataset into a key value pair
job.setInputFormatClass(TextInputFormat.class);
//Defining the output format class, which writes the final key-value output from the MR framework to a text file on disk
job.setOutputFormatClass(TextOutputFormat.class);
//Configuring the input/output path from the filesystem into the job
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
//Deleting the output path automatically from HDFS so that we don't have to delete it explicitly
outputPath.getFileSystem(conf).delete(outputPath, true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
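With the JAR exported, the job can be submitted along these lines (the JAR name and HDFS paths are illustrative):

hadoop jar WordSizeWordCount.jar com.mr.WordSizeWordCount /user/input/alphabets /user/output/wordsize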
Assignment 3 - WeatherData Program
Apply your MapReduce programming knowledge and write a MapReduce program to process a dataset with temperature
records. You need to find the Hot and Cold days in a year based on the maximum and minimum temperatures on those days.
The dataset for this problem is the WeatherData records file available in your LMS. This dataset has been taken from National
Climatic Data Center (NCDC) public datasets. You can download more datasets from this FTP site and review the README file to
understand the available datasets.
Problem statement
Let's understand the problem through a subset of records in the dataset as shown in the following figure:
(Figure: weather records)
Your task is to find out the dates with maximum temperature greater than 40 (A Hot Day) and minimum temperature lower
than 10 (A Cold Day). Here is the sample output:
(Figure: sample output)
Solution
package com.mr;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable arg0, Text Value,
OutputCollector<Text, Text> output, Reporter arg3)
throws IOException {
// Example of input (fields include station id, date, and the max/min temperatures):
// Date Max Min
// 25380 20130101 2.514 -135.69 58.43 8.3 1.1 4.7 4.9 5.6 0.01 C 1.0 -0.1 0.4 97.3 36.0 69.4 -99.000 -99.000 -99.000 -99.000 -99.000 -9999.0 -9999.0 -9999.0 -9999.0 -9999.0
//Converting the record (single line) to String
String line = Value.toString();
//Extracting the date field (characters 6 to 14 of the record)
String date = line.substring(6, 14);
public static class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text Key, Iterator<Text> Values,OutputCollector<Text, Text> output, Reporter arg3) throws IOException {
// Note: since the Mapper's output types are not the defaults, we have to set the following properties.
JobConf conf = new JobConf(WeatherData.class);   //driver class name assumed
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
//Configuring the input/output paths from the command-line arguments
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
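The temperature parsing inside map() and the reduce body did not survive in this copy. A sketch of the missing logic, with the fixed-width column offsets assumed (check them against the actual NCDC record layout), might look like this:

//Inside map(), after extracting the date:
float maxTemp = Float.parseFloat(line.substring(39, 45).trim());   //offset assumed
float minTemp = Float.parseFloat(line.substring(47, 53).trim());   //offset assumed
if (maxTemp > 40.0f) {
//A Hot Day
output.collect(new Text("Hot Day " + date), new Text(String.valueOf(maxTemp)));
}
if (minTemp < 10.0f) {
//A Cold Day
output.collect(new Text("Cold Day " + date), new Text(String.valueOf(minTemp)));
}

//Inside reduce(), simply forwarding each temperature reported for a key:
while (Values.hasNext()) {
output.collect(Key, Values.next());
}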
Step 4. Export JAR file creation:
Right click src->Export->Java->JAR File->click Next button
Assignment 4 - Patent Program
Apply your MapReduce programming knowledge and write a MapReduce program to process a dataset with
patent records. You need to calculate the number of sub-patents associated with each patent.
The dataset for this problem is the patent records file available in your Lab.
Problem statement
Let's understand the problem through a subset of patent records as shown in the following figure:
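(The figure is missing from this copy. Each record pairs a patent id with one of its sub-patent ids as whitespace-separated tokens; hypothetical rows would look like:
1 101 1 102 1 103
2 201 2 202)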
Each patent has sub-patent ids associated with it. You need to calculate the number of sub-patents associated with each patent. Here is the sample output:
Sample Output
Patent | Number of Associated Sub-patents
1      | 13
2      | 10
3      | 4
Your task in this assignment is to process the patent records using a MapReduce program and count the number of associated sub-patents for each patent in this dataset.
Step 3. Type the following MapReduce Program Patent
package com.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//Mapper
/*
*This method takes the input as text data type and tokenizes input
* by taking whitespace as delimiter. Now key value pair is made and this key
* value pair is passed to reducer.
* @method_arguments key, value, output, reporter
* @return void
*/
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Converting the record (single line) to String and storing it in a String variable line
String line = value.toString();
//StringTokenizer breaks the record into tokens, using whitespace as the delimiter
StringTokenizer tokenizer = new StringTokenizer(line);
//Iterating through all the tokens and forming the key value pair
while (tokenizer.hasMoreTokens()) {
/*
 * Tokens alternate between a patent id and one of its sub-patent ids,
 * so each pair is emitted as (patent, sub-patent).
 */
context.write(new Text(tokenizer.nextToken()), new Text(tokenizer.nextToken()));
}
}
/*Reducer
 *
 * Reduce class is static and extends Reducer with the Hadoop generic
 * types Text, Text, Text, IntWritable.
 */
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
/*
 * Iterates through all the sub-patent ids available for a key, counts them,
 * and emits the patent id together with that count.
 */
for(Text x : values)
{
sum++;
}
//Emitting the patent id and its sub-patent count
context.write(key, new IntWritable(sum));
}
/*Driver
*
* This method is used for setting all the configuration properties.
* It acts as a driver for map reduce code.
* @return void
* @method_arguments args
* @throws Exception
*/
//Reading the default configuration of the cluster from the configuration XML files
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "patent");
job.setJarByClass(Patent.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//Explicitly setting the map output key/value types, since they differ from the reducer's output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//Defining the output key class for the final output i.e. from reducer
job.setOutputKeyClass(Text.class);
//Defining the output value class for the final output i.e. from reducer
job.setOutputValueClass(IntWritable.class);
//Defining input Format class which is responsible to parse the dataset into a key value pair
job.setInputFormatClass(TextInputFormat.class);
//Defining the output format class, which writes the final key-value output from the MR framework to a text file on disk
job.setOutputFormatClass(TextOutputFormat.class);
//Configuring the input/output path from the filesystem into the job
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
//Deleting the output path automatically from HDFS so that we don't have to delete it explicitly
outputPath.getFileSystem(conf).delete(outputPath, true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Step 4. Export JAR file creation:
Right click src->Export->Java->JAR File->click Next button
Assignment 5 - MaxTemp Program
Apply your MapReduce programming knowledge and write a MapReduce program to process a dataset with multiple
temperatures for a year. You need to process the dataset to find out the maximum temperature for each year in the dataset.
The dataset for this problem is the text file Temperature available in your Lab.
Problem statement
Let's understand the problem through a subset of temperature records as shown in the following figure:
In this dataset, the first field represents the year and the second field represents a temperature recorded in that year. As the temperature is not constant throughout the year, each year has multiple temperature readings listed in the dataset. You need to process the dataset and find the maximum temperature for each year. Here is the sample output:
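(The figure is missing from this copy; hypothetical records in the described format, a year followed by a temperature reading, would look like:
1900 36
1900 33
1901 41
1901 32
For this input, the job would emit 1900 36 and 1901 41.)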
Solution
Step 3. Type the following MapReduce Program MaxTemp
package com.mr;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @method map
 * <p>This method takes the input as text data type and tokenizes it
 * by taking whitespace as the delimiter. The first token is the year and the second a temperature,
 * repeating until the last token. Each (year, temperature) pair is passed to the reducer.
* @method_arguments key, value, output, reporter
* @return void
*/
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object,
org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//Converting the record (single line) to String and storing it in a String variable line
String line = value.toString();
//StringTokenizer is breaking the record (line) according to the delimiter whitespace
StringTokenizer tokenizer = new StringTokenizer(line," ");
//Iterating through all the tokens and forming the key value pair
while (tokenizer.hasMoreTokens()) {
//The first token of each pair is the year
String year = tokenizer.nextToken().trim();
//The next token is a temperature reading, with surrounding whitespace removed
String temp = tokenizer.nextToken().trim();
//Emitting (year, temperature) to the reducer
context.write(new Text(year), new IntWritable(Integer.parseInt(temp)));
}
}
//Reducer
/**
 * @author sriram
 * @interface Reducer
 * <p>Reduce class is static and extends Reducer with the Hadoop generic
 * types Text, IntWritable, Text, IntWritable.
 */
/**
* @method reduce
* <p>This method takes the input as key and list of values pair from mapper, it does aggregation
* based on keys and produces the final output.
* @method_arguments key, values, output, reporter
* @return void
*/
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator,
org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*/
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
/*
* Iterates through all the values available with a key and if the integer variable temperature
* is greater than maxtemp, then it becomes the maxtemp
*/
//Defining a local variable maxtemp, initialised to the lowest possible value
int maxtemp = Integer.MIN_VALUE;
//Iterating over all the temperature readings available for this year
for (IntWritable it : values) {
int temperature = it.get();
if(maxtemp<temperature)
{
maxtemp =temperature;
}
}
//Finally the output is collected as the year and maximum temperature corresponding to that year
context.write(key, new IntWritable(maxtemp));
}
//Driver
/**
* @method main
* <p>This method is used for setting all the configuration properties.
* It acts as a driver for map reduce code.
* @return void
* @method_arguments args
* @throws Exception
*/
//Reading the default configuration of the cluster from the configuration XML files
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "maxtemp");
job.setJarByClass(MaxTemp.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
//Defining the output key class for the final output i.e. from reducer
job.setOutputKeyClass(Text.class);
//Defining the output value class for the final output i.e. from reducer
job.setOutputValueClass(IntWritable.class);
//Defining input Format class which is responsible to parse the dataset into a key value pair
job.setInputFormatClass(TextInputFormat.class);
//Defining the output format class, which writes the final key-value output from the MR framework to a text file on disk
job.setOutputFormatClass(TextOutputFormat.class);
//Configuring the input/output path from the filesystem into the job
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
//Deleting the output path automatically from HDFS so that we don't have to delete it explicitly
outputPath.getFileSystem(conf).delete(outputPath, true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Assignment 6 - AverageSalary Program
Problem statement
Process a tab-separated employee dataset in which the first field is the department id and the third field is the salary, and compute the average salary for each department.
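For illustration, hypothetical tab-separated records (department id, employee name, salary) and the corresponding output:
d001	emp1	50000.0
d001	emp2	70000.0
d002	emp3	60000.0
Expected output: d001 60000.0 and d002 60000.0.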
Solution
package com.mr;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public static class avgMapper extends Mapper<Object, Text, Text, FloatWritable> {
private Text dept_id = new Text();
private FloatWritable salary = new FloatWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//Splitting the tab-separated record: the department id is the first field, the salary the third
String values[]=value.toString().split("\t");
dept_id.set(values[0]);
salary.set(Float.parseFloat(values[2]));
context.write(dept_id,salary);
}
}
public static class avgReducer extends Reducer<Text,FloatWritable,Text,FloatWritable>{
private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
//Summing the salaries and counting the employees for each department
float sum = 0; int count = 0;
for (FloatWritable val : values) { sum += val.get(); count++; }
result.set(sum/count);
context.write(key,result);
}
}
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration(), "AverageSalary");
job.setJarByClass(AverageSalary.class);   //driver class name assumed
job.setMapperClass(avgMapper.class); job.setReducerClass(avgReducer.class);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(FloatWritable.class);
Path p = new Path(args[0]); Path p1 = new Path(args[1]);
FileInputFormat.addInputPath(job,p);
FileOutputFormat.setOutputPath(job,p1);
job.waitForCompletion(true);
}
}
Step 4. Export JAR file creation:
Right click src->Export->Java->JAR File->click Next button
Assignment 7 - De-Identify HealthCare Program
Problem statement
Populate the healthcare dataset with the following fields:
PatientID, Name, DOB, Phone Number, Email_Address, SSN, Gender, Disease, Weight
The task is to de-identify the records by encrypting the personally identifiable fields (such as Name, DOB, Phone Number, Email_Address, and SSN).
PatientID | Name | DOB        | Phone Number | Email_Address     | SSN      | Gender | Disease        | Weight
11111     | bbb1 | 12/10/1950 | 1.23E+09     | [email protected] | 1.11E+09 | M      | Diabetes       | 78
11112     | bbb2 | 12/10/1984 | 1.23E+09     | [email protected] | 1.11E+09 | F      | PCOS           | 67
11113     | bbb3 | 12/11/1940 | 1.23E+09     | [email protected] | 1.11E+09 | M      | Fever          | 90
11114     | bbb4 | 12/12/1950 | 1.23E+09     | [email protected] | 1.11E+09 | F      | Cold           | 88
11115     | bbb5 | 12/13/1960 | 1.23E+09     | [email protected] | 1.11E+09 | M      | Blood Pressure | 76
11116     | bbb6 | 12/14/1970 | 1.23E+09     | [email protected] | 1.11E+09 | F      | Malaria        | 84
Solution
Step 3. Type the following MapReduce Program DeIdentifyData (the program works on JDK 1.8 and above)
package com.mr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
//newStr accumulates the output record; list holds the column positions to encrypt and
//key1 the encryption key (both assumed to be initialised earlier in the class)
String newStr = "";
StringTokenizer itr = new StringTokenizer(value.toString(), ",");   //records assumed comma-delimited
int counter=1;
while (itr.hasMoreTokens()) {
String token=itr.nextToken();
System.out.println("token"+token);
System.out.println("i="+counter);
if(list.contains(counter))
{
if(newStr.length()>0)
newStr+=",";
newStr+=encrypt(token, key1);
}
else
{
if(newStr.length()>0)
newStr+=",";
newStr+=token;
}
counter=counter+1;
}
context.write(NullWritable.get(), new Text(newStr.toString()));
}
}
Job job = Job.getInstance(new Configuration());
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(DeIdentifyData.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return encryptedString.trim();
//return decrypted;
}
catch (Exception e)
{
logger.error("Error while encrypting", e);
}
return null;
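Only the tail of the encrypt helper survives above. A typical AES implementation consistent with the imports in this listing (the cipher mode and padding are assumptions, since the original settings are lost) would be:

private static String encrypt(String strToEncrypt, byte[] key) {
try {
//AES in ECB mode with PKCS5 padding; assumed, as the original configuration is not shown
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
SecretKeySpec secretKey = new SecretKeySpec(key, "AES");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
//Base64-encoding the ciphertext so it can be written as text
String encryptedString = Base64.encodeBase64String(cipher.doFinal(strToEncrypt.getBytes()));
return encryptedString.trim();
}
catch (Exception e)
{
logger.error("Error while encrypting", e);
}
return null;
}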
Assignment 8 - Music Track Program
Problem Statement
XYZ.com is an online music website where users listen to various tracks, and the data gets collected as shown below. Write a MapReduce program to compute statistics over this log; the first one, the number of unique listeners per track, is worked through below.
The data is coming in log files and looks as shown below.
UserId |TrackId|Shared|Radio|Skip
111115|222 |0 |1 |0
111113|225 |1 |0 |0
111117|223 |0 |1 |1
111115|225 |1 |0 |0
Solution
First we are going to solve the first problem, that is, finding the unique listeners per track.
First of all we need to understand the data. Here the first column is UserId and the second one is TrackId, so we need to write a mapper class that emits (trackId, userId) as intermediate key-value pairs. To make it simple to remember the data sequence, let's create a constants class as shown below:
package com.mr;
public class LastFMConstants {
//Column positions in the pipe-delimited log record
public static final int USER_ID = 0;
public static final int TRACK_ID = 1;
public static final int SHARED = 2;
public static final int RADIO = 3;
public static final int SKIP = 4;
}
Now, let's create the mapper class, which emits intermediate key-value pairs as (TrackId, UserId), as shown below.
Step 1. Class Creation
Right click the com package->New Class->give the class name as "UniqueListener" and then click the Finish button.
Step 3. Type the following MapReduce Program UniqueListener
public static class UniqueListenersMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
IntWritable trackId = new IntWritable();
IntWritable userId = new IntWritable();
public void map(Object key, Text value, Mapper<Object, Text, IntWritable, IntWritable>.Context context)
throws IOException, InterruptedException {
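//The rest of the map body did not survive in this copy; a sketch consistent with the
//description below (the COUNTERS enum name is an assumption) would be:
String[] parts = value.toString().split("[|]");
if (parts.length == 5) {
trackId.set(Integer.parseInt(parts[LastFMConstants.TRACK_ID].trim()));
userId.set(Integer.parseInt(parts[LastFMConstants.USER_ID].trim()));
context.write(trackId, userId);
} else {
//Counting records that do not arrive in the expected format
context.getCounter(COUNTERS.INVALID_RECORD_COUNT).increment(1L);
}
}
}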
You would have also noticed that we are using a counter here named INVALID_RECORD_COUNT to count any invalid records that do not arrive in the expected format. Remember, if we don't do this, our program might fail on malformed records.
Now let's write a Reducer class to aggregate the results. Here we simply cannot use a sum reducer, as the records we receive are not unique and we have to count only unique users. Here is how the code would look:
public static class UniqueListenersReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
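//The reduce body did not survive in this copy; the sketch below uses a Set
//(java.util.Set and java.util.HashSet, assumed imported) to keep only distinct userIds
public void reduce(IntWritable trackId, Iterable<IntWritable> userIds, Context context)
throws IOException, InterruptedException {
Set<Integer> userIdSet = new HashSet<Integer>();
for (IntWritable userId : userIds) {
userIdSet.add(userId.get());
}
//Emitting the track id and its number of unique listeners
context.write(trackId, new IntWritable(userIdSet.size()));
}
}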
Here we are using a Set to eliminate duplicate userIds. Now we can take a look at the Driver class.
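The driver itself did not survive in this copy; a minimal sketch (the driver class name UniqueListeners is assumed) would be:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "unique listeners per track");
job.setJarByClass(UniqueListeners.class);   //class name assumed
job.setMapperClass(UniqueListenersMapper.class);
job.setReducerClass(UniqueListenersReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);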
Assignment 9 - Telecom Call Data Record Program
Problem Statement
We are going to solve a very useful problem called Call Data Record (CDR) Analytics.
Now we have to write a MapReduce code to find all phone numbers that make more than 60 minutes of STD calls. Here, if the STD Flag is 1, the call was an STD call; an STD call is one made outside your state, i.e. a long-distance call. By identifying such subscribers, the telecom company wants to offer them an STD (Long Distance) pack, which would be more economical for them than spending more money without the package. The data is coming in log files and looks as shown below.
FromPhoneNumber|ToPhoneNumber|CallStartTime|CallEndTime|STDFlag
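(The sample rows are missing from this copy; hypothetical records in this format would look like:
9665128505|9875432101|2015-03-01 07:08:10|2015-03-01 08:12:15|1
9876543210|9123456789|2015-03-01 09:01:00|2015-03-01 09:05:30|0)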
First of all we need to understand the data. Given the output we expect, we need to write a mapper class that emits (FromPhoneNumber, duration of the STD call) as intermediate key-value pairs. To make it simple to remember the data sequence, let's create a constants class as shown below:
package com.mr;
public class CDRConstants {
//Column positions in the pipe-delimited CDR record
public static final int fromPhoneNumber = 0;
public static final int toPhoneNumber = 1;
public static final int callStartTime = 2;
public static final int callEndTime = 3;
public static final int STDFlag = 4;
}
Now, let's create the mapper class, which emits intermediate key-value pairs as (FromPhoneNumber, Duration). Here we also need some plain Java to calculate the duration (CallEndTime - CallStartTime) and convert it into minutes.
Step 3. Type the following MapReduce Program CallDataRecord
private Text phoneNumber = new Text();
private LongWritable durationInMinutes = new LongWritable();
public void map(Object key, Text value, Mapper<Object, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String[] parts = value.toString().split("[|]");
if (parts[CDRConstants.STDFlag].equalsIgnoreCase("1")) {
phoneNumber.set(parts[CDRConstants.fromPhoneNumber]);
String callEndTime = parts[CDRConstants.callEndTime];
String callStartTime = parts[CDRConstants.callStartTime];
long duration = toMillis(callEndTime) - toMillis(callStartTime);
durationInMinutes.set(duration / (1000 * 60));
context.write(phoneNumber, durationInMinutes);
}
}
} catch (ParseException e) {
e.printStackTrace();
}
return dateFrm.getTime();
}
}
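Only the catch block and the return of the toMillis helper survive above. In full, the method might read as follows (the timestamp pattern is an assumption; java.text.SimpleDateFormat, java.text.ParseException, and java.util.Date need to be imported):

private static long toMillis(String date) {
//Timestamp pattern assumed; adjust it to the actual log format
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date dateFrm = null;
try {
dateFrm = format.parse(date);
} catch (ParseException e) {
e.printStackTrace();
}
return dateFrm.getTime();
}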
You can also use counters, as we did in the previous assignment, if you are not sure whether the data you are receiving is correct.
Now that we have already done the majority of the work in the Mapper class itself, the reducer is a simple sum reducer that keeps only the totals of 60 minutes or more. Here is how the code would look:
public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable result = new LongWritable();
public void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
long sum = 0;
for (LongWritable val : values) {
sum += val.get();
}
this.result.set(sum);
if (sum >= 60) {
context.write(key, this.result);
}
}
}
job.setMapperClass(TokenizerMapper.class);
//Note: this reducer filters out sums below 60 minutes, so it cannot safely be reused as a combiner
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);