RxJS 筆記 - Pipeable / Flattening Operators
在 RxJS 中,Pipeable Operators 與 Flattening Operators 是改變資料流與控制訂閱行為的核心工具。
Pipeable Operators 基礎觀念
我們先來看一個最基本的 Subscription 情境,在不使用任何 Pipeable Operators 下,有一個 source Observable,當我們對它呼叫 subscribe 之後,裡面的邏輯就會開始執行,並依序發送各種 notifications(例如 next、error、complete)。這些通知會被傳遞給我們的 Observer。
如果我們在 source Observable 和 Observer 之間加入一個 Pipeable Operator。 Operator 會把 source Observable 當作輸入,並回傳一個新的 Observable。因此,我們訂閱的其實是這個「加上 operator 的新 Observable」。
Pipeable Operators 會 接收一個 Observable 作為輸入,並 回傳一個新的 Observable,而不會改變原始來源。透過連接多個 Operator,我們可以逐步改變資料流:
當我們訂閱這樣的 Observable 時,operator 會先攔截資料,然後在背後訂閱原本的 source。每一個資料(通知)都會先經過 operator 處理,決定要不要傳下去,或是把它改成別的內容。我們可以把很多 operators 串在一起,因為一個 operator 的結果會變成下一個 operator 的輸入。最後送到 Observer 的資料,可能跟最一開始 source 發的不一樣。每個 operator 都會處理收到的資料,再決定怎麼傳給下一個。
Pipeable Operators 範例
1. filter
當 source 發出一個值時,這個 operator 會依照我們設定的條件,決定是否將它傳遞到輸出。
2. map
接著來看另一個常用的 operator —— map。它就像 JavaScript 陣列裡的 map 一樣:對於每個輸入值,都可以回傳一個新的值。這個新值可以根據來源值計算,也可以是完全不同的值。
3. tap
tap operator 像是一個「旁觀者」,能在不中斷通知流程的情況下執行 side effect。 它常用於我們串接多個 operators 時,想在管線中的某個階段觀察通知,例如用 console.log 來查看發生了什麼。
NOTE: 需要注意的是,tap 不會改變任何通知,所有通知都會原樣傳遞,只是額外執行 side effect 而已。
4. debounceTime
debounceTime 讓 Observable 多了一個「時間維度」,用來防抖 (debouncing)。 舉例來說,如果設定時間為 2 秒,而 source 在短時間內快速發出三個值, debounceTime 會等到發射暫停 2 秒後,才重新輸出最後那個值。
常用於:
- 避免密集的重新計算造成效能問題
- 降低對伺服器發送 HTTP 請求的頻率
這在某些情境下特別有用:
- 避免效能問題:如果更新值需要大量計算,可能造成畫面延遲。使用 debounceTime 可以減少觸發次數,讓介面更順暢。
- 降低伺服器壓力:例如調整一個 slider,若沒有 debounce,每次事件觸發都會送出一個 HTTP 請求,短時間內可能產生數十個請求。有了 debounceTime,就能只送出最後的結果。
NOTE: debounceTime 會只保留「最後一次輸入」並延遲輸出,非常適合避免過度頻繁的觸發。
5. catchError
catchError 用來處理錯誤,並在來源 Observable 失敗時提供一個後備來源 (fallback Observable)。
- 它只針對 error 通知。
- 當來源 Observable 發生錯誤時,catchError 不會重新發送該錯誤,而是改為訂閱我們提供的 fallback Observable。
- 後續由 fallback Observable 發送的所有通知,會傳遞給輸出。
但有時候,我們可能不想提供任何 fallback value,而是只想捕捉錯誤並什麼都不顯示。這時可以用 RxJS 內建的 EMPTY Observable。顧名思義,EMPTY 什麼都不會發出,一旦被訂閱就會立即完成,而不會送出任何值。可使用 EMPTY 表示錯誤後直接完成而不發送值:
Flattening Operators
Flattening Operators 概念有點像 catchError:在發生錯誤時會去訂閱一個新的 Observable。 不同的是,Flattening Operators 會在每次收到新的值 (next) 時建立新的訂閱。
這特別適合用在 ex: 每次使用者更新輸入值,就要重新向伺服器查詢的情境。
RxJS 提供了幾種 Flattening Operators:
- concatMap(最安全的選擇,建議不確定時先用它)
- switchMap
- mergeMap
- exhaustMap
它們主要的差異在於 如何處理併發 (concurrency),但核心概念相同:
Flattening Operators:
- 會根據來源 Observable 發送的每個值 (next),建立一個新的 inner Subscription。
- 將這些 inner Observable 發出的值傳遞給輸出。
- 只要來源還在發送,Flattening Operator 就會不斷為新值建立訂閱。
Flattening Operators 會為來源 Observable 的每個值建立內部訂閱,並將其輸出「攤平」到同一個資料流。常見的有
concatMap、switchMap、mergeMap與exhaustMap。
1. concatMap
假設我們的 source Observable 代表使用者的搜尋輸入,每次輸入變更時,我們用 concatMap 來發送請求到伺服器,並將回應輸出。
- 當 source Observable 發出一個值時,這個值會進入 concatMap 的邏輯。
- concatMap 會建立一個新的 inner Subscription,訂閱我們提供的 newStream$。
- 例如 newStream$ 總是依序輸出 1、2,然後完成。
- 如果來源再發出值(例如 B),concatMap 會再建立一個新的 inner Subscription。 同樣,輸出會得到 1 和 2,但不會 complete。
NOTE: Flattening 的意思就是:concatMap 把多個 inner Subscriptions 的值「攤平」到同一個輸出,讓整個流程能持續運作,而不會因 inner 的完成而中斷。
- 優點:順序安全、避免記憶體洩漏。
- 缺點:若內部訂閱很慢,會延遲後續處理。
Flattening Operators 與錯誤處理
如果 concatMap 裡面的 inner Observable 發生錯誤時會怎樣。
- 當 source Observable 發出一個值時,concatMap 會建立一個 inner Subscription(像是呼叫 apiReq)。
- 如果這個 inner Observable 發生錯誤(error):
- 這個錯誤會直接傳到外層的訂閱(subscribe)。
- 一旦發生錯誤,整個主流程(main/outer Subscription)會立刻結束,不會再繼續。
- 這時 Flattening Operator 也會自動幫我們取消對 source 的訂閱,因為流程已經結束了。
NOTE: 上述行為對所有 Flattening Operators 都一樣,不只 concatMap
NOTE: 如果 inner 完成(complete),這個 complete 不會傳到外層;但如果 inner 發生錯誤(error),這個錯誤會傳出去並終止整個訂閱。
該如何處理 Flattening Operators 發生的錯誤
有時候,我們不希望 inner Observable 發生錯誤時,整個流程就被中斷。我們想讓主流程繼續下去。
怎麼做:
假設 source Observable 發出 A,經過 concatMap 建立 inner Observable(例如 apiReq$)。如果 inner Observable 出錯,只要在裡面加上 catchError,把錯誤轉成 complete,這樣這次的 inner 結束,但不會影響外層。因為 Flattening Operators 不會把 inner 的 complete 傳出去,所以主流程還是會繼續。
NOTE: 只要在 inner Observable 用 catchError,就能防止錯誤讓整個流程中斷,主流程可以繼續接收後面的資料。
concatMap 的並發處理 (Concurrency)
前面說的行為,其實適用於所有 Flattening Operators(像是 mergeMap、switchMap、exhaustMap、concatMap)。它們最大的不同點,就是遇到還沒結束的 inner Observable 時,會怎麼處理。
以 concatMap 為例:
- source Observable 發出 A,concatMap 就建立一個 inner Observable。
- inner 發出兩個值,這兩個值會直接傳到輸出。
- 如果這時 source 又發出新值(例如 B),但前一個 inner 還沒結束,
- concatMap 會等到前一個 inner 結束後,才開始處理 B。
- 如果前一個 inner 一直沒結束,後面的值就會被卡住,流程也會停在這裡。
這看起來像缺點,但其實有好處:
- concatMap 不會讓我們不小心留下沒結束的 inner Observable。
- 如果 inner 沒有結束,我們會馬上發現,因為 source 的新值都不會被處理。
- 這樣可以幫助我們避免記憶體洩漏,因為沒清掉的訂閱常常是主因。
總結:concatMap 雖然有可能「卡住」,但這個特性反而能讓我們在開發時及早發現問題。
這種做法在某些情況下特別有用,比方說當 inner Observable 是用來發送修改資料庫的 HTTP 請求時。我們通常會希望這些請求一個接一個來,這樣才能確保最後存到伺服器的資料是正確的。如果我們想避免記憶體沒被釋放,或是資料處理順序出錯(像是多個請求同時發生導致結果混亂),用 concatMap 會比較安全。
2. switchMap
switchMap 的用法跟 concatMap 很像:我們一樣要提供一個函式,這個函式會根據每個來源值回傳一個 Observable。
運作方式很簡單:
- source 發出 A,switchMap 就建立並訂閱一個新的 inner Observable。
- A 的回應還沒回來時,
- source 又發出 B,switchMap 會馬上取消 A,直接去訂閱新的 B。
- B 有結果時,就把值傳到輸出,然後完成。
如果我們想要每次只處理最新的請求,不需要等前一個結束,switchMap 就很適合用在這種情境。
switchMap 需要注意的地方
用 switchMap 時要小心幾件事:
- HTTP 請求可能已經送出
- 當我們開始訂閱時,瀏覽器會立刻把 HTTP 請求送到伺服器。
- 就算我們馬上取消訂閱,請求還是可能已經送出,只是我們不會收到回應。
- 請求順序可能會亂
- 如果我們很快又發出新的請求,前一個被取消的請求還是有可能比新的請求晚到伺服器,這樣一來,資料就有可能出現錯亂。
NOTE: 如果 switchMap 用在會改變伺服器資料的操作 (Mutation),結果可能會不如預期。
適合 switchMap 的情境
如果我們只是查詢資料,只在意最新輸入的結果,那 switchMap 就很適合,因為能馬上反應最新輸入而且保證我們拿到的是最後一次輸入的回應。
3. mergeMap
mergeMap 可以同時處理多個內部訂閱(inner Subscriptions),不需要等前一個結束。
運作方式很簡單:
- source Observable 發出 A,mergeMap 就建立第一個 inner Subscription。
- 如果這時又發出 B,mergeMap 會直接再建立第二個 inner Subscription,不會等 A 結束。
- 接下來:
- inner1 收到資料並完成,資料就會被輸出,然後 inner1 結束。
- source 再發出 C,mergeMap 立刻建立第三個 inner Subscription。
- inner3 很快完成,資料輸出,inner3 結束。
- 最後 inner2 也完成,資料輸出,inner2 結束。
重點整理:
- mergeMap 不會等前一個 inner 結束,新值來就直接開新的 inner Subscription。
- 所有 inner Subscription 的資料都會直接輸出(攤平成一條線)。
- 每個 inner 完成時只會結束自己,不會影響整體流程,不會造成記憶體問題。
NOTE: mergeMap 很適合需要同時處理多個請求或事件的情境,不用等上一個做完。
mergeMap 的特性與風險
mergeMap 可以讓多個請求同時進行,不用等前一個完成。只要有任何一個 inner Subscription 有資料,就會馬上輸出,所有 inner 的結果都會合併在一起。不過要注意,這樣做不會保證資料的順序,輸出的結果可能和來源或請求的順序不一樣,容易出現亂序。如果來源在短時間內發出很多值,就會同時產生很多 inner Subscription;如果這些 inner 沒有正常結束,數量會一直增加,最後可能造成記憶體用量暴增甚至當機。特別是在處理 HTTP 請求時,要小心這種情況,因為請求和伺服器回應的順序都可能和我們想的不一樣,容易導致資料混亂。
NOTE: mergeMap 適合並行處理,但要確保 inner Observables 會完成;否則容易造成 記憶體洩漏與亂序問題。
三種 Flattening Operator 比較
| Operator | 行為模式 | 優點 | 缺點 |
|---|---|---|---|
| concatMap | 排隊逐一處理 | 順序安全、避免漏訂閱 | 可能速度慢 |
| switchMap | 取消舊訂閱,保留最新值 | 即時反應、僅保留最新 | 資料順序可能混亂 |
| mergeMap | 並行多個訂閱 | 高效能並行處理 | 記憶體洩漏風險、順序不可預測 |
總結建議
- 不確定選哪個 → 先用 concatMap(最安全)。
- 需要最新輸入結果 → switchMap。
- 需要並行效能 → mergeMap(但要小心記憶體洩漏與順序問題)。
總結
- Pipeable Operators:用來篩選、轉換、監控或處理錯誤,例如
filter、map、tap、debounceTime、catchError。 - Flattening Operators:控制並行訂閱與結果輸出,例如
concatMap、switchMap、mergeMap。