Win7 Eclipse 搭建spark java1.8环境:WordCount helloworld例子

Win7 Eclipse 搭建spark java1.8环境:WordCount helloworld例子
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
马克-to-win @ 马克java社区:在eclipse oxygen上创建一个普通的java项目,然后把spark-assembly-1.6.1-hadoop2.6.0.jar这个包导进工程就可以了。下面的程序默认读取本地文件,可直接运行;只有改用注释中的 hdfs:// 路径时才需要先启动 start-dfs。


package com;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class WordCount1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wc");
/*没有下面的话, 会报一个错误,java.lang.IllegalArgumentException: System memory 259522560 must be at least 4.718592E8(470M). Please use a larger heap size.这是memory不够,导致无法启动SparkContext*/      
        conf.set("spark.testing.memory", "2000000000");
        JavaSparkContext sc = new JavaSparkContext(conf);
/*下面的这种倒入的方法也行*/      
 //       JavaRDD<String> text = sc.textFile("hdfs://localhost:9000/README.txt");
/*原文件是:o1abc 45
o1abc 77
o1abc o1abc */      
        JavaRDD<String> text = sc.textFile("E://temp//input//friend.txt");
        List<String> strList = text.collect();
/*输出str:o1abc 45
str:o1abc 77
str:o1abc o1abc*/      
        for (String str : strList) {
            System.out.println("str:" + str);
        }
/*Interface FlatMapFunction<T,R>, Iterable<R> call(T t)(注意之后的版本,返回值有所变化。)*/      
        JavaRDD<String> words = text.flatMap(new FlatMapFunction<String, String>() {
/*List的super Interface 是java.lang.Iterable*/          
            public Iterable<String> call(String line) throws Exception {
                System.out.println("flatMap once, line is "+line );
                String[] wordsArray=line.split(" ");
                List<String> wordsList=Arrays.asList(wordsArray);
                return wordsList;
            }
        });
        List<String> wordsList = words.collect();
/*输出
flatMap once, line is o1abc 45
flatMap once, line is o1abc 77
flatMap once, line is o1abc o1abc
 




word:o1abc
word:45
word:o1abc
word:77
word:o1abc
word:o1abc*/      
        for (String word : wordsList) {
            System.out.println("word:" + word);
        }  
/* http://spark.apache.org/docs/latest/
 Interface PairFunction<T,K,V>
A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
scala.Tuple2<K,V>     call(T t)
*/  
/*
flatMap once, line is o1abc 45(这句说明前面语句再次被执行)
in tuple2 word: o1abc
t._1 is 马克-to-win @ 马克java社区:o1abc t._2 is 1
in tuple2 word: 45
t._1 is 马克-to-win @ 马克java社区:45 t._2 is 1
flatMap once, line is o1abc 77
in tuple2 word: o1abc
t._1 is 马克-to-win @ 马克java社区:o1abc t._2 is 1
in tuple2 word: 77
t._1 is 马克-to-win @ 马克java社区:77 t._2 is 1
flatMap once, line is o1abc o1abc
in tuple2 word: o1abc
t._1 is 马克-to-win @ 马克java社区:o1abc t._2 is 1
in tuple2 word: o1abc
t._1 is 马克-to-win @ 马克java社区: o1abc t._2 is 1
*/      
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                System.out.println("in tuple2 word: " + word);
                Tuple2<String, Integer> t=new Tuple2<String, Integer>(word, 1);
                System.out.println("t._1 is 马克-to-win @ 马克java社区:" +t._1+" t._2 is " +t._2);
                return t;
            }
        });
/*
注意tuple自带括号。
listPair is (o1abc,1)
listPair is (45,1)
listPair is (o1abc,1)
listPair is (77,1)
listPair is (o1abc,1)
listPair is (o1abc,1)*/      
        List<Tuple2<String,Integer>> listPairs = pairs.collect();
        for (Tuple2<String, Integer> listPair : listPairs) {
            System.out.println("listPair is "+listPair);
        }
/*Interface Function2<T1,T2,R>, R call(T1 v1,T2 v2)*/
        JavaPairRDD<String, Integer> results = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {          
            public Integer call(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        });
/*resultsPair is (o1abc,4)
resultsPair is (45,1)
resultsPair is (77,1)*/      
        List<Tuple2<String,Integer>> resultsPairs = results.collect();
        for (Tuple2<String, Integer> resultsPair : resultsPairs) {
            System.out.println("resultsPair is "+resultsPair);
        }
/*Interface VoidFunction<T>  void call(T t)
word:o1abc count:4
word:45 count:1
word:77 count:1
*/      
        results.foreach(new VoidFunction<Tuple2<String,Integer>>() {
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println("word:" + tuple._1 + " count:" + tuple._2);
            }
        });

        sc.close();
    }
}