Flink 实时读取Mysql 增量⽇志数据并写⼊GreenPlumMysql
FlinkStreamETL
0.功能说明
概括:利⽤Flink实时统计Mysql数据库BinLog⽇志数据,并将流式数据注册为流表,利⽤Flink SQL将流表与Mysql的维表进⾏JOIN,最后将计算结果实时写⼊Greenplum/Mysql。
文豆php1.需求分析
1.1需求实时统计各个地区会议室的空置率,预定率,并在前端看板上实时展⽰。源系统的数据库是Mysql ,它有三张表,分别是:t_meeting_info(会议室预定信息表)、t_meeting_location(属地表,表)、t_meeting_address(会议室属地表,维度表)。
1.2说明
t_meeting_info 表中的数据每时每刻都在更新数据,若通过JDBC ⽅式定时查询Mysql ,会给源系统数据库造成⼤量⽆形的压⼒,甚⾄会影响正常业务的使⽤,并且时效性也不⾼。需要在基本不影响Mysql 正常使⽤的情况下完成对增量数据的处理。
上⾯三张表的DDL 语句如下:
mysql面试题csdnt_meeting_info(会议室预定信息表,这张表数据会实时更新)CREATE  TABLE  `t_meeting_info ` (  `id ` int (11) NOT  NULL  AUTO_INCREMENT  COMMENT  '主键id',  `meeting_code ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci NOT  NULL  COMMENT  '会议业务唯⼀编号',  `msite ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '会议名称',  `mcontent ` varchar (4096) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '会议内容',  `attend_count ` int (5) DEFAULT  NULL  COMMENT  '参会⼈数',  `type ` int (5) DEFAULT  NULL  COMMENT  '会议类型 1 普通会议 2 融合会议 3 视频会议 4 电话会议',  `status ` int (255) DEFAULT  NULL  COMMENT  '会议状态 ',  `address_id ` int (11) DEFAULT  NULL  COMMENT  '会议室id',  `email ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '创建⼈邮箱',  `contact_tel ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '',  `create_user_name ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '创建⼈姓名',  `create_user_id ` varchar (100) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '创建⼈⼯号',  `creator_org ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '创建⼈组织',  `m
start_date ` datetime  DEFAULT  NULL  COMMENT  '会议开始时间',  `mend_date ` datetime  DEFAULT  NULL  COMMENT  '会议结束时间',  `create_time ` datetime  DEFAULT  NULL  COMMENT  '创建时间',  `update_user ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '更新⼈',  `update_time ` datetime  DEFAULT  NULL  COMMENT  '更新时间',  `company ` int (10) DEFAULT  NULL  COMMENT  '会议所在属地code',  `sign_status ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '预留字段',  PRIMARY  KEY  (`id `) USING  BTREE ,  KEY  `t_meeting_info_meeting_code_index ` (`meeting_code `) USING  BTREE ,  KEY  `t_meeting_info_address_id_index ` (`address_id `) USING  BTREE ,  KEY  `t_meeting_info_create_user_id_index ` (`create_user_id `)) ENGINE =InnoDB  AUTO_INCREMENT =65216 DEFAULT  CHARSET =utf8 ROW_FORMAT =DYNAMIC COMMENT ='会议主表';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
js幻灯片轮播19
20
21
22
23
24
25
26
t_meeting_location(属地表,地区维表)
t_meeting_address(会议室属地表,会议室维表)
2.实现⽅案⽅案如下图所⽰:利⽤Canal监听Mysql数据库的增量BinLog⽇志数据(JSON格式)将增量⽇志数据作为Kafka的⽣产者,Flink解析Kafka的Topic 中的数据并消费将计算后的流式数据(Stream)注册为Flink 中的表(Table)
最后利⽤Flink与t_meeting_location、t_meeting_address维表进⾏JOIN,将最终的结果写⼊数据库。CREATE  TABLE  `t_meeting_location ` (  `id ` int (11) NOT  NULL  AUTO_INCREMENT ,  `short_name ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '属地简称',  `full_name ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '属地全称',  `code ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci NOT  NULL  COMMENT  '属地code',  `region_id ` int (11) DEFAULT  NULL  COMMENT  '地区id',  `create_user ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '创建⼈',  `update_user ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '更新⼈',  `create_time ` datetime  DEFAULT  NULL  COMMENT  '创建时间',  `update_time ` datetime  DEFAULT  NULL  CO
MMENT  '更新时间',  PRIMARY  KEY  (`id `) USING  BTREE ,  UNIQUE  KEY  `t_meeting_location_code_uindex ` (`code `) USING  BTREE ) ENGINE =InnoDB  AUTO_INCREMENT =103 DEFAULT  CHARSET =utf8 ROW_FORMAT =DYNAMIC COMMENT ='属地表';1
2
3
特效相机下载安装
4
5
6
7
8
9
10
11
12
13CREATE  TABLE  `t_meeting_address ` (  `id ` int (11) NOT  NULL  AUTO_INCREMENT  COMMENT  '主键id',  `name ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '会议室名称',  `location ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '所在属地',  `shared ` int (3) DEFAULT  NULL  COMMENT  '是否共享 0 默认不共享 1 全部共享 2 选择性共享',  `cost ` int (10) DEFAULT  NULL  COMMENT  '每⼩时成本',  `size ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '会议室容量⼤⼩',  `bvm_ip ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  'BVM IP',  `type ` int (2) DEFAULT  NULL  COMMENT  '会议室类型 1 普通会议室  2 视频会议室',  `create_time ` datetime  DEFAULT  NULL  COMMENT  '创建时间',  `create_user ` varchar (255) CHARACTER  SET  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '创建⼈',  `update_time ` datetime  DEFAULT  NULL  COMMENT  '更新时间',  `update_user ` varchar (255) CHARACTER  SE
T  utf8 COLLATE  utf8_general_ci DEFAULT  NULL  COMMENT  '更新⼈',  `status ` int (2) DEFAULT  NULL  COMMENT  '是否启⽤ ,0 未启⽤ 1已启⽤ 2已删除',  `order ` int (5) DEFAULT  NULL  COMMENT  '排序',  `approve ` int (2) DEFAULT  NULL  COMMENT  '是否审批 0 不审批 1 审批',  PRIMARY  KEY  (`id `) USING  BTREE ,  KEY  `t_meeting_address_location_index ` (`location `) USING  BTREE ,  KEY  `order ` (`order `) USING  BTREE ) ENGINE =InnoDB  AUTO_INCREMENT =554 DEFAULT  CHARSET =utf8 ROW_FORMAT =DYNAMIC COMMENT ='会议室表';
1
2
3
4
5
return0不写会怎么样6
7
8
9
10
11
12
13
14
15
16
17
ligerui树配置
18
19
20
需要服务器:CentOS7,JDK8、Scala 2.12.6、Mysql、Canal、Flink1.9、Zookkeeper、Kafka 3.可视化⽅案
Tableau实时刷新Greenplum,FineBI也可以(秒级)
DataV也可以每⼏秒刷新⼀次
Flink计算后的结果,写⼊到缓存,前端开发可视化组件进⾏展⽰(实时展⽰)。
4.项⽬地址
由于CSDN不⽅便粘贴图⽚,详细内容请见:
5.参考⽬录
[1].
[2].
[3].
[4].
[5].
[6].
[7].
[8].
[9].
[10].Flink` 流与维表的关联
[11].Flink DataStream流表与维表Join(Async` I/O)
12. `flink 流表join mysql表
13. `flink1.9 使⽤LookupableTableSource实现异步维表关联
14. Flink异步之⽭盾-锋利的Async I/O
15.Flink 的时间属性及原理解析