码迷,mamicode.com
首页 > 移动开发 > 详细

移动分发端 基础统计指标经典业务代码节选--留存用户统计

时间:2015-10-28 15:54:34      阅读:248      评论:0      收藏:0      [点我收藏+]

标签:

.net + sql        

                    #region 构造统计日期临时表

                    DataTable originalDates = new DataTable();
                    DataColumn col = new DataColumn("OriginalStartDate", typeof(Int32));
                    originalDates.Columns.Add(col);
                    col = new DataColumn("OriginalEndDate", typeof(Int32));
                    originalDates.Columns.Add(col);
                    int step = endDate.Subtract(startDate).Days + 1;
                    for (int i = 1; i <= 30; i++)
                    {
                        //如果是天的周期,增加到30日留存
                        if (period != PeriodOptions.Daily && i > 6) break;
                        DataRow newRow = originalDates.NewRow();
                        DateTime pStartDate;
                        DateTime pEndDate = Utility.GetNextStatDate(period, startDate, endDate, -i, out pStartDate);
                        newRow["OriginalStartDate"] = int.Parse(pStartDate.ToString("yyyyMMdd"));
                        newRow["OriginalEndDate"] = int.Parse(pEndDate.ToString("yyyyMMdd"));
                        originalDates.Rows.Add(newRow);
                    }

                    #endregion

                    //计算汇总
                    SqlParameter[] paramters = new SqlParameter[]
                    {
                        SqlParamHelper.MakeInParam("@dt", SqlDbType.Structured),
                        SqlParamHelper.MakeInParam("@StartDate", SqlDbType.Int, 4, startDate.ToString("yyyyMMdd")),
                        SqlParamHelper.MakeInParam("@EndDate", SqlDbType.Int, 4, endDate.ToString("yyyyMMdd")),
                        SqlParamHelper.MakeInParam("@Period", SqlDbType.TinyInt, 1, (int)period)                        
                    };
                    paramters[0].TypeName = "dbo.OriginalDatesType";
                    paramters[0].Value = originalDates;
                    DataSet ds = SqlHelper.ExecuteDataset(ComputingDB_ConnString, CommandType.StoredProcedure, "PR_StatRetainedUsers", paramters);

ALTER PROCEDURE [dbo].[PR_StatRetainedUsers] (
	@dt OriginalDatesType readonly,	
	@StartDate int,
	@EndDate int, 
	@Period tinyint
	)
AS
begin
	create table #RetainedUsers(SoftID int,Platform tinyint,ChannelID int,OriginalStatDate int,RetainedUserCount int)
	declare @sql nvarchar(max);
	if (@Period = 1) begin
		set @sql = N‘insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount)
					select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI)
					from ...‘ + CAST((@StartDate / 10000) as nvarchar(10)) + N‘ A with(nolock) inner join @dt B
					on A.LoginDate=@StartDate and A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate and (FromCache=0 or FromCache is null)
					group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;‘
		exec sp_executesql @sql, N‘@StartDate int,@dt OriginalDatesType readonly‘, @StartDate, @dt	
	end else begin
		if (@StartDate/10000 = @EndDate / 10000) begin
			set @sql = N‘insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount)
						select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI)
						from ....‘ + CAST((@StartDate / 10000) as nvarchar(10)) + N‘ A with(nolock) inner join @dt B
						on A.Part=@part and A.LoginDate between @StartDate and @EndDate and A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate and (FromCache=0 or FromCache is null)
						group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;‘
		end else begin
			set @sql = N‘insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount)
						select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI)
						from (
							select * from ....‘ + CAST((@StartDate / 10000) as nvarchar(10)) + N‘ with(nolock) where Part=@part and LoginDate between @StartDate and @EndDate and (FromCache=0 or FromCache is null)
							union all
							select * from ....
							
							
							‘ + CAST((@EndDate / 10000) as nvarchar(10)) + N‘ with(nolock) where Part=@part and LoginDate between @StartDate and @EndDate and (FromCache=0 or FromCache is null)) A
							inner join @dt B
						on A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate
						group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;‘
		end	
		declare @part tinyint = 0;	
		while @part < 128 begin		 
			
			--if (@Period <> 12 or @part = 0) begin
			exec sp_executesql @sql, N‘@part tinyint,@StartDate int,@EndDate int,@dt OriginalDatesType readonly‘, @Part, @StartDate, @EndDate, @dt;
			--end
			
			set @part = @part + 1
		end
	end
	
	select @Period Period,OriginalStatDate,@EndDate StatDate,SoftID,Platform,-1 ID2,-1 ID1,0 OriginalNewUserCount,SUM(RetainedUserCount) RetainedUserCount 
	from #RetainedUsers
	group by SoftID,Platform,OriginalStatDate
	
	select @Period Period,OriginalStatDate,@EndDate StatDate,SoftID,Platform,ChannelID ID2,-1 ID1,0 OriginalNewUserCount,SUM(RetainedUserCount) RetainedUserCount 
	from #RetainedUsers
	group by SoftID,Platform,ChannelID,OriginalStatDate	
	
	drop table #RetainedUsers
end

hadoop

