Samza 的一個(gè)更有趣的特點(diǎn)是有狀態(tài)的流處理。任務(wù)可以通過(guò) Samza 提供的 API 來(lái)存儲(chǔ)和查詢(xún)數(shù)據(jù)。該數(shù)據(jù)存儲(chǔ)在與流任務(wù)相同的機(jī)器上; 與通過(guò)網(wǎng)絡(luò)連接到遠(yuǎn)程數(shù)據(jù)庫(kù)相比,Samza 的本地狀態(tài)允許您以更好的性能讀寫(xiě)大量數(shù)據(jù)。Samza 將這種狀態(tài)復(fù)制到多臺(tái)機(jī)器上以實(shí)現(xiàn)容錯(cuò)(下面詳細(xì)描述)。
一些流處理作業(yè)不需要狀態(tài):如果您只需要一次轉(zhuǎn)換一個(gè)消息,或者根據(jù)某些條件過(guò)濾掉消息,則您的工作可能很簡(jiǎn)單。對(duì)任務(wù)進(jìn)程方法的每次調(diào)用都會(huì)處理一個(gè)傳入消息,每個(gè)消息都與所有其他消息無(wú)關(guān)。
然而,能夠維護(hù)狀態(tài)為復(fù)雜的流處理作業(yè)開(kāi)辟了許多可能性:加入輸入流,分組消息和聚合消息組。通過(guò)與 SQL 的類(lèi)比,查詢(xún)的 select 和 where 子句通常是無(wú)狀態(tài)的,但是連接,分組和聚合函數(shù)(如 sum 和 count)需要狀態(tài)。Samza 尚未提供更高級(jí)別的類(lèi)似 SQL 的語(yǔ)言,但它提供了可用于實(shí)現(xiàn)流聚合和連接的較低級(jí)別的原語(yǔ)。
首先,我們來(lái)看一下可以在消費(fèi)者網(wǎng)站后端看到的狀態(tài)流處理的一些簡(jiǎn)單例子。在本頁(yè)后面,我們將討論如何使用 Samza 內(nèi)置的鍵值存儲(chǔ)功能實(shí)現(xiàn)這些應(yīng)用程序。
示例:計(jì)算每個(gè)用戶(hù)每小時(shí)的頁(yè)面瀏覽量
在這種情況下,您的狀態(tài)通常由多個(gè)計(jì)數(shù)器組成,當(dāng)處理消息時(shí)會(huì)增加計(jì)數(shù)器。聚合通常限于時(shí)間窗口(例如1分鐘,1小時(shí),1天),以便您可以隨時(shí)間觀察活動(dòng)的變化。這種窗口處理對(duì)于排名和相關(guān)性是常見(jiàn)的,檢測(cè)“趨勢(shì)主題”,以及實(shí)時(shí)報(bào)告和監(jiān)視。
最簡(jiǎn)單的實(shí)現(xiàn)將這種狀態(tài)保持在內(nèi)存中(例如任務(wù)實(shí)例中的哈希映射),并在每個(gè)時(shí)間窗口的末尾將其寫(xiě)入數(shù)據(jù)庫(kù)或輸出流。但是,您需要考慮當(dāng)容器發(fā)生故障并且內(nèi)存中的狀態(tài)丟失時(shí)會(huì)發(fā)生什么。您可以通過(guò)再次處理當(dāng)前窗口中的所有消息來(lái)還原它,但如果窗口長(zhǎng)時(shí)間可能需要很長(zhǎng)時(shí)間。Samza 可以通過(guò)使?fàn)顟B(tài)容錯(cuò)而不是試圖重新計(jì)算來(lái)加速這種恢復(fù)。
示例:通過(guò) user_id 將用戶(hù)配置文件表加入到用戶(hù)設(shè)置表中,并發(fā)出連接的流
您可能會(huì)想:在流處理系統(tǒng)中加入兩個(gè)表格是否有意義?如果您的數(shù)據(jù)庫(kù)可以提供數(shù)據(jù)庫(kù)中的所有更改的日志。數(shù)據(jù)庫(kù)和更改日志流之間存在對(duì)偶性:您可以將每個(gè)數(shù)據(jù)更改發(fā)布到流中,如果從頭到尾消耗整個(gè)流,則可以重構(gòu)數(shù)據(jù)庫(kù)的全部?jī)?nèi)容。Samza 專(zhuān)為符合這一理念的數(shù)據(jù)處理工作而設(shè)計(jì)。
如果您有多個(gè)數(shù)據(jù)庫(kù)表的更改日志流,您可以編寫(xiě)一個(gè)流處理作業(yè),將每個(gè)表的最新?tīng)顟B(tài)保存在本地鍵值存儲(chǔ)中,您可以比通過(guò)對(duì)原始數(shù)據(jù)庫(kù)進(jìn)行查詢(xún)更快地訪(fǎng)問(wèn)它?,F(xiàn)在,當(dāng)一個(gè)表中的數(shù)據(jù)發(fā)生變化時(shí),可以將其與另一個(gè)表中相同鍵的最新數(shù)據(jù)相加,并輸出加入的結(jié)果。
數(shù)據(jù)規(guī)范化的幾個(gè)現(xiàn)實(shí)生活的例子基本上是這樣工作的:
這些用例中的每一個(gè)都是大量復(fù)雜的數(shù)據(jù)規(guī)范化問(wèn)題,可以被認(rèn)為是在許多輸入表上構(gòu)建物化視圖。Samza 可以有力地實(shí)施這些數(shù)據(jù)處理流水線(xiàn)。
示例:使用用戶(hù)的郵政編碼來(lái)增加一個(gè)頁(yè)面視圖流(可能允許在后期通過(guò)郵政編碼進(jìn)行聚合)
將邊信息連接到實(shí)時(shí) Feed 是流處理的經(jīng)典用途。這在廣告,相關(guān)性排名,欺詐檢測(cè)等領(lǐng)域尤為常見(jiàn)。諸如頁(yè)面瀏覽的活動(dòng)事件通常僅包括少量屬性,例如觀看者的 ID 和觀看的項(xiàng)目,但不包括觀看者的詳細(xì)屬性和所查看的項(xiàng)目,例如用戶(hù)的郵政編碼。如果要通過(guò)查看器或查看項(xiàng)目的屬性聚合流,則需要分別與 users 表或 items 表一起加入。
在數(shù)據(jù)倉(cāng)庫(kù)術(shù)語(yǔ)中,您可以將原始事件流視為中心事實(shí)表中的行,這些行需要與維度表相結(jié)合,以便您可以在分析中使用維度的屬性。
示例:將廣告點(diǎn)擊次數(shù)加入到廣告展示流中(將廣告展示時(shí)間的信息鏈接到點(diǎn)擊點(diǎn)擊的信息)
流連接對(duì)于“幾乎對(duì)齊”的流很有用,您希望在多個(gè)輸入流中接收相關(guān)事件,并且您希望將它們組合成一個(gè)輸出事件。您不能同時(shí)依賴(lài)到達(dá)流處理器的事件,但您可以設(shè)置允許事件擴(kuò)展的最長(zhǎng)時(shí)間。
為了執(zhí)行流之間的連接,您的作業(yè)需要緩存要加入的時(shí)間窗口的事件。對(duì)于短時(shí)間窗口,您可以在內(nèi)存中進(jìn)行此操作(如果機(jī)器發(fā)生故障,則可能會(huì)丟失事件)。您還可以使用 Samza 的狀態(tài)存儲(chǔ)來(lái)緩沖事件,這樣可以緩沖更多的消息,而不是內(nèi)存中的內(nèi)容。
連接和聚合有許多變化,但大多數(shù)是上述模式的變化和組合。
那么系統(tǒng)如何支持這種狀態(tài)處理呢?我們將通過(guò)描述我們?cè)谄渌魈幚硐到y(tǒng)中看到的內(nèi)容,然后描述 Samza 所做的工作。
在學(xué)術(shù)流處理系統(tǒng)中常見(jiàn)的一種簡(jiǎn)單方法是定期將任務(wù)的整體內(nèi)存數(shù)據(jù)保存到持久存儲(chǔ)中。如果內(nèi)存中狀態(tài)僅由幾個(gè)值組成,則此方法效果很好。但是,您必須在每個(gè)檢查點(diǎn)上存儲(chǔ)完整的任務(wù)狀態(tài),這隨著任務(wù)狀態(tài)的增長(zhǎng)而變得越來(lái)越昂貴。不幸的是,連接和聚合的許多非平凡用例有大量的狀態(tài) - 通常是很多千兆字節(jié)。這使得國(guó)家完全不切實(shí)際。
一些學(xué)術(shù)系統(tǒng)除了完整的檢查點(diǎn)之外還會(huì)產(chǎn)生差異,如果只有一些狀態(tài)自最后一個(gè)檢查點(diǎn)以來(lái)發(fā)生變化,則它們會(huì)更小。Storm的Trident抽象類(lèi)似地保持內(nèi)存中的緩存狀態(tài),并定期向遠(yuǎn)程存儲(chǔ)(如 Cassandra)寫(xiě)入任何更改。但是,如果大多數(shù)狀態(tài)保持不變,則此優(yōu)化將有所幫助。在一些使用情況下,例如流連接,在該狀態(tài)下有很多的流失是正常的,所以這種技術(shù)本質(zhì)上會(huì)降低為每個(gè)消息發(fā)出遠(yuǎn)程數(shù)據(jù)庫(kù)請(qǐng)求(見(jiàn)下文)。
用于狀態(tài)處理的另一種常見(jiàn)模式是將狀態(tài)存儲(chǔ)在外部數(shù)據(jù)庫(kù)或鍵值存儲(chǔ)中。傳統(tǒng)的數(shù)據(jù)庫(kù)復(fù)制可用于使數(shù)據(jù)庫(kù)容錯(cuò)。架構(gòu)看起來(lái)像這樣:
Samza 允許這種處理方式 - 沒(méi)有任何東西阻止您查詢(xún)作業(yè)中的遠(yuǎn)程數(shù)據(jù)庫(kù)或服務(wù)。然而,遠(yuǎn)程數(shù)據(jù)庫(kù)可能會(huì)對(duì)有狀態(tài)流處理有問(wèn)題:
Samza 允許任務(wù)以與上述方法不同的方式維護(hù)狀態(tài):
想象一下,您需要一個(gè)遠(yuǎn)程數(shù)據(jù)庫(kù),對(duì)其進(jìn)行分區(qū)以匹配流處理作業(yè)中的任務(wù)數(shù)量,并將每個(gè)分區(qū)與其任務(wù)共同定位。結(jié)果如下:
如果機(jī)器故障,則該機(jī)器上運(yùn)行的所有任務(wù)及其數(shù)據(jù)庫(kù)分區(qū)都將丟失。為了使它們高度可用,對(duì)數(shù)據(jù)庫(kù)分區(qū)的所有寫(xiě)入都將復(fù)制到持久的更新日志(通常為 Kafka)?,F(xiàn)在,當(dāng)機(jī)器發(fā)生故障時(shí),我們可以在另一臺(tái)機(jī)器上重新啟動(dòng)任務(wù),并使用此更改日志來(lái)恢復(fù)數(shù)據(jù)庫(kù)分區(qū)的內(nèi)容。
請(qǐng)注意,每個(gè)任務(wù)只能訪(fǎng)問(wèn)自己的數(shù)據(jù)庫(kù)分區(qū),而不是任何其他任務(wù)的分區(qū)。這很重要:當(dāng)您通過(guò)提供更多的計(jì)算資源來(lái)擴(kuò)展您的工作時(shí),Samza 需要將任務(wù)從一臺(tái)機(jī)器移到另一臺(tái)機(jī)器。通過(guò)給每個(gè)任務(wù)自己的狀態(tài),任務(wù)可以重新定位而不影響作業(yè)的操作。如果需要,您可以重新分配流,以使特定數(shù)據(jù)庫(kù)分區(qū)的所有消息都路由到同一個(gè)任務(wù)實(shí)例。
日志壓縮在更改日志主題的后臺(tái)運(yùn)行,并確保更改日志不會(huì)無(wú)限期增長(zhǎng)。如果您在存儲(chǔ)中多次覆蓋相同的值,則日志壓縮僅保留最近的值,并拋出日志中的任何舊值。如果從商店中刪除一個(gè)項(xiàng)目,則日志壓縮也會(huì)將其從日志中刪除。通過(guò)正確的調(diào)整,更改日志不會(huì)比數(shù)據(jù)庫(kù)本身大得多。
通過(guò)這種體系結(jié)構(gòu),Samza 允許任務(wù)能夠維持大量的容錯(cuò)狀態(tài),性能幾乎與純內(nèi)存實(shí)現(xiàn)一樣好。只有一些限制:
沒(méi)有什么可以阻止您使用外部數(shù)據(jù)庫(kù),但是對(duì)于許多用例,Samza 的本地狀態(tài)是啟用狀態(tài)流處理的強(qiáng)大工具。
任何存儲(chǔ)引擎都可以插入 Samza,如下所述。開(kāi)箱即用,Samza 搭載了一個(gè)使用 JNI API 構(gòu)建在 RocksDB 上的鍵值存儲(chǔ)實(shí)現(xiàn)。
RocksDB 有幾個(gè)不錯(cuò)的屬性。它的內(nèi)存分配不在 Java 堆中,這使得它比基于 Java 的存儲(chǔ)引擎更加內(nèi)存高效,并且不太容易進(jìn)行垃圾回收暫停。對(duì)于適合內(nèi)存的小型數(shù)據(jù)集來(lái)說(shuō),速度非??? 大于內(nèi)存的數(shù)據(jù)集較慢但仍然可能。它是日志結(jié)構(gòu),允許非??焖俚膶?xiě)入。它還包括對(duì)塊壓縮的支持,這有助于減少 I / O 和內(nèi)存使用。
Samza 在 RocksDB 前面增加了一個(gè)內(nèi)存緩存層,避免了經(jīng)常訪(fǎng)問(wèn)的對(duì)象和批次寫(xiě)入的反序列化成本。如果快速連續(xù)更新多個(gè)相同的密鑰,則批處理將這些更新合并為單個(gè)寫(xiě)入。當(dāng)任務(wù)提交時(shí),寫(xiě)入將刷新到更改日志。
要在作業(yè)中使用鍵值存儲(chǔ),請(qǐng)將以下內(nèi)容添加到作業(yè)配置中:
# Use the key-value store implementation for a store called "my-store"
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
# Use the Kafka topic "my-store-changelog" as the changelog stream for this store.
# This enables automatic recovery of the store after a failure. If you don't
# configure this, no changelog stream will be generated.
stores.my-store.changelog=kafka.my-store-changelog
# Encode keys and values in the store as UTF-8 strings.
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
stores.my-store.key.serde=string
stores.my-store.msg.serde=string
有關(guān) serde 選項(xiàng)的更多信息,請(qǐng)參閱序列化部分。
這是一個(gè)簡(jiǎn)單的例子,將每個(gè)傳入的消息寫(xiě)入商店:
public class MyStatefulTask implements StreamTask, InitableTask {
private KeyValueStore<String, String> store;
public void init(Config config, TaskContext context) {
this.store = (KeyValueStore<String, String>) context.getStore("my-store");
}
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
store.put((String) envelope.getKey(), (String) envelope.getMessage());
}
}
以下是完整的鍵值存儲(chǔ)API:
public interface KeyValueStore<K, V> {
V get(K key);
void put(K key, V value);
void putAll(List<Entry<K,V>> entries);
void delete(K key);
KeyValueIterator<K,V> range(K from, K to);
KeyValueIterator<K,V> all();
}
配置參考中記錄了鍵值存儲(chǔ)的其他配置屬性。
目前,Samza 提供了一種狀態(tài)存儲(chǔ)工具,可以將狀態(tài)存儲(chǔ)從更改日志流恢復(fù)到用戶(hù)指定的目錄以進(jìn)行重用和調(diào)試。
samza-example/target/bin/state-storage-tool.sh \
--config-path=file:///path/to/job/config.properties \
--path=directory/to/put/state/stores
Samza 還提供了一個(gè)工具來(lái)讀取正在運(yùn)行的工作的 RocksDB 的價(jià)值。
samza-example/target/bin/read-rocksdb-tool.sh \
--config-path=file:///path/to/job/config.properties \
--db-path=/tmp/nm-local-dir/state/test-state/Partition_0 \
--db-name=test-state \
--string-key=a,b,c
限制:
RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, pathOfdb, config)
Object value = kvReader.get(key)
RocksDB 有幾個(gè)粗糙的邊緣。建議您閱讀 RocksDB 調(diào)整指南。需要注意的其他一些注意事項(xiàng)是:
在本節(jié)前面,我們討論了有狀態(tài)流處理的一些示例用例。我們來(lái)看看如何使用諸如 Samza 的 RocksDB 商店等鍵值存儲(chǔ)引擎實(shí)現(xiàn)這些功能。
示例:計(jì)算每個(gè)用戶(hù)每小時(shí)的頁(yè)面瀏覽量
實(shí)施:您需要兩個(gè)處理階段。
請(qǐng)注意,此工作有效地停留在小時(shí)標(biāo)記以輸出其結(jié)果。這對(duì) Samza 是完全正確的,因?yàn)閽呙桄I值存儲(chǔ)的內(nèi)容是相當(dāng)快的。當(dāng)工作正在做這個(gè)小時(shí)工作時(shí),輸入流被緩沖。
示例:通過(guò) user_id 將用戶(hù)配置文件表加入到用戶(hù)設(shè)置表中,并發(fā)出連接的流
實(shí)現(xiàn):作業(yè)訂閱用戶(hù)配置文件數(shù)據(jù)庫(kù)和用戶(hù)設(shè)置數(shù)據(jù)庫(kù)的更改流,都由 user_id 分區(qū)。該作業(yè)保留一個(gè)由 user_id 鍵入的鍵值存儲(chǔ)區(qū),其中包含最新的配置文件記錄和每個(gè) user_id 的最新設(shè)置記錄。當(dāng)一個(gè)新的事件從兩個(gè)流進(jìn)來(lái)時(shí),作業(yè)將查找其存儲(chǔ)中的當(dāng)前值,更新相應(yīng)的字段(取決于是否是配置文件更新或設(shè)置更新),并將新加入的記錄寫(xiě)回商店。存儲(chǔ)的更新日志加倍為任務(wù)的輸出流。
示例:使用用戶(hù)的郵政編碼來(lái)增加一個(gè)頁(yè)面視圖流(可能允許在后期通過(guò)郵政編碼進(jìn)行聚合)
實(shí)施:作業(yè)訂閱用戶(hù)配置文件更新流和頁(yè)面瀏覽事件流。兩個(gè)流都必須用 user_id 進(jìn)行分區(qū)。該作業(yè)維護(hù)一個(gè)鍵值存儲(chǔ)區(qū),其中 key 是 user_id,該值是用戶(hù)的郵政編碼。每當(dāng)作業(yè)接收到配置文件更新時(shí),它將從配置文件更新中提取用戶(hù)的新郵政編碼,并將其寫(xiě)入商店。每次收到頁(yè)面瀏覽事件時(shí),它會(huì)從商店中讀取該用戶(hù)的郵政編碼,并使用添加的郵政編碼字段發(fā)送頁(yè)面查看事件。
如果下一階段需要通過(guò)郵政編碼進(jìn)行匯總,則可以將郵政編碼用作作業(yè)輸出流的分區(qū)鍵。這確保了相同郵政編碼的所有事件都被發(fā)送到同一流分區(qū)。
示例:將廣告點(diǎn)擊次數(shù)加入到廣告展示流中(將廣告展示時(shí)間的信息鏈接到點(diǎn)擊點(diǎn)擊的信息)
在此示例中,我們假設(shè)廣告的每次展示都有唯一的標(biāo)識(shí)符,例如 UUID,并且相同的標(biāo)識(shí)符包含在展示和點(diǎn)擊事件中。該標(biāo)識(shí)符用作連接密鑰。
實(shí)施:按照展示 ID 或用戶(hù) ID 分配廣告點(diǎn)擊和廣告展示流(假設(shè)具有相同展示 ID 的兩個(gè)事件始終具有相同的用戶(hù) ID)。該任務(wù)保留兩個(gè)商店,一個(gè)包含點(diǎn)擊事件,一個(gè)包含展示事件,使用展示 ID 作為兩個(gè)商店的關(guān)鍵。當(dāng)作業(yè)收到點(diǎn)擊事件時(shí),它會(huì)在展示商店中查找相應(yīng)的展示,反之亦然。如果找到匹配項(xiàng),則發(fā)送連接對(duì),并刪除條目。如果沒(méi)有找到匹配項(xiàng),則將事件寫(xiě)入相應(yīng)的商店。定期地,作業(yè)將掃描兩個(gè)商店,并刪除在連接的時(shí)間窗口內(nèi)未匹配的任何舊事件。
Samza 的容錯(cuò)機(jī)制(將本地商店的寫(xiě)入發(fā)送到復(fù)制的更改日志)與存儲(chǔ)引擎的數(shù)據(jù)結(jié)構(gòu)和查詢(xún) API 完全分離。雖然鍵值存儲(chǔ)引擎對(duì)于通用處理是有利的,但您可以通過(guò)實(shí)施StorageEngine接口輕松地為其他類(lèi)型的查詢(xún)添加自己的存儲(chǔ)引擎。Samza 的模式特別適用于與流任務(wù)相同的過(guò)程中作為庫(kù)運(yùn)行的嵌入式存儲(chǔ)引擎。
其他存儲(chǔ)引擎的一些想法可能是有用的:持久堆(用于運(yùn)行前N個(gè)查詢(xún)),近似算法(如bloom過(guò)濾器和超文本記錄)或全文索引(如Lucene)。(補(bǔ)丁歡迎?。?/p>
如關(guān)于檢查點(diǎn)的部分所述,Samza 目前只支持在出現(xiàn)故障的情況下至少提供一次交付保證(有時(shí)稱(chēng)為“保證交貨”)。這意味著如果任務(wù)失敗,則不會(huì)丟失任何消息,但可能會(huì)重新傳遞某些消息。
對(duì)于上面討論的許多有狀態(tài)處理使用情況,這不是一個(gè)問(wèn)題:如果消息對(duì)狀態(tài)的影響是冪等的,則對(duì)同一消息進(jìn)行多次處理是安全的。例如,如果商店包含每個(gè)用戶(hù)的郵政編碼,則兩次處理相同的配置文件更新沒(méi)有任何效果,因?yàn)橹貜?fù)的更新不會(huì)更改郵政編碼。
但是,對(duì)于非冪等操作(如計(jì)數(shù)),至少一次交貨保證可能會(huì)給出不正確的結(jié)果。如果Samza任務(wù)失敗并重新啟動(dòng),則可能會(huì)在發(fā)生故障之前不久處理的一些消息進(jìn)行雙重計(jì)數(shù)。我們計(jì)劃在未來(lái)的Samza發(fā)行版中解決這個(gè)限制。
更多建議: