一、PV
PV(Page View,页面浏览量):某个页面被访问的次数,页面每被打开或刷新一次,就计为一次 PV。
PV {p1, (u1,u2,u3,u1,u2,u4…)}:统计同一个页面的浏览次数,同一用户的多次访问会重复计数。
| 
 1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
 | 
 public class PV_ANA {
    /** Column (0-based, tab-separated) holding the page id. */
    private static final int PAGE_ID_COL = 3;
    /** Column (0-based, tab-separated) holding the user action, e.g. "View". */
    private static final int ACTION_COL = 5;

    /**
     * Counts the page views (PV) of every page in the log at {@code f:/userLog}.
     * Every line whose action is "View" counts once, so repeated views by the
     * same user are all counted.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("PV_ANA")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> logRDD = sc.textFile("f:/userLog");
            String str = "View";
            final Broadcast<String> broadcast = sc.broadcast(str);
            pvAnalyze(logRDD, broadcast);
        } finally {
            // Always release the context, even when the job throws.
            sc.stop();
        }
    }

    /**
     * Prints one {@code "PAGEID:<id>\t PV_COUNT:<n>"} line per page.
     *
     * @param logRDD    raw tab-separated log lines
     * @param broadcast the action name to keep; lines with any other action are ignored
     */
    private static void pvAnalyze(JavaRDD<String> logRDD,
                                  final Broadcast<String> broadcast) {
        JavaRDD<String> filteredLogRDD = logRDD.filter
                (new Function<String, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(String s) throws Exception {
                        String[] fields = s.split("\t");
                        // Drop malformed (too short) lines instead of failing the whole
                        // job with ArrayIndexOutOfBoundsException.
                        return fields.length > ACTION_COL
                                && broadcast.value().equals(fields[ACTION_COL]);
                    }
                });
        JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String s)
                            throws Exception {
                        // Only the key matters for counting; the filter above
                        // guarantees the page-id column exists.
                        return new Tuple2<String, String>(s.split("\t")[PAGE_ID_COL], null);
                    }
                });
        // countByKey combines counts per partition before merging, whereas groupByKey
        // would materialize every record of a page in a single Iterable just to count it.
        java.util.Map<String, Object> pvCounts = pairLogRDD.countByKey();
        for (java.util.Map.Entry<String, Object> entry : pvCounts.entrySet()) {
            System.out.println("PAGEID:" + entry.getKey() + "\t PV_COUNT:" + entry.getValue());
        }
    }
}
 | 
二、UV
UV(Unique Visitor,独立访客数) {p1, (u1,u2,u3,u4,u5…)}:统计一个页面被多少个不同的用户访问过,同一用户只计一次(用户不可重复)。
【方式一】
【流程图】
| 
 1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
 | 
 public class UV_ANA {
    /** Column (0-based, tab-separated) holding the user id. */
    private static final int USER_ID_COL = 2;
    /** Column (0-based, tab-separated) holding the page id. */
    private static final int PAGE_ID_COL = 3;
    /** Column (0-based, tab-separated) holding the user action, e.g. "View". */
    private static final int ACTION_COL = 5;

    /**
     * Counts the unique visitors (UV) of every page in the log at {@code f:/userLog},
     * considering only "View" lines.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("UV_ANA")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> logRDD = sc.textFile("f:/userLog");
            String str = "View";
            final Broadcast<String> broadcast = sc.broadcast(str);
            uvAnalyze(logRDD, broadcast);
        } finally {
            // Always release the context, even when the job throws.
            sc.stop();
        }
    }

    /**
     * Prints one {@code "PAGEID:<id>\t UV_COUNT:<n>"} line per page, where n is the
     * number of distinct user ids seen on that page.
     * NOTE(review): a user id equal to the literal string "null" is counted here as one
     * distinct visitor; confirm whether such lines should be excluded.
     *
     * @param logRDD    raw tab-separated log lines
     * @param broadcast the action name to keep; lines with any other action are ignored
     */
    private static void uvAnalyze(JavaRDD<String> logRDD,
                                  final Broadcast<String> broadcast) {
        JavaRDD<String> filteredLogRDD = logRDD.filter
                (new Function<String, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(String s) throws Exception {
                        String[] fields = s.split("\t");
                        // Drop malformed (too short) lines instead of failing the job.
                        return fields.length > ACTION_COL
                                && broadcast.value().equals(fields[ACTION_COL]);
                    }
                });
        JavaPairRDD<String, String> pairLogRDD = filteredLogRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String s) throws Exception {
                        // Split once; the filter guarantees both columns exist.
                        String[] fields = s.split("\t");
                        return new Tuple2<String, String>(fields[PAGE_ID_COL], fields[USER_ID_COL]);
                    }
                });
        pairLogRDD.groupByKey().foreach(new VoidFunction
                <Tuple2<String, Iterable<String>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Iterable<String>> tuple)
                    throws Exception {
                String pageId = tuple._1;
                // A set keeps each user id once, which is exactly the UV definition.
                Set<String> userSets = new HashSet<>();
                for (String userId : tuple._2) {
                    userSets.add(userId);
                }
                System.out.println("PAGEID:" + pageId + "\t " +
                        "UV_COUNT:" + userSets.size());
            }
        });
    }
}
 | 
