Using the Combiner in MapReduce

In the previous chapter's HelloWorld example, each map task can produce a large amount of local output, and all of it is sent over the network to the reducer, which wastes a great deal of bandwidth. A Combiner solves this problem: it performs a first, map-side merge of the map output before the shuffle, and is one of MapReduce's optimization techniques. (The plain-Java sketch after the first run's counters below shows how much shuffle data this saves.)



package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCountCombinerMark_to_win {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();
/* The Context parameter here is org.apache.hadoop.mapreduce.Mapper.Context, and the method throws java.lang.InterruptedException. To read the source of map(): hold Ctrl and click it; when "Attach Source Code" appears, click External Location/External File and point it at the source code, which is under the Source directory. */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203"+key.toString()+" value is "+value.toString());            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class CombinerClass extends Reducer<Text,IntWritable,Text,IntWritable>
    {
         private IntWritable result = new IntWritable();
         public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
         {
             System.out.println("combiner key is "+key.toString());
             int sum = 0;
             for (IntWritable val : values)
             {
                 System.out.println("combine val is "+val.toString());
                 sum += val.get();// accumulate the counts for the same word
             }
             result.set(sum);
             context.write(key, result);
         }
    }

  
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is "+key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println("val is "+val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountCombinerMark_to_win.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(CombinerClass.class);
 //       job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}




Output:

mytest hadoop successful
 INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
 INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
 WARN - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
 INFO - Total input paths to process : 1
 INFO - number of splits:1
 INFO - Submitting tokens for job: job_local1963823567_0001
 INFO - The url to track the job: http://localhost:8080/
 INFO - OutputCommitter set in config null
 INFO - Running job: job_local1963823567_0001
 INFO - File Output Committer Algorithm version is 1
 INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 INFO - Waiting for map tasks
 INFO - Starting task: attempt_local1963823567_0001_m_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@af988b
 INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
 INFO - (EQUATOR) 0 kvi 26214396(104857584)
 INFO - mapreduce.task.io.sort.mb: 100
 INFO - soft limit at 83886080
 INFO - bufstart = 0; bufvoid = 104857600
 INFO - kvstart = 26214396; length = 6553600
 INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:732030 value is hello a hello win
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:7320319 value is hello a to
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:7320331 value is hello mark
 INFO -
 INFO - Starting flush of map output
 INFO - Spilling map output
 INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
 INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
combiner key is a
combine val is 1
combine val is 1
combiner key is hello
combine val is 1
combine val is 1
combine val is 1
combine val is 1
combiner key is mark
combine val is 1
combiner key is to
combine val is 1
combiner key is win
combine val is 1
 INFO - Finished spill 0
 INFO - Task:attempt_local1963823567_0001_m_000000_0 is done. And is in the process of committing
 INFO - map
 INFO - Task 'attempt_local1963823567_0001_m_000000_0' done.
 INFO - Finishing task: attempt_local1963823567_0001_m_000000_0
 INFO - map task executor complete.
 INFO - Waiting for reduce tasks
 INFO - Starting task: attempt_local1963823567_0001_r_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO - Job job_local1963823567_0001 running in uber mode : false
 INFO -  map 100% reduce 0%
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@7ec6b2
 INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@15fb15a
 INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
 INFO - attempt_local1963823567_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
 INFO - localfetcher#1 about to shuffle output of map attempt_local1963823567_0001_m_000000_0 decomp: 52 len: 56 to MEMORY
 INFO - Read 52 bytes from map-output for attempt_local1963823567_0001_m_000000_0
 INFO - closeInMemoryFile -> map-output of size: 52, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->52
 INFO - EventFetcher is interrupted.. Returning
 INFO - 1 / 1 copied.
 INFO - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 48 bytes
 INFO - Merged 1 segments, 52 bytes to disk to satisfy reduce memory limit
 INFO - Merging 1 files, 56 bytes from disk
 INFO - Merging 0 segments, 0 bytes from memory into reduce
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 48 bytes
 INFO - 1 / 1 copied.
 INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is a
val is 2
reduce key is hello
val is 4
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1
 INFO - Task:attempt_local1963823567_0001_r_000000_0 is done. And is in the process of committing
 INFO - 1 / 1 copied.
 INFO - Task attempt_local1963823567_0001_r_000000_0 is allowed to commit now
 INFO - Saved output of task 'attempt_local1963823567_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local1963823567_0001_r_000000
 INFO - reduce > reduce
 INFO - Task 'attempt_local1963823567_0001_r_000000_0' done.
 INFO - Finishing task: attempt_local1963823567_0001_r_000000_0
 INFO - reduce task executor complete.
 INFO -  map 100% reduce 100%
 INFO - Job job_local1963823567_0001 completed successfully
 INFO - Counters: 35
    File System Counters
        FILE: Number of bytes read=454
        FILE: Number of bytes written=604038
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=82
        HDFS: Number of bytes written=30
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=3
        Map output records=9
        Map output bytes=76
        Map output materialized bytes=56
        Input split bytes=97
        Combine input records=9
        Combine output records=5
        Reduce input groups=5
        Reduce shuffle bytes=56
        Reduce input records=5
        Reduce output records=5
        Spilled Records=10
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=50
        Total committed heap usage (bytes)=243384320
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=41
    File Output Format Counters
        Bytes Written=30
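
The counters above show exactly what the combiner bought us in this run: the mapper emitted 9 records (Map output records=9), the combiner collapsed them to 5 (Combine output records=5), and only those 5 records were shuffled to the reducer (Reduce input records=5). The following plain-Java sketch is my own illustration, not part of the Hadoop job; it replays that local aggregation on the same three input lines that appear in the "key is ... value is ..." log lines above:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class CombinerLocalAggregationSketch {
    public static void main(String[] args) {
        // The README.txt content as it appears in the run log above.
        String[] lines = { "hello a hello win", "hello a to", "hello mark" };
        int mapOutputRecords = 0;
        Map<String, Integer> combined = new LinkedHashMap<String, Integer>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                String word = itr.nextToken();
                mapOutputRecords++;                            // the mapper would emit (word, 1) here
                Integer old = combined.get(word);
                combined.put(word, old == null ? 1 : old + 1); // what the combiner does locally
            }
        }
        System.out.println("map output records     = " + mapOutputRecords); // 9
        System.out.println("combine output records = " + combined.size());  // 5
        for (Map.Entry<String, Integer> e : combined.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}

The five (word, count) pairs it prints match the combiner output in the log: a 2, hello 4, mark 1, to 1, win 1.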




When two map tasks meet the combiner:

package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountTwoFileCombiner {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203"+key.toString()+" value is "+value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

  
    public static class CombinerClass extends Reducer<Text,IntWritable,Text,IntWritable>
    {
         private IntWritable result = new IntWritable();
         public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
         {
             System.out.println("combiner key is "+key.toString());
             int sum = 0;
             for (IntWritable val : values)
             {
                 System.out.println("combine val is "+val.toString());
                 sum += val.get();// accumulate the counts for the same word
             }
             result.set(sum);
             context.write(key, result);
         }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is "+key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println("reduce val is "+val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountTwoFileCombiner.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(CombinerClass.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README1.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}




Output:

 INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
 INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
 WARN - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
 INFO - Total input paths to process : 2
 INFO - number of splits:2
 INFO - Submitting tokens for job: job_local244211701_0001
 INFO - The url to track the job: http://localhost:8080/
 INFO - Running job: job_local244211701_0001
 INFO - OutputCommitter set in config null
 INFO - File Output Committer Algorithm version is 1
 INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 INFO - Waiting for map tasks
 INFO - Starting task: attempt_local244211701_0001_m_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@9a1b89
 INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
 INFO - (EQUATOR) 0 kvi 26214396(104857584)
 INFO - mapreduce.task.io.sort.mb: 100
 INFO - soft limit at 83886080
 INFO - bufstart = 0; bufvoid = 104857600
 INFO - kvstart = 26214396; length = 6553600
 INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:732030 value is hello a hello win
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:7320319 value is hello a to
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:7320331 value is hello mark
 INFO -
 INFO - Starting flush of map output
 INFO - Spilling map output
 INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
 INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
combiner key is a
combine val is 1
combine val is 1
combiner key is hello
combine val is 1
combine val is 1
combine val is 1
combine val is 1
combiner key is mark
combine val is 1
combiner key is to
combine val is 1
combiner key is win
combine val is 1
 INFO - Finished spill 0
 INFO - Task:attempt_local244211701_0001_m_000000_0 is done. And is in the process of committing
 INFO - map
 INFO - Task 'attempt_local244211701_0001_m_000000_0' done.
 INFO - Finishing task: attempt_local244211701_0001_m_000000_0
 INFO - Starting task: attempt_local244211701_0001_m_000001_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@147ad13
 INFO - Processing split: hdfs://localhost:9000/README1.txt:0+28
 INFO - (EQUATOR) 0 kvi 26214396(104857584)
 INFO - mapreduce.task.io.sort.mb: 100
 INFO - soft limit at 83886080
 INFO - bufstart = 0; bufvoid = 104857600
 INFO - kvstart = 26214396; length = 6553600
 INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:732030 value is hello zhangsan
key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:7320316 value is hello a lisi
 INFO -
 INFO - Starting flush of map output
 INFO - Spilling map output
 INFO - bufstart = 0; bufend = 48; bufvoid = 104857600
 INFO - kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
combiner key is a
combine val is 1
combiner key is hello
combine val is 1
combine val is 1
combiner key is lisi
combine val is 1
combiner key is zhangsan
combine val is 1
 INFO - Finished spill 0
 INFO - Job job_local244211701_0001 running in uber mode : false
 INFO - Task:attempt_local244211701_0001_m_000001_0 is done. And is in the process of committing
 INFO -  map 50% reduce 0%
 INFO - map
 INFO - Task 'attempt_local244211701_0001_m_000001_0' done.
 INFO - Finishing task: attempt_local244211701_0001_m_000001_0
 INFO - map task executor complete.
 INFO - Waiting for reduce tasks
 INFO - Starting task: attempt_local244211701_0001_r_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@1a2d165
 INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@c45226
 INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
 INFO - attempt_local244211701_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
 INFO - localfetcher#1 about to shuffle output of map attempt_local244211701_0001_m_000000_0 decomp: 52 len: 56 to MEMORY
 INFO - Read 52 bytes from map-output for attempt_local244211701_0001_m_000000_0
 INFO - closeInMemoryFile -> map-output of size: 52, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->52
 INFO - localfetcher#1 about to shuffle output of map attempt_local244211701_0001_m_000001_0 decomp: 48 len: 52 to MEMORY
 INFO - Read 48 bytes from map-output for attempt_local244211701_0001_m_000001_0
 INFO - closeInMemoryFile -> map-output of size: 48, inMemoryMapOutputs.size() -> 2, commitMemory -> 52, usedMemory ->100
 INFO - EventFetcher is interrupted.. Returning
 INFO - 2 / 2 copied.
 INFO - finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
 INFO - Merging 2 sorted segments
 INFO - Down to the last merge-pass, with 2 segments left of total size: 92 bytes
 INFO - Merged 2 segments, 100 bytes to disk to satisfy reduce memory limit
 INFO - Merging 1 files, 102 bytes from disk
 INFO - Merging 0 segments, 0 bytes from memory into reduce
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 94 bytes
 INFO - 2 / 2 copied.
 INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is a
reduce val is 1
reduce val is 2
reduce key is hello
reduce val is 4
reduce val is 2
reduce key is lisi
reduce val is 1
reduce key is mark
reduce val is 1
reduce key is to
reduce val is 1
reduce key is win
reduce val is 1
reduce key is zhangsan
reduce val is 1
 INFO - Task:attempt_local244211701_0001_r_000000_0 is done. And is in the process of committing
 INFO - 2 / 2 copied.
 INFO - Task attempt_local244211701_0001_r_000000_0 is allowed to commit now
 INFO - Saved output of task 'attempt_local244211701_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local244211701_0001_r_000000
 INFO - reduce > reduce
 INFO - Task 'attempt_local244211701_0001_r_000000_0' done.
 INFO - Finishing task: attempt_local244211701_0001_r_000000_0
 INFO - reduce task executor complete.
 INFO -  map 100% reduce 100%
 INFO - Job job_local244211701_0001 completed successfully
 INFO - Counters: 35
    File System Counters
        FILE: Number of bytes read=1512
        FILE: Number of bytes written=902094
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=179
        HDFS: Number of bytes written=48
        HDFS: Number of read operations=28
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=5
    Map-Reduce Framework
        Map input records=5
        Map output records=14
        Map output bytes=124
        Map output materialized bytes=108
        Input split bytes=195
        Combine input records=14
        Combine output records=9
        Reduce input groups=7
        Reduce shuffle bytes=108
        Reduce input records=9
        Reduce output records=7
        Spilled Records=18
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=63
        Total committed heap usage (bytes)=460161024
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=69
    File Output Format Counters
        Bytes Written=48
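
With two input files there are two splits, so two map tasks each run the combiner on their own output only: map 1 pre-sums its counts (a 2, hello 4, mark 1, to 1, win 1) and map 2 pre-sums its own (a 1, hello 2, lisi 1, zhangsan 1). That is exactly why the reducer sees two values for "a" (1 and 2) and two for "hello" (4 and 2) in the log. Below is a small sketch of that reduce-side merge, again in plain Java for illustration only: the per-key value lists are taken from the "reduce val is ..." lines above, and the printed totals are derived from them rather than copied from the job's output file.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceSideMergeSketch {
    public static void main(String[] args) {
        // One pre-summed value per map task, as shown by the "reduce val is ..." log lines.
        Map<String, List<Integer>> shuffled = new LinkedHashMap<String, List<Integer>>();
        shuffled.put("a",        Arrays.asList(1, 2));
        shuffled.put("hello",    Arrays.asList(4, 2));
        shuffled.put("lisi",     Arrays.asList(1));
        shuffled.put("mark",     Arrays.asList(1));
        shuffled.put("to",       Arrays.asList(1));
        shuffled.put("win",      Arrays.asList(1));
        shuffled.put("zhangsan", Arrays.asList(1));
        for (Map.Entry<String, List<Integer>> entry : shuffled.entrySet()) {
            int sum = 0;
            for (int partial : entry.getValue()) {
                sum += partial;             // the same summation IntSumReducer performs
            }
            System.out.println(entry.getKey() + "\t" + sum); // e.g. a -> 3, hello -> 6
        }
    }
}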