首页 » 网站建设 » CDC_PHP技巧_37 手游基于 Flink CDC Hudi 湖仓一体筹划实践

CDC_PHP技巧_37 手游基于 Flink CDC Hudi 湖仓一体筹划实践

访客 2024-11-27 0

扫一扫用手机浏览

文章目录 [+]

本文作者是 37 手游大数据开拓徐润柏,先容了 37 手游为何选择 Flink 作为打算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案,紧张内容包括:

Flink CDC 基本知识先容

CDC_PHP技巧_37 手游基于 Flink CDC  Hudi 湖仓一体筹划实践

Hudi 基本知识先容

CDC_PHP技巧_37 手游基于 Flink CDC  Hudi 湖仓一体筹划实践
(图片来自网络侵删)

37 手游的业务痛点和技能方案选型

37 手游湖仓一体先容

Flink CDC + Hudi 实践

总结

一、Flink-CDC 2.0

Flink CDC Connectors 是 Apache Flink 的一个 source 真个连接器,目前 2.0 版本支持从 MySQL 以及 Postgres 两种数据源中获取数据,2.1 版本社区确定会支持 Oracle,MongoDB 数据源。

Fink CDC 2.0 的核心 feature,紧张表现为实现了以下三个非常主要的功能:

全程无锁,不会对数据库产生须要加锁所带来的风险;多并行度,全量数据的读取阶段支持水平扩展,使亿级别的大表可以通过加大并行度来加快读取速率;断点续传,全量阶段支持 checkpoint,纵然任务因某种缘故原由退出了,也可通过保存的 checkpoint 对任务进行规复实现数据的断点续传。

Flink CDC 2.0 详解核心改进

二、Hudi

Apache Hudi 目前被业内描述为环绕数据库内核构建的流式数据湖平台 (Streaming Data Lake Platform)。

由于 Hudi 拥有良好的 Upsert 能力,并且 0.10 Master 对 Flink 版本支持至 1.13.x,因此我们选择通过 Flink + Hudi 的办法为 37 手游的业务场景供应分钟级 Upsert 数据的剖析查询能力。

三、37 手游的业务痛点和技能方案选型

1. 旧架构与业务痛点

1.1 数据实时性不足

日志类数据通过 sqoop 每 30min 同步前 60min 数据到 Hive;数据库类数据通过 sqoop 每 60min 同步当天全量数据到 Hive;数据库类数据通过 sqoop 每天同步前 60 天数据到 Hive。

1.2 业务代码逻辑繁芜且难掩护

目前 37 手游还有很多的业务开拓沿用 MySQL + PHP 的开拓模式,代码逻辑繁芜且很难掩护;相同的代码逻辑,每每流处理须要开拓一份代码,批处理则须要另开拓一份代码,不能复用。

1.3 频繁重刷历史数据

频繁地重刷历史数据来担保数据同等。

1.4 Schema 变更频繁

由于业务需求,常常须要添加表字段。

1.5 Hive 版本低

目前 Hive 利用版本为 1.x 版本,并且升级版本比较困难;不支持 Upsert;不支持行级别的 delete。

由于 37 手游的业务场景,数据 upsert、delete 是个很常见的需求。
以是基于 Hive 数仓的架构对业务需求的知足度不足。

2. 技能选型

在同步工具的选型上考虑过 Canal 和 Maxwell。
但 Canal 只适宜增量数据的同步并且须要支配,掩护起来相对较重。
而 Maxwell 虽然比较轻量,但与 Canal 一样须要合营 Kafka 等行列步队利用。
比拟之下,Flink CDC 可以通过配置 Flink connector 的办法基于 Flink-SQL 进行利用,十分轻巧,并且完美契合基于 Flink-SQL 的流批一体架构。

在存储引擎的选型上,目前最热门的数据湖产品当属:Apache Hudi,Apache Iceberg 和 DeltaLake,这些在我们的场景下各有利害。
终极,基于 Hudi 对高下游生态的开放、对全局索引的支持、对 Flink 1.13 版本的支持,以及对 Hive 版本的兼容性 (Iceberg 不支持 Hive1.x 的版本) 等缘故原由,选择了 Hudi 作为湖仓一体和流批一体的存储引擎。

针对上述存在的业务痛点以及选型比拟,我们的终极方案为:以 Flink1.13.2 作为打算引擎,依赖 Flink 供应的流批统一的 API,基于 Flink-SQL 实现流批一体,Flink-CDC 2.0 作为 ODS 层的数据同步工具以及 Hudi-0.10 Master 作为存储引擎的湖仓一体,办理掩护两套代码的业务痛点。

四、新架构与湖仓一体

