Rick's DevNotes
筆記關於我作品集
筆記類別
  • 全部
  • DockerDocker
  • NetworkNetwork
  • RxJSRxJS
  • NginxNginx
  • TypeScriptTypeScript
  • Data_Structure_And_AlgorithmData Structure And Algorithm
  • JavaScriptJavaScript
  • PostgreSQLPostgreSQL
  • ReactReact
  • GitGit

© 2026 Rick's DevNotes. All rights reserved.

# Observable# Stream# Web APIs

建立時間:2022/09/19

RxJS Creation Functions

在 RxJS 中,Creation Functions 提供了一組便利的工具,讓我們能快速建立 Observables,而不需要每次都手動撰寫 new Observable。這份筆記整合並翻譯了兩份內容,涵蓋了常見的 Creation Functions 與應用情境。


Creation Function #1 - of

of 用來建立一個會依序發送固定值並自動完成的 Observable。

「of」這個函式建立 Observable 的原理,其實很簡單,可以想像成下方的程式碼。

原理範例:


Creation Function #2 - from

接下來介紹 from 這個建立函式。from 可以把各種型態的資料轉成 Observable,例如把陣列變成 Observable。它的用法和 of 很像,不同的是 from 傳入的是一個陣列,而不是多個參數。from 也很常用來把 Promise 轉成 Observable。當我們訂閱這個 Observable 時,Promise resolve 的值會被發送出來,然後自動完成;如果 Promise 被 reject,則會發出錯誤。from 也能處理像是可迭代物件(例如生成器函式)或其他類似 Observable 的物件。

Promise 範例

接下來我們來看第二個例子:如何把 Promise 轉成 Observable。為什麼要這麼做呢?有時候我們手上已經有用 Promise 寫好的程式或 API,但想要把它放進 RxJS 的世界裡,這樣就能用 RxJS 的各種工具來處理非同步流程,或是把它和其他 Observable 結合起來,讓程式更好管理、更有彈性。


Creation Function #3 - fromEvent

接下來我們來看 fromEvent 這個建立函式。fromEvent 可以幫我們把各種事件來源變成 Observable。舉例來說,不管是網頁上的 DOM 事件、Node.js 的 event emitter,甚至 jQuery 的事件都可以用。很適合用來監聽像是按鈕點擊、表單輸入、視窗縮放等事件,每當事件發生時就會發出資料。

假設我們用 fromEvent 綁定一個按鈕的點擊事件,訂閱這個 Observable 就像是用 addEventListener 監聽事件一樣;而取消訂閱則像是用 removeEventListener 移除事件監聽。其實底層 RxJS 也是幫我們自動做這些事,讓我們不用自己手動加移除事件監聽器。

潛在問題:記憶體洩漏

若不正確移除事件監聽器,取消訂閱後事件仍可能觸發。


Creation Function #4 - timer

類似 setTimeout,等待指定時間後發送一個值並完成。

timer 這個建立函式可以幫我們建立一個 Observable,會在等待一段時間後發出一個值,然後就結束。我們可以把它想成 RxJS 版的 setTimeout。每次訂閱這個 Observable,都會重新啟動一個新的計時器。舉例來說,如果我們設定 2000 毫秒(也就是 2 秒),那麼訂閱後會等 2 秒,然後發出一個值(這裡是 0),接著就自動完成。這個 0 代表計時器發出的第一個值。

支援在取消訂閱時中止計時器:

關於 timer 這個函式是如何透過 Observable 所建立的,可以想像成下方的程式碼。


Creation Function #5 - interval

類似 setInterval,會依照時間間隔持續發送數值。

老樣子,我們透過 Observable 來復刻 interval 這個 function


Creation Function #6 - forkJoin

forkJoin 是一個相當實用的工具。它的工作方式可以想像成「等所有任務都完成後,再一次交出最終結果」。使用時,可以傳入一組 Observables,通常是一個陣列。當 forkJoin 被訂閱(subscribe)後,會在背後同時對這些 Observables 建立訂閱。 接著,它不會急著回傳任何值,而是 耐心等待所有 Observables 都完成(complete)。

這個特性特別適合用在 需要同時發送多個 HTTP 請求 的情境。例如,要一次向不同 API 取得資料,再等全部回應回來後,才進一步處理或顯示結果。

範例:HTTP 請求

使用 forkJoin 處理更複雜的情境

假設有兩個 Observables:A 和 B。 使用 forkJoin 建立了一個新的 Observable,並對它進行訂閱。此時,forkJoin 的邏輯會和之前一樣,分別對 A 與 B 建立訂閱。

