Dart Stream Transformers 建構資料管線:轉換邏輯與錯誤處理實作要點

Published on: | Last updated:

所以說,Dart 這個 StreamTransformer 到底是在幹嘛?

欸,我最近在搞一些 Dart 的東西,然後又碰到了 Stream。你知道的,就是那個非同步的資料流。一開始用都覺得還好,但資料一複雜,哇,那個 code 就開始變得跟義大利麵一樣,纏在一起,超可怕。

想像一下你家的水管,水龍頭打開,水就嘩啦嘩啦流出來。這就是 Stream。但如果流出來的水有時候是熱水、有時候是冰水,有時候還夾帶沙子,你總不能直接喝吧?你得在水管中間加裝一些東西,比如過濾器把沙子濾掉、加熱器或冷卻器來調節溫度。對,StreamTransformer 基本上就是做這個的。

資料流通過轉換器的概念示意圖
資料流通過轉換器的概念示意圖

重點一句話:StreamTransformer 就像是幫你的資料水管裝上各種可插拔、可組合的過濾器和加工站。

老實說,這東西讓你把複雜的資料處理邏輯,拆成一個個獨立、可重複使用的小單元。程式碼會乾淨很多,真的。

最簡單的用法:內建的那些就很好用

在你跳下去自己造輪子之前,Dart 本身就給了幾個最常用的「濾心」。說真的,大概八成的狀況下,光用這幾個就夠了。

把資料變個樣子:.map()

這個最單純。就是來的東西,我都幫你加工一下再送出去。比如說,數字進來,我全部乘以二。


Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4]);

numbers
  .map((number) => number * 2)
  .listen((result) => print(result));
// Console 會印出: 2, 4, 6, 8

這很實用啊,像是從 API 拿到一堆 JSON,你就可以用 `map` 把每一筆都轉成你定義好的 Class 物件。很直覺。

挑三揀四:.where()

這個也很好懂,就是設定一個條件,不符合的就直接丟掉,根本不往下送。例如,我只要偶數。


// 承接上面的 numbers stream
numbers
  .where((number) => number.isEven)
  .listen((result) => print(result));
// Console 會印出: 2, 4

使用者輸入的字串,長度太短的就不要;感應器的讀數,波動太小的就忽略。都是用 `where` 來做,超方便。

什麼時候該自己動手做一個?

好,問題來了。如果 `map` 跟 `where` 沒辦法滿足你,或是你的邏輯太複雜,每次都要複製貼上一大段,那差不多就是時候自己做一個專屬的 Transformer 了。

什麼叫複雜的邏輯?

  • 不只過濾,還要轉換,甚至還想改變事件的頻率。
  • 你的處理邏輯需要一些「設定值」,例如一個溫度的門檻、一個字串的最小長度。
  • 這段邏輯在 App 的很多地方都會用到,你不想一直重複寫。

說到這個,官方文件(就是 Dart API docs)都寫得很清楚,但寫法很...學術。如果你去看一些台灣開發者的分享,例如在 iThome 鐵人賽上的文章,他們通常會用更實戰的角度切入,強調的是「解決什麼問題」。我自己是覺得,官方文件像是字典,告訴你每個字的意思;而社群文章比較像食譜,教你怎麼把這些材料兜成一道菜。

實作指引:來做一個「溫度過濾器」

我們來模擬一個情境。假設你在做一個天氣 App,感應器會一直傳來溫度讀數,但你只關心超過 20 度的溫度。這個我們就能自己做一個 `TemperatureFilter`。

看起來好像很可怕,但其實你只要搞懂兩個主要零件:`StreamTransformerBase` 跟 `EventSink`。

第一步:定義你的 Transformer 外殼

這一步是告訴 Dart:「嘿,我要做一個新的 Transformer 囉!」

開發者在撰寫 StreamTransformer 的專注神情
開發者在撰寫 StreamTransformer 的專注神情

import 'dart:async';

// 繼承 StreamTransformerBase,然後告訴它「進來是 int,出去也是 int」
class TemperatureFilter extends StreamTransformerBase<int, int> {
  final int threshold; // 這是我們的設定值,就是溫度的門檻

  TemperatureFilter(this.threshold); // 透過建構子把門檻傳進來

  @override
  Stream<int> bind(Stream<int> stream) {
    // 這一行是關鍵,它把進來的 stream 跟我們真正的處理邏輯(Sink)綁在一起
    return Stream.eventTransformed(
      stream,
      (sink) => _TemperatureSink(sink, threshold),
    );
  }
}

這邊 `StreamTransformerBase` 裡的兩個 `int`,別被它嚇到。它只是在問你:「進來的資料是什麼型別?出去的資料又是什麼型別?」就這樣而已。

然後 `bind` 這個方法,你可以想像成是把你的 Transformer 接上水管的那個動作。它會回傳一個新的、處理過的 Stream。

第二步:實作真正的處理邏輯 EventSink

`EventSink` 才是真正做事的地方。所有資料都會流到這裡來,你要在這裡決定它的生死。


// 這是一個內部的 class,外面的人不需要知道
class _TemperatureSink implements EventSink<int> {
  final EventSink<int> _outputSink; // 這是「下一站」的入口
  final int _threshold;

  _TemperatureSink(this._outputSink, this._threshold);

  // 最重要的方法:add()。每一筆資料進來都會跑這裡
  @override
  void add(int data) {
    // 我們的邏輯很簡單:如果溫度大於等於門檻
    if (data >= _threshold) {
      // 就把它送到下一站
      _outputSink.add(data);
    }
    // 如果不符合,就什麼都不做,等於是把它丟掉了
  }

