标签:
<?xml version="1.0" encoding="UTF-8"?>
<!-- log4j2 configuration: mirror every log line to the console and to a
     rolling file; the rolled file is what the Flume exec source tails. -->
<Configuration status="INFO">
  <Appenders>
    <Console name="myConsole" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
    <!-- Rolls once per hour, driven by the %d{...-HH} unit in filePattern. -->
    <RollingFile name="myFile"
                 fileName="/Users/guludada/Desktop/logs/app.log"
                 filePattern="/Users/guludada/Desktop/logs/app-%d{yyyy-MM-dd-HH}.log">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
      <Policies>
        <TimeBasedTriggeringPolicy />
      </Policies>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Root level="Info">
      <AppenderRef ref="myConsole"/>
      <AppenderRef ref="myFile"/>
    </Root>
  </Loggers>
</Configuration>
生成器代码:
package com.guludada.ordersInfo;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

// Import log4j classes.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Generates pseudo-random order records and emits them through log4j2.
 * The RollingFile appender configured for this logger writes the lines to
 * the file that the Flume exec source tails, feeding the rest of the
 * pipeline (Flume -> Kafka -> Storm -> Redis).
 */
public class ordersInfoGenerator {

    public enum paymentWays {
        Wechat, Alipay, Paypal
    }

    public enum merchantNames {
        优衣库, 天猫, 淘宝, 咕噜大大, 快乐宝贝, 守望先峰, 哈毒妇, Storm, Oracle, Java, CSDN, 跑男, 路易斯威登,
        暴雪公司, Apple, Sumsam, Nissan, Benz, BMW, Maserati
    }

    public enum productNames {
        黑色连衣裙, 灰色连衣裙, 棕色衬衫, 性感牛仔裤, 圆脚牛仔裤, 塑身牛仔裤, 朋克卫衣, 高腰阔腿休闲裤, 人字拖鞋, 沙滩拖鞋
    }

    float[] skuPriceGroup = {299, 399, 699, 899, 1000, 2000};
    float[] discountGroup = {10, 20, 50, 100};

    // Running totals shared between randomSkus() and calculateOrderPrice():
    // randomSkus() must run first within a single randomOrderInfo() call.
    float totalPrice = 0;
    float discount = 0;
    float paymentPrice = 0;

    private static final Logger logger = LogManager.getLogger(ordersInfoGenerator.class);

    // One reusable RNG instead of constructing "new Random()" in every helper.
    private final Random random = new Random();

    private int logsNumber = 1000;

    /**
     * Emits exactly {@code logsNumber} order lines.
     * (The original loop used {@code i <= logsNumber} and logged 1001 lines.)
     */
    public void generate() {
        for (int i = 0; i < logsNumber; i++) {
            logger.info(randomOrderInfo());
        }
    }

    /**
     * Builds one pipe-separated order record. Field order matters: the
     * downstream logInfoHandler parser splits on '|' and indexes the fields.
     */
    public String randomOrderInfo() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date = new Date();

        String orderNumber = randomNumbers(5) + date.getTime();
        String orderDate = sdf.format(date);
        String paymentNumber = randomPaymentWays() + "-" + randomNumbers(8);
        String paymentDate = sdf.format(date);
        String merchantName = randomMerchantNames();
        String skuInfo = randomSkus();            // also resets and accumulates totalPrice
        String priceInfo = calculateOrderPrice(); // reads the totalPrice set above

