TypeScript 中的 `Shared Worker`

TypeScript 中的 `Shared Worker`

本文將說明 TypeScript 中的 Shared Worker

我們將透過 TypeScript 程式碼範例,詳細說明 Shared Worker 的運作方式與實務用法。

YouTube Video

TypeScript 中的 Shared Worker

Shared Worker 是一個在同源下可被多個分頁、視窗與 iframe 共享的單一工作執行緒。透過它,你可以在多個瀏覽器分頁之間處理共享的狀態與資源。

例如,你可以有效率地實作共享的 WebSocket 連線跨分頁同步的快取與佇列處理,以及互斥機制

Dedicated Worker 不同,Shared Worker 會透過 onconnect 事件接收多個 MessagePort,並可對多個用戶端進行多路複用通訊。

應該選擇 Shared Worker 的情況

在以下情況下,使用 Shared Worker 較為合適。

  • 需要跨分頁共享狀態或進行互斥管理時
  • 想要共用單一的 WebSocket 連線或 IndexedDB 存取時
  • 需要通知所有分頁(廣播)時
  • 希望集中執行重度運算以節省資源時

相反地,下列情況更適合採用其他方式。

  • 當你需要快取控制或離線支援時,可以使用 Service Worker
  • 對於僅限於單一分頁的繁重運算,可以使用 Dedicated Worker

Shared Worker 的實作步驟

接下來我們將使用 TypeScript 逐步實作以下內容。

  • 具型別安全的訊息協定
  • 以 Promise 為基礎的請求/回應(RPC)
  • 向所有分頁廣播
  • 心跳檢測與用戶端清理

環境設定

建立設定,以編譯每個使用 Shared Worker 的原始檔。

 1{
 2  "compilerOptions": {
 3    "target": "ES2020",
 4    "module": "ES2020",
 5    "lib": ["ES2020", "dom", "WebWorker"],
 6    "moduleResolution": "Bundler",
 7    "strict": true,
 8    "noEmitOnError": true,
 9    "outDir": "out",
10    "skipLibCheck": true
11  },
12  "include": ["src"]
13}
  • tsconfig-src.json 中啟用 DOM 與 Web Worker 的型別定義,讓程式碼能安全地編譯。

定義訊息協定

通訊的基礎是具型別的訊息契約。事先定義可讓後續通訊更安全、也更容易擴充。

 1// worker-protocol.ts
 2// Define discriminated unions for structured messages
 3
 4export type RequestAction =
 5  | { type: 'ping' }
 6  | { type: 'echo'; payload: string }
 7  | { type: 'getTime' }
 8  | { type: 'set'; key: string; value: unknown }
 9  | { type: 'get'; key: string }
10  | { type: 'broadcast'; channel: string; payload: unknown }
11  | { type: 'lock.acquire'; key: string; timeoutMs?: number }
12  | { type: 'lock.release'; key: string };
  • RequestAction 是一種可判別聯合(標記聯合),用來表示發送到 worker 的請求類型,並定義 pinggetbroadcast 等操作。
1export interface RequestMessage {
2  kind: 'request';
3  id: string;
4  from: string;
5  action: RequestAction;
6}
  • RequestMessage 定義了從用戶端傳送至 worker 的請求訊息結構。
1export interface ResponseMessage {
2  kind: 'response';
3  id: string;
4  ok: boolean;
5  result?: unknown;
6  error?: string;
7}
  • ResponseMessage 定義了 worker 回傳給用戶端的回應訊息結構。
1export interface BroadcastMessage {
2  kind: 'broadcast';
3  channel: string;
4  payload: unknown;
5  from: string;
6}
  • BroadcastMessage 定義了 worker 傳送給其他用戶端的廣播訊息結構。
1export type WorkerInMessage =
2  | RequestMessage
3  | { kind: 'heartbeat'; from: string }
4  | { kind: 'bye'; from: string };
  • WorkerInMessage 是一種型別,表示 worker 接收的所有訊息,例如請求、心跳與斷線通知。
1export type WorkerOutMessage = ResponseMessage | BroadcastMessage;
  • WorkerOutMessage 是一種型別,表示 worker 傳送給用戶端的回應或廣播訊息。
1export const randomId = () => Math.random().toString(36).slice(2);
  • randomId 是一個函式,用於產生隨機英數字串,供訊息 ID 等用途。

實作 Shared Worker

shared-worker.ts 中,透過 onconnect 事件註冊連線的分頁並處理訊息。

1// shared-worker.ts
2/// <reference lib="webworker" />
  • 此指示讓 TypeScript 載入 Web Worker 的型別定義。
1import {
2  WorkerInMessage,
3  WorkerOutMessage,
4  RequestMessage,
5  ResponseMessage,
6} from './worker-protocol.js';
  • 匯入用於 worker 通訊的型別定義。
