Table Of Contents

  • Writing Full-Featured Applications
    • Mappers and Reducers
    • Counters and Status Updates
    • Record Readers and Writers
    • Partitioners and Combiners
    • Profiling Your Application

Contributors

Pydoop is developed by: CRS4

Writing Full-Featured Applications

While Pydoop Script lets you solve many problems with minimal programming effort, some tasks require a broader set of features. For instance, if your data is not simple text with one record per line, you may need to write a record reader; if you need to change the way intermediate keys are assigned to reducers, you have to write your own partitioner. These components are accessible via the Pydoop MapReduce API.

The rest of this section serves as an introduction to MapReduce programming with Pydoop; the API reference has all the details.

Mappers and Reducers

The Pydoop API is object-oriented: the application developer writes a Mapper class, whose core job is performed by the map() method, and a Reducer class that processes data via the reduce() method. The following snippet shows how to write the mapper and reducer for wordcount, an application that counts the occurrence of each word in a text data set:

import pydoop.mapreduce.api as api
import pydoop.mapreduce.pipes as pipes


class Mapper(api.Mapper):

    def map(self, context):
        for w in context.value.split():
            context.emit(w, 1)


class Reducer(api.Reducer):

    def reduce(self, context):
        context.emit(context.key, sum(context.values))


FACTORY = pipes.Factory(Mapper, reducer_class=Reducer)


def main():
    pipes.run_task(FACTORY)


if __name__ == "__main__":
    main()

The mapper is instantiated by the MapReduce framework that, for each input record, calls the map method passing a context object to it. The context serves as a communication interface between the framework and the application: in the map method, it is used to get the current key (not used in the above example) and value, and to emit (send back to the framework) intermediate key-value pairs. The reducer works in a similar way, the main difference being the fact that the reduce method gets a set of values for each key. The context has several other functions that we will explore later.

To run the above program, save it to a wc.py file and execute:

pydoop submit --upload-file-to-cache wc.py wc input output

where input is the HDFS input directory and output is the HDFS output directory.

See the section on running Pydoop programs for more details. Source code for the word count example is located under examples/pydoop_submit/mr in the Pydoop distribution.
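
To make the data flow concrete, the following sketch imitates what the framework does around the mapper and reducer: it runs the map logic once per input record, groups the emitted pairs by key, then runs the reduce logic once per key. It is plain Python and does not use Pydoop or Hadoop; run_wordcount and its internals are hypothetical names introduced only for illustration.

```python
from collections import defaultdict


def run_wordcount(lines):
    """Imitate map -> group by key -> reduce for word count (no Hadoop involved)."""
    # Map phase: one pass per input record, emitting (word, 1) pairs.
    intermediate = []
    for line in lines:
        for w in line.split():
            intermediate.append((w, 1))

    # Shuffle phase: all values are grouped by intermediate key.
    grouped = defaultdict(list)
    for key, value in intermediate:
        grouped[key].append(value)

    # Reduce phase: one pass per key, receiving all values for that key.
    return {key: sum(values) for key, values in grouped.items()}


counts = run_wordcount(["the quick brown fox", "the lazy dog", "the end"])
print(counts)
```

In a real job the three phases run in different processes, possibly on different machines, which is why the mapper and reducer only communicate through the context.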

Counters and Status Updates

Hadoop features application-wide counters that can be set and incremented by developers. Status updates are arbitrary text messages sent to the framework: these are especially useful in cases where the computation associated with a single input record can take a considerable amount of time, since Hadoop kills tasks that read no input, write no output and do not update the status within a configurable amount of time (ten minutes by default).

The following snippet shows how to modify the above example to use counters and status updates:

class Mapper(api.Mapper):

    def __init__(self, context):
        super(Mapper, self).__init__(context)
        context.set_status("initializing mapper")
        self.input_words = context.get_counter("WORDCOUNT", "INPUT_WORDS")

    def map(self, context):
        words = context.value.split()
        for w in words:
            context.emit(w, 1)
        context.increment_counter(self.input_words, len(words))


class Reducer(api.Reducer):

    def __init__(self, context):
        super(Reducer, self).__init__(context)
        context.set_status("initializing reducer")
        self.output_words = context.get_counter("WORDCOUNT", "OUTPUT_WORDS")

    def reduce(self, context):
        context.emit(context.key, sum(context.values))
        context.increment_counter(self.output_words, 1)

Counter values and status updates show up in Hadoop’s web interface. In addition, the final values of all counters are listed in the command line output of the job (note that the list also includes Hadoop’s default counters).
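
The call pattern used above (get a counter handle once, then increment it per record) can be tried without a cluster by using a stand-in for the context. The sketch below is only an approximation: FakeContext is a hypothetical class, and representing a counter handle as a (group, name) tuple is an assumption made for the example, not a description of how Pydoop stores counters.

```python
from collections import Counter


class FakeContext:
    """Hypothetical stand-in for the task context; records counters and the last status."""

    def __init__(self):
        self.counters = Counter()
        self.status = None

    def set_status(self, message):
        self.status = message

    def get_counter(self, group, name):
        # Assumption for this sketch: the handle is just the (group, name) pair.
        return (group, name)

    def increment_counter(self, counter, amount):
        self.counters[counter] += amount


ctx = FakeContext()
ctx.set_status("initializing mapper")
input_words = ctx.get_counter("WORDCOUNT", "INPUT_WORDS")
for line in ["a b a", "c a"]:
    # Same accounting as the mapper above: one increment per record.
    ctx.increment_counter(input_words, len(line.split()))
print(ctx.status, dict(ctx.counters))
```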

Record Readers and Writers

By default, Hadoop assumes you want to process plain text and splits input data into text lines. If you need to process binary data, or your text data is structured into records that span multiple lines, you need to write your own RecordReader. The record reader operates at the HDFS file level: its job is to read data from the file and feed it as a stream of key-value pairs (records) to the mapper. To interact with HDFS files, we need to import the hdfs submodule:

import pydoop.hdfs as hdfs

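The following example shows how to write a record reader that mimics Hadoop’s default LineRecordReader, where keys are byte offsets with respect to the whole file and values are text lines. This snippet and the following ones in this section assume a module-level LOGGER, a standard logging.Logger instance defined elsewhere in the module: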

class Reader(api.RecordReader):
    """
    Mimics Hadoop's default LineRecordReader (keys are byte offsets with
    respect to the whole file; values are text lines).
    """
    def __init__(self, context):
        super(Reader, self).__init__(context)
        self.logger = LOGGER.getChild("Reader")
        self.logger.debug('started')
        self.isplit = context.input_split
        for a in "filename", "offset", "length":
            self.logger.debug(
                "isplit.{} = {}".format(a, getattr(self.isplit, a))
            )
        self.file = hdfs.open(self.isplit.filename)
        self.file.seek(self.isplit.offset)
        self.bytes_read = 0
        if self.isplit.offset > 0:
            discarded = self.file.readline()
            self.bytes_read += len(discarded)

    def close(self):
        self.logger.debug("closing open handles")
        self.file.close()
        self.file.fs.close()

    def next(self):
        if self.bytes_read > self.isplit.length:
            raise StopIteration
        key = self.isplit.offset + self.bytes_read
        record = self.file.readline()
        if not record:  # end of file
            raise StopIteration
        self.bytes_read += len(record)
        return (key, record.decode("utf-8"))

    def get_progress(self):
        return min(float(self.bytes_read) / self.isplit.length, 1.0)

From the context, the record reader gets the following information on the byte chunk assigned to the current task, or input split:

  • the name of the file it belongs to;

  • its offset with respect to the beginning of the file;

  • its length.

This allows the reader to open the file, seek to the correct offset and read until the end of the split is reached. The framework gets the record stream by means of repeated calls to the next() method. The get_progress() method is called by the framework to get the fraction of the input split that has already been processed. The close() method (present in all components except for the partitioner) is called by the framework once it has finished retrieving the records: this is the right place to perform cleanup tasks such as closing open handles.

To use the reader, pass the class object to the factory with record_reader_class=Reader and, when running the program with pydoop submit, set the --do-not-use-java-record-reader flag.
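
Split boundaries are byte offsets and generally fall in the middle of a line. The reader above handles this with two rules: a split that does not start at offset 0 discards its first (possibly partial) line, and every split keeps reading until it has consumed more bytes than its length, so it also picks up the line that crosses or starts at its end. The sketch below applies the same two rules to an in-memory buffer to check that every line is returned exactly once, whatever the split size. It uses io.BytesIO instead of HDFS, and read_split and read_all are hypothetical helper names.

```python
import io


def read_split(data, offset, length):
    """Return the (byte offset, line) records that belong to one split."""
    f = io.BytesIO(data)
    f.seek(offset)
    bytes_read = 0
    if offset > 0:
        # The previous split is responsible for the line we landed in.
        bytes_read += len(f.readline())
    records = []
    while bytes_read <= length:
        key = offset + bytes_read
        line = f.readline()
        if not line:  # end of file
            break
        bytes_read += len(line)
        records.append((key, line.decode("utf-8")))
    return records


def read_all(data, split_size):
    """Cut data into consecutive splits and concatenate their records."""
    records = []
    for offset in range(0, len(data), split_size):
        length = min(split_size, len(data) - offset)
        records.extend(read_split(data, offset, length))
    return records


DATA = b"alpha\nbeta\ngamma\ndelta\n"
EXPECTED = [(0, "alpha\n"), (6, "beta\n"), (11, "gamma\n"), (17, "delta\n")]
for size in range(1, len(DATA) + 1):
    assert read_all(DATA, size) == EXPECTED
```

Note that a split can legitimately produce no records at all, for example when it lies entirely inside a long line.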

The record writer writes key/value pairs to output files. The default behavior is to write one tab-separated key/value pair per line; if you want to do something different, you have to write a custom RecordWriter:

class Writer(api.RecordWriter):

    def __init__(self, context):
        super(Writer, self).__init__(context)
        self.logger = LOGGER.getChild("Writer")
        jc = context.job_conf
        outfn = context.get_default_work_file()
        self.logger.info("writing to %s", outfn)
        hdfs_user = jc.get("pydoop.hdfs.user", None)
        self.file = hdfs.open(outfn, "wt", user=hdfs_user)
        self.sep = jc.get("mapreduce.output.textoutputformat.separator", "\t")

    def close(self):
        self.logger.debug("closing open handles")
        self.file.close()
        self.file.fs.close()

    def emit(self, key, value):
        self.file.write(key + self.sep + str(value) + "\n")

The above example, which simply reproduces the default behavior, also shows how to get job configuration parameters: the one starting with mapreduce is a standard Hadoop parameter, while pydoop.hdfs.user is a custom parameter defined by the application developer. Configuration properties are passed as -D <key>=<value> (e.g., -D mapreduce.output.textoutputformat.separator='|') to the submitter.

To use the writer, pass the class object to the factory with record_writer_class=Writer and, when running the program with pydoop submit, set the --do-not-use-java-record-writer flag.
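
The output format produced by the writer can be checked in isolation. In the following sketch, a plain dictionary stands in for the job configuration and an io.StringIO buffer stands in for the HDFS output file; write_pairs is a hypothetical helper, not part of Pydoop.

```python
import io

SEP_KEY = "mapreduce.output.textoutputformat.separator"


def write_pairs(pairs, job_conf, out):
    """Write one key/value pair per line, using the configured separator."""
    sep = job_conf.get(SEP_KEY, "\t")  # same default as in the Writer above
    for key, value in pairs:
        out.write(key + sep + str(value) + "\n")


default_out = io.StringIO()
write_pairs([("bar", 2), ("foo", 1)], {}, default_out)

custom_out = io.StringIO()
write_pairs([("bar", 2), ("foo", 1)], {SEP_KEY: "|"}, custom_out)
print(custom_out.getvalue())
```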

Partitioners and Combiners

The Partitioner assigns intermediate keys to reducers. If you do not explicitly set a partitioner via the factory, partitioning will be done on the Java side. By default, Hadoop uses HashPartitioner, which selects the reducer on the basis of a hash function of the key.

To write a custom partitioner in Python, subclass Partitioner, overriding the partition() method. The framework will call this method with the current key and the total number of reducers N as the arguments, and expect the chosen reducer ID — in the [0, ..., N-1] range — as the return value.

The following example shows how to write a partitioner that simply mimics the default HashPartitioner behavior:

from hashlib import md5


class Partitioner(api.Partitioner):

    def __init__(self, context):
        super(Partitioner, self).__init__(context)
        self.logger = LOGGER.getChild("Partitioner")

    def partition(self, key, n_reduces):
        reducer_id = int(md5(key).hexdigest(), 16) % n_reduces
        self.logger.debug("reducer_id: %r" % reducer_id)
        return reducer_id
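
The partitioning rule itself can be tested outside of Hadoop. The sketch below is a standalone function that assumes keys are text strings and therefore encodes them before hashing (an assumption of this example, since hashlib requires bytes). A digest-based hash is used because the result must be the same in every task process, whereas Python's built-in hash() for strings is randomized per process by default.

```python
from collections import Counter
from hashlib import md5


def partition(key, n_reduces):
    """Map a text key to a reducer ID in [0, n_reduces - 1] via a stable hash."""
    digest = md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_reduces


keys = ["apple", "banana", "cherry", "apple", "date", "banana"]
assignments = {k: partition(k, 4) for k in keys}
# Number of intermediate pairs each reducer would receive.
load = Counter(partition(k, 4) for k in keys)
print(assignments, dict(load))
```

Since the function is deterministic, all pairs sharing a key end up at the same reducer, which is the property the reduce phase relies on.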

The combiner is functionally identical to a reducer, but it runs locally, on the key-value stream output by a single mapper. Nothing prevents the combiner from processing values differently from the reducer, but the two are typically configured to be the same class, which performs local aggregation and thus helps cut down network traffic. This is only safe when the reduce function is associative and commutative, so that combining partial results and then reducing them gives the same answer as reducing all values at once.
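
Whether a reducer can double as a combiner depends on whether reducing partial results gives the same answer as reducing all values at once. The following plain-Python check illustrates this with two made-up mapper outputs for the same key: summing passes the test, while averaging does not.

```python
def mean(values):
    values = list(values)
    return sum(values) / len(values)


# Values emitted for the same key by two different mappers (made-up data).
mapper_a = [1, 2]
mapper_b = [3, 4, 5]
all_values = mapper_a + mapper_b

# Sum: combining per mapper first does not change the final result.
sum_direct = sum(all_values)
sum_combined = sum([sum(mapper_a), sum(mapper_b)])

# Mean: the mean of per-mapper means differs from the overall mean.
mean_direct = mean(all_values)
mean_combined = mean([mean(mapper_a), mean(mapper_b)])

print(sum_direct, sum_combined, mean_direct, mean_combined)
```

A mean can still be computed with a combiner if the intermediate values carry both a partial sum and a count, leaving the division to the reducer.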

Local aggregation is implemented by caching intermediate key/value pairs in a dictionary. As in standard Java Hadoop, cache size is controlled by mapreduce.task.io.sort.mb and defaults to 100 MB. Pydoop uses sys.getsizeof() to determine key/value size, which includes Python object overhead. This overhead can be quite substantial (e.g., sys.getsizeof(b"foo") == 36) and must be taken into account if fine tuning is desired.
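
The idea of a size-bounded aggregation cache can be sketched in a few lines. The code below is not Pydoop's implementation: CombiningCache, its max_bytes parameter and the total_per_key helper are hypothetical, and the size accounting is deliberately simplistic. It shows that the final totals do not depend on the cache size, while the number of pairs leaving the mapper does.

```python
import sys


class CombiningCache:
    """Hypothetical local-aggregation cache; not Pydoop's actual implementation."""

    def __init__(self, combine, emit, max_bytes):
        self.combine = combine
        self.emit = emit
        self.max_bytes = max_bytes
        self.cache = {}
        self.size = 0

    def add(self, key, value):
        self.cache.setdefault(key, []).append(value)
        # sys.getsizeof counts per-object overhead, not just the payload.
        self.size += sys.getsizeof(key) + sys.getsizeof(value)
        if self.size >= self.max_bytes:
            self.flush()

    def flush(self):
        # Combine the cached values of each key and send one pair downstream.
        for key, values in self.cache.items():
            self.emit(key, self.combine(values))
        self.cache.clear()
        self.size = 0


def total_per_key(words, max_bytes):
    """Run word count through the cache; return final totals and pairs emitted."""
    emitted = []
    cache = CombiningCache(sum, lambda k, v: emitted.append((k, v)), max_bytes)
    for w in words:
        cache.add(w, 1)
    cache.flush()
    totals = {}
    for k, v in emitted:  # what the reducer would do with the combined pairs
        totals[k] = totals.get(k, 0) + v
    return totals, len(emitted)


words = "a b a c a b".split()
big_totals, big_pairs = total_per_key(words, max_bytes=10**6)
small_totals, small_pairs = total_per_key(words, max_bytes=1)
print(big_totals, big_pairs, small_pairs)
```

With a cache that is too small, every pair is flushed as soon as it arrives and the combiner saves nothing, which is why object overhead matters when tuning the cache size.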

Important

Due to the caching, when using a combiner there are limitations on the types that can be used for intermediate keys and values. First of all, keys must be hashable. In addition, values belonging to a mutable type should not change after having been emitted by the mapper. For instance, the following (however contrived) example would not work as expected:

intermediate_value = {}


class Mapper(api.Mapper):

    def map(self, ctx):
        intermediate_value.clear()
        intermediate_value[ctx.key] = ctx.value
        ctx.emit("foo", intermediate_value)

For these reasons, it is recommended to use immutable types for both keys and values when the job includes a combiner.
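
The problem with mutable values is ordinary Python aliasing: a cache that stores emitted objects holds references, not copies. The following sketch reproduces the effect with a plain dictionary acting as the cache; emit_all is a hypothetical helper and no Pydoop code is involved.

```python
def emit_all(records, copy_values):
    """Emit one dict per record under the key "foo", reusing a shared dict."""
    cache = {}
    shared = {}
    for key, value in records:
        shared.clear()
        shared[key] = value
        # Either emit the shared object itself or an independent snapshot of it.
        emitted = dict(shared) if copy_values else shared
        cache.setdefault("foo", []).append(emitted)
    return cache["foo"]


records = [("k1", "v1"), ("k2", "v2")]
aliased = emit_all(records, copy_values=False)
copied = emit_all(records, copy_values=True)
print(aliased)
print(copied)
```

Without the copy, both cached entries are the same object and only the last record survives; emitting a fresh object (or an immutable one) per call avoids the issue.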

Custom partitioner and combiner classes must be declared to the factory as done above for record readers and writers. To recap, if we need to use all of the above components, we need to instantiate the factory as:

FACTORY = pipes.Factory(
    Mapper,
    reducer_class=Reducer,
    record_reader_class=Reader,
    record_writer_class=Writer,
    partitioner_class=Partitioner,
    combiner_class=Reducer
)

Profiling Your Application

Python has built-in support for application profiling. Profiling a standalone program is relatively straightforward: run it through cProfile, store stats in a file and use pstats to read and interpret them. A MapReduce job, however, spawns multiple map and reduce tasks, so we need a way to collect all stats. Pydoop supports this via a pstats_dir argument to run_task:

pipes.run_task(factory, pstats_dir="pstats")

With the above call, Pydoop will run each MapReduce task with cProfile, and store resulting pstats files in the "pstats" directory on HDFS. You can also enable profiling in the pydoop submit command line:

pydoop submit --pstats-dir HDFS_DIR [...]

If the pstats directory is specified both ways, the one from run_task takes precedence.
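
Once a stats file is available on the local file system, it can be inspected with the standard pstats module. The sketch below does not touch HDFS: it generates a stats file locally with cProfile as a stand-in for one copied from the job's pstats directory, then prints the most expensive entries. The tokenize function and the task.pstats file name are made up for the example.

```python
import cProfile
import io
import os
import pstats
import tempfile


def tokenize(lines):
    return [w for line in lines for w in line.split()]


# Produce a stats file locally, standing in for one copied from HDFS.
stats_path = os.path.join(tempfile.mkdtemp(), "task.pstats")
profiler = cProfile.Profile()
profiler.enable()
tokenize(["the quick brown fox"] * 1000)
profiler.disable()
profiler.dump_stats(stats_path)

# Load the file and report the five most expensive entries by cumulative time.
report = io.StringIO()
stats = pstats.Stats(stats_path, stream=report)
stats.sort_stats("cumulative").print_stats(5)
print(report.getvalue())
```

pstats.Stats also accepts several file names at once, which is a convenient way to aggregate the stats of multiple tasks into a single report.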

Another way to do time measurements is via counters. The utils.misc module provides a Timer object for this purpose:

from pydoop.utils.misc import Timer

class Mapper(api.Mapper):

    def __init__(self, context):
        super(Mapper, self).__init__(context)
        self.timer = Timer(context)

    def map(self, context):
        with self.timer.time_block("tokenize"):
            words = context.value.split()
        for w in words:
            context.emit(w, 1)

With the above code, the total time spent executing context.value.split() (in ms) is automatically accumulated in a TIME_TOKENIZE counter under the Timer counter group.
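
The mechanism behind such a timer is a context manager that measures elapsed time and adds it to a named accumulator. The following sketch shows the general technique only: SketchTimer is a hypothetical class that stores totals in a dictionary rather than in Hadoop counters, and it is not the implementation found in pydoop.utils.misc.

```python
import time
from contextlib import contextmanager


class SketchTimer:
    """Hypothetical timer that accumulates elapsed milliseconds per block name."""

    def __init__(self):
        self.counters = {}

    @contextmanager
    def time_block(self, name):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Accumulate even if the timed block raises an exception.
            elapsed_ms = (time.perf_counter() - start) * 1000
            key = "TIME_" + name.upper()
            self.counters[key] = self.counters.get(key, 0) + elapsed_ms


timer = SketchTimer()
for line in ["the quick brown fox", "the lazy dog"]:
    with timer.time_block("tokenize"):
        words = line.split()
print(timer.counters)
```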

Since profiling and timers can substantially slow down the Hadoop job, they should only be used for performance debugging.

© Copyright 2009-2019, CRS4. Created using Sphinx 2.0.1.