码迷,mamicode.com
首页 > Web开发 > 详细

大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址

时间:2019-04-17 23:28:47      阅读:336      评论:0      收藏:0      [点我收藏+]

标签:size   time   getc   Dimension   转换   ddd   ima   reducer   结束时间   

二十、数据分析20.1、统计表20.2、目标20.3、代码实现20.3.1、Mapper20.3.2、Reducer20.3.3、Runner20.3.4、测试二十一、Hive 之 hourly 分析21.1、目标21.2、目标解析21.3、创建 Mysql 结果表21.4、Hive 分析21.4.1、创建 Hive 外部表,关联 HBase 数据表21.4.2、创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)21.4.3、提取 e_pv 和 e_l 事件数据到临时表中21.4.4、创建分析结果临时保存表21.4.5、分析活跃访客数21.4.6、分析会话长度21.4.7、创建最终结果表21.4.8、向结果表中插入数据21.4.9、使用 Sqoop 导出 数据到 Mysql,观察数据二十二、常用 Maven 仓库地址


二十、数据分析

20.1、统计表

技术图片
通过表结构可以发现,只要维度id确定了,那么 new_install_users 也就确定了。

 

20.2、目标

  按照不同维度统计新增用户。比如:将 日、周、月 新增用户统计出来。传入的时间参数是: -date 2017-08-14

20.3、代码实现

20.3.1、Mapper

  • Step1、创建 NewInstallUsersMapper 类,outputKey 为 StatsUserDimension,outputValue 为 Text。定义全局变量,Key 和 Value 的对象。

  • Step2、覆写 map 方法,在该方法中读取 HBase 中待处理的数据,分别要包含维度的字段信息以及必有的字段信息。比如:serverTime、platformName、platformVersion、browserName、browserVersion、uuid。

  • Step3、数据过滤以及时间字符串转换。

  • Step4、构建维度信息:天维度,周维度,月维度,platform 维度[(name, version)(name, all)(all, all)],browser 维度[(browser, all) (browser, version)]。

  • Step5、设置 outputValue 的值为 uuid。

  • Step6、按照不同维度设置 outputKey。

  • Step7、将封装好的数据写入到 Mapper 的上下文对象中,输出给 Reducer。

示例代码如下:
NewInstallUsersMapper.java

package com.z.transformer.mr.statistics;

import java.io.IOException;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.z.transformer.common.DateEnum;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.common.KpiType;
import com.z.transformer.dimension.key.base.BrowserDimension;
import com.z.transformer.dimension.key.base.DateDimension;
import com.z.transformer.dimension.key.base.KpiDimension;
import com.z.transformer.dimension.key.base.PlatformDimension;
import com.z.transformer.dimension.key.stats.StatsCommonDimension;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.util.TimeUtil;

/**
 * 思路:思路:HBase 读取数据 --> HBaseInputFormat --> Mapper --> Reducer --> DBOutPutFormat--> 这接写入到 MySql 中
 * 
 * @author bruce
 */

public class NewInstallUserMapper extends TableMapper<StatsUserDimensionText{
    // Mapper 的 OutPutKey 和 OutPutValue
    // OutPutKey = StatsUserDimension 进行用户分析的组合维度(用户基本分析维度和浏览器分析维度)
    // OutPutValue = Text uuid(字符串)

    private static final Logger logger = Logger.getLogger(NewInstallUserMapper.class);

    // 定义列族
    private byte[] family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;

    // 定义输出 key
    private StatsUserDimension outputKey = new StatsUserDimension();
    // 定义输出 value
    private Text outputValue = new Text();

    // 映射输出 key 中的 StatsCommonDimension(公用维度) 属性,方便后续封装操作
    private StatsCommonDimension statsCommonDimension = this.outputKey.getStatsCommon();

    private long date, endOfDate; // 定义运行天的起始时间戳和结束时间戳
    private long firstThisWeekOfDate, endThisWeekOfDate; // 定义运行天所属周的起始时间戳和结束时间戳
    private long firstThisMonthOfDate, firstDayOfNextMonth; // 定义运行天所属月的起始时间戳和结束时间戳

    // 创建 kpi 维度对象
    private KpiDimension newInstallUsersKpiDimension = new KpiDimension(KpiType.NEW_INSTALL_USER.name);
    private KpiDimension browserNewInstallUsersKpiDimension = new KpiDimension(KpiType.BROWSER_NEW_INSTALL_USER.name);

