Simone Leo – CRS4
void map(Object key, String value) {
    for (String word: tokenize(value)) {
        emit(word, 1);
    }
}

void reduce(String key, Iterator values) {
    int count = 0;
    for (int v: values) {
        count += v;
    }
    emit(key, count);
}
public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object k, Text v, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(v.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text k, Iterable<IntWritable> values,
                       Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v: values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(k, result);
    }
}
class Mapper(api.Mapper):

    def map(self, context):
        for w in context.value.split():
            context.emit(w, 1)


class Reducer(api.Reducer):

    def reduce(self, context):
        context.emit(context.key, sum(context.values))
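A minimal driver sketch showing how the two classes above become a runnable Pydoop task. The Factory / run_task wiring and the module-level __main__() hook follow Pydoop's documented word-count example, but treat the exact names and signatures here as an assumption, not a verbatim API quote.

import pydoop.mapreduce.api as api
import pydoop.mapreduce.pipes as pp


class Mapper(api.Mapper):

    def map(self, context):
        for w in context.value.split():
            context.emit(w, 1)


class Reducer(api.Reducer):

    def reduce(self, context):
        context.emit(context.key, sum(context.values))


# assumed wiring: the factory bundles the user classes, run_task drives
# the framework protocol; pydoop submit calls the module-level __main__()
factory = pp.Factory(Mapper, reducer_class=Reducer)


def __main__():
    pp.run_task(factory)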
MapReduce:
HDFS: libhdfs (C, via JNI)
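On the HDFS side, the libhdfs wrapper surfaces as a high-level, file-like Python API. A small usage sketch, assuming the pydoop.hdfs convenience functions dump() and load(); the exact signatures and default modes are an assumption, not a verbatim API quote.

import pydoop.hdfs as hdfs

# write a small file to HDFS and read it back; both calls go through
# libhdfs, and hence the JVM, under the hood
hdfs.dump(b"hello, pydoop\n", "hello.txt")
print(hdfs.load("hello.txt"))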
mapper.py
#!/usr/bin/env python
import sys

for line in sys.stdin:
    for word in line.split():
        sys.stdout.write("%s\t1\n" % word)
reducer.py
#!/usr/bin/env python
import sys

# streaming guarantees that reducer input is sorted by key,
# so all values for a given key arrive on consecutive lines
out_k, out_v = None, 0
for line in sys.stdin:
    k, v = line.split("\t", 1)
    v = int(v)
    if k != out_k:
        if out_k is not None:
            sys.stdout.write("%s\t%d\n" % (out_k, out_v))
        out_v = 0
        out_k = k
    out_v += v
if out_k is not None:
    sys.stdout.write("%s\t%d\n" % (out_k, out_v))
#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter


def istream():
    for line in sys.stdin:
        k, v = line.split("\t", 1)
        yield k, int(v)

# input is sorted by key, so groupby yields exactly one group per key
for k, stream in groupby(istream(), itemgetter(0)):
    v = sum(_[1] for _ in stream)
    sys.stdout.write("%s\t%d\n" % (k, v))
mrjob is based on Hadoop Streaming
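For comparison, the same word count written with mrjob, which generates and drives the Streaming job for you. This follows mrjob's standard MRJob pattern; it runs locally by default and on a cluster with -r hadoop.

#!/usr/bin/env python
from mrjob.job import MRJob


class MRWordCount(MRJob):

    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == "__main__":
    MRWordCount.run()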
Options, options …
public static void writeVLong(DataOutput stream, long i)
        throws IOException {
    // values in [-112, 127] fit in a single byte
    if (i >= -112 && i <= 127) {
        stream.writeByte((byte)i);
        return;
    }
    int len = -112;
    if (i < 0) {
        i ^= -1L;  // store negative values as their one's complement
        len = -120;
    }
    // count how many bytes the magnitude needs; the (negative) first
    // byte encodes both the sign and that length
    long tmp = i;
    while (tmp != 0) {
        tmp = tmp >> 8;
        len--;
    }
    stream.writeByte((byte)len);
    len = (len < -120) ? -(len + 120) : -(len + 112);
    // write the magnitude big-endian, most significant byte first
    for (int idx = len; idx != 0; idx--) {
        int shiftbits = (idx - 1) * 8;
        long mask = 0xFFL << shiftbits;
        stream.writeByte((byte)((i & mask) >> shiftbits));
    }
}
org.apache.hadoop.io.WritableUtils
int64_t deserializeLong(InStream& stream) {
    int8_t b;
    stream.read(&b, 1);
    if (b >= -112) {
        return b;
    }
    bool negative;
    int len;
    if (b < -120) {
        negative = true;
        len = -120 - b;
    } else {
        negative = false;
        len = -112 - b;
    }
    uint8_t barr[len];
    stream.read(barr, len);
    int64_t t = 0;
    for (int idx = 0; idx < len; idx++) {
        t = t << 8;
        t |= (barr[idx] & 0xFF);
    }
    if (negative) {
        t ^= -1ll;
    }
    return t;
}
HadoopUtils
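For illustration only, the same decoding logic transcribed into plain Python (a sketch, not Pydoop code); the asserts check it against the encoding produced by writeVLong above.

import io


def read_vlong(stream):
    # first byte: values in [-112, 127] are stored directly
    b = int.from_bytes(stream.read(1), "big", signed=True)
    if b >= -112:
        return b
    # otherwise the first byte encodes the sign and the tail length
    if b < -120:
        negative, length = True, -120 - b
    else:
        negative, length = False, -112 - b
    value = 0
    for byte in stream.read(length):  # big-endian magnitude
        value = (value << 8) | byte
    # negative values were stored as their one's complement
    return ~value if negative else value


assert read_vlong(io.BytesIO(b"\x05")) == 5
assert read_vlong(io.BytesIO(b"\x8e\x01\x2c")) == 300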
static PyObject *
FileInStream_readVLong(FileInStreamObj *self) {
    int64_t rval;
    /* release the GIL while the C++ code performs (possibly blocking) I/O */
    PyThreadState *state = PyEval_SaveThread();
    try {
        rval = HadoopUtils::deserializeLong(*self->stream);
    } catch (HadoopUtils::Error e) {
        PyEval_RestoreThread(state);
        PyErr_SetString(PyExc_IOError, e.getMessage().c_str());
        return NULL;
    }
    PyEval_RestoreThread(state);
    return Py_BuildValue("L", rval);
}
import pydoop.sercore as sercore


class BinaryProtocol(object):

    def __init__(self, istream, ...):
        self.stream = istream

    def __next__(self):
        cmd = self.stream.read_vlong()
        if cmd == AUTHENTICATION_REQ:
            digest = self.stream.read_bytes()
            challenge = self.stream.read_bytes()
            self.verify_digest(digest, challenge)
        elif cmd == START: ...
tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
    jobject jFS = (jobject)fs;
    jvalue jVal;
    jthrowable jthr;
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                        "getDefaultBlockSize", "()J");
    if (jthr) ... /* handle error */
    return jVal.j;
}
PyObject* FsClass_get_default_block_size(FsInfo* self) {
    tOffset size = hdfsGetDefaultBlockSize(self->_fs);
    return PyLong_FromSsize_t(size);
}
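At the Python level, the wrapper above shows up as a method on the HDFS file-system handle. A hedged usage sketch, assuming pydoop.hdfs exposes an hdfs() handle whose default_block_size() maps onto the extension function above; the exact names are an assumption.

import pydoop.hdfs as hdfs

fs = hdfs.hdfs()  # connect to the default namenode
# assumed method name; the call chain ends in hdfsGetDefaultBlockSize(),
# which invokes FileSystem.getDefaultBlockSize() via JNI
print(fs.default_block_size())
fs.close()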