37 手游的湖仓一体方案,是 37 手游流批一体架构的一部分。
通过湖仓一体、流批一体,准实时场景下做到了:数据同源、同打算引擎、同存储、同打算口径。
数据的时效性可以到分钟级,能很好的知足业务准实时数仓的需求。
下面是架构图:

MySQL 数据通过 Flink CDC 进入到 Kafka。
之以是数据先入 Kafka 而不是直接入 Hudi,是为了实现多个实时任务复用 MySQL 过来的数据,避免多个任务通过 Flink CDC 接 MySQL 表以及 Binlog,对 MySQL 库的性能造成影响。

通过 CDC 进入到 Kafka 的数据除了落一份到离线数据仓库的 ODS 层之外,会同时按照实时数据仓库的链路,从 ODS->DWD->DWS->OLAP 数据库,末了供报表等数据做事利用。
实时数仓的每一层结果数据会准实时的落一份到离线数仓,通过这种办法做到程序一次开拓、指标口径统一,数据统一。

从架构图上,可以看到有一步数据改动 (重跑历史数据) 的动作,之以是有这一步是考虑到:有可能存在由于口径调度或者前一天的实时任务打算结果缺点,导致重跑历史数据的情形。

而存储在 Kafka 的数据有失落效韶光,不会存太久的历史数据,重跑良久的历史数据无法从 Kafka 中获取历史源数据。
再者,如果把大量的历史数据再一次推到 Kafka,走实时打算的链路来改动历史数据,可能会影响当天的实时作业。
以是针对重跑历史数据,会通过数据改动这一步来处理。

总体上说,37 手游的数据仓库属于 Lambda 和 Kappa 混搭的架构。
流批一体数据仓库的各个数据链路有数据质量校验的流程。
第二天对前一天的数据进行对账,如果前一天实时打算的数据无非常,则不须要改动数据,Kappa 架构已经足够。

五、Flink CDC 2.0 + Kafka + Hudi 0.10 实践

1. 环境准备

Flink 1.13.2.../lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修正 Master 分支的 Hudi Flink 版本为 1.13.2 然后构建).../lib/hadoop-mapreduce-client-core-2.7.3.jar (办理 Hudi ClassNotFoundException)../lib/flink-sql-connector-mysql-cdc-2.0.0.jar../lib/flink-format-changelog-json-2.0.0.jar../lib/flink-sql-connector-kafka_2.11-1.13.2.jar

source 端 MySQL-CDC 表定义:

create table sy_payment_cdc ( ID BIGINT, ... PRIMARY KEY(ID) NOT ENFORCED) with( 'connector' = 'mysql-cdc', 'hostname' = '', 'port' = '', 'username' = '', 'password' = '', 'database-name' = '', 'table-name' = '', 'connect.timeout' = '60s', 'scan.incremental.snapshot.chunk.size' = '100000', 'server-id'='5401-5416');

值得把稳的是:scan.incremental.snapshot.chunk.size 参数须要根据实际情形来配置,如果表数据量不大,利用默认值即可。

Sink 端 Kafka+Hudi COW 表定义:

create table sy_payment_cdc2kafka ( ID BIGINT, ... PRIMARY KEY(ID) NOT ENFORCED) with ( 'connector' = 'kafka', 'topic' = '', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'key.format' = '', 'key.fields' = '', 'format' = 'changelog-json');create table sy_payment2Hudi ( ID BIGINT, ... PRIMARY KEY(ID) NOT ENFORCED)PARTITIONED BY (YMD)WITH ( 'connector' = 'Hudi', 'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment', 'table.type' = 'COPY_ON_WRITE', 'partition.default_name' = 'YMD', 'write.insert.drop.duplicates' = 'true', 'write.bulk_insert.shuffle_by_partition' = 'false', 'write.bulk_insert.sort_by_partition' = 'false', 'write.precombine.field' = 'MTIME', 'write.tasks' = '16', 'write.bucket_assign.tasks' = '16', 'write.task.max.size' = '', 'write.merge.max_memory' = '');

针对历史数据入 Hudi,可以选择离线 bulk_insert 的办法入湖,再通过 Load Index Bootstrap 加载数据后接回增量数据。
bulk_insert 办法入湖数据的唯一性依赖源真个数据本身,在接回增量数据时也须要做到担保数据不丢失。

这里我们选择更为大略的调度任务资源的办法将历史数据入湖。
依赖 Flink 的 checkpoint 机制,不管是 CDC 2.0 入 Kafka 期间还是 Kafka 入 Hudi 期间,都可以通过指定 checkpoint 的办法对任务进行重启并且数据不会丢失。

我们可以在配置 CDC 2.0 入 Kafka,Kafka 入 Hudi 任务时调大内存并配置多个并行度,加快历史数据入湖,等到所有历史数据入湖后,再相应的调小入湖任务的内存配置并且将 CDC 入 Kafka 的并行度设置为 1,由于增量阶段 CDC 是单并行度,然后指定 checkpoint 重启任务。

