相关动态
基于spark和flink的电商数据分析项目
2024-11-10 21:34

基于spark和fl<em></em>ink的电商数据分析项目

本文是原项目的一次重写。主要是用Dataframe代替原来的RDD,并在一些实现上进行优化,还有就是实时流计算改用Flink进行实现。 项目分为用户访问session模块、页面转跳转化率统计、热门商品离线统计和广告流量实时统计四部分组成。

用户访问session

该模块主要是对用户访问session进行统计分析,包括session的聚合指标计算、按时间比例随机抽取session、获取每天点击、下单和购买排名前10的品类、并获取top10品类的点击量排名前10的session。主要使用Spark Dataframe。

页面单跳转化率统计

该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。主要使用Spark Dataframe。

热门商品离线统计

该模块主要实现每天统计出各个区域的top3热门商品。主要使用Spark Dataframe。

广告流量实时统计

经过实时黑名单过滤的每天各省各城市广告点击实时统计、每天各省topn热门广告、各广告近1小时内每分钟的点击趋势。主要使用Spark streaming或Flink。

输入表


输出表


统计出符合条件的session中,各访问时长、步长的占比,并将结果保存到MySQL中。符合条件的session指搜索过某些关键词的用户、访问时间在某个时间段内的用户、年龄在某个范围内的用户、职业在某个范围内的用户、所在某个城市的用户,所发起的session。

除了将原rdd的实现改为DF外,本文还在两方面进行了优化。第一是join前提前filter。原实现是先从用户动作表中计算出访问时长、步长后和用户信息表进行关联后再filter的,这无疑是对一些无关的用户多余地计算了访问时长和步长,也增加了join是shuffle的数据量。第二点是原实现采用accumulator实现个访问时长人数和各步长人数的统计,这会增加driver的负担。而重写后的代码基于DF,且利用when函数对访问时长和步长进行离散化,最后利用聚合函数得出统计结果,让所有统计都在executors中并行执行。


根据各时长、步长的比例抽样。原实现利用rdd和scala自身数据结构和方法来实现,新的实现直接利用dataframe的统计函数sampleBy实现。

用,其中fractions为Map,是每distinct key和其需要抽取的比例,如("a" -> 0.8)就是从key为a的数据中抽80%条


分别计算出各商品的点击数、下单数、支付次数,然后将三个结果进行连接,并排序。排序规则是点击数大的排前面,相同时下单数大的排前面,然后再相同时支付次数大的排前面。这里的优化点是采用rdd的takeOrdered取前十,它的底层是每个分区一个最小堆,取出每个分区的前10,然后再汇总。这样省去了原来实现当中的sortbykey+take,该方法进行了全排序,效率较低。


对于top10的品类,每一个都要获取对它点击次数排名前10的session。 原代码的实现是先groupByKey,统计出每个sessionid对各品类的点击次数,然后再跟前10热门品类连接来减少数据,然后再用groupBuKey,对每个分组数据toList后排序取前10。这个实现并不太好,首先它一开始的groupByKey对非Top10热门品类的数据进行了统计,这是一种浪费。更好的做法是提前filter,即先利用热门品类这个名单进行filter。然后,原代码在实现filter使用的是将热门品类名单parallelise到集群然后利用join实现过滤。这会触发不必要的shuffle,更好的实现进行broadcast join,将名单广播出去后进行join。然后groupByKey的统计也是一个问题,它没有map side聚合,容易OOM,更好的实现是采用DF的groupby + agg。得出统计数据后利用windowfunction取得各热门品类的前十session。即一次shuffle就可以完成需求,windowfunction在这个并不需要shuffle,因为经过前面的shuffle聚合,df已经具有partitioner了,在原节点就可以计算出topn。


计算关键页面之间的单步跳转转化率。方法是先获取目标页面,如1,2,3,将它们拼接成1_2, 2_3得出两个目标转跳形式。同样需要在df的数据中产生页面转跳。方法是利用windowfunction将数据按sessionid分组,访问时间升序排序,然后利用concat_ws和window的lag函数实现当前页面id与前一条数据的页面id的拼接。集群数据中产生转跳数据后,利用filter筛选出之前的目标转跳形式。最后按这些形式分组统计数量,便得出每种转跳的数量,将它collect为map。另外还需要计算起始页1的数量,简单的filter和count实现。接下来就可以根据这些数据计算转跳率了。遍历目标转跳形式,从map中获取相应的数量,然后除以起始页/上一页的数量,进而得出结果。


原数据没有地区列和城市列(有城市id),所以先广播一个地区城市表,然后根据城市id进行join。之后按照地区和商品分组进行计数。最后利用windowfunction取各地区topn。


经过实时黑名单过滤的每天各省各城市广告点击实时统计、每天各省topn热门广告、各广告近1小时内每分钟的点击趋势。这部分原代码采用Spark Streaming实现,我将之改为基于Flink的实现。下面会首先介绍Spark Streaming的实现,然后到Flink。


创建流,利用预先广播的黑名单过滤信息,然后利用过滤后的信息更新黑名单、计算广告点击流量、统计每天每个省份top3热门广告、统计一个小时窗口内每分钟各广告的点击量。


实现实时的动态黑名单机制:将每天对某个广告点击超过100次的用户拉黑。提取出日期(yyyyMMdd)、userid、adid,然后reduceByKey统计这一批数据的结果,并批量插入MySQL。然后过滤出新的黑名单用户,实现为从MySQL中查找每条数据的用户是否对某条广告的点击超过100次,即成为了新的黑名单用户,找到后进行distinct操作得出新增黑名单用户,并更新到MySQL。


每天各省各城市各广告的点击流量实时统计。分组,key为日期+省份+城市+广告id,利用updateStateByKey实现累加。新的统计结果更新到MySQL。


利用上一步得到的结果,即key为日期+省份+城市+广告id,value为累积点击量,进行统计及分组topn。reduceByKey + windowfunction

同样在累积数据的基础上操作,提取出时间,然后利用固定窗口实现需求。


Flink的思路是通过三个KeyedProcessFunction来实现的,因为他有state(累积各key的值)和timer(定时删除state)功能。 第一个KeyedProcessFunction是记录每个userId-adId键的量,当达到阈值时对这类信息进行截流,从而实现黑名单的更新和过滤。 第二个是记录每个province的数据量,即每个省的广告点击量 第三个是记录一个map,里面统计每个省的点击量,当进行了一定数量的更新后,就输出一次这个map的前n个kv对(以排好序的string的形式),从而实现topn功能。


Launcher类


AdLog类

广告日志类以及处理过程产生的一些新case class


Schema类


FilterBlackListUser类


AccProvClick类

代码形式和上面的类几乎一样


BetterGenerateTopK类

    以上就是本篇文章【基于spark和flink的电商数据分析项目】的全部内容了,欢迎阅览 ! 文章地址:http://yybeili.xhstdz.com/quote/66772.html 
     栏目首页      相关文章      动态      同类文章      热门文章      网站地图      返回首页 物流园资讯移动站 http://yybeili.xhstdz.com/mobile/ , 查看更多   
发表评论
0评