Node.js 长期以来以其单线程、事件驱动的特性闻名,这一特性非常适合 I/O 密集型应用,但在处理 CPU 密集型任务时可能会力不从心。

随着 Node.js 10.5 中工作线程(Worker Threads)的发布,实现了并行处理能力,尤其适用于CPU 密集型任务和需要隔离或共享资源的场景。

入门实例

计算斐波拉契数列:

创建一个文件: fibonacci.mjs,作为worker 线程,内容为:

javascript 复制代码
import { parentPort, workerData } from 'node:worker_threads';

const fibonacci = (n) => {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
};

// parentPort 是 node:worker_threads 模块中 `Worker 线程` 特有的属性,类型为 MessagePort 或 null(非 Worker 线程时为 null)。
// 作为 Worker 线程与父线程(通常为主线程)的双向通信接口,实现跨线程的消息传递。
// Worker 线程通过 parentPort 发送 / 接收消息,父线程通过 Worker 实例的事件接口(如 worker.on('message'))与之交互。
// 本质是 MessagePort 实例
// parentPort.postMessage(value[, transferList]): value:需传递的数据,必须是可序列化的 JavaScript 值(支持循环引用、RegExp、Map、Set 等)。
// parentPort.postMessage(value[, transferList]):transferList(可选):包含 Transferable 对象(如 ArrayBuffer、MessagePort、FileHandle)的数组,用于零拷贝传输(转移所有权后发送方不可再访问)。
parentPort.postMessage(fibonacci(workerData));

// 监听父线程消息并回复
parentPort.on('message', (msg) => {
  console.log('Received from main:', msg);
  parentPort.postMessage('Message received!');
});


// workerData
// workerData 是 Worker 线程中的一个只读属性,类型为 任意 JavaScript 值(由父线程传递并克隆而来)。
// 作为父线程向 Worker 线程传递初始化数据的载体,用于在 Worker 启动时传递配置参数、输入数据等。

创建一个文件: fibonacciMain.mjs,作为主线程,内容为:

javascript 复制代码
import { Worker,isMainThread } from 'node:worker_threads';

