Integrating Python with other languages: the Pydoop case

Simone Leo – CRS4


Q. What is Pydoop?
A. A set of Python bindings for Hadoop

Q. OK, but what is Hadoop?
A. A Java distributed computing (DC) framework

Hadoop

  • DC framework focused on data-intensive jobs
  • Simple programming model (MapReduce)
  • Backed up by a distributed filesystem (HDFS)
  • Written in Java

MapReduce

[Diagram: MapReduce data flow]

Word count – pseudocode


void map(Object key, String value) {
  for (String word: tokenize(value)) {
    emit(word, 1);
  }
}

void reduce(String key, Iterator values) {
  int count = 0;
  for (int v: values) {
    count += v;
  }
  emit(key, count);
}
	    

Word count – Java


public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(Object k, Text v, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(v.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}

public static class IntSumReducer
     extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();
  public void reduce(Text k, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v: values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(k, result);
  }
}
	    

Pydoop

  • Python bindings for Hadoop
  • MapReduce & HDFS API
  • Started 10 years ago with the following goal:


class Mapper(api.Mapper):
    def map(self, context):
        for w in context.value.split():
            context.emit(w, 1)

class Reducer(api.Reducer):
    def reduce(self, context):
        context.emit(context.key, sum(context.values))
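
To turn the two classes above into a complete program, an entry point hands them to Pydoop's task runner — roughly as follows (the Factory/run_task names assume Pydoop's pipes module, which is not shown on these slides):

import pydoop.mapreduce.pipes as pipes

def __main__():
    # Hand the Mapper and Reducer defined above to Pydoop's task runner
    pipes.run_task(pipes.Factory(Mapper, reducer_class=Reducer))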
	  

Already available in Hadoop

MapReduce:

  • Hadoop Streaming
    • any language (write executable scripts)
    • exchange k/v pairs via stdin/stdout
  • Hadoop Pipes (no how-to in the current docs!)
    • any language (a C++ client is included)
    • communicate with the framework via a socket

HDFS: libhdfs (C, via JNI)

Hadoop Streaming

mapper.py


import sys

for line in sys.stdin:
    for word in line.split():
        sys.stdout.write("%s\t1\n" % word)
	    

reducer.py


import sys

out_k, out_v = None, 0
for line in sys.stdin:
    k, v = line.split("\t", 1)
    v = int(v)
    if k != out_k:
        if out_k is not None:
            sys.stdout.write("%s\t%d\n" % (out_k, out_v))
            out_v = 0
        out_k = k
    out_v += v
if out_k is not None:
    sys.stdout.write("%s\t%d\n" % (out_k, out_v))
	    

Hadoop Streaming – Fancy Reducer


#!/usr/bin/env python

import sys
from itertools import groupby
from operator import itemgetter

def istream():
    for line in sys.stdin:
        k, v = line.split("\t", 1)
        yield k, int(v)

for k, stream in groupby(istream(), itemgetter(0)):
    v = sum(_[1] for _ in stream)
    sys.stdout.write("%s\t%d\n" % (k, v))
	    

mrjob is based on Hadoop Streaming
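
Because Streaming is just sorted key/value text on standard streams, the scripts above can be tested locally without a cluster. A minimal sketch (mapper.py, reducer.py and input.txt are assumed file names):

import subprocess

# Emulate the Streaming data flow: map, sort by key, reduce
subprocess.run(
    "python mapper.py < input.txt | sort -k1,1 | python reducer.py",
    shell=True,
    check=True,
)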

Hadoop Pipes

[Diagram: Hadoop Pipes architecture]

Python Pipes Task Runner

Options, options …

  • Wrap the whole C++ client code
  • Write everything in pure Python
    • Second thing we tried, too slow
  • Extension module only for performance-critical stuff
    • Current approach, via the C API

Main job: read/write Java types


public static void writeVLong(DataOutput stream, long i)
    throws IOException {
  if (i >= -112 && i <= 127) {
    stream.writeByte((byte)i);
    return;
  }
  int len = -112;
  if (i < 0) {
    i ^= -1L;
    len = -120;
  }
  long tmp = i;
  while (tmp != 0) {
    tmp = tmp >> 8;
    len--;
  }
  stream.writeByte((byte)len);
  len = (len < -120) ? -(len + 120) : -(len + 112);
  for (int idx = len; idx != 0; idx--) {
    int shiftbits = (idx - 1) * 8;
    long mask = 0xFFL << shiftbits;
    stream.writeByte((byte)((i & mask) >> shiftbits));
  }
}
	    

org.apache.hadoop.io.WritableUtils

Read/write Java types – C++


int64_t deserializeLong(InStream& stream) {
  int8_t b;
  stream.read(&b, 1);
  if (b >= -112) {
    return b;
  }
  bool negative;
  int len;
  if (b < -120) {
    negative = true;
    len = -120 - b;
  } else {
    negative = false;
    len = -112 - b;
  }
  uint8_t barr[len];
  stream.read(barr, len);
  int64_t t = 0;
  for (int idx = 0; idx < len; idx++) {
    t = t << 8;
    t |= (barr[idx] & 0xFF);
  }
  if (negative) {
    t ^= -1ll;
  }
  return t;
}
	    

HadoopUtils
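
For comparison, here is a rough pure-Python equivalent of the C++ deserializer above — essentially what the "write everything in pure Python" option looks like (a sketch: the read_vlong name and the byte-stream argument are illustrative, not Pydoop's actual API):

import io
import struct

def read_vlong(stream):
    """Decode a Hadoop WritableUtils variable-length long."""
    (b,) = struct.unpack("b", stream.read(1))  # first byte, signed
    if b >= -112:
        return b                               # small values fit in one byte
    if b < -120:
        negative, length = True, -120 - b
    else:
        negative, length = False, -112 - b
    value = 0
    for byte in stream.read(length):           # big-endian payload bytes
        value = (value << 8) | byte
    return value ^ -1 if negative else value

# e.g. 300 serializes to the three bytes 0x8e 0x01 0x2c
assert read_vlong(io.BytesIO(b"\x8e\x01\x2c")) == 300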

Read/write Java types – Python


static PyObject *
FileInStream_readVLong(FileInStreamObj *self) {
  int64_t rval;
  PyThreadState *state = PyEval_SaveThread();
  try {
    rval = HadoopUtils::deserializeLong(*self->stream);
  } catch (HadoopUtils::Error e) {
    PyEval_RestoreThread(state);
    PyErr_SetString(PyExc_IOError, e.getMessage().c_str());
    return NULL;
  }
  PyEval_RestoreThread(state);
  return Py_BuildValue("L", rval);
}
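
Building such an extension is ordinary setuptools work. A hypothetical build sketch (module layout and source file names are made up for illustration):

from setuptools import setup, Extension

setup(
    name="pydoop",
    ext_modules=[
        Extension(
            "pydoop.sercore",                # imported later as pydoop.sercore
            sources=["src/sercore.cc", "src/SerialUtils.cc"],
            language="c++",
        )
    ],
)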
	    

Task runner


import pydoop.sercore as sercore

class BinaryProtocol(object):

    def __init__(self, istream, ...):
        self.istream = istream

    def __next__(self):
        cmd = self.istream.read_vlong()
        if cmd == AUTHENTICATION_REQ:
            digest = self.istream.read_bytes()
            challenge = self.istream.read_bytes()
            self.verify_digest(digest, challenge)
        elif cmd == START: ...
	    

HDFS


tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
  jobject jFS = (jobject)fs;
  jvalue jVal;
  jthrowable jthr;
  JNIEnv* env = getJNIEnv();
  if (env == NULL) {
    errno = EINTERNAL;
    return -1;
  }
  jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                      "getDefaultBlockSize", "()J");
  if (jthr) ... /* handle error */
  return jVal.j;
}
	    

PyObject* FsClass_get_default_block_size(FsInfo* self) {
  tOffset size = hdfsGetDefaultBlockSize(self->_fs);
  return PyLong_FromSsize_t(size);
}
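
The wrapper above goes through Pydoop's C extension. Purely to illustrate the alternative, the same libhdfs calls can also be reached from Python with ctypes — a sketch that assumes libhdfs.so is loadable and that the JVM/CLASSPATH environment libhdfs needs is already set up:

import ctypes

# Load libhdfs and declare the two calls we use (see hdfs.h for the full API)
lib = ctypes.CDLL("libhdfs.so")
lib.hdfsConnect.restype = ctypes.c_void_p            # opaque hdfsFS handle
lib.hdfsConnect.argtypes = [ctypes.c_char_p, ctypes.c_uint16]
lib.hdfsGetDefaultBlockSize.restype = ctypes.c_int64
lib.hdfsGetDefaultBlockSize.argtypes = [ctypes.c_void_p]

fs = lib.hdfsConnect(b"default", 0)                  # default namenode from the Hadoop config
print(lib.hdfsGetDefaultBlockSize(fs))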
	    
Q. Can I wrap Java code without going through JNI?
A. Yes, we tried JPype at some point. Too slow.

Q. Any other options for integrating Python with X?

Thank you