@MapConfig
public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {
    private Text mKey = new Text();
    private Text mValue = new Text();
    private StringBuilder sb = new StringBuilder();
    private Map<String,Integer> map=new HashMap<String,Integer>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        String enddate =context.getConfiguration().get("key_enddate");
        int period =Integer.parseInt(context.getConfiguration().get("period"));
        int step=Integer.parseInt(context.getConfiguration().get("step"));
        DateTime curstatdate=DateTime.parseToDateTime(enddate,"yyyyMMdd");

        //设置要计算留存的时间
        if (period==PeriodOptions.GetValueByEnum(PeriodOptions.Daily)){
            for (DateTime startdate=curstatdate.addDays(-30);new Double(DateTime.minusDay(curstatdate,startdate)).intValue()>0;startdate=startdate.addDays(1)){
                Integer tmp=Integer.parseInt(startdate.toString("yyyyMMdd"));
                map.put(startdate.toString("yyyyMMdd"),tmp);
            }
        }else if(period==PeriodOptions.GetValueByEnum(PeriodOptions.Weekly)){
            for (DateTime startdate=curstatdate.addDays(-48);DateTime.minusDay(curstatdate,startdate)>0;startdate=startdate.addDays(1)){
                if (WeekOptions.GetEnumByValue(startdate.getDayOfWeek())== WeekOptions.SUNDAY){
                    Integer tmp=Integer.parseInt(startdate.toString("yyyyMMdd"));
                    for (DateTime substartdate=startdate.addDays(-step);new Double(DateTime.minusDay(startdate,substartdate)).intValue()>=0;substartdate=substartdate.addDays(1)){
                        map.put(substartdate.toString("yyyyMMdd"),tmp);
                    }
                }
            }
        }else if(period==PeriodOptions.GetValueByEnum(PeriodOptions.NaturalMonth)){
            for (DateTime startdate=curstatdate.addMonths(-7).addDays(1);DateTime.minusDay(curstatdate,startdate)>0;startdate=startdate.addDays(1)){
                if (startdate.addDays(1).day()==1){
                    Integer tmp=Integer.parseInt(startdate.toString("yyyyMMdd"));
                    for (DateTime substartdate=startdate.addMonths(-1).addDays(1);new Double(DateTime.minusDay(startdate,substartdate)).intValue()>0;substartdate=substartdate.addDays(1)){
                        map.put(substartdate.toString("yyyyMMdd"),tmp);
                    }
                }
            }
        }
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String str = value.toString();
        String[] params = StringUtils.splitByWholeSeparatorPreserveAllTokens(str, "\t");

        if (!map.containsKey(params[23]) || (Integer.parseInt(params[15])&1)!=0 ){
            return;
        }
        int firstlogintime=map.get(params[23]);

        String enddate =context.getConfiguration().get("key_enddate");

        sb.delete(0, sb.length());
        //key<0:softid,2:platform,22 firstchannelid>
        //value<23 firstlogindate,4:logindate,3:imei>
        sb.append(params[0]).append("\t")
                .append(params[2]).append("\t")
                .append(params[22]);
        mKey.set(sb.toString());
        mValue.set(firstlogintime + "\t" + enddate + "\t" + params[3]);
        context.write(mKey, mValue);

        sb.delete(0, sb.length());
        sb.append(params[0]).append("\t")
                .append(params[2]).append("\t")
                .append(-1);
        mKey.set(sb.toString());
        mValue.set(firstlogintime + "\t" + enddate + "\t" + params[3]);
        context.write(mKey, mValue);
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        map.clear();
    }
}

//key<0:softid,2:platform,22 firstchannelid>
//value<23 firstlogindate,4:logindate,3:imei>
@CombineConfig
public static class CombineTask extends Reducer<Text, Text, Text, Text> {

    Text mvalue=new Text();
    //留存用户
    private Multiset<String> multiset=HashMultiset.create();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        multiset.clear();
        for (Text item:values){
            multiset.add(item.toString());
        }

        for (String item:multiset.elementSet()){
            mvalue.set(item);
            context.write(key,mvalue);
        }
    }

}

//key<0:softid,2:platform,22 firstchannelid>
//value<23 firstlogindate,4:logindate,3:imei>
@ReduceConfig
public static class ReduceTask extends Reducer<Text, Text, Text, Text> {

    private Text mValue = new Text();
    //留存用户
    private Map<String, Multiset<Object>> MRetained = new HashMap<>();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Map.Entry<String, Multiset<Object>> map : MRetained.entrySet()) {
            MRetained.get(map.getKey()).clear();
        }
        MRetained.clear();

        for (Text item : values) {
            String[] params = StringUtils.splitByWholeSeparatorPreserveAllTokens(item.toString(), "\t");
            String mapkey = params[0] + "\t" + params[1];

            if (!params[0].equals(params[1])) {
                if (!MRetained.containsKey(mapkey)) {
                    MRetained.put(mapkey, HashMultiset.create());
                }
                MRetained.get(mapkey).add(params[2]);
            }
        }
        String period = context.getConfiguration().get("period");
        for (Map.Entry<String, Multiset<Object>> map : MRetained.entrySet()) {
            if (map.getKey().split("\t") == null ) {
                continue;
            }
            mValue.set(period
                    + "\t" + map.getKey()
                    + "\t" + 0
                    + "\t" + map.getValue().elementSet().size());
            //softid,platform,channelid,period,originaldate,statdate,OriginalNewUserCount,RetainedUserCount
            context.write(key, mValue);
        }
    }

}











































移动分发端 基础统计指标经典业务代码节选--留存用户统计

标签:

原文地址:http://my.oschina.net/osenlin/blog/523099

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!