let worker;
const computeFibonacci = (num) => {
  return new Promise((resolve, reject) => {
    worker = new Worker('./fibonacci.mjs', { workerData: num });   
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};
computeFibonacci(30).then(result => {
  console.log(`Fibonacci result: ${result}`);
  if (worker) {
    worker.postMessage(result);
    setTimeout(() => {
      worker.terminate(); // 终止worker线程 
    }, 1000);
   
  }
}).catch(err => {
  console.error(err);
});

console.log("This is:" + (isMainThread?' main thread':' worker thread'))

// 输出结果:
// This is: main thread
// Fibonacci result: 832040

什么是 worker_threads

我们可以打印一下worker_threads模块的内容:

javascript 复制代码
import worker from 'node:worker_threads';
console.log(worker);

输出结果如果:

javascript 复制代码
{
  isMainThread: true,
  MessagePort: [Function: MessagePort],
  MessageChannel: [Function: MessageChannel],
  markAsUntransferable: [Function: markAsUntransferable],
  moveMessagePortToContext: [Function: moveMessagePortToContext],
  receiveMessageOnPort: [Function: receiveMessageOnPort],
  resourceLimits: {},
  threadId: 0,
  SHARE_ENV: Symbol(nodejs.worker_threads.SHARE_ENV),
  Worker: [class Worker extends EventEmitter],
  parentPort: null,
  workerData: null,
  BroadcastChannel: [class BroadcastChannel extends EventTarget],
  setEnvironmentData: [Function: setEnvironmentData],
  getEnvironmentData: [Function: getEnvironmentData]
}

应用场景

在 Node.js 中,worker_threads 模块主要用于解决 CPU 密集型任务 的性能问题。通过将计算密集型任务分配到独立线程,避免阻塞主线程(Event Loop),从而提升应用的吞吐量和响应速度。以下是几个典型应用场景及实例:

场景 1:图像/视频处理

问题:压缩 1000 张高分辨率图片,单线程处理会导致主线程阻塞,无法响应其他请求。

解决方案:使用 Worker 线程并行处理。

javascript 复制代码
// 主线程(main.js)
const { Worker } = require('worker_threads');
const path = require('path');

const imagePaths = ['image1.jpg', 'image2.jpg', /* ... 1000个路径 */];

imagePaths.forEach((imagePath) => {
  const worker = new Worker(path.resolve('./image-worker.js'), {
    workerData: imagePath
  });

  worker.on('message', (result) => {
    console.log(`压缩完成: ${result}`);
  });

  worker.on('error', (err) => {
    console.error(`Worker 错误: ${err}`);
  });
});

// Worker 线程(image-worker.js)
const { workerData, parentPort } = require('worker_threads');
const sharp = require('sharp'); // 图像处理库

async function compressImage(imagePath) {
  await sharp(imagePath)
    .resize(800, 600)
    .jpeg({ quality: 80 })
    .toFile(`compressed_${imagePath}`);
}

compressImage(workerData)
  .then(() => parentPort.postMessage(workerData))
  .catch(console.error);

关键点

  • 每个图片处理任务独立运行在 Worker 中。
  • 主线程仅负责任务调度,不参与实际计算。

场景 2:大数据计算(如矩阵运算)

问题:计算两个 10000x10000 矩阵相乘,单线程计算耗时极长。

解决方案:将矩阵分块,多线程并行计算。

javascript 复制代码
// 主线程(main.js)
const { Worker, isMainThread, parentPort } = require('worker_threads');

if (isMainThread) {
  const matrixA = /* 生成矩阵 A */;
  const matrixB = /* 生成矩阵 B */;
  const chunkSize = 1000;

  // 将矩阵分块
  const chunks = [];
  for (let i = 0; i < matrixA.length; i += chunkSize) {
    chunks.push({
      a: matrixA.slice(i, i + chunkSize),
      b: matrixB,
      startRow: i
    });
  }

  // 为每个分块创建 Worker
  const results = new Array(matrixA.length);
  chunks.forEach((chunk) => {
    const worker = new Worker(__filename, { workerData: chunk });
    worker.on('message', (rows) => {
      rows.forEach((row, idx) => {
        results[chunk.startRow + idx] = row;
      });
    });
  });

} else {
  // Worker 线程
  const { multiplyMatrices } = require('./math-utils');

  const chunk = workerData;
  const resultChunk = multiplyMatrices(chunk.a, chunk.b);
  parentPort.postMessage(resultChunk);
}

关键点

  • 数据分片后由多个 Worker 并行处理。
  • 主线程合并结果。

场景 3:实时数据分析

问题:实时处理来自传感器的 10 万条数据点,需快速完成统计分析。

解决方案:多线程并行处理数据分片。

javascript 复制代码
// 主线程(main.js)
const { Worker } = require('worker_threads');

const sensorData = [/* 10 万条数据 */];
const threadCount = 4;
const chunkSize = Math.ceil(sensorData.length / threadCount);

for (let i = 0; i < threadCount; i++) {
  const worker = new Worker('./stats-worker.js', {
    workerData: {
      data: sensorData.slice(i * chunkSize, (i + 1) * chunkSize),
      chunkId: i
    }
  });

  worker.on('message', (localStats) => {
    // 合并所有 Worker 的统计结果
    aggregateStats(localStats);
  });
}

// Worker 线程(stats-worker.js)
const { workerData, parentPort } = require('worker_threads');

function calculateStats(data) {
  const sum = data.reduce((acc, val) => acc + val, 0);
  const avg = sum / data.length;
  return { sum, avg };
}

const result = calculateStats(workerData.data);
parentPort.postMessage(result);

关键点

  • 数据均分到 4 个 Worker 并行计算。
  • 最终聚合局部结果。

场景 4:文件批量压缩

问题:同步压缩 50 个大型日志文件耗时过长。

解决方案:多线程并行压缩。

javascript 复制代码
// 主线程(main.js)
const { Worker } = require('worker_threads');
const zlib = require('zlib');
const fs = require('fs');

const files = ['log1.txt', 'log2.txt', /* ... 50个文件 */];

files.forEach((file) => {
  const worker = new Worker('./compress-worker.js', {
    workerData: file
  });

  worker.on('exit', () => {
    console.log(`${file} 压缩完成`);
  });
});

// Worker 线程(compress-worker.js)
const { workerData, parentPort } = require('worker_threads');
const zlib = require('zlib');
const fs = require('fs');

const fileContent = fs.readFileSync(workerData);
zlib.gzip(fileContent, (err, compressed) => {
  fs.writeFileSync(`${workerData}.gz`, compressed);
  parentPort.postMessage('done');
});

Worker Threads 最佳实践

  1. 避免过度线程化:线程创建有成本,建议使用线程池(如 poolifier 库)。
  2. 减少线程间通信:消息传递涉及序列化/反序列化开销。
  3. 错误处理:监听 Worker 的 errorexit 事件。
  4. 共享内存:通过 SharedArrayBuffer 实现高效数据共享(需谨慎处理竞态条件)。

何时不用 Worker Threads?

  • I/O 密集型任务:用异步 I/O + 主线程更高效。
  • 超短任务:线程启动开销可能超过任务本身耗时。
  • 需要跨进程通信:考虑使用 child_processcluster