1export default {};
2declare const self: SharedWorkerGlobalScope;
  • 明確宣告 selfShared Worker 的全域作用域。
1type Client = {
2  id: string;
3  port: MessagePort;
4  lastBeat: number;
5};
  • Client 是一種型別,表示每個用戶端的識別碼、通訊埠與最近一次心跳的時間戳。
1const clients = new Map<string, Client>();
2const kv = new Map<string, unknown>();
3const locks = new Map<string, string>();
4const HEARTBEAT_TIMEOUT = 30_000;
  • 管理已連線用戶端的清單、鍵值儲存、鎖定狀態與逾時時間。
1function send(port: MessagePort, msg: WorkerOutMessage) {
2  port.postMessage(msg);
3}
  • send 是一個工具函式,將訊息送至指定的埠。
1function respond(req: RequestMessage, ok: boolean, result?: unknown, error?: string): ResponseMessage {
2  return { kind: 'response', id: req.id, ok, result, error };
3}
  • respond 會為請求產生回應訊息。
1function broadcast(from: string, channel: string, payload: unknown) {
2  for (const [id, c] of clients) {
3    send(c.port, { kind: 'broadcast', channel, payload, from });
4  }
5}
  • broadcast 會在指定頻道向所有用戶端傳送訊息。
1function handleRequest(clientId: string, port: MessagePort, req: RequestMessage) {
  • handleRequest 會依類型處理傳入的請求,並將結果回傳給用戶端。
 1  const { action } = req;
 2  try {
 3    switch (action.type) {
 4      case 'ping':
 5        send(port, respond(req, true, 'pong'));
 6        break;
 7      case 'echo':
 8        send(port, respond(req, true, action.payload));
 9        break;
10      case 'getTime':
11        send(port, respond(req, true, new Date().toISOString()));
12        break;
13      case 'set':
14        kv.set(action.key, action.value);
15        send(port, respond(req, true, true));
16        break;
17      case 'get':
18        send(port, respond(req, true, kv.get(action.key)));
19        break;
20      case 'broadcast':
21        broadcast(clientId, action.channel, action.payload);
22        send(port, respond(req, true, true));
23        break;
  • 在此程式碼中,會根據收到的請求類型,處理訊息的收發、資料的讀取與儲存,以及廣播。
 1      case 'lock.acquire': {
 2        const owner = locks.get(action.key);
 3        if (!owner) {
 4          locks.set(action.key, clientId);
 5          send(port, respond(req, true, { owner: clientId }));
 6        } else if (owner === clientId) {
 7          send(port, respond(req, true, { owner }));
 8        } else {
 9          const start = Date.now();
10          const tryWait = () => {
11            const current = locks.get(action.key);
12            if (!current) {
13              locks.set(action.key, clientId);
14              send(port, respond(req, true, { owner: clientId }));
15            } else if ((action.timeoutMs ?? 5000) < Date.now() - start) {
16              send(port, respond(req, false, undefined, 'lock-timeout'));
17            } else {
18              setTimeout(tryWait, 25);
19            }
20          };
21          tryWait();
22        }
23        break;
24      }
  • 此程式碼實作了客戶端為指定的鍵取得鎖的流程。如果該鎖尚未被持有,會立即取得;同一個客戶端再次請求時,也會視為成功。如果鎖已被其他客戶端持有,則每 25 毫秒重試一次直到釋放;若超過指定的逾時(預設 5 秒),則回應錯誤。
 1      case 'lock.release':
 2        if (locks.get(action.key) === clientId) {
 3          locks.delete(action.key);
 4          send(port, respond(req, true, true));
 5        } else {
 6          send(port, respond(req, false, undefined, 'not-owner'));
 7        }
 8        break;
 9      default:
10        send(port, respond(req, false, undefined, 'unknown-action'));
11    }
12  } catch (e: any) {
13    send(port, respond(req, false, undefined, e?.message ?? 'error'));
14  }
15}
  • 此程式碼會釋放用戶端持有的鎖;若用戶端缺乏權限或動作未知,則回傳錯誤回應。
1function handleInMessage(c: Client, msg: WorkerInMessage) {
2  if (msg.kind === 'heartbeat') {
3    c.lastBeat = Date.now();
4  } else if (msg.kind === 'bye') {
5    cleanupClient(msg.from);
6  } else if (msg.kind === 'request') {
7    handleRequest(c.id, c.port, msg);
8  }
9}
  • handleInMessage 會解析從用戶端收到的訊息,並處理請求與心跳。
1function cleanupClient(clientId: string) {
2  for (const [key, owner] of locks) {
3    if (owner === clientId) locks.delete(key);
4  }
5  clients.delete(clientId);
6}
  • cleanupClient 會從註冊表與鎖定狀態中移除已斷線的用戶端。
