Integrating Python with other languages: the Pydoop case

Simone Leo – CRS4


What is Pydoop?
A set of Python bindings for Hadoop
OK, but what is Hadoop?
A Java distributed computing (DC) framework


  • DC framework focused on data-intensive jobs
  • Simple programming model (MapReduce)
  • Backed by a distributed filesystem (HDFS)
  • Written in Java



Word count – pseudocode

void map(Object key, String value) {
  for (String word: tokenize(value)) {
    emit(word, 1);
  }
}

void reduce(String key, Iterator<Integer> values) {
  int count = 0;
  for (int v: values) {
    count += v;
  }
  emit(key, count);
}
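
To make the three phases explicit, here is a minimal in-memory sketch in Python of what the framework does around the two functions: map every record, group the emitted pairs by key (the shuffle/sort step), then reduce each group. The names `map_fn`, `reduce_fn` and `run_local` are illustrative only, not part of any Hadoop API.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def map_fn(key: int, value: str) -> Iterator[Tuple[str, int]]:
    # Emit (word, 1) for every whitespace-separated token in the line.
    for word in value.split():
        yield word, 1


def reduce_fn(key: str, values: Iterable[int]) -> Iterator[Tuple[str, int]]:
    # Sum all partial counts for one word.
    yield key, sum(values)


def run_local(lines: List[str]) -> Dict[str, int]:
    # Map phase. The record key here is simply the line index
    # (Hadoop's text input uses the byte offset instead).
    groups: Dict[str, List[int]] = defaultdict(list)
    for index, line in enumerate(lines):
        for k, v in map_fn(index, line):
            groups[k].append(v)  # shuffle: collect values by key
    # Reduce phase: keys are visited in sorted order, as after Hadoop's sort.
    result: Dict[str, int] = {}
    for k in sorted(groups):
        for out_k, out_v in reduce_fn(k, groups[k]):
            result[out_k] = out_v
    return result


if __name__ == "__main__":
    print(run_local(["the cat", "the dog"]))  # {'cat': 1, 'dog': 1, 'the': 2}
```

In a real cluster the map calls, the grouping and the reduce calls run on different machines; the sketch only mirrors the data flow.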

Word count – Java

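// Imports needed by the two classes
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Both classes are static nested classes of the job's main class
public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(Object k, Text v, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(v.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());  // reuse the same Text object
      context.write(word, one);
    }
  }
}

public static class IntSumReducer
     extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();
  public void reduce(Text k, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v: values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(k, result);
  }
}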


  • Python bindings for Hadoop
  • MapReduce & HDFS API
  • Started 10 years ago with the following goal

import pydoop.mapreduce.api as api

class Mapper(api.Mapper):
    def map(self, context):
        for w in context.value.split():
            context.emit(w, 1)

class Reducer(api.Reducer):
    def reduce(self, context):
        context.emit(context.key, sum(context.values))
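
The user code above only talks to a single context object (`key`, `value`, `values`, `emit`). One consequence is that the two classes can be exercised without a cluster by handing them a stand-in context. A minimal sketch, assuming a hypothetical `FakeContext` test double (not a Pydoop class) and that `values` behaves like a one-shot iterator; the slide's classes are copied without their `api` base classes so the block runs without Pydoop installed.

```python
from typing import Any, Iterable, List, Tuple


class FakeContext:
    """Hypothetical test double for the context object (not a Pydoop class)."""

    def __init__(self, key: Any = None, value: Any = None,
                 values: Iterable[Any] = ()) -> None:
        self.key = key
        self.value = value
        self.values = iter(values)  # assumed to be consumed only once
        self.emitted: List[Tuple[Any, Any]] = []

    def emit(self, key: Any, value: Any) -> None:
        # Record the pair instead of sending it back to the framework.
        self.emitted.append((key, value))


# The slide's classes, without the api.Mapper/api.Reducer base classes.
class Mapper:
    def map(self, context):
        for w in context.value.split():
            context.emit(w, 1)


class Reducer:
    def reduce(self, context):
        context.emit(context.key, sum(context.values))


if __name__ == "__main__":
    ctx = FakeContext(value="to be or not to be")
    Mapper().map(ctx)
    print(ctx.emitted)  # six (word, 1) pairs
    ctx = FakeContext(key="be", values=[1, 1])
    Reducer().reduce(ctx)
    print(ctx.emitted)  # [('be', 2)]
```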

Already available in Hadoop


  • Hadoop Streaming
    • any language (write executable scripts)
    • exchange k/v pairs via stdin/stdout
  • Hadoop Pipes (no howto in current version!)
    • any language (a C++ client is included)
    • communicate with the framework via a socket

HDFS: libhdfs (C, via JNI)

Hadoop Streaming

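#!/usr/bin/env python
# Mapper: emit one "word<TAB>1" line per word read from stdin
import sys

for line in sys.stdin:
    for word in line.split():
        sys.stdout.write("%s\t1\n" % word)

#!/usr/bin/env python
# Reducer: input arrives sorted by key, so sum runs of equal keys
import sys

out_k, out_v = None, 0
for line in sys.stdin:
    k, v = line.split("\t", 1)
    v = int(v)
    if k != out_k:
        if out_k is not None:
            sys.stdout.write("%s\t%d\n" % (out_k, out_v))
            out_v = 0
        out_k = k
    out_v += v
if out_k is not None:  # avoid emitting a bogus "None 0" record on empty input
    sys.stdout.write("%s\t%d\n" % (out_k, out_v))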
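
Since Streaming only relies on stdin/stdout plus the framework sorting mapper output by key, the two scripts can be checked locally. The sketch below mimics `cat input | mapper | sort | reducer` in a single process, with the script bodies rewritten as functions over text streams; `streaming_map`, `streaming_reduce` and `simulate` are hypothetical names, and sorting lines by key is only a stand-in for Hadoop's shuffle.

```python
import io
from typing import TextIO


def streaming_map(stdin: TextIO, stdout: TextIO) -> None:
    # Same logic as the mapper script: one "word\t1" line per token.
    for line in stdin:
        for word in line.split():
            stdout.write("%s\t1\n" % word)


def streaming_reduce(stdin: TextIO, stdout: TextIO) -> None:
    # Same logic as the reducer script: input must be sorted by key.
    out_k, out_v = None, 0
    for line in stdin:
        k, v = line.split("\t", 1)
        v = int(v)
        if k != out_k:
            if out_k is not None:
                stdout.write("%s\t%d\n" % (out_k, out_v))
                out_v = 0
            out_k = k
        out_v += v
    if out_k is not None:
        stdout.write("%s\t%d\n" % (out_k, out_v))


def simulate(text: str) -> str:
    mapped = io.StringIO()
    streaming_map(io.StringIO(text), mapped)
    # Stand-in for the shuffle/sort phase: sort mapper output by key.
    records = mapped.getvalue().splitlines(keepends=True)
    records.sort(key=lambda rec: rec.split("\t", 1)[0])
    reduced = io.StringIO()
    streaming_reduce(io.StringIO("".join(records)), reduced)
    return reduced.getvalue()


if __name__ == "__main__":
    print(simulate("b a\na\n"), end="")  # a<TAB>2, then b<TAB>1
```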

Hadoop Streaming – Fancy Reducer

#!/usr/bin/env python

import sys
from itertools import groupby
from operator import itemgetter

def istream():
    for line in sys.stdin:
        k, v = line.split("\t", 1)
        yield k, int(v)

for k, stream in groupby(istream(), itemgetter(0)):
    total = sum(v for _, v in stream)
    sys.stdout.write("%s\t%d\n" % (k, total))

mrjob is based on Hadoop Streaming

Hadoop Pipes


Python Pipes Task Runner

Options, options …

  • Wrap the whole C++ client code
  • Write everything in pure Python
    • Second thing we tried: too slow
  • Extension module only for the performance-critical parts
    • Current approach, via the Python C API

Main job: read/write Java types

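public static void writeVLong(DataOutput stream, long i)
    throws IOException {
  if (i >= -112 && i <= 127) {  // small values fit in a single byte
    stream.writeByte((byte)i);
    return;
  }
  int len = -112;
  if (i < 0) {
    i ^= -1L;  // one's complement
    len = -120;
  }
  long tmp = i;
  while (tmp != 0) {  // count the payload bytes
    tmp = tmp >> 8;
    len--;
  }
  stream.writeByte((byte)len);  // first byte encodes sign and length
  len = (len < -120) ? -(len + 120) : -(len + 112);
  for (int idx = len; idx != 0; idx--) {  // big-endian payload
    int shiftbits = (idx - 1) * 8;
    long mask = 0xFFL << shiftbits;
    stream.writeByte((byte)((i & mask) >> shiftbits));
  }
}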
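
A pure-Python port makes the format easier to see: values in [-112, 127] take one byte; otherwise a marker byte stores the sign and the payload length (1 to 8), followed by the magnitude (one's complement for negatives) in big-endian order. This is a sketch ported from the Java code above; `write_vlong` is a hypothetical name, not a Pydoop function.

```python
def write_vlong(i: int) -> bytes:
    """Encode a signed 64-bit integer in Hadoop's variable-length format."""
    if not -(1 << 63) <= i < (1 << 63):
        raise OverflowError("value does not fit in a Java long")
    if -112 <= i <= 127:
        # Small values are stored as a single (signed) byte.
        return bytes([i & 0xFF])
    length = -112
    if i < 0:
        i ^= -1  # one's complement: store ~i, which is non-negative
        length = -120
    tmp = i
    while tmp != 0:
        # Each payload byte moves the marker one step further down.
        tmp >>= 8
        length -= 1
    out = bytearray([length & 0xFF])  # marker byte: sign + payload length
    length = -(length + 120) if length < -120 else -(length + 112)
    for idx in range(length, 0, -1):
        # Payload in big-endian order, most significant byte first.
        out.append((i >> ((idx - 1) * 8)) & 0xFF)
    return bytes(out)


if __name__ == "__main__":
    print(write_vlong(1000).hex())  # 8e03e8
```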

Read/write Java types – C++

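int64_t deserializeLong(InStream& stream) {
  int8_t b;
  stream.read(&b, 1);  // marker byte
  if (b >= -112) {     // single-byte value
    return b;
  }
  bool negative;
  int len;
  if (b < -120) {
    negative = true;
    len = -120 - b;
  } else {
    negative = false;
    len = -112 - b;
  }
  uint8_t barr[len];  // variable-length array: a GCC extension, not standard C++
  stream.read(barr, len);
  int64_t t = 0;
  for (int idx = 0; idx < len; idx++) {  // big-endian payload
    t = t << 8;
    t |= (barr[idx] & 0xFF);
  }
  if (negative) {
    t ^= -1ll;  // undo the one's complement
  }
  return t;
}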
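
The matching decoder in pure Python, as a sketch mirroring the C++ function above: the first byte is reinterpreted as a signed 8-bit value (like the `int8_t`), and `int.from_bytes` replaces the shift-and-or loop. `read_vlong` is a hypothetical name; the stream is any binary file-like object.

```python
import io
from typing import BinaryIO


def read_vlong(stream: BinaryIO) -> int:
    """Decode one Hadoop variable-length signed integer from a binary stream."""
    first = stream.read(1)
    if len(first) != 1:
        raise EOFError("no data left in stream")
    b = first[0] - 256 if first[0] >= 128 else first[0]  # as signed int8
    if b >= -112:
        return b  # the value itself fit in the marker byte
    if b < -120:
        negative, length = True, -120 - b
    else:
        negative, length = False, -112 - b
    payload = stream.read(length)
    if len(payload) != length:
        raise EOFError("truncated vlong payload")
    t = int.from_bytes(payload, "big")  # same as the shift-and-or loop
    return ~t if negative else t  # undo the one's complement (t ^ -1)


if __name__ == "__main__":
    print(read_vlong(io.BytesIO(b"\x8e\x03\xe8")))  # 1000
```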


Read/write Java types – Python

static PyObject *
FileInStream_readVLong(FileInStreamObj *self) {
  int64_t rval;
  PyThreadState *state = PyEval_SaveThread();  // release the GIL
  try {
    rval = HadoopUtils::deserializeLong(*self->stream);
  } catch (HadoopUtils::Error& e) {
    PyEval_RestoreThread(state);  // reacquire the GIL before calling Python
    PyErr_SetString(PyExc_IOError, e.getMessage().c_str());
    return NULL;
  }
  PyEval_RestoreThread(state);
  return Py_BuildValue("L", (long long)rval);
}

Task runner

import pydoop.sercore as sercore

class BinaryProtocol(object):

    def __init__(self, istream, ...):
        self.istream = istream

    def __next__(self):
        cmd = ...  # read the command code from self.istream (elided)
        if cmd == AUTHENTICATION_REQ:
            digest = ...  # elided
            challenge = ...  # elided
            self.verify_digest(digest, challenge)
        elif cmd == START: ...
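
To give an idea of what a `__next__` like the one above does, here is a self-contained sketch of a command-dispatch iterator. Assumptions, all to be checked against the Hadoop source: command codes and integers are VInts, strings are a VInt byte length followed by UTF-8 bytes, and the numeric codes mirror Hadoop's Java `BinaryProtocol`. The names `BinaryProtocolSketch`, `read_vint` and `read_string` are hypothetical; this is not Pydoop's `sercore` API, and digest verification is left out.

```python
import io
from typing import Any, BinaryIO, Tuple

# Command codes: assumed values, mirroring Hadoop Pipes' BinaryProtocol.
START = 0
CLOSE = 8
AUTHENTICATION_REQ = 10


def read_vint(stream: BinaryIO) -> int:
    # Same variable-length decoding as deserializeLong.
    first = stream.read(1)
    if not first:
        raise EOFError("end of stream")
    b = first[0] - 256 if first[0] >= 128 else first[0]
    if b >= -112:
        return b
    negative = b < -120
    length = (-120 - b) if negative else (-112 - b)
    payload = stream.read(length)
    if len(payload) != length:
        raise EOFError("truncated vint")
    t = int.from_bytes(payload, "big")
    return ~t if negative else t


def read_string(stream: BinaryIO) -> str:
    # Assumed framing: VInt byte length, then UTF-8 bytes.
    n = read_vint(stream)
    return stream.read(n).decode("utf-8")


class BinaryProtocolSketch:
    """Yields (command, *args) tuples decoded from a binary stream."""

    def __init__(self, istream: BinaryIO) -> None:
        self.istream = istream

    def __iter__(self) -> "BinaryProtocolSketch":
        return self

    def __next__(self) -> Tuple[Any, ...]:
        try:
            cmd = read_vint(self.istream)
        except EOFError:
            raise StopIteration from None
        if cmd == AUTHENTICATION_REQ:
            digest = read_string(self.istream)
            challenge = read_string(self.istream)
            return ("authentication_req", digest, challenge)
        elif cmd == START:
            return ("start", read_vint(self.istream))  # protocol version
        elif cmd == CLOSE:
            return ("close",)
        raise ValueError("unknown command code: %d" % cmd)


if __name__ == "__main__":
    data = b"\x0a\x03abc\x02xy\x00\x00\x08"
    for message in BinaryProtocolSketch(io.BytesIO(data)):
        print(message)
```

Raising `StopIteration` at end of stream lets the runner drive the whole task with a plain `for` loop over the protocol object.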


tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
  /* libhdfs: call FileSystem#getDefaultBlockSize() through JNI */
  jobject jFS = (jobject)fs;
  jvalue jVal;
  jthrowable jthr;
  JNIEnv* env = getJNIEnv();
  if (env == NULL) {
    errno = EINTERNAL;
    return -1;
  }
  jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                      "getDefaultBlockSize", "()J");
  if (jthr) ... /* handle error */
  return jVal.j;
}

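PyObject* FsClass_get_default_block_size(FsInfo* self) {
  tOffset size = hdfsGetDefaultBlockSize(self->_fs);
  if (size < 0)  /* libhdfs returns -1 and sets errno on failure */
    return PyErr_SetFromErrno(PyExc_IOError);
  return PyLong_FromLongLong(size);
}

Can I wrap Java code without going through JNI?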
Yes, we tried JPype at some point. Too slow.
Any other options for integrating Python with X?

Thank you