Node.js 长期以来以其单线程、事件驱动的特性闻名,这一特性非常适合 I/O 密集型应用,但在处理 CPU 密集型任务时可能会力不从心。
随着 Node.js 10.5 中工作线程(Worker Threads)的发布,实现了并行处理能力,尤其适用于CPU 密集型任务和需要隔离或共享资源的场景。
入门实例
计算斐波拉契数列:
创建一个文件: fibonacci.mjs
,作为worker 线程,内容为:
javascript
import { parentPort, workerData } from 'node:worker_threads';
const fibonacci = (n) => {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
};
// parentPort 是 node:worker_threads 模块中 `Worker 线程` 特有的属性,类型为 MessagePort 或 null(非 Worker 线程时为 null)。
// 作为 Worker 线程与父线程(通常为主线程)的双向通信接口,实现跨线程的消息传递。
// Worker 线程通过 parentPort 发送 / 接收消息,父线程通过 Worker 实例的事件接口(如 worker.on('message'))与之交互。
// 本质是 MessagePort 实例
// parentPort.postMessage(value[, transferList]): value:需传递的数据,必须是可序列化的 JavaScript 值(支持循环引用、RegExp、Map、Set 等)。
// parentPort.postMessage(value[, transferList]):transferList(可选):包含 Transferable 对象(如 ArrayBuffer、MessagePort、FileHandle)的数组,用于零拷贝传输(转移所有权后发送方不可再访问)。
parentPort.postMessage(fibonacci(workerData));
// 监听父线程消息并回复
parentPort.on('message', (msg) => {
console.log('Received from main:', msg);
parentPort.postMessage('Message received!');
});
// workerData
// workerData 是 Worker 线程中的一个只读属性,类型为 任意 JavaScript 值(由父线程传递并克隆而来)。
// 作为父线程向 Worker 线程传递初始化数据的载体,用于在 Worker 启动时传递配置参数、输入数据等。
创建一个文件: fibonacciMain.mjs
,作为主线程,内容为:
javascript
import { Worker,isMainThread } from 'node:worker_threads';
let worker;
const computeFibonacci = (num) => {
return new Promise((resolve, reject) => {
worker = new Worker('./fibonacci.mjs', { workerData: num });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
});
});
};
computeFibonacci(30).then(result => {
console.log(`Fibonacci result: ${result}`);
if (worker) {
worker.postMessage(result);
setTimeout(() => {
worker.terminate(); // 终止worker线程
}, 1000);
}
}).catch(err => {
console.error(err);
});
console.log("This is:" + (isMainThread?' main thread':' worker thread'))
// 输出结果:
// This is: main thread
// Fibonacci result: 832040
什么是 worker_threads
我们可以打印一下worker_threads模块的内容:
javascript
import worker from 'node:worker_threads';
console.log(worker);
输出结果如果:
javascript
{
isMainThread: true,
MessagePort: [Function: MessagePort],
MessageChannel: [Function: MessageChannel],
markAsUntransferable: [Function: markAsUntransferable],
moveMessagePortToContext: [Function: moveMessagePortToContext],
receiveMessageOnPort: [Function: receiveMessageOnPort],
resourceLimits: {},
threadId: 0,
SHARE_ENV: Symbol(nodejs.worker_threads.SHARE_ENV),
Worker: [class Worker extends EventEmitter],
parentPort: null,
workerData: null,
BroadcastChannel: [class BroadcastChannel extends EventTarget],
setEnvironmentData: [Function: setEnvironmentData],
getEnvironmentData: [Function: getEnvironmentData]
}
应用场景
在 Node.js 中,worker_threads
模块主要用于解决 CPU 密集型任务 的性能问题。通过将计算密集型任务分配到独立线程,避免阻塞主线程(Event Loop),从而提升应用的吞吐量和响应速度。以下是几个典型应用场景及实例:
场景 1:图像/视频处理
问题:压缩 1000 张高分辨率图片,单线程处理会导致主线程阻塞,无法响应其他请求。
解决方案:使用 Worker 线程并行处理。
javascript
// 主线程(main.js)
const { Worker } = require('worker_threads');
const path = require('path');
const imagePaths = ['image1.jpg', 'image2.jpg', /* ... 1000个路径 */];
imagePaths.forEach((imagePath) => {
const worker = new Worker(path.resolve('./image-worker.js'), {
workerData: imagePath
});
worker.on('message', (result) => {
console.log(`压缩完成: ${result}`);
});
worker.on('error', (err) => {
console.error(`Worker 错误: ${err}`);
});
});
// Worker 线程(image-worker.js)
const { workerData, parentPort } = require('worker_threads');
const sharp = require('sharp'); // 图像处理库
async function compressImage(imagePath) {
await sharp(imagePath)
.resize(800, 600)
.jpeg({ quality: 80 })
.toFile(`compressed_${imagePath}`);
}
compressImage(workerData)
.then(() => parentPort.postMessage(workerData))
.catch(console.error);
关键点:
- 每个图片处理任务独立运行在 Worker 中。
- 主线程仅负责任务调度,不参与实际计算。
场景 2:大数据计算(如矩阵运算)
问题:计算两个 10000x10000 矩阵相乘,单线程计算耗时极长。
解决方案:将矩阵分块,多线程并行计算。
javascript
// 主线程(main.js)
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const matrixA = /* 生成矩阵 A */;
const matrixB = /* 生成矩阵 B */;
const chunkSize = 1000;
// 将矩阵分块
const chunks = [];
for (let i = 0; i < matrixA.length; i += chunkSize) {
chunks.push({
a: matrixA.slice(i, i + chunkSize),
b: matrixB,
startRow: i
});
}
// 为每个分块创建 Worker
const results = new Array(matrixA.length);
chunks.forEach((chunk) => {
const worker = new Worker(__filename, { workerData: chunk });
worker.on('message', (rows) => {
rows.forEach((row, idx) => {
results[chunk.startRow + idx] = row;
});
});
});
} else {
// Worker 线程
const { multiplyMatrices } = require('./math-utils');
const chunk = workerData;
const resultChunk = multiplyMatrices(chunk.a, chunk.b);
parentPort.postMessage(resultChunk);
}
关键点:
- 数据分片后由多个 Worker 并行处理。
- 主线程合并结果。
场景 3:实时数据分析
问题:实时处理来自传感器的 10 万条数据点,需快速完成统计分析。
解决方案:多线程并行处理数据分片。
javascript
// 主线程(main.js)
const { Worker } = require('worker_threads');
const sensorData = [/* 10 万条数据 */];
const threadCount = 4;
const chunkSize = Math.ceil(sensorData.length / threadCount);
for (let i = 0; i < threadCount; i++) {
const worker = new Worker('./stats-worker.js', {
workerData: {
data: sensorData.slice(i * chunkSize, (i + 1) * chunkSize),
chunkId: i
}
});
worker.on('message', (localStats) => {
// 合并所有 Worker 的统计结果
aggregateStats(localStats);
});
}
// Worker 线程(stats-worker.js)
const { workerData, parentPort } = require('worker_threads');
function calculateStats(data) {
const sum = data.reduce((acc, val) => acc + val, 0);
const avg = sum / data.length;
return { sum, avg };
}
const result = calculateStats(workerData.data);
parentPort.postMessage(result);
关键点:
- 数据均分到 4 个 Worker 并行计算。
- 最终聚合局部结果。
场景 4:文件批量压缩
问题:同步压缩 50 个大型日志文件耗时过长。
解决方案:多线程并行压缩。
javascript
// 主线程(main.js)
const { Worker } = require('worker_threads');
const zlib = require('zlib');
const fs = require('fs');
const files = ['log1.txt', 'log2.txt', /* ... 50个文件 */];
files.forEach((file) => {
const worker = new Worker('./compress-worker.js', {
workerData: file
});
worker.on('exit', () => {
console.log(`${file} 压缩完成`);
});
});
// Worker 线程(compress-worker.js)
const { workerData, parentPort } = require('worker_threads');
const zlib = require('zlib');
const fs = require('fs');
const fileContent = fs.readFileSync(workerData);
zlib.gzip(fileContent, (err, compressed) => {
fs.writeFileSync(`${workerData}.gz`, compressed);
parentPort.postMessage('done');
});
Worker Threads 最佳实践
- 避免过度线程化:线程创建有成本,建议使用线程池(如
poolifier
库)。 - 减少线程间通信:消息传递涉及序列化/反序列化开销。
- 错误处理:监听 Worker 的
error
和exit
事件。 - 共享内存:通过
SharedArrayBuffer
实现高效数据共享(需谨慎处理竞态条件)。
何时不用 Worker Threads?
- I/O 密集型任务:用异步 I/O + 主线程更高效。
- 超短任务:线程启动开销可能超过任务本身耗时。
- 需要跨进程通信:考虑使用
child_process
或cluster
。