Using Custom Objects in MapReduce

Custom objects:
马克-to-win @ 马克java社区: So far, every value we have sent over the Hadoop network has used a predefined type such as Text or IntWritable. Sometimes, however, we want to build our own class that wraps these predefined simple types and can still travel over the Hadoop network just like a predefined type; that makes the data easier to manage and work with. To do so, the class must implement Writable, i.e. implement the write and readFields methods. The idea: to compute an average we could follow the hello world approach from the previous chapter, where all values for the same key enter the same reduce call together. We could pass the two strings o1,p2,250.0 and o1,p1,200.0 into the same reduce as plain strings, parse them there, then sum and average. That works (and can solve basically every problem), but it is rather clumsy, so instead we use the custom-object technique taught in this section, which is much more elegant (a sketch of the string-based alternative follows below). When teaching this section, go through it once, and after it is understood, go through it a second time to explain why the program is written this way.
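To make the comparison concrete, here is a minimal sketch of that string-based alternative. The class name StringAvgReducer is hypothetical, and it assumes the mapper simply emits the whole comma-separated line as a Text value:

package com;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class StringAvgReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        double sum = 0.0;
        int num = 0;
        for (Text val : values) {
            /* each value looks like "o1,p2,250.0"; the third field is the amount */
            String[] fields = val.toString().split(",");
            sum = sum + Double.parseDouble(fields[2]);
            num++;
        }
        context.write(key, new DoubleWritable(sum / num));
    }
}

This does produce the averages, but every reduce call has to re-parse raw strings, which is exactly the clumsiness the custom Writable object below avoids.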
 



Requirement: orders
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0

Compute the average amount for each order:

o1    o1    0.0    225.0
o2    o2    0.0    433.3333333333333
o3    o3    0.0    150.0
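For example, order o2 has three records with amounts 500.0, 100.0 and 700.0, so its average is (500.0 + 100.0 + 700.0) / 3 = 433.3333333333333; order o1 has (250.0 + 200.0) / 2 = 225.0.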



package com;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBeanSerial implements Writable {
    private String orderId;
    private Double amount;
/* To avoid a NullPointerException, give a default value to the field that is not present in the input data */
    private Double average=0.0;

    public Double getAverage() {
        return average;
    }

    public void setAverage(Double average) {
        this.average = average;
    }

    public OrderBeanSerial() {
    }

    public OrderBeanSerial(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }



    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
        this.average = in.readDouble();
    }
/* The final reduce output goes through this method. Since the output is produced here and it must include
the average, the class naturally needs the average field; otherwise we would have to write sum/num here,
and the sum and num values would be awkward to pass in. */
    @Override
    public String toString() {
        return orderId + "\t" + amount + "\t" + average;
    }
}
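To see what write and readFields actually do, the bean can be round-tripped through plain Java DataOutput/DataInput streams outside of Hadoop. A minimal sketch (the test class OrderBeanSerialRoundTrip is hypothetical and only for illustration):

package com;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class OrderBeanSerialRoundTrip {
    public static void main(String[] args) throws Exception {
        OrderBeanSerial original = new OrderBeanSerial("o1", 250.0);

        /* serialize: this is what Hadoop does when it ships the bean over the network */
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        /* deserialize into a fresh, empty bean */
        OrderBeanSerial copy = new OrderBeanSerial();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy);   /* prints: o1    250.0    0.0 */
    }
}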





package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SerialMapper extends Mapper<LongWritable, Text, Text,OrderBeanSerial>{
    OrderBeanSerial bean = new OrderBeanSerial();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        bean.set(orderId, amount);
        context.write(new Text(orderId),bean);
    }
}
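For example, for the input line o1,p2,250.0 the mapper splits it on commas and emits the key o1 together with a bean whose orderId is o1, whose amount is 250.0, and whose average is still the default 0.0.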



package com;

import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import org.apache.hadoop.io.Text;

public class SerialReducer extends Reducer<Text,OrderBeanSerial, Text, OrderBeanSerial>{

    @Override
    protected void reduce(Text key, Iterable<OrderBeanSerial> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is 马克-to-win @ 马克java社区:"+key.toString());
/*  For the data set below, just as in hello world where hello[1,1,1,1] enters reduce together, here the key o1
    enters reduce together with its values [(o1,250.0),(o1,200.0)]; the for loop below reflects this, and inside
    the loop it is then easy to compute the average.
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
*/
        Double d,sum=0.0;
        int num=0;
        for (OrderBeanSerial val : values) {
            d=val.getAmount();
            sum=sum+d;
            num++;
          
            System.out.println("val is "+val.toString());
            System.out.println("inside for key is "+key.toString());
        }

        OrderBeanSerial vals=new OrderBeanSerial();
        vals.set(key.toString(), 0.0);
        vals.setAverage(sum/num);
/* This is exactly what ends up in the final output file, matching the requirement. */
        context.write(key, vals);
    }
}
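For example, for key o1 the two beans with amounts 250.0 and 200.0 go through the loop, so sum ends up as 450.0 and num as 2; the reducer then writes the key o1 with a bean that prints as o1, 0.0, 225.0, which is exactly the first line of the output file.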






package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SerialTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SerialTestMark_to_win.class);
        job.setMapperClass(SerialMapper.class);
        job.setReducerClass(SerialReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBeanSerial.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(OrderBeanSerial.class);
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
/* Make sure the directory below does not exist before running */
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
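As the comment above says, the output directory must not exist, otherwise the job fails. Instead of deleting e:/temp/output by hand before every run, the driver could remove it programmatically. A small optional sketch using Hadoop's FileSystem API (not part of the original program; it additionally needs import org.apache.hadoop.fs.FileSystem):

        /* optional: delete the output directory if it already exists, so the job does not fail */
        Path outputPath = new Path("e:/temp/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);   /* true = delete recursively */
        }
        FileOutputFormat.setOutputPath(job, outputPath);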

The output in the result file is: (analysis: both the key and the value are written out, which is why o1 appears twice on each line)
o1    o1    0.0    225.0
o2    o2    0.0    433.3333333333333
o3    o3    0.0    150.0



The console output is: (analysis: the amount values arrive in no particular order)

reduce key is 马克-to-win @ 马克java社区: o1
 INFO - reduce > reduce
 INFO -  map 100% reduce 78%
val is o1    200.0    0.0
inside for key is o1
 INFO - reduce > reduce
 INFO -  map 100% reduce 83%
val is o1    250.0    0.0
inside for key is o1
reduce key is 马克-to-win @ 马克java社区:o2
 INFO - reduce > reduce
 INFO -  map 100% reduce 89%
val is o2    700.0    0.0
inside for key is o2
 INFO - reduce > reduce
 INFO -  map 100% reduce 94%
val is o2    100.0    0.0
inside for key is o2
 INFO - reduce > reduce
 INFO -  map 100% reduce 100%
val is o2    500.0    0.0
inside for key is o2
reduce key is 马克-to-win @ 马克java社区:o3
 INFO - reduce > reduce
val is o3    150.0    0.0
inside for key is o3