标签:
监控指定文件夹,读取文件(新文件动态读取)里的内容,统计单词的数量。
FileSpout.java,监控文件夹,读取新文件内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package com.test.stormtest.wordcount; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;

/**
 * Spout that monitors a directory and emits file content line by line on a
 * single output field named "line". All files present at startup are emitted
 * once in open(); nextTuple() then polls the directory and emits the content
 * of any newly added file.
 */
public class FileSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    /** Interval between directory scans, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000L;

    private SpoutOutputCollector collector;

    /** Directory being monitored (recursively) for files. */
    private File target = new File("F:" + File.separator + "test");

    /** Snapshot of files already emitted; used to detect new arrivals. */
    private Collection<File> cacheFiles = null;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // On startup, emit the content of every file already in the directory.
        cacheFiles = FileUtils.listFiles(target, null, true);
        for (File file : cacheFiles) {
            emitFileContent(file);
        }
    }

    public void nextTuple() {
        try {
            Thread.sleep(POLL_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the hosting worker can shut down
            // cleanly instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            return;
        }
        // Emit only files that were not present during the previous scan.
        Collection<File> files = FileUtils.listFiles(target, null, true);
        for (File file : files) {
            if (!cacheFiles.contains(file)) {
                emitFileContent(file);
            }
        }
        cacheFiles = files;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    /**
     * Emits each line of the given file as a one-field tuple.
     * Read errors are logged and the file is skipped (best-effort behavior).
     */
    private void emitFileContent(File file) {
        try {
            // NOTE(review): readLines uses the platform default charset here;
            // consider passing an explicit charset — confirm expected encoding.
            List<String> lines = FileUtils.readLines(file);
            for (String line : lines) {
                this.collector.emit(new Values(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
SplitBolt.java,将行拆分成单词
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package com.test.stormtest.wordcount; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;

/**
 * Bolt that splits each incoming "line" tuple on single spaces and emits
 * every resulting token as its own tuple on the "word" output field.
 */
public class SplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Tokenize the line and forward each word downstream.
        String line = input.getStringByField("line");
        for (String token : line.split(" ")) {
            collector.emit(new Values(token));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
SumBolt.java 统计单词数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package com.test.stormtest.wordcount; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple;

/**
 * Terminal bolt that keeps a running per-word count and prints the full
 * tally to stdout after every received "word" tuple. Emits nothing.
 */
public class SumBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    /** Running word -> occurrence count, local to this bolt instance. */
    private Map<String, Long> countMap = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        countMap = new HashMap<String, Long>();
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // Increment the counter, starting from zero for unseen words.
        Long previous = countMap.get(word);
        long updated = (previous == null ? 0L : previous.longValue()) + 1L;
        countMap.put(word, updated);

        // Dump the entire tally after each update (debug-style output).
        System.out.println("-----------------------------------------------");
        for (Entry<String, Long> entry : countMap.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: declares no output fields.
    }
}
WordCountTopology.java 驱动类,本地模式提交topology
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package com.test.stormtest.wordcount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; public class WordCountTopology { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "filespout" , new FileSpout()); builder.setBolt( "splitbolt" , new SplitBolt()).shuffleGrouping( "filespout" ); builder.setBolt( "sumtblot" , new SumBolt()).fieldsGrouping( "splitbolt" , new Fields( "word" )); LocalCluster cluster = new LocalCluster(); Config config = new Config(); config.setDebug( true ); cluster.submitTopology( "wordcount" , config, builder.createTopology()); Utils.sleep( 20000 ); cluster.killTopology( "wordcount" ); cluster.shutdown(); } } |
标签:
原文地址:http://www.cnblogs.com/lishouguang/p/4559206.html