        return "orderNumber: " + orderNumber + " | orderDate: " + orderDate
                + " | paymentNumber: " + paymentNumber + " | paymentDate: " + paymentDate
                + " | merchantName: " + merchantName + " | sku: " + skuInfo
                + " | price: " + priceInfo;
    }

    private String randomPaymentWays() {
        paymentWays[] paymentWayGroup = paymentWays.values();
        return paymentWayGroup[random.nextInt(paymentWayGroup.length)].name();
    }

    private String randomMerchantNames() {
        merchantNames[] merchantNameGroup = merchantNames.values();
        return merchantNameGroup[random.nextInt(merchantNameGroup.length)].name();
    }

    private String randomProductNames() {
        productNames[] productNameGroup = productNames.values();
        return productNameGroup[random.nextInt(productNameGroup.length)].name();
    }

    /** Builds the "[ skuName: ... ]" section and accumulates {@code totalPrice}. */
    private String randomSkus() {
        StringBuilder skuInfo = new StringBuilder("[");
        totalPrice = 0;
        // Always three SKU entries. The original also computed a random
        // "skuCategoryNum" but never used it and looped a fixed three times.
        for (int i = 1; i <= 3; i++) {
            int skuNum = random.nextInt(3) + 1;
            float skuPrice = skuPriceGroup[random.nextInt(skuPriceGroup.length)];
            float totalSkuPrice = skuPrice * skuNum;
            String skuName = randomProductNames();
            String skuCode = randomCharactersAndNumbers(10);
            skuInfo.append(" skuName: ").append(skuName)
                   .append(" skuNum: ").append(skuNum)
                   .append(" skuCode: ").append(skuCode)
                   .append(" skuPrice: ").append(skuPrice)
                   .append(" totalSkuPrice: ").append(totalSkuPrice)
                   .append(";");
            totalPrice += totalSkuPrice;
        }
        skuInfo.append(" ]");
        return skuInfo.toString();
    }

    /** Derives discount and payment price from the totalPrice left by randomSkus(). */
    private String calculateOrderPrice() {
        discount = discountGroup[random.nextInt(discountGroup.length)];
        paymentPrice = totalPrice - discount;
        return "[ totalPrice: " + totalPrice + " discount: " + discount
                + " paymentPrice: " + paymentPrice + " ]";
    }

    /** Random lowercase-alphanumeric string of the given length. */
    private String randomCharactersAndNumbers(int length) {
        String characters = "abcdefghijklmnopqrstuvwxyz0123456789";
        StringBuilder sb = new StringBuilder(length); // O(n) instead of O(n^2) String concat
        for (int i = 0; i < length; i++) {
            sb.append(characters.charAt(random.nextInt(characters.length())));
        }
        return sb.toString();
    }

    /** Random digit string of the given length. */
    private String randomNumbers(int length) {
        String characters = "0123456789";
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(characters.charAt(random.nextInt(characters.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ordersInfoGenerator generator = new ordersInfoGenerator();
        generator.generate();
    }
}
# Flume agent on the log-producing host: tails the generator's log file and
# forwards every line to the remote collector agent over avro.
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

# exec source: follow the application log as it grows/rolls
agent.sources.origin.type = exec
agent.sources.origin.command = tail -F /export/data/trivial/app.log
agent.sources.origin.channels = memorychannel

# static interceptor: stamp every event with a "topic" header so the
# downstream KafkaSink can route it to the "ordersInfo" topic
agent.sources.origin.interceptors = i1
agent.sources.origin.interceptors.i1.type = static
agent.sources.origin.interceptors.i1.key = topic
agent.sources.origin.interceptors.i1.value = ordersInfo

# NOTE(review): "loggerSink" is not declared in agent.sinks above, so these
# two lines are inert debugging leftovers.
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

# avro sink: ship events to the collector agent at 172.16.124.130:4545
agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545
# Flume agent on the collector host: receives avro events from the producer
# agent and publishes them to Kafka via the KafkaSink.
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

# avro source: listen on all interfaces for the producer-side agent
agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

# NOTE(review): "loggerSink" is not declared in agent.sinks above, so these
# two lines are inert debugging leftovers.
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

# KafkaSink: the destination topic comes from each event's "topic" header
# (set by the producer agent's interceptor), hence the explicit topic
# setting remains commented out.
agent.sinks.target.type = org.apache.flume.sink.kafka.KafkaSink
#agent.sinks.target.topic = bigdata
agent.sinks.target.brokerList=localhost:9092
agent.sinks.target.requiredAcks=1
agent.sinks.target.batchSize=100
agent.sinks.target.channel = memorychannel
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
# NOTE(review): the original used the relative path "export/data/kafka",
# inconsistent with server-2's absolute "/export/data/kafka"; a relative
# log.dir resolves against the broker's working directory.
log.dir=/export/data/kafka
zookeeper.connect=localhost:2181
config/server-2.properties:
broker.id=2
# NOTE(review): server-1 already listens on 9093; two brokers on the same
# host must bind distinct ports, so server-2 is moved to 9094.
listeners=PLAINTEXT://:9094
log.dir=/export/data/kafka
zookeeper.connect=localhost:2181

broker.id是kafka集群上每一个节点的单独标识,不能重复;listeners可以理解为每一个节点上Kafka进程要监听的端口,使用默认的就行; log.dir是Kafka的log文件(记录消息的log file)存放目录; zookeeper.connect就是Zookeeper的URI地址和端口。
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...
conf/storm.yaml
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
    - "ymhHadoop"
    - "ymhHadoop2"
    - "ymhHadoop3"

storm.local.dir: "/export/data/storm/workdir"

nimbus.host: "ymhHadoop"

# NOTE(review): the original wrote "-6700" etc.; YAML list items need a
# space after the dash ("- 6700") or they parse as negative numbers.
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
package com.guludada.ordersanalysis;

import java.util.UUID;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;

/**
 * Wires a KafkaSpout (topic "ordersInfo") to the merchant-sales analysis
 * bolt. With a command-line argument the topology is submitted to a real
 * cluster under that name; without one it runs in an in-process
 * LocalCluster for development.
 */
public class ordersAnalysisTopology {

    private static String topicName = "ordersInfo";
    // Zookeeper path under which the KafkaSpout stores its consumer offsets.
    private static String zkRoot = "/stormKafka/" + topicName;

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts("ymhHadoop:2181,ymhHadoop2:2181,ymhHadoop3:2181");

        // A random UUID as the consumer id means each run starts a fresh
        // offset trail. NOTE(review): use a fixed id to resume offsets
        // across restarts.
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("merchantsSalesBolt", new merchantsSalesAnalysisBolt(), 2)
               .shuffleGrouping("kafkaSpout");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException e) {
                // Collapsed the two identical catch blocks into one multi-catch.
                e.printStackTrace();
            }
        } else {
            conf.setMaxSpoutPending(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("ordersAnalysis", conf, builder.createTopology());
        }
    }
}
package com.guludada.domain;

