[关闭]
@Dale-Lin 2022-09-09T12:02:41.000000Z 字数 10757 阅读 392

Node 进程和线程调度/事件循环

NodeJs


定义

早期在单核 CPU 系统中,为了实现多任务的运行,引入了进程的概念;不同程序运行在数据和指令相互隔离的进程中,通过时间片的轮转调度执行,由于 CPU 时间片切换和执行很快,看上去像是同一时间运行了多个程序。
进程切换时需要保存相关硬件现场、进程控制块等信息,所以系统开销很大。为了进一步提高系统吞吐率,在同一进程执行时更充分地利用 CPU 资源,引入了线程的概念

线程是操作系统调度执行的最小单位,它们依附于进程中,共享同一个进程的资源,基本不拥有/很少拥有系统资源,线程之间切换的开销极小。

单线程?

首先执行一下 Node 代码:

  1. const http = require('http')
  2. http.createServer((req, res) => {
  3. res.writeHead(200);
  4. res.end('Hello World');
  5. }).listen(8000);
  6. console.log('process id', process.pid);

在控制台查看这个 Node 服务进程的详细信息:

  1. PID COMMAND %CPU TIME #TH #WQ #POR MEM PURG CMPRS PGRP PPID STATE BOOSTS %CPU_ME
  2. 35919 node 0.0 00:00.09 7 0 35 8564K 0B 8548K 35919 35622 sleeping *0[1] 0.00000

可以看到 #TH(threads)列显示为 7,说明一个 Node 进程中并非只有一个线程。

NodeJS 的一个进程通常包含:

其中异步 I/O 线程池,如果执行程序中不包含 I/O 操作如文件读写等,则默认线程池大小为 0(上述服务),否则 Node 会初始化大小为 4 的异步 I/O 线程池 uv_thread_pool(线程数会变成 11);
我们也可以通过 process.env.UV_THREADPOOL_SIZE 自己设定线程池大小。需要注意的是在 Node 中网络 I/O 并不占用线程池。

Node 进程中并非只有一个线程,而是只有一个 JS 执行主线程。

事件循环

JS 只有一个执行主线程,是如何支持高并发的?

Node 进程中通过 libuv 实现了一个 uv_event_loop 事件循环机制,当执行线程发生阻塞事件(如 I/O 操作)时,主线程会把耗时的操作放进事件队列中,然后继续执行后续程序。

事件循环机制会尝试从 libuv 线程池 uv_thread_pool 中取出一个空闲线程去执行队列中的操作,执行完毕后将结果通知主线程,主线程执行相关回调,并将 libuv 线程归还线程池,来保证非阻塞的 I/O 和主线程的高效执行。

子进程

事件循环机制使 Node 实现了 I/O-Sensitive 场景下的高并发,但对 CPU-Sensitive 的场景,主线程将会长时间阻塞,无法处理额外的请求。

Node 提供了 child_process 模块来进行进程的生成、通信、销毁等,以充分发挥多核 CPU 的性能以应对 CPU-Sentitive 场景。

stdin/stdout/stderr 默认会在子进程和父进程之间用 pipe 建立。

主要 api

  1. const { spawn } = require("node:child_process")
  2. const ls = spawn('ls', ['-lh', '/usr'])
  3. ls.stdout.on('data', (data) => {
  4. console.log(`stdout: ${data}`)
  5. })
  6. ls.stderr.on('data', (data) => {
  7. console.error(`stderr: ${data}`)
  8. })
  9. ls.on('close', (code) => {
  10. console.log(`child process exited with code ${code}`)
  11. })
  1. const { exec } = require('node:child_process')
  2. exec('"/path/to/test_file/test.sh" arg1 arg2')
  3. // shell 环境变量的转译
  4. exec('echo "The \\$HOME variable is $HOME"')

谨防不安全的用户输入攻击。

  1. if (process.argv[2] === 'child') {
  2. setTimeout(() => {
  3. console.log('Hello from child!')
  4. }, 1_000)
  5. } else {
  6. const { fork } = require('child_process')
  7. const controller = new AbortController()
  8. const { signal } = controller
  9. const child = fork(__filename, ['child'], { signal })
  10. child.on('error', (err) => {
  11. // Abort Error will called
  12. console.error(err)
  13. })
  14. controller.abort()
  15. }

通信

