當(dāng)前無論是傳統(tǒng)企業(yè)還是互聯(lián)網(wǎng)公司對(duì)大數(shù)據(jù)實(shí)時(shí)分析和處理的要求越來越高,數(shù)據(jù)越實(shí)時(shí)價(jià)值越大,面向毫秒~秒級(jí)的實(shí)時(shí)大數(shù)據(jù)計(jì)算場(chǎng)景,Spark和Flink各有所長(zhǎng)。CarbonData是一種高性能大數(shù)據(jù)存儲(chǔ)方案,已在20+企業(yè)生產(chǎn)環(huán)境上部署應(yīng)用,其中最大的單一集群數(shù)據(jù)規(guī)模達(dá)到幾萬億。
為幫助開發(fā)者更深入的了解這三個(gè)大數(shù)據(jù)開源技術(shù)及其實(shí)際應(yīng)用場(chǎng)景,9月8日,InfoQ聯(lián)合華為云舉辦了一場(chǎng)實(shí)時(shí)大數(shù)據(jù)Meetup,集結(jié)了來自Databricks、華為及美團(tuán)點(diǎn)評(píng)的大咖級(jí)嘉賓前來分享。
本文整理了其中的部分精彩內(nèi)容,同時(shí),作為本次活動(dòng)的承辦方,InfoQ整理上傳了所有講師的演講PPT,感興趣的同學(xué)可以下載講師PPT獲取完整資料 。
Spark Structured Streaming特性介紹 (講師PPT下載)
作為Spark Structured Streaming最核心的開發(fā)人員、Databricks工程師,Tathagata Das(以下簡(jiǎn)稱“TD”)在開場(chǎng)演講中介紹了Structured Streaming的基本概念,及其在存儲(chǔ)、自動(dòng)流化、容錯(cuò)、性能等方面的特性,在事件時(shí)間的處理機(jī)制,最后帶來了一些實(shí)際應(yīng)用場(chǎng)景。
首先,TD對(duì)流處理所面對(duì)的問題和概念做了清晰的講解。TD提到,因?yàn)榱魈幚砭哂腥缦嘛@著的復(fù)雜性特征,所以很難建立非常健壯的處理過程:
? 一是數(shù)據(jù)有各種不同格式(Jason、Avro、二進(jìn)制)、臟數(shù)據(jù)、不及時(shí)且無序;
? 二是復(fù)雜的加載過程,基于事件時(shí)間的過程需要支持交互查詢,和機(jī)器學(xué)習(xí)組合使用;
? 三是不同的存儲(chǔ)系統(tǒng)和格式(SQL、NoSQL、Parquet等),要考慮如何容錯(cuò)。
因?yàn)榭梢赃\(yùn)行在Spark SQL引擎上,Spark Structured Streaming天然擁有較好的性能、良好的擴(kuò)展性及容錯(cuò)性等Spark優(yōu)勢(shì)。除此之外,它還具備豐富、統(tǒng)一、高層次的API,因此便于處理復(fù)雜的數(shù)據(jù)和工作流。再加上,無論是Spark自身,還是其集成的多個(gè)存儲(chǔ)系統(tǒng),都有豐富的生態(tài)圈。這些優(yōu)勢(shì)也讓Spark Structured Streaming得到更多的發(fā)展和使用。
流的定義是一種無限表(unbounded table),把數(shù)據(jù)流中的新數(shù)據(jù)追加在這張無限表中,而它的查詢過程可以拆解為幾個(gè)步驟,例如可以從Kafka讀取JSON數(shù)據(jù),解析JSON數(shù)據(jù),存入結(jié)構(gòu)化Parquet表中,并確保端到端的容錯(cuò)機(jī)制。其中的特性包括:
? 支持多種消息隊(duì)列,比如Files/Kafka/Kinesis等。
? 可以用join(), union()連接多個(gè)不同類型的數(shù)據(jù)源。
? 返回一個(gè)DataFrame,它具有一個(gè)無限表的結(jié)構(gòu)。
? 你可以按需選擇SQL(BI分析)、DataFrame(數(shù)據(jù)科學(xué)家分析)、DataSet(數(shù)據(jù)引擎),它們有幾乎一樣的語義和性能。
? 把Kafka的JSON結(jié)構(gòu)的記錄轉(zhuǎn)換成String,生成嵌套列,利用了很多優(yōu)化過的處理函數(shù)來完成這個(gè)動(dòng)作,例如from_json(),也允許各種自定義函數(shù)協(xié)助處理,例如Lambdas, flatMap。
? 在Sink步驟中可以寫入外部存儲(chǔ)系統(tǒng),例如Parquet。在Kafka sink中,支持foreach來對(duì)輸出數(shù)據(jù)做任何處理,支持事務(wù)和exactly-once方式。
? 支持固定時(shí)間間隔的微批次處理,具備微批次處理的高性能性,支持低延遲的連續(xù)處理(Spark 2.3),支持檢查點(diǎn)機(jī)制(check point)。
? 秒級(jí)處理來自Kafka的結(jié)構(gòu)化源數(shù)據(jù),可以充分為查詢做好準(zhǔn)備。
Spark SQL把批次查詢轉(zhuǎn)化為一系列增量執(zhí)行計(jì)劃,從而可以分批次地操作數(shù)據(jù)。
在容錯(cuò)機(jī)制上,Structured Streaming采取檢查點(diǎn)機(jī)制,把進(jìn)度offset寫入stable的存儲(chǔ)中,用JSON的方式保存支持向下兼容,允許從任何錯(cuò)誤點(diǎn)(例如自動(dòng)增加一個(gè)過濾來處理中斷的數(shù)據(jù))進(jìn)行恢復(fù)。這樣確保了端到端數(shù)據(jù)的exactly-once。
在性能上,Structured Streaming重用了Spark SQL優(yōu)化器和Tungsten引擎,而且成本降低了3倍!!更多的信息可以參考作者的blog。
Structured Streaming隔離處理邏輯采用的是可配置化的方式(比如定制JSON的輸入數(shù)據(jù)格式),執(zhí)行方式是批處理還是流查詢很容易識(shí)別。同時(shí)TD還比較了批處理、微批次-流處理、持續(xù)流處理三種模式的延遲性、吞吐性和資源分配情況。
在時(shí)間窗口的支持上,Structured Streaming支持基于事件時(shí)間(event-time)的聚合,這樣更容易了解每隔一段時(shí)間發(fā)生的事情。同時(shí)也支持各種用戶定義聚合函數(shù)(User Defined Aggregate Function,UDAF)。另外,Structured Streaming可通過不同觸發(fā)器間分布式存儲(chǔ)的狀態(tài)來進(jìn)行聚合,狀態(tài)被存儲(chǔ)在內(nèi)存中,歸檔采用HDFS的Write Ahead Log (WAL)機(jī)制。當(dāng)然,Structured Streaming還可自動(dòng)處理過時(shí)的數(shù)據(jù),更新舊的保存狀態(tài)。因?yàn)闅v史狀態(tài)記錄可能無限增長(zhǎng),這會(huì)帶來一些性能問題,為了限制狀態(tài)記錄的大小,Spark使用水印(watermarking)來刪除不再更新的舊的聚合數(shù)據(jù)。允許支持自定義狀態(tài)函數(shù),比如事件或處理時(shí)間的超時(shí),同時(shí)支持Scala和Java。
TD在演講中也具體舉例了流處理的應(yīng)用情況。在蘋果的信息安全平臺(tái)中,每秒將產(chǎn)生有百萬級(jí)事件,Structured Streaming可以用來做缺陷檢測(cè),下圖是該平臺(tái)架構(gòu):
在該架構(gòu)中,一是可以把任意原始日志通過ETL加載到結(jié)構(gòu)化日志庫中,通過批次控制可很快進(jìn)行災(zāi)難恢復(fù);二是可以連接很多其它的數(shù)據(jù)信息(DHCP session,緩慢變化的數(shù)據(jù));三是提供了多種混合工作方式:實(shí)時(shí)警告、歷史報(bào)告、ad-hoc分析、統(tǒng)一的API允許支持各種分析(例如實(shí)時(shí)報(bào)警系統(tǒng))等,支持快速部署。四是達(dá)到了百萬事件秒級(jí)處理性能。
華為大數(shù)據(jù)架構(gòu)師蔡強(qiáng)在以CarbonData為主題的演講中主要介紹了企業(yè)對(duì)數(shù)據(jù)應(yīng)用的挑戰(zhàn)、存儲(chǔ)產(chǎn)品的選型決策,并深入講解了CarbonData的原理及應(yīng)用,以及對(duì)未來的規(guī)劃等。
企業(yè)中包含多種數(shù)據(jù)應(yīng)用,從商業(yè)智能、批處理到機(jī)器學(xué)習(xí),數(shù)據(jù)增長(zhǎng)快速、數(shù)據(jù)結(jié)構(gòu)復(fù)雜的特征越來越明顯。在應(yīng)用集成上,需要也越來越多,包括支持SQL的標(biāo)準(zhǔn)語法、JDBC和ODBC接口、靈活的動(dòng)態(tài)查詢、OLAP分析等。
針對(duì)當(dāng)前大數(shù)據(jù)領(lǐng)域分析場(chǎng)景需求各異而導(dǎo)致的存儲(chǔ)冗余問題,CarbonData提供了一種新的融合數(shù)據(jù)存儲(chǔ)方案,以一份數(shù)據(jù)同時(shí)支持支持快速過濾查找和各種大數(shù)據(jù)離線分析和實(shí)時(shí)分析,并通過多級(jí)索引、字典編碼、預(yù)聚合、動(dòng)態(tài)Partition、實(shí)時(shí)數(shù)據(jù)查詢等特性提升了IO掃描和計(jì)算性能,實(shí)現(xiàn)萬億數(shù)據(jù)分析秒級(jí)響應(yīng)。蔡強(qiáng)在演講中對(duì)CarbonData的設(shè)計(jì)思路做了詳細(xì)講解。
? 在數(shù)據(jù)統(tǒng)一存儲(chǔ)上:通過數(shù)據(jù)共享減少孤島和冗余,支持多種業(yè)務(wù)場(chǎng)景以產(chǎn)生更大價(jià)值。
? 大集群:區(qū)別于以往的單機(jī)系統(tǒng),用戶希望新的大數(shù)據(jù)存儲(chǔ)方案能應(yīng)對(duì)日益增多的數(shù)據(jù),隨時(shí)可以通過增加資源的方式橫向擴(kuò)展,無限擴(kuò)容。
? 易集成:提供標(biāo)準(zhǔn)接口,新的大數(shù)據(jù)方案與企業(yè)已采購的工具和IT系統(tǒng)要能無縫集成,支撐老業(yè)務(wù)快速遷移。另外要與大數(shù)據(jù)生態(tài)中的各種軟件能無縫集成。
? 高性能:計(jì)算與存儲(chǔ)分離,支持從GB到PB大規(guī)模數(shù)據(jù),十萬億數(shù)據(jù)秒級(jí)響應(yīng)。
? 開放生態(tài):與大數(shù)據(jù)生態(tài)無縫集成,充分利用云存儲(chǔ)和Hadoop集群的優(yōu)勢(shì)。
數(shù)據(jù)布局如下圖,CarbonData用一個(gè)HDFS文件構(gòu)成一個(gè)Block,包含若干Blocklet作為文件內(nèi)的列存數(shù)據(jù)塊,F(xiàn)ile Header/Fille Footer提供元數(shù)據(jù)信息,內(nèi)置Blocklet索引以及Blocklet級(jí)和Page級(jí)的統(tǒng)計(jì)信息,壓縮編碼采用RLE、自適應(yīng)編碼、Snappy/Zstd壓縮,數(shù)據(jù)類型支持所有基礎(chǔ)和復(fù)雜類型:
Carbon表支持索引,支持Segment級(jí)(注:一個(gè)批次數(shù)據(jù)導(dǎo)入為一個(gè)segment)的讀寫和數(shù)據(jù)靈活管理,如按segment進(jìn)行數(shù)據(jù)老化和查詢等,文件布局如下:
? Spark Driver將集中式的索引存在內(nèi)存中,根據(jù)索引快速過濾數(shù)據(jù),Hive metastore存儲(chǔ)表的元數(shù)據(jù)(表的信息等)。
? 一次Load/Insert對(duì)應(yīng)生成一個(gè)Segment, 一個(gè)Segment包含多個(gè)Shard, 一個(gè)Shard就是一臺(tái)機(jī)器上導(dǎo)入的多個(gè)數(shù)據(jù)文件和一個(gè)索引文件組成。每個(gè)Segment 包含數(shù)據(jù)和元數(shù)據(jù)(CarbonData File和Index文件),不同的Segment可以有不同的文件格式,支持更多其他格式(CSV, Parquet),采用增量的數(shù)據(jù)管理方式,處理比分區(qū)管理的速度快很多。
查詢時(shí)會(huì)將filter和projection下推到DataMap(數(shù)據(jù)地圖)。它的執(zhí)行模型如下:
? 主要包括Index DataMap和MV DataMap兩種不同DataMap,三級(jí)Index索引架構(gòu)減少了Spark Task數(shù)和磁盤IO,MV可以進(jìn)行預(yù)匯聚和join的操作,用數(shù)據(jù)入庫時(shí)間換取查詢時(shí)間。
? DataMap根據(jù)實(shí)際數(shù)據(jù)量大小選擇集中式或者分布式存儲(chǔ),以避免大內(nèi)存問題。
? DataMap支持內(nèi)存或磁盤的存儲(chǔ)方式。
最后,蔡強(qiáng)也分析了CarbonData的具體使用和未來計(jì)劃。
在使用上,CarbonData提供了非常豐富的功能特性,用戶可權(quán)衡入庫時(shí)間、索引粒度和查詢性能,增量入庫等方面來靈活設(shè)置。表操作與SparkSQL深度集成,支持高檢測(cè)功能的可配置Table Properties。語法和API保持SparkSQL一致,支持并發(fā)導(dǎo)入、更新、合并和查詢。DataMap類似一張視圖表,可用于加速Carbon表查詢,通過datamap_provider支持Bloomfilter、Pre-aggregate、MV三種類型的地圖。流式入庫與Structured Streaming集成,實(shí)現(xiàn)準(zhǔn)實(shí)時(shí)分析。支持同時(shí)查詢實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù),支持預(yù)聚合并自動(dòng)刷新,聚合查詢會(huì)先檢查聚合操作,從而取得數(shù)據(jù)返回客戶端。準(zhǔn)實(shí)時(shí)查詢,提供了Stream SQL標(biāo)準(zhǔn)接口,建立臨時(shí)的Source表和Sink表。支持類似Structured Streaming(結(jié)構(gòu)化流)的邏輯語句和調(diào)度作業(yè)。
CarbonData從2016年進(jìn)入孵化器到2017年畢業(yè),一共發(fā)布了10多個(gè)穩(wěn)定的版本,今年9月份將會(huì)迎來1.5.0版的發(fā)布。1.5.0將支持Spark File Format,增強(qiáng)對(duì)S3上數(shù)據(jù)的支持,支持Spark2.3和Hadoop3.1以及復(fù)雜類型的支持。而1.5.1主要會(huì)對(duì)MV支持增量的加載,增強(qiáng)對(duì)DataMap的選擇,以及增強(qiáng)了對(duì)Presto的支持。