一、湖仓一体的架构
数据湖和数据仓库
既然聊湖仓一体,我们先了解一下什么是湖,什么是仓。数据湖是一个很老的概念,在近些年又被热炒。业界对于数据湖到现在也没有一个统一的定义。AWS 是最早在云上推出数据湖解决方案的云服务提供商,在这里我们便引用 AWS 对数据湖的定义:“数据湖是一个集中式的存储库,允许存储任意结构的数据并且能将它应用于大数据处理,以及进行实时分析和机器学习等相关的应用场景。” 同样我们也借助于 AWS 对数据仓库做这样的定义:“数据仓库是信息的一个中央存储库。” 这里的信息是可对其进行分析,并且可做出更明智的决策。
这个定义还有详细的展开。AWS 这张图通过展示了从湖到仓的数据流向的关系,来演示数据湖与数据仓库之间的区别和联系。首先数据最初是存在于数据湖或是数据库中,然后经过数据筛选和准备之后,就会流向数据仓库来进行一些高价值的分析。这个对比表格很直观的从数据、Schema、性价比、数据质量、用户和分析这 6 个维度给出数据湖和仓的对比。
湖仓一体的先例
今年我们听说阿里巴巴提及的“湖仓一体”的概念。不知道大家有没有想过湖仓一体在业界是否有成功的先例?我个人认为是有的。今年 (2020年)9 月份,一家叫 Snowflake 的公司在纽交所上市。Snowflake 是一家做云数仓的公司,基于云厂商提供的基础设施提供 SaaS 平台,面向中小企业提供数据的托管和分析服务。Snowflake 自称自己是一家云数仓公司,并且在 16 年的数据顶会上发表了一篇论文来介绍他们弹性数仓的架构以及一些技术的细节。
Snowflake 其实是基于云上的对象存储,一份存储多份计算,并且计算与存储分离的这样一套架构。其实这就是 AWS 以及现在主流云厂商所主推的这个数据湖的架构。Snowflake上市的首日,他的市值就飙升到了 700 亿美元的规模。所以我个人认为 Snowflake 可以算是实行湖仓一体的一个最成功的先例。大家可以去了解一下刚谈到的这篇论文。我摘出了这 5 个点来和大家做简单的分享:
首先第一点,是没有走现在传统数仓所广泛应用的 Shared-Nothing 这个架构,而是转向 Shared-Data 这个架构。其次,论文中重点提及的存储和计算分离,是文中我觉得最有价值的一个观点。他提出了统一存储然后弹性计算的这样一个观念。第三,数仓及服务是我认为他们商业化最成功的点。它将数仓提供了一个 SaaS 化的体验,并且摒弃传统上大家认为的数仓是大而重的偏见。第四,高可用这一块是提高用户体验和容错的很关键的一个点。最后,结构化延伸到半结构化这一块已经体现当时他们能够探索湖上通用数据的能力。这虽然是 16 年的一篇论文,但里面的观念并不算陈旧并且仍然值得我们去学习。后续我们会简单介绍几个被我们吸收并且将会去实践的一些点,而且这些点也是 T3 出行在实现湖仓一体上很关键的地方。
Shared - Nothing 架构的优势
首先,作为一个被很多传统的数仓广泛应用的一个架构,Shared-Nothing 还是有一些架构上的优势:
第一点,Table 上的数据可以进行跨节点的水平分区,并且每个节点有自己的本地存储。每个节点的计算资源,只关注处理每个节点自己存储的数据。所以它的另一个优点就是它的处理机制相对简单,是数仓领域很典型的一个架构。Shared - Nothing 架构的劣势
这套架构其实也有一些不足的地方:
最大的一点就是他耦合了计算与存储资源,同时也带来第二个问题,就是弹性不足。具体可以体现在 2 个方面。a、集群在扩缩容的时候,数据需要被大量重分布b、没有办法简单地卸载不用的计算资源第三个问题是,耦合计算和存储资源同时也就造成了它的可用性是相当有限的。由于这些称之为有状态的计算,所以在失败或者是升级的时候会显著影响性能,并会导致服务整体不可用的状态。最后是同构的资源与异构的负载的问题。因为在数仓的场景中,我们有很多异构的负载,比如说批量的加载,查询,报表的大规模计算分析等等。但 Shared-Nothing 架构的资源是同构的,所以这带来两者之间的碰撞。Shared - Data 架构
基于这些问题,Snowflake 提出了一个叫做 Multi-Cluster Shared-Data 架构。这里我们对官方的图做了一个简单的微调。
这个架构的第一个优势是它没有数据孤岛,是一个统一的存储。这也就能够将存储从计算中进行解耦。第二个优势是基于现在的对象存储去容纳结构化和非结构化数据。第三,它的集群规模是可以弹性作用的。第四,上述特征同时也带来了按需计算这个低成本优点。接下来我们以分层的形式来 review 这个架构。从整体上来看,它的结构大致分为三个层次。
最底层是云厂商提供的对象存储,也就是用户的存储。中间层是多用途多份的计算集群。再往上是数据湖的管理服务,它存载的是一个大的 SaaS 化的平台,是对整个底层存储以及计算集群的管理的角色。Shared - Data 的持续高可用
接下来一个点是这个架构的高可用。这里可以简单分解为 2 个方面。第一个是失败容错,第二个是在线升级。
首先,作为一个 SaaS 化的应用,它的容错性是需要体现在整体架构上。这里我们同样分层来回顾一下。
最底层的存储层利用了云厂商的对象存储能力,他本身是一个跨中心复制以及接近无限扩容的一个机制,所以用户基本无需关心。再往上是多元的计算集群。每个计算集群是在同一个数据中心内,来保证它网络传输的性能。这里就提到一个问题,有可能某一个计算集群会有节点失败的问题。假如在一次查询中有一个节点失败,这些计算节点会将这个状态返回上面的服务层。服务层在接受这个失败后,会将这个计算再次传递到可用的节点中进行二次查询。所以 Shared-Data 存储和计算分离的这种架构上节点近乎是无状态的计算。这种架构的一个节点失败就不是一个非常大的问题。再往上服务层对于元数据的存储也是利用了对象存储的这个能力。所以这个服务层基本上可以看做是无状态的服务。最上层是一个负载均衡器,可以进行服务的冗余和负载的均摊。第二点在线升级这一块主要利用两个设计,其实这也并不是很新颖的做法。一个是在计算层和服务层的多方面的映射,然后灰度的切换。这里可以看到在计算层是分多版本的,并且这些版本之间会共享本地的 Cache。服务层的元数据管理也是在多方面共享。这其实也是架构内的子 Shared-Data,对于多版本之间的数据共享能做到再升级和平滑灰度的能力。接下来我的同事(王祥虎)会跟大家介绍这 3 个框架以及它们是如何融合并最终支撑 T3 湖仓一体的实践。在介绍第二个议题前他会先介绍我们的主框架,Hudi 和 Kylin 框架,然后再介绍他们三者之间是如何两两融合。最后再介绍T3是如何构建湖仓一体的。
二、Flink/Hudi/Kylin 介绍与融合
Hudi
首先来了解一下 Hudi 是什么。Hudi 最初是由 Uber 的工程师为了满足他们的数据分析需求设计开发的一个数据湖框架。它于 2019 年 1 月份加入到 Apache 孵化器,并于 2020 年 5 月顺利毕业,成为 Apache 的顶级项目。Hudi 的名字来源于 Hadoop Upserts Deletes and Incrementals 的缩写。也就是说,Hudi 是一个支持插入、更新、删除、以及增量处理的数据湖框架。除此之外,它还支持事务性 ACID 增量处理、存储管理和时间管理。Hudi 能够管理云上超大规模上百 PB 的分析型数据集,对于所有的云服务都开箱即用,非常的方便,而且已经在 Uber 内部稳定运行了接近 4 年。
下图是 Hudi 的插件化架构。我们可以看到,Hudi 在存储、数据处理引擎、表类型、索引类型、查询视图和查询引擎方面都有比较宽松的支持。也就是说,他不与某一个组件绑定。
在存储方面,Hudi 可以支持 HDFS,OSS 和 S3。在数据处理引擎方面,Hudi 支持 Flink 和 Spark。Java 和 Python 客户端已经在社区支持中。Hudi 支持两种表,COW 和 MOR,这两种表分别对应低延迟的查询和快速摄入两种场景。在索引方面,Hudi 支持 Bloom 和 HBase 等 4 种索引类型。底层用了 Parquet 和 Avro 存储数据,社区还正在做 ORC 格式的支持以及 SQL支持,相信不久的将来会跟大家见面。Hudi 支持 3 种查询,读优化查询,增量查询和快照查询。而在查询引擎方面,有 Spark 、Presto、Hive 和 Impala,实际上一些其他的组件已经支持了。
下面详细的介绍一下存储模式和视图。
第一个是 Copy On Write 模式,对应到 Hudi 的 COW 表。它是一种侧重低延时的数据查询场景的表,底层使用 Parquet 数据文件存储数据,能够支持快照查询和增量查询两种查询方式。在查询引擎方面,大家可以看到上面有 5 个引擎,他们对快照查询、增量查询和读优化 3 种视图都有不同程度的支持。Merge On Read 表对 Copy On Write 有不同层面的互补,可以看到它侧重于快速的数据摄入场景。使用 Parquet 文件来存储具体的数据,使用行式 Avro 增量文件来存储操作日志,类似于 HBase WAL。它支持 Hudi 所有 3 种视图,可以看到 Hive,Spark SQL,Spark Datasource, Presto 和 Impala 对于读优化查询都是支持的。而 Hive, Spark SQL 只支持到了快照查询。这种组件支持的信息大家以后可以到官网上查询。在出行业务中,订单会有支付长尾的属性。也就是说一个订单开始之后,它的支付环节可能会拖的比较久。换言之,它可能会在这个用户下一次出行前才进行支付(也或许会更久,甚至永远不支付)。这种长尾属性将会导致一个超长的业务闭环窗口,会导致我们无法准确预测数据的更新时机。如果存在多级更新的话,链路会比较长,更新成本也非常的高。
下图是我们的长尾更新引发的冷数据频繁更新示意图。左侧是业务库,右侧是有依赖关系的 3 张示意表。当业务库有数据更新时,右侧需要更新的数据可能已经归档到性能相对较差的设备上,增加数据更新成本。而且如果这次数据更新会引发长链路级联更新的话,这种慢速的 I/O 还会被进一步放大。
数据的可靠性也是数据 ETL 中不可避免的问题。可能由于机器故障或者计算逻辑导致加工处理的数据失真或者完全不对,就会给运营的决策造成很大的影响。数字延迟性方面,在基于 Hive 构件的传统架构中,由于 Hive 缺少索引机制,所以数据更新大都会导致数据分区重写,且没有办法原地删除。其次小文件问题会增加 NameNode 存储和查询的负担,拖慢进程,在一定程度上增加数据延迟性。
Kylin 框架
我们再来介绍一下这个 Kylin 框架。相比较 Hudi,大家应该会对 Kylin 相对熟悉一些,它是一个开源的分布式分析型数据仓库,能够提供 Hadoop/Spark SQL 之上的数据查询窗口。最初是由 eBay 开放并贡献到开源社区,能够在亚秒内查询巨大的表。它的秘诀其实就是做预计算,针对一个星型拓扑结构数据立方体,预算多个维度组合的度量把结果写出到输出表,对外暴露查询接口实现实时查询,也就是用空间来换取存取时间。
Kylin 在今年的 9 月份发布了 4.0 alpha 版本,这是在 Kylin3 之后一个重大架构升级。使用 Parquet 代替 Hbase 存储,从而提升了文件的扫描性能,也减轻甚至消除了 Hbase 的维护负担。Kylin4 重新实现 Spark 构建引擎和查询引擎,使得计算和存储分离,也更加适用云原生的技术趋势。
Flink/Hudi/Kylin 框架之间的融合
伴随 Kylin3.1 发布,Kylin 与 Flink 就融合已经完成。这个特性是在 2019 年完成的,Kylin 与 Flink 的集成开始于去年 1 月,通过 Flink Batch 实现。关于 Hudi 融合,可以说 Kylin 和 Hudi 天生就是兼容的,因为 Hudi 可以将自己暴露成一张 Hive 表,用户可以像读取 Hive 一样使用 Hudi 的数据,这样对Kylin会非常友好。因为 Kylin 可以把 Hudi 当成一张 Hive 表无缝使用数据。Hudi 和 Flink 融合这个特性是我今年对社区的主要贡献。这个两张截图对应 Hudi 和 Flink 融合路上的2个里程碑式的PR。
第一个 Hudi client 支持多引擎,将 Hudi 与 Spark 解耦,让 Hudi 支持多引擎成为可能。第二个是 Flink 客户端基本实现贡献到社区,让 Hudi 可以真正意义上写入 Flink 数据表。这 2 个改动非常大,加在一起已经超过了 1 万行的代码,也可以说是今年 Hudi 社区比较亮眼的一个特性。Hudi 和 Flink 的融合过程
下面来详细介绍下 Hudi 和 Flink 融合过程。Hudi 原本只支持 Spark 引擎,所以第一步是将 Hudi 与 Spark 解耦之后再去集成我们想要的引擎。
解耦的难点在于 Hudi 最初没有考虑多引擎的支持,所以从数据源读取数据到最终将数据写出到 Hudi 表,RDD 无处不在。连普通的工具类都会使用 RDD 作为基本的操作单元。与 Spark 解耦,我们评估到他的改动非常的大。其次是 Flink 与 Spark 核心抽象上的差异。Spark 认为数据是有限的数据集,而 Flink 认为数据是无界的,是一种数据流。这种抽象上的差异导致我们很难统一出一个通用的抽象。
这次改动对于 Hudi 来说是伤筋动骨的,因此我们决定要优先保证原版 Hudi 的功能和性能,当然也牺牲了部分 Flink Stream API。让 Flink 来操作 list,而用Spark 操作 RDD。这样就可以抽取一个泛型出来形成一个统一的抽象层。
抽象原则:
统一使用泛型 I、K、O 代替。去 Spark 化,抽象层 API 都是引擎无关的,难以在抽象层实现的,我们会把它改为抽象方法下推到 Spark 子类实现。不影响原版,抽象层尽量的减少改动,以保证固定的功能性。引入 HoodieEngineContext 代替 JavaSparkContext, 提供运行时的上下文。下面说 Flink Client DAG,这里主要分了 5 部分,
第一部分是 Kafka Streaming Source,主要用来接收Kafka数据并转换成 List。第二个是 InstantGeneratorOperator,一个 Flink 算子, 用来生成全局唯一的 instant。第三是 KeyBy 分区操作,根据 partitionPath 分区避免多个子任务将数据写入同一个分区造成冲突。第四个是 WriteProcessOperator,这也是我们自定义的一个算子。这个算子是写操作实际发生的地方。第五个是 CommitSink,他会接受上游 WriteProcessOperator 发来的数据,根据上游数据判断是否提交事务。下面是 Flink 更新的代码示例。左侧是原版里面 HoodieWriteClient 简化的版本,可以看到 insert 函数的入参是 RDD,返回值也是 RDD。右侧抽象之后的 abstract 可以看到它的入参变成了泛型I,返回值变成了 O,有兴趣的话大家可以去了解一下。
下面是我们对 Flink 如何融合的另外一个想法,就是希望做出一个 streaming source,使用 Flink 构建一个完整的从 Hudi 表读数据,再写出到 Hudi 表的 ETL 管道。
然后是我们初步的设想。左侧灰色的图里面有 5 列的 Hudi 元数据。最左侧是 hoodie_commit_time 事务列表。每一个 hoodie_commit_time 对应一个事务,每一个事务对应一批的数据。每一批数据中的每一条记录都会有一个提交的序列号,就是第 2 列 hoodie_commit_seqno 序列号。hoodie_commit_time 和 hoodie_commit_seqno 这种映射关系跟 Kafka 中的分区和 offset 的这种映射关系非常类似。后期我们可能会基于这种特点实现一个 Hoodie Streaming Source。
基于这 3 个框架之间的融合关系,我们发现分别用于计算、分析、存储的这 3 个引擎之间是相互兼容的。并且他们能够支持湖仓一体,向云原生体系靠拢。
三、T3 出行结构湖仓一体的实践
最后我们来看一看 T3 出行是如何构建湖仓一体的。这是我们 T3 出行车联网的架构,可以看到是从底向上,从基础支持到上层不停的赋能,并与车企的信息系统、国家信息平台做交互。作为一家车联网驱动的出行公司,我们收集到了人、车、路等相关的数据,每一种数据都有它自己的应用场景, 数据之间并不孤立,相互赋能,共同支持 T3 智慧出行。
这是我们的存储和计算分离的数据库架构,整个架构分为了两层,一层是计算层,一层是存储层。
计算层我们用到了 Flink、Spark、Kylin 和 Presto 并且搭配 ES 做任务调度。数据分析和展示方面用到了达芬奇和 Zeppelin。在存储层,我们使用了阿里云 OSS 并搭配 HDFS 做数据存储。数据格式方面使用 Hudi 作为主要的存储格式,并配合 Parquet、ORC 和 Json 文件。在计算和存储之前,我们加了一个 Alluxio 来加速提升数据处理性能。资源管理方面我用到了 Yarn,在后期时机成熟的时候也会转向 K8s。在当前存储计算分离的趋势下,我们也是以湖存储为核心,在它周围构建了湖加速湖计算、OLAP 分析、交互式查询、可视化等等一整套的大数据生态体系。
T3对 Hudi 的应用场景
下面是我们 T3 内部对 Hudi 的几个应用场景。
一个是近实时的流数据管道。我们可以从左侧通过 Log、MySQL 或者直接读取业务数据的 Kafka,把数据导入到数据管道中,再使用 Flink 或者原版的 DeltaStreamer 将流式数据输入到列表中。近实时的流式数据处理的 Flink UI 界面上可以看到之前介绍的 DAG 的几个算子都在里面,比如 source、instant_generator 等。
另一个是近实时的数据分析场景。我们使用 Hive、Spark 或 Presto 查询数据,并最终用达芬奇或者 Zeppelin 做最终的数据报表。这是我们用 Hudi 构建的增量数据管道。最左侧 CDC 数据捕获之后要更新到后面的一系列的表。有了 Hudi 之后,因为 Hudi 支持索引和增量数据处理,我们只需要去更新需要更新的数据就可以了,不需要再像以前那样去更新整个分区或者更新整个表。
最后的一个场景是将前面介绍的用 Flink 将线上或者业务数据订阅 ETL 到 Hudi 表中供机器学习使用。但是机器学习是需要有数据基础的,所以我们利用 Hudi 将线上的数据增量发布到线下环境,进行模型训练或者调参。之后再将模型发布到线上为我们的业务提供服务。