Node 中,父子进程可通过 IPC 信道收发消息,IPC 由 libuv 通过 pipe 实现,一旦子进程被创建且通信方式为 IPC(`options.stdio 被设置成 ' ipc'),父子进程就可以双向通信

  1. // parent process
  2. const cp = require('node:child_process')
  3. const n = cp.fork(`${__dirname}/sub.js`)
  4. // receive n's message
  5. n.on('message', m => {
  6. console.log('PARENT got message: ', m)
  7. })
  8. // send msg to n
  9. n.send({
  10. hello: 'world'
  11. })
  1. // sub.js
  2. // receive msg from PARENT
  3. process.on('message', m => {
  4. console.log('CHILD got message: ', m)
  5. })
  6. // send msg to PARENT
  7. process.send({
  8. foo: 'bar',
  9. baz: NaN
  10. })

cluster

cluster 模块实现了单一 master 控制进程和多 worker 执行进程的通用集群模式。

cluster master 可以创建、销毁进程,并可以和 worker 进程之间通信(IPC);worker 进程负责执行耗时的任务,之间不能直接通信。

cluster 模块实现了 LB 调度算法。在类 unix 系统中,cluster 使用轮转调度(round-robin),node 中维护一个可用 worker 进程的队列 free、一个任务队列 handles,当一个任务到来时,free 队首节点出列处理任务,并返回确认处理标识,依次调度。在 win 系统中, node 通过 Shared handle 处理 LB,通过将 fd,port 等信息传递给子进程,子进程通过这些信息创建对应的 SocketHandle/ServerHandle,然后进行响应端口的绑定和监听,处理任务。

cluster 大大简化了多进程模型的使用:

  1. const cluster = require('cluster')
  2. // 斐波那契
  3. function fib(num) {
  4. return num < 2 ? num : fib(num - 1) + fib(num - 2)
  5. }
  6. if (cluster.isMaster) {
  7. // 计算 43 和 44
  8. for (let i = 43; i < 45; i++) {
  9. const worker = cluster.fork() // fork worker 进程
  10. worker.send({ num: i })
  11. worker.on('message', msg => {
  12. const { data, num } = msg
  13. console.log(`receive fib(${num}) calculate result ${data}`)
  14. worker.kill()
  15. })
  16. }
  17. // 监听 worker 进程退出事件
  18. cluster.on('exit', worker => {
  19. // worker.isDead()
  20. console.log(`worker ${worker.process.pid} killed!`)
  21. // cluster.workers 是 { workerId: worker } 的集合对象
  22. if (Object.keys(cluster.workers).length === 0)
  23. console.log('calculate main process end')
  24. })
  25. }
  26. else {
  27. // worker process runs
  28. process.on('message', message => {
  29. const { num } = message
  30. console.log(`child pid ${process.pid} receive num ${num}`)
  31. const data = fib(num)
  32. process.send({ data, num })
  33. })
  34. }

fork() creates a new process by duplicating the calling process. The new process is referred to as the child process. The calling process is referred to as the parent process.

The child process and the parent process run in separate memory spaces. At the time of fork() both memory spaces have the same content. Memory writes, file mappings (mmap(2)), and unmappings (munmap(2)) performed by one of the processes do not affect the other.

worker_threads

node:worker_threadschild_process/cluster 之间的区别在于,worker_threads 可以通过传输 ArrayBuffer 实例或 SharedArrayBuffer 实例来共享内存。

用法

  1. const {
  2. Worker,
  3. isMainThread
  4. getEnvironmentData,
  5. setEnvironmentData
  6. } = require('node:worker_threads')
  7. if (isMainThread) {
  8. setEnvironmentData('key', 'value')
  9. const worker = new Worker(__filename)
  10. } else {
  11. console.log(getEnvironmendData('key')) // Prints 'value'
  12. }

例如:

  1. const {
  2. Worker, inMainThread, parentPort, workerData
  3. } = require('node:worker_threads')
  4. if (isMainThread) {
  5. module.exports = function parseJSAsync(script) {
  6. // spawn a worker thread for each call
  7. return new Promise((resolve, reject) => {
  8. const worker = new Worker(__filename, {
  9. workerData: script
  10. })
  11. worker.on('message', resolve)
  12. worker.on('exit', code => {
  13. if (code !== 0) {
  14. reject(new Error(`Worker stopped with exit code ${code}`))
  15. }
  16. })
  17. worker.on('error', reject)
  18. })
  19. }
  20. }
  21. else {
  22. const { parse } = require('@babel/parser')
  23. const script = workerData
  24. parentPort.postMessage(parse(script), { sourceType: 'module' })
  25. }

上例在每次调用 parseJSAsync 的时候都生成了一个 worker 线程,在实际运用的时候最好有一个线程池来管理,否则过量线程会造成负向效果。

  1. const { MessageChannel, receiveMessageFromPort } = require('node:worker_threads')
  2. const { port1, port2 } = new MessageChannel()
  3. port1.portMessage({ hello: 'world!' })
  4. console.log(receiveMessageOnPort(port2))
  5. // Prints: { message: { hello: 'world!' } }
  6. console.log(receiveMessageOnPort(port2))
  7. // Prints: undefined
  1. const { MessageChannel } = require('node:worker_threads')
  2. const { port1, port2 } = new MessageChannel()
  3. port1.on('message', message => console.log('received', message))
  4. port2.postMessage({ foo: 'bar' })
  5. // Prints: received { foo: 'bar' }

线程池

实现线程池的时候,最好使用 AsyncResource API 来通知分析工具(例如 async_context_trace )任务和结果之间的关系,详见 node:async_hooks

Class: AsyncResource

AsyncResource 用来触发异步资源的生命周期事件,用继承方式来使用。

  1. import {
  2. AsyncResource,
  3. // executionAsyncId
  4. } from 'node:async_hooks'
  5. // AsyncResource() 构造器的 triggerAsyncId 为空时会使用 async_hook.executionAsyncId()
  6. // type 为异步事件类型的字符串
  7. // requireManualDestroy 为 false 的时候
  8. // AsnycResource.emitDestroy 方法只在有 destroy 钩子的情况下调用 GC
  9. // const asyncResource = new AsyncResource(
  10. // type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false }
  11. //)
  12. class DBQuery extends AsyncResource {
  13. constructor(db) {
  14. super('DBQuery')
  15. this.db = db
  16. }
  17. getInfo(query, callback) {
  18. this.db.get(query, (er, data) => {
  19. // asyncResource method
  20. this.runInAsyncScope(callback, null, err, data)
  21. })
  22. }
  23. close() {
  24. this.db = null
  25. // asyncResource method
  26. this.emitDestroy()
  27. }
  28. }

部分 API

线程池示例

假设 worker 线程如下:

  1. // task_processer.js
  2. const { parentPort } = require('node:worker_threads')
  3. parentPort.on('message', ({ a, b }) => {
  4. parentPort.postMessage(a + b)
  5. })

设计一个线程池:

  1. // worker_pool.js
  2. const { AsyncResource } = require('node:async_hooks')
  3. const { EventEmitter } = require('node:events')
  4. const path = require('node:path')
  5. const { Worker } = require('node:worker_threads')
  6. const kTaskInfo = Symbol('kTaskInfo') // kTaskInfo 属性
  7. const kWorkerFreedEvent = Symbol('kWorkerFreedEvent') // worker 空闲事件标识
  8. // 异步任务类
  9. class WorkerPoolTaskInfo extends AsyncResource {
  10. constructor(callback) {
  11. super('WorkerPoolTaskInfo')
  12. this.callback = callback
  13. }
  14. done(er, result) {
  15. this.runInAsyncScope(this.callback, null, er, result)
  16. this.emitDestroy()
  17. }
  18. }
  19. class WorkerPool extends EventEmitter {
  20. constructor(numThreads) {
  21. super()
  22. this.numThreads = numThreads
  23. this.workers = [] // 所有线程集合
  24. this.freeWorkers = [] // 空闲线程
  25. this.tasks = [] // 暂存任务队列
  26. for (let i = 0; i < numThreads; i++)
  27. this.addNewWorker()
  28. // 监听 worker 空闲事件,处理暂存的任务
  29. this.on(kWorkerFreedEvent, () => {
  30. if (this.tasks.length > 0) {
  31. const { task, callback } = this.tasks.shift()
  32. this.runTask(task, callback)
  33. }
  34. })
  35. }
  36. runTask(task, callback) {
  37. if (this.freeWorkers.length === 0) {
  38. this.tasks.push({ task, callback })
  39. return;
  40. }
  41. const worker = this.freeWorkers.pop()
  42. worker[kTaskInfo] = new WorkerPoolTaskInfo(callback) // 存任务 callback 到 asyncResource 类属性上
  43. worker.postMessage(task) // 发送任务到子线程
  44. }
  45. addNewWorker() {
  46. const newWorker = new Worker(path.resolve(__dirname, 'task_processer.js'))
  47. newWorker.on('message', result => {
  48. // 收到子线程回调
  49. // 1. 调用 asyncSource 类的 done 方法
  50. newWorker[kTaskInfo].done(null, result)
  51. // 2. 取消 [kTaskInfo] 绑定
  52. newWorker[kTaskInfo] = null
  53. // 3. 将线程归还,并触发 kWorkerFreedEvent
  54. this.freeWorkers.push(newWorker)
  55. this.emit(kWorkerFreedEvent)
  56. })
  57. newWorker.on('error', er => {
  58. if (newWorker[kTaskInfo])
  59. newWorker[kTaskInfo].done(er, null)
  60. else
  61. this.emit('error', er)
  62. // 删除这个线程,再重新生成一个
  63. this.workers.splice(this.workers.indexOf(newWorker), 1)
  64. this.addNewWorker()
  65. })
  66. // 添加到线程集合和空闲队列
  67. // 并触发 kWorkerFreedEvent
  68. this.workers.push(newWorker)
  69. this.freeWorkers.push(newWorker)
  70. this.emit(kWorkerFreedEvent)
  71. }
  72. close() {
  73. for (const worker of this.workers) worker.terminate()
  74. }
  75. }
  76. module.exports = WorkerPool

使用:

  1. const WorkerPool = require('./worker_pool.js')
  2. const os = require('node:os')
  3. const pool = new WorkerPool(os.cpus().length)
  4. let finished = 0;
  5. for (let i = 0; i < 10; i++) {
  6. pool.runTask({ a: 42 + i, b: 100 }, (er, result) => {
  7. console.log(i, er, result)
  8. if (++finished === 10) {
  9. pool.close()
  10. }
  11. })
  12. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注