Simple Hadoop Streaming Example

Hadoop is a software framework that distributes workloads across many machines. With a moderate-sized data center, it can process huge amounts of data in a brief period of time; think weeks becoming hours. Because it is open source, easy to use, and it works, it is rapidly becoming a default tool in many analytic shops working with large datasets. In this post, I will give a very simple example of how to use Hadoop with Python. For more on Hadoop, see here or type "Hadoop overview" into your favorite search engine.

For this example, let’s use the US Census County Business Patterns dataset.
Location: http://www2.census.gov/econ2007/CBP_CSV
Download: zbp07detail.zip then unzip the data.

This file provides the number of business establishments (EST) by NAICS code and zip code for the US. We are interested in the total number of establishments (EST) by zip code (zip).

The first step is to create the mapper. The map step creates the key that the Hadoop system uses to keep parts of the data together for processing. It is expected to output the key, followed by a tab ('\t'), then the rest of the data required for processing; Hadoop treats everything before the first tab as the key and groups the data by it. For this example, we will use Python. Create the file map.py and add the code below.

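#!/usr/bin/env python
import sys
for lineIn in sys.stdin:
    # Drop the trailing newline so print does not emit blank lines
    lineIn = lineIn.rstrip('\n')
    # Note: Key is defined here (first five characters = zip code)
    zipCode = lineIn[0:5]
    print(zipCode + '\t' + lineIn)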
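
To make the key/tab/data format concrete, here is a minimal sketch of the same mapping logic written as a function and run on two made-up rows. The function name map_line and the sample rows are assumptions for illustration only; like map.py, the sketch assumes the zip code occupies the first five characters of each line.

```python
def map_line(line):
    """Return 'key<TAB>original line', using the first five characters as the key."""
    line = line.rstrip('\n')
    return line[0:5] + '\t' + line

# Hypothetical sample rows (not real census data).
sample = ["00501,a,b,2\n", "01001,c,d,5\n"]
mapped = [map_line(row) for row in sample]
for out in mapped:
    print(out)
```

Each output line starts with the zip code, then a tab, then the untouched input row, which is the layout the reducer expects.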

Now we create the reducer. The reducer step performs the analysis or data manipulation work. You only need a reducer if the analysis requires grouping by the key that the map step provides; in this example, the key is the zip code. Create the file reducer.py and add the code below. In this step, make sure you split each line of the piped stream into the key and the data line.

#!/usr/bin/env python
import sys
counts = {}
for lineIn in sys.stdin:
    # Note: We separate the key here, at the first tab
    key, line = lineIn.rstrip('\n').split('\t', 1)
    aLine = line.split(',')
    counts[key] = counts.get(key, 0) + int(aLine[3])
# Print the totals once, after all input has been read
for k, v in counts.items():
    print(k + ',' + str(v))
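
Hadoop sorts the mapper output by key before handing it to the reducer, so all lines for one zip code arrive together. A reducer can take advantage of this and emit each total as soon as its key is finished, instead of holding every key in a dictionary. The following is a minimal sketch of that alternative, not the method used in reducer.py; the function name sum_by_key and the sample rows are hypothetical, and it assumes, like reducer.py, that the count sits in the fourth comma-separated field.

```python
from itertools import groupby

def sum_by_key(sorted_lines):
    """Yield 'key,total' for each run of consecutive lines sharing a key.

    Assumes the lines are already sorted by key, as Hadoop delivers them.
    """
    # Split each line into [key, data] at the first tab.
    pairs = (line.rstrip('\n').split('\t', 1) for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        total = sum(int(data.split(',')[3]) for _, data in group)
        yield key + ',' + str(total)

# Hypothetical mapper output (not real census data), already sorted by key.
sample = [
    "00501\t00501,a,b,2\n",
    "00501\t00501,c,d,1\n",
    "01001\t01001,a,b,5\n",
]
results = list(sum_by_key(sample))
for row in results:
    print(row)
```

When testing this variant outside Hadoop, the mapper output has to be sorted first, because groupby only merges adjacent lines with the same key.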

To test the code, run the map and reduce steps on their own at the shell prompt, outside Hadoop. If the file is large, make sure to use a sub-sample.

cat zbp07detail.txt | python map.py > zbp07detail.map.txt
cat zbp07detail.map.txt | python reducer.py > Results.test.txt
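
One simple way to cut a sub-sample is to copy only the first lines of the file. This is a minimal sketch, assuming the first rows are representative enough for a quick test; the function name write_sample, the sample file name, and the line count are hypothetical.

```python
from itertools import islice

def write_sample(src_path, dst_path, n_lines):
    """Copy the first n_lines lines of src_path into dst_path; return lines written."""
    written = 0
    with open(src_path) as src, open(dst_path, 'w') as dst:
        # islice stops after n_lines, so the rest of the file is never read.
        for line in islice(src, n_lines):
            dst.write(line)
            written += 1
    return written
```

For example, write_sample('zbp07detail.txt', 'zbp07detail.sample.txt', 1000) would produce a 1000-line file that can be piped through map.py and reducer.py in place of the full data.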

Now make both scripts executable (chmod +x map.py reducer.py) and run the following commands at the shell prompt:

hadoop fs -put /zbp07detail.txt /user/mydir/ 
hadoop jar HadoopHome/contrib/streaming/hadoop-XXXX-streaming.jar \
-input /user/mydir/zbp07detail.txt \
-output /user/mydir/out \
-mapper ./map.py \
-reducer ./reducer.py \
-file ./map.py \
-file ./reducer.py
hadoop fs -cat /user/mydir/out/part-* > Results.txt

That is all there is to it. The file Results.txt will have the sums by zip.