阿里云Flink+Hologres:構建企業(yè)級一站式實時數(shù)倉
作者|徐榜江 余文兵 趙紅梅
隨著大數(shù)據(jù)的迅猛發(fā)展,企業(yè)越來越重視數(shù)據(jù)的價值,這就意味著需要數(shù)據(jù)盡快到達企業(yè)分析決策人員,以最大化發(fā)揮數(shù)據(jù)價值。企業(yè)最常見的做法就是通過構建實時數(shù)倉來滿足對數(shù)據(jù)的快速探索。在業(yè)務建設過程中,實時數(shù)倉需要支持數(shù)據(jù)實時寫入與更新、業(yè)務敏捷快速響應、數(shù)據(jù)自助分析、運維操作便捷、云原生彈性擴縮容等一系列需求,而這就依賴一個強大的實時數(shù)倉解決方案。阿里云實時計算 Flink 版(以下簡稱“阿里云 Flink”)提供全增量一體化數(shù)據(jù)同步技術、強大的流式 ETL 等能力,支持海量數(shù)據(jù)實時入倉入湖。阿里云 Hologres 作為新一代實時數(shù)倉引擎能同時解決 OLAP 多維分析、在線服務、離線數(shù)據(jù)加速等多個業(yè)務查詢場景,通過阿里云 Flink 與 Hologres 的強強結合,實現(xiàn)全鏈路的數(shù)據(jù)探索實時化、數(shù)據(jù)分析敏捷化,快速助力業(yè)務構建企業(yè)級一站式實時數(shù)倉,實現(xiàn)更具時效更智能的業(yè)務決策。
在本文中,我們將會介紹阿里云 Flink、阿里云 Hologres 在構建實時數(shù)倉上所具備的核心能力以及二者結合的最佳解決方案,用戶通過阿里云 Flink+Hologres 實時數(shù)倉解決方案,可以顯著降低數(shù)倉建設門檻,讓數(shù)據(jù)發(fā)揮更大的價值,助力各行各業(yè)實現(xiàn)數(shù)字化升級。
Flink CDC 核心能力
Apache Flink 是開源的大數(shù)據(jù)流式計算引擎,支持處理數(shù)據(jù)庫、Binlog、在線日志等多種實時數(shù)據(jù),提供端到端亞秒級實時數(shù)據(jù)分析能力,并通過標準 SQL 降低實時業(yè)務開發(fā)門檻。伴隨著實時化浪潮的發(fā)展和深化,F(xiàn)link 已逐步演進為流處理的領軍角色和事實標準,并蟬聯(lián) Apache 社區(qū)最活躍項目。
Flink CDC 是阿里云計算平臺事業(yè)部 2020 年 7 月開源的一款數(shù)據(jù)集成框架,與 Flink 生態(tài)深度融合,具有全增量一體化、無鎖讀取、并發(fā)讀取、分布式架構等技術優(yōu)勢,既可以替代傳統(tǒng)的 DataX 和 Canal 工具做數(shù)據(jù)同步,也支持數(shù)據(jù)庫數(shù)據(jù)實時入湖入倉,同時還具備強大的數(shù)據(jù)加工能力。
在構建實時數(shù)倉的過程中,數(shù)據(jù)采集是必需的組件。在傳統(tǒng)的 ETL 架構里,采集層國外用戶通常選擇 Debezium,國內(nèi)用戶則習慣用 DataX 和 Canal,采集工具負責采集數(shù)據(jù)庫的全量數(shù)據(jù)和增量數(shù)據(jù)。采集到的數(shù)據(jù)會輸出到消息中間件如 Kafka,然后通過 Flink 計算引擎實時消費消息中間件數(shù)據(jù)做計算層的數(shù)據(jù)清洗和數(shù)據(jù)加工,加工完成后再寫入目的端(裝載層),通常是各種數(shù)據(jù)庫、數(shù)據(jù)湖和數(shù)據(jù)倉庫。在傳統(tǒng) ETL 鏈路中,數(shù)據(jù)采集工具與消息隊列是比較重的組件,可能維護在不同的團隊,在上游的數(shù)據(jù)源有業(yè)務變更或者這些組件需要升級維護時,整個鏈路的維護成本會非常大。
通過使用 Flink CDC 去替換上圖中的數(shù)據(jù)采集組件與消息隊列,將采集層(Extraction)和計算層(Transformation)合并,簡化了整個 ETL 分析鏈路,用戶可以使用更少的組件完成數(shù)據(jù)鏈路的搭建,整體架構帶來更低的運維開銷和更少的硬件成本、更好的數(shù)據(jù)鏈路穩(wěn)定性、以及降低端到端的數(shù)據(jù)延遲。除了穩(wěn)定性的提升,F(xiàn)link CDC 的另一個優(yōu)勢就是用戶只需要寫 SQL 腳本就能完成 CDC 數(shù)據(jù)的清洗,加工和同步,極大地降低了用戶使用門檻。
除全增量一體化同步能力外,阿里云 Flink CDC 還提供了表結構變更自動同步、整庫同步、分庫分表合并同步等諸多企業(yè)級特性,方便用戶快速打通數(shù)據(jù)孤島,實現(xiàn)業(yè)務價值。
1.1 全增量一體化同步
Flink CDC 通過增量快照讀取算法在開源數(shù)據(jù)集成領域率先支持了無鎖讀取、并行讀取、斷點續(xù)傳、不丟不重四個重要特性。其中無鎖讀取徹底解決了數(shù)據(jù)同步對上游業(yè)務數(shù)據(jù)庫的死鎖風險,并行讀取很好地滿足了海量數(shù)據(jù)同步的需求,斷點續(xù)傳和不丟不重特性則是提升了同步鏈路的穩(wěn)定性和可靠性。
增量快照讀取算法的核心思路就是在全量讀取階段把表分成一個個 chunk 進行并發(fā)讀取,在進入增量階段后只需要一個 task 進行單并發(fā)讀取 Binlog 日志,在全量和增量自動切換時,通過無鎖算法保障一致性。這種設計在提高讀取效率的同時,進一步節(jié)約了資源,實現(xiàn)了全增量一體化的數(shù)據(jù)同步。配合阿里云實時計算產(chǎn)品提供的資源自動調(diào)優(yōu)特性,F(xiàn)link CDC 作業(yè)的資源可以做到自動擴縮容,無需手動介入。
1.2 表結構變更自動同步
隨著業(yè)務的迭代和發(fā)展,數(shù)據(jù)源的表結構變更是經(jīng)常會發(fā)生的操作。用戶需要及時地去修改數(shù)據(jù)同步作業(yè)以適配最新的表結構,這一方面帶來了較大的運維成本,也影響了同步管道的穩(wěn)定性和數(shù)據(jù)的時效性。阿里云 Flink 支持通過 Catalog 來實現(xiàn)元數(shù)據(jù)的自動發(fā)現(xiàn)和管理,配合 CTAS (Create Table AS)語法,用戶可以通過一行 SQL 實現(xiàn)數(shù)據(jù)的同步和表結構變更自動同步。
Flink SQL> USE CATALOG holo;
Flink SQL> CREATE TABLE user AS TABLE mysql.`order_db`.`user`;
CTAS 語句會解析成一個 Flink 作業(yè)執(zhí)行,這個 Flink 作業(yè)源頭支持讀取數(shù)據(jù)變更和表結構變更并同步到下游,數(shù)據(jù)和表結構變更都可以保證順序,上述 CTAS 語句運行時結構變更同步的效果如下圖所示。
示例如果在上游 MySQL 的 user 表中新增一列 age,并插入一條 id 為 27,年齡為 30 的記錄。
MySQL> ALTER TABLE `user` ADD COLUMN `age` INT;
MySQL> INSERT INTO `user` (id, name, age) VALUES (27, 'Tony', 30);
user 表上的數(shù)據(jù)和結構變更都能實時地自動同步到下游 Hologres 的 user 表中,id 為 12,16 和 19 的歷史數(shù)據(jù),新增的列會自動補 NULL 值。
1.3 整庫同步
在實時數(shù)倉構建中,用戶經(jīng)常需要將整個數(shù)據(jù)庫同步到數(shù)倉中做進一步的分析,一張表一個同步作業(yè)的方式不但浪費資源,也會給上游數(shù)據(jù)庫產(chǎn)生較大的壓力。針對這類用戶痛點,阿里云 Flink CDC 提供了整庫同步特性。整庫同步功能通過 CDAS (Create Database AS) 語法配合 Catalog 實現(xiàn)。
Flink SQL> USE CATALOG holo;
Flink SQL> CREATE DATABASE holo_order AS DATABASE
mysql.`order_db` INCLUDING ALL TABLES;
例如 MySQL Catalog 和 Hologres Catalog 配合 CDAS 語法,可以完成 MySQL 到 Hologres 的全增量數(shù)據(jù)同步。CDAS 語句會解析成一個 Flink 作業(yè)執(zhí)行,這個 Flink 作業(yè)自動解析源表的表結構及相應的參數(shù),并將指定的一個或多個數(shù)據(jù)庫同步到下游 Hologres 數(shù)倉中,整個過程用戶無需手寫 DDL 語句,無需用戶在 Hologres 提前創(chuàng)建表,就能快速實現(xiàn)數(shù)據(jù)的整庫同步。
CDAS 作業(yè)默認提供表結構變更同步能力,所有表的結構變更都會按照發(fā)生順序同步至下游 Hologres 實時數(shù)倉,CDAS 語法也支持過濾不需要同步的表。
1.4 分庫分表合并同步
分庫分表是高并發(fā)業(yè)務系統(tǒng)采用的經(jīng)典數(shù)據(jù)庫設計,通常我們需要將分庫分表的業(yè)務數(shù)據(jù)匯聚到一張數(shù)倉中的大表,方便后續(xù)的數(shù)據(jù)分析,即分庫分表合并同步的場景。針對這種場景,阿里云 Flink CDC 提供了分庫分表合并同步特性,通過在 CTAS 語法支持源庫和源表的正則表達式,源數(shù)據(jù)庫的分表可以高效地合并同步到下游 Hologres 數(shù)倉中。
Flink SQL> USE CATALOG holo;
Flink SQL> CREATE TABLE order AS TABLE mysql.`order_db.*`.`order_.*`;
述 CTAS 語句中的源庫名 order_db.* 是個正則表達式,可以匹配當前 MySQL 實例下的 order_db01,order_db02 和 order_db03 三個庫,源表名 order* 也是個正則表達式,可以匹配三個庫下所有以 order打頭的表。
針對分庫分表同步場景,用戶只需要提供分庫分表的正則表達式就可以將這多張分庫分表合并同步到下游 Hologres 數(shù)倉的 ordder 表中。與其他 CDAS 語句一樣,分庫分表同步場景默認提供表結構變更自動同步特性,下游 Hologres 表的 schema 為所有分表合并后的最寬 schema。分庫分表同步時每行記錄所屬的庫名和表名會作為額外的兩個字段自動寫入到 user 表中,庫名(上圖中 db 列)、表名(上圖中 tbl 列)和原主鍵(上圖中 id 列) 會一起作為下游 Hologres user 表的聯(lián)合主鍵,保證 Hologres user 表上主鍵的唯一性。
Hologres 核心能力
阿里云 Hologres 是自研的一站式實時數(shù)據(jù)倉庫引擎,支持海量數(shù)據(jù)實時寫入、實時更新、實時分析,支持標準 SQL(兼容 PostgreSQL 協(xié)議),提供 PB 級數(shù)據(jù)多維分析(OLAP)與即席分析以及高并發(fā)低延遲的在線數(shù)據(jù)服務(Serving),與阿里云 Flink、MaxCompute、DataWorks 等深度融合,為企業(yè)提供離在線一體化全棧數(shù)倉解決方案。
2.1 高性能實時寫入與更新
數(shù)據(jù)寫入的時效性是實時數(shù)倉的重要能力之一。對于 BI 類等延遲不敏感的業(yè)務查詢,如果寫入時延幾秒甚至幾分鐘可能是可以接受的。而對于很多生產(chǎn)系統(tǒng),如實時風控、實時大屏等場景,要求數(shù)據(jù)寫入即可見。如果寫入出現(xiàn)延遲,就會查詢不到最新的數(shù)據(jù),嚴重影響線上業(yè)務決策。在實時數(shù)倉整個數(shù)據(jù)處理鏈路中,Hologres 作為一站式實時數(shù)據(jù)倉庫引擎,提供海量數(shù)據(jù)高性能的實時寫入,數(shù)據(jù)寫入即可查詢,無延遲。
同時在數(shù)倉場景上,數(shù)據(jù)來源復雜,會涉及到非常多的數(shù)據(jù)更新、修正的場景,Hologres 可以通過主鍵(Primary Key, PK)提供高性能的 Upsert 能力,整個寫入和更新過程確保 Exactly Once,滿足對對數(shù)據(jù)的合并、更新等需求。
下圖為 Hologres 128C 實例下,10 個并發(fā)實時寫入 20 列的列存表的測試結果。其中豎軸表示每秒寫入記錄數(shù),橫軸為 4 個寫入場景:
●Append Only:寫入表無主鍵,寫入能力 230 萬+的 RPS。
●INSERT:寫入表有主鍵,如果主鍵沖突就丟棄新行,寫入能力 200 萬 RPS。
●UPDATE-1:寫入表有主鍵,表中原始數(shù)據(jù)量為 2 億,按照主鍵 Upsert,寫入能力 80 萬的 RPS。
●UPDATE-2:寫入表有主鍵,表中數(shù)據(jù)量為 20 億,按照主鍵做 Upsert,寫入能力 70 萬的 RPS。
2.2 實時 OLAP 分析
Hologres 采用可擴展的 MPP 全并行計算,支持行存、列存、行列共存等多種存儲模式,同時支持多種索引類型。通過分布式處理 SQL 以及向量化的算子,能夠?qū)?CPU 資源發(fā)揮到極致,從而支持海量數(shù)據(jù)亞秒級分析,無需預計算,就能支持實時多維分析、即席分析等多種實時 OLAP 分析的場景,再直接無縫對接上層應用/服務,滿足所見即所得的分析體驗。
下圖為 Hologres 128C 實例下,TPCH 100G 標準數(shù)據(jù)集下的測試結果,橫軸表示 query,縱軸是響應時間:
隨著實時數(shù)倉的廣泛應用,越來越多的企業(yè)把實時數(shù)倉作為在線服務系統(tǒng)提供在線查詢。Hologres 作為 HSAP(Hybrid Serving and Analytics Processing, 服務與分析一體化)的最佳落地實踐,除了具備處理分析型 Query 的能力外,還具備十分強大的在線服務 Serving 能力(高 QPS 點查),例如 KV 點查與向量檢索。在 KV 點查場景中,Holgres 通過 SQL 接口可以支持百萬級的 QPS 吞吐與極低的延時。通過 Hologres 能夠做到一套系統(tǒng)、一份數(shù)據(jù)支持同時 OLAP 分析和在線服務兩種場景,簡化數(shù)據(jù)架構。
下圖為 Hologres 128C 實例下,CPU 消耗 25%的點查測試性能:
實時數(shù)據(jù)倉庫 Hologres 提供高 QPS 低延遲的寫入能力,支持在線服務的查詢場景,還支持復雜的多維分析 OLAP 查詢。當不同類型,不同復雜的任務請求到 Hologres 實例上時,Hologres 不僅需要確保任務的正常運行,還要確保系統(tǒng)的穩(wěn)定性。當前 Hologres 支持通過共享存儲的一主多從子實例的高可用架構,實現(xiàn)了完整的讀寫分離功能,保障 不同業(yè)務場景的 SLA。
1.讀寫分離:實現(xiàn)了完整的讀寫分離功能,保障不同業(yè)務場景的 SLA,在高吞吐的數(shù)據(jù)寫入和復雜的 ETL 作業(yè)、OLAP 查詢、AdHoc 查詢、在線服務等場景中,系統(tǒng)負載物理上完全隔離,不會因?qū)懭肴蝿债a(chǎn)生了查詢?nèi)蝿盏亩秳印?/p>
2.多類型負載資源隔離:一個主實例可以配置四個只讀實例,實例之間可以根據(jù)業(yè)務情況配置不同規(guī)格,系統(tǒng)負載物理上完全隔離,避免相互影響而帶來抖動。
3.實例間數(shù)據(jù)毫秒級異步同步延遲:P99 5ms 內(nèi)。
2.5 Binlog 訂閱
類似于傳統(tǒng)數(shù)據(jù)庫 MySQL 中的 Binlog 概念,Binlog 用來記錄數(shù)據(jù)庫中表數(shù)據(jù)的修改記錄,比如 Insert/Delete/Update 的操作。在 Hologres 中,表的 Binlog 是一種強 Schema 格式的數(shù)據(jù),Binlog 記錄的序列號(BigInt),在單 shard 內(nèi)單調(diào)遞增,類似于 Kafka 中的 Offset 概念。通過阿里云 Flink 消費 Hologres Binlog,可以實現(xiàn)數(shù)倉分層間的全鏈路實時開發(fā),在分層治理的前提下,縮短數(shù)據(jù)加工端到端延遲,同時提升實時數(shù)倉分層的開發(fā)效率。
阿里云 Flink x Hologres 一站式企業(yè)級實時數(shù)倉解決方案
3.1 實時數(shù)倉 ETL
ETL( Extract-Transform-Load)是比較傳統(tǒng)的數(shù)據(jù)倉庫建設方法,業(yè)務庫的數(shù)據(jù) Binlog 經(jīng)過阿里云 Flink 的 ETL 處理之后,數(shù)據(jù)寫入到實時數(shù)倉 Hologres 中,然后進行各類數(shù)據(jù)查詢分析。ETL 的方法核心是需要在數(shù)倉中具備完善的數(shù)倉模型分層,通常按照 ODS(Operational Data Source)> DWD(Data Warehouse Detail)> DWS(Data Warehouse Summary)> ADS(Application Data Service)分層,整個數(shù)倉鏈路比較完善。
在這個鏈路中,需要將數(shù)據(jù)源比如 MySQL 的 Binlog 數(shù)據(jù)通過阿里云 Flink CDC 同步到消息隊列 Kafka,再通過阿里云 Flink 將 ODS 的數(shù)據(jù)進行過濾,清洗,邏輯轉化等操作,形成對不同的業(yè)務主題模型的 DWD 數(shù)據(jù)明細層,同時將數(shù)據(jù)發(fā)送到 Kafka 集群,之后再通過阿里云 Flink 將 DWD 的數(shù)據(jù)進行輕度的匯總操作,形成業(yè)務上更加方便查詢的 DWS 輕度匯總層數(shù)據(jù),再將數(shù)據(jù)寫入 Kafka 集群。最后再面向業(yè)務具體的應用層的需求,在 DWS 層基礎上通過阿里云 Flink 實時處理形成 ADS 數(shù)據(jù)應用層,寫入實時數(shù)倉 Hologres 進行存儲和分析,支持業(yè)務各種不同類型的報表,畫像等業(yè)務場景。
實時數(shù)倉 ETL 的處理優(yōu)點是數(shù)倉各種層次比較完備,職責清晰,但是缺點是 Flink 結合 Kafka 集群維護復雜,處理鏈路比較長,歷史數(shù)據(jù)修正復雜,ADS 應用層的數(shù)據(jù)實時性會弱,其次數(shù)據(jù)在各個 Kafka 中不便于查詢,不便于檢查數(shù)據(jù)質(zhì)量,也不便于實現(xiàn) schema 的動態(tài)變化。
3.2 實時數(shù)倉 ELT
隨著業(yè)務對數(shù)據(jù)的時效性要求越來越高時,相較于 ETL 復雜繁雜的處理鏈路,業(yè)務需要更快速的將數(shù)據(jù)實時入倉,因此 ELT 變成了比較流行的處理方法。ELT 是英文 Extract-Load-Transform 的縮寫,我們可將 ELT 理解為一個數(shù)據(jù)遷移集成的過程。在這個過程中,我們可以對數(shù)據(jù)源關系型數(shù)據(jù)庫比如 MySQL、PostgresSQL 和非關系型數(shù)據(jù)庫比如 HBase、Cassandra 等業(yè)務庫的 Binlog,消息隊列比如 Datahub、Kafka 中的埋點采集日志等數(shù)據(jù),經(jīng)過阿里云 Flink 實時抽取,然后加載到 Hologres 中進行相關的 OLAP 分析和在線服務。
在這個鏈路中,阿里云 Flink 負責數(shù)據(jù)的實時入倉以及數(shù)據(jù)的清洗關聯(lián),清洗后的數(shù)據(jù)實時寫入 Hologres,由 Hologres 直接存儲明細數(shù)據(jù)。在 Hologres 中可以簡化分層,以明細層為主,按需搭配其他匯總層,通過 Hologres 強大的數(shù)據(jù)處理能力直接對接報表、應用等上層查詢服務。上層的分析 SQL 無法固化,通常在 ADS 層以邏輯視圖(View)封裝 SQL 邏輯,上層應用直接查詢封裝好的 View,實現(xiàn)即席查詢。
實時數(shù)倉中采取 ELT 的方式進行建設,會給數(shù)據(jù)和業(yè)務帶來比較大的收益,詳細如下:
●靈活性:將原始的業(yè)務數(shù)據(jù)直接入倉,形成 ODS 層的數(shù)據(jù),在數(shù)倉中通過 View 可以靈活地對數(shù)據(jù)進行轉換(Transformation)的處理,View 可以隨時根據(jù)業(yè)務進行調(diào)整。
●成本低:數(shù)據(jù)倉庫的架構比較清晰,鏈路比較短,運維成本比較低。
●指標修正簡單:上層都是 View 邏輯封裝,只需要更新底表的數(shù)據(jù)即可,無需逐層修正數(shù)據(jù)。
但是該方案也存在一些缺點,當 View 的邏輯較為復雜,數(shù)據(jù)量較多時,查詢性能較低。因此比較適合于數(shù)據(jù)來源于數(shù)據(jù)庫和埋點系統(tǒng),對 QPS 要求不高,對靈活性要求比較高,且計算資源較為充足的場景。
3.3 實時數(shù)倉分層(Streaming Warehouse 方案)
按照傳統(tǒng)數(shù)倉的開發(fā)方法論,采用 ODS>DWD>DWS>ADS 開發(fā)的方法,通過阿里云 Flink 和 Hologres Binlog 的組合關系,支持層與層之間有狀態(tài)的全鏈路事件實時驅(qū)動。在該方案中,數(shù)據(jù)通過阿里云 Flink CDC 實時入倉至 Hologres,再通過阿里云 Flink 訂閱 Hologres Binlog,實現(xiàn)數(shù)據(jù)在不同層次之間的連續(xù)加工,最后寫入 Hologres 對接應用查詢。
通過這個方案,Hologres 可以達到像 Kafka、Datahub 等消息隊列同等的能力,增加數(shù)據(jù)復用的能力,一個 Table 的數(shù)據(jù)既可以提供給下游阿里云 Flink 任務消費,還可以對接上游 OLAP/在線服務查詢,不僅節(jié)省了成本,還簡化數(shù)倉架構,同時也讓數(shù)倉中的每一個層次都可以實時構建、實時查詢,提升數(shù)據(jù)的流轉效率。
在實時數(shù)倉中,流計算任務和批處理任務都是分兩條工作流進行開發(fā)的,也即是 Kappa 架構模式。在這套數(shù)倉架構中,會存在人力成本過高,數(shù)據(jù)鏈路冗余,數(shù)據(jù)口徑不一致,開發(fā)效率低下的一些問題。
為了解決這些問題,阿里云 Flink+Hologres 提供了流批一體的能力。在該場景中,將輸入層統(tǒng)一變成 Hologres,通過一套業(yè)務邏輯代碼達到流和批處理的能力,其中 Flink SQL 的 Stream 任務消費 Hologres Binlog 提供流式處理,F(xiàn)link SQL 的 Batch 任務讀取 Hologres 表的原始數(shù)據(jù)達到批處理能力,經(jīng)過 Flink 統(tǒng)一的計算處理之后,統(tǒng)一寫入存儲至 Hologres。
阿里云 Flink 結合 Hologres 的流批一體技術,統(tǒng)一了數(shù)據(jù)輸入層、實時離線計算層和數(shù)據(jù)分析存儲層,極大的提升了數(shù)據(jù)開發(fā)的效率,保證了數(shù)據(jù)的質(zhì)量。
阿里云 Flink 與 Hologres 深度集成,助力企業(yè)快速構建一站式實時數(shù)倉:
●可通過阿里云 Flink 實時寫入 Hologres,高性能寫入與更新,數(shù)據(jù)寫入即可見,無延遲,滿足實時數(shù)倉高性能低延遲寫入需求;
●可通過阿里云 Flink 的全量讀取、Binlog 讀取、CDC 讀取、全增量一體化等多種方式,讀取 Hologres 源表數(shù)據(jù),無需額外組件,統(tǒng)一計算和存儲,加速數(shù)據(jù)流轉效率;
●可通過阿里云 Flink 讀取 Hologres 維表,助力高性能維表關聯(lián)、數(shù)據(jù)打?qū)挼榷喾N應用場景;
●阿里云 Flink 與 Hologres 元數(shù)據(jù)打通,通過 Hologres Catalog,實現(xiàn)元數(shù)據(jù)自動發(fā)現(xiàn),極大提升作業(yè)開發(fā)效率和正確性。
通過阿里云 Flink 與 Hologres 的實時數(shù)倉標準解決方案,能夠支撐多種實時數(shù)倉應用場景,如實時推薦、實時風控等,滿足企業(yè)的實時分析需求。下面我們將會介紹阿里云 Flink + Hologres 的典型應用場景,助力業(yè)務更加高效的搭建實時數(shù)倉。
4.1 海量數(shù)據(jù)實時入倉
實時數(shù)倉搭建的第一步便是海量數(shù)據(jù)的實時入倉,基于阿里云 Flink CDC 可以簡單高效地將海量數(shù)據(jù)同步到實時數(shù)倉中,并能將增量數(shù)據(jù)以及表結構變更實時同步到數(shù)倉中。而整個流程只需在阿里云 Flink 上定義一條 CREATE DATABASE AS DATABASE 的 SQL 即可(詳細步驟可參考 實時入倉快速入門[4])。經(jīng)測試,對于 MySQL 中的 TPC-DS 1T 數(shù)據(jù)集,使用阿里云 Flink 64 并發(fā),只需 5 小時便能完全同步到 Hologres,TPS 約 30 萬條/秒。在增量 Binlog 同步階段,使用阿里云 Flink 單并發(fā),同步性能達到 10 萬條/秒。
數(shù)據(jù)實時入倉形成了 ODS 層的數(shù)據(jù)后,通常需要將事實數(shù)據(jù)與維度數(shù)據(jù)利用 Flink 多流 Join 的能力實時地打平成寬表,結合 Hologres 寬表極佳的多維分析性能,助力上層業(yè)務查詢提速。阿里云 Flink 支持以全增量一體化的模式讀取 Hologres 表,即先讀取全量數(shù)據(jù)再平滑切換到讀取 CDC 數(shù)據(jù),整個過程保證數(shù)據(jù)的不重不丟。因此基于阿里云 Flink 可以非常方便地實時加工和打?qū)?Hologres 的 ODS 層數(shù)據(jù),完成 DWD 層的寬表模型構建。
數(shù)據(jù)倉庫中我們通常需要關心的就是建模,數(shù)據(jù)模型通常分為四種:寬表模型、星型模型、雪花模型、星座模型(Hologres 均支持),在這里我們重點要提到的是寬表模型的建設。寬表模型通常是指將業(yè)務主體相關的指標、維表、屬性關聯(lián)在一起的模型表,也可以泛指將多個事實表和多個維度表相關聯(lián)到一起形成的寬表。
寬表建設通常的做法就是通過阿里云 Flink 的雙流 Join 來實現(xiàn),包括 Regular Join,Interval Join,Temporal Join。對于主鍵關聯(lián)的場景(即 Join 條件分別是兩條流的主鍵),我們可以將 Join 的工作下沉到 Hologres 去做,通過 Hologres 的局部更新功能來實現(xiàn)寬表 Merge,從而省去了 Flink Join 的狀態(tài)維護成本。比如廣告場景中,一個 Flink 任務處理廣告曝光數(shù)據(jù)流,統(tǒng)計每個產(chǎn)品的曝光量,以產(chǎn)品 ID 作為主鍵,更新到產(chǎn)品指標寬表中。同時,另一個 Flink 任務處理廣告點擊數(shù)據(jù)流,統(tǒng)計每個產(chǎn)品的點擊量,也以產(chǎn)品 ID 作為主鍵,更新到產(chǎn)品指標寬表中。整個過程不需要進行雙流 Join,最終 Hologres 會自己完成整行數(shù)據(jù)的組裝。基于得到的產(chǎn)品指標寬表,用戶可以方便地在 Hologres 進行廣告營銷的分析,例如計算產(chǎn)品的 CTR=點擊數(shù)/曝光數(shù)。下圖和代碼示例展示了如何從雙流 Join 改為寬表 Merge。
CREATE TABLE ods_ad_click (
product_id INT,
click_id BIGINT,
click_time TIMESTAMP
) WITH ('connector'='datahub', 'topic'='..');
CREATE TABLE ods_ad_impressions (
product_id INT,
imp_id BIGINT,
imp_time TIMESTAMP
) WITH ('connector'='datahub', 'topic'='..');
CREATE TABLE dws_ad_product (
product_id INT,
click_cnt BIGINT,
imp_cnt BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH ('connector'='hologres','insertOrUpdate'='true');
INSERT INTO dws_ad_product (product_id, click_cnt)
SELECT product_id, COUNT(click_id) as click_cnt
FROM ods_ad_click
GROUP BY product_id;
INSERT INTO dws_ad_product (product_id, imp_cnt)
SELECT product_id, COUNT(imp_id) AS imp_cnt
FROM ods_ad_impressions
GROUP BY product_id;
使用 Hologres 寬表的 Merge 能力,不僅可以提升流作業(yè)的開發(fā)效率,還能減少流作業(yè)所需要的資源消耗,也能夠更容易的維護各個流作業(yè),讓作業(yè)之間不會相互影響。但需要注意的是,寬表 Merge 僅限于使用在主鍵關聯(lián)的場景,并不適用于數(shù)倉中常見的星型模型和雪花模型,所以在大部分場景仍需使用 Flink 的雙流 Join 來完成寬表建模。
4.4 實時維表 Lookup
在實時數(shù)倉中,在構建 DWD 層的數(shù)據(jù)過程中,一般都是通過阿里云 Flink 來讀取消息隊列比如 Datahub 上的 ODS 數(shù)據(jù),同時需要關聯(lián)維表來形成 DWD 層。在阿里云 Flink 的計算過程中,需要高效的讀取維表的能力,Hologres 可以通過高 QPS 低延遲的點查能力來滿足實現(xiàn)這類場景需求。比如我們需要通過 ODS 的數(shù)據(jù)去 Join 維表形成 DWD 層的時候,就可以利用 Hologres 提供的點查能力,在該模式中,通常使用行存表的主鍵點查模式提高維表的 Lookup 效率。具體的實現(xiàn)類似如下:
依托阿里云 Flink+Hologres 解決方案,企業(yè)可以快速構建一站式實時數(shù)倉,助力實時推薦、實時風控、實時大屏等多種業(yè)務場景,實現(xiàn)對數(shù)據(jù)的快速處理,極速探索查詢。目前該方案已在阿里巴巴內(nèi)部、眾多云上企業(yè)生產(chǎn)落地,成為實時數(shù)倉的最佳解決方案之一。
以某知名全球 TOP20 游戲公司業(yè)務為例,其通過阿里云 Flink+Hologres 實時數(shù)倉方案,替換開源 Flink+Presto+HBase+ClickHouse 架構,簡化數(shù)據(jù)處理鏈路、統(tǒng)一數(shù)倉架構、統(tǒng)一存儲、查詢性能提升 100%甚至更多,完美支撐數(shù)據(jù)分析、廣告投放、實時決策等多個場景,助力業(yè)務快速增長。
5.1 業(yè)務困難:ETL 鏈路復雜、OLAP 查詢慢
客戶原數(shù)倉架構使用全套開源組件,架構圖如下。其中開源 Flink 做 ETL 處理,處理后寫入 ClickHouse、Starocks 等 OLAP 引擎。
這套架構遇見的主要痛點有:
1、ETL 鏈路復雜
●為了解決數(shù)據(jù)實時 ETL,客戶通過 Flink CDC + Hudi 做流批一體。但由于上游業(yè)務數(shù)據(jù)經(jīng)常變更表結構,而開源 Flink CDC 缺乏 Schema Evolution 的能力,每次表結構變更都需要任務重新啟動,操作非常麻煩,浪費大量開發(fā)時間。
●Hudi 的查詢性能不滿足業(yè)務需求,還需要再加一個 Presto 做加速查詢,造成鏈路冗余。
2、OLAP 架構冗余,查詢慢
客戶主要是靠買量發(fā)行作為游戲推廣的重要手段,為了解決廣告歸因的實時決策場景對查詢加速的需要,于是部署了開源 Presto、ClickHouse、HBase 等多套集群搭建混合 OLAP 平臺。帶來的問題有:
●平臺需要維護多套集群,導致運維變得非常復雜。
●開發(fā)需要在各種 SQL 中切換,為開發(fā)團隊帶來了許多困擾。由于 ClickHouse 缺乏主鍵,在歸因分析時需要使用 Last Click 模型,帶來了大量的額外工作。
●同時 OLAP 引擎的查詢性能沒有辦法很好的滿足業(yè)務需求,沒辦法根據(jù)數(shù)據(jù)實時決策。
●數(shù)據(jù)需要在多個 OLAP 系統(tǒng)中存儲,造成存儲冗余,導致成本壓力劇增。
基于上面的痛點,客戶開始重新做技術選型,并使用阿里云 Flink+Hologres 來替換現(xiàn)有的開源數(shù)倉架構。
5.2 架構升級:阿里云 Flink+Hologres 統(tǒng)一數(shù)據(jù)存儲與服務
通過阿里云 Flink+Hologres 替換后的數(shù)據(jù)鏈路如下:
●數(shù)據(jù)源數(shù)據(jù)通過 Flink CDC 能力寫入 Kafka 做前置清洗,清洗后通過阿里云 Flink 進行 ETL 處理。
●阿里云 Flink 經(jīng)過 ETL 后的數(shù)據(jù)實時寫入 Hologres,通過 Hologres 替換了 Kafka 作為實時數(shù)倉的中間數(shù)據(jù)層,統(tǒng)一了流批存儲。
●在 Hologres 中根據(jù) ODS > DWD > DWS 層匯總加工。在 ODS 層,阿里云 Flink 訂閱 Hologres Binlog,計算后寫入 Hologres DWD 層,DWD 層在 Hologres 中匯總成 DWS 層,最后 DWS 對接上層報表和數(shù)據(jù)服務等業(yè)務。
●為了存儲的統(tǒng)一,也將原離線 Hive 數(shù)據(jù)替換成阿里云 MaxCompute,以 MaxCompute 為離線主要鏈路。因 Hologres 與 MaxCompute 的高效互通能力,Hologres 通過外表離線加速查詢 MaxCompute,并將歷史數(shù)據(jù)定期歸檔至 MaxCompute。
5.3 業(yè)務收益:架構統(tǒng)一,性能提升 100%
通過架構升級后,客戶的顯著業(yè)務收益如下:
●依托阿里云 Flink+Hologres,數(shù)據(jù)可以實時寫入 Hologres,寫入即可見,并且 Hologres 有主鍵,能夠支撐高性能的寫入更新能力,百萬級更新毫秒級延遲。
●阿里云 Flink 提供 Schema Evolution 的能力,自動感知上游表結構變更并同步 Hologres,改造后的實時 ETL 鏈路通過訂閱 Hologres Binlog 日志來完成,降低鏈路維護成本。
●通過 Hologres 統(tǒng)一了數(shù)據(jù)查詢出口,經(jīng)過客戶實測,Hologres 可以達到毫秒級延遲,相比開源 ClickHouse 性能提升 100%甚至更多,JOIN 查詢性能快 10 倍。
●升級后數(shù)倉架構變得更加靈活簡潔,統(tǒng)一了存儲,只需要一套系統(tǒng)就能滿足業(yè)務需求,降低運維壓力和運維成本。
本站所有文章、數(shù)據(jù)、圖片均來自互聯(lián)網(wǎng),一切版權均歸源網(wǎng)站或源作者所有。
如果侵犯了你的權益請來信告知我們刪除。郵箱:business@qudong.com