RxJS Creation Functions
在 RxJS 中,Creation Functions 提供了一組便利的工具,讓我們能快速建立 Observables,而不需要每次都手動撰寫 new Observable。這份筆記整合並翻譯了兩份內容,涵蓋了常見的 Creation Functions 與應用情境。
Creation Function #1 - of
of 用來建立一個會依序發送固定值並自動完成的 Observable。
「of」這個函式建立 Observable 的原理,其實很簡單,可以想像成下方的程式碼。
原理範例:
Creation Function #2 - from
接下來介紹 from 這個建立函式。from 可以把各種型態的資料轉成 Observable,例如把陣列變成 Observable。它的用法和 of 很像,不同的是 from 傳入的是一個陣列,而不是多個參數。from 也很常用來把 Promise 轉成 Observable。當我們訂閱這個 Observable 時,Promise resolve 的值會被發送出來,然後自動完成;如果 Promise 被 reject,則會發出錯誤。from 也能處理像是可迭代物件(例如生成器函式)或其他類似 Observable 的物件。
Promise 範例
接下來我們來看第二個例子:如何把 Promise 轉成 Observable。為什麼要這麼做呢?有時候我們手上已經有用 Promise 寫好的程式或 API,但想要把它放進 RxJS 的世界裡,這樣就能用 RxJS 的各種工具來處理非同步流程,或是把它和其他 Observable 結合起來,讓程式更好管理、更有彈性。
Creation Function #3 - fromEvent
接下來我們來看 fromEvent 這個建立函式。fromEvent 可以幫我們把各種事件來源變成 Observable。舉例來說,不管是網頁上的 DOM 事件、Node.js 的 event emitter,甚至 jQuery 的事件都可以用。很適合用來監聽像是按鈕點擊、表單輸入、視窗縮放等事件,每當事件發生時就會發出資料。
假設我們用 fromEvent 綁定一個按鈕的點擊事件,訂閱這個 Observable 就像是用 addEventListener 監聽事件一樣;而取消訂閱則像是用 removeEventListener 移除事件監聽。其實底層 RxJS 也是幫我們自動做這些事,讓我們不用自己手動加移除事件監聽器。
潛在問題:記憶體洩漏
若不正確移除事件監聽器,取消訂閱後事件仍可能觸發。
Creation Function #4 - timer
類似 setTimeout,等待指定時間後發送一個值並完成。
timer 這個建立函式可以幫我們建立一個 Observable,會在等待一段時間後發出一個值,然後就結束。我們可以把它想成 RxJS 版的 setTimeout。每次訂閱這個 Observable,都會重新啟動一個新的計時器。舉例來說,如果我們設定 2000 毫秒(也就是 2 秒),那麼訂閱後會等 2 秒,然後發出一個值(這裡是 0),接著就自動完成。這個 0 代表計時器發出的第一個值。
支援在取消訂閱時中止計時器:
關於 timer 這個函式是如何透過 Observable 所建立的,可以想像成下方的程式碼。
Creation Function #5 - interval
類似 setInterval,會依照時間間隔持續發送數值。
老樣子,我們透過 Observable 來復刻 interval 這個 function
Creation Function #6 - forkJoin
forkJoin 是一個相當實用的工具。它的工作方式可以想像成「等所有任務都完成後,再一次交出最終結果」。使用時,可以傳入一組 Observables,通常是一個陣列。當 forkJoin 被訂閱(subscribe)後,會在背後同時對這些 Observables 建立訂閱。 接著,它不會急著回傳任何值,而是 耐心等待所有 Observables 都完成(complete)。
這個特性特別適合用在 需要同時發送多個 HTTP 請求 的情境。例如,要一次向不同 API 取得資料,再等全部回應回來後,才進一步處理或顯示結果。
範例:HTTP 請求
使用 forkJoin 處理更複雜的情境
假設有兩個 Observables:A 和 B。 使用 forkJoin 建立了一個新的 Observable,並對它進行訂閱。此時,forkJoin 的邏輯會和之前一樣,分別對 A 與 B 建立訂閱。
接下來觀察它們的行為:
- Observable A 發出一個值
- 此時 forkJoin 並不會輸出任何東西,因為兩個訂閱都還在進行。它只是把 A 最新發出的值暫存在記憶體中。
- Observable B 發出值 1
- 同樣地,forkJoin 仍不會輸出。它只會把這個值記錄為 B 的最新值。
- Observable B 發出完成(complete)通知
- 雖然 B 已經結束,但因為 A 還沒有完成,forkJoin 依舊不會輸出任何結果。
- Observable A 再次發出一個值
- forkJoin 會更新記憶體中 A 的最新值,但依舊不輸出。
- Observable A 發出完成通知
- 此時,A 和 B 都已完成,forkJoin 終於會輸出結果。輸出內容就是 A 和 B 各自最後一次發出的值,然後再發出完成(complete)。
從這個過程可以看出,forkJoin 的核心特性是:
- 等待所有輸入的 Observables 完成後才會輸出。
- 在完成之前,只會不斷更新「最新已知的值」,但不會實際輸出任何東西。
錯誤情境
回到先前的 HTTP 請求範例。假設請求 A 先回應,而請求 B 失敗並拋出了錯誤。 由於發生錯誤,已經不可能讓兩個訂閱都順利完成。
在這種情況下,forkJoin 會立即發出這個錯誤。 需要特別注意的是,一旦 forkJoin 發出錯誤,就代表整個訂閱結束,後續不會再有任何值被發出。
Creation Function #7 - combineLatest
combineLatest 與 forkJoin 有類似的模式,但有個重要差異:它會更頻繁地輸出值。
和 forkJoin 一樣,combineLatest 也會接收一個 Observables 陣列 作為輸入,並在底層對它們進行訂閱。但不同的是,combineLatest 的邏輯會在 任一輸入 Observable 發出新值時,立即輸出一組最新的值。
運作流程
假設有兩個 Observables:A 和 B。 透過 combineLatest 建立一個新的 Observable,並把 A 和 B 作為輸入。當訂閱這個新 Observable 時,內部會分別訂閱 A 和 B。
接下來觀察它們的通知行為:
- Observable A 先發出一個值
- 此時 combineLatest 不會馬上輸出,因為還缺少來自另一個來源 B 的值。(重點:combineLatest 需要 至少每個來源各一個值,才會開始輸出。)
- Observable B 發出值 1
- 現在每個來源都至少有一個值,combineLatest 就會輸出一組陣列,內容是 目前各來源的最新值。
- 後續 A 或 B 繼續發出新值
- 每當有來源發出新值,combineLatest 都會立即輸出更新後的陣列。
- Observable B 完成(complete)
- 這個事件本身不會造成特別的影響。只要 A 還能發出值,combineLatest 就會繼續輸出更新後的結果。
- 最後一個來源完成
- 當所有輸入的 Observables 都完成時,combineLatest 才會發出完成(complete)通知。此時因為沒有更多值可輸出,整個流程結束。
特點
- 至少需要每個來源輸出一次,才會開始發送。
- 後續每次任一來源輸出新值,都會組合並發送。
- 若其中一個來源錯誤,整個 Observable 會立即錯誤並結束。
錯誤情境
另外來看看如果其中一個輸入 Observable 發出了錯誤,會發生什麼情況。
如同先前所提,只要來源 Observables 持續發出值,combineLatest 就會輸出最新值的組合,以陣列形式提供。然而,假設此時 Observable B 發出了錯誤。 錯誤代表執行過程中出現了問題,因此 combineLatest 會將這個錯誤直接傳遞出去。
一旦發生錯誤,後續即使 Observable A 繼續發出值,也不會再有任何輸出,因為 combineLatest 已經進入錯誤狀態。此時,combineLatest 的 Teardown 邏輯 會關閉對 A 的訂閱。
總結
- of:發送固定值後完成。
- from:可轉換陣列、Promise 或其他 iterable。
- fromEvent:將事件來源轉換成 Observable。
- timer:延遲後發送一次。
- interval:固定間隔持續發送。
- forkJoin:等待所有來源完成,發送最後的組合。
- combineLatest:任一來源有新值時發送最新組合。
Creation Functions 讓我們能用簡潔的語法建立強大的 Observables,並靈活處理同步與非同步資料流。