1. Install and start ZooKeeper
/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0/conf/zoo.cfg
bin/zkServer.sh start
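The post points at zoo.cfg but does not show its contents. A minimal single-node configuration might look like the following sketch (the dataDir path is an assumption; clientPort is the ZooKeeper default):

tickTime=2000
dataDir=/home/hadoop/app/tmp/zookeeper
clientPort=2181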
2. Install and start Logstash
/home/hadoop/app/logstash-2.4.1/project.conf
bin/logstash -f project.conf
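project.conf itself is not shown in the post. A sketch of what it likely contains: a file input watching the access.log written by the generator script below, and a kafka output. The broker address and topic name are assumptions, and option names can differ between plugin versions:

input {
  file {
    path => "/home/hadoop/app/logstash-2.4.1/access.log"
  }
}
output {
  kafka {
    bootstrap_servers => "localhost:9092"        # assumed broker address
    topic_id => "project_topic"                  # assumed topic name
    codec => plain { format => "%{message}" }    # forward only the raw log line, as the bolt expects
  }
}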
3. Install and start Kafka
/home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
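The topic Logstash writes to must exist before the topology runs. Assuming a local ZooKeeper on port 2181 and the topic name project_topic used above (both assumptions), it can be created and verified with the standard Kafka 0.9 scripts:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic project_topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic project_topic --from-beginning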
4. Storm programming
#!/usr/bin/env python
# coding=UTF-8
import random
import time

# sample longitude,latitude pairs
infos = [
    "116.397026,39.918058", "116.410886,39.881949",
    "116.272876,39.99243", "116.544079,40.417555",
    "116.225404,40.258186", "116.38631,39.937209",
    "116.399466,39.989743"
]

phones = [
    '15211111111', '15222222222',
    '15233333333', '15244444444',
    '15255555555', '15266666666',
    '15277777777', '15288888888',
    '15299999999', '15200000000',
    '13267878348', '13432841176'
]


def sample_phone():
    return random.sample(phones, 1)[0]


def sample_info():
    return random.sample(infos, 1)[0]


def generate_log(count=3):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/home/hadoop/app/logstash-2.4.1/access.log", "a+")
    while count >= 1:
        # one record per line: phone \t longitude,latitude \t [timestamp]
        query_log = "{phone}\t{info}\t[{local_time}]".format(
            phone=sample_phone(), info=sample_info(), local_time=time_str)
        print(query_log)
        f.write(query_log + "\n")
        count = count - 1
    f.close()


if __name__ == '__main__':
    generate_log(10)
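The post does not say how the generator is triggered. One common approach (an assumption here, including the script path) is a cron entry that appends a new batch every minute, so Logstash continuously has fresh lines to ship:

*/1 * * * * python /home/hadoop/app/generate_log.py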
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.liaomj</groupId>
  <artifactId>strom</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>strom</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>2.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.12.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>16.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-jdbc</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.31</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
package com.kafka;

import com.google.common.collect.Maps;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Map;
import java.util.UUID;

public class KafkaTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // ZooKeeper address used by Kafka.
        // NOTE: the original post omits these three definitions; the host,
        // topic and zkRoot below are placeholder values, not the author's.
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        String topic = "project_topic";
        String zkRoot = "/" + topic;

        String id = UUID.randomUUID().toString();
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, id);
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        String SPOUT_ID = KafkaSpout.class.getSimpleName();
        builder.setSpout(SPOUT_ID, kafkaSpout);
        String BOLT_ID = LogProcessBolt.class.getSimpleName();
        builder.setBolt(BOLT_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID);

        // JDBC configuration
        Map hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/storm");
        hikariConfigMap.put("dataSource.user", "root");
        hikariConfigMap.put("dataSource.password", "root");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // target table
        String tableName = "stat";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withInsertQuery("insert into stat values(?,?,?)")
                .withQueryTimeoutSecs(30);
        builder.setBolt("JdbcInsertBolt", userPersistanceBolt).shuffleGrouping(BOLT_ID);

        // Run in local mode; switch to StormSubmitter to deploy to a cluster.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(KafkaTopology.class.getSimpleName(), new Config(), builder.createTopology());
        // try {
        //     StormSubmitter.submitTopology(KafkaTopology.class.getSimpleName(),
        //             new Config(), builder.createTopology());
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }
    }
}
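The topology inserts into the stat table of the storm database, but the post never shows its schema. Based on the insert statement and the fields emitted by LogProcessBolt (time, latitude, longitude), a plausible definition is the following; the column types are assumptions:

CREATE TABLE storm.stat (
  time BIGINT,
  latitude DOUBLE,
  longitude DOUBLE
);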
package com.kafka;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class LogProcessBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // KafkaSpout emits the raw message as a byte array under the field "bytes"
            byte[] binaryByField = input.getBinaryByField("bytes");
            String value = new String(binaryByField);

            // log format: phone \t longitude,latitude \t [yyyy-MM-dd HH:mm:ss]
            String[] splits = value.split("\t");
            String phone = splits[0];
            String[] temp = splits[1].split(",");
            String longitude = temp[0];
            String latitude = temp[1];
            long time = Long.parseLong(DataUtils.getTime(splits[2]));
            System.out.println(phone + "," + longitude + "," + latitude + "," + time);

            collector.emit(new Values(time, Double.parseDouble(latitude), Double.parseDouble(longitude)));
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time", "latitude", "longitude"));
    }
}
package com.kafka;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DataUtils {

    // Converts a "[yyyy-MM-dd HH:mm:ss]" string into a millisecond timestamp string.
    public static String getTime(String time) throws Exception {
        String s = time.substring(1, time.length() - 1);
        String res;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = simpleDateFormat.parse(s);
        long ts = date.getTime();
        res = String.valueOf(ts);
        return res;
    }

    // public static void main(String[] args) throws Exception {
    //     System.out.println(DataUtils.getTime("[2019-12-23 23:00:00]"));
    // }
}
-- Records from the last 10 minutes, aggregated by location
select longitude, latitude, count(1)
from storm.stat
where time > unix_timestamp(date_sub(current_timestamp(), interval 10 minute)) * 1000
group by longitude, latitude;
Original post: https://www.cnblogs.com/liaomj/p/12503615.html