    // 定义一个特殊占位的浏览器维度对象
    private BrowserDimension defaultBrowserDimension = new BrowserDimension("""");

    // 初始化操作
    @Override
    protected void setup(Mapper<ImmutableBytesWritable, Result, StatsUserDimension, Text>.Context context)
            throws IOException, InterruptedException 
{
        // 1、获取参数配置项的上下文
        Configuration conf = context.getConfiguration();
        // 2、获取我们给定的运行时间参数,获取运行的是哪一天的数据
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);

        // 传入时间所属当前天开始的时间戳,即当前天的0点0分0秒的毫秒值
        this.date = TimeUtil.parseString2Long(date);
        // 传入时间所属当前天结束的时间戳
        this.endOfDate = this.date + GlobalConstants.DAY_OF_MILLISECONDS;
        // 传入时间所属当前周的第一天的时间戳
        this.firstThisWeekOfDate = TimeUtil.getFirstDayOfThisWeek(this.date);
        // 传入时间所属下一周的第一天的时间戳
        this.endThisWeekOfDate = TimeUtil.getFirstDayOfNextWeek(this.date);
        // 传入时间所属当前月的第一天的时间戳
        this.firstThisMonthOfDate = TimeUtil.getFirstDayOfThisMonth(this.date);
        // 传入时间所属下一月的第一天的时间戳
        this.firstDayOfNextMonth = TimeUtil.getFirstDayOfNextMonth(this.date);
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException 
{
        // 1、获取属性,参数值,即读取 HBase 中的数据:serverTime、platformName、platformVersion、browserName、browserVersion、uuid
        String serverTime = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)));
        String platformName = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
        String platformVersion = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
        String browserName = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)));
        String browserVersion = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)));
        String uuid = Bytes
                .toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));

        // 2、针对数据进行简单过滤(实际开发中过滤条件更多)
        if (StringUtils.isBlank(platformName) || StringUtils.isBlank(uuid)) {
            logger.debug("数据格式异常,直接过滤掉数据:" + platformName);
            return// 过滤掉无效数据
        }

        // 属性处理
        long longOfServerTime = -1;
        try {
            longOfServerTime = Long.valueOf(serverTime); // 将字符串转换为long类型
        } catch (Exception e) {
            logger.debug("服务器时间格式异常:" + serverTime);
            return// 服务器时间异常的数据直接过滤掉
        }

        // 3、构建维度信息
        // 获取当前服务器时间对应的当天维度的对象
        DateDimension dayOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.DAY);
        // 获取当前服务器时间对应的当周维度的对象
        DateDimension weekOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.WEEK);
        // 获取当前服务器时间对应的当月维度的对象
        DateDimension monthOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.MONTH);
        // 还可以获取 当季维度、当年维度......

        // 构建平台维度对象
        List<PlatformDimension> platforms = PlatformDimension.buildList(platformName, platformVersion);
        // 构建浏览器维度对象
        List<BrowserDimension> browsers = BrowserDimension.buildList(browserName, browserVersion);

        // 4、设置 outputValue
        this.outputValue.set(uuid);

        // 5、设置 outputKey
        for (PlatformDimension pf : platforms) {
            // 设置浏览器维度(是个空的)
            this.outputKey.setBrowser(this.defaultBrowserDimension);
            // 设置平台维度
            this.statsCommonDimension.setPlatform(pf);

            // 下面的代码是处理对应于 stats_user 表的统计数据

            // 设置 kpi 维度
            this.statsCommonDimension.setKpi(this.newInstallUsersKpiDimension);

            // 处理不同时间维度的情况
            // 处理天维度数据,要求服务器时间处于指定日期的范围:[today, endOfDate)
            if (longOfServerTime >= date && longOfServerTime < endOfDate) {
                // 设置时间维度为服务器时间当天的维度
                this.statsCommonDimension.setDate(dayOfDimension);
                // 输出数据
                context.write(outputKey, outputValue);
            }

            // 处理周维度数据,范围:[firstThisWeekOfDate, endThisWeekOfDate)
            if (longOfServerTime >= firstThisWeekOfDate && longOfServerTime < endThisWeekOfDate) {
                // 设置时间维度为服务器时间所属周的维度
                this.statsCommonDimension.setDate(weekOfDimension);
                // 输出数据
                context.write(outputKey, outputValue);
            }

            // 处理月维度数据,范围:[firstThisMonthOfDate, firstDayOfNextMonth)
            if (longOfServerTime >= firstThisMonthOfDate && longOfServerTime < firstDayOfNextMonth) {
                // 设置时间维度为服务器时间所属月的维度
                this.statsCommonDimension.setDate(monthOfDimension);
                // 输出数据
                context.write(outputKey, outputValue);
            }

            // 下面的代码是处理对应于 stats_device_browser 表的统计数据

            // 设置 kpi 维度
            this.statsCommonDimension.setKpi(this.browserNewInstallUsersKpiDimension);
            for (BrowserDimension br : browsers) {
                // 设置浏览器维度
                this.outputKey.setBrowser(br);

                // 处理不同时间维度的情况
                // 处理天维度数据,要求当前事件的服务器时间处于指定日期的范围内,[今天0点, 明天0点)
                if (longOfServerTime >= date && longOfServerTime < endOfDate) {
                    // 设置时间维度为服务器时间当天的维度
                    this.statsCommonDimension.setDate(dayOfDimension);
                    // 输出数据
                    context.write(outputKey, outputValue);
                }

                // 处理周维度数据,范围:[firstThisWeekOfDate, endThisWeekOfDate)
                if (longOfServerTime >= firstThisWeekOfDate && longOfServerTime < endThisWeekOfDate) {
                    // 设置时间维度为服务器时间所属周的维度
                    this.statsCommonDimension.setDate(weekOfDimension);
                    // 输出数据
                    context.write(outputKey, outputValue);
                }

                // 处理月维度数据,范围:[firstThisMonthOfDate, firstDayOfNextMonth)
                if (longOfServerTime >= firstThisMonthOfDate && longOfServerTime < firstDayOfNextMonth) {
                    // 设置时间维度为服务器时间所属月的维度
                    this.statsCommonDimension.setDate(monthOfDimension);
                    // 输出数据
                    context.write(outputKey, outputValue);
                }
            }
        }

    }
}

20.3.2、Reducer

  • Step1、创建 NewInstallUserReducer<StatsUserDimension, Text, StatsUserDimension, MapWritableValue> 类,覆写 reduce 方法。

  • Step2、统计 uuid 出现的次数,并且去重。

  • Step3、将数据拼装到 outputValue 中。

  • Step4、设置数据业务 KPI 类型,最终输出数据。

维度类结构图

技术图片

我们再来回顾下大数据离线平台架构图:

技术图片
示例代码如下:
NewInstallUserReducer.java

 

package com.z.transformer.mr.statistics;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.z.transformer.common.KpiType;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.dimension.value.MapWritableValue;

public class NewInstallUserReducer extends Reducer<StatsUserDimensionTextStatsUserDimensionMapWritableValue{

    // 保存唯一 id 的集合 Set,用于计算新增的访客数量
    private Set<String> uniqueSets = new HashSet<String>();

    // 定义输出 value
    private MapWritableValue outputValue = new MapWritableValue();

    @Override
    protected void reduce(StatsUserDimension key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException 
{
        // 1、统计 uuid 出现的次数,去重
        for (Text uuid : values) { // 增强 for 循环,遍历 values
            this.uniqueSets.add(uuid.toString());
        }

        // 2、输出数据拼装
        MapWritable map = new MapWritable();
        map.put(new IntWritable(-1), new IntWritable(this.uniqueSets.size()));
        this.outputValue.setValue(map);

        // 3、设置 outputValue 数据对应描述的业务指标(kpi)
        if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(key.getStatsCommon().getKpi().getKpiName())) {
            // 表示处理的是 browser new install user kpi 的计算
            this.outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
        } else if (KpiType.NEW_INSTALL_USER.name.equals(key.getStatsCommon().getKpi().getKpiName())) {
            // 表示处理的是 new install user kpi 的计算
            this.outputValue.setKpi(KpiType.NEW_INSTALL_USER);
        }

        // 4、输出数据
        context.write(key, outputValue);
    }
}

20.3.3、Runner

  • Step1、创建 NewInstallUserRunner 类,实现 Tool 接口。

  • Step2、添加时间处理函数,用来截取参数。

  • Step3、组装 Job。

  • Step4、设置 HBase InputFormat(设置从 HBase 中读取的数据都有哪些)。

  • Step5、自定义 OutPutFormat 并设置。

示例代码如下:
NewInstallUserRunner.java

package com.z.transformer.mr.statistics;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.dimension.value.MapWritableValue;
import com.z.transformer.mr.TransformerMySQLOutputFormat;
import com.z.transformer.util.TimeUtil;

public class NewInstallUserRunner implements Tool {

    // 给定一个参数表示参数上下文
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            int exitCode = ToolRunner.run(new NewInstallUserRunner(), args);
            if (exitCode == 0) {
                System.out.println("运行成功");
            } else {
                System.out.println("运行失败");
            }
            System.exit(exitCode);
        } catch (Exception e) {
            System.err.println("执行异常:" + e.getMessage());
        }
    }

    @Override
    public void setConf(Configuration conf) {
        // 添加自己开发环境所有需要的其他资源属性文件
        conf.addResource("transformer-env.xml");
        conf.addResource("output-collector.xml");
        conf.addResource("query-mapping.xml");

        // 创建 HBase 的 Configuration 对象
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        // 1、获取参数上下文对象
        Configuration conf = this.getConf();

        // 2、处理传入的参数,将参数添加到上下文中
        this.processArgs(conf, args);

        // 3、创建 Job
        Job job = Job.getInstance(conf, "new_install_users");

        // 4、设置 Job 的 jar 相关信息
        job.setJarByClass(NewInstallUserRunner.class);

        // 5、设置 IntputFormat 相关配置参数
        this.setHBaseInputConfig(job);

        // 6、设置 Mapper 相关参数
        // 在 setHBaseInputConfig 已经设置了

        // 7、设置 Reducer 相关参数
        job.setReducerClass(NewInstallUserReducer.class);
        job.setOutputKeyClass(StatsUserDimension.class);
        job.setOutputValueClass(MapWritableValue.class);

        // 8、设置 OutputFormat 相关参数,使用一个自定义的 OutputFormat
        job.setOutputFormatClass(TransformerMySQLOutputFormat.class);

        // 9、Job 提交运行
        boolean result = job.waitForCompletion(true);
        // 10、运行成功返回 0,失败返回 -1
        return result ? 0 : -1;
    }

    /**
     * 处理时间参数,如果没有传递参数的话,则默认清洗前一天的。
     * 
     * Job脚本如下: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
     * 
     * @param args
     */

    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-date".equals(args[i])) {
                if (i + 1 < args.length) {
                    date = args[i + 1];
                    break;
                }
            }
        }
        // 查看是否需要默认参数
        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            date = TimeUtil.getYesterday(); // 默认时间是昨天
        }
        // 保存到上下文中间
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }

    /**
     * 设置从 hbase 读取数据的相关配置信息
     * 
     * @param job
     * @throws IOException
     */

    private void setHBaseInputConfig(Job job) throws IOException {
        Configuration conf = job.getConfiguration();

        // 获取已经执行ETL操作的那一天的数据
        String dateStr = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14

        // 因为我们要访问 HBase 中的多张表,所以需要多个 Scan 对象,所以创建 Scan 集合
        List<Scan> scans = new ArrayList<Scan>();

        // 开始构建 Scan 集合
        // 1、构建 Hbase Scan Filter 对象
        FilterList filterList = new FilterList();
        // 2、构建只获取 Launch 事件的 Filter
        filterList.addFilter(new SingleColumnValueFilter(
                EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME, // 列族
                Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), // 事件名称
                CompareOp.EQUAL, // 等于判断
                Bytes.toBytes(EventEnum.LAUNCH.alias))); // Launch 事件的别名
        // 3、构建部分列的过滤器 Filter
        String[] columns = new String[] { 
                EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
                EventLogConstants.LOG_COLUMN_NAME_VERSION, // 平台版本
                EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
                EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // 浏览器版本
                EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
                EventLogConstants.LOG_COLUMN_NAME_UUID, // 访客唯一标识符 uuid
                EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME // 确保根据事件名称过滤数据有效,所以需要该列的值
        };

        // 创建 getColumnFilter 方法用于得到 Filter 对象
        // 根据列名称过滤数据的 Filter
        filterList.addFilter(this.getColumnFilter(columns));

        // 4、数据来源表所属日期是哪些
        long startDate, endDate; // Scan 的表区间属于[startDate, endDate)

        long date = TimeUtil.parseString2Long(dateStr); // 传入时间所属当前天开始的时间戳,即当前天的0点0分0秒的毫秒值
        long endOfDate = date + GlobalConstants.DAY_OF_MILLISECONDS; // 传入时间所属当前天结束的时间戳

        long firstDayOfWeek = TimeUtil.getFirstDayOfThisWeek(date); // 传入时间所属当前周的第一天的时间戳
        long lastDayOfWeek = TimeUtil.getFirstDayOfNextWeek(date); // 传入时间所属下一周的第一天的时间戳
        long firstDayOfMonth = TimeUtil.getFirstDayOfThisMonth(date); // 传入时间所属当前月的第一天的时间戳
        long lastDayOfMonth = TimeUtil.getFirstDayOfNextMonth(date); // 传入时间所属下一月的第一天的时间戳

        // [date,
        // [firstDayOfWeek
        // [firstDayOfMonth
        // 选择最小的时间戳作为数据输入的起始时间,date 一定大于等于其他两个 first 时间戳值

        // 获取起始时间
        startDate = Math.min(firstDayOfMonth, firstDayOfWeek);

        // 获取结束时间
        endDate = TimeUtil.getTodayInMillis() + GlobalConstants.DAY_OF_MILLISECONDS;
        if (endOfDate > lastDayOfWeek || endOfDate > lastDayOfMonth) {
            endDate = Math.max(lastDayOfMonth, lastDayOfWeek);
        } else {
            endDate = endOfDate;
        }

        // 获取连接对象,执行,这里使用 HBase 的 新 API
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = null;
        try {
            admin = connection.getAdmin();
        } catch (Exception e) {
            throw new RuntimeException("创建 HBaseAdmin 对象失败", e);
        }

        // 5、构建我们 scan 集合
        try {
            for (long begin = startDate; begin < endDate;) {
                // 格式化 HBase 的后缀
                String tableNameSuffix = TimeUtil.parseLong2String(begin, TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
                // 构建表名称:tableName = event_logs20170814
                String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix;

                // 需要先判断表存在,然后当表存在的情况下,再构建 Scan 对象
                if (admin.tableExists(TableName.valueOf(tableName))) {
                    // 表存在,进行 Scan 对象创建
                    Scan scan = new Scan();
                    // 需要扫描的 HBase 表名设置到 Scan 对象中
                    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
                    // 设置过滤对象
                    scan.setFilter(filterList);
                    // 添加到 Scan 集合中
                    scans.add(scan);
                }

                // begin 累加
                begin += GlobalConstants.DAY_OF_MILLISECONDS;
            }
        } finally {
            // 关闭 Admin 连接
            try {
                admin.close();
            } catch (Exception e) {
                // nothing
            }
        }

        // 访问 HBase 表中的数据
        if (scans.isEmpty()) {
            // 没有表存在,那么 Job 运行失败
            throw new RuntimeException("HBase 中没有对应表存在:" + dateStr);
        }


        // 指定Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
        TableMapReduceUtil.initTableMapperJob(
                scans, // Scan 扫描控制器集合
                NewInstallUserMapper.class, // 设置 Mapper 类
                StatsUserDimension.class,  // 设置 Mapper 输出 key 类型
                Text.class, // 设置 Mapper 输出 value 值类型
                job,  // 设置给哪个 Job
                true); // 如果在 Windows 上本地运行,则 addDependencyJars 参数必须设置为 false,如果打成 jar 包提交 Linux 上运行设置为 true,默认为 true
    }

    /**
     * 获取一个根据列名称过滤数据的 Filter
     * 
     * @param columns
     * @return
     */

    private Filter getColumnFilter(String[] columns) {
        byte[][] prefixes = new byte[columns.length][];
        for (int i = 0; i < columns.length; i++) {
            prefixes[i] = Bytes.toBytes(columns[i]);
        }
        return new MultipleColumnPrefixFilter(prefixes);
    }
}

20.3.4、测试

Step1、使用 maven 插件:maven-shade-plugin,将第三方依赖的 jar 全部打包进去,需要在 pom.xml 中配置依赖。参考【章节 十七、工具代码导入】中的 pom.xml 文件。

1、-P local clean package(不打包第三方jar)
2、-P dev clean package install(打包第三方jar)(推荐使用这种,本案例使用这种方式)

Step2、在 hadoop-env.sh 添加内容:

[atguigu@hadoop102 hadoop]$ pwd
/opt/module/hadoop-2.7.2/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim hadoop-env.sh

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

尖叫提示:修改该配置后,需要配置分发,然后重启集群,方可生效!!!

Step3、打包成功后,将要运行的 transformer-0.0.1-SNAPSHOT.jar 拷贝至 /opt/module/hbase/lib 目录下,然后同步到其他机器或者配置分发:

同步到其他机器
[atguigu@hadoop102 ~]$ scp -r /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar hadoop103:/opt/module/hbase/lib/
[atguigu@hadoop102 ~]$ scp -r /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar hadoop104:/opt/module/hbase/lib/

或者配置分发
[atguigu@hadoop102 ~]$ xsync /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar

尖叫提示:如果没有同步到其他机器或者配置分发,会出现类找不到异常,如下:

执行异常:java.lang.RuntimeExceptionjava.lang.ClassNotFoundExceptionClass com.z.transformer.dimension.key.stats.StatsUserDimension not found

4、运行 jar 包,命令如下:

先进行数据清洗
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20

再进行统计运算
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.statistics.NewInstallUserRunner -date 2015-12-20

二十一、Hive 之 hourly 分析

尖叫提示:由于 “-” 在 HBase 的表名中允许,在 Hive 的表名中不可以是 “-”,即在 Hive 中,“-” 是特殊字符,为了方便和统一,所以我们将 “-” 的地方替换为 “_”。这样就三者统一了。即 HDFS 上存放数据的目录变为 /event_logs/2015/12/20,HBase 数据库中的表名变为 event_logs20151220,Hive 中的表名为 event_logsxxx。

21.1、目标

  分析一天 24 个时间段的新增用户、活跃用户、会话个数和会话长度四个指标,最终将结果保存到 HDFS 中,使用 sqoop 导出到 Mysql。

21.2、目标解析

  • 新增用户:分析 launch 事件中各个不同时间段的 uuid 数量
  • 活跃用户:分析 pageview 事件中各个不同时间段的 uuid 数量
  • 会话个数:分析 pageview 事件中各个不同时间段的 会话id 数量
  • 会话长度:分析 pageview 事件中各个不同时间段内所有会话时长的总和

21.3、创建 Mysql 结果表

21.4、Hive 分析

21.4.1、创建 Hive 外部表,关联 HBase 数据表

 

21.4.2、创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)

 

21.4.3、提取 e_pv 和 e_l 事件数据到临时表中

 

21.4.4、创建分析结果临时保存表

 

21.4.5、分析活跃访客数

Step1、具体平台,具体平台版本(platform:name, version:version)

 

Step2、具体平台,所有版本(platform:name, version:all)

 

Step3、所有平台,所有版本(platform:all, version:all)

 

21.4.6、分析会话长度

  将每个会话的长度先要计算出来,然后统计一个时间段的各个会话的总和。

Step1、具体平台,具体平台版本(platform:name, version:version)

 

Step2、具体平台,所有版本(platform:name, version:all)

 

Step3、所有平台,所有版本(platform:all, version:all)

 

21.4.7、创建最终结果表

  我们在这里需要创建一个和 Mysql 表结构一致的 Hive 表,便于后期使用 Sqoop 导出数据到 Mysql 中。

 

21.4.8、向结果表中插入数据

  我们需要 platform_dimension_id int, date_dimension_id int, kpi_dimension_id int 三个字段,所以我们需要使用 UDF 函数生成对应的字段。

Step1、编写 UDF 函数,见代码
Step2、编译打包 UDF 函数代码

 

Step3、上传 UDF 代码 jar 包到 HDFS

 

Step4、使用 UDF 的 jar

 

Step5、执行最终数据统计

 

21.4.9、使用 Sqoop 导出 数据到 Mysql,观察数据

 

二十二、常用 Maven 仓库地址

常用 Maven 仓库地址
  中央库:http://repo.maven.apache.org/maven2/
  CDN库:https://repository.cloudera.com/artifactory/cloudera-repos/
  Maven 中央仓库最近更新的 Artifact:http://maven.outofmemory.cn/
  Search/Browse/Explore Maven Repository:https://mvnrepository.com/

大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址

标签:size   time   getc   Dimension   转换   ddd   ima   reducer   结束时间   

原文地址:https://www.cnblogs.com/chenmingjun/p/10726899.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!