// WordCount.scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.GenericOptionsParser
import scala.collection.JavaConverters._
// This class performs the map operation, translating raw input into the key-value
// pairs we will feed into our reduce operation.
class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
  // Reused Writable instances, so we don't allocate new objects for every record.
  val one = new IntWritable(1)
  val word = new Text

  override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context) = {
    // Split on runs of whitespace; skip the empty token a leading space would produce.
    for (t <- value.toString.split("\\s+") if t.nonEmpty) {
      word.set(t)
      context.write(word, one)
    }
  }
}
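// As a quick illustration (example input, not part of the original source): for the
// line "to be or not to be", this mapper emits ("to",1), ("be",1), ("or",1),
// ("not",1), ("to",1), ("be",1). No counting happens here; totals are accumulated
// in the combine/reduce phase below.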
// This class performs the reduce operation, iterating over the key-value pairs
// produced by our map operation to produce a result. In this case we just
// calculate a simple total for each word seen.
class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
    // Sum every count observed for this word.
    val sum = values.asScala.foldLeft(0) { (t, i) => t + i.get }
    context.write(key, new IntWritable(sum))
  }
}
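// Continuing the illustrative example above: for the key "to" the framework groups
// the mapper outputs, this reducer receives values such as [1, 1] (possibly already
// partially summed by the combiner), and it writes ("to", 2).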
// This object configures and runs the job with the map and reduce classes we've
// specified above.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Let GenericOptionsParser consume any standard Hadoop options before we read our own.
    val otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>")
      System.exit(2)
    }
    val job = new Job(conf, "word count")
    job.setJarByClass(classOf[TokenizerMapper])
    job.setMapperClass(classOf[TokenizerMapper])
    // Summing is associative and commutative, so the reducer can also serve as a combiner.
    job.setCombinerClass(classOf[IntSumReducer])
    job.setReducerClass(classOf[IntSumReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(otherArgs(0)))
    FileOutputFormat.setOutputPath(job, new Path(otherArgs(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
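// Example invocation, assuming the compiled classes are packaged into a jar named
// wordcount.jar (the jar name and paths here are illustrative, not from the original source):
//   hadoop jar wordcount.jar WordCount <in> <out>
// where <in> is an existing HDFS input directory and <out> is an output directory
// that does not yet exist.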