How do you join two large tables?

马克-to-win @ 马克java社区: How do you join two large tables? For a join between two large tables: 1. A reduce-side join can handle it, but not perfectly: as noted earlier, it is prone to data skew. 2. The idea: split one of the large tables into several small tables, then perform the join against each piece. The example below does this on the map side: the mapper caches product.txt as an in-memory dictionary during setup() and joins every order.txt record against it, so no reduce phase is needed.
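For reference, the two input files below would reproduce the output listed at the end of this section; their contents are inferred from the mapper's parsing logic and the final output, not given in the original. order.txt holds orderId,pId,amount and product.txt holds pId,pName,price.

e:/temp/input/order.txt:
1,p1,3
1,p2,2
2,p3,3
2,p4,1
2,p5,4
3,p1,5

e:/temp/input/product.txt:
p1,java,11
p2,c,22
p3,c#,33
p4,python,44
p5,js,66

The mapper caches product.txt in setup() and joins each order line against it: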
package com;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // product id -> "pName,price", built once per map task from the cached product file
    Map<String, String> dictMap = new HashMap<>();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the file registered with job.addCacheFile() in the driver and load it
        // into the in-memory dictionary before any map() call runs.
        String path = context.getCacheFiles()[0].getPath();
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
        String line;
        while (StringUtils.isNotEmpty(line = br.readLine())) {
            String[] fields = line.split(",");              // pId,pName,price
            dictMap.put(fields[0], fields[1] + "," + fields[2]);
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each order line looks like: orderId,pId,amount
        String orderLine = value.toString();
        String[] fields = orderLine.split(",");
        // Look the product up by its id (fields[1]) and append its name and price.
        String pNamePrice = dictMap.get(fields[1]);
        String[] fieldsPNamePrice = pNamePrice.split(",");
        k.set(orderLine + "," + fieldsPNamePrice[0] + "," + fieldsPNamePrice[1]);
        context.write(k, NullWritable.get());
    }
}
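One caveat about map() above: if an order line references a product id that is missing from the cached dictionary, dictMap.get(fields[1]) returns null and the subsequent split() throws a NullPointerException. A defensive variant (a sketch, not part of the original code) skips such records and appends the cached "pName,price" string directly instead of splitting and re-joining it:

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String orderLine = value.toString();
        String[] fields = orderLine.split(",");
        String pNamePrice = dictMap.get(fields[1]);
        if (pNamePrice == null) {
            return;  // unknown product id: skip the order instead of crashing
        }
        k.set(orderLine + "," + pNamePrice);  // pNamePrice is already "pName,price"
        context.write(k, NullWritable.get());
    }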

The driver registers product.txt as a cache file, clears the local output directory, and submits the job:

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.net.URI;
public class MapJoinTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinTestMark_to_win.class);
        job.setMapperClass(MapJoinMapper.class);
        // The join is completed on the map side, so no reducer class is set and the
        // reduce phase is switched off.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Delete any existing local output directory first; the job fails if it already exists.
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("马克-to-win @马克java社区 successful");
          /* addCacheFile(URI uri): localizes the given file onto each task's node
             before the task runs. The URI can point at HDFS, e.g.
             job.addCacheFile(new URI("hdfs://url:port/filename")), or at a local
             file, e.g. new URI("file:/E:/temp/input/product.txt"). */
        job.addCacheFile(new URI("file:/E:/temp/input/product.txt"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* an empty directory skips the loop and falls through to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
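The file: URI above only works when the job runs locally. On a real cluster the cached file must be readable from every task node, so it is put on HDFS and registered with an hdfs:// URI, as the comment in the driver notes; the host, port, and path below are placeholders, not values from the original:

    job.addCacheFile(new URI("hdfs://namenode:9000/cache/product.txt"));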


The output file contains:

1,p1,3,java,11
1,p2,2,c,22
2,p3,3,c#,33
2,p4,1,python,44
2,p5,4,js,66
3,p1,5,java,11