Pydoop Script User Guide

Pydoop Script is the easiest way to write simple MapReduce programs for Hadoop. With Pydoop Script, you only need to write a map function and/or a reduce function, and the system takes care of the rest.

For a full explanation please see the tutorial.

Command Line Tool

In the simplest case, Pydoop Script is invoked as:

pydoop script MODULE INPUT OUTPUT

where MODULE is the file (on your local file system) containing your map and reduce functions, in Python, while INPUT and OUTPUT are, respectively, the HDFS paths of your input data and your job’s output directory.

Options are shown in the following table.

Short  Long                       Meaning
       --num-reducers             Number of reduce tasks. Specify 0 to only perform the map phase
       --no-override-home         Don’t set the script’s HOME directory to the $HOME in your environment. Hadoop will set it to the value of the ‘mapreduce.admin.user.home.dir’ property
       --no-override-env          Use the default PATH, LD_LIBRARY_PATH and PYTHONPATH, instead of copying them from the submitting client node
       --no-override-ld-path      Use the default LD_LIBRARY_PATH instead of copying it from the submitting client node
       --no-override-pypath       Use the default PYTHONPATH instead of copying it from the submitting client node
       --no-override-path         Use the default PATH instead of copying it from the submitting client node
       --set-env                  Set environment variables for the tasks. If a variable is set to '', it will not be overridden by Pydoop
-D     --job-conf                 Set a Hadoop property, e.g., -D mapreduce.job.priority=high
       --python-zip               Additional Python zip file
       --upload-file-to-cache     Upload and add this file to the distributed cache
       --upload-archive-to-cache  Upload and add this archive file to the distributed cache
       --log-level                Logging level
       --job-name                 Name of the job
       --python-program           Python executable that should be used by the wrapper
       --pretend                  Do not actually submit a job, print the generated config settings and the command line that would be invoked
       --hadoop-conf              Hadoop configuration file
       --input-format             Java class name of the InputFormat
-m     --map-fn                   Name of the map function within the module
-r     --reduce-fn                Name of the reduce function within the module
-c     --combine-fn               Name of the combine function within the module
       --combiner-fn              --combine-fn alias for backwards compatibility
-t     --kv-separator             Output key-value separator

Example: Word Count with Stop Words

Here is the word count example modified to ignore stop words from a file that is distributed to all the nodes via the Hadoop distributed cache:

STOP_WORDS_FN = 'stop_words.txt'

try:
    with open(STOP_WORDS_FN) as f:
        STOP_WORDS = frozenset(l.strip() for l in f if not l.isspace())
except OSError:
    STOP_WORDS = frozenset()


def mapper(_, value, writer):
    for word in value.split():
        if word in STOP_WORDS:
            writer.count("STOP_WORDS", 1)
        else:
            writer.emit(word, 1)


def reducer(word, icounts, writer):
    writer.emit(word, sum(icounts))

To execute the above script, save it to a wc.py file and run:

pydoop script wc.py hdfs_input hdfs_output --upload-file-to-cache stop_words.txt

where stop_words.txt is a text file that contains the stop words, one per line.

While this script works, it has the obvious weakness of loading the stop words list even when executing the reducer (since it’s loaded as soon as we import the module). If this inconvenience is a concern, we could solve the issue by triggering the loading from the mapper function, or by writing a full Pydoop application which would give us all the control we need to only load the list when required.
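The first of these two options, loading the list from the mapper, could look roughly like the sketch below. The helper get_stop_words and the module-level cache _stop_words are names made up for this illustration; the rest mirrors the script above.

```python
STOP_WORDS_FN = 'stop_words.txt'

# Hypothetical module-level cache: stays None until a map task needs the list.
_stop_words = None


def get_stop_words():
    """Read the stop words file once, on first use, and cache the result."""
    global _stop_words
    if _stop_words is None:
        try:
            with open(STOP_WORDS_FN) as f:
                _stop_words = frozenset(
                    l.strip() for l in f if not l.isspace()
                )
        except OSError:
            _stop_words = frozenset()
    return _stop_words


def mapper(_, value, writer):
    stop_words = get_stop_words()  # only map tasks ever trigger the load
    for word in value.split():
        if word in stop_words:
            writer.count("STOP_WORDS", 1)
        else:
            writer.emit(word, 1)


def reducer(word, icounts, writer):
    writer.emit(word, sum(icounts))
```

With this layout, importing the module costs nothing extra, so reduce tasks never open the file. The price is one extra function call per input record.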

Writing your Map and Reduce Functions

In this section we assume you’ll be using the default TextInputFormat and TextOutputFormat.

Mapper

The mapper function in your module will be called for each record in your input data. It receives three parameters, plus an optional fourth:

  1. key: the byte offset with respect to the current input file. In most cases, you can ignore it;

  2. value: the line of text to be processed;

  3. writer object: a Python object to write output and count values (see below);

  4. optionally, a job conf object from which to fetch configuration property values (see Accessing Parameters below).
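As an illustration of the three required parameters, here is a minimal sketch of a mapper for tab-separated input. It assumes each value arrives as a text string; the column layout and the MALFORMED_LINES counter name are invented for the example.

```python
def mapper(key, value, writer):
    # key is the byte offset of this line in the input file; unused here.
    fields = value.split("\t")
    if len(fields) < 2 or not fields[0]:
        # Track bad records with a counter instead of failing the task.
        writer.count("MALFORMED_LINES", 1)
        return
    # Emit the first column as the key, with a count of one.
    writer.emit(fields[0], 1)
```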

Combiner

The combiner function will be called for each unique key produced by your map function. It also receives three parameters, plus an optional fourth:

  1. key: the key produced by your map function;

  2. values iterable: iterate over this parameter to traverse all the values emitted for the current key;

  3. writer object: a writer object identical to the one given to the map function;

  4. optionally, a job conf object, identical to the one given to the map function.

The key-value pairs emitted by your combiner will be passed on to the reducer.
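For word count, a combiner can pre-sum the counts before they reach the reducer. The following is a sketch only; the counter name COMBINER_CALLS is invented for the example.

```python
def mapper(_, value, writer):
    for word in value.split():
        writer.emit(word, 1)


def combiner(word, icounts, writer):
    # Counter added purely to make combiner activity visible in the job report.
    writer.count("COMBINER_CALLS", 1)
    # Collapse the counts seen so far into one partial sum.
    writer.emit(word, sum(icounts))


def reducer(word, icounts, writer):
    # Values may be raw ones or partial sums from the combiner; sum handles both.
    writer.emit(word, sum(icounts))
```

Since a combiner is only used when requested explicitly, a script like this would be run with the --combine-fn option, for instance:

pydoop script wc.py hdfs_input hdfs_output --combine-fn combiner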

Reducer

The reducer function will be called for each unique key produced by your map function. It also receives three parameters, plus an optional fourth:

  1. key: the key produced by your map function;

  2. values iterable: iterate over this parameter to traverse all the values emitted for the current key;

  3. writer object: this is identical to the one given to the map function;

  4. optionally, a job conf object, identical to the one given to the map function.

In each key-value pair emitted by your reducer, the key and the value will be joined by the key-value separator specified with the --kv-separator option (a tab character by default).
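A reducer does not have to emit something for every key. The sketch below keeps only words seen at least MIN_COUNT times; both MIN_COUNT and the RARE_WORDS counter are hypothetical names chosen for this example.

```python
MIN_COUNT = 2  # hypothetical threshold, not a Pydoop setting


def reducer(word, icounts, writer):
    total = sum(icounts)  # icounts can be traversed only once
    if total >= MIN_COUNT:
        # With the default separator this becomes the line "word<TAB>total".
        writer.emit(word, total)
    else:
        writer.count("RARE_WORDS", 1)
```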

Writer Object

The writer object given as the third parameter to the mapper, combiner and reducer functions has the following methods:

  • emit(k, v): pass a (k, v) key-value pair to the framework;

  • count(what, how_many): add how_many to the counter named what. If the counter doesn’t already exist, it will be created dynamically;

  • status(msg): update the task status to msg;

  • progress(): mark your task as having made progress without changing the status message.

The latter two methods are useful for keeping your task alive in cases where the amount of computation to be done for a single record might exceed Hadoop’s timeout interval (Hadoop kills a task if it neither reads an input, writes an output, nor updates its status for a configurable amount of time, set to 10 minutes by default).
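A rough sketch of how status and progress might be used in a mapper that does a lot of work per record before emitting anything. The function slow_score stands in for a genuinely expensive computation, and PROGRESS_EVERY is an arbitrary interval; both are assumptions made for this example.

```python
PROGRESS_EVERY = 1000  # hypothetical: report progress every 1000 tokens


def slow_score(token):
    # Placeholder for an expensive per-token computation.
    return sum(ord(c) for c in token)


def mapper(_, value, writer):
    tokens = value.split()
    writer.status("scoring a record with %d tokens" % len(tokens))
    total = 0
    for i, token in enumerate(tokens, 1):
        total += slow_score(token)
        if i % PROGRESS_EVERY == 0:
            # Nothing is emitted inside the loop, so signal liveness explicitly.
            writer.progress()
    writer.emit(len(tokens), total)
```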

Accessing Parameters

Pydoop Script lets you access the values of your job configuration properties through a dict-like JobConf object, which gets passed as the fourth (optional) parameter to your functions.
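For example, a mapper could read a minimum word length from the job configuration. This is a sketch under two assumptions: the property name wordcount.min.length is invented for the example, and property values are taken to arrive as strings, so the value is converted explicitly.

```python
MIN_LEN_KEY = "wordcount.min.length"  # hypothetical property name


def mapper(_, value, writer, conf):
    # Fall back to 1 when the property was not set on the command line.
    min_len = int(conf[MIN_LEN_KEY]) if MIN_LEN_KEY in conf else 1
    for word in value.split():
        if len(word) >= min_len:
            writer.emit(word, 1)
```

The property itself would be set with the -D option, for instance:

pydoop script wc.py hdfs_input hdfs_output -D wordcount.min.length=4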

Naming your Functions

If you’d like to give your map and reduce functions names different from mapper and reducer, you may do so, but you must tell the script tool. Use the --map-fn and --reduce-fn command line arguments to select your customized names. Combiner functions can only be assigned by explicitly setting the --combine-fn flag.
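As a small illustration, here is the word count with custom function names. The names tokenize and total are arbitrary choices for this example.

```python
def tokenize(_, value, writer):
    # Plays the role of the default "mapper" function.
    for word in value.split():
        writer.emit(word, 1)


def total(word, icounts, writer):
    # Plays the role of the default "reducer" function.
    writer.emit(word, sum(icounts))
```

Saved as wc.py, such a module would be run with both names spelled out:

pydoop script wc.py hdfs_input hdfs_output --map-fn tokenize --reduce-fn total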

Map-only Jobs

You may have a program that doesn’t use a reduce function. Specify --num-reducers 0 on the command line: your map output will then skip the reduce phase and go directly to the output formatter, which writes each pair to your final output with key and value joined by the key-value separator.
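A grep-like filter is a typical map-only job. In the sketch below, PATTERN and the file name grep.py are made up for the example.

```python
PATTERN = "ERROR"  # hypothetical search string


def mapper(key, value, writer):
    # Keep only matching lines; the key is the line's byte offset.
    if PATTERN in value:
        writer.emit(key, value)
```

Saved as grep.py, it would be run without a reduce phase:

pydoop script grep.py hdfs_input hdfs_output --num-reducers 0

Each matching line should then appear in the output as its byte offset, the separator, and the line itself.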