Table Of Contents

Previous topic


Next topic

Writing a Custom InputFormat

Get Pydoop


Pydoop is developed by: CRS4

Using the Hadoop SequenceFile Format

Although many MapReduce applications deal with text files, there are many cases where processing binary data is required. In this case, you basically have two options:

  1. write appropriate RecordReader / RecordWriter classes for the binary format you need to process
  2. convert your data to Hadoop’s standard SequenceFile format.

To write sequence files with Pydoop, set the ouput format and the compression type as follows:

[MapReduce V1]
pydoop submit \
--output-format=org.apache.hadoop.mapred.SequenceFileOutputFormat \
-D mapred.output.compression.type=NONE|RECORD|BLOCK [...]

[MapReduce V2]
pydoop submit \
--output-format=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
-D mapreduce.output.fileoutputformat.compress.type=NONE|RECORD|BLOCK [...]

To read sequence files, set the input format as follows:

[MapReduce V1]
pydoop submit \

[MapReduce V2]
pydoop submit \

Example Application: Filter Wordcount Results

SequenceFile is mostly useful to handle complex objects like C-style structs or images. To keep our example as simple as possible, we considered a situation where a MapReduce task needs to emit the raw bytes of an integer value.

We wrote a trivial application that reads input from a previous word count run and filters out words whose count falls below a configurable threshold. Of course, the filter could have been directly applied to the wordcount reducer: the job has been artificially split into two runs to give a SequenceFile read / write example.

Suppose you know in advance that most counts will be large, but not so large that they cannot fit in a 32-bit integer: since the decimal representation could require as much as 10 bytes, you decide to save space by having the wordcount reducer emit the raw four bytes of the integer instead:

class WordCountReducer(Reducer):

    def reduce(self, context):
        s = sum(context.values)
        context.emit(context.key, struct.pack(">i", s))

Since newline characters can appear in the serialized values, you cannot use the standard text format where each line contains a tab-separated key-value pair. The problem can be solved by using SequenceFileOutputFormat for wordcount and SequenceFileInputFormat for the filtering application.

The full source code for the example is available under examples/sequence_file.