• Example: the WordCount word-frequency program

    Below is the WordCount word-frequency example program provided with Hadoop. Before running the program, make sure HDFS has already been started.

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.StringUtils;

    public class WordCount2 {

      public static class TokenizerMapper
          extends Mapper<Object, Text, Text, IntWritable> {

        static enum CountersEnum { INPUT_WORDS }

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();

        private Configuration conf;
        private BufferedReader fis;

        @Override
        public void setup(Context context) throws IOException,
            InterruptedException {
          conf = context.getConfiguration();
          caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
          if (conf.getBoolean("wordcount.skip.patterns", false)) {
            URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
            for (URI patternsURI : patternsURIs) {
              Path patternsPath = new Path(patternsURI.getPath());
              String patternsFileName = patternsPath.getName().toString();
              parseSkipFile(patternsFileName);
            }
          }
        }

        // Load the skip patterns from the locally cached file, one pattern per line.
        private void parseSkipFile(String fileName) {
          try {
            fis = new BufferedReader(new FileReader(fileName));
            String pattern = null;
            while ((pattern = fis.readLine()) != null) {
              patternsToSkip.add(pattern);
            }
          } catch (IOException ioe) {
            System.err.println("Caught exception while parsing the cached file '"
                + StringUtils.stringifyException(ioe));
          }
        }

        @Override
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          String line = (caseSensitive) ?
              value.toString() : value.toString().toLowerCase();
          // Remove every skip pattern (treated as a regular expression) from the line.
          for (String pattern : patternsToSkip) {
            line = line.replaceAll(pattern, "");
          }
          StringTokenizer itr = new StringTokenizer(line);
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
            Counter counter = context.getCounter(CountersEnum.class.getName(),
                CountersEnum.INPUT_WORDS.toString());
            counter.increment(1);
          }
        }
      }

      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
          System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Any -skip argument registers the pattern file in the distributed cache
        // and turns on wordcount.skip.patterns for the mappers.
        List<String> otherArgs = new ArrayList<String>();
        for (int i = 0; i < remainingArgs.length; ++i) {
          if ("-skip".equals(remainingArgs[i])) {
            job.addCacheFile(new Path(remainingArgs[++i]).toUri());
            job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
          } else {
            otherArgs.add(remainingArgs[i]);
          }
        }
        FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

    The sample input files are as follows:

    $ bin/hadoop fs -ls /user/joe/wordcount/input/
    /user/joe/wordcount/input/file01
    /user/joe/wordcount/input/file02
    $ bin/hadoop fs -cat /user/joe/wordcount/input/file01
    Hello World, Bye World!
    $ bin/hadoop fs -cat /user/joe/wordcount/input/file02
    Hello Hadoop, Goodbye to hadoop.
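
    These files can be created with the fs shell as above, or written into HDFS programmatically. Below is a minimal Java sketch that writes the two sample files shown in the listing; the class name UploadWordCountInput is only for illustration, and it assumes the Hadoop configuration (core-site.xml with fs.defaultFS) is on the classpath and HDFS is running.

    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadWordCountInput {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up fs.defaultFS from the classpath
        FileSystem fs = FileSystem.get(conf);

        // Write the two sample files shown above directly into HDFS.
        writeFile(fs, "/user/joe/wordcount/input/file01", "Hello World, Bye World!\n");
        writeFile(fs, "/user/joe/wordcount/input/file02", "Hello Hadoop, Goodbye to hadoop.\n");
      }

      private static void writeFile(FileSystem fs, String path, String text) throws Exception {
        // create(path, true) overwrites the file if it already exists
        try (OutputStreamWriter writer =
                 new OutputStreamWriter(fs.create(new Path(path), true), StandardCharsets.UTF_8)) {
          writer.write(text);
        }
      }
    }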

    Run the program:

    $ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

    The output is as follows:

    $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop, 1
    Hello 2
    World! 1
    World, 1
    hadoop. 1
    to 1
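
    Note that punctuation stays attached to the words ("Hadoop,", "World!", "hadoop."). The mapper tokenizes with StringTokenizer, which splits on whitespace only; removing the punctuation is exactly what the skip patterns in the next step are for. A minimal standalone sketch of the tokenization (the class name TokenizerDemo is only for illustration):

    import java.util.StringTokenizer;

    public class TokenizerDemo {
      public static void main(String[] args) {
        // StringTokenizer splits on whitespace only, so punctuation stays attached to words.
        StringTokenizer itr = new StringTokenizer("Hello World, Bye World!");
        while (itr.hasMoreTokens()) {
          System.out.println(itr.nextToken());   // prints: Hello / World, / Bye / World!
        }
      }
    }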

    Set the word-filtering patterns via DistributedCache:

    $ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
    \.
    \,
    \!
    to
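
    Each line of patterns.txt is read by parseSkipFile() and applied in map() as a regular expression via String.replaceAll, which deletes every match from the input line before tokenization. A minimal standalone sketch of that substitution (the class name SkipPatternDemo is only for illustration):

    public class SkipPatternDemo {
      public static void main(String[] args) {
        String line = "Hello Hadoop, Goodbye to hadoop.";
        // The same patterns as in patterns.txt, written as Java string literals.
        String[] patternsToSkip = { "\\.", "\\,", "\\!", "to" };
        for (String pattern : patternsToSkip) {
          line = line.replaceAll(pattern, "");
        }
        System.out.println(line);   // "Hello Hadoop Goodbye  hadoop"
      }
    }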

    Run again, this time with more options:

    $ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

    The output is as follows:

    $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop 1
    Hello 2
    World 2
    hadoop 1
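
    In the command above, -Dwordcount.case.sensitive=true is a generic Hadoop option: GenericOptionsParser consumes it and sets the property on the Configuration before main() ever sees it, while -skip is left among the remaining arguments for the program to handle itself. A minimal sketch of that split (the class name OptionsDemo is only for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class OptionsDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] cmd = {
            "-Dwordcount.case.sensitive=true",
            "/user/joe/wordcount/input", "/user/joe/wordcount/output",
            "-skip", "/user/joe/wordcount/patterns.txt"
        };
        // The parser strips the -D option and applies it to conf.
        String[] remaining = new GenericOptionsParser(conf, cmd).getRemainingArgs();

        System.out.println(conf.get("wordcount.case.sensitive"));   // true
        for (String arg : remaining) {
          System.out.println(arg);   // input dir, output dir, -skip, patterns file
        }
      }
    }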

    Run again, this time with case sensitivity turned off:

    $ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

    The output is as follows:

    $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
    bye 1
    goodbye 1
    hadoop 2
    hello 2
    world 2