码迷,mamicode.com
首页 > 其他好文 > 详细

Spark应用_PageView_UserView_HotChannel

时间:2017-03-04 21:10:33      阅读:367      评论:0      收藏:0      [点我收藏+]

标签:enter   textfile   otto   oid   tin   justify   sde   技术分享   流程图   

 


Spark应用_PageView_UserView_HotChannel

一、PV

PV(Page View)指某一个页面的访问量:页面每被打开或刷新一次,就计为一次 PV。

PV {p1, (u1,u2,u3,u1,u2,u4…)}:对同一个页面的浏览量进行统计,同一用户的多次访问会重复计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class PV_ANA {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("PV_ANA")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
pvAnalyze(logRDD, broadcast);
}
 
private static void pvAnalyze(JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
 
@Override
public Boolean call(String s) throws Exception {
String actionParam = broadcast.value();
String action = s.split("\t")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> pariLogRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(String s)
throws Exception {
String pageId = s.split("\t")[3];
return new Tuple2<String, String>(pageId, null);
}
});
pariLogRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
private static final long serialVersionUID = 1L;
 
@Override
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String pageId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
long count = 0L;
while (iterator.hasNext()) {
iterator.next();
count++;
}
System.out.println("PAGEID:" + pageId + "\t PV_COUNT:" + count);
}
});
}
 
}

二、UV

UV {p1, (u1,u2,u3,u4,u5…)}:统计一个页面被多少个不同的用户访问过,同一用户只计一次。

【方式一】

【流程图】

技术分享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class UV_ANA {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("UV_ANA")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
uvAnalyze(logRDD, broadcast);
}
 
private static void uvAnalyze(JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
 
@Override
public Boolean call(String s) throws Exception {
String actionParam = broadcast.value();
String action = s.split("\t")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(String s) throws Exception {
String pageId = s.split("\t")[3];
String userId = s.split("\t")[2];
return new Tuple2<String, String>(pageId, userId);
}
});
pairLogRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
private static final long serialVersionUID = 1L;
 
@Override
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String pageId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Set<String> userSets = new HashSet<>();
while (iterator.hasNext()) {
String userId = iterator.next();
userSets.add(userId);
}
System.out.println("PAGEID:" + pageId + "\t " +
"UV_COUNT:" + userSets.size());
}
});
}
}

【方式二】

【流程图】

技术分享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class UV_ANAoptz {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("UV_ANAoptz")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
uvAnalyzeOptz(logRDD, broadcast);
}
 
private static void uvAnalyzeOptz(JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
 
@Override
public Boolean call(String s) throws Exception {
String actionParam = broadcast.value();
String action = s.split("\t")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> pairRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(String s)
throws Exception {
String pageId = s.split("\t")[3];
String userId = s.split("\t")[2];
return new Tuple2<String, String>(pageId + "_" +
userId, null);
}
});
JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = pairRDD.groupByKey();
Map<String, Object> countByKey = groupUp2LogRDD.mapToPair
(new PairFunction<Tuple2<String, Iterable<String>>,
String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(Tuple2<String,
Iterable<String>> tuple)
throws Exception {
String pu = tuple._1;
String[] spilted = pu.split("_");
String pageId = spilted[0];
return new Tuple2<String, String>(pageId, null);
}
}).countByKey();
Set<String> keySet = countByKey.keySet();
for (String key : keySet) {
System.out.println("PAGEID:" + key + "\tUV_COUNT:" +
countByKey.get(key));
}
}
}

三、热门版块下用户访问的数量

先按 PV 统计出最热门的 3 个版块,再统计出每个热门版块中最活跃的 top3 用户。

【方式一】

【流程图】

技术分享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
public class HotChannel {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("HotChannel")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
hotChannel(sc, logRDD, broadcast);
}
 
private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
 
@Override
public Boolean call(String v1) throws Exception {
String actionParam = broadcast.value();
String action = v1.split("\t")[5];
return actionParam.equals(action);
}
});
JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(String s) throws Exception {
String channel = s.split("\t")[4];
return new Tuple2<String, String>(channel, null);
}
});
Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
Set<String> keySet = channelPVMap.keySet();
List<SortObj> channels = new ArrayList<>();
for (String channel : keySet) {
channels.add(new SortObj(channel, Integer.valueOf
(channelPVMap.get(channel) + "")));
}
Collections.sort(channels, new Comparator<SortObj>() {
 
@Override
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
});
List<String> hotChannelList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
hotChannelList.add(channels.get(i).getKey());
}
for (String channel : hotChannelList) {
System.out.println("channel:" + channel);
}
final Broadcast<List<String>> hotChannelListBroadcast =
sc.broadcast(hotChannelList);
JavaRDD<String> filterRDD = logRDD.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
List<String> hostChannels = hotChannelListBroadcast.value();
String channel = s.split("\t")[4];
String userId = s.split("\t")[2];
return hostChannels.contains(channel) && !"null".equals(userId);
}
});
JavaPairRDD<String, String> channel2UserRDD = filterRDD.mapToPair
(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s)
throws Exception {
String[] splited = s.split("\t");
String channel = splited[4];
String userId = splited[2];
return new Tuple2<String, String>(channel, userId);
}
});
channel2UserRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
@Override
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String channel = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Map<String, Integer> userNumMap = new HashMap<>();
while (iterator.hasNext()) {
String userId = iterator.next();
Integer count = userNumMap.get(userId);
if (count == null) {
count = 1;
} else {
count++;
}
userNumMap.put(userId, count);
}
List<SortObj> lists = new ArrayList<>();
Set<String> keys = userNumMap.keySet();
for (String key : keys) {
lists.add(new SortObj(key, userNumMap.get(key)));
}
Collections.sort(lists, new Comparator<SortObj>() {
 
@Override
public int compare(SortObj O1, SortObj O2) {
return O2.getValue() - O1.getValue();
}
});
System.out.println("HOT_CHANNEL:" + channel);
for (int i = 0; i < 3; i++) {
SortObj sortObj = lists.get(i);
System.out.println(sortObj.getKey() + "=="
+ sortObj.getValue());
}
}
});
}
}

