layout | title | subtitle | minutes |
---|---|---|---|
page | Introduction to Hadoop | Creating a Reducer | 15 |
- Create a Python-based reducer function and test it against non-Hadoop input
Each reducer processes a collection of KEYs, each KEY associated with a set of VALUEs. Note that in a Python-based Hadoop Streaming environment, intermediate data do not arrive at a reducer as separate KEY/VALUES groups. They arrive as a single stream of KEY/VALUE lines sorted by KEY, so the reducer itself must detect where one KEY ends and the next begins. Create a Python file named reducer01.py with the following content:
#!/usr/bin/env python
import sys

current_movie = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    try:
        movie, rating = line.split("\t", 1)
        rating = float(rating)
    except ValueError:
        # Skip lines with no tab or a non-numeric rating
        continue

    if current_movie == movie:
        # Same key as the previous line: keep accumulating
        current_rating_sum += rating
        current_rating_count += 1
    else:
        # Key changed: report the finished movie, then start a new group
        if current_movie:
            rating_average = current_rating_sum / current_rating_count
            print ("%s\t%s" % (current_movie, rating_average))
        current_movie = movie
        current_rating_sum = rating
        current_rating_count = 1

# Report the last movie (skipped when the input was empty)
if current_movie is not None:
    rating_average = current_rating_sum / current_rating_count
    print ("%s\t%s" % (current_movie, rating_average))
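Because the input is a single sorted stream, the reducer's main job is to detect where one KEY ends and the next begins. As an illustration only, and not as a replacement for reducer01.py, the same boundary detection can be sketched with `itertools.groupby` from the Python standard library. The function names `parse` and `reduce_averages` and the sample lines are made up for this sketch.

```python
from itertools import groupby


def parse(lines):
    """Yield (movie, rating) pairs, skipping malformed lines."""
    for line in lines:
        parts = line.strip().split("\t", 1)
        if len(parts) != 2:
            continue
        try:
            yield parts[0], float(parts[1])
        except ValueError:
            continue


def reduce_averages(lines):
    """Yield (movie, average) for input that is already sorted by movie."""
    # groupby starts a new group whenever the key changes, which is the
    # same boundary the explicit if/else in reducer01.py detects.
    for movie, group in groupby(parse(lines), key=lambda pair: pair[0]):
        ratings = [rating for _, rating in group]
        yield movie, sum(ratings) / len(ratings)


# Hypothetical hand-made input, already sorted by key as the shuffle
# phase would deliver it.
sample = [
    "Forrest Gump (1994)\t4.0\n",
    "Forrest Gump (1994)\t5.0\n",
    "Matrix, The (1999)\t3.5\n",
]

for movie, average in reduce_averages(sample):
    print("%s\t%s" % (movie, average))
```

One trade-off to keep in mind: this sketch collects all ratings of one movie in a list before averaging, whereas reducer01.py keeps only a running sum and count, which uses constant memory no matter how many ratings a movie has.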
To test this script on non-Hadoop data, we can run the following command:
!ssh dsciu001 hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null | head -n 10 | python mapper03.py | sort | python reducer01.py
Crocodile Dundee II (1988) 3.0 1
Departed, The (2006) 5.0 1
Forrest Gump (1994) 4.0 1
Free Willy 2: The Adventure Home (1995) 2.5 1
Gone Girl (2014) 4.0 1
Interstellar (2014) 4.0 1
Matrix, The (1999) 3.5 1
Prince of Egypt, The (1998) 4.0 1
Whiplash (2014) 5.0 1
The sort pipe in the middle sorts all the intermediate data coming from the mapper, effectively emulating Hadoop's shuffle behavior. This test tells us only that the reducer code runs without error; it does not tell us whether the reducer produces correct results. We need to test for the following:
- Do all instances of all pairs coming from the mapper arrive at the reducer?
- Are the calculations correct?
To do this, we would typically set up a smaller sample data set and manually verify the answers to the questions above. Alternatively, we can leverage Linux's command-line tools to customize our mapping pipeline.
!ssh dsciu001 hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null | head -n 1000 | python mapper03.py | grep Matrix
Matrix, The (1999) 3.5
Matrix, The (1999) 4.0
Matrix, The (1999) 2.5
Matrix, The (1999) 4.5
Matrix, The (1999) 2.0
The adjusted command above lists the five ratings for The Matrix found in the first 1,000 lines of the input. We also modify the reducer to include the values of current_rating_sum and current_rating_count in the output report:
print ("%s\t%s\t%s\t%s" % (current_movie, rating_average, current_rating_sum, current_rating_count))
Feeding these five ratings to the modified reducer shows that its calculations are correct: the count is 5, the sum is 16.5, and the average is 16.5 / 5 = 3.3.
!ssh dsciu001 hdfs dfs -cat /repository/movielens/ratings.csv 2>/dev/null | head -n 1000 | python mapper03.py | grep Matrix | sort | python reducer01.py
Matrix, The (1999) 3.3 16.5 5
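The same two checks (do all pairs arrive, and are the calculations correct) can also be automated on a small hand-made sample. The sketch below is one possible way to do it, under several assumptions: `reduce_stream` is a re-implementation of the running-sum logic written for this example rather than an import of reducer01.py, `expected_stats` is a hypothetical independent check, and the "Sample Movie (2000)" rows are made up. Only the five Matrix ratings come from the listing above.

```python
from collections import defaultdict


def reduce_stream(lines):
    """Running-sum reducer logic; returns (key, average, sum, count) tuples."""
    results = []
    current_key, total, count = None, 0.0, 0
    for line in lines:
        key, value = line.strip().split("\t", 1)
        value = float(value)
        if key == current_key:
            total += value
            count += 1
        else:
            if current_key is not None:
                results.append((current_key, total / count, total, count))
            current_key, total, count = key, value, 1
    if current_key is not None:
        results.append((current_key, total / count, total, count))
    return results


def expected_stats(lines):
    """Independent check: collect every value per key in a dictionary."""
    values = defaultdict(list)
    for line in lines:
        key, value = line.strip().split("\t", 1)
        values[key].append(float(value))
    return {k: (sum(v) / len(v), sum(v), len(v)) for k, v in values.items()}


# Five Matrix ratings from the listing above, interleaved with two made-up
# rows for a second key, in arbitrary (unsorted) order.
mapper_output = [
    "Matrix, The (1999)\t3.5",
    "Sample Movie (2000)\t1.0",
    "Matrix, The (1999)\t4.0",
    "Matrix, The (1999)\t2.5",
    "Sample Movie (2000)\t2.0",
    "Matrix, The (1999)\t4.5",
    "Matrix, The (1999)\t2.0",
]

shuffled = sorted(mapper_output)  # stands in for the `sort` pipe
reduced = reduce_stream(shuffled)
expected = expected_stats(mapper_output)

for key, average, total, count in reduced:
    # The count confirms every pair arrived; sum and average confirm the math.
    assert (average, total, count) == expected[key]
    print("%s\t%s\t%s\t%s" % (key, average, total, count))

# Without sorting, the same key shows up in several partial groups.
unsorted_groups = reduce_stream(mapper_output)
print("%d groups without sorting, %d with sorting"
      % (len(unsorted_groups), len(reduced)))
```

The last line shows why the `sort` step matters: the reducer only compares each key with the previous one, so unsorted input yields several partial averages for the same movie.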
While the reducer code uses variable names such as movie and current_movie, in principle it can take any stream of KEY/VALUE pairs sorted by KEY and calculate the average VALUE for each KEY. Use this reducer together with mapGenre.py to find the average ratings of movie genres.
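To illustrate that nothing in the averaging logic is specific to movie titles, here is a minimal sketch with neutral variable names applied to genre keys. The function name `average_by_key` and the genre/rating lines are made up for this example; the actual output format of mapGenre.py is assumed to be one tab-separated genre/rating pair per line and may differ.

```python
def average_by_key(sorted_lines):
    """Yield (key, average) from tab-separated KEY/VALUE lines sorted by key."""
    current_key, total, count = None, 0.0, 0
    for line in sorted_lines:
        key, value = line.rstrip("\n").split("\t", 1)
        if key != current_key:
            # Key changed: emit the finished group and reset the counters
            if current_key is not None:
                yield current_key, total / count
            current_key, total, count = key, 0.0, 0
        total += float(value)
        count += 1
    if current_key is not None:
        yield current_key, total / count


# Made-up genre/rating pairs; sorted() stands in for the shuffle phase.
genre_lines = sorted([
    "Drama\t4.0",
    "Action\t3.0",
    "Drama\t5.0",
    "Action\t4.0",
    "Comedy\t2.5",
])

for genre, average in average_by_key(genre_lines):
    print("%s\t%s" % (genre, average))
```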