MapReduce基础

本章讲了围绕着Mapreduce知识点的相关14个问题,学过后可以基本胜任MapReduce编程工作。

MapReduce的输入文件是两个

对于MapReduce程序,如何输入文件是两个文件?
马 克-to-win @ 马克java社区:这一小节,我们将继续第一章大数据入门的HelloWorld例子做进一步的研究。看一看split有两个的这种情况,这里,同时我们研究如何输入文件是两个文件。
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。





package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountTwoFile {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203"+key.toString()+" value is "+value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is "+key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountTwoFile.class);
        job.setMapperClass(TokenizerMapper.class);
//        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
(购买完整教程)
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}







当然如果程序不硬编码的话,用上一章的配置运行参数方法也可以运行。

右击项目名称,选择“run as”-“run configurations”,在“Arguments”里加入三个参数
hdfs://localhost:9000/README.txt
hdfs://localhost:9000/README1.txt
hdfs://localhost:9000/output2



然后点击“Run”即可运行。

6)结果查看

打开新生成的part-r-00000文件:

a    3
hello    6
lisi    1
mark    1
to    1
win    1
zhangsan    1




里面给出了两个文件中每个字的出现次数。

源文件readme.txt:

hello a hello win
hello a to
hello mark

源文件readme1.txt:

hello zhangsan
hello a lisi




执行结果是:

otherArgs is hdfs://localhost:9000/README.txthdfs://localhost:9000/README1.txt
mytest hadoop successful
 INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
 INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
 WARN - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
 INFO - Total input paths to process : 2
 INFO - number of splits:2
 INFO - Submitting tokens for job: job_local358187217_0001
 INFO - The url to track the job: http://localhost:8080/
 INFO - Running job: job_local358187217_0001
 INFO - OutputCommitter set in config null
 INFO - File Output Committer Algorithm version is 1
 INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 INFO - Waiting for map tasks
 INFO - Starting task: attempt_local358187217_0001_m_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@af988b
 INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
 INFO - Job job_local358187217_0001 running in uber mode : false
 INFO -  map 0% reduce 0%
 INFO - (EQUATOR) 0 kvi 26214396(104857584)
 INFO - mapreduce.task.io.sort.mb: 100
 INFO - soft limit at 83886080
 INFO - bufstart = 0; bufvoid = 104857600
 INFO - kvstart = 26214396; length = 6553600
 INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:0 value is hello a hello win
key is 马克-to-win @ 马克java社区:19 value is hello a to
key is 马克-to-win @ 马克java社区:31 value is hello mark
 INFO -
 INFO - Starting flush of map output
 INFO - Spilling map output
 INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
 INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
 INFO - Finished spill 0
 INFO - Task:attempt_local358187217_0001_m_000000_0 is done. And is in the process of committing
 INFO - map
 INFO - Task 'attempt_local358187217_0001_m_000000_0' done.
 INFO - Finishing task: attempt_local358187217_0001_m_000000_0
 INFO - Starting task: attempt_local358187217_0001_m_000001_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@45618a
 INFO - Processing split: hdfs://localhost:9000/README1.txt:0+28
 INFO - (EQUATOR) 0 kvi 26214396(104857584)
 INFO - mapreduce.task.io.sort.mb: 100
 INFO - soft limit at 83886080
 INFO - bufstart = 0; bufvoid = 104857600
 INFO - kvstart = 26214396; length = 6553600
 INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 0 value is hello zhangsan
key is 16 value is hello a lisi
 INFO -
 INFO - Starting flush of map output
 INFO - Spilling map output
 INFO - bufstart = 0; bufend = 48; bufvoid = 104857600
 INFO - kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
 INFO - Finished spill 0
 INFO - Task:attempt_local358187217_0001_m_000001_0 is done. And is in the process of committing
 INFO - map
 INFO - Task 'attempt_local358187217_0001_m_000001_0' done.
 INFO - Finishing task: attempt_local358187217_0001_m_000001_0
 INFO - map task executor complete.
 INFO - Waiting for reduce tasks
 INFO - Starting task: attempt_local358187217_0001_r_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@743b96
 INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@12ec0de
 INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
 INFO - attempt_local358187217_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
 INFO - localfetcher#1 about to shuffle output of map attempt_local358187217_0001_m_000001_0 decomp: 60 len: 64 to MEMORY
 INFO - Read 60 bytes from map-output for attempt_local358187217_0001_m_000001_0
 INFO - closeInMemoryFile -> map-output of size: 60, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->60
 INFO - localfetcher#1 about to shuffle output of map attempt_local358187217_0001_m_000000_0 decomp: 96 len: 100 to MEMORY
 INFO - Read 96 bytes from map-output for attempt_local358187217_0001_m_000000_0
 INFO - closeInMemoryFile -> map-output of size: 96, inMemoryMapOutputs.size() -> 2, commitMemory -> 60, usedMemory ->156
 INFO - EventFetcher is interrupted.. Returning
 INFO - 2 / 2 copied.
 INFO - finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
 INFO - Merging 2 sorted segments
 INFO - Down to the last merge-pass, with 2 segments left of total size: 148 bytes
 INFO - Merged 2 segments, 156 bytes to disk to satisfy reduce memory limit
 INFO - Merging 1 files, 158 bytes from disk
 INFO - Merging 0 segments, 0 bytes from memory into reduce
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 150 bytes
 INFO - 2 / 2 copied.
 INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is a
val is 1
val is 1
val is 1
reduce key is hello
val is 1
val is 1
val is 1
val is 1
val is 1
val is 1
reduce key is lisi
val is 1
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1
reduce key is zhangsan
val is 1
 INFO -  map 100% reduce 0%
 INFO - Task:attempt_local358187217_0001_r_000000_0 is done. And is in the process of committing
 INFO - 2 / 2 copied.
 INFO - Task attempt_local358187217_0001_r_000000_0 is allowed to commit now
 INFO - Saved output of task 'attempt_local358187217_0001_r_000000_0' to hdfs://localhost:9000/output13/_temporary/0/task_local358187217_0001_r_000000
 INFO - reduce > reduce
 INFO - Task 'attempt_local358187217_0001_r_000000_0' done.
 INFO - Finishing task: attempt_local358187217_0001_r_000000_0
 INFO - reduce task executor complete.
 INFO -  map 100% reduce 100%
 INFO - Job job_local358187217_0001 completed successfully
 INFO - Counters: 35
    File System Counters
        FILE: Number of bytes read=1624
        FILE: Number of bytes written=900830
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=179
        HDFS: Number of bytes written=48
        HDFS: Number of read operations=28
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=5
    Map-Reduce Framework
        Map input records=5
        Map output records=14
        Map output bytes=124
        Map output materialized bytes=164
        Input split bytes=195
        Combine input records=0
        Combine output records=0
        Reduce input groups=7
        Reduce shuffle bytes=164
        Reduce input records=14
        Reduce output records=7
        Spilled Records=28
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=65
        Total committed heap usage (bytes)=461156352
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=69
    File Output Format Counters
        Bytes Written=48