TypeScript 中的 `Shared Worker`
本文介绍 TypeScript 中的 Shared Worker。
我们将通过 TypeScript 代码示例,详细说明 Shared Worker 的工作原理及其实践用法。
YouTube Video
TypeScript 中的 Shared Worker
Shared Worker 是在同一源下可被多个标签页、窗口与 iframe 共享的单个 Worker 进程。通过它,你可以在多个浏览器标签页之间处理共享的状态与资源。
例如,你可以高效实现共享的 WebSocket 连接、跨标签页同步的缓存与队列处理以及互斥机制。
与 Dedicated Worker 不同,Shared Worker 通过 onconnect 事件接收多个 MessagePort,并可与多个客户端进行多路复用通信。
应选择 Shared Worker 的场景
在以下情况中,使用 Shared Worker 是合适的。
- 当你需要跨标签页的共享状态或互斥时
- 当你希望共享单个 WebSocket 连接或 IndexedDB 访问时
- 当你需要通知所有标签页(广播)时
- 当你希望集中重型计算以节省资源时
相反,在以下情况中,其他方式更为合适。
- 当你需要缓存控制或离线支持时,可以使用
Service Worker。 - 对于仅限单个标签页的重型处理,可以使用
Dedicated Worker。
Shared Worker 的实现步骤
接下来,我们将使用 TypeScript 按步骤实现以下内容。
- 类型安全的消息协议
- 基于 Promise 的请求/响应(RPC)
- 向所有标签页广播
- 心跳与客户端清理
环境准备
创建用于编译每个使用 Shared Worker 的源文件的配置。
1{
2 "compilerOptions": {
3 "target": "ES2020",
4 "module": "ES2020",
5 "lib": ["ES2020", "dom", "WebWorker"],
6 "moduleResolution": "Bundler",
7 "strict": true,
8 "noEmitOnError": true,
9 "outDir": "out",
10 "skipLibCheck": true
11 },
12 "include": ["src"]
13}- 在
tsconfig-src.json中,启用 DOM 和 Web Worker 的类型定义,以便安全地编译代码。
定义消息协议
通信的基础是带类型的消息约定。预先定义它可以使后续通信更安全、也更易扩展。
1// worker-protocol.ts
2// Define discriminated unions for structured messages
3
4export type RequestAction =
5 | { type: 'ping' }
6 | { type: 'echo'; payload: string }
7 | { type: 'getTime' }
8 | { type: 'set'; key: string; value: unknown }
9 | { type: 'get'; key: string }
10 | { type: 'broadcast'; channel: string; payload: unknown }
11 | { type: 'lock.acquire'; key: string; timeoutMs?: number }
12 | { type: 'lock.release'; key: string };RequestAction是一种可区分联合(标记联合),用于表示发送到 worker 的请求类型,并定义了诸如ping、get和broadcast等操作。
1export interface RequestMessage {
2 kind: 'request';
3 id: string;
4 from: string;
5 action: RequestAction;
6}RequestMessage定义了客户端发送给 worker 的请求消息结构。
1export interface ResponseMessage {
2 kind: 'response';
3 id: string;
4 ok: boolean;
5 result?: unknown;
6 error?: string;
7}ResponseMessage定义了 worker 返回给客户端的响应消息结构。
1export interface BroadcastMessage {
2 kind: 'broadcast';
3 channel: string;
4 payload: unknown;
5 from: string;
6}BroadcastMessage定义了 worker 发送给其他客户端的广播消息结构。
1export type WorkerInMessage =
2 | RequestMessage
3 | { kind: 'heartbeat'; from: string }
4 | { kind: 'bye'; from: string };WorkerInMessage是一个类型,表示 worker 接收的所有消息,例如请求、心跳和断开连接通知。
1export type WorkerOutMessage = ResponseMessage | BroadcastMessage;WorkerOutMessage是一个类型,表示 worker 发送给客户端的响应或广播消息。
1export const randomId = () => Math.random().toString(36).slice(2);randomId是一个函数,用于生成随机的字母数字字符串,可用作消息 ID 等。
实现 Shared Worker
在 shared-worker.ts 中,注册通过 onconnect 事件连接的各个标签页,并处理消息。
1// shared-worker.ts
2/// <reference lib="webworker" />
- 该指令让 TypeScript 加载 Web Worker 的类型定义。
1import {
2 WorkerInMessage,
3 WorkerOutMessage,
4 RequestMessage,
5 ResponseMessage,
6} from './worker-protocol.js';- 导入用于 worker 通信的类型定义。
1export default {};
2declare const self: SharedWorkerGlobalScope;- 显式声明
self是Shared Worker的全局作用域。
1type Client = {
2 id: string;
3 port: MessagePort;
4 lastBeat: number;
5};Client是一种类型,表示每个客户端的标识符、通信端口以及最近一次心跳的时间戳。
1const clients = new Map<string, Client>();
2const kv = new Map<string, unknown>();
3const locks = new Map<string, string>();
4const HEARTBEAT_TIMEOUT = 30_000;- 管理已连接客户端列表、键值存储、锁状态和超时时间。
1function send(port: MessagePort, msg: WorkerOutMessage) {
2 port.postMessage(msg);
3}send是一个工具函数,用于向指定端口发送消息。
1function respond(req: RequestMessage, ok: boolean, result?: unknown, error?: string): ResponseMessage {
2 return { kind: 'response', id: req.id, ok, result, error };
3}respond为请求生成响应消息。
1function broadcast(from: string, channel: string, payload: unknown) {
2 for (const [id, c] of clients) {
3 send(c.port, { kind: 'broadcast', channel, payload, from });
4 }
5}broadcast在指定通道向所有客户端发送消息。
1function handleRequest(clientId: string, port: MessagePort, req: RequestMessage) {handleRequest按类型处理传入请求,并将结果返回给客户端。
1 const { action } = req;
2 try {
3 switch (action.type) {
4 case 'ping':
5 send(port, respond(req, true, 'pong'));
6 break;
7 case 'echo':
8 send(port, respond(req, true, action.payload));
9 break;
10 case 'getTime':
11 send(port, respond(req, true, new Date().toISOString()));
12 break;
13 case 'set':
14 kv.set(action.key, action.value);
15 send(port, respond(req, true, true));
16 break;
17 case 'get':
18 send(port, respond(req, true, kv.get(action.key)));
19 break;
20 case 'broadcast':
21 broadcast(clientId, action.channel, action.payload);
22 send(port, respond(req, true, true));
23 break;- 在这段代码中,根据接收到的请求类型,处理消息的收发、数据的读取与保存以及广播。
1 case 'lock.acquire': {
2 const owner = locks.get(action.key);
3 if (!owner) {
4 locks.set(action.key, clientId);
5 send(port, respond(req, true, { owner: clientId }));
6 } else if (owner === clientId) {
7 send(port, respond(req, true, { owner }));
8 } else {
9 const start = Date.now();
10 const tryWait = () => {
11 const current = locks.get(action.key);
12 if (!current) {
13 locks.set(action.key, clientId);
14 send(port, respond(req, true, { owner: clientId }));
15 } else if ((action.timeoutMs ?? 5000) < Date.now() - start) {
16 send(port, respond(req, false, undefined, 'lock-timeout'));
17 } else {
18 setTimeout(tryWait, 25);
19 }
20 };
21 tryWait();
22 }
23 break;
24 }- 此代码实现了客户端为指定键获取锁的流程。如果该锁尚未被持有,会立即获取;如果同一客户端再次请求,也同样视为成功。如果已有其他客户端持有该锁,则每 25 毫秒重试一次,直到锁被释放;若超过指定的超时时间(默认 5 秒),则返回错误。
1 case 'lock.release':
2 if (locks.get(action.key) === clientId) {
3 locks.delete(action.key);
4 send(port, respond(req, true, true));
5 } else {
6 send(port, respond(req, false, undefined, 'not-owner'));
7 }
8 break;
9 default:
10 send(port, respond(req, false, undefined, 'unknown-action'));
11 }
12 } catch (e: any) {
13 send(port, respond(req, false, undefined, e?.message ?? 'error'));
14 }
15}- 这段代码释放客户端持有的锁;如果客户端缺少权限或操作未知,则返回错误响应。
1function handleInMessage(c: Client, msg: WorkerInMessage) {
2 if (msg.kind === 'heartbeat') {
3 c.lastBeat = Date.now();
4 } else if (msg.kind === 'bye') {
5 cleanupClient(msg.from);
6 } else if (msg.kind === 'request') {
7 handleRequest(c.id, c.port, msg);
8 }
9}handleInMessage解析来自客户端的消息,并处理请求与心跳。
1function cleanupClient(clientId: string) {
2 for (const [key, owner] of locks) {
3 if (owner === clientId) locks.delete(key);
4 }
5 clients.delete(clientId);
6}cleanupClient从注册表和锁状态中移除已断开连接的客户端。
1setInterval(() => {
2 const now = Date.now();
3 for (const [id, c] of clients) {
4 if (now - c.lastBeat > HEARTBEAT_TIMEOUT) cleanupClient(id);
5 }
6}, 10_000);- 使用
setInterval定期检查所有客户端的心跳,并清理已超时的连接。
1self.onconnect = (e: MessageEvent) => {
2 const port = (e.ports && e.ports[0]) as MessagePort;
3 const clientId = crypto.randomUUID?.() ?? Math.random().toString(36).slice(2);
4 const client: Client = { id: clientId, port, lastBeat: Date.now() };
5 clients.set(clientId, client);
6
7 port.addEventListener('message', (ev) => handleInMessage(client, ev.data as WorkerInMessage));
8 send(port, { kind: 'broadcast', channel: 'system', payload: { hello: true, clientId }, from: 'worker' });
9 port.start();
10};-
当新标签页或页面连接到
Shared Worker时,会调用onconnect,注册客户端并开始通信。 -
在整个文件中,实现了
Shared Worker的基本机制,从而在多个浏览器标签页之间实现共享状态管理与通信。
客户端封装(RPC)
接下来,创建一个基于 Promise 的 RPC 客户端。
1// shared-worker-client.ts
2import {
3 RequestAction,
4 RequestMessage,
5 WorkerOutMessage,
6 randomId
7} from './worker-protocol.js';- 导入用于 worker 通信的类型定义与工具函数。
1export type BroadcastHandler = (msg: {
2 channel: string;
3 payload: unknown;
4 from: string
5}) => void;- 这里定义接收到广播消息时运行的回调函数的类型。
1export class SharedWorkerClient {SharedWorkerClient是一个与Shared Worker通信的客户端类,用于发送请求并处理响应。
1 private worker: SharedWorker;
2 private port: MessagePort;
3 private pending = new Map<string, {
4 resolve: (v: any) => void;
5 reject: (e: any) => void
6 }>();- 这些变量分别是 worker 实例、与 worker 的通信端口,以及用于跟踪等待响应请求的映射。
1 private clientId = randomId();
2 private heartbeatTimer?: number;
3 private onBroadcast?: BroadcastHandler;- 这些变量保存客户端标识符、发送心跳的定时器,以及广播接收处理器。
1 constructor(url: URL, name = 'app-shared', onBroadcast?: BroadcastHandler) {
2 this.worker = new SharedWorker(url, { name, type: 'module' as any });
3 this.port = this.worker.port;
4 this.onBroadcast = onBroadcast;- 在构造函数中,初始化与
Shared Worker的连接,并设置消息监听与心跳发送。
1 this.port.addEventListener('message', (ev) => {
2 const msg = ev.data as WorkerOutMessage;
3 if (msg.kind === 'response') {
4 const p = this.pending.get(msg.id);
5 if (p) {
6 this.pending.delete(msg.id);
7 msg.ok ? p.resolve(msg.result) : p.reject(new Error(msg.error || 'error'));
8 }
9 } else if (msg.kind === 'broadcast') {
10 this.onBroadcast?.(msg);
11 }
12 });
13 this.port.start();- 在这里接收来自 worker 的消息,并处理响应或广播。
1 this.heartbeatTimer = window.setInterval(() => {
2 this.port.postMessage({ kind: 'heartbeat', from: this.clientId });
3 }, 10_000);- 定期发送心跳消息以保持连接存活。
1 window.addEventListener('beforeunload', () => {
2 try {
3 this.port.postMessage({ kind: 'bye', from: this.clientId });
4 } catch {}
5 if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
6 });
7 }- 在窗口关闭前向 worker 发送断开连接通知。
1 request<T = unknown>(action: RequestAction): Promise<T> {
2 const id = randomId();
3 return new Promise<T>((resolve, reject) => {
4 this.pending.set(id, { resolve, reject });
5 this.port.postMessage({ kind: 'request', id, from: this.clientId, action });
6 });
7 }request方法向 worker 发送指定的操作,并以Promise形式接收结果。
1 ping() {
2 return this.request<string>({ type: 'ping' });
3 }
4 echo(payload: string) {
5 return this.request<string>({ type: 'echo', payload });
6 }
7 getTime() {
8 return this.request<string>({ type: 'getTime' });
9 }- 这些是用于基本通信测试和获取当前时间的工具方法。
1 set(key: string, value: unknown) {
2 return this.request<boolean>({ type: 'set', key, value });
3 }
4 get<T = unknown>(key: string) {
5 return this.request<T>({ type: 'get', key });
6 }- 这些是用于存取键值对的方法。
1 broadcast(channel: string, payload: unknown) {
2 return this.request<boolean>({ type: 'broadcast', channel, payload });
3 }- 这是一个通过 worker 向其他客户端发送广播消息的方法。
1 lockAcquire(key: string, timeoutMs?: number) {
2 return this.request<{ owner: string }>({ type: 'lock.acquire', key, timeoutMs });
3 }
4 lockRelease(key: string) {
5 return this.request<boolean>({ type: 'lock.release', key });
6 }
7}- 这些是用于获取和释放锁以对共享资源实现互斥的方法。
- 在整个文件中,实现了一个客户端 API,使每个浏览器标签页都能与
Shared Worker进行安全的异步通信。
使用示例
在 demo.ts 中,我们使用先前创建的 SharedWorkerClient 类并验证其行为。它按顺序执行一系列函数,包括通信测试、读写数据、广播以及锁的处理。
1// demo.ts
2import { SharedWorkerClient } from './shared-worker-client.js';
3
4const client = new SharedWorkerClient(new URL('./shared-worker.js', import.meta.url), 'app-shared', (b) => {
5 console.log('[BROADCAST]', b.channel, JSON.stringify(b.payload));
6});
7
8async function demo() {
9 console.log('ping:', await client.ping());
10 console.log('echo:', await client.echo('hello'));
11 console.log('time:', await client.getTime());
12
13 // Counter update
14 await client.set('counter', (await client.get<number>('counter')) ?? 0);
15 const c1 = await client.get<number>('counter');
16 await client.set('counter', (c1 ?? 0) + 1);
17 console.log('counter:', await client.get<number>('counter'));
18
19 // Broadcast test
20 await client.broadcast('notify', { msg: 'Counter updated' });
21
22 // Lock test
23 const key = 'critical';
24 console.log(`[lock] Trying to acquire lock: "${key}"`);
25 const lockResult = await client.lockAcquire(key, 2000);
26 console.log(`[lock] Lock acquired for key "${key}":`, lockResult);
27
28 try {
29 console.log(`[lock] Simulating critical section for key "${key}"...`);
30 await new Promise((r) => setTimeout(r, 250));
31 console.log(`[lock] Critical section completed for key "${key}"`);
32 } finally {
33 console.log(`[lock] Releasing lock: "${key}"`);
34 const releaseResult = await client.lockRelease(key);
35 console.log(`[lock] Lock released for key "${key}":`, releaseResult);
36 }
37}
38
39demo().catch(console.error);- 该代码是一个演示,使用
Shared Worker在多个浏览器标签页之间共享并同步数据与状态。通过基于消息的通信,你可以以松耦合的方式安全地交换异步消息,从而更易于管理不同上下文之间的通信。此外,通过使用 RPC,以直观的类似方法调用的风格抽象了与 worker 的通信,提升了可维护性和可读性。
在 HTML 中进行测试
1<!DOCTYPE html>
2<html lang="en">
3<head>
4 <meta charset="UTF-8" />
5 <meta name="viewport" content="width=device-width, initial-scale=1.0" />
6 <title>Shared Worker Demo</title>
7</head>
8<body>
9 <h1>SharedWorker Demo</h1>
10 <p>Open the browser console to see the output.</p>
11
12 <script type="module">
13 import './demo.js';
14 </script>
15</body>
16</html>设计与运维注意事项
在设计和运维时,牢记以下要点将有助于构建更健壮、可扩展的系统。
- 你可以采用一种可区分联合(标记联合),它允许基于
kind或type进行分支。 - 使用关联 ID 以正确匹配请求与响应。
- 通过心跳与自动清理可以防止遗留锁。
- 实施版本化,以灵活适应未来的协议变更。
- 定义清晰的错误码可以让 UI 端的处理和调试更容易。
总结
Shared Worker 是在多个浏览器标签页之间共享数据与状态的核心机制。
这里介绍的结构提供了类型安全的 RPC 通信、通过心跳进行存活监测以及加锁机制,使其成为可直接用于生产的健壮设计。
基于此机制,你还可以实现以下应用。
- 串行化 IndexedDB 访问
- WebSocket 连接的整合与共享
- 构建跨多个标签页的任务队列
- 节流与进度通知分发
由此可见,借助 Shared Worker 可以在多个标签页之间安全且高效地共享数据与处理。
您可以在我们的YouTube频道上使用Visual Studio Code跟随上述文章进行学习。 请也查看我们的YouTube频道。