普通的实时计算优先考虑时效性,所以从数据源采集经过实时计算直接得到结果。如此做时效性更好,但是弊端是由于计算过程中的中间结果没有沉淀下来,所以当面对大量实时需求的时候,计算的复用性较差,开发成本随着需求增加直线上升。
实时数仓基于一定的数据仓库理念,对数据处理流程进行规划、分层,目的是提高数据的复用性。
例如下图:
- 例如:我们在普通实时SparkStreaming项目中,直接从数据源获取后通过过滤然后获取新增数据和变化数据,之后进行GMV的计算,但是弊端就是假设我后续的销售明细或者其他需求需要使用到新增数据和变化数据,则需要重新从数据源再次过滤进行数据处理;这样数据的复用性就会很差!
离线计算:
就是在计算开始前已知所有输入数据,输入数据不会产生变化,一般计算量级较大,计算时间也较长。例如今天早上一点,把昨天累积的日志,计算出所需结果。最经典的就是 Hadoop 的 MapReduce 方式;
一般是根据前一日的数据生成报表,虽然统计指标、报表繁多,但是对时效性不敏感。从技术操作的角度,这部分属于批处理的操作。即根据确定范围的数据一次性计算。
实时计算:
输入数据是可以以序列化的方式一个个输入并进行处理的,也就是说在开始的时候并不需要知道所有的输入数据。与离线计算相比,运行时间短,计算量级相对较小。强调计算过程的时间要短,即所查当下给出结果。主要侧重于对当日数据的实时监控,通常业务逻辑相对离线需求简单一下,统计指标也少一些,但是更注重数据的时效性,以及用户的交互性。从技术操作的角度,这部分属于流处理的操作。根据数据源源不断地到达进行实时的运算。
实时电商数仓项目分层
- ODS: 原始数据,日志和业务数据—>放到kafka中
- DWD: 根据数据对象为单位进行分流,比如订单、页面访问等等。—>从kafka中读取数据进行分流处理
- 使用侧输出流进行分流
- 日志数据和业务数据的事实表,我们都放到dwd层的
- 日志数据和业务数据的事实表放在kafka中的
- DIM: 维度数据—>维度表放到dim层
- 维表放到Hbase
- 为什么事实表放到kafka,而维表放到Hbase呢?
-
- 事实表和维表需要关联进行数据的统计操作,join实际上就是从维表中通过主键id查询相关维度信息然后放入事实表中。
-
- 由于kafka中的数据默认保存7天,不能永久保存数据;而比如用户维度信息表,它保存了用户的信息,需要长期使用。
- 另外一点kafka属于数据传输的消息中间件,仅是作为数据的缓存和订阅消费;无法根据用户id去查询某个数据表的信息。而Hbase却可以实现快速的查询,结合Phoenix使用
-
- DWM: 对于部分数据对象进行进一步加工,比如独立访问、跳出行为,也可以和维度进行关联,形成宽表,依旧是明细数据。
- dwd层和dws层中间部分,将共用的数据抽取放到了dwm层 —>放入到kafka中,需要接着被消费,被加工
- DWS: 根据某个主题将多个事实数据轻度聚合,形成主题宽表。—>放到ClickHouse中,供查询使用
- ADS: 把ClickHouse中的数据根据可视化需进行筛选聚合—>不落盘,不存储,直接对接SpringBoot接口进行数据展示;离线数仓解决了对历史数据的分析需求;实时计算解决的是时效性的数据分析。
- 离线数据在计算前已经知道所有的数据,而实时数据在计算时正在获取数据,边获取,边计算。
- 实时计算和离线计算以及即席查询的区别:
- 实时需求是7*24小时去运行的,离线需求固定性,每天有定时任务。而即席查询是暂时的需求。
- 即席查询的框架presto(基于内存当场计算)、kylin(预计算)多维分析–>(hive中有个with cube)
日常统计报表或分析图中需要包含当日部分
对于日常企业、网站的运营管理如果仅仅依靠离线计算,数据的时效性往往无法满足。
通过实时计算获得当日、分钟级、秒级甚至亚秒的数据更加便于企业对业务进行快速反应与调整。所以实时计算结果往往要与离线数据进行合并或者对比展示在 BI 或者统计平台中。
实时数据大屏监控需求
数据预警或提示
经过大数据实时计算得到的一些风控预警、营销信息提示,能够快速让风控或营销部分得到信息,以便采取各种应对。
比如,用户在电商、金融平台中正在进行一些非法或欺诈类操作,那么大数据实时计算可以快速的将情况筛选出来发送风控部门进行处理,甚至自动屏蔽。 或者检测到用户的行为对于某些商品具有较强的购买意愿,那么可以把这些“商机”推送给客服部门,让客服进
行主动的跟进。
实时推荐系统
实时推荐就是根据用户的自身属性结合当前的访问行为,经过实时的推荐算法计算,从而将用户可能喜欢的商品、新闻、视频等推送给用户。这种系统一般是由一个用户画像批处理加一个用户行为分析的流处理组合而成。
离线架构
-
sqoop同步数据的方式:
- 增量、全量、新增及变化,特殊。
- 新增:在SQL中有判断日期,where dt=‘今天’ ;创建时间等于当日。
- 全量:where 1=1;
- 新增及变化:在SQl中创建时间或修改时间等于今天;使用or做时间过滤
- 特殊就导一次
- 增量、全量、新增及变化,特殊。
-
Flume:
- source—>taildir source,channel—>kafka channel ; 没有sink
- TailDirsource:
优点: 断点续传,监控多目录多文件,实时监控
缺点: 当文件更名之后会重新读取该文件造成重复。- 出现重复的原因:taildirsource它是iNode+全路径表示一个文件,当文件名变化了,它会认为一个新文件。
- 解决的办法:
-
- 就是使用不更名的打印日志框架(logback)
-
- 修改源码,让TailDirSource判断文件时只看iNode值
-
KafkaChannel:
优点: 将数据写入Kafka, 省了一层sink组件。 这个组件在kafka中:既是生产者又是消费者
用法:
-
- Source-kafkaChannel-Sink
-
- Source-kafkaChannel
-
- kafkaChannel-Sink
-
实时架构
kafka框架的回顾:
producer:
-
如何保证生产者的数据不丢失?数据的可靠性方面采用ACk应答机制,参数0,1,-1
-
拦截器,序列化器,分区器
-
发送流程 sender main
-
幂等性,事务
-
分区规则—>
有指定分区则发往指定分区,没有指定分区则根据key值Hash;没有指定分区也没有key的时候,轮询(粘性)
Brock:
Topic:
副本:高可用
分区:高并发、负载均衡(防止热点数据)、避免忙闲不均
ISR、OSR、LEO、HW
Consumer:
分区分配规则、offset偏移量的保存(默认保存在__consumer_offsets)
其他:手动维护offset(mysql)
保存数据&保存offset写入一个事务,精准一次消费。
NO1. 业务数据:
实时架构中,业务数据使用Canal/Maxwell/FlinkCDC来检测mysql数据库的binlog日志,动态监测数据库中数据的变化;然后使用 Canal将变化的数据读写到kafka中。
binlog:开启为行级别
NO2. 日志数据:
在离线数仓的架构中使用Flume读取落盘的日志文件,而实时则直接将日志写入到kafka。
优点是:速度快,时效性高,减少了磁盘IO;缺点:耦合性高。
ODS: 将日志数据和业务数据写入ods层的kafka后,分为两个主题:1.行为日志主题 2.业务数据主题
DWD层:使用Flink连接kafka进行数据的消费,使用侧输出流分流到不同的主题中。
-
**日志数据:**会按照 页面数据、事件数据、曝光数据、启动数据和错误数据,因此要拆分为五个主题。
-
**业务数据:**除了一部分事实表需要存放到kafka中,另外的维表需要存放到Hbase上。
DWM层:我们使用Flink消费dwd的主题数据,中间可能需要关联维表;dwd层事实表关联Hbase上的维表然后形成dwm层的主题。
DWS层:使用Flink消费DWD层的数据和DWM的数据,然后将结果写出到ClickHouse,对接数据接口形成可视化页面。
ADS层:最终的指标存入Mysql或Redis
整体采用Lambda架构,将实时数仓和离线数仓整合到一起,复用kafka组件。
离线架构
- 优点: 耦合性低,稳定性高
- 缺点: 时效性差一点
- 说明:
-
- 项目经理(架构师)是大公司出来的,追求系统的稳定性
-
- 耦合性低,稳定性高
-
- 考虑到公司未来的发展,数据量一定会变得很大
-
- 早期的时候实时业务使用Sparkstreaming(微批次)
-
实时架构
- 优点: 时效性好
- 缺点: 耦合性高,稳定性低
- 说明:
- 1.突出优点,时效性好
- 2.Kafka集群高可用,挂一台两台是没有问题
-
- 数据量小,所有机器存在于同一个机房,传输没有问题4.架构还是公司项目经理(架构师)定的
模拟日志生成器的使用
这里提供了一个模拟生成数据的 jar 包,可以将日志发送给某一个指定的端口,需要大数据程序员了解如何从指定端口接收数据并对数据进行处理的流程。
- 开发需求:
-
- 使用Mock数据代替App/Web埋点数据,然后写SpringBoot程序接收日志数据进行处理(落盘或者send给kafka的Producer),本地使用idea启动SpringBoot程序,测试启动Mock数据的jar后,数据能否写入到磁盘文件中。启动zk、hadoop、kafka的服务,看数据能否发送到kafka的Topic中。
-
- 测试通过后将SpringBoot程序打成jar包上传到节点服务器;接着搭建Nginx负载均衡。
-
日志数据采集
SpringBoot: 分层可以提高代码复用性,解耦
Controller: 拦截用户请求,调用service, 响应请求
Service: 调用DAO,数据处理
DAO (Mapper): 获取数据
持久化层: 存储数据
(1)数据生成脚本存放于/opt/module/applog目录下—>日志数据生成脚本gmall-mock-log.jar,启动脚本时的配置文件application.yml
(3) 使用模拟日志生成器的jar 运行
(4) 目前还没有地址接收日志,所以程序运行后的结果会报错
- 注意:ZooKeeper 从 3.5 开始,AdminServer 的端口也是 8080,如果在本机启动了zk,那么可能看到 404、405 错误,意思是找到请求地址了,但是接收的方式不对。
日志采集模块-本地测试
创建SpringBoot工程
(1)创建Springboot工程
SpringBoot 整合 Kafka
1) 修改 SpringBoot 核心配置文件 application.propeties
2) 在 LoggerController 中添加方法, 将日志落盘并发送到 Kafka 主题中
3) 在 Resources 中添加 logback.xml 配置文件
4) 修改 hadoop102 上的 applog 目录下的 application.yml 配置文件
注意:mock.url 设置为自身 Windows 的 IP 地址
5) 测试
-
运行Windows 上的 Idea 程序 LoggerApplication
-
运行applog 下的jar 包
-
java -jar gmall-mock-log.jar
-
启动 kafka 消费者进行测试
bin/kafka-console-consumer.sh --bootstrap-server flink:9092 --topic ods_base_log -
查看数据消费情况
安装kafka可视化监测工具
1. kafka-Eagle客户端监测工具官网:
- 官网地址
源码: https://github.com/smartloli/kafka-eagle/
官网:https://www.kafka-eagle.org/
下载: http://download.kafka-eagle.org/
安装文档: https://docs.kafka-eagle.org/2.env-and-install
2. 软件安装
-
上传解压
-
cd kafka-eagle-bin-2.0.6
-
tar -zxvf kafka-eagle-web-2.0.6-bin.tar.gz
-
mv kafka-eagle-web-2.0.6 /opt/module/
-
删除kafka-eagle-web-2.0.6-bin空文件夹
-
修改配置
-
cd /opt/module/kafka-eagle-web-2.0.6/
-
cd conf—>vim system-config.properties
-
-
配置Kafka的Jmx
-
修改Kafka启动脚本 vim /opt/module/kafka_2.12-0.11.0.3/bin/kafka-server-start.sh
-
添加JMX的端口
-
配置环境变量
-
vim /etc/profile
-
source /etc/profile
-
启动集群
-
然后启动eagle
-
-
-
启动成功访问页面:http://192.168.191.115:18048
- 用户名【admin】密码【123456】
日志采集模块-打包单机部署
-
将gmall-logger达成jar包上传至/opt/module/applog,并将名字改为gmall-logger-new.jar
-
注意application.yml 中的端口一定要和java程序中配置的端口对应上
-
数据保存到/opt/module/applog/data目录下,运行gmall-logger-new.jar程序和gmall-mock-log.jar 程序,并启动kafka服务和消费者
-
最终数据打印到控制台,显示正常,数据写入log文件正常
至此证明单节点的日志采集和对接kafka完成
日志采集-Nginx负载均衡
负载均衡配置
模拟数据以后应该发给 nginx,然后 nginx 再转发给我们的日志服务器.
日志服务器由于我们是单节点,因此配置一台即可;如果是集群需要配置多台
-
- 打开 nginx 配置文件
- cd /opt/module/nginx/conf
vim nginx.conf 修改如下配置
- 测试,先启动日志服务器的日志处理程序;—>然后启动kafka消费者
- kafka消费者:kafka-console-consumer.sh --bootstrap-server flink:9092 --topic ods_base_log
- 最后启动日志mock程序
- 注意:不要忘了修改mock数据jar程序的对应配置文件application.yml文件中的端口为80
MySQL 的 binlog
(1) 什么是 binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
- 其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给 slaves 来达到 master- slave 数据一致的目的。
- 其二:自然就是数据恢复了,通过使用 mysqlbinlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。
(2) binlog 的开启
-
开启Mysql的binlog日志检测—> vim /etc/my.cnf
-
测试向数据库中的某张表添加一条数据,看binlog能否监测到
(3) Binlog 的分类设置
mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW
在配置文件中可以选择配置 binlog_format= statement|mixed|row
三种格式的区别:
- statement
语句级,binlog 会记录每次一执行写操作的语句。
相对 row 模式节省空间,但是可能产生不一致性,比如update tt set create_date=now();如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
- row
行级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql 是什么,引用了什么函数,他只记录执行后的效果。 缺点:占用较大空间。
- mixed
statement 的升级版,一定程度上解决了,因为一些情况而造成的statement 模式不一致问题;默认还是statement,在某些情况下譬如:
当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理。
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要
对 binlog 的监控的情况都不方便。
综合上面对比,Cannel 想做监控分析,选择 row 格式比较合适
Flink-CDC
1 . 什么是 CDC
CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
2. CDC 的种类
CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
3.Flink-CDC
Flink -CDC是内置了Debezium,然后可以直接获取业务数据库的数据变化,然后将数据直接写入到flink框架中,无需Canal–> kafka–>flink;因此效率比较高。
Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:
https://github.com/ververica/flink-cdc-connectors
FlinkCDC 案例实操
DataStream 方式的应用
pom依赖
代码案例
DataStream API官网案例
-
- 获取执行环境
-
- 通过FlinkCDC构建SourceFunction并读取数据
-
- 打印数据
-
- 启动任务
最终代码案例
案例测试
4) 启动 HDFS 集群
5) 启动程序
6) 在 MySQL 的 gmall-flink.z_user_info 表中添加、修改或者删除数据
7) 给当前的Flink 程序创建 Savepoint
8) 关闭程序以后从 Savepoint 重启程序
FlinkSQL 方式的应用
pom依赖
代码实现
Flink-CDC可以做到断点续传,但是需要设置checkpoint,从保存点读取。
自定义反序列化器
代码实现
原始的反序列化器仅是调用了内置的toString方法,对后期的数据处理不方便。因此需要自定义反序列化器。
自定义反序列化器代码
- FlinkCDC:
Datastream:
优点: 多库多表
缺点: 需要自定义反序列化器(灵活) - FlinkSQL:
优点: 不需要自定义反序列化器
缺点: 单表查询
Maxwell
安装Maxwell
将下载好的压缩包解压
解压 maxwell-1.25.0.tar.gz 到/opt/module 目录
初始化 Maxwell 元数据库
1) 在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据
2) 设置安全级别
3) 分配一个账号可以操作该数据库
4) 分配这个账号可以监控其他数据库的权限
使用 Maxwell 监控抓取 MySQL 数据
-
拷贝配置文件
-
修改配置文件
注意:
默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序。如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性可选值 producer_partition_by=database|table|primary_key|random| column
- 在/home/bin 目录下编写 maxwell.sh 启动脚本
- 授予执行权限
- 运行启动程序
[root@flink maxwell-1.25.0]$ maxwell.sh
- 启动Kafka 消费客户端,观察结果
为数据库gmall-flink下的base_trademark表中添加一条数据看maxwell能否监测到
Maxwell监测到表的变化,并将这个变化的数据信息写入到kafka,并打印到控制台
Canal 搭建使用教程
什么是 Canal
阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外, 所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
Canal 是用java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用Canal Client 来处理获得的相关数据。(数据库同步需要阿里的Otter 中间件,基于Canal)。
使用场景
(1) 原始场景: 阿里 Otter 中间件的一部分
Otter 是阿里用于进行异地数据库之间的同步框架,Canal 是其中一部分。
Canal 的工作原理
(1) MySQL 主从复制过程
- Master 主库将改变记录,写到二进制日志(Binary log)中
- Slave 从库向 mysql master 发送 dump 协议,将master 主库的 binary log events拷贝到它的中继日志(relay log);
- Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库
(2) Canal 的工作原理
很简单,就是把自己伪装成 Slave,假装从Master 复制数据
Canal安装
下载安装包
- https://github.com/alibaba/canal/releases
解压安装
canal 单机版
- 修改/opt/module/canal/conf/canal.properties 的配置 vim canal.properties
- 这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111
- 修改 canal 的输出 model,默认 tcp,改为输出到 kafka
-
tcp 就是输出到 canal 客户端,通过编写 Java 代码处理
-
如果创建多个实例通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的
canal.destinations=实例 1,实例 2,实例 3。 -
修改 instance.properties
-
我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下
-
-
注意:
默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序;如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 canal.mq.partitionHash 属性.
单机 canal 测试
-
启动 canal
- cd /opt/module/canal/
- bin/startup.sh
-
看到CanalLauncher 你表示启动成功,同时会创建gmall2022_db_c 主题
-
启动Kafka 消费客户端测试,查看消费情况
- bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic gmall2020_db_c
-
运行/opt/module/dblog 中生成模拟数据
业务数据库采集
创建实时业务数据库—>gmall-flink
导入sql脚本,修改/etc/my.cnf 文件
重启 MySQL 使配置生效–>systemctl restart mysqld
使用脚本将业务数据和日志数据采集到kafka的对应topic中
复习Flink中的状态state
状态概念
状态区别
图示:
flink中的状态
引入状态编程
Flink 内置的很多算子,数据源 source,数据存储 sink、transform算子 都是有状态的。
flink状态的分类和基本数据结构
总结为2大类8种基本数据结构
- flink中的状态state分两大类:算子状态和键控状态
算子状态的使用
(1) 算子状态operator state
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
- 列表状态(List state)
将状态表示为一组数据的列表。 - 联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。 - 广播状态(Broadcast state)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
(3)operator state代码示例
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问;
- 总结:算子状态作用范围限定为算子任务,是在一个任务中共享,任务中的算子可以访问。
operator state 代码实现
-
需要实现CheckpointedFunction 或ListCheckpointed接口
- 其中CheckpointedFunction需要实现如下两个方法:
- void snapshotState(FunctionSnapshotContext context) throws Exception
- void initializeState(FunctionInitializationContext context) throws Exception
- 其中CheckpointedFunction需要实现如下两个方法:
-
示例:
键控状态的使用
**键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。**当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。
(1) Flink 的 Keyed State 支持以下五种数据类型:
-
ValueState[T]保存单个的值,值的类型为 T。
- get 操作: ValueState.value()
- set 操作: ValueState.update(value: T)
-
ListState[T]保存一个列表,列表里的元素的数据类型为 T。基本操作如下:
-
ListState.add(value: T)
-
ListState.addAll(values: java.util.List[T])
-
ListState.get()返回 Iterable[T]
-
ListState.update(values: java.util.List[T])
-
MapState[K, V]保存 Key-Value 对。
- MapState.get(key: K)
- MapState.put(key: K, value: V)
- MapState.contains(key: K)
- MapState.remove(key: K)
-
ReducingState[T]
-
AggregatingState[I, O]
State.clear()是清空操作。
(2)键控状态编程的使用大体上有3种方式:
- 使用富函数,如:main函数中keyBy之后调用算子,转入一个实现富函数 RichFlatMapFunction。
- 通过 RuntimeContext 注册 StateDescriptor。StateDescriptor 以状态 state 的名字和存储的数据类型为参数。
在 open()方法中创建 state 变量。
- 通过 RuntimeContext 注册 StateDescriptor。StateDescriptor 以状态 state 的名字和存储的数据类型为参数。
- 在main函数中,keyby之后调用process方法传入一个KeyedProcessFunction类的实现类。
- keyBy之后直接flatMapWithState ,flatMapWithState[(输出类型的泛型),状态值的泛型]
总结说明:
示例:keyed state的代码实现
第一种方式:
第二种方式:
第三种方式:
更多细节复习flink状态编程和容错机制
DWD 层数据准备实现思路
-
功能 1:环境搭建
-
功能 2:计算用户行为日志 DWD 层
-
功能 3:计算业务数据DWD 层
项目环境
在工程中新建模块 gmall-realtime
创建如下包结构
修改配置文件
在 pom.xml 添加如下配置
在 resources 目录下创建 log4j.properties 配置文件
功能2_准备用户行为日志 DWD 层
我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层, 从 Kafka 的ODS 层读取的日志数据我们将其分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka 不同主题中,作为日志 DWD 层。
- 页面日志输出到主流
- 启动日志输出到启动侧输出流
- 曝光日志输出到曝光侧输出流
这这一层我们有两大任务:一是识别新老用户,二是利用侧输出流实现数据拆分。
识别新老用户
思路:
本身客户端业务有新老用户的标识is_new,但是不够准确,需要用实时计算再次确认(不涉及业务操作,只是单纯的做个状态确认)。
代码实现
1.接收 Kafka 数据,并进行转换
- 在 Kafka 的工具类中提供获取 Kafka 消费者的方法( 读)
- Flink 调用工具类读取数据的主程序
识别新老访客
保存每个 mid 的首次访问日期,每条进入该算子的访问记录,都会把 mid 对应的首次访问时间读取出来,只有首次访问时间不为空,则认为该访客是老访客,否则是新访客。同时如果是新访客且没有访问记录的话,会写入首次访问时间。
侧输出流实现数据拆分
思路:
根据日志数据内容,将日志数据分为 3 类,页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流。另外注意曝光日志是页面日志的一种,因此页面日志包含了曝光日志;处理的时候要格外注意。
将不同流的数据推送到下游 kafka 的不同 Topic(分流)
1) 程序中调用 Kafka 工具类获取 Sink
2) 测试
- IDEA 中运行 baseLogApp 类
- 运行 logger.sh,启动 Nginx 以及日志处理服务
- 运行applog 下模拟生成数据的jar 包
- 到 Kafka 不同的主题下查看输出效果
dwd层日志操作总结
代码实现一共8步:
- 1.获取执行环境
- 2.消费 ods_base_log 主题数据创建流
- 3.将每行数据转换为JSON对象
- 4.新老用户校验 状态编程
- 5.分流 侧输出流 页面:主流 启动:侧输出流 曝光:侧输出流
- 6.提取侧输出流
- 7.将三个流进行打印并输出到对应的Kafka主题中
- 8.启动任务
测试
- 启动nginx.sh start、启动kafka
- 启动kafka消费者[root@flink kafka]# bin/kafka-console-consumer.sh --bootstrap-server flink:9092 --topic ods_base_log
- 启动生成日志数据脚本:cd /opt/module/applog
- java -jar gmall-mock-log.jar
kafka_Eagle页面显示:
功能3_准备业务数据 DWD 层
业务数据的变化,我们可以通过 FlinkCDC 采集到,但是FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka 的业务数据ODS 层读取数据,经过处理后,将维度数据保存到 Hbase,将事实数据写回Kafka 作为业务数据的 DWD 层。
主要任务
- 接收 Kafka 数据,过滤空值数据
对 FlinkCDC 抓取数据进行 ETL,有用的部分保留,没用的过滤掉。
- 实现动态分流功能
由于FlinkCDC 是把全部数据统一写入一个Topic 中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如Hbase,Redis,MySQL 等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表, 就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
-
一种是用Zookeeper 存储,通过 Watch 感知数据变化;
-
另一种是用 mysql 数据库存储,周期性的同步;
-
另一种是用 mysql 数据库存储,使用广播流。
这里选择第二种方案,主要是 MySQL 对于配置数据初始化和维护管理,使用 FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。
所以就有了如下图:
- 业务事实数据保存到 Kafka 的主题中
- 维度数据保存到 Hbase 的表中
代码实现
我们首先需要思考的是业务库中有46张表,那么如果使用侧输出流进行处理,就需要写46个if …else语句进行处理;如此将会十分繁琐,还容易出错!另外是如果后期业务数据增加,有新的表,就需要暂停计算程序,然后再补充侧输出流;显然这个操作很不合理!
如果我们可以通过动态的方式根据表创建kafka主题、Hbase表,就方便多了;那么我们该怎么办?
思路是:
-
创建一个gmall-realtime库,然后设计一个table_process表,对这个库开启binlog设置;然而表中字段该如何设置呢?
-
首先,我们处理这些表需要知道数据来源表名、操作类型operate type、写出到那里,也就是sink端是hbase还是kafka,如果是事实数据就写到kafka的主题中,如果是维度数据写出到Hbase;那么对应写出到那个主题或库下面的表;主题名称和hbase表名是什么?
-
其次是写出到topic或表中的字段有哪些,字段是否为主键;另外如果是建表,执行引擎是什么?等等需要一个扩展字段。
table_process表的字段怎么设置
建表语句
MySQL 对于配置数据初始化和维护管理,使用 FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接形成连接流。
所以就有了如下图:
代码逻辑,一共是9步:
-
- 获取执行环境
-
- 消费Kafka ods_base_db主题数据创建流
-
- 将每行数据转换为SON对象并过滤(delete)–>主流
-
- 使用FlinkCDC消费配置表并处理成–>广播流
-
- 连接主流和广播流
-
- 分流处理数据,广播流数据、主流数据根据广播流数据进行处理)
-
- 提取Kafka流数据和Hbase流数据
-
- 将Kafka数据写入Kafka主题,将Hbase数据写入Phoenix表
-
- 启动任分
接收 Kafka 数据,过滤空值数据
根据mysql的配置表_动态进行分流
1) 引入 pom.xml 依 赖
2) 在 Mysql 中创建数据库,创建配置表 table_process
建表语句:
3) 在 MySQL 配置文件中增加 gmall_realtime 开启 Binlog
4) 创建配置表实体类
5) 编写操作读取配置表形成广播流
6) 程序流程分析
TableProcessFunction(BroadcastProcessFunction)
7) 定义一个项目中常用的配置常量类 GmallConfig
8) 自定义函数 TableProcessFunction
9) 自定义函数 TableProcessFunction-open
10) 自定义函数 TableProcessFunction-processBroadcastElement
11) 自定义函数 TableProcessFunction-checkTable
12) 自定义函数 TableProcessFunction-processElement()
核心处理方法,根据 MySQL 配置表的信息为每条数据打标签,走Kafka 还是 Hbase
13) 自定义函数 TableProcessFunction-filterColumn()
校验字段,过滤掉多余的字段
14) 主程序 baseDBApp 中调用 TableProcessFunction 进行分流
分流 Sink之保存维度到Hbase(Phoenix)
1) 程序流程分析
维度数据写入Hbase
DimSink 继承了 RickSinkFunction,这个 function 得分两条时间线。
- 一条是任务启动时执行 open 操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。
- 另一条是随着每条数据的到达反复执行 invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成 sql 提交给 hbase。
2) 因为要用单独的 schema,所以在程序中加入 hbase-site.xml
**注意:**为了开启 hbase 的 namespace 和 phoenix 的 schema 的映射,在程序中需要加这个配置文件,另外在 linux 服务上,也需要在 hbase 以及 phoenix 的 hbase-site.xml 配置文件中,加上以上两个配置,并使用 xsync 进行同步。
3) 在 phoenix 中执行
4) DimSink
5) 主程序 baseDBApp 中调用 DimSink
6) 测试
- 启动 HDFS、ZK、Kafka、FlinkCDCApp、Hbase
- 向 gmall_realtime 数据库的 table_process 表中插入测试数据
运行 idea 中的 baseDBApp
- 向 gmall数据库的 base_trademark 表中插入一条数据
- 通过 phoenix 查看 hbase 的 schema 以及表情况
分流 Sink 之保存业务数据到 Kafka 主题
1) 在 MyKafkaUtil 中添加如下方法
2) 在 MyKafkaUtil 中添加属性定义
3) 两个创建 FlinkKafkaProducer 方法对比
-
前者给定确定的 Topic
-
而后者除了缺省情况下会采用 DEFAULT_TOPIC,一般情况下可以根据不同的业务数据在 KafkaSerializationSchema 中通过方法实现。
4) 在主程序 baseDBApp 中加入新 KafkaSink
- 测试
- 启动 hdfs、zk、kafka、flinkcdc、hbase
- 向 gmall_realtime 数据库的 table_process 表中插入测试数据
在这里插入图片描述
运行 idea 中的 baseDBApp
- 运行rt_dblog 下的 jar 包,模拟生成数据
5) 主程序 baseDBApp 中调用 DimSink
6) 测试
- 启动 HDFS、ZK、Kafka、FlinkCDCApp、Hbase
- 向 gmall_realtime 数据库的 table_process 表中插入测试数据
运行 idea 中的 baseDBApp
- 向 gmall数据库的 base_trademark 表中插入一条数据
- 通过 phoenix 查看 hbase 的 schema 以及表情况
分流 Sink 之保存业务数据到 Kafka 主题
1) 在 MyKafkaUtil 中添加如下方法
2) 在 MyKafkaUtil 中添加属性定义
3) 两个创建 FlinkKafkaProducer 方法对比
-
前者给定确定的 Topic
-
而后者除了缺省情况下会采用 DEFAULT_TOPIC,一般情况下可以根据不同的业务数据在 KafkaSerializationSchema 中通过方法实现。
4) 在主程序 baseDBApp 中加入新 KafkaSink
- 测试
- 启动 hdfs、zk、kafka、flinkcdc、hbase
- 向 gmall_realtime 数据库的 table_process 表中插入测试数据
运行 idea 中的 baseDBApp
-
运行rt_dblog 下的 jar 包,模拟生成数据
-
查看控制台输出以及在配置表中配置的 kafka 主题名消费情况
思路
-
我们根据dws层的统计主题,把所有主题的需求指标都列出来,如果直接可以从dwd层获取的就无需加工;如果需要对数据进行加工,比如过滤去重、形成宽表等,那么这些指标应该放到dwm中间层。
DWM 层的定位是什么,DWM 层主要服务 DWS,因为部分需求直接从 DWD 层到DWS 层中间会有一定的计算量,而且这部分计算的结果很有可能被多个 DWS 层主题复用,所以部分 DWD 层会形成一层DWM,我们这里主要涉及业务。
dwm层的指标:
- 访客UV统计
- 跳出率统计
- 订单宽表
- 支付宽表
DWM 层-访客 UV 计算
DWM 层-跳出明细计算
需求分析与思路
什么是跳出
跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。
关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果。
计算跳出行为的思路
首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征:
-
该页面是用户近期访问的第一个页面
- 这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。
-
**首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。**这第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。
- 而且要通过一个不存在的数据求得一条存在的数据。更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?
-
最简单的办法就是Flink 自带的 CEP 技术。这个CEP 非常适合通过多条数据组合来识别某个事件。
用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。