1setInterval(() => {
2  const now = Date.now();
3  for (const [id, c] of clients) {
4    if (now - c.lastBeat > HEARTBEAT_TIMEOUT) cleanupClient(id);
5  }
6}, 10_000);
  • 使用 setInterval 週期性檢查所有用戶端的心跳,並清理已逾時的連線。
 1self.onconnect = (e: MessageEvent) => {
 2  const port = (e.ports && e.ports[0]) as MessagePort;
 3  const clientId = crypto.randomUUID?.() ?? Math.random().toString(36).slice(2);
 4  const client: Client = { id: clientId, port, lastBeat: Date.now() };
 5  clients.set(clientId, client);
 6
 7  port.addEventListener('message', (ev) => handleInMessage(client, ev.data as WorkerInMessage));
 8  send(port, { kind: 'broadcast', channel: 'system', payload: { hello: true, clientId }, from: 'worker' });
 9  port.start();
10};
  • 當新的分頁或頁面連線至 Shared Worker 時會呼叫 onconnect,以註冊用戶端並開始通訊。

  • 在本檔案中,實作了 Shared Worker 的基本機制,使能在多個瀏覽器分頁之間進行共享狀態管理與通訊。

用戶端封裝(RPC)

接著,建立一個以 Promise 為基礎的 RPC 用戶端。

1// shared-worker-client.ts
2import {
3  RequestAction,
4  RequestMessage,
5  WorkerOutMessage,
6  randomId
7} from './worker-protocol.js';
  • 匯入用於 worker 通訊的型別定義與工具函式。
1export type BroadcastHandler = (msg: {
2  channel: string;
3  payload: unknown;
4  from: string
5}) => void;
  • 在此定義接收廣播訊息時執行的回呼函式型別。
1export class SharedWorkerClient {
  • SharedWorkerClient 是一個用戶端類別,與 Shared Worker 通訊,負責傳送請求並處理回應。
1  private worker: SharedWorker;
2  private port: MessagePort;
3  private pending = new Map<string, {
4    resolve: (v: any) => void;
5    reject: (e: any) => void
6  }>();
  • 這些變數為 worker 實例、與 worker 的通訊埠,以及用來追蹤待回應請求的對照表(Map)。
1  private clientId = randomId();
2  private heartbeatTimer?: number;
3  private onBroadcast?: BroadcastHandler;
  • 這些變數存放用戶端識別碼、傳送心跳的計時器,以及廣播接收處理器。
1  constructor(url: URL, name = 'app-shared', onBroadcast?: BroadcastHandler) {
2    this.worker = new SharedWorker(url, { name, type: 'module' as any });
3    this.port = this.worker.port;
4    this.onBroadcast = onBroadcast;
  • 在建構子中,初始化與 Shared Worker 的連線,並設定訊息監聽與心跳傳送。
 1    this.port.addEventListener('message', (ev) => {
 2      const msg = ev.data as WorkerOutMessage;
 3      if (msg.kind === 'response') {
 4        const p = this.pending.get(msg.id);
 5        if (p) {
 6          this.pending.delete(msg.id);
 7          msg.ok ? p.resolve(msg.result) : p.reject(new Error(msg.error || 'error'));
 8        }
 9      } else if (msg.kind === 'broadcast') {
10        this.onBroadcast?.(msg);
11      }
12    });
13    this.port.start();
  • 在此接收來自 worker 的訊息,並處理回應或廣播。
1    this.heartbeatTimer = window.setInterval(() => {
2      this.port.postMessage({ kind: 'heartbeat', from: this.clientId });
3    }, 10_000);
  • 週期性傳送心跳訊息以維持連線存活。
1    window.addEventListener('beforeunload', () => {
2      try {
3        this.port.postMessage({ kind: 'bye', from: this.clientId });
4      } catch {}
5      if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
6    });
7  }
  • 在視窗關閉前向 worker 傳送斷線通知。
1  request<T = unknown>(action: RequestAction): Promise<T> {
2    const id = randomId();
3    return new Promise<T>((resolve, reject) => {
4      this.pending.set(id, { resolve, reject });
5      this.port.postMessage({ kind: 'request', id, from: this.clientId, action });
6    });
7  }
  • request 方法會將指定的動作傳送至 worker,並以 Promise 形式接收結果。
1  ping() {
2    return this.request<string>({ type: 'ping' });
3  }
4  echo(payload: string) {
5    return this.request<string>({ type: 'echo', payload });
6  }
7  getTime() {
8    return this.request<string>({ type: 'getTime' });
9  }
  • 這些是用於基本通訊測試與取得目前時間的工具方法。
1  set(key: string, value: unknown) {
2    return this.request<boolean>({ type: 'set', key, value });
3  }
4  get<T = unknown>(key: string) {
5    return this.request<T>({ type: 'get', key });
6  }
  • 這些是用於儲存與取得鍵值對的方法。
1  broadcast(channel: string, payload: unknown) {
2    return this.request<boolean>({ type: 'broadcast', channel, payload });
3  }
  • 這是透過 worker 向其他用戶端傳送廣播訊息的方法。
1  lockAcquire(key: string, timeoutMs?: number) {
2    return this.request<{ owner: string }>({ type: 'lock.acquire', key, timeoutMs });
3  }
4  lockRelease(key: string) {
5    return this.request<boolean>({ type: 'lock.release', key });
6  }
7}
  • 這些是用於取得與釋放鎖,以對共享資源達成互斥的方法。
  • 在本檔案中,實作了讓各瀏覽器分頁與 Shared Worker 之間進行安全非同步通訊的用戶端 API。

