相关动态
Flink 系例 之 电商项目 - 购物订单大屏监控实战 (示例)
2024-11-10 18:10

如果有看我写的 Flink 系例的前期文章,大部份是写 Flink 各算子、窗口、方法、Table&SQL、连接器等的直接用法与示例,那是为了尽可能的将学习知识点的基础应用简化成直接的示例成果,将概念通过单一示例展现,将复杂度降低增加学习的简易性,以免一开始看到一堆的概念算子就心生退意。程序员嘛,大部份时候,习惯于短平快的内容,希望 10 分钟能看完的东西,别婆婆妈妈扯个把小时,但往往这样,只能撑握皮毛,确无法深入理解,这算是行业人的通病吧,但各有所爱也无法统一,就这样吧多说无益。

Fl<em></em>ink 系例 之 电商项目 - 购物订单大屏监控实战 (示例)

本章以模拟一个电商平台的日常订单数据统计系统为设计基础,将电商平台的实时订单通过 Flink 实时流计算能力,按聚合维度实时计算,输出订单流计算结果,再通过监控大屏展示,实时快速撑握电商平台订单数据趋势、分类占比、销量排行等,从而全局了解电商平台业务运行状态,为电商平台高层决策、运营、分析、成长等提供最基础、最实时的数据依据。

