码迷,mamicode.com
首页 > 其他好文 > 详细

新闻网大数据实时分析可视化系统项目——10、数据采集/存储/分发完整流程测试

时间:2019-05-10 16:31:07      阅读:232      评论:0      收藏:0      [点我收藏+]

标签:项目打包   工具   开发工具   aci   单位   sleep   stat   oid   local   

(一)idea工具开发数据生成模拟程序

1.在idea开发工具中构建weblogs项目,编写数据生成模拟程序。

package main.java;

import java.io.*;

public class ReadWrite {

      static String readFileName;

      static String writeFileName;

      public static void main(String args[]){

           readFileName = args[0];

           writeFileName = args[1];

          try {

             // readInput();

            readFileByLines(readFileName);

          }catch(Exception e){

          }

      }

 

    public static void readFileByLines(String fileName) {

        FileInputStream fis = null;

        InputStreamReader isr = null;

        BufferedReader br = null;

        String tempString = null;

        try {

            System.out.println("以行为单位读取文件内容,一次读一整行:");

            fis = new FileInputStream(fileName);// FileInputStream

            // 从文件系统中的某个文件中获取字节

            isr = new InputStreamReader(fis,"GBK");

            br = new BufferedReader(isr);

            int count=0;

            while ((tempString = br.readLine()) != null) {

                count++;

                // 显示行号

                Thread.sleep(300);

                String str = new String(tempString.getBytes("UTF8"),"GBK");

                System.out.println("row:"+count+">>>>>>>>"+tempString);

                method1(writeFileName,tempString);

                //appendMethodA(writeFileName,tempString);

            }

            isr.close();

        } catch (IOException e) {

            e.printStackTrace();

        } catch (InterruptedException e) {

            e.printStackTrace();

        } finally {

            if (isr != null) {

                try {

                    isr.close();

                } catch (IOException e1) {

                }

            }

        }

    }

    public static void method1(String file, String conent) {

        BufferedWriter out = null;

        try {

            out = new BufferedWriter(new OutputStreamWriter(

                    new FileOutputStream(file, true)));

            out.write("\n");

            out.write(conent);

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                out.close();

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

}

2.参照前面idea工具项目打包方式,将该项目打成weblogs.jar包,然后上传至bigdata-pro01.kfk.com节点的/opt/jars目录下(目录需要提前创建)

3.将weblogs.jar分发到另外两个节点

1)在另外两个节点上分别创建/opt/jars目录

mkdir /opt/jars

2)将weblogs.jar分发到另外两个节点

scp weblogs.jar bigdata-pro02.kfk.com:/opt/jars/

scp weblogs.jar bigdata-pro03.kfk.com:/opt/jars/

4.编写运行模拟程序的shell脚本

1)在bigdata-pro02.kfk.com节点的/opt/datas目录下,创建weblog-shell.sh脚本。

vi weblog-shell.sh

#/bin/bash

echo "start log......"

#第一个参数是原日志文件,第二个参数是日志生成输出文件

java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

修改weblog-shell.sh可执行权限

chmod 777 weblog-shell.sh

2)将bigdata-pro02.kfk.com节点上的/opt/datas/目录拷贝到bigdata-pro03节点.kfk.com

scp -r /opt/datas/ bigdata-pro03.kfk.com:/opt/datas/

3)修改bigdata-pro02.kfk.com和bigdata-pro03.kfk.com节点上面日志采集文件路径。以bigdata-pro02.kfk.com节点为例。

vi flume-conf.properties

agent2.sources = r1

agent2.channels = c1

agent2.sinks = k1

 

agent2.sources.r1.type = exec

#修改采集日志文件路径,bigdata-pro03.kfk.com节点也是修改此处

agent2.sources.r1.command = tail -F /opt/datas/weblog-flume.log

agent2.sources.r1.channels = c1

 

agent2.channels.c1.type = memory

agent2.channels.c1.capacity = 10000

agent2.channels.c1.transactionCapacity = 10000

agent2.channels.c1.keep-alive = 5

 

agent2.sinks.k1.type = avro

agent2.sinks.k1.channel = c1

agent2.sinks.k1.hostname = bigdata-pro01.kfk.com

agent2.sinks.k1.port = 5555

(二)编写启动flume服务程序的shell脚本

1.在bigdata-pro02.kfk.com节点的flume安装目录下编写flume启动脚本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-2 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console

2.在bigdata-pro03.kfk.com节点的flume安装目录下编写flume启动脚本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-3 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console

3.在bigdata-pro01.kfk.com节点的flume安装目录下编写flume启动脚本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-1 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console

(三)编写Kafka Consumer执行脚本

1.在bigdata-pro01.kfk.com节点的Kafka安装目录下编写Kafka Consumer执行脚本

vi kfk-test-consumer.sh

#/bin/bash

echo "kfk-kafka-consumer.sh start ......"

bin/kafka-console-consumer.sh --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --from-beginning --topic weblogs

2.将kfk-test-consumer.sh脚本分发另外两个节点

scp kfk-test-consumer.sh bigdata-pro02.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

scp kfk-test-consumer.sh bigdata-pro03.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

(四)启动模拟程序并测试

在bigdata-pro02.kfk.com节点启动日志产生脚本,模拟产生日志是否正常。

/opt/datas/weblog-shell.sh

技术图片

(五)启动数据采集所有服务

1.启动Zookeeper服务

bin/zkServer.sh start

2.启动hdfs服务

sbin/start-dfs.sh

3.启动HBase服务

 bin/start-hbase.sh

创建hbase业务表

bin/hbase shell

create ‘weblogs‘,‘info‘

4.启动Kafka服务

bin/kafka-server-start.sh config/server.properties &

创建业务数据topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --replication-factor 1 --partitions 1

5.配置flume相关环境变量

vi flume-env.sh

export JAVA_HOME=/opt/modules/jdk1.7.0_67

export HADOOP_HOME=/opt/modules/hadoop-2.5.0

export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0

(六)完成数据采集全流程测试

1.在bigdata-pro01.kfk.com节点上启动flume聚合脚本,将采集的数据分发到Kafka集群和hbase集群。

./flume-kfk-start.sh

2.在bigdata-pro02.kfk.com节点上完成数据采集

1)使用shell脚本模拟日志产生

cd /opt/datas/

./weblog-shell.sh

2)启动flume采集日志数据发送给聚合节点

./flume-kfk-start.sh

3.在bigdata-pro03.kfk.com节点上完成数据采集

1)使用shell脚本模拟日志产生

cd /opt/datas/

./weblog-shell.sh

2)启动flume采集日志数据发送给聚合节点

./flume-kfk-start.sh

4.启动Kafka Consumer查看flume日志采集情况

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning

5.查看hbase数据写入情况

./hbase-shell

count ‘weblogs‘

新闻网大数据实时分析可视化系统项目——10、数据采集/存储/分发完整流程测试

标签:项目打包   工具   开发工具   aci   单位   sleep   stat   oid   local   

原文地址:https://www.cnblogs.com/ratels/p/10844870.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!