Avro I/O
Pydoop transparently supports reading and writing Avro records in MapReduce applications. This must be enabled by setting the appropriate options in pydoop submit (see below).
The following program implements a (slightly modified) version of the color count example from the Avro docs:
from collections import Counter

import pydoop.mapreduce.api as api
import pydoop.mapreduce.pipes as pp


class Mapper(api.Mapper):

    def map(self, ctx):
        user = ctx.value
        color = user['favorite_color']
        if color is not None:
            ctx.emit(user['office'], Counter({color: 1}))


class Reducer(api.Reducer):

    def reduce(self, ctx):
        s = sum(ctx.values, Counter())
        ctx.emit('', {'office': ctx.key, 'counts': s})


def __main__():
    pp.run_task(pp.Factory(Mapper, reducer_class=Reducer))
The application counts the per-office occurrence of favorite colors in a dataset of user records with the following structure:
{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "office", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
    ]
}
User records are read from an Avro container stored on HDFS, and results are written to another Avro container with the following schema:
{
    "namespace": "example.avro",
    "type": "record",
    "name": "Stats",
    "fields": [
        {"name": "office", "type": "string"},
        {"name": "counts", "type": {"type": "map", "values": "long"}}
    ]
}
Pydoop transparently serializes and/or deserializes Avro data as needed, allowing you to work directly with Python dictionaries. To get this behavior, enable Avro I/O and specify the output schema as follows:
export STATS_SCHEMA=$(cat stats.avsc)
pydoop submit \
-D pydoop.mapreduce.avro.value.output.schema="${STATS_SCHEMA}" \
--avro-input v --avro-output v \
--upload-file-to-cache color_count.py \
color_count input output
The --avro-input v and --avro-output v flags specify that we want to work with Avro records on MapReduce values; the other possible choices are "k", where records are exchanged over keys, and "kv", which assumes that the top-level record structure has two fields named "key" and "value" and passes the former on keys and the latter on values.
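For example, in "kv" mode both halves arrive already deserialized. The following is a minimal sketch, assuming a hypothetical input record whose "key" field is a string and whose "value" field is a nested record (which shows up as a Python dict):

import pydoop.mapreduce.api as api
import pydoop.mapreduce.pipes as pp


class Mapper(api.Mapper):

    def map(self, ctx):
        # with --avro-input kv, the top-level "key" field arrives as ctx.key
        # and the "value" field as ctx.value (a dict when it is a record type)
        office = ctx.key                # assumed to be a string field
        user = ctx.value                # assumed to be a nested record
        ctx.emit(office, user.get('favorite_color'))


def __main__():
    pp.run_task(pp.Factory(Mapper))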
Note that we did not have to specify any input schema: in this case, Avro automatically falls back to the writer schema, i.e., the one that was used to write the container file. The examples/avro directory contains examples for all I/O modes.
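Once the color count job completes, its output can be inspected with the Python Avro package. The following is a minimal sketch, assuming the output is a standard Avro container of Stats records and that one part file has been copied locally first (the file name below is hypothetical and depends on the job configuration):

from avro.datafile import DataFileReader
from avro.io import DatumReader

# hypothetical local copy of one output part file, e.g. obtained with
#   hadoop fs -get output/part-r-00000.avro .
with open("part-r-00000.avro", "rb") as f:
    reader = DataFileReader(f, DatumReader())
    for stats in reader:
        print(stats["office"], stats["counts"])
    reader.close()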
Avro-Parquet I/O
The above example focuses on Avro containers. However, Pydoop supports any input/output format that exchanges Avro records. In particular, it can be used to read from and write to Avro-Parquet files, i.e., Parquet files that use the Avro object model.
Note

Make sure you have Parquet version 1.6 or later to avoid running into object reuse problems. More generally, the record writer must be aware of the fact that records passed to its write method are mutable and can be reused by the caller.
The following application reproduces the k-mer count example from the ADAM docs:
import pydoop.mapreduce.api as api
import pydoop.mapreduce.pipes as pp

WIDTH = 5


def window(s, width):
    for i in range(len(s) - width + 1):
        yield s[i: i + width]


class Mapper(api.Mapper):

    def map(self, ctx):
        seq = ctx.value['sequence']
        for kmer in window(seq, WIDTH):
            ctx.emit(kmer, 1)


class Reducer(api.Reducer):

    def reduce(self, ctx):
        ctx.emit(ctx.key, sum(ctx.values))


def __main__():
    pp.run_task(pp.Factory(Mapper, reducer_class=Reducer))
To run the above program, execute pydoop submit as follows:
export PROJECTION=$(cat projection.avsc)
pydoop submit \
-D parquet.avro.projection="${PROJECTION}" \
--upload-file-to-cache kmer_count.py \
--input-format parquet.avro.AvroParquetInputFormat \
--avro-input v --libjars "path/to/the/parquet/jar" \
kmer_count input output
Since we are using an external input format (Avro container input and output formats are integrated into the Java Pydoop code), we have to specify the corresponding class via --input-format and its jar with --libjars. The optional Parquet projection allows extracting only selected fields from the input data. Note that, in this case, reading input records as values is not optional: that is simply how AvroParquetInputFormat works.
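For reference, a projection schema for this job only needs to list the field the mapper actually reads. The following is a sketch of what projection.avsc could look like; the namespace, record name and field type are assumptions and must match the writer schema of the input Parquet files:

{
    "namespace": "org.bdgenomics.formats.avro",
    "type": "record",
    "name": "AlignmentRecord",
    "fields": [
        {"name": "sequence", "type": ["null", "string"], "default": null}
    ]
}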
More Avro-Parquet examples are available under examples/avro.
Running the examples
To run the Avro examples you have to install the Python Avro package (you can get it from the Avro web site), while the avro jar is included in Hadoop and the avro-mapred one is included in Pydoop.
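For instance, the package is published on PyPI, so it can typically be installed with pip:

pip install avro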
Part of the examples code (e.g., input generation) is written in Java.
Compilation and packaging into a jar is handled by the bash runners,
but Maven needs to be installed on the
client machine.
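If you just want a small test input without going through the Java generator, the Python Avro package can also write container files. The following is a sketch, assuming the User schema shown above has been saved locally as user.avsc (file names and record values are made up; depending on the avro release, the parse function may be capitalized as Parse):

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# parse the writer schema (hypothetical local file name)
schema = avro.schema.parse(open("user.avsc").read())

with open("users.avro", "wb") as f:
    writer = DataFileWriter(f, DatumWriter(), schema)
    writer.append({"office": "A", "name": "alice",
                   "favorite_number": 7, "favorite_color": "blue"})
    writer.append({"office": "B", "name": "bob",
                   "favorite_number": None, "favorite_color": None})
    writer.close()

The resulting users.avro file can then be copied to HDFS (e.g. with hadoop fs -put users.avro input/) and used as the job's input.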