【方式二】
【流程图】
| 
 1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
58 
59 
60 
61 
62 
 | 
 public class UV_ANAoptz {
    /** Column (0-based, tab-separated) holding the user id. */
    private static final int USER_ID_COL = 2;
    /** Column (0-based, tab-separated) holding the page id. */
    private static final int PAGE_ID_COL = 3;
    /** Column (0-based, tab-separated) holding the user action, e.g. "View". */
    private static final int ACTION_COL = 5;

    /**
     * Counts the unique visitors (UV) of every page in the log at {@code f:/userLog},
     * considering only "View" lines, using de-duplication plus countByKey.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("UV_ANAoptz")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> logRDD = sc.textFile("f:/userLog");
            String str = "View";
            final Broadcast<String> broadcast = sc.broadcast(str);
            uvAnalyzeOptz(logRDD, broadcast);
        } finally {
            // Always release the context, even when the job throws.
            sc.stop();
        }
    }

    /**
     * Prints one {@code "PAGEID:<id>\tUV_COUNT:<n>"} line per page, where n is the
     * number of distinct user ids seen on that page.
     * NOTE(review): a user id equal to the literal string "null" is counted as one
     * distinct visitor; confirm whether such lines should be excluded.
     *
     * @param logRDD    raw tab-separated log lines
     * @param broadcast the action name to keep; lines with any other action are ignored
     */
    private static void uvAnalyzeOptz(JavaRDD<String> logRDD,
                                      final Broadcast<String> broadcast) {
        JavaRDD<String> filteredLogRDD = logRDD.filter
                (new Function<String, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(String s) throws Exception {
                        String[] fields = s.split("\t");
                        // Drop malformed (too short) lines instead of failing the job.
                        return fields.length > ACTION_COL
                                && broadcast.value().equals(fields[ACTION_COL]);
                    }
                });
        JavaPairRDD<String, String> pageUserRDD = filteredLogRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String s)
                            throws Exception {
                        String[] fields = s.split("\t");
                        return new Tuple2<String, String>(fields[PAGE_ID_COL], fields[USER_ID_COL]);
                    }
                });
        // distinct() removes duplicate (pageId, userId) pairs using Tuple2 value equality.
        // The previous "pageId_userId" string key, re-split on '_', returned the wrong
        // page id whenever a page id itself contained '_'.
        Map<String, Object> countByKey = pageUserRDD.distinct().countByKey();
        for (Map.Entry<String, Object> entry : countByKey.entrySet()) {
            System.out.println("PAGEID:" + entry.getKey() + "\tUV_COUNT:" +
                    entry.getValue());
        }
    }
}
 | 
三、热门版块下用户访问的数量
统计热门版块中最活跃的 top3 用户:先按 View 行为的访问量(PV)选出 top3 热门版块,再分别统计每个热门版块中访问次数最多的 top3 用户。
【方式一】
【流程图】
| 
 1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
