Disclaimer: the following code is for study and reference only; do not use it for commercial purposes.
To get the data and the test code, click [ here ]
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/*
 * Step 1, the mapper:
 *
 * - Attach a count of 1 to every word, producing a JavaPairRDD<String, Integer>
 *   of (word, 1) pairs with the word as the key.
 *
 * Step 2, the reducer:
 *
 * - Merge the counts for each key.
 */
public class Ex0Wordcount implements Serializable {
public static String pathToFile = "data/wordcount.txt";
public static SparkConf conf = null;
public static JavaSparkContext sc = null;
static {
conf = new SparkConf().setAppName("Wordcount")
.set("spark.driver.allowMultipleContexts", "true");
// .setMaster("spark://master:7077");
conf.set("spark.executor.memory", "1000m");
conf.setMaster("local[*]"); // local mode; * means use as many worker threads as there are cores
sc = new JavaSparkContext(conf);
// addJar is only needed when submitting to a cluster:
// sc.addJar("/home/hadoop/tools/jars/1.jar");
}
public static void main(String[] args) {
Ex0Wordcount wc = new Ex0Wordcount();
wc.filterOnWordcount();
}
public JavaRDD<String> loadData() {
JavaRDD<String> words = sc.textFile(pathToFile).flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
return words;
}
/**
 * Count how many times each word appears.
 */
public JavaPairRDD<String, Integer> wordcount() {
JavaRDD<String> words = loadData();
JavaPairRDD<String, Integer> couples = words
.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s)
throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> result = couples
.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i0, Integer i1)
throws Exception {
return i0 + i1;
}
});
return result;
}
/**
 * Keep the words which appear strictly more than 4 times.
 */
public JavaPairRDD<String, Integer> filterOnWordcount() {
JavaPairRDD<String, Integer> wordcounts = wordcount();
JavaPairRDD<String, Integer> filtered = wordcounts
.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> tuple) throws Exception {
return tuple._2() > 4;
}
});
for (Tuple2<String, Integer> tuple : filtered.collect()) {
System.out.println(tuple._1() + "==" + tuple._2());
}
return filtered;
}
}
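For comparison, the same word count is much shorter with Java 8 lambdas on Spark 2.x, where FlatMapFunction returns an Iterator rather than an Iterable. A minimal sketch, assuming a Spark 2.x dependency (not the version used in this tutorial):
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class LambdaWordcount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Wordcount").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, Integer> counts = sc.textFile("data/wordcount.txt")
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
// Print the words that appear strictly more than 4 times, as above.
for (Tuple2<String, Integer> t : counts.collect()) {
if (t._2() > 4) {
System.out.println(t._1() + "==" + t._2());
}
}
sc.stop();
}
}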
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import utils.Parse;
import utils.Tweet;
/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * We use a dataset of 8,198 tweet records, formatted as follows:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goal: for each user, find the ids of all of their tweets (a user may have
 * several tweets, e.g. Srkian_nishu's tweet ids are [572692378957430785, ...]).
 */
public class Ex1UserMining implements Serializable{
private static String pathToFile = "data/reduced-tweets.json";
public static void main(String[] args) {
Ex1UserMining userMining=new Ex1UserMining();
//userMining.tweetsByUser();
userMining.filterOnTweetUser();
System.exit(0);
}
public JavaRDD<Tweet> loadData() {
// Create spark configuration and spark context
SparkConf conf = new SparkConf().setAppName("User mining")
.set("spark.driver.allowMultipleContexts", "true")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load the data and parse it into a Tweet.
// Look at the Tweet Object in the TweetUtils class.
JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(
new Function<String, Tweet>() {
public Tweet call(String line) throws Exception {
return Parse.parseJsonToTweet(line);
}
});
return tweets;
}
/**
* For each user return all his tweets
*/
public JavaPairRDD<String, Iterable<Tweet>> tweetsByUser() {
JavaRDD<Tweet> tweets = loadData();
// Hint: the Spark API provides a groupBy method
JavaPairRDD<String, Iterable<Tweet>> tweetsByUser = tweets.groupBy(new Function<Tweet, String>() {
@Override
public String call(Tweet tweet) throws Exception {
return tweet.getUser();
}
});
return tweetsByUser;
}
/**
* Compute the number of tweets by user
*/
public JavaPairRDD<String, Integer> tweetByUserNumber() {
JavaRDD<Tweet> tweets = loadData();
// Hint: think about what you did in the wordcount example
JavaPairRDD<String, Integer> count = tweets.mapToPair(new PairFunction<Tweet, String ,Integer>() {
@Override
public Tuple2<String, Integer> call(Tweet tweet) throws Exception {
return new Tuple2<String, Integer>(tweet.getUser(), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a+b;
}
});
return count;
}
public void filterOnTweetUser() {
JavaPairRDD<String, Iterable<Tweet>> grouped = tweetsByUser();
List<Tuple2<String, Iterable<Tweet>>> output = grouped.collect();
List<Long> ids = new ArrayList<Long>();
for (Tuple2<String, Iterable<Tweet>> tuple2 : output) {
for (Tweet tweet : tuple2._2()) {
ids.add(tweet.getId());
}
System.out.println(tuple2._1() + "==" + showData(ids));
ids.clear();
}
}
public String showData(List<Long> list) {
StringBuilder str = new StringBuilder("[");
for (int i = 0; i < list.size(); i++) {
str.append(list.get(i));
if (i < list.size() - 1) {
str.append(",");
}
}
return str.append("]").toString();
}
public JavaPairRDD<String, Integer> filterOnTweetcount() {
JavaPairRDD<String, Integer> tweetcounts = tweetByUserNumber();
for (Tuple2<String, Integer> tuple : tweetcounts.collect()) {
System.out.println(tuple._1() + "==" + tuple._2());
}
return tweetcounts;
}
}
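The exercises rely on the utils.Tweet and utils.Parse helpers, which ship with the tutorial's test code and are not shown here. A minimal sketch of what they are assumed to look like, reconstructed from the calls above (the field layout and the use of Gson are assumptions, not the tutorial's actual source):
// Tweet.java -- plain POJO matching the JSON fields shown in the javadoc
package utils;
import java.io.Serializable;
public class Tweet implements Serializable {
private long id;
private String user;
private String text;
private String place;
private String country;
public long getId() { return id; }
public String getUser() { return user; }
public String getText() { return text; }
public String getPlace() { return place; }
public String getCountry() { return country; }
}
// Parse.java -- hypothetical Gson-based parser for one JSON line per tweet
package utils;
import com.google.gson.Gson;
public class Parse {
private static final Gson GSON = new Gson();
public static Tweet parseJsonToTweet(String line) {
return GSON.fromJson(line, Tweet.class);
}
}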
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import utils.Parse;
import utils.Tweet;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * We use a dataset of 8,198 tweet records, formatted as follows:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goals: 1. Find every person mentioned (@) in a tweet. 2. Count how many
 * times each person is mentioned and find the 10 most-mentioned people.
 *
 * Use the Ex2TweetMiningTest to implement the code.
 */
public class Ex2TweetMining implements Serializable {
private static String pathToFile = "data/reduced-tweets.json";
private static String saveAsTextFile = "out1/out1.txt";
/**
* Load the data from the json file and return an RDD of Tweet
*/
public JavaRDD<Tweet> loadData() {
// create spark configuration and spark context
SparkConf conf = new SparkConf().setAppName("Tweet mining").setMaster(
"local[*]");
// .setMaster("spark://master:7077");
conf.set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext sc = new JavaSparkContext(conf);
// sc.addJar("/home/sun/jars/tutorial-all.jar");
// load the data and create an RDD of Tweet
// JavaRDD<Tweet> tweets =
// sc.textFile("hdfs://master:9000/sparkdata/reduced-tweets.json")
JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(
new Function<String, Tweet>() {
public Tweet call(String line) throws Exception {
return Parse.parseJsonToTweet(line);
}
});
return tweets;
}
/**
* Find all the persons mentioned on tweets (case sensitive)
*/
public JavaRDD<String> mentionOnTweet() {
JavaRDD<Tweet> tweets = loadData();
// You want to return an RDD with the mentions
// Hint: think about splitting the text field into words and then finding
// the mentions
JavaRDD<String> mentions = tweets
.flatMap(new FlatMapFunction<Tweet, String>() {
public Iterable<String> call(Tweet t) throws Exception {
String text = t.getText();
Set<String> set = new HashSet<String>();
String[] words = text.split(" ");
for (String word : words) {
if (word.startsWith("@")) {
set.add(word);
}
}
return set;
}
});
return mentions;
}
/**
* Count how many times each person is mentioned
*/
public JavaPairRDD<String, Integer> countMentions() {
JavaRDD<String> mentions = mentionOnTweet();
// Hint: think about what you did in the wordcount example
JavaPairRDD<String, Integer> mentionCount = mentions.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String t)
throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// mentionCount.saveAsTextFile("hdfs://master:9000/sparkdata/tweets-m4");
// mentionCount.saveAsTextFile(saveAsTextFile);
return mentionCount;
}
/**
* Find the 10 most mentioned persons by descending order
*/
public List<Tuple2<Integer, String>> top10mentions() {
JavaPairRDD<String, Integer> counts = countMentions();
// Hint: take a look at the sorting and take methods
List<Tuple2<Integer, String>> mostMentioned = counts
.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(
Tuple2<String, Integer> tuple2)
throws Exception {
return new Tuple2<Integer, String>(tuple2._2(),
tuple2._1());
}
}).sortByKey(false).take(10);
return mostMentioned;
}
public void filterOnTweetTop10Mentions() {
List<Tuple2<Integer, String>> output = top10mentions();
for (Tuple2<Integer, String> tuple2 : output) {
System.out.println(tuple2._1() + "==" + tuple2._2());
}
}
public static void main(String[] args) {
Ex2TweetMining ex2TweetMining = new Ex2TweetMining();
ex2TweetMining.filterOnTweetTop10Mentions();
/*
* JavaPairRDD<String, Integer> res = ex2TweetMining.countMentions();
* System.out.println(res.take(1));
*/
}
}
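As a side note, the swap-then-sortByKey pattern above sorts the whole RDD just to take 10 elements. takeOrdered with a custom comparator does the same job without a full sort; a sketch under the same Spark 1.x Java API (the comparator must be Serializable, hence the small named class):
import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;
class ByCountDesc implements Comparator<Tuple2<String, Integer>>, Serializable {
@Override
public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return b._2().compareTo(a._2()); // descending by count
}
}
// Then, inside Ex2TweetMining:
// List<Tuple2<String, Integer>> top10 = countMentions().takeOrdered(10, new ByCountDesc());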
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import utils.Parse;
import utils.Tweet;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
/**
 * The Java Spark API documentation: http://spark.apache.org/docs/latest/api/java/index.html
 *
 * We use a dataset of 8,198 tweet records, formatted as follows:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goals: 1. Find all hashtags ("#") used in tweets.
 * 2. Count how many times each hashtag appears and find the 10 most used.
 */
public class Ex3HashtagMining implements Serializable{
private static String pathToFile = "data/reduced-tweets.json";
public static void main(String[] args) {
Ex3HashtagMining ex3HashtagMining=new Ex3HashtagMining();
ex3HashtagMining.filterOnTweetTop10HashtagMining();
}
/**
* Load the data from the json file and return an RDD of Tweet
*/
public JavaRDD<Tweet> loadData() {
// create spark configuration and spark context
SparkConf conf = new SparkConf()
.setAppName("Hashtag mining")
.set("spark.driver.allowMultipleContexts", "true")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
public Tweet call(String line) throws Exception {
return Parse.parseJsonToTweet(line);
}
});
return tweets;
}
/**
* Find all the hashtags mentioned on tweets
*/
public JavaRDD<String> hashtagMentionedOnTweet() {
JavaRDD<Tweet> tweets = loadData();
// You want to return an RDD with the mentions
// Hint: think about splitting the text field into words and then finding the hashtags
JavaRDD<String> mentions = tweets.flatMap(new FlatMapFunction<Tweet, String>() {
@Override
public Iterable<String> call(Tweet tweet) throws Exception {
return Arrays.asList(tweet.getText().split(" "));
}
}).filter(new Function<String, Boolean>() {
@Override
public Boolean call(String string) throws Exception {
return string.startsWith("#")&&string.length()>1;
}
});
return mentions;
}
/**
* Count how many times each hashtag is mentioned
*/
public JavaPairRDD<String,Integer> countMentions() {
JavaRDD<String> mentions = hashtagMentionedOnTweet();
// Hint: think about what you did in the wordcount example
JavaPairRDD<String, Integer> counts = mentions.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a+b;
}
});
return counts;
}
/**
* Find the 10 most popular Hashtags by descending order
*/
public List<Tuple2<Integer, String>> top10HashTags() {
JavaPairRDD<String, Integer> counts = countMentions();
// Hint: take a look at the sorting and take methods
List<Tuple2<Integer, String>> top10 = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2)
throws Exception {
return new Tuple2<Integer, String>(tuple2._2(), tuple2._1());
}
}).sortByKey(false).take(10);
return top10;
}
public void filterOnTweetTop10HashtagMining() {
List<Tuple2<Integer, String>> output = top10HashTags();
for (Tuple2<Integer, String> tuple2 : output) {
System.out.println(tuple2._1() + "==" + tuple2._2());
}
}
}
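Ex2 and Ex3 are the same computation with different prefixes ("@" for mentions, "#" for hashtags). A hypothetical shared helper, not part of the tutorial, that both exercises could call with their own prefix:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import utils.Tweet;
public class PrefixCounter implements java.io.Serializable {
// Count how many times each word starting with the given prefix appears.
public JavaPairRDD<String, Integer> countByPrefix(JavaRDD<Tweet> tweets, final String prefix) {
return tweets.flatMap(new FlatMapFunction<Tweet, String>() {
public Iterable<String> call(Tweet t) throws Exception {
List<String> hits = new ArrayList<String>();
for (String word : t.getText().split(" ")) {
if (word.startsWith(prefix) && word.length() > 1) {
hits.add(word);
}
}
return hits;
}
}).mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
});
}
}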
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import utils.Parse;
import utils.Tweet;
/**
 * Goal: build an inverted index over hashtags.
 *
 * Example: the hashtag #spark appears in tweet1, tweet3 and tweet39, so the
 * index should return (#spark, List(tweet1, tweet3, tweet39)).
 */
public class Ex4InvertedIndex implements Serializable{
private static String pathToFile = "data/reduced-tweets.json";
public static void main(String[] args) {
showData();
}
/**
* Load the data from the json file and return an RDD of Tweet
*/
public static JavaRDD<Tweet> loadData() {
// create spark configuration and spark context
SparkConf conf = new SparkConf()
.setAppName("Inverted index")
.set("spark.driver.allowMultipleContexts", "true")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
public Tweet call(String line) throws Exception {
return Parse.parseJsonToTweet(line);
}
});
return tweets;
}
public static void showData() {
Map<String, Iterable<Tweet>> output = invertedIndex();
List<Long> ids = new ArrayList<Long>();
for (Entry<String, Iterable<Tweet>> entry : output.entrySet()) {
for (Tweet tweet : entry.getValue()) {
ids.add(tweet.getId());
}
System.out.println(entry.getKey() + "==" + printData(ids));
ids.clear();
}
}
public static String printData(List<Long> list) {
StringBuilder str = new StringBuilder("[");
for (int i = 0; i < list.size(); i++) {
str.append(list.get(i));
if (i < list.size() - 1) {
str.append(",");
}
}
return str.append("]").toString();
}
// Alternative: print each hashtag with its Iterable of tweets directly.
public static void showDataRaw() {
Map<String, Iterable<Tweet>> output = invertedIndex();
for (Entry<String, Iterable<Tweet>> entry : output.entrySet()) {
System.out.println(entry.getKey() + "==" + entry.getValue());
}
}
public static Map<String, Iterable<Tweet>> invertedIndex() {
JavaRDD<Tweet> tweets = loadData();
// for each tweet, extract all the hashtag and then create couples (hashtag,tweet)
// Hint: see the flatMapToPair method
JavaPairRDD<String, Tweet> pairs = tweets.flatMapToPair(new PairFlatMapFunction<Tweet, String, Tweet>() {
@Override
public Iterable<Tuple2<String, Tweet>> call(Tweet tweet)
throws Exception {
List<Tuple2<String, Tweet>> results = new ArrayList<Tuple2<String, Tweet>>();
List<String> hashtags = new ArrayList<String>();
List<String> words = Arrays.asList(tweet.getText().split(" "));
for (String word: words) {
if (word.startsWith("#") && word.length() > 1) {
hashtags.add(word);
}
}
for (String hashtag : hashtags) {
Tuple2<String, Tweet> result = new Tuple2<>(hashtag, tweet);
results.add(result);
}
return results;
}
});
// We want to group the tweets by hashtag
JavaPairRDD<String, Iterable<Tweet>> tweetsByHashtag = pairs.groupByKey();
// Then return the inverted index (= a map structure)
Map<String, Iterable<Tweet>> map = tweetsByHashtag.collectAsMap();
return map;
}
}
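Note that collectAsMap() above pulls the entire index onto the driver, which is only practical for a small dataset like this one. To keep the index distributed, the grouped RDD can be reduced to tweet ids with mapValues before any output action; a sketch under the same Spark 1.x API, reusing tweetsByHashtag from invertedIndex():
JavaPairRDD<String, Iterable<Long>> idIndex = tweetsByHashtag.mapValues(
new Function<Iterable<Tweet>, Iterable<Long>>() {
public Iterable<Long> call(Iterable<Tweet> tweets) throws Exception {
List<Long> ids = new ArrayList<Long>();
for (Tweet tweet : tweets) {
ids.add(tweet.getId());
}
return ids;
}
});
// idIndex.saveAsTextFile(...) or idIndex.take(n) then avoids collecting everything.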
Original article: http://blog.csdn.net/u014028392/article/details/51340100