接下來觀察它們的行為:

  1. Observable A 發出一個值
    • 此時 forkJoin 並不會輸出任何東西,因為兩個訂閱都還在進行。它只是把 A 最新發出的值暫存在記憶體中。
  2. Observable B 發出值 1
    • 同樣地,forkJoin 仍不會輸出。它只會把這個值記錄為 B 的最新值。
  3. Observable B 發出完成(complete)通知
    • 雖然 B 已經結束,但因為 A 還沒有完成,forkJoin 依舊不會輸出任何結果。
  4. Observable A 再次發出一個值
    • forkJoin 會更新記憶體中 A 的最新值,但依舊不輸出。
  5. Observable A 發出完成通知
    • 此時,A 和 B 都已完成,forkJoin 終於會輸出結果。輸出內容就是 A 和 B 各自最後一次發出的值,然後再發出完成(complete)。

從這個過程可以看出,forkJoin 的核心特性是:

  • 等待所有輸入的 Observables 完成後才會輸出。
  • 在完成之前,只會不斷更新「最新已知的值」,但不會實際輸出任何東西。

錯誤情境

回到先前的 HTTP 請求範例。假設請求 A 先回應,而請求 B 失敗並拋出了錯誤。 由於發生錯誤,已經不可能讓兩個訂閱都順利完成。

在這種情況下,forkJoin 會立即發出這個錯誤。 需要特別注意的是,一旦 forkJoin 發出錯誤,就代表整個訂閱結束,後續不會再有任何值被發出。


Creation Function #7 - combineLatest

combineLatest 與 forkJoin 有類似的模式,但有個重要差異:它會更頻繁地輸出值。

和 forkJoin 一樣,combineLatest 也會接收一個 Observables 陣列 作為輸入,並在底層對它們進行訂閱。但不同的是,combineLatest 的邏輯會在 任一輸入 Observable 發出新值時,立即輸出一組最新的值。

運作流程

假設有兩個 Observables:A 和 B。 透過 combineLatest 建立一個新的 Observable,並把 A 和 B 作為輸入。當訂閱這個新 Observable 時,內部會分別訂閱 A 和 B。

接下來觀察它們的通知行為:

  1. Observable A 先發出一個值
    • 此時 combineLatest 不會馬上輸出,因為還缺少來自另一個來源 B 的值。(重點:combineLatest 需要 至少每個來源各一個值,才會開始輸出。)
  2. Observable B 發出值 1
    • 現在每個來源都至少有一個值,combineLatest 就會輸出一組陣列,內容是 目前各來源的最新值。
  3. 後續 A 或 B 繼續發出新值
    • 每當有來源發出新值,combineLatest 都會立即輸出更新後的陣列。
  4. Observable B 完成(complete)
    • 這個事件本身不會造成特別的影響。只要 A 還能發出值,combineLatest 就會繼續輸出更新後的結果。
  5. 最後一個來源完成
    • 當所有輸入的 Observables 都完成時,combineLatest 才會發出完成(complete)通知。此時因為沒有更多值可輸出,整個流程結束。

特點

  • 至少需要每個來源輸出一次,才會開始發送。
  • 後續每次任一來源輸出新值,都會組合並發送。
  • 若其中一個來源錯誤,整個 Observable 會立即錯誤並結束。

錯誤情境

另外來看看如果其中一個輸入 Observable 發出了錯誤,會發生什麼情況。

如同先前所提,只要來源 Observables 持續發出值,combineLatest 就會輸出最新值的組合,以陣列形式提供。然而,假設此時 Observable B 發出了錯誤。 錯誤代表執行過程中出現了問題,因此 combineLatest 會將這個錯誤直接傳遞出去。

一旦發生錯誤,後續即使 Observable A 繼續發出值,也不會再有任何輸出,因為 combineLatest 已經進入錯誤狀態。此時,combineLatest 的 Teardown 邏輯 會關閉對 A 的訂閱。


總結

  • of:發送固定值後完成。
  • from:可轉換陣列、Promise 或其他 iterable。
  • fromEvent:將事件來源轉換成 Observable。
  • timer:延遲後發送一次。
  • interval:固定間隔持續發送。
  • forkJoin:等待所有來源完成,發送最後的組合。
  • combineLatest:任一來源有新值時發送最新組合。

Creation Functions 讓我們能用簡潔的語法建立強大的 Observables,並靈活處理同步與非同步資料流。

參考資料

  • RxJs
  • RxJs In Practice
  • RxJS 7 and Observables: Introduction