58 
59 
60 
61 
62 
63 
64 
65 
66 
67 
68 
69 
70 
71 
72 
73 
74 
75 
76 
77 
78 
79 
80 
81 
82 
83 
84 
85 
86 
87 
88 
89 
90 
91 
92 
93 
94 
95 
96 
97 
98 
99 
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
 | 
 public class HotChannel {
    /** Column (0-based, tab-separated) holding the user id. */
    private static final int USER_ID_COL = 2;
    /** Column (0-based, tab-separated) holding the channel. */
    private static final int CHANNEL_COL = 4;
    /** Column (0-based, tab-separated) holding the user action, e.g. "View". */
    private static final int ACTION_COL = 5;
    /** Number of hot channels selected, and number of top users printed per channel. */
    private static final int TOP_N = 3;

    /**
     * Finds the hottest channels by "View" count in {@code f:/userLog} and prints
     * the most active users of each.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("HotChannel")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> logRDD = sc.textFile("f:/userLog");
            String str = "View";
            final Broadcast<String> broadcast = sc.broadcast(str);
            hotChannel(sc, logRDD, broadcast);
        } finally {
            // Always release the context, even when the job throws.
            sc.stop();
        }
    }

    /**
     * Prints the (up to) TOP_N hot channels, then for each of them the (up to) TOP_N
     * users with the most log lines in that channel, formatted {@code "<user>==<count>"}.
     *
     * @param sc        context used to broadcast the hot-channel list
     * @param logRDD    raw tab-separated log lines
     * @param broadcast the action name used to rank channels
     */
    private static void hotChannel(JavaSparkContext sc, JavaRDD<String> logRDD,
                                   final Broadcast<String> broadcast) {
        List<String> hotChannelList = findHotChannels(logRDD, broadcast);
        for (String channel : hotChannelList) {
            System.out.println("channel:" + channel);
        }
        final Broadcast<List<String>> hotChannelListBroadcast =
                sc.broadcast(hotChannelList);
        // NOTE(review): this second pass does not filter by action, so every action in
        // a hot channel counts toward user activity; confirm that is intended.
        JavaRDD<String> filterRDD = logRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;
            public Boolean call(String s) throws Exception {
                String[] fields = s.split("\t");
                // Skip malformed lines and lines whose user id is the literal "null".
                return fields.length > CHANNEL_COL
                        && hotChannelListBroadcast.value().contains(fields[CHANNEL_COL])
                        && !"null".equals(fields[USER_ID_COL]);
            }
        });
        JavaPairRDD<String, String> channel2UserRDD = filterRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String s)
                            throws Exception {
                        String[] splited = s.split("\t");
                        return new Tuple2<String, String>(splited[CHANNEL_COL],
                                splited[USER_ID_COL]);
                    }
                });
        channel2UserRDD.groupByKey().foreach(new VoidFunction
                <Tuple2<String, Iterable<String>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Iterable<String>> tuple)
                    throws Exception {
                String channel = tuple._1;
                // Count log lines per user within this channel.
                Map<String, Integer> userNumMap = new HashMap<>();
                for (String userId : tuple._2) {
                    Integer count = userNumMap.get(userId);
                    userNumMap.put(userId, count == null ? 1 : count + 1);
                }
                List<SortObj> lists = new ArrayList<>();
                for (Map.Entry<String, Integer> entry : userNumMap.entrySet()) {
                    lists.add(new SortObj(entry.getKey(), entry.getValue()));
                }
                sortByValueDesc(lists);
                System.out.println("HOT_CHANNEL:" + channel);
                // A channel may have fewer than TOP_N distinct users.
                int limit = Math.min(TOP_N, lists.size());
                for (int i = 0; i < limit; i++) {
                    SortObj sortObj = lists.get(i);
                    System.out.println(sortObj.getKey() + "=="
                            + sortObj.getValue());
                }
            }
        });
    }

    /**
     * Returns up to TOP_N channel names, busiest first, ranked by the number of log
     * lines whose action equals the broadcast value. Returns fewer when fewer exist.
     */
    private static List<String> findHotChannels(JavaRDD<String> logRDD,
                                                final Broadcast<String> broadcast) {
        JavaRDD<String> filteredLogRDD = logRDD.filter
                (new Function<String, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(String v1) throws Exception {
                        String[] fields = v1.split("\t");
                        // Drop malformed (too short) lines instead of failing the job.
                        return fields.length > ACTION_COL
                                && broadcast.value().equals(fields[ACTION_COL]);
                    }
                });
        JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String s) throws Exception {
                        return new Tuple2<String, String>(s.split("\t")[CHANNEL_COL], null);
                    }
                });
        Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
        List<SortObj> channels = new ArrayList<>();
        for (Map.Entry<String, Object> entry : channelPVMap.entrySet()) {
            channels.add(new SortObj(entry.getKey(),
                    Integer.valueOf(entry.getValue() + "")));
        }
        sortByValueDesc(channels);
        // Build a fresh ArrayList (not a subList view): the result is broadcast and
        // must therefore be Serializable.
        List<String> hotChannelList = new ArrayList<>();
        int limit = Math.min(TOP_N, channels.size());
        for (int i = 0; i < limit; i++) {
            hotChannelList.add(channels.get(i).getKey());
        }
        return hotChannelList;
    }

    /** Sorts {@code list} in place by descending value. */
    private static void sortByValueDesc(List<SortObj> list) {
        Collections.sort(list, new Comparator<SortObj>() {
            public int compare(SortObj o1, SortObj o2) {
                // Integer.compare cannot overflow, unlike subtracting the values.
                return Integer.compare(o2.getValue(), o1.getValue());
            }
        });
    }
}
 | 
