I want to read big files in Hadoop, block by block (not line by line), where each block is of size nearly 5 MB. For that I have written a custom recordreader
. But it gives me a error Premature EOF from inputStream
, which is caused by the nextKeyValue()
, readfully()
, while reading.
This is my code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<Text, apriori> {
public Text key = new Text("");
public apriori value = new apriori();
public Configuration job;
public FileSplit filesplit;
public FSDataInputStream in;
public Boolean processed = false;
public int len = 5000000;
public long filepointer = 0;
public int mapperFlag = 0;
public WholeFileRecordReader(FileSplit arg0, TaskAttemptContext arg1) {
this.filesplit = arg0;
this.job=arg1.getConfiguration();
}
@Override
public void close() throws IOException {
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public apriori getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
this.job = arg1.getConfiguration();
this.filesplit = (FileSplit)arg0;
final Path file = filesplit.getPath();
FileSystem fs = file.getFileSystem(job);
in = fs.open(file);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if ((!processed)&&(filesplit.getLength()>filepointer)) {
byte[] contents = new byte[ len];
Path file = filesplit.getPath();
key.set(file.getName());
in.seek(filepointer);
try {
IOUtils.readFully(in, contents, 0, len);
value.set(contents, 0, len);
} finally {
// IOUtils.closeStream(in);
}
filepointer = filepointer + len;
processed = false;
return true;
}
else if((!processed)&&(filesplit.getLength()<filepointer))
{
Path file = filesplit.getPath();
key.set(file.getName());
int last = (int)(filesplit.getLength()-(filepointer-len));
byte[] contents = new byte[last];
in.seek(filepointer-len);
try {
IOUtils.readFully(in, contents, 0, last);
mapperFlag =1;
value.set(contents, 0, last,mapperFlag);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
}