import java.util.ArrayList;
import java.util.Date;

/**
 * Plain data holder for one parsed order log line. Populated field-by-field
 * by logInfoHandler; fields left at their defaults when the source line does
 * not carry (or the parser does not extract) the corresponding value.
 */
public class ordersBean {

    Date createTime = null;                  // order creation timestamp ("orderDate" field)
    String number = "";                      // order number
    String paymentNumber = "";               // payment way + "-" + digits
    Date paymentDate = null;                 // never set by logInfoHandler in this example
    String merchantName = "";                // merchant the order belongs to
    ArrayList<skusBean> skuGroup = null;     // SKU details; unused in this example (see note below)
    float totalPrice = 0;                    // sum of all SKU line totals
    float discount = 0;                      // never set by logInfoHandler in this example
    float paymentPrice = 0;                  // never set by logInfoHandler in this example

    public Date getCreateTime() {
        return createTime;
    }
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
    public String getNumber() {
        return number;
    }
    public void setNumber(String number) {
        this.number = number;
    }
    public String getPaymentNumber() {
        return paymentNumber;
    }
    public void setPaymentNumber(String paymentNumber) {
        this.paymentNumber = paymentNumber;
    }
    public Date getPaymentDate() {
        return paymentDate;
    }
    public void setPaymentDate(Date paymentDate) {
        this.paymentDate = paymentDate;
    }
    public String getMerchantName() {
        return merchantName;
    }
    public void setMerchantName(String merchantName) {
        this.merchantName = merchantName;
    }
    public ArrayList<skusBean> getSkuGroup() {
        return skuGroup;
    }
    public void setSkuGroup(ArrayList<skusBean> skuGroup) {
        this.skuGroup = skuGroup;
    }
    public float getTotalPrice() {
        return totalPrice;
    }
    public void setTotalPrice(float totalPrice) {
        this.totalPrice = totalPrice;
    }
    public float getDiscount() {
        return discount;
    }
    public void setDiscount(float discount) {
        this.discount = discount;
    }
    public float getPaymentPrice() {
        return paymentPrice;
    }
    public void setPaymentPrice(float paymentPrice) {
        this.paymentPrice = paymentPrice;
    }
}
// Note (from the original author): skusBean is not used in this example,
// so its implementation was deliberately skipped.
package com.guludada.domain;

// Placeholder class: skusBean is referenced by ordersBean.skuGroup but is
// never populated in this example, so the original author intentionally
// left its body unimplemented ("………………" stands for the omitted fields).
public class skusBean { ……………… }
package com.guludada.common;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.domain.ordersBean;

/**
 * Parses one generator log line into an ordersBean.
 *
 * NOTE(review): SimpleDateFormat is not thread-safe. Each bolt instance
 * creates its own logInfoHandler in prepare(), so this is safe per executor,
 * but do not share one handler across threads.
 */
public class logInfoHandler {

    SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Extracts order number, create time, merchant name and total price from
     * a line shaped like:
     *   "orderNumber: N | orderDate: D T | paymentNumber: P | paymentDate: D T
     *    | merchantName: M | sku: [...] | price: [ totalPrice: X discount: Y paymentPrice: Z ]"
     *
     * @param orderInfo raw log line (may include a log4j timestamp prefix)
     * @return populated ordersBean, or a default-valued one when the line
     *         does not contain "orderNumber:"
     */
    public ordersBean getOrdersBean(String orderInfo) {
        ordersBean order = new ordersBean();

        // Keep only the order payload: everything from "orderNumber:" onward
        // (drops the log4j date/thread/level prefix).
        Pattern orderPattern = Pattern.compile("orderNumber:.+");
        Matcher orderMatcher = orderPattern.matcher(orderInfo);
        if(orderMatcher.find()) {

            String orderInfoStr = orderMatcher.group(0);
            // Pipe-separated fields, indexed positionally below.
            String[] orderInfoGroup = orderInfoStr.trim().split("\\|");

            // Field 0: "orderNumber: N" -> value after the colon.
            String orderNum = (orderInfoGroup[0].split(":"))[1].trim();
            order.setNumber(orderNum);

            // Field 1: "orderDate: yyyy-MM-dd HH:mm:ss" -> tokens [1] and [2]
            // after splitting on spaces are the date and time parts.
            String orderCreateTime = orderInfoGroup[1].trim().split(" ")[1] + " " + orderInfoGroup[1].trim().split(" ")[2];
            try {
                order.setCreateTime(sdf_final.parse(orderCreateTime));
            } catch (ParseException e) {
                // Malformed timestamp: createTime stays null.
                // TODO(review): consider logging/propagating instead of
                // printing the stack trace.
                e.printStackTrace();
            }

            // Field 4: "merchantName: M".
            String merchantName = (orderInfoGroup[4].split(":"))[1].trim();
            order.setMerchantName(merchantName);

            // Field 6: "price: [ totalPrice: X discount: Y paymentPrice: Z ]".
            // split("price:") matches only the lowercase field label
            // ("totalPrice"/"paymentPrice" contain a capital P); the
            // substring strips "[ " at the front (and the last 3 chars at
            // the end, which is irrelevant since only token [1], the
            // totalPrice value, is used).
            String orderPriceInfo = (orderInfoGroup[6].split("price:"))[1].trim();
            String totalPrice = (orderPriceInfo.substring(2, orderPriceInfo.length()-3).trim().split(" "))[1];
            order.setTotalPrice(Float.parseFloat(totalPrice));

            return order;
        } else {
            // No order payload found: return the empty default bean.
            return order;
        }
    }
}
package com.guludada.ordersanalysis;

import java.util.Map;

import com.guludada.common.logInfoHandler;
import com.guludada.domain.ordersBean;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Terminal bolt: parses each order line and accumulates per-merchant sales
 * totals in the Redis sorted set "orderAna:topSalesByMerchant".
 */
public class merchantsSalesAnalysisBolt extends BaseRichBolt {

    // Bolts are serialized when the topology is submitted.
    private static final long serialVersionUID = 1L;

    private OutputCollector _collector;
    logInfoHandler loginfohandler;
    JedisPool pool;

    public void execute(Tuple tuple) {
        String orderInfo = tuple.getString(0);
        ordersBean order = loginfohandler.getOrdersBean(orderInfo);

        // store the salesByMerchant information into Redis.
        // try-with-resources returns the connection to the pool; the
        // original never released it, leaking one pooled connection per
        // tuple until the pool was exhausted.
        try (Jedis jedis = pool.getResource()) {
            jedis.zincrby("orderAna:topSalesByMerchant", order.getTotalPrice(), order.getMerchantName());
        }

        // Ack so the spout does not consider the tuple failed/pending
        // (the original never acked, which stalls a topology running with
        // maxSpoutPending set).
        _collector.ack(tuple);
    }

    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this._collector = collector;
        this.loginfohandler = new logInfoHandler();
        // host "ymhHadoop", port 6379, timeout 2*60000 ms, password "12345"
        this.pool = new JedisPool(new JedisPoolConfig(), "ymhHadoop", 6379, 2 * 60000, "12345");
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // Terminal bolt: writes to Redis and emits no output stream.
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.guludada</groupId>
  <artifactId>Storm_OrdersAnalysis</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>Storm_OrdersAnalysis Maven Webapp</name>
  <url>http://maven.apache.org</url>

  <dependencies>
    <!-- storm-core is "provided": the Storm workers supply it at runtime,
         and bundling it into the topology jar would clash with the
         cluster's own copy. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.6</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.6</version>
    </dependency>
    <!-- Kafka client; zookeeper, log4j and slf4j-log4j12 are excluded
         because storm-core already ships its own logging and zookeeper
         stack, and duplicates cause classpath conflicts at runtime. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.9.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.1</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>Storm_OrdersAnalysis</finalName>
    <plugins>
      <!-- Builds the fat "jar-with-dependencies" that gets submitted to
           Storm; the manifest mainClass makes the jar directly runnable. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.guludada.ordersanalysis.ordersAnalysisTopology</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
Flume+Kafka+Storm+Redis实时分析系统基本架构
标签:
原文地址:http://blog.csdn.net/ymh198816/article/details/51998085