Sorting in Hadoop with Itertools

Previously, I gave a simple example of a Python map/reduce program. In this example I will show you how to sort the data within a key using itertools, a Python library of fast building blocks for looping over and combining iterators. This example is an extension of Michael Noll’s detailed Hadoop example. The data in this example is comma-delimited.

AccountId,SequenceId,Amount,Product
0123,9,12.50,Product9
0123,5,4.60,Product5
0123,7,54.00,Product7
0123,2,34.75,Product2
0123,3,6.34,Product3
0123,8,14.50,Product8
0123,1,52.56,Product1
0123,4,78.45,Product4
0123,6,89.50,Product6
0321,1,2.12,Product1
0321,8,90.50,Product8
0321,3,2.35,Product3
0321,4,56.25,Product4
0321,9,71.00,Product9
0321,2,24.75,Product2
0321,7,34.34,Product7
0321,6,34.23,Product6
0321,5,37.03,Product5

Notice the data is sorted by AccountId, but within a given account it is not sorted by SequenceId. One important thing to remember is that Hadoop does not preserve the order of the data when passing it to the nodes that run the reduce step. The map step will group the data by the specified key, but even if the data is sorted correctly on the server, that order is not guaranteed to survive into the reduce. If your analysis requires the data to be sorted, not just grouped by key, this must be handled in the reduce step.

Now suppose you need to know the ordered sequence of products purchased for a given account. For clarity, each product is numbered to match its SequenceId. While this could be done with the Python base library alone, there is a library whose functions make the code quite elegant: itertools. First, let’s create a simple map step, mapper.py.
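To see how the pieces fit together, here is a minimal standalone sketch (not part of the Hadoop job, using made-up tuples) showing how groupby from itertools and the built-in sorted work together:

from itertools import groupby
from operator import itemgetter

rows = [('0123', '9'), ('0123', '5'), ('0123', '2'), ('0321', '1')]
# groupby only groups consecutive items, so the input must already be
# ordered (or sorted) by the grouping key, as it is here.
for account, group in groupby(rows, key=itemgetter(0)):
    # Each account's records can then be sorted by the second field.
    print(sorted(group, key=itemgetter(1)))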

#!/usr/bin/env python 
import sys 
for line in sys.stdin: 
    # Split the line by the comma delimiter
    aline = line.split(',')
    # Use the first field (AccountId) as the map key
    key = aline[0]
    # Output the key followed by a tab and the rest of the data
    print '%s\t%s' % (key, line.strip())
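For example, the first data row produces the AccountId key, a tab, and then the original line:

0123	0123,9,12.50,Product9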

This defines the AccountId as the key. Remember, even if you included the SequenceId in the key it would not help you: the Hadoop server would then spread an account's records across the nodes by that more granular key, so if you need to do an analysis by account, a given node would not see all of the account's transactions.

Now, here is the reduce step, reducer.py:

#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter

First, we define how to parse the input data stream. Notice how simple it is to handle the two different delimiters: the tab that marks the key for the Hadoop server and the comma that separates the data fields.

def read_mapper_output(file, sortBy, sepA, sepB):
    for line in file:
        key1, line1 = line.rstrip().split(sepA, 1)
        aline = line1.split(sepB)
        # Create a second key to sort on within the group
        key2 = aline[sortBy]
        yield key1, key2, line1

The yield statement makes this function a generator, and each item it produces is a tuple holding the account number, the sequence ID, and the rest of the data line.
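For the first mapper output line shown above, the generator yields:

('0123', '9', '0123,9,12.50,Product9')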

def main(separator='\t'):
    # Define the iterator
    data = read_mapper_output(sys.stdin, 1, '\t', ',')
    # Loop through the iterator grouped by the first key (AccountId).
    for grpKey, grpDt in groupby(data, itemgetter(0)):
        # Start the product list blank
        products = ''
        # Sort the grouped data by the second key (SequenceId).
        for key1, key2, line in sorted(grpDt, key=itemgetter(1)):
            aline = line.split(',')
            # Populate your product list
            products = products + ',' + aline[3]
        # Print one line per account once its group is complete.
        print grpKey + products

Finally, we call the main function when the script is run directly.

 
if __name__ == "__main__":
    main() 
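One caveat: key2 is a string, so the sort inside main is lexicographic. That is fine for the single-digit sequence IDs in this example, but '10' sorts before '9' as text, so if sequence IDs can grow past one digit, convert them to integers in the sort key, for example:

        for key1, key2, line in sorted(grpDt, key=lambda t: int(t[1])):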

Run the following to test the code:

cat test.data | python mapper.py | python reducer.py

It will output:

0123,Product1,Product2,Product3,Product4,Product5,Product6,Product7,Product8,Product9
0321,Product1,Product2,Product3,Product4,Product5,Product6,Product7,Product8,Product9

Even complex problems have an elegant and fast solution with Python.

Simple Hadoop Streaming Example

Hadoop is a software framework that distributes workloads across many machines. With a moderate-sized data center, it can process huge amounts of data in a brief period of time: think weeks reduced to hours. Because it is open source, easy to use, and works, it is rapidly becoming a default tool in many analytic shops working with large datasets. In this post, I will give a very simple example of how to use Hadoop with Python. For more on Hadoop, see the project documentation or type Hadoop overview into your favorite search engine.

For this example, let’s use the US Census County Business Patterns dataset.
Location: http://www2.census.gov/econ2007/CBP_CSV
Download: zbp07detail.zip then unzip the data.

This file provides the number of business establishments (EST) by NAICS code and ZIP code for the US. We are interested in the total number of establishments (EST) by ZIP code (zip).

The first step is to create the mapper. The map step creates the key the Hadoop system uses to keep parts of the data together for processing, and it is expected to output the key followed by a tab ('\t') and then the rest of the data required for processing. For this example, we will use Python. Create the file map.py and add the code below. The key, which tells the Hadoop system how to group the data, is defined as the value before the first tab ('\t').

#!/usr/bin/env python
import sys
for lineIn in sys.stdin:
    zip = lineIn[0:5]
    # Note: the key (zip code) is defined here, before the tab
    print zip + '\t' + lineIn.strip()

Now we create the reducer. The reduce step performs the analysis or data manipulation work. You only need a reduce step if the analysis requires the group-by key that the map step is providing; in this example, it is the zip code. Create the file reducer.py and add the code below. In this step, you need to make sure you split the piped stream into the key and the data line.

#!/usr/bin/env python
import sys
counts = {}
for lineIn in sys.stdin:
    # Note: we separate the key (zip code) from the data line here
    key, line = lineIn.split('\t', 1)
    aLine = line.split(',')
    # Accumulate the establishment count for this zip code
    counts[key] = counts.get(key, 0) + int(aLine[3])
# Print the totals once all of the input has been read
for k, v in counts.items():
    print k + ',' + str(v)
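Holding every zip code in the counts dictionary is fine for a file of this size, but it is not required. Hadoop delivers each reducer's input sorted by key, so you can also stream through one zip code at a time with itertools.groupby, just as in the sorting post above. The sketch below is an alternative reducer under that assumption, using the same input format and the same EST column (aLine index 3):

#!/usr/bin/env python
import sys
from itertools import groupby

def read_mapper_output(stream):
    # Split each piped line into the zip-code key and the data line
    for lineIn in stream:
        key, line = lineIn.split('\t', 1)
        yield key, line

# Hadoop presents the keys in sorted order, so each zip code's records
# arrive together and can be summed one group at a time.
for key, group in groupby(read_mapper_output(sys.stdin), key=lambda kv: kv[0]):
    total = sum(int(line.split(',')[3]) for _, line in group)
    print(key + ',' + str(total))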

To test the code, just run the map and reduce steps alone at the shell. If it is a large file, make sure to use a sub-sample.

cat zbp07detail.txt | python map.py > zbp07detail.map.txt
cat zbp07detail.map.txt | python reducer.py > Results.test.txt
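If you test a groupby-style reducer like the sketch above, add a sort between the two steps, since a shell pipe, unlike Hadoop, does not sort the mapper output by key:

cat zbp07detail.txt | python map.py | sort | python reducer.py > Results.test.txt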

Now just run the following commands at the shell prompt:

hadoop fs -put zbp07detail.txt /user/mydir/
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-XXXX-streaming.jar \
    -input /user/mydir/zbp07detail.txt \
    -output /user/mydir/out \
    -mapper ./map.py \
    -reducer ./reducer.py \
    -file ./map.py \
    -file ./reducer.py
hadoop fs -cat /user/mydir/out/* > Results.txt
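Depending on your cluster setup, you may also need to make the two scripts executable before submitting the job (they already start with #!/usr/bin/env python), or else pass them through the interpreter explicitly with -mapper "python map.py" and -reducer "python reducer.py":

chmod +x map.py reducer.py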

That is all there is to it. The file Results.txt will have the sums by zip.

JAQL Data Management

JAQL is a JSON query language similar to SQL. One key difference is that the dataset is accessed more like objects. A great overview is found here: JaqlOverview. The strength of JAQL is that it lets users write simple, extendable code to manipulate data stored in a non-proprietary, readable, and commonly used file format. It is primarily used when processing data with Hadoop. JAQL tends to crash when it hits exceptions, so I would copy and paste commands from an editor. Also, never press the up arrow.

Example Data:  
Customer
CustomerID Name Age
1 Joe 23
2 Mika 45
3 Lin 34
4 Sara 56
5 Susan 18
PurchaseOrder
POID CustomerID Purchase
1 3 Fiction
2 1 Biography
3 1 Fiction
4 2 Biography
5 3 Fiction
6 4 Fiction

 

Create Data
$Customers = [ {CustomerId: 1, name: "Joe", age: 23}
, {CustomerId: 2, name: "Mika", age: 45}
, {CustomerId: 3, name: "Lin", age: 34}
, {CustomerId: 4, name: "Sara", age: 56}
, {CustomerId: 5, name: "Susan", age: 18}];

$PurchaseOrder = [
{POID: 1, CustomerId: 3, Purchase: "Fiction"},
{POID: 2, CustomerId: 1, Purchase: "Biography"},
{POID: 3, CustomerId: 1, Purchase: "Fiction"},
{POID: 4, CustomerId: 2, Purchase: "Biography"},
{POID: 5, CustomerId: 3, Purchase: "Fiction"},
{POID: 6, CustomerId: 4, Purchase: "Fiction"} ];
SELECT
hdfs($Customers);
hdfs($PurchaseOrder);
CustomerID Name Age
1 Joe 23
2 Mika 45
3 Lin 34
4 Sara 56
5 Susan 18

SORT BY

$Customers -> sort by [$.age];
CustomerID Name Age
5 Susan 18
1 Joe 23
3 Lin 34
2 Mika 45
4 Sara 56
FILTER
$Customers -> filter $.age == 18;
CustomerID Name Age
5 Susan 18
INNER JOIN
join $Customers, $PurchaseOrder where $Customers.CustomerId == $PurchaseOrder.CustomerId into {$Customers.name, $PurchaseOrder.*} ; 
CustomerID Name Age POID Purchase
1 Joe 23 2 Biography
1 Joe 23 3 Fiction
2 Mika 45 4 Biography
3 Lin 34 1 Fiction
3 Lin 34 5 Fiction
4 Sara 56 6 Fiction
LEFT OUTER JOIN

join preserve $Customers, $PurchaseOrder where $Customers.CustomerId == $PurchaseOrder.CustomerId into {$Customers.name, $PurchaseOrder.*} ;

CustomerID Name Age POID Purchase
1 Joe 23 2 Biography
1 Joe 23 3 Fiction
2 Mika 45 4 Biography
3 Lin 34 1 Fiction
3 Lin 34 5 Fiction
4 Sara 56 6 Fiction
5 Susan 18 NULL NULL
GROUP BY
 
$PurchaseOrder -> group by $PurchaseOrder_group = $.CustomerId into {CustomerId: $PurchaseOrder_group, orders: count($)};
CustomerId Orders
1 2
2 1
3 2
4 1
UPDATE

$Customer1 = $Customers -> filter $.CustomerId == 1 -> transform {CustomerId: $.CustomerId, name: $.name, age: 26};
$Customerne1 = $Customers -> filter $.CustomerId != 1;
hdfs($Customer1, $Customerne1);

Customer
CustomerID Name Age
1 Joe 26
2 Mika 45
3 Lin 34
4 Sara 56
5 Susan 18
INSERT
$Customer6 = [{CustomerId: 6, name: "Terry", age: 50}];
hdfs($Customers, $Customer6) -> write(file("Customers.dat"));
Customer
CustomerID Name Age
1 Joe 23
2 Mika 45
3 Lin 34
4 Sara 56
5 Susan 18
6 Terry 50
DELETE
$Customerne1 = $Customers -> filter $.CustomerId != 1;
Customer
CustomerID Name Age
2 Mika 45
3 Lin 34
4 Sara 56
5 Susan 18