TypeScript 中的 `Shared Worker`

TypeScript 中的 `Shared Worker`

本文介绍 TypeScript 中的 Shared Worker

我们将通过 TypeScript 代码示例,详细说明 Shared Worker 的工作原理及其实践用法。

YouTube Video

TypeScript 中的 Shared Worker

Shared Worker在同一源下可被多个标签页、窗口与 iframe 共享的单个 Worker 进程。通过它,你可以在多个浏览器标签页之间处理共享的状态与资源。

例如,你可以高效实现共享的 WebSocket 连接跨标签页同步的缓存与队列处理以及互斥机制

Dedicated Worker 不同,Shared Worker 通过 onconnect 事件接收多个 MessagePort,并可与多个客户端进行多路复用通信。

应选择 Shared Worker 的场景

在以下情况中,使用 Shared Worker 是合适的。

  • 当你需要跨标签页的共享状态或互斥时
  • 当你希望共享单个 WebSocket 连接或 IndexedDB 访问时
  • 当你需要通知所有标签页(广播)时
  • 当你希望集中重型计算以节省资源时

相反,在以下情况中,其他方式更为合适。

  • 当你需要缓存控制或离线支持时,可以使用 Service Worker
  • 对于仅限单个标签页的重型处理,可以使用 Dedicated Worker

Shared Worker 的实现步骤

接下来,我们将使用 TypeScript 按步骤实现以下内容。

  • 类型安全的消息协议
  • 基于 Promise 的请求/响应(RPC)
  • 向所有标签页广播
  • 心跳与客户端清理

环境准备

创建用于编译每个使用 Shared Worker 的源文件的配置。

 1{
 2  "compilerOptions": {
 3    "target": "ES2020",
 4    "module": "ES2020",
 5    "lib": ["ES2020", "dom", "WebWorker"],
 6    "moduleResolution": "Bundler",
 7    "strict": true,
 8    "noEmitOnError": true,
 9    "outDir": "out",
10    "skipLibCheck": true
11  },
12  "include": ["src"]
13}
  • tsconfig-src.json 中,启用 DOM 和 Web Worker 的类型定义,以便安全地编译代码。

定义消息协议

通信的基础是带类型的消息约定。预先定义它可以使后续通信更安全、也更易扩展。

 1// worker-protocol.ts
 2// Define discriminated unions for structured messages
 3
 4export type RequestAction =
 5  | { type: 'ping' }
 6  | { type: 'echo'; payload: string }
 7  | { type: 'getTime' }
 8  | { type: 'set'; key: string; value: unknown }
 9  | { type: 'get'; key: string }
10  | { type: 'broadcast'; channel: string; payload: unknown }
11  | { type: 'lock.acquire'; key: string; timeoutMs?: number }
12  | { type: 'lock.release'; key: string };
  • RequestAction 是一种可区分联合(标记联合),用于表示发送到 worker 的请求类型,并定义了诸如 pinggetbroadcast 等操作。
1export interface RequestMessage {
2  kind: 'request';
3  id: string;
4  from: string;
5  action: RequestAction;
6}
  • RequestMessage 定义了客户端发送给 worker 的请求消息结构。
1export interface ResponseMessage {
2  kind: 'response';
3  id: string;
4  ok: boolean;
5  result?: unknown;
6  error?: string;
7}
  • ResponseMessage 定义了 worker 返回给客户端的响应消息结构。
1export interface BroadcastMessage {
2  kind: 'broadcast';
3  channel: string;
4  payload: unknown;
5  from: string;
6}
  • BroadcastMessage 定义了 worker 发送给其他客户端的广播消息结构。
1export type WorkerInMessage =
2  | RequestMessage
3  | { kind: 'heartbeat'; from: string }
4  | { kind: 'bye'; from: string };
  • WorkerInMessage 是一个类型,表示 worker 接收的所有消息,例如请求、心跳和断开连接通知。
1export type WorkerOutMessage = ResponseMessage | BroadcastMessage;
  • WorkerOutMessage 是一个类型,表示 worker 发送给客户端的响应或广播消息。
1export const randomId = () => Math.random().toString(36).slice(2);
  • randomId 是一个函数,用于生成随机的字母数字字符串,可用作消息 ID 等。

实现 Shared Worker

shared-worker.ts 中,注册通过 onconnect 事件连接的各个标签页,并处理消息。

1// shared-worker.ts
2/// <reference lib="webworker" />
  • 该指令让 TypeScript 加载 Web Worker 的类型定义。
1import {
2  WorkerInMessage,
3  WorkerOutMessage,
4  RequestMessage,
5  ResponseMessage,
6} from './worker-protocol.js';
  • 导入用于 worker 通信的类型定义。
