在“MapReduce--input之输入原理”中说到实现定义输入的方法,其实就是继承InputFormat以及 RecordReader实现其中的方法。下面例子讲解操作。

网站建设哪家好,找成都创新互联公司!专注于网页设计、网站建设、微信开发、小程序设计、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了新邵免费建站欢迎大家使用!
将多个文件合并成一个大文件(有点类似于combineInputFormat),并输出。大文件中包括小文件所在的路径,以及小文件的内容。
inputFormat
public class SFileInputFormat extends FileInputFormat {
    /**
     * 是否切片
     * @param context
     * @param filename
     * @return
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
    /**
     * 返回读取文件内容的读取器
     * @param inputSplit
     * @param taskAttemptContext
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        SRecordReader sRecordReader = new SRecordReader();
        sRecordReader.initialize(inputSplit, taskAttemptContext);
        return sRecordReader;
    }
}  RecordReader:
public class SRecordReader extends RecordReader {
    private Configuration conf;
    private FileSplit split;
    //当前分片是否已读取的标志位
    private boolean process = false;
    private BytesWritable value = new BytesWritable();
    /**
     * 初始化
     * @param inputSplit
     * @param taskAttemptContext
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        split = (FileSplit)inputSplit;
        conf = taskAttemptContext.getConfiguration();
    }
    /**
     * 从分片中读取下一个KV
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!process) {
            byte[] buffer = new byte[(int) split.getLength()];
            //获取文件系统
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            //创建输入流
            FSDataInputStream fis = fs.open(path);
            //流对接,将数据读取缓冲区
            IOUtils.readFully(fis, buffer, 0, buffer.length);
            //将数据装载入value
            value.set(buffer, 0, buffer.length);
            //关闭流
            IOUtils.closeStream(fis);
            //读完就标志位设置为true,表示已读
            process = true;
            return true;
        }
        return false;
    }
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return process? 1 : 0;
    }
    @Override
    public void close() throws IOException {
    }
} mapper:
public class SFileMapper extends Mapper {
    Text k = new Text();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit)context.getInputSplit();
        String name = inputSplit.getPath().toString();
        k.set(name);
    }
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(k, value);
    }   
} reducer:
public class SFileReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}  driver:
public class SFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"};
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SFileDriver.class);
        job.setMapperClass(SFileMapper.class);
        job.setReducerClass(SFileReducer.class);
        //设置输入和输出类,默认是 TextInputFormat
        job.setInputFormatClass(SFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}自定义的inputformat需要在job中通过 job.setInputFormatClass() 来指定