@Dale-Lin
2022-09-09T12:02:41.000000Z
字数 10757
阅读 392
NodeJs
早期在单核 CPU 系统中,为了实现多任务的运行,引入了进程的概念;不同程序运行在数据和指令相互隔离的进程中,通过时间片的轮转调度执行,由于 CPU 时间片切换和执行很快,看上去像是同一时间运行了多个程序。
进程切换时需要保存相关硬件现场、进程控制块等信息,所以系统开销很大。为了进一步提高系统吞吐率,在同一进程执行时更充分地利用 CPU 资源,引入了线程的概念
线程是操作系统调度执行的最小单位,它们依附于进程中,共享同一个进程的资源,基本不拥有/很少拥有系统资源,线程之间切换的开销极小。
首先执行一下 Node 代码:
const http = require('http')
http.createServer((req, res) => {
res.writeHead(200);
res.end('Hello World');
}).listen(8000);
console.log('process id', process.pid);
在控制台查看这个 Node 服务进程的详细信息:
PID COMMAND %CPU TIME #TH #WQ #POR MEM PURG CMPRS PGRP PPID STATE BOOSTS %CPU_ME
35919 node 0.0 00:00.09 7 0 35 8564K 0B 8548K 35919 35622 sleeping *0[1] 0.00000
可以看到 #TH
(threads)列显示为 7,说明一个 Node 进程中并非只有一个线程。
NodeJS 的一个进程通常包含:
v8_thread_pool
,主要用来执行代码调优、GC 等后台任务;uv_thread_pool
。其中异步 I/O 线程池,如果执行程序中不包含 I/O 操作如文件读写等,则默认线程池大小为 0(上述服务),否则 Node 会初始化大小为 4 的异步 I/O 线程池
uv_thread_pool
(线程数会变成 11);
我们也可以通过process.env.UV_THREADPOOL_SIZE
自己设定线程池大小。需要注意的是在 Node 中网络 I/O 并不占用线程池。
Node 进程中并非只有一个线程,而是只有一个 JS 执行主线程。
JS 只有一个执行主线程,是如何支持高并发的?
Node 进程中通过 libuv 实现了一个 uv_event_loop
事件循环机制,当执行线程发生阻塞事件(如 I/O 操作)时,主线程会把耗时的操作放进事件队列中,然后继续执行后续程序。
事件循环机制会尝试从 libuv 线程池 uv_thread_pool
中取出一个空闲线程去执行队列中的操作,执行完毕后将结果通知主线程,主线程执行相关回调,并将 libuv 线程归还线程池,来保证非阻塞的 I/O 和主线程的高效执行。
事件循环机制使 Node 实现了 I/O-Sensitive 场景下的高并发,但对 CPU-Sensitive 的场景,主线程将会长时间阻塞,无法处理额外的请求。
Node 提供了 child_process 模块来进行进程的生成、通信、销毁等,以充分发挥多核 CPU 的性能以应对 CPU-Sentitive 场景。
stdin/stdout/stderr
默认会在子进程和父进程之间用 pipe 建立。
child_process.spawn()
:异步生成子进程(避免主进程阻塞),同步方法是 child_process.spawnSync()
const { spawn } = require("node:child_process")
const ls = spawn('ls', ['-lh', '/usr'])
ls.stdout.on('data', (data) => {
console.log(`stdout: ${data}`)
})
ls.stderr.on('data', (data) => {
console.error(`stderr: ${data}`)
})
ls.on('close', (code) => {
console.log(`child process exited with code ${code}`)
})
child_process.exec()
:生成一个 shell 并在上面运行命令,完成后会在 callback 中回调 stdout/stderr
;有同步版本。
const { exec } = require('node:child_process')
exec('"/path/to/test_file/test.sh" arg1 arg2')
// shell 环境变量的转译
exec('echo "The \\$HOME variable is $HOME"')
谨防不安全的用户输入攻击。
child_process.execFile()
:类似 .exec()
;直接执行命令,默认不会先生成一个 shell;有同步版本。
child_process.fork()
:生成一个新进程,并唤起一个内置的 module 来建立 IPC(进程间通信),父子进程可以互相通信。
if (process.argv[2] === 'child') {
setTimeout(() => {
console.log('Hello from child!')
}, 1_000)
} else {
const { fork } = require('child_process')
const controller = new AbortController()
const { signal } = controller
const child = fork(__filename, ['child'], { signal })
child.on('error', (err) => {
// Abort Error will called
console.error(err)
})
controller.abort()
}
Node 中,父子进程可通过 IPC 信道收发消息,IPC 由 libuv 通过 pipe 实现,一旦子进程被创建且通信方式为 IPC(`options.stdio 被设置成 ' ipc'),父子进程就可以双向通信
options.stdio
用来设置父子进程间的 pipes。
默认情况下,子进程的 stdin/stdout/stderr 会被重定向到子进程对象对应的 subprocess.stdin
/subprocess.stdout
/subprocess.stderr
流上,相当于设置了 ['pipe', 'pipe', 'pipe']
'pipe'
:['pipe', 'pipe', 'pipe']
的简写;'overlapped'
:['overlapped', 'overlapped', 'overlapped']
的简写;'ignore'
:['ignore', 'ignore', 'ignore']
的简写;'inherit'
:['inherit', 'inherit', 'inherit']
或 [0, 1, 2]
的简写;数组的三项 fd(文件描述符)分别对应子进程的 stdin, stdout, stderr。
'pipe'
:在子进程和父进程之间创建一个 pipe。父进程可以通过子进程对象的 subprocess.stdio[0/1/2]
或者 subprocess.stdin/stdout/stderr
访问;因为这并非真实的 Unix pipes,所以子进程不能使用。'overlapped'
:在非 windows 系统上,和 'pipe'
相同;在 windows 系统上,会把 FILE_FLAG_OVERLAPPED
标志设置到 handle 上。'ipc'
:在父子进程间创建一个传递 messages/fd 的 IPC 通道。一个子进程最多只有一个 IPC stdio fd。 subprocess.send
方法向某个子进程发送消息,并通过向子进程添加 message
事件回调来接收子进程传来的信息;子进程通过 process.send
方法向父进程发送信息,并通过 process 的 message
事件接受父进程传来的信息。'ignore'
:忽略子进程某个 fd。'inherit'
:将父进程对应的 stdio 流传过去,即 process.stdin/stdout/stderr
// parent process
const cp = require('node:child_process')
const n = cp.fork(`${__dirname}/sub.js`)
// receive n's message
n.on('message', m => {
console.log('PARENT got message: ', m)
})
// send msg to n
n.send({
hello: 'world'
})
// sub.js
// receive msg from PARENT
process.on('message', m => {
console.log('CHILD got message: ', m)
})
// send msg to PARENT
process.send({
foo: 'bar',
baz: NaN
})
cluster 模块实现了单一 master 控制进程和多 worker 执行进程的通用集群模式。
cluster master 可以创建、销毁进程,并可以和 worker 进程之间通信(IPC);worker 进程负责执行耗时的任务,之间不能直接通信。
cluster 模块实现了 LB 调度算法。在类 unix 系统中,cluster 使用轮转调度(round-robin),node 中维护一个可用 worker 进程的队列 free、一个任务队列 handles,当一个任务到来时,free 队首节点出列处理任务,并返回确认处理标识,依次调度。在 win 系统中, node 通过 Shared handle 处理 LB,通过将 fd,port 等信息传递给子进程,子进程通过这些信息创建对应的 SocketHandle/ServerHandle,然后进行响应端口的绑定和监听,处理任务。
cluster 大大简化了多进程模型的使用:
const cluster = require('cluster')
// 斐波那契
function fib(num) {
return num < 2 ? num : fib(num - 1) + fib(num - 2)
}
if (cluster.isMaster) {
// 计算 43 和 44
for (let i = 43; i < 45; i++) {
const worker = cluster.fork() // fork worker 进程
worker.send({ num: i })
worker.on('message', msg => {
const { data, num } = msg
console.log(`receive fib(${num}) calculate result ${data}`)
worker.kill()
})
}
// 监听 worker 进程退出事件
cluster.on('exit', worker => {
// worker.isDead()
console.log(`worker ${worker.process.pid} killed!`)
// cluster.workers 是 { workerId: worker } 的集合对象
if (Object.keys(cluster.workers).length === 0)
console.log('calculate main process end')
})
}
else {
// worker process runs
process.on('message', message => {
const { num } = message
console.log(`child pid ${process.pid} receive num ${num}`)
const data = fib(num)
process.send({ data, num })
})
}
fork() creates a new process by duplicating the calling process. The new process is referred to as the child process. The calling process is referred to as the parent process.
The child process and the parent process run in separate memory spaces. At the time of fork() both memory spaces have the same content. Memory writes, file mappings (mmap(2)), and unmappings (munmap(2)) performed by one of the processes do not affect the other.
node:worker_threads
和 child_process
/cluster
之间的区别在于,worker_threads
可以通过传输 ArrayBuffer
实例或 SharedArrayBuffer
实例来共享内存。
const {
Worker,
isMainThread,
getEnvironmentData,
setEnvironmentData
} = require('node:worker_threads')
if (isMainThread) {
setEnvironmentData('key', 'value')
const worker = new Worker(__filename)
} else {
console.log(getEnvironmendData('key')) // Prints 'value'
}
例如:
const {
Worker, inMainThread, parentPort, workerData
} = require('node:worker_threads')
if (isMainThread) {
module.exports = function parseJSAsync(script) {
// spawn a worker thread for each call
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, {
workerData: script
})
worker.on('message', resolve)
worker.on('exit', code => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`))
}
})
worker.on('error', reject)
})
}
}
else {
const { parse } = require('@babel/parser')
const script = workerData
parentPort.postMessage(parse(script), { sourceType: 'module' })
}
上例在每次调用 parseJSAsync
的时候都生成了一个 worker 线程,在实际运用的时候最好有一个线程池来管理,否则过量线程会造成负向效果。
worker.postMessage(value[, transferList])
向 worker 线程发送消息。
options
process.argv
参数process.env
true
且第一个参数是脚本字符串的时候,会在 worker online 的时候立刻执行。require('node:worker_threads').workerData
使用。worker.terminate()
尽快终止 worker 线程中的 JS 执行。
worker.threadId
worker 线程的 id
worker.parentPort
worker 线程和父线程通信的 MessagePort。使用 parentPort.postMessage()
向父线程发送消息,使用 parentPort.on('message')
接收父线程发来的消息。
worker.receiveMessageOnPort(port)
手动从一个 MessagePort
接收一条(最早的)消息,如果没有可消费的消息,返回 undefined
。手动接收的消息不会触发 'message' 事件。
const { MessageChannel, receiveMessageFromPort } = require('node:worker_threads')
const { port1, port2 } = new MessageChannel()
port1.portMessage({ hello: 'world!' })
console.log(receiveMessageOnPort(port2))
// Prints: { message: { hello: 'world!' } }
console.log(receiveMessageOnPort(port2))
// Prints: undefined
worker.workerData
任意的 JS 值,在 worker 被 new Worker
创建的时候传入的数据,通过 require('node:worker_threads').workerData
取到。
MessageChannel
worker.MessageChannel
是一个异步双向信道,New MessageChannel
yields 一个包含 port1
和 port2
属性名的对象用来连接。
const { MessageChannel } = require('node:worker_threads')
const { port1, port2 } = new MessageChannel()
port1.on('message', message => console.log('received', message))
port2.postMessage({ foo: 'bar' })
// Prints: received { foo: 'bar' }
实现线程池的时候,最好使用 AsyncResource
API 来通知分析工具(例如 async_context_trace )任务和结果之间的关系,详见 node:async_hooks
。
AsyncResource
AsyncResource
用来触发异步资源的生命周期事件,用继承方式来使用。
import {
AsyncResource,
// executionAsyncId
} from 'node:async_hooks'
// AsyncResource() 构造器的 triggerAsyncId 为空时会使用 async_hook.executionAsyncId()
// type 为异步事件类型的字符串
// requireManualDestroy 为 false 的时候
// AsnycResource.emitDestroy 方法只在有 destroy 钩子的情况下调用 GC
// const asyncResource = new AsyncResource(
// type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false }
//)
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery')
this.db = db
}
getInfo(query, callback) {
this.db.get(query, (er, data) => {
// asyncResource method
this.runInAsyncScope(callback, null, err, data)
})
}
close() {
this.db = null
// asyncResource method
this.emitDestroy()
}
}
asyncResource.runInAsyncScope(fn[, thisArg, ..args])
在 async resource 的执行上下文里调用某个函数。
asyncResource.emitDestroy()
触发 destroy
事件及相关钩子。这个方法只能手动调用,且只能调用一次,重复调用会抛出错误。如果此时资源在等待 GC,那么所有 destroy
钩子不会因此执行。
假设 worker 线程如下:
// task_processer.js
const { parentPort } = require('node:worker_threads')
parentPort.on('message', ({ a, b }) => {
parentPort.postMessage(a + b)
})
设计一个线程池:
// worker_pool.js
const { AsyncResource } = require('node:async_hooks')
const { EventEmitter } = require('node:events')
const path = require('node:path')
const { Worker } = require('node:worker_threads')
const kTaskInfo = Symbol('kTaskInfo') // kTaskInfo 属性
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent') // worker 空闲事件标识
// 异步任务类
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo')
this.callback = callback
}
done(er, result) {
this.runInAsyncScope(this.callback, null, er, result)
this.emitDestroy()
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super()
this.numThreads = numThreads
this.workers = [] // 所有线程集合
this.freeWorkers = [] // 空闲线程
this.tasks = [] // 暂存任务队列
for (let i = 0; i < numThreads; i++)
this.addNewWorker()
// 监听 worker 空闲事件,处理暂存的任务
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift()
this.runTask(task, callback)
}
})
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
this.tasks.push({ task, callback })
return;
}
const worker = this.freeWorkers.pop()
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback) // 存任务 callback 到 asyncResource 类属性上
worker.postMessage(task) // 发送任务到子线程
}
addNewWorker() {
const newWorker = new Worker(path.resolve(__dirname, 'task_processer.js'))
newWorker.on('message', result => {
// 收到子线程回调
// 1. 调用 asyncSource 类的 done 方法
newWorker[kTaskInfo].done(null, result)
// 2. 取消 [kTaskInfo] 绑定
newWorker[kTaskInfo] = null
// 3. 将线程归还,并触发 kWorkerFreedEvent
this.freeWorkers.push(newWorker)
this.emit(kWorkerFreedEvent)
})
newWorker.on('error', er => {
if (newWorker[kTaskInfo])
newWorker[kTaskInfo].done(er, null)
else
this.emit('error', er)
// 删除这个线程,再重新生成一个
this.workers.splice(this.workers.indexOf(newWorker), 1)
this.addNewWorker()
})
// 添加到线程集合和空闲队列
// 并触发 kWorkerFreedEvent
this.workers.push(newWorker)
this.freeWorkers.push(newWorker)
this.emit(kWorkerFreedEvent)
}
close() {
for (const worker of this.workers) worker.terminate()
}
}
module.exports = WorkerPool
使用:
const WorkerPool = require('./worker_pool.js')
const os = require('node:os')
const pool = new WorkerPool(os.cpus().length)
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42 + i, b: 100 }, (er, result) => {
console.log(i, er, result)
if (++finished === 10) {
pool.close()
}
})
}