【方式二】

【流程图】

技术分享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public class HotChannelOpz {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("hotChannelOpz")
.setMaster("local")
.set("spark.testing.memory", "2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logRDD = sc.textFile("f:/userLog");
String str = "View";
final Broadcast<String> broadcast = sc.broadcast(str);
hotChannelOpz(sc, logRDD, broadcast);
}
 
private static void hotChannelOpz(JavaSparkContext sc, JavaRDD<String> logRDD,
final Broadcast<String> broadcast) {
JavaRDD<String> filteredLogRDD = logRDD.filter
(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
 
@Override
public Boolean call(String v1) throws Exception {
String actionParam = broadcast.value();
String action = v1.split("\t")[5];
return actionParam.equals(action);
}
});
 
JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(String val)
throws Exception {
String channel = val.split("\t")[4];
 
return new Tuple2<String, String>(channel, null);
}
});
Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
Set<String> keySet = channelPVMap.keySet();
List<SortObj> channels = new ArrayList<>();
for (String channel : keySet) {
channels.add(new SortObj(channel, Integer.valueOf
(channelPVMap.get(channel) + "")));
}
Collections.sort(channels, new Comparator<SortObj>() {
 
@Override
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
});
List<String> hotChannelList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
hotChannelList.add(channels.get(i).getKey());
}
final Broadcast<List<String>> hotChannelListBroadcast =
sc.broadcast(hotChannelList);
JavaRDD<String> filtedRDD = logRDD.filter
(new Function<String, Boolean>() {
 
@Override
public Boolean call(String v1) throws Exception {
List<String> hostChannels = hotChannelListBroadcast.value();
String channel = v1.split("\t")[4];
String userId = v1.split("\t")[2];
return hostChannels.contains(channel) &&
!"null".equals(userId);
}
});
JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair
(new PairFunction<String, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Tuple2<String, String> call(String val)
throws Exception {
String[] splited = val.split("\t");
String userId = splited[2];
String channel = splited[4];
return new Tuple2<String, String>(userId, channel);
}
});
JavaPairRDD<String, String> userVistChannelsRDD =
user2ChannelRDD.groupByKey().
flatMapToPair(new PairFlatMapFunction
<Tuple2<String, Iterable<String>>, String, String>() {
private static final long serialVersionUID = 1L;
 
@Override
public Iterable<Tuple2<String, String>> call
(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String userId = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
Map<String, Integer> channelMap = new HashMap<>();
while (iterator.hasNext()) {
String channel = iterator.next();
Integer count = channelMap.get(channel);
if (count == null)
count = 1;
else
count++;
channelMap.put(channel, count);
}
List<Tuple2<String, String>> list = new ArrayList<>();
Set<String> keys = channelMap.keySet();
for (String channel : keys) {
Integer channelNum = channelMap.get(channel);
list.add(new Tuple2<String, String>(channel,
userId + "_" + channelNum));
}
return list;
}
});
userVistChannelsRDD.groupByKey().foreach(new VoidFunction
<Tuple2<String, Iterable<String>>>() {
 
@Override
public void call(Tuple2<String, Iterable<String>> tuple)
throws Exception {
String channel = tuple._1;
Iterator<String> iterator = tuple._2.iterator();
List<SortObj> list = new ArrayList<>();
while (iterator.hasNext()) {
String ucs = iterator.next();
String[] splited = ucs.split("_");
String userId = splited[0];
Integer num = Integer.valueOf(splited[1]);
list.add(new SortObj(userId, num));
}
Collections.sort(list, new Comparator<SortObj>() {
@Override
public int compare(SortObj o1, SortObj o2) {
return o2.getValue() - o1.getValue();
}
});
System.out.println("HOT_CHANNLE:" + channel);
for (int i = 0; i < 3; i++) {
SortObj sortObj = list.get(i);
System.out.println(sortObj.getKey() + "==="
+ sortObj.getValue());
}
}
});
}
}

Spark应用_PageView_UserView_HotChannel

标签:enter   textfile   otto   oid   tin   justify   sde   技术分享   流程图   

原文地址:http://www.cnblogs.com/haozhengfei/p/b34b27ee2f9e79dc3c6d56820f790e42.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!