按照上面表定义的参数配置,配置 16 个并行度,Flink TaskManager 内存大小为 50G 的情形下,单表 15 亿历史数据入至 Hudi COW 表示适用时 10 小时,单表 9 亿数据入至 Hudi COW 表示适用时 6 小时。
当然这个耗时很大一部分是 COW 写放大的特性,在大数据量的 upsert 模式下耗时较多。

目前我们的集群由 200 多台机器组成,在线的流打算任务总数有 200 多,总数据量靠近 2PB。

如果集群资源很有限的情形下,可以根据实际情形调度 Hudi 表以及 Flink 任务的内存配置,还可以通过配置 Hudi 的限流参数 write.rate.limit 让历史数据缓慢入湖。

之前 Flink CDC 1.x 版本由于全量 snapshot 阶段单并行度读取的缘故原由,当时亿级以上的表在全量 snapshot 读取阶段就须要耗费很永劫光,并且 checkpoint 会失落败无法担保数据的断点续传。

以是当时入 Hudi 是采取先启动一个 CDC 1.x 的程序将此刻开始的增量数据写入 Kafka,之后再启动其余一个 sqoop 程序拉取当前的所有数据至 Hive 后,通过 Flink 读取 Hive 的数据写 Hudi,末了再把 Kafka 的增量数据从头消费接回 Hudi。
由于 Kafka 与 Hive 的数据存在交集,因此数据不会丢失,加上 Hudi 的 upsert 能力担保了数据唯一。

但是,这种办法的链路太长操作困难,如今通过 CDC 2.0 在全量 snapshot 阶段支持多并行度以及 checkpoint 的能力,确实大大降落了架构的繁芜度。

2. 数据比对

由于生产环境用的是 Hive1.x,Hudi 对付 1.x 还不支持数据同步,以是通过创建 Hive 外部表的办法进行查询,如果是 Hive2.x 以上版本,可参考 Hive 同步章节;创建 Hive 外部表 + 预创建分区;auxlib 文件夹添加 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。

CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `ID` bigint, ... )PARTITIONED BY ( `dt` string)ROW FORMAT SERDE 'org.apache.hadoop.Hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT 'org.apache.Hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT 'org.apache.hadoop.Hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment'

终极查询 Hudi 数据 (Hive 外部表的形式) 与原来 sqoop 同步的 Hive 数据做比对得到:

总数同等;按天分组统计数量同等;按天分组统计金额同等。
六、总结

湖仓一体以及流批一体架构比拟传统数仓架构紧张有以下几点好处:

Hudi 供应了 Upsert 能力,办理频繁 Upsert/Delete 的痛点;供应分钟级的数据,比传统数仓有更高的时效性;基于 Flink-SQL 实现了流批一体,代码掩护本钱低;数据同源、同打算引擎、同存储、同打算口径;选用 Flink CDC 作为数据同步工具,省却 sqoop 的掩护本钱。

末了针对频繁增加表字段的痛点需求,并且希望后续同步下贱系统的时候能够自动加入这个字段,目前还没有完美的办理方案,希望 Flink CDC 社区能在后续的版本供应 Schema Evolution 的支持。

Reference

[1] MySQL CDC 文档: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

[2] Hudi Flink 答疑解惑:https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#

[3] Hudi 的一些设计:https://www.yuque.com/docs/share/5d1c383d-c3fc-483a-ad7e-d8181d6295cd?#

「链接」

本文为阿里云原创内容,未经许可不得转载。

标签:

相关文章

PHP实现文字转图片的代码与应用

图片处理技术在各个领域得到了广泛应用。在PHP编程中,文字转图片功能同样具有很高的实用价值。本文将针对PHP实现文字转图片的代码进...

网站建设 2025-03-02 阅读1 评论0

NAN0017探索新型纳米材料的奥秘与应用

纳米技术作为一门新兴的交叉学科,近年来在材料科学、生物医学、电子工程等领域取得了举世瞩目的成果。其中,NAN0017作为一种新型纳...

网站建设 2025-03-02 阅读5 评论0

L26368XO代码其背后的创新与突破

编程语言在各个领域发挥着越来越重要的作用。在众多编程语言中,L26368XO代码以其独特的优势,成为了业界关注的焦点。本文将深入剖...

网站建设 2025-03-02 阅读1 评论0

HTML字体背景打造个化网页设计的关键元素

网页设计已经成为现代网络传播的重要手段。在众多网页设计元素中,字体和背景的搭配尤为关键。本文将从HTML字体背景设置的角度,探讨其...

网站建设 2025-03-02 阅读1 评论0