【方式二】
【流程图】
| 
 1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
58 
59 
60 
61 
62 
63 
64 
65 
66 
67 
68 
69 
70 
71 
72 
73 
74 
75 
76 
77 
78 
79 
80 
81 
82 
83 
84 
85 
86 
87 
88 
89 
90 
91 
92 
93 
94 
95 
96 
97 
98 
99 
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 
137 
138 
139 
140 
141 
142 
143 
144 
145 
146 
147 
148 
 | 
 public class HotChannelOpz {
    /** Column (0-based, tab-separated) holding the user id. */
    private static final int USER_ID_COL = 2;
    /** Column (0-based, tab-separated) holding the channel. */
    private static final int CHANNEL_COL = 4;
    /** Column (0-based, tab-separated) holding the user action, e.g. "View". */
    private static final int ACTION_COL = 5;
    /** Number of hot channels selected, and number of top users printed per channel. */
    private static final int TOP_N = 3;

    /**
     * Finds the hottest channels by "View" count in {@code f:/userLog} and prints the
     * most active users of each, pre-aggregating per user before grouping by channel.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("hotChannelOpz")
                .setMaster("local")
                .set("spark.testing.memory", "2147480000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> logRDD = sc.textFile("f:/userLog");
            String str = "View";
            final Broadcast<String> broadcast = sc.broadcast(str);
            hotChannelOpz(sc, logRDD, broadcast);
        } finally {
            // Always release the context, even when the job throws.
            sc.stop();
        }
    }

    /**
     * For each of the (up to) TOP_N hot channels, prints the (up to) TOP_N users with
     * the most log lines in that channel, formatted {@code "<user>===<count>"}.
     *
     * @param sc        context used to broadcast the hot-channel list
     * @param logRDD    raw tab-separated log lines
     * @param broadcast the action name used to rank channels
     */
    private static void hotChannelOpz(JavaSparkContext sc, JavaRDD<String> logRDD,
                                      final Broadcast<String> broadcast) {
        List<String> hotChannelList = findHotChannels(logRDD, broadcast);
        final Broadcast<List<String>> hotChannelListBroadcast =
                sc.broadcast(hotChannelList);
        // NOTE(review): this second pass does not filter by action, so every action in
        // a hot channel counts toward user activity; confirm that is intended.
        JavaRDD<String> filtedRDD = logRDD.filter
                (new Function<String, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(String v1) throws Exception {
                        String[] fields = v1.split("\t");
                        // Skip malformed lines and lines whose user id is the literal "null".
                        return fields.length > CHANNEL_COL
                                && hotChannelListBroadcast.value().contains(fields[CHANNEL_COL])
                                && !"null".equals(fields[USER_ID_COL]);
                    }
                });
        JavaPairRDD<String, String> user2ChannelRDD = filtedRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String val)
                            throws Exception {
                        String[] splited = val.split("\t");
                        return new Tuple2<String, String>(splited[USER_ID_COL],
                                splited[CHANNEL_COL]);
                    }
                });
        // Per user: count visits per channel, then re-key by channel with value
        // "<userId>_<count>".
        JavaPairRDD<String, String> userVistChannelsRDD =
                user2ChannelRDD.groupByKey().
                flatMapToPair(new PairFlatMapFunction
                        <Tuple2<String, Iterable<String>>, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Iterable<Tuple2<String, String>> call
                            (Tuple2<String, Iterable<String>> tuple)
                            throws Exception {
                        String userId = tuple._1;
                        Map<String, Integer> channelMap = new HashMap<>();
                        for (String channel : tuple._2) {
                            Integer count = channelMap.get(channel);
                            channelMap.put(channel, count == null ? 1 : count + 1);
                        }
                        List<Tuple2<String, String>> list = new ArrayList<>();
                        for (Map.Entry<String, Integer> entry : channelMap.entrySet()) {
                            list.add(new Tuple2<String, String>(entry.getKey(),
                                    userId + "_" + entry.getValue()));
                        }
                        return list;
                    }
                });
        userVistChannelsRDD.groupByKey().foreach(new VoidFunction
                <Tuple2<String, Iterable<String>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Iterable<String>> tuple)
                    throws Exception {
                String channel = tuple._1;
                List<SortObj> list = new ArrayList<>();
                for (String ucs : tuple._2) {
                    // The numeric count suffix never contains '_', but the user id may,
                    // so split on the LAST '_' (split("_") broke such user ids).
                    int sep = ucs.lastIndexOf('_');
                    String userId = ucs.substring(0, sep);
                    Integer num = Integer.valueOf(ucs.substring(sep + 1));
                    list.add(new SortObj(userId, num));
                }
                sortByValueDesc(list);
                System.out.println("HOT_CHANNLE:" + channel);
                // A channel may have fewer than TOP_N distinct users.
                int limit = Math.min(TOP_N, list.size());
                for (int i = 0; i < limit; i++) {
                    SortObj sortObj = list.get(i);
                    System.out.println(sortObj.getKey() + "==="
                            + sortObj.getValue());
                }
            }
        });
    }

    /**
     * Returns up to TOP_N channel names, busiest first, ranked by the number of log
     * lines whose action equals the broadcast value. Returns fewer when fewer exist.
     */
    private static List<String> findHotChannels(JavaRDD<String> logRDD,
                                                final Broadcast<String> broadcast) {
        JavaRDD<String> filteredLogRDD = logRDD.filter
                (new Function<String, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(String v1) throws Exception {
                        String[] fields = v1.split("\t");
                        // Drop malformed (too short) lines instead of failing the job.
                        return fields.length > ACTION_COL
                                && broadcast.value().equals(fields[ACTION_COL]);
                    }
                });
        JavaPairRDD<String, String> channel2nullRDD = filteredLogRDD.mapToPair
                (new PairFunction<String, String, String>() {
                    private static final long serialVersionUID = 1L;
                    public Tuple2<String, String> call(String val)
                            throws Exception {
                        return new Tuple2<String, String>(val.split("\t")[CHANNEL_COL], null);
                    }
                });
        Map<String, Object> channelPVMap = channel2nullRDD.countByKey();
        List<SortObj> channels = new ArrayList<>();
        for (Map.Entry<String, Object> entry : channelPVMap.entrySet()) {
            channels.add(new SortObj(entry.getKey(),
                    Integer.valueOf(entry.getValue() + "")));
        }
        sortByValueDesc(channels);
        // Build a fresh ArrayList (not a subList view): the result is broadcast and
        // must therefore be Serializable.
        List<String> hotChannelList = new ArrayList<>();
        int limit = Math.min(TOP_N, channels.size());
        for (int i = 0; i < limit; i++) {
            hotChannelList.add(channels.get(i).getKey());
        }
        return hotChannelList;
    }

    /** Sorts {@code list} in place by descending value. */
    private static void sortByValueDesc(List<SortObj> list) {
        Collections.sort(list, new Comparator<SortObj>() {
            public int compare(SortObj o1, SortObj o2) {
                // Integer.compare cannot overflow, unlike subtracting the values.
                return Integer.compare(o2.getValue(), o1.getValue());
            }
        });
    }
}
 | 
        


