
I want to read big files in Hadoop block by block (not line by line), with each block close to 5 MB in size. For that I have written a custom RecordReader, but it gives me the error "Premature EOF from inputStream", which is thrown from IOUtils.readFully() inside nextKeyValue() while reading.

This is my code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, apriori> {

public Text key = new Text("");
public apriori value = new apriori();
public Configuration job;
public FileSplit filesplit;
public FSDataInputStream in;
public Boolean processed = false;
public int len = 5000000;
public long filepointer = 0;
public int mapperFlag = 0;



public WholeFileRecordReader(FileSplit arg0, TaskAttemptContext arg1) {
    this.filesplit = arg0;
    this.job = arg1.getConfiguration();
}


@Override
public void close() throws IOException {

}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
}

@Override
public apriori getCurrentValue() throws IOException, InterruptedException {
    return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
        throws IOException, InterruptedException {
    this.job = arg1.getConfiguration();
    this.filesplit = (FileSplit)arg0;
    final Path file = filesplit.getPath();


    FileSystem fs = file.getFileSystem(job);
    in = fs.open(file);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    // Read the next full 5 MB block from the split.
    if ((!processed) && (filesplit.getLength() > filepointer)) {
        byte[] contents = new byte[len];
        Path file = filesplit.getPath();
        key.set(file.getName());
        in.seek(filepointer);
        try {
            IOUtils.readFully(in, contents, 0, len);
            value.set(contents, 0, len);
        } finally {
            // IOUtils.closeStream(in);
        }
        filepointer = filepointer + len;
        processed = false;
        return true;
    }
    // The pointer has run past the end of the split: re-read the final partial block.
    else if ((!processed) && (filesplit.getLength() < filepointer)) {
        Path file = filesplit.getPath();
        key.set(file.getName());
        int last = (int) (filesplit.getLength() - (filepointer - len));
        byte[] contents = new byte[last];
        in.seek(filepointer - len);
        try {
            IOUtils.readFully(in, contents, 0, last);
            mapperFlag = 1;
            value.set(contents, 0, last, mapperFlag);
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    return false;
}
}
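
My best guess at the cause: the first branch always asks readFully() for the full len (5,000,000) bytes, so whenever the split length is not an exact multiple of 5 MB the last read requests more bytes than remain before the end of the file and the stream hits EOF early. Below is a minimal sketch of a clamped nextKeyValue(), reusing the same fields and the same apriori.set(...) overloads as above; it is only a sketch of the idea, not a verified fix.

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    // Clamp every read to the bytes actually remaining in the split,
    // so readFully() never requests data past the end of the file.
    long remaining = filesplit.getLength() - filepointer;
    if (processed || remaining <= 0) {
        return false;
    }

    int toRead = (int) Math.min(len, remaining);
    byte[] contents = new byte[toRead];
    key.set(filesplit.getPath().getName());
    in.seek(filepointer);
    IOUtils.readFully(in, contents, 0, toRead);
    filepointer += toRead;

    if (filepointer >= filesplit.getLength()) {
        // Final (possibly partial) chunk: set the flag the way the original second branch does.
        mapperFlag = 1;
        value.set(contents, 0, toRead, mapperFlag);
        processed = true;
    } else {
        value.set(contents, 0, toRead);
    }
    return true;
}

With a clamped read the second branch (seeking back to filepointer - len) would no longer be needed, and the stream could be closed once in close() instead of inside nextKeyValue().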
    Have you found the root cause of that? I am facing a similar issue when using AvroMultipleOutput, but with the top error message "Error could only be replicated to 0 nodes instead of minReplication (=1)".
    – jaksky
    Commented Aug 28, 2014 at 9:48