  // 錯誤處理也很重要
  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    // 如果上游發生錯誤,我們也直接把它傳下去
    _outputSink.addError(error, stackTrace);
  }

  // 水管關閉的時候,也要跟著關閉
  @override
  void close() {
    _outputSink.close();
  }
}

你看,`add` 方法裡面就是我們最核心的 `if` 判斷。`_outputSink` 就是通往下一個 `listen` 或下一個 `transform` 的管道。我們把符合條件的資料 `add` 進去,它就會繼續往下流了。

第三步:用起來!

做好了之後,用起來就跟內建的 `map` 或 `where` 一模一樣。


void main() {
  Stream<int> temperatures = Stream.fromIterable([15, 20, 25, 10, 30, 19]);

  temperatures
      .transform(TemperatureFilter(20)) // 在這裡使用我們自製的 Transformer!
      .listen(
        (temp) => print('高溫警報: $temp 度'),
        onError: (error) => print('發生錯誤: $error'),
      );
  // Console 會印出:
  // 高溫警報: 20 度
  // 高溫警報: 25 度
  // 高溫警報: 30 度
}

是不是很酷?你把一堆邏輯,打包成一個叫做 `TemperatureFilter(20)` 的小元件。以後哪裡需要,就直接拿來用,超級乾淨。

決策建議:我該用哪個?

說了這麼多,你可能會有點亂。簡單說,你可以用這個表來快速決定:

方式 適用情境 我自己的OS(碎碎念)
直接用 .map() / .where() 超簡單、一次性的轉換或過濾。通常是一對一的。 就...一行能搞定的事,真的別想太多。code 能被讀懂最重要。
StreamTransformer.fromHandlers() 比 map/where 複雜一點,但又懶得寫一整個 Class 的時候。 懶人包啦。適合那種「我想加個小條件再往下送」的臨時工。處理 error 或 done 事件也方便。
繼承 StreamTransformerBase 有複雜的狀態邏輯、需要接收設定參數、而且這段邏輯會在很多地方重複使用。 這個就是你的重裝武器了。當你發現同樣的髒活要幹好幾次時,就該花點時間把它包成這個。

有些地方別硬用啊

雖然 StreamTransformer 很好用,但它不是萬靈丹。有時候用它反而把事情搞複雜了。

  • 超簡單的轉換:如果只是把 int 轉 String,直接用 `map((i) => i.toString())` 就好,寫一個 Transformer 根本是殺雞用牛刀。
  • 需要跨事件的狀態管理:如果你的轉換需要「記住」上一個事件是什麼,然後根據上上個事件來決定這次要幹嘛...那 StreamTransformer 可能會變得很痛苦。這時候你可能需要的是 Bloc、Riverpod 這類更完整的狀態管理方案。
  • 單純的事件通知:如果只是想知道「按鈕被按了」,用一個簡單的 callback 或 `Future` 就行了,拉一整條 Stream 水管進來有點小題大作。

效能考量:順序很重要!

當你開始把好幾個 Transformer 串在一起用,像串燒一樣,有個小細節會大大影響效能。

永遠記得:先過濾,再加工。

你想想,假設你有 1000 顆馬鈴薯要處理,你的目標是「挑出大的,然後削皮」。你會先把 1000 顆全部削完皮,再從裡面挑出大的嗎?當然不會啊!一定是先挑出那 100 顆大的,然後只削這 100 顆的皮。

寫成 code 就是這樣:

過濾與加工的順序對效能的影響
過濾與加工的順序對效能的影響

// 比較沒效率的寫法:先對全部做昂貴的加工,再過濾
stream
    .map(expensiveOperation) // 1000 次
    .where(filterCondition); // 1000 次

// 比較聰明的寫法:先過濾掉不要的,再對剩下的加工
stream
    .where(filterCondition) // 1000 次
    .map(expensiveOperation); // 只做 100 次

這在資料量大的時候,差別會非常非常明顯。算是一個小技巧,但超級重要。

好了,差不多就是這樣。StreamTransformer 一開始看會覺得有點繞,但一旦你掌握了「把處理邏輯打包成獨立元件」這個核心思想,你會發現它在處理複雜非同步資料流的時候,真的是個超級英雄。

你遇過最需要用 StreamTransformer 來處理的「髒資料」是什麼?留言分享一下吧,搞不好我們都踩過一樣的坑!

Related to this topic:

Comments

  1. Guest 2025-11-19 Reply
    其實 Stream Transformer 我真的用滿多次的。想到有一回在公司做那種報表的串流,其實就是即時收資料…一開始很煩欸,要過濾、要轉格式、然後還得管錯誤。每次都忘記 try-catch,也不是故意,反正只要沒處理 error,那串 stream 就直接爆掉,有好幾次搞到大家 debug 超久,頭都有點昏。 之後我就變得比較小心,就變成寫 transformer 必定會多加 onError,那個 try-catch 也不會偷懶直接補進去。印象很深,有段時間要從 server 收 JSON,一堆格式怪怪的,如果型別沒仔細檢查,超容易遇到 null 或是整結構大走鐘,一碰到這種 pipeline 直接癱瘓。 唉說真的,有些時候 stream 處理一半需求就又換了,要嘛插 filter,要嘛 mapping…搞一搞其實用 StreamTransformer 感覺像疊積木,很靈活,不用擔心重構弄半天。另外同事問類似東西,我都跟他們講,transformer 給它善用,不夠自己寫一個,拜託千萬不要全部東西塞 listen 裡面,到最後真的誰都救不了你(認真)。
撥打專線 LINE免費通話