1export default {};
2declare const self: SharedWorkerGlobalScope;
  • 显式声明 selfShared Worker 的全局作用域。
1type Client = {
2  id: string;
3  port: MessagePort;
4  lastBeat: number;
5};
  • Client 是一种类型,表示每个客户端的标识符、通信端口以及最近一次心跳的时间戳。
1const clients = new Map<string, Client>();
2const kv = new Map<string, unknown>();
3const locks = new Map<string, string>();
4const HEARTBEAT_TIMEOUT = 30_000;
  • 管理已连接客户端列表、键值存储、锁状态和超时时间。
1function send(port: MessagePort, msg: WorkerOutMessage) {
2  port.postMessage(msg);
3}
  • send 是一个工具函数,用于向指定端口发送消息。
1function respond(req: RequestMessage, ok: boolean, result?: unknown, error?: string): ResponseMessage {
2  return { kind: 'response', id: req.id, ok, result, error };
3}
  • respond 为请求生成响应消息。
1function broadcast(from: string, channel: string, payload: unknown) {
2  for (const [id, c] of clients) {
3    send(c.port, { kind: 'broadcast', channel, payload, from });
4  }
5}
  • broadcast 在指定通道向所有客户端发送消息。
1function handleRequest(clientId: string, port: MessagePort, req: RequestMessage) {
  • handleRequest 按类型处理传入请求,并将结果返回给客户端。
 1  const { action } = req;
 2  try {
 3    switch (action.type) {
 4      case 'ping':
 5        send(port, respond(req, true, 'pong'));
 6        break;
 7      case 'echo':
 8        send(port, respond(req, true, action.payload));
 9        break;
10      case 'getTime':
11        send(port, respond(req, true, new Date().toISOString()));
12        break;
13      case 'set':
14        kv.set(action.key, action.value);
15        send(port, respond(req, true, true));
16        break;
17      case 'get':
18        send(port, respond(req, true, kv.get(action.key)));
19        break;
20      case 'broadcast':
21        broadcast(clientId, action.channel, action.payload);
22        send(port, respond(req, true, true));
23        break;
  • 在这段代码中,根据接收到的请求类型,处理消息的收发、数据的读取与保存以及广播。
 1      case 'lock.acquire': {
 2        const owner = locks.get(action.key);
 3        if (!owner) {
 4          locks.set(action.key, clientId);
 5          send(port, respond(req, true, { owner: clientId }));
 6        } else if (owner === clientId) {
 7          send(port, respond(req, true, { owner }));
 8        } else {
 9          const start = Date.now();
10          const tryWait = () => {
11            const current = locks.get(action.key);
12            if (!current) {
13              locks.set(action.key, clientId);
14              send(port, respond(req, true, { owner: clientId }));
15            } else if ((action.timeoutMs ?? 5000) < Date.now() - start) {
16              send(port, respond(req, false, undefined, 'lock-timeout'));
17            } else {
18              setTimeout(tryWait, 25);
19            }
20          };
21          tryWait();
22        }
23        break;
24      }
  • 此代码实现了客户端为指定键获取锁的流程。如果该锁尚未被持有,会立即获取;如果同一客户端再次请求,也同样视为成功。如果已有其他客户端持有该锁,则每 25 毫秒重试一次,直到锁被释放;若超过指定的超时时间(默认 5 秒),则返回错误。
 1      case 'lock.release':
 2        if (locks.get(action.key) === clientId) {
 3          locks.delete(action.key);
 4          send(port, respond(req, true, true));
 5        } else {
 6          send(port, respond(req, false, undefined, 'not-owner'));
 7        }
 8        break;
 9      default:
10        send(port, respond(req, false, undefined, 'unknown-action'));
11    }
12  } catch (e: any) {
13    send(port, respond(req, false, undefined, e?.message ?? 'error'));
14  }
15}
  • 这段代码释放客户端持有的锁;如果客户端缺少权限或操作未知,则返回错误响应。
1function handleInMessage(c: Client, msg: WorkerInMessage) {
2  if (msg.kind === 'heartbeat') {
3    c.lastBeat = Date.now();
4  } else if (msg.kind === 'bye') {
5    cleanupClient(msg.from);
6  } else if (msg.kind === 'request') {
7    handleRequest(c.id, c.port, msg);
8  }
9}
  • handleInMessage 解析来自客户端的消息,并处理请求与心跳。
1function cleanupClient(clientId: string) {
2  for (const [key, owner] of locks) {
3    if (owner === clientId) locks.delete(key);
4  }
5  clients.delete(clientId);
6}
  • cleanupClient 从注册表和锁状态中移除已断开连接的客户端。