使用範例

demo.ts 中,我們使用先前建立的 SharedWorkerClient 類別並驗證其行為。它會依序執行一系列函式,包括通訊測試、資料讀寫、廣播以及鎖的處理。

 1// demo.ts
 2import { SharedWorkerClient } from './shared-worker-client.js';
 3
 4const client = new SharedWorkerClient(new URL('./shared-worker.js', import.meta.url), 'app-shared', (b) => {
 5  console.log('[BROADCAST]', b.channel, JSON.stringify(b.payload));
 6});
 7
 8async function demo() {
 9  console.log('ping:', await client.ping());
10  console.log('echo:', await client.echo('hello'));
11  console.log('time:', await client.getTime());
12
13  // Counter update
14  await client.set('counter', (await client.get<number>('counter')) ?? 0);
15  const c1 = await client.get<number>('counter');
16  await client.set('counter', (c1 ?? 0) + 1);
17  console.log('counter:', await client.get<number>('counter'));
18
19  // Broadcast test
20  await client.broadcast('notify', { msg: 'Counter updated' });
21
22  // Lock test
23  const key = 'critical';
24  console.log(`[lock] Trying to acquire lock: "${key}"`);
25  const lockResult = await client.lockAcquire(key, 2000);
26  console.log(`[lock] Lock acquired for key "${key}":`, lockResult);
27
28  try {
29    console.log(`[lock] Simulating critical section for key "${key}"...`);
30    await new Promise((r) => setTimeout(r, 250));
31    console.log(`[lock] Critical section completed for key "${key}"`);
32  } finally {
33    console.log(`[lock] Releasing lock: "${key}"`);
34    const releaseResult = await client.lockRelease(key);
35    console.log(`[lock] Lock released for key "${key}":`, releaseResult);
36  }
37}
38
39demo().catch(console.error);
  • 此程式碼是一個示範,使用 Shared Worker 在多個瀏覽器分頁之間共享與同步資料與狀態。透過以訊息為基礎的通訊,你可以以低耦合且安全的方式交換非同步訊息,讓管理不同上下文之間的通訊更容易。此外,使用 RPC 可將與 worker 的通訊抽象為直覺、類方法呼叫的風格,提升可維護性與可讀性。

在 HTML 中測試

 1<!DOCTYPE html>
 2<html lang="en">
 3<head>
 4  <meta charset="UTF-8" />
 5  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 6  <title>Shared Worker Demo</title>
 7</head>
 8<body>
 9  <h1>SharedWorker Demo</h1>
10  <p>Open the browser console to see the output.</p>
11
12  <script type="module">
13    import './demo.js';
14  </script>
15</body>
16</html>

設計與運作考量

在設計與操作時,留意以下要點將有助於建立更健壯且具擴充性的系統。

  • 你可以採用可判別聯合(標記聯合),以便根據 kindtype 進行分支。
  • 使用關聯 ID來正確對應請求與回應。
  • 心跳與自動清理可以防止遺留的鎖。
  • 實作版本化,以彈性因應未來的協議變更。
  • 定義明確的錯誤代碼能讓 UI 端的處理與除錯更容易。

總結

Shared Worker 是在多個瀏覽器分頁之間共享資料與狀態的核心機制。

此處介紹的架構提供了型別安全的 RPC 通訊、透過心跳的存活監控與鎖定機制,是可直接在生產環境中使用的穩健設計。

在此機制之上,你也可以實作以下應用。

  • 序列化 IndexedDB 存取
  • WebSocket 連線的整合與共用
  • 建立跨多個分頁的工作佇列
  • 節流與進度通知傳遞

如你所見,運用 Shared Worker 能在多個分頁之間安全且高效地共享資料與處理。

您可以在我們的 YouTube 頻道上使用 Visual Studio Code 來跟隨上述文章一起學習。 請也查看我們的 YouTube 頻道。

YouTube Video