Hadoop Streaming allows us to run non-JVM code on Hadoop.
A regular (Java) MapReduce job consists of:
- A main method which configures the job and launches it
- set # reducers
- set mapper and reducer classes
- set partitioner
- set other hadoop configurations
- A Mapper Class
- takes K,V inputs, writes K,V outputs
- A Reducer Class
- takes K, Iterator[V] inputs, and writes K,V outputs
Hadoop Streaming is really just a Java library that implements these same pieces, but instead of doing the map and reduce work itself, it pipes the data to external scripts. Data is passed between Hadoop and our map and reduce code via STDIN (standard input) and STDOUT (standard output).
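For example, a minimal streaming mapper (a sketch of the STDIN/STDOUT contract, not the word-count mapper used below) just reads lines from STDIN and writes tab-separated key/value pairs to STDOUT:
#!/usr/bin/env python
# minimal sketch of the streaming contract: read lines from STDIN,
# emit one tab-separated key/value pair per line on STDOUT
import sys

for line in sys.stdin:
    key = line.strip()          # use the whole line as the key
    print '%s\t%d' % (key, 1)   # the value is a count of 1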
The reducer interface for streaming is different than in Java. Instead of receiving reduce(k, Iterator[V]), your script is sent one line per value, including the key. For example, instead of receiving:
reduce('TRUE', Iterator(1, 1, 1, 1))
it will receive:
TRUE 1
TRUE 1
TRUE 1
TRUE 1
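Because Hadoop sorts the mapper output by key, all values for a given key arrive on consecutive lines, and the reducer script has to notice when the key changes itself. A rough sketch of that pattern (assuming tab-separated key/value lines, as in the word count below):
#!/usr/bin/env python
# sketch of a streaming reducer: detect key boundaries on sorted input
import sys

current_key, current_sum = None, 0
for line in sys.stdin:
    key, value = line.rstrip('\n').split('\t', 1)
    if key == current_key:
        current_sum += int(value)
    else:
        if current_key is not None:
            print '%s\t%d' % (current_key, current_sum)  # flush previous key
        current_key, current_sum = key, int(value)
if current_key is not None:
    print '%s\t%d' % (current_key, current_sum)  # flush the last key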
Word Count Example:
1. Execute without Hadoop
$ chmod +x mapper.py
$ chmod +x reducer.py
$ cat data.csv | python mapper.py | sort | python reducer.py
2. Upload input data to HDFS
$ hadoop fs -put data.csv input/data
In addition to executable files, you can also package other auxiliary files (such as dictionaries, configuration files, etc.) that may be used by the mapper and/or the reducer, by passing them with -file:
-file lib/pysftp.mod
Note that only pysftp.mod is shipped in the job jar, not the whole lib directory; the shipped file ends up in each task's working directory.
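Inside the script, a shipped file can therefore be opened by its base name. A sketch of this, assuming a hypothetical stopwords.txt shipped with -file stopwords.txt and a mapper that skips those words:
#!/usr/bin/env python
# sketch: use an auxiliary file shipped with -file (hypothetical stopwords.txt)
import sys

# -file places the file in the task's current working directory,
# so it can be opened by its base name
with open('stopwords.txt') as f:
    stopwords = set(word.strip() for word in f)

for line in sys.stdin:
    for word in line.split():
        if word not in stopwords:
            print '%s\t%d' % (word, 1)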
3. Run the job with Hadoop Streaming
$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.7.0.jar \
> -Dmapred.reduce.tasks=1 \
> -mapper mapper.py \
> -reducer reducer.py \
> -input input/data \
> -output output/result \
> -file mapper.py \
> -file reducer.py
mapper.py
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""
import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
Reference:
http://blog.matthewrathbone.com/2013/04/17/what-is-hadoop.html
http://blog.matthewrathbone.com/2013/11/17/python-map-reduce-on-hadoop---a-beginners-tutorial.html#
http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/#test-your-code-cat-data-map-sort-reduce