1setInterval(() => {
2  const now = Date.now();
3  for (const [id, c] of clients) {
4    if (now - c.lastBeat > HEARTBEAT_TIMEOUT) cleanupClient(id);
5  }
6}, 10_000);
  • 使用 setInterval 定期检查所有客户端的心跳,并清理已超时的连接。
 1self.onconnect = (e: MessageEvent) => {
 2  const port = (e.ports && e.ports[0]) as MessagePort;
 3  const clientId = crypto.randomUUID?.() ?? Math.random().toString(36).slice(2);
 4  const client: Client = { id: clientId, port, lastBeat: Date.now() };
 5  clients.set(clientId, client);
 6
 7  port.addEventListener('message', (ev) => handleInMessage(client, ev.data as WorkerInMessage));
 8  send(port, { kind: 'broadcast', channel: 'system', payload: { hello: true, clientId }, from: 'worker' });
 9  port.start();
10};
  • 当新标签页或页面连接到 Shared Worker 时,会调用 onconnect,注册客户端并开始通信。

  • 在整个文件中,实现了 Shared Worker 的基本机制,从而在多个浏览器标签页之间实现共享状态管理与通信。

客户端封装(RPC)

接下来,创建一个基于 Promise 的 RPC 客户端。

1// shared-worker-client.ts
2import {
3  RequestAction,
4  RequestMessage,
5  WorkerOutMessage,
6  randomId
7} from './worker-protocol.js';
  • 导入用于 worker 通信的类型定义与工具函数。
1export type BroadcastHandler = (msg: {
2  channel: string;
3  payload: unknown;
4  from: string
5}) => void;
  • 这里定义接收到广播消息时运行的回调函数的类型。
1export class SharedWorkerClient {
  • SharedWorkerClient 是一个与 Shared Worker 通信的客户端类,用于发送请求并处理响应。
1  private worker: SharedWorker;
2  private port: MessagePort;
3  private pending = new Map<string, {
4    resolve: (v: any) => void;
5    reject: (e: any) => void
6  }>();
  • 这些变量分别是 worker 实例、与 worker 的通信端口,以及用于跟踪等待响应请求的映射。
1  private clientId = randomId();
2  private heartbeatTimer?: number;
3  private onBroadcast?: BroadcastHandler;
  • 这些变量保存客户端标识符、发送心跳的定时器,以及广播接收处理器。
1  constructor(url: URL, name = 'app-shared', onBroadcast?: BroadcastHandler) {
2    this.worker = new SharedWorker(url, { name, type: 'module' as any });
3    this.port = this.worker.port;
4    this.onBroadcast = onBroadcast;
  • 在构造函数中,初始化与 Shared Worker 的连接,并设置消息监听与心跳发送。
 1    this.port.addEventListener('message', (ev) => {
 2      const msg = ev.data as WorkerOutMessage;
 3      if (msg.kind === 'response') {
 4        const p = this.pending.get(msg.id);
 5        if (p) {
 6          this.pending.delete(msg.id);
 7          msg.ok ? p.resolve(msg.result) : p.reject(new Error(msg.error || 'error'));
 8        }
 9      } else if (msg.kind === 'broadcast') {
10        this.onBroadcast?.(msg);
11      }
12    });
13    this.port.start();
  • 在这里接收来自 worker 的消息,并处理响应或广播。
1    this.heartbeatTimer = window.setInterval(() => {
2      this.port.postMessage({ kind: 'heartbeat', from: this.clientId });
3    }, 10_000);
  • 定期发送心跳消息以保持连接存活。
1    window.addEventListener('beforeunload', () => {
2      try {
3        this.port.postMessage({ kind: 'bye', from: this.clientId });
4      } catch {}
5      if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
6    });
7  }
  • 在窗口关闭前向 worker 发送断开连接通知。
1  request<T = unknown>(action: RequestAction): Promise<T> {
2    const id = randomId();
3    return new Promise<T>((resolve, reject) => {
4      this.pending.set(id, { resolve, reject });
5      this.port.postMessage({ kind: 'request', id, from: this.clientId, action });
6    });
7  }
  • request 方法向 worker 发送指定的操作,并以 Promise 形式接收结果。
1  ping() {
2    return this.request<string>({ type: 'ping' });
3  }
4  echo(payload: string) {
5    return this.request<string>({ type: 'echo', payload });
6  }
7  getTime() {
8    return this.request<string>({ type: 'getTime' });
9  }
  • 这些是用于基本通信测试和获取当前时间的工具方法。
1  set(key: string, value: unknown) {
2    return this.request<boolean>({ type: 'set', key, value });
3  }
4  get<T = unknown>(key: string) {
5    return this.request<T>({ type: 'get', key });
6  }
  • 这些是用于存取键值对的方法。
1  broadcast(channel: string, payload: unknown) {
2    return this.request<boolean>({ type: 'broadcast', channel, payload });
3  }
  • 这是一个通过 worker 向其他客户端发送广播消息的方法。
1  lockAcquire(key: string, timeoutMs?: number) {
2    return this.request<{ owner: string }>({ type: 'lock.acquire', key, timeoutMs });
3  }
4  lockRelease(key: string) {
5    return this.request<boolean>({ type: 'lock.release', key });
6  }
7}
  • 这些是用于获取和释放锁以对共享资源实现互斥的方法。
  • 在整个文件中,实现了一个客户端 API,使每个浏览器标签页都能与 Shared Worker 进行安全的异步通信。

