Saturday 6 September 2014

Hadoop Streaming 2: Simple Example

Hadoop Streaming allows us to run non-JVM code on Hadoop.
A regular (Java) MapReduce job consists of:
  1. A main method which configures the job and launches it
    • set # reducers
    • set mapper and reducer classes
    • set partitioner
    • set other hadoop configurations
  2. A Mapper Class
    • takes K,V inputs, writes K,V outputs
  3. A Reducer Class
    • takes K, Iterator[V] inputs, and writes K,V outputs
Hadoop Streaming is really just a Java library that implements these pieces, but instead of doing the work itself it pipes the data to external scripts. Data is passed between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).
The reducer interface for streaming is different from the one in Java.
Instead of receiving reduce(k, Iterator[V]), your script is sent one line per value, including the key. So, for example, instead of receiving:

reduce('TRUE', Iterator(1, 1, 1, 1))

it will receive:

TRUE 1
TRUE 1
TRUE 1
TRUE 1
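To make that line-per-value contract concrete, here is a minimal sketch (not the word-count reducer used below) of a streaming reducer loop that re-groups consecutive lines sharing the same key; it assumes tab-separated "key value" lines that are already sorted by key, which Hadoop's shuffle/sort guarantees.

#!/usr/bin/env python
# Minimal streaming reducer sketch: reads "key<TAB>value" lines from STDIN
# and sums the values for each run of identical keys.
import sys

current_key = None
current_sum = 0
for line in sys.stdin:
    key, value = line.rstrip('\n').split('\t', 1)
    if key == current_key:
        current_sum += int(value)
    else:
        if current_key is not None:
            print '%s\t%d' % (current_key, current_sum)
        current_key = key
        current_sum = int(value)
if current_key is not None:
    print '%s\t%d' % (current_key, current_sum)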
Word Count Example:

1. Execute without Hadoop
$ chmod +x mapper.py
$ chmod +x reducer.py

$ cat data.csv | python mapper.py | sort | python reducer.py
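For example, if data.csv (a made-up two-line sample) contained "foo bar foo" and "bar", the pipeline above would print tab-separated counts:

bar	2
foo	2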

2. Upload the input data to HDFS
$ hadoop fs -put data.csv input/data
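You can confirm the upload with the usual fs commands:
$ hadoop fs -ls input
$ hadoop fs -cat input/data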

In addition to the executable scripts, you can also package other auxiliary files (such as dictionaries, configuration files, etc.) that may be used by the mapper and/or the reducer, by passing them with -file:
-file lib/pysftp.mod
Note: only pysftp.mod is shipped with the job submission, not the whole lib directory.
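As a minimal sketch of how such a shipped file is used, assume a hypothetical dictionary stopwords.txt passed with -file stopwords.txt; Hadoop Streaming makes shipped files available in the task's working directory, so the script can open it by name:

#!/usr/bin/env python
# Hypothetical mapper that filters words against an auxiliary dictionary.
# 'stopwords.txt' is assumed to have been shipped with -file stopwords.txt,
# which makes it available in the task's current working directory.
import sys

with open('stopwords.txt') as f:
    stopwords = set(line.strip() for line in f)

for line in sys.stdin:
    for word in line.split():
        if word not in stopwords:
            print '%s\t1' % word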

3. Run the streaming job
$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.7.0.jar \
> -Dmapred.reduce.tasks=1 \
> -mapper mapper.py \
> -reducer reducer.py \
> -input input/data  \
> -output output/result \
> -file mapper.py \
> -file reducer.py
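When the job finishes, the results land in the output directory as part-* files and can be inspected with:

$ hadoop fs -cat output/result/part-*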


mapper.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
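
The mapper can be sanity-checked on its own with a throwaway input string, for example:

$ echo "foo foo bar" | python mapper.py
foo	1
foo	1
bar	1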


reducer.py


#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
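
Note that groupby only merges consecutive identical keys, so the reducer relies on its input being sorted by key; locally that is what the sort in step 1 provides, and on the cluster Hadoop's shuffle/sort phase guarantees it. A quick standalone check with hand-written, already-sorted input:

$ printf 'bar\t1\nfoo\t1\nfoo\t1\n' | python reducer.py
bar	1
foo	2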



References:
http://blog.matthewrathbone.com/2013/04/17/what-is-hadoop.html
http://blog.matthewrathbone.com/2013/11/17/python-map-reduce-on-hadoop---a-beginners-tutorial.html#
http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/#test-your-code-cat-data-map-sort-reduce


