标签:
一、背景
微博,一个DAU上亿、每日发博量几千万的社交性产品,拥有庞大的数据集。如何高效得从如此规模的数据集中挖掘出有价值的信息,以增强用户粘性,提高信息传播速度,就成了重中之重。因此,我们引入了hadoop 分布式计算平台,对用户数据和内容数据进行分析和挖掘,作为广告推荐的基础。
二、问题及解决方案
在hadoop平台上进行开发时,主要遇到了以下一些问题:
2.1 数据量庞大
问题:无论在进行针对用户的协同过滤运算,还是在计算用户可能错过的微博中,无一例外的都遇到了数据量太大无法进行运算的情况。因此,精简计算数据成为了亟待解决的问题。
解决方案一:在用户推荐方面,可以对候选集合被推荐的概率进行预估,将具有极小推荐机会的数据忽略不计。目前通用的方法,就是直接选取活跃用户作为计算对象,既能够降低计算量,又能够保证获得预期的推荐效果。另外,对于一些超级节点,比如拥有很多粉丝的V用户,它会衍生出大量的关系链,导致计算规模暴增和数据分布偏移。对于这类节点,需要将与其相关的数据进行优选过滤。简而言之,就是优选候选节点。
解决方案二:在微博内容推荐方面,主要从微博内容的质量入手。对于那些信息量少、色情、垃圾等内容的微博,需要将其剔除,以保证候选集的质量。通过对同类微博推荐产品的点击日志统计后,发现无图微博的点击率较低,而该类微博大概占总微博数的10%,在对推荐效果影响不大的前提下,将该类数据从候选集中剔除,也能够大大降低计算量。
2.2 HDFS数据与线下交互不便
问题一:目前数据挖掘方面的业务基本都是放到hadoop平台上来进行,计算结果保存在HDFS上。而HDFS上的数据必须通过hadoop平台通道机中转后再传送至服务器端,传输效率较低。
解决方案:针对HDFS数据与线下交互不便的问题,我们在hadoop gateway上搭建了socat服务。socat是一个多功能的网络工具,它是两个独立数据通道之间的双向数据传输的继电器。这些数据通道包含文件、管道、设备、TCP/UDP、SSL\SOCKS4客户端或者代理CONNECT。在任何一台与gateway互通的服务器上,拉取相应的hadoop 和jdk,就可以方便地与hadoop平台交互,实现互通。
问题二:目前对于离线数据,我们常用lushan来进行挂载,但其数据格式与hdfs上默认支持的数据格式不同,无法直接使用。
解决方案:直接在hadoop平台生成lushan需要的文件格式。我们继承了FileOutputFormat,实现相应的write方法,生成了一个LushanFileOutputFormat,用于直接将结果数据以lushan数据格式输出。同理,用户也可以实现任何自定义输入和输出格式。
2.3业务逻辑复杂且运行过程不便监控
问题一:在日常的数据挖掘中,往往需要综合多种数据,业务逻辑纷繁复,用户只能自己实现业务流程。
问题二:hadoop job 正式上线后,用户最关注的就是该job是否正常执行,一旦异常能否及时收到通知,而人工通过jobtracker来监测是不现实的。
解决方案:基于以上两点,我们引入了hadoop平台提供的调度系统(Scheduler System)。用户可以将业务分为几个子模块,每个模块作为一个节点来实现对应的功能。用户只需要通过图形化界面将相互独立或者依赖的job节点进行连接,即可完成整个业务流程的搭建,还能够实现节点的复用。用户还可以对相应的节点设置监控报警信息,一旦出错,调度系统会根据用户设置进行报警提示。
图1 调度系统项目流程图
2.4 mapreduce开发过程繁琐
问题:做过mapreduce开发的人可能都有一个同感,除了核心逻辑以外,需要敲入大量相对固定的代码,比如map/reduce函数的定义,Job的输入、输出数据以及对应的数据格式等等。这些信息相对固定,但又不可或缺。
解决方案:mapreduce开发框架为此诞生了。该框架致力于让程序员将关注点放在核心功能的实现上,更简便的实现map/reduce的调用流程。其功能说明如下:
Driver.java
实现模块集成化,在运行时通过指定类名来执行相应的操作。可以将多种功能集合到一个jar包中,便于维护。见如下例子,就可以将GetUserSchoolPro 类加入到jar包中进行调用。
ProgramDriver pgd = new ProgramDriver();
pgd.addClass(GetUserCompanyPro.Name, GetUserCompanyPro.class, "GetUserCompanyPro");
假设生成的jar包为frame_mapred.jar,其执行方式如下:
hadoop jar frame_mapred.jar GetUserCompanyPro -companydata /dw/ods/ods_user_career/$yestday -outputpath /dw_ext/recmd/dongna/userinfo/user_company/$yestday_d -outputformat text -reducenum 200
FrameMapred.java:
该部分提供4个接口可供用户使用, AddMapper函数用来进行map操作,AddReducer函数用来进行reduce操作,loadResource函数可以用来从本地加载资源数据至内存中,供map/reduce 使用。
// generate data only with map
public static int AddMapper(Configuration conf, JobConf job, String strInputPath, String strOutputPath, Class<? extends InputFormat> clsInputFileFormat, Class<? extends Mapper> clsMapClass, String strOutputFormat, Class<?> clsMapOutputKey, Class<?> clsMapOutputValue)
//generate data with map and reduce
public static int AddMapper(JobConf job, String strInputPath, Class<? extends InputFormat> clsInputFileFormat, Class<? extends Mapper> clsMapClass, Class<?> clsMapOutputKey, Class<?> clsMapOutputValue)
//add reduce
public static int AddReducer(Configuration conf, JobConf job, String strOutputPath, String strOutputFormat, Class<? extends Reducer> clsRedClass, Class<?> clsOutputKey, Class<?> clsOutputValue)
// load local resource
public static int loadResource(JobConf job, String strFilePath, String strResName)
另外,在日常工作中,经常会针对badcase来查错,无一例外的需要查看各种中间数据的正确性。由于hadoop生成数据大部分都是非文本数据,就必须要先编写解析程序以达到目的。基于此,该hadoop开发框架中对于常见的rcfile, sequencefile 文件也提供了通用的解析工具,以期降低这方面的人力消耗。
三、系统架构
3.1 获取离线数据架构
图2 获取离线数据架构图
该框架可以实现Hadoop数据挖掘-线下加载的自动化,可靠性较高。通过调度系统定时启动或者由外部调用接口触发计算流程,计算完毕后,数据存储至HDFS上。线下存储服务通过访问SOCAT可以与HDFS进行数据交互,同时线下存储服务中的数据也可以通过SOCAT中转上传至HDFS。
3.2 数据实时获取架构
请求数据发送至RIN(统一数据入口),经队列消费程序确定数据的获取位置(后台存储、OPENAPI,HADOOP)后并分发。通过访问OPENAPI和后台存储来获取全部数据后直接进行数据分发。有时需要hadoop平台和存储服务相结合并对数据进行合并,再进行数据分发。
图3 数据实时获取架构图
四、发展
4.1 hadoop 开发框架扩展
目前hadoop开发框架的功能还不完善,主要有以下几个功能:
支持基于map/reduce业务的快捷开发
将相互独立的功能模块进行打包,便于维护
RCFile、SequenceFile、LZO文件的解析工具
后续会添加以下功能:
充实通用工具包,提供转置、倒排、简单map/reduce的数据抽取工具。
编写通用的MapReduce作业的链接工具,能够支持具有依赖、预处理和后处理阶段的链接。以减少中间阶段的IO,提高效率。
4.2 R9 Interface 任务提交平台
该平台致力于远程提交MapReduce任务和Hive sql操作,并能够与线下实现互通,完成数据分发及存储,结合报警监控工具保障整个业务流程的可控性。其框架图如下:
图4 R9 Interface 框架图
标签:
原文地址:http://www.cnblogs.com/leetieniu2014/p/5444894.html