MapReduce当中的reduce当中的cleanup的用法

reduce当中的cleanup的用法:
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
马克-to-win @ 马克java社区:上面的topN是解决每个组里的topN,比如每个订单中的最小的。但如果需要横向的比较所有的key(初学者忽略:cleanup方法慎用, 如果所有的key的数据巨大量怎么办?Map map = new HashMap();内存都不够了,所以考虑多步mapreduce或聪明的算法做法,每处理完一个key,就删除一些没用的空间,比如把不是topN的给删除了,这样可以释放一部分空间),选出topN,得用cleanup。

马克-to-win @ 马克java社区:从现在开始,我们讲一些特殊用法,我们知道,map是读一行执行一次,reduce是对每一key,或一组,执行一次。但是如果需求是当我们得到全部数据之后,需要再做一些处理,之后再输出怎么办?这时候setUp或cleanUp就登场了,他们像servlet的init和 destroy一样都只执行一次。map和reduce都有setUp或cleanUp,原理一样。我们只拿reduce做例子。

马克-to-win @ 马克java社区:这样对于最终数据的过滤筛选和输出步骤,要放在cleanUp中。前面我们的例子都是一行一行(对于map),一组一组(对于 reduce)输出,借助cleanup,我们可以全部拿到数据,完全按照java过去的算法,最后过滤输出。下面我们用它解决topN问题。

还以wordcount为例,求出单词出现数量前三名。




package com;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceSetupTestMark_to_win {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();
        /*
          hello a hello win
          hello a to
          hello mark
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is " + key.toString() + " value is " + value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        Map map = new HashMap();
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is " + key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println("val is " + val.toString());
                sum += val.get();
            }
            String keyString = key.toString();
            map.put(keyString, sum);
            // System.out.println("val is " + val.toString());
            // result.set(sum);
            // context.write(key, result);
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            /* 放在一个List 当中, 之后排这个list, */
/*1)List<Integer> list = new ArrayList<Integer>();
2)public ArrayList(Collection<? extends E> c)
有两种类型的构造函数。
*/  
            List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
                public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                    return -1 * (o1.getValue().compareTo(o2.getValue()));
                }
            });
            for (int i = 0; i < 3; i++) {
                context.write(new Text(list.get(i).getKey()), new IntWritable(list.get(i).getValue()));
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(ReduceSetupTest.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }

        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/cleanup.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));

        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }

}




输出结果:
reduce key is a
val is 1
val is 1
reduce key is hello
val is 1
val is 1
val is 1
val is 1
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1


输出的文件 :

hello    4
a    2
to    1