使用示例

demo.ts 中,我们使用先前创建的 SharedWorkerClient 类并验证其行为。它按顺序执行一系列函数,包括通信测试、读写数据、广播以及锁的处理。

 1// demo.ts
 2import { SharedWorkerClient } from './shared-worker-client.js';
 3
 4const client = new SharedWorkerClient(new URL('./shared-worker.js', import.meta.url), 'app-shared', (b) => {
 5  console.log('[BROADCAST]', b.channel, JSON.stringify(b.payload));
 6});
 7
 8async function demo() {
 9  console.log('ping:', await client.ping());
10  console.log('echo:', await client.echo('hello'));
11  console.log('time:', await client.getTime());
12
13  // Counter update
14  await client.set('counter', (await client.get<number>('counter')) ?? 0);
15  const c1 = await client.get<number>('counter');
16  await client.set('counter', (c1 ?? 0) + 1);
17  console.log('counter:', await client.get<number>('counter'));
18
19  // Broadcast test
20  await client.broadcast('notify', { msg: 'Counter updated' });
21
22  // Lock test
23  const key = 'critical';
24  console.log(`[lock] Trying to acquire lock: "${key}"`);
25  const lockResult = await client.lockAcquire(key, 2000);
26  console.log(`[lock] Lock acquired for key "${key}":`, lockResult);
27
28  try {
29    console.log(`[lock] Simulating critical section for key "${key}"...`);
30    await new Promise((r) => setTimeout(r, 250));
31    console.log(`[lock] Critical section completed for key "${key}"`);
32  } finally {
33    console.log(`[lock] Releasing lock: "${key}"`);
34    const releaseResult = await client.lockRelease(key);
35    console.log(`[lock] Lock released for key "${key}":`, releaseResult);
36  }
37}
38
39demo().catch(console.error);
  • 该代码是一个演示,使用 Shared Worker 在多个浏览器标签页之间共享并同步数据与状态。通过基于消息的通信,你可以以松耦合的方式安全地交换异步消息,从而更易于管理不同上下文之间的通信。此外,通过使用 RPC,以直观的类似方法调用的风格抽象了与 worker 的通信,提升了可维护性和可读性。

在 HTML 中进行测试

 1<!DOCTYPE html>
 2<html lang="en">
 3<head>
 4  <meta charset="UTF-8" />
 5  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
 6  <title>Shared Worker Demo</title>
 7</head>
 8<body>
 9  <h1>SharedWorker Demo</h1>
10  <p>Open the browser console to see the output.</p>
11
12  <script type="module">
13    import './demo.js';
14  </script>
15</body>
16</html>

设计与运维注意事项

在设计和运维时,牢记以下要点将有助于构建更健壮、可扩展的系统。

  • 你可以采用一种可区分联合(标记联合),它允许基于 kindtype 进行分支。
  • 使用关联 ID 以正确匹配请求与响应。
  • 通过心跳与自动清理可以防止遗留锁。
  • 实施版本化,以灵活适应未来的协议变更。
  • 定义清晰的错误码可以让 UI 端的处理和调试更容易。

总结

Shared Worker 是在多个浏览器标签页之间共享数据与状态的核心机制。

这里介绍的结构提供了类型安全的 RPC 通信、通过心跳进行存活监测以及加锁机制,使其成为可直接用于生产的健壮设计。

基于此机制,你还可以实现以下应用。

  • 串行化 IndexedDB 访问
  • WebSocket 连接的整合与共享
  • 构建跨多个标签页的任务队列
  • 节流与进度通知分发

由此可见,借助 Shared Worker 可以在多个标签页之间安全且高效地共享数据与处理。

您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。

YouTube Video