其实上述计算模式通过其它第三方框架或平台等进行离线计算也一样是可以完成,如:sprak、hodoop 等,但 Flink 的优势就是实时数据流计算,不需要等待数据批量入仓后再进行统一清洗、加工、计算、存储等,Flink 的实时流计算引擎可以将数据分段(按时间或数量)快速计算,就像水龙头一样打开(数据)水流入到桶内,对装满不同大小桶的(数据)水进行计算,如:体积、重量等(窗口内聚合计算,则水龙头源源不断的流出水,从而保证了当前看到数据即为最新计算结果,效率和体验都是最佳选择。

以电商项目运作模式为起点,将电商平台中对各维度计算的应用场景,再结合我们学习的 Flink 流式计算技术,融合到真实业务中,通过技术加速业务成长,通过业务检验新技术的可行性,从而推动新技术的落地与大面积的应用

根据电商项目的作业场景,我们选取以订单系统系统进行实时维度统计,将数据流按以下几个场景,采用 Flink 流式实时计算能力进行开发实现

1. 商品累计销售总额(按分钟刷新

2. 商品累计销售总量(按分钟刷新

3. 每分钟商品销售流量

4. 每分钟商品分类销售量排名

5. 品牌营销能力:按累计销量、累计销售额计算

6. 消费前十排名用户(按分钟刷新

7. 性别购买能力(按分钟刷新,统计各性别购买总额与占比

8. 每 10 分钟商品销量(统计各性别分时段购买总量

1. 将前期学习的算子、方法、窗口、连接器、Table&SQLAPI、水印等知识点串连汇集

2. 根据电商项目特点,结合业务需求,将 Flink 能力特性与业务流程融合到项目实践中

3. 检验前期学习的知识,加强动手能力,通过编码成果来验证设计目标、运作流程等满足业务需求目标

4. 通过学习新的技术知识或设计新业务流程架构,实现架构知识与设计能力提升。

:其实最终目标,就是。

项目采用技术点如下

  • flink 开发库

  • kafka 集群

  • vue2 + element.ui + echarts (大屏显示

  • spring boot webflux (大屏后端服务)

  • redis 存储实时计算数据(生产因该考虑数据同步落库到 mysql 进行持久化

项目流程

1. 一个正常流程的电商订单系统,会将先所有用户的订单发送到订单表或订单队列中,我们此处的 “微服务” 则是用示例代码模拟订单系统,将订单推送到 kafka 集群中,用来做业务削峰与订单缓存。

2. 通过集成 Flink 库,开发多维度实时流计算客户端,上传到 Flink 集群中,提交启动运行 Task JOB 服务。

3.Flink 客户端从 kafka 中获取订单数据,实时计算各窗口限定颗粒度的数据流对象,将算子结果输出到 redis 中。

4. 大屏监控服务实时或定时轮询获取 Redis 的最新实时计算结果,并对维度数据进行格式化,输出到前端,前端根据各维度要求生成对应图表效果

前端采用 vue2.x + element.ui

主要用到图表展示组件 echarts

进入前端项目根目录,下载安装依赖模块

安装完毕后,再命令运行前端项目

或打包后上传到 nginx 服务中做静态资源

工程目录结构

后端项目与整个 flink 项目示例整合在一个工程项目包内,工程名称叫 flink-examples,其中划分不同的业务模块,以下为工程结构

flink-examples

------ connectors(中件间连接器示例模块

------ examples (模拟电商订单数据并推送到 kafka 中,以及 flink 核心流处理客户端)

------ stream(数据流与算子、方法、窗口等示例代码

------ tableapi(table&sql 与中件间的使用示例代码

------ web(获取 flink 算子计算后的存储结果,提供给前端展示

  • jdk 1.8

  • springboot 2.3.4.RELEASE

  • redis 3.x

  • flink 1.11.1

因工程示例本地已开发完毕,直接进行打包

模块打包成 jar 包,其中的 examples 模块打包后是独立可运行的客户端,也是上传到 flink 集群平台执行 TaskJOB 的客户端 jar 包

启动 Flink 的 Standalone 模式集群服务

  • 主机:192.168.110.35(master)、192.168.110.35(slaves

  • flink 安装目录: /opt/flink-1.11.1

由于已提前搭建好 Standalone 模式集群,则直接进入 master 下直接启动集群。如果未搭建 flink 的 Standalone 模式集群, 参见另一文章

只需要在 master 主机下启动 flink-cluster 集群,在 master 主服务启动过程中,会执行远程命令启动所有 slaves 从服务

启动 Kafka 中件间服务,整个工程项目中,电商平台订单数据存放在 kafka 消息队列中,集成 Flink 的 job 客户端将从 Kafka 消息队列中拉取订单数据

  • 主机:192.168.110.35(单节点

  • kafka 安装目录:/opt/kafka_2.11-2.2.2

由于已提前搭建好 Kafka 中间件,则直接进入安装目录启动消息服务。如果未搭建 Kafka 消息中件间服务, 参见另一文章

完成此步后,接下来可以访问 flink 平台,运行客户端

在打包后提交到 flink 平台运行过程中有一个 jar 执行错误,如下

:是在 mavne 打包过程中某些依赖 jar 包执行出错,以及 jar 包重复引用等,在打成 jar 过程中,meta-INF 中多了一些 *.SF,*.DSA,*.RSA 文件导致的(签名摘要文件

  • 1. 手动解压 jar 包,将 meta-INF 中的 *.SF,*.DSA,*.RSA 文件删除后,重新打成 jar 包上传到 Flink 平台

  • 2. 或者在 pom.xml 中配置 maven 打包插件做过滤

地址栏访问 flink 管理平台

访问端口可在 /opt/flink-1.11.1/conf/flink-conf.yaml 配置中直接修改,本平台配置为 8083

默认进入的是客户端主页,在主页中显示 Available Task Slots = 32(翻译过来叫可用的任务槽,是 Flink 根据 /opt/flink-1.11.1/conf/flink-conf.yaml 配置文件中的 taskmanager.numberOfTaskSlots: 16,识别当前集群的可用总任务数。两台主机的配置相同,CPU 均为 8 核 16 进程,按照一个 Task Slots 分配一个 CPU 进程。因此两台主机累计可用 CPU 核心进程为 32 进程,Total Task Slots 则 Master 和 Slaves 各配置为 16,合计 ,后续在提交运行 Jar 客户端时,需要配置的并行度,即指的就是当前 Available Task Slots 范围内的可用数。但 Available Task Slots 与 Task JOB 的并行度,并不太容易理清关系,按照网上有一个 Flink Task Slots 计算公式

但我个人的理解,即当前 TaskJOB 中所有算子并行度合计的最大可用数,即为 的剩余数

这个我没有认真去求证,但有一篇博文件可以作为参考来理解 slot ,链接地址

所以实际生产使用,需要评估 job 客户端 Slots 使用量,以免无法最大化发挥与利用平台有效资源

从左侧 Submit New Job 菜单进入,点击 Add New 按钮,在弹出窗口中选择我们上一步打包的 Job 客户端,即 examples 的打包后的 jar 文件,该 jar 文件包含了 flink 开发库、kafka 客户端、redis 客户端等依赖包;点击打开后,开始上传 jar 包,上传速度与网络以及包大小有关

打工本地工程,在 examples 模块源码 的 src》test》java》com.flink.test.CreateKafkaMsg 类中,直接右键》run ,该类模拟创建订单数据并向 kafka 发送订单消息的示例,假设不间断产生电商平台订单数据

数据结构如下

选择 examples-1.0-SNAPSHOT.jar 客户端,在展开的输入框中,按以下内容输入

  • Entry Class:com.flink.examples.StartFlinkKafkaServer

  • Parallelism:16(并行度

  • Program Arguments: 参数(无

  • Savepoint Path: 打印输出文件路径(无

点击 Submit,提交 Task Job 作业,Flink 平台分配资源进行执行

通过平台显示,当前运行算子数量有为 7,其中子节点的 6 个为算子,起始根节点为 source 数据流加载算子方法,采用 flink-kafka 中间件连接器从 kafka 消息队列中获取 mq 订单数据源,分配给不同的业务算子,进行聚合计算;在每个算子的方法中,会将窗口统计数据存储到 redis 中,提供给大屏监控服务的后台使用

在运行前端项目后,访问 即后看到大屏监控从后端服务中(后端服务从 redis 中加载指标数据)获取数据生成各维度指标图标。界面在展示过程中,会定时轮询后台服务接口,获取更新数据,刷新前端图表

采用了一个 Demo 专门模拟生成电商订单数据,不停向 Kafka 推送结构化订单数据,目的是为仿照电商平台架构中的订单生成与数据削峰缓存处理过程;在由 Flink 平台获取 kafka 中的数据流,放到算子中进行聚合计算,输出结果到 redis 中,大屏不停的轮询后台应用获取数据进行展示;流程简单而言:订单》Kafka》Flink》Redis《后台《前端

CreateKafkaMsg.java

从 Kafka 中获取数据,将源源不断的数据流分别按不同的统计业务场景,分别在不同的算子与窗口下进行聚合计算。

StartFlinkKafkaServer.java

其它略..... 以工程源码为主

当 JOB 作业提交到到 Flink 平台后,为了确认 Jar 客户端的运行情况,除了在 Job 作业详情总览界面上查看算子运行状态外,还可以在 Jar 客户端正常运行过程中,从 Flink Dashboard 平台 JobManager 中查看作业的执行日志,用于分析与排查 TaskJob 作业的执行情况,也可以将开发过程中,需要的程序日志信息等在此功能窗口中打印用于数据跟踪

整个学习与开发过程,几乎没有的理论性的长篇总结,主要以场景为切入点,通过示例实践了解整个流程

1.flink 客户端可以在本地开发环境上运行,同理也可以部署在独立服务器上单节点运行,但采用 flink 通常需要考虑大规则的数据应用场景,服务架构以集群为主,分为 on Yarn 和 Standalone 两种集群模式。

2.flink 提供中间件连接器,可以将一个中间件的数据做为输入通道,如:es,mysql,redis,mq 等,做为源源不断的数据来源,写入到 Flink 的数据(批)流中,通过将数据流按窗口、水印等放到一个或分多个计算算子中,进行聚合计算,在将计算结果进行合并或归类,再通过中间件连接器将结果输出到中间件中(es,mysql,redis 等

3. 一旦提交启用 Flink 客户端后,Flink 会一直处于运行中(无输出源,则批流处理完毕打印日志后 JOB 停止,不断的按时长或数量等窗口分段滑动或滚动模式计算周期内的数据。

4.flink 可以通过 dataStreamApi 开发客户端算子和数输流输入输出功能,也同样可以用 Table SQL 开发相同功能

此示例只是为了演示一个电商平台的数据流实时处理过程,但生产环境下实时计算方案大同小异,相差的只是业务场景的不一样

Gitee

内容未做仔细审稿,如有错误,敬请指出

    以上就是本篇文章【Flink 系例 之 电商项目 - 购物订单大屏监控实战 (示例)】的全部内容了,欢迎阅览 ! 文章地址:http://yybeili.xhstdz.com/news/4684.html 
     栏目首页      相关文章      动态      同类文章      热门文章      网站地图      返回首页 物流园资讯移动站 http://yybeili.xhstdz.com/mobile/ , 查看更多   
最新文章
国际站:SEO Checker诊断工具助力商家诊断详情页,提升seo效果
国际站商家看过来:众所周知,一个优质的商品描述,可以提升买家转化的同时,还可以带来更多免费流量,从而让商家获得更多询盘和
【富蕴网站优化】在网站优化中有哪些常用的网站推广方式?
1、,百度,google的优化,针对,音乐,mp3下载,电影、游戏等一级,二级,甚至关键字优化。Seo介绍的网站很多,就不在这里重复
公众号简单爬虫--把公众号文章全部转载到网站(二)
根据上一篇的方法,我们得到了一个获取列表信息的地址,而且是用get方法就可以的地址.那么事情就变得很简单了,就是常规的爬取信息
1.【typecho】个人博客安装—使用群晖演示
哈喽,大家好今天给大家带来的是最近在群晖上安装一个博客的演示。先给大家看一下安装好之后的效果。虽然说现在使用博客和看博客
SEO优化攻略,揭秘网站排名提升与流量最大化技巧
本文深入解析SEO优化策略,通过关键词研究、高质量内容创作、链接建设等技术手段,帮助提升网站在搜索引擎中的排名,从而实现流
【HMNOTE搜狗手机输入法下载】小米HMNOTE搜狗手机输入法12.1.1免费下载
搜狗输入法,拥有超大中文词库,输入更加精准,智能。搜狗智能旺仔带你用表达,斗图,妙语,输入更加有趣。******特色功能******
vivo S19 Pro:全焦段人像拍照的5G游戏续航新宠
在智能手机市场日益同质化的今天,vivo S19 Pro以其独特的全焦段人像拍照功能、强大的5G性能以及出色的游戏续航能力,成为了众多
12月12日,星期四, 每天60秒读懂全世界!
新闻来源:百度热搜榜1. 25年老员工被开除 法院判赔98万近日,工作25年的李某被安排学习员工手册,全程玩手机遭公司开除,引发热
刚子扯谈:网站运营在左 技术在右 真TM扯
2013年8月5日,雨未下,天猴焖开片语:今天的扯谈内容是我转载我Java学习交流群里面一个哥们,当然我推荐他加入了朋友的网络分析
做seo营销网站/百度惠生活商家入驻
假定我们定义了一个Persion的message type,我们的用法可能如下定义 和 package tutorial;message Person {required int
相关文章