fix(pool)!: settle in-flight task promises on every unexpected worker exit #3211
jerome-benoit wants to merge 120 commits
Pull request overview
Adds rejection of pending task promises whose target worker has crashed, so that callers of pool.execute() no longer hang indefinitely (which on Node.js 24+ surfaces as ERR_UNSETTLED_TOP_LEVEL_AWAIT / exit code 13). The change extends the existing error once-handler in createAndSetupWorkerNode and introduces a new private helper that walks promiseResponseMap and rejects entries targeting the crashed worker node.
Changes:
- Compute the crashed worker node key once in the `error` handler and reuse it for redistribution.
- Add `rejectInFlightPromises(workerNodeKey, error)` to reject and clean up matching entries in `promiseResponseMap`.
When a worker crashes (error event), tasks already dispatched to it had their promises orphaned in promiseResponseMap — never resolved or rejected. This caused pool.execute() callers to hang indefinitely. On Node.js 24+, this manifests as exit code 13 (unsettled top-level await) when the event loop drains after all workers eventually exit. The fix iterates promiseResponseMap on worker error and rejects all promises targeting the crashed worker node before termination.
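The fix described in this commit message can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the pool's actual code: `PendingTask`, `rejectInFlight`, and the map shape are hypothetical stand-ins for `promiseResponseMap` and its entries.

```typescript
// Hypothetical stand-in for an entry of the pool's promiseResponseMap.
interface PendingTask {
  workerNodeKey: number
  reject: (reason: Error) => void
}

// Reject and remove every pending entry that targets the crashed worker.
// Returns the ids of the rejected tasks.
function rejectInFlight (
  pending: Map<string, PendingTask>,
  crashedWorkerNodeKey: number,
  error: Error
): string[] {
  const rejected: string[] = []
  for (const [taskId, entry] of pending) {
    if (entry.workerNodeKey === crashedWorkerNodeKey) {
      entry.reject(
        new Error(`Worker node crashed with error: '${error.message}'`)
      )
      // Deleting the entry currently being visited is safe during Map iteration.
      pending.delete(taskId)
      rejected.push(taskId)
    }
  }
  return rejected
}

// Usage: two tasks on worker 0, one on worker 1; worker 0 crashes.
const reasons: string[] = []
const pending = new Map<string, PendingTask>([
  ['a', { workerNodeKey: 0, reject: e => reasons.push(e.message) }],
  ['b', { workerNodeKey: 1, reject: e => reasons.push(e.message) }],
  ['c', { workerNodeKey: 0, reject: e => reasons.push(e.message) }]
])
const rejectedIds = rejectInFlight(pending, 0, new Error('boom'))
```

After the call, only task `b` is still pending, and the callers of tasks `a` and `c` have been notified instead of hanging.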
…rtWorkerOnError in exit handler
- Update promiseResponseMap workerNodeKey when tasks are stolen or redistributed so rejectInFlightTaskPromises targets the correct worker.
- Remove readonly on PromiseResponseWrapper.workerNodeKey to allow mutation.
- Guard exit handler worker restart with restartWorkerOnError === true for consistency with error handler.
- Emit taskFinished and check busyEnd in rejectInFlightTaskPromises to unblock waitWorkerNodeEvents listeners.
- Add regression test for in-flight task promise rejection on worker crash.
…tale index
handleTaskExecutionResponse now resolves the worker node key from message.workerId via getWorkerNodeKeyByWorkerId() instead of using the stored promiseResponse.workerNodeKey, which becomes stale after removeWorkerNode splices the array.
Also update task queue tests to use tight mathematical bounds for per-worker executed task count under work-stealing:
- lower bound: tasksQueueOptions.concurrency (in-flight tasks cannot be stolen)
- upper bound: N*(M-C)+C (one worker steals all stealable tasks)
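As a worked check of these bounds, the sketch below assumes that N is the number of workers, M the number of tasks initially assigned to each worker, and C the value of `tasksQueueOptions.concurrency`. The commit message does not define the symbols, so this reading is an assumption. Under it, a worker that steals every stealable task runs its own M tasks plus M - C from each of the other N - 1 workers, and M + (N - 1)(M - C) simplifies to N(M - C) + C.

```typescript
// Assumed meaning of the symbols (not defined in the commit message):
// n = workers, m = tasks initially assigned per worker, c = concurrency.
function executedTaskBounds (
  n: number,
  m: number,
  c: number
): { lower: number, upper: number } {
  return {
    lower: c, // in-flight tasks cannot be stolen
    upper: n * (m - c) + c // one worker steals all stealable tasks
  }
}

// 4 workers, 10 tasks each, concurrency 2.
const bounds = executedTaskBounds(4, 10, 2) // { lower: 2, upper: 34 }
```

The upper bound of 34 matches the direct count: 10 own tasks plus 8 stolen from each of the 3 other workers.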
… handling
- Extract handleWorkerNodeCrash to unify crash recovery logic between error handler (threads) and exit handler (cluster workers).
- Detect cluster worker crashes via non-zero exit code in exit handler since cluster workers do not emit 'error' on uncaught exceptions.
- Use WorkerTypes.cluster enum instead of string literal.
- Add cluster crash regression test with crashWorker.cjs.
- Refactor crash pool lifecycle to beforeAll/afterAll pattern.
- Use emitter.once() for test error listeners to avoid listener leaks.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (3)
src/pools/abstract-pool.ts:1166
- Adding `this.opts.restartWorkerOnError === true` to the exit handler's `startMinimumNumberOfWorkers` gate changes pre-existing behavior for all worker exits, not just crashes. Previously, a worker that exited for any reason (e.g., a fixed worker that exited unexpectedly while `restartWorkerOnError` was disabled, signal-based termination, etc.) would still trigger restoration of the minimum number of workers. With this change, when users set `restartWorkerOnError: false`, the pool will permanently shrink below `minimumNumberOfWorkers` on any worker exit, which can leave a fixed pool unable to process further tasks. Consider whether this gating should only apply to the crash-detected exit path (cluster non-zero exit code) rather than to all exits.
if (
  this.started &&
  !this.startingMinimumNumberOfWorkers &&
  !this.destroying &&
  this.opts.restartWorkerOnError === true
) {
  this.startMinimumNumberOfWorkers(true)
}
src/pools/abstract-pool.ts:2304
`rejectInFlightTaskPromises` deliberately skips tasks whose `taskId` is still present in the crashed worker's `tasksQueue`, relying on the subsequent call to `redistributeQueuedTasks` (in `handleWorkerNodeCrash`) to re-home those tasks. However, `redistributeQueuedTasks` aborts (`break`) as soon as it cannot find a destination worker (e.g., single-worker pool, or no eligible destination). In that case the skipped queued-task promises are never rejected and never re-dispatched; they will hang indefinitely, which is exactly the bug this PR is trying to eliminate for in-flight tasks. Additionally, when `enableTasksQueue` is `true` but `restartWorkerOnError` is `false` and there is no other worker to receive the tasks, the same leak occurs. Consider rejecting any queued tasks that could not be redistributed (e.g., reject the leftover entries after `redistributeQueuedTasks` returns).
const queuedTaskIds = new Set<string>()
const workerNode = this.workerNodes[workerNodeKey]
for (const task of workerNode.tasksQueue) {
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  queuedTaskIds.add(task.taskId!)
}
for (const [taskId, promiseResponse] of this.promiseResponseMap) {
  if (
    promiseResponse.workerNodeKey === workerNodeKey &&
    !queuedTaskIds.has(taskId)
  ) {
    const crashError = new Error(
      `Worker node crashed with error: '${error.message}'`
    )
    promiseResponse.asyncResource != null
      ? promiseResponse.asyncResource.runInAsyncScope(
        promiseResponse.reject,
        this.emitter,
        crashError
      )
      : promiseResponse.reject(crashError)
    promiseResponse.asyncResource?.emitDestroy()
    this.promiseResponseMap.delete(taskId)
    workerNode.emit('taskFinished', taskId)
  }
}
this.checkAndEmitTaskExecutionFinishedEvents()
src/pools/abstract-pool.ts:1893
`handleTaskExecutionResponse` now derives `workerNodeKey` from `message.workerId` rather than the stored `promiseResponse.workerNodeKey`. While this is more robust against stale keys, it changes semantics when `getWorkerNodeKeyByWorkerId` returns `-1` (e.g., the worker has already been removed by the exit handler before the response message is processed): in that case `afterTaskExecutionHook` is silently skipped, and `taskFinished`/`idle` events are not emitted on any worker node. Previously the stored key would have been used. Please confirm that this skipped bookkeeping (task usage counters, queue draining) is intentional, or fall back to the stored `workerNodeKey` when the lookup fails.
const { asyncResource, reject, resolve } = promiseResponse
const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
const workerNode =
  workerNodeKey !== -1 ? this.workerNodes[workerNodeKey] : undefined
if (workerError != null) {
  this.emitter?.emit(PoolEvents.taskError, workerError)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const error = this.handleWorkerError(taskId!, workerError)
  asyncResource != null
    ? asyncResource.runInAsyncScope(reject, this.emitter, error)
    : reject(error)
} else {
  asyncResource != null
    ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
    : resolve(data as Response)
}
asyncResource?.emitDestroy()
if (workerNodeKey !== -1) {
  this.afterTaskExecutionHook(workerNodeKey, message)
}
queueMicrotask(() => {
  workerNode?.emit('taskFinished', taskId)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  this.checkAndEmitTaskExecutionFinishedEvents()
  if (
    workerNodeKey !== -1 &&
    this.opts.enableTasksQueue === true &&
    !this.destroying
  ) {
    if (
      !this.isWorkerNodeBusy(workerNodeKey) &&
      this.tasksQueueSize(workerNodeKey) > 0
    ) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
    }
    if (this.isWorkerNodeIdle(workerNodeKey)) {
      workerNode?.emit('idle', {
        workerNodeKey,
      })
    }
…utable array index
workerNodeKey stored in promiseResponseMap drifts after removeWorkerNode splices the array. Sequential crashes at different indices caused in-flight promises to be missed. Store workerId (workerNode.info.id) as the stable identifier and match against it in rejectInFlightTaskPromises.
Also remove dead startMinimumNumberOfWorkers call in handleWorkerNodeCrash for static workers (no-op since crashed node is still counted).
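The index drift this commit addresses can be reproduced in a few lines. The sketch below uses a hypothetical `NodeLike` type with made-up ids; it is not the pool's `WorkerNode`, only an illustration of why an array index is unstable across `splice` while an id is not.

```typescript
// Hypothetical minimal worker node: only the stable id matters here.
interface NodeLike { id: number }

const nodes: NodeLike[] = [{ id: 101 }, { id: 102 }, { id: 103 }]

// A task dispatched to the worker with id 103 records both identifiers.
const storedIndex = nodes.findIndex(node => node.id === 103) // 2
const storedWorkerId = 103

// The worker at index 0 crashes and is spliced out, shifting later nodes down.
nodes.splice(0, 1)

// The stored index now points past the end of the array...
const byIndex = nodes[storedIndex]
// ...while a lookup by id still finds the right node (now at index 1).
const byId = nodes.find(node => node.id === storedWorkerId)
```

A second crash matched against `storedIndex` would miss this task entirely, which is the "sequential crashes at different indices" case mentioned above.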
…andling
- Hoist crashError allocation out of loop (avoid repeated stack capture)
- Add rejectRemainingQueuedTaskPromises to handle tasks left in queue when redistributeQueuedTasks short-circuits (pool size 1, no eligible destination, cannotStealTask)
- Replace flaky sleep(250) in crash tests with deterministic waitWorkerEvents(pool, 'exit', 1)
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
src/pools/abstract-pool.ts:1167
- In the cluster `exit` handler path, `handleWorkerNodeCrash` is invoked before `this.removeWorkerNode(workerNode)`. `handleWorkerNodeCrash` calls `redistributeQueuedTasks`, which scans `this.workerNodes` for a destination; the crashed worker is excluded only via `workerNode.info.ready` (set to `false` at the top of `handleWorkerNodeCrash`). However, if `restartWorkerOnError === true` and `workerNode.info.dynamic`, a new dynamic worker is created inside `handleWorkerNodeCrash` before the crashed node is removed, leaving the pool transiently above its target size. This is likely benign but is a meaningful change from the previous flow (where the error handler called `terminate()` after restart/redistribute but before any exit-driven removal). Worth verifying behavior under the dynamic-pool + cluster crash scenario, which is not covered by the new tests (the new tests use a `FixedClusterPool`/`FixedThreadPool` with `restartWorkerOnError: false`).
workerNode.registerOnceWorkerEventHandler('exit', (exitCode: number) => {
  // Cluster workers do not emit 'error' on uncaught exceptions;
  // detect crashes via non-zero exit code. Intentional signal-based kills
  // produce a null code at runtime, skipping this.
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (
    workerNode.info.type === WorkerTypes.cluster &&
    workerNode.info.ready &&
    workerNodeKey !== -1 &&
    !this.destroying &&
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    exitCode != null &&
    exitCode !== 0
  ) {
    this.handleWorkerNodeCrash(
      workerNode,
      new Error(`Worker node exited with code ${exitCode.toString()}`)
    )
  }
  this.removeWorkerNode(workerNode)
  if (
    this.started &&
    !this.startingMinimumNumberOfWorkers &&
    !this.destroying &&
    this.opts.restartWorkerOnError === true
  ) {
    this.startMinimumNumberOfWorkers(true)
  }
})
- Add rejectTaskPromiseResponse() to encapsulate reject + emitDestroy sequence
- Add runInAsyncScope() to centralize async resource scope execution
- Add JSDoc to rejectRemainingQueuedTaskPromises for consistency
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
src/pools/abstract-pool.ts:1166
- This change introduces a behavior change beyond fixing the in-flight promise issue: previously the exit handler called `startMinimumNumberOfWorkers(true)` unconditionally on every worker exit, so a fixed pool whose worker process exited (for any reason) would always be replenished. Now replenishment only occurs when `restartWorkerOnError === true`. Users who set `restartWorkerOnError: false` while still expecting the pool to remain populated after non-error exits will see their pool shrink permanently. If this is intentional, it should be called out in the PR description / changelog; if not, consider gating only the crash-driven restart path and keep unconditional replenishment for clean exits.
  this.started &&
  !this.startingMinimumNumberOfWorkers &&
  !this.destroying &&
  this.opts.restartWorkerOnError === true
) {
  this.startMinimumNumberOfWorkers(true)
}
src/pools/abstract-pool.ts:2290
`promiseResponse.workerId` is typed as `number | undefined`, and `info.id` may itself be `undefined` for a worker that has not yet been assigned an id. If `handleTask` were ever invoked for such a worker, the stored `workerId` would be `undefined`; then on a worker crash, `crashedWorkerId = workerNode.info.id` would also be `undefined` and the equality check `promiseResponse.workerId === crashedWorkerId` would match every promise whose id was never set, potentially rejecting unrelated in-flight tasks. Consider guarding `rejectInFlightTaskPromises` against `crashedWorkerId == null` (early return) and/or refusing to store an entry in `promiseResponseMap` without a concrete `workerId`.
const crashedWorkerId = workerNode.info.id
const crashError = new Error(
  `Worker node crashed with error: '${error.message}'`
)
for (const [taskId, promiseResponse] of this.promiseResponseMap) {
  if (
    promiseResponse.workerId === crashedWorkerId &&
    !queuedTaskIds.has(taskId)
  ) {
    this.rejectTaskPromiseResponse(promiseResponse, crashError)
    this.promiseResponseMap.delete(taskId)
    workerNode.emit('taskFinished', taskId)
  }
}
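The pitfall raised in this review comment, and the suggested early return, can be sketched as follows. `InFlightEntry` and `matchingTaskIds` are hypothetical names introduced for illustration; the `guarded` parameter only exists to show both behaviors side by side.

```typescript
// Hypothetical entry shape: workerId may be undefined if it was never assigned.
interface InFlightEntry { workerId: number | undefined }

function matchingTaskIds (
  inFlight: Map<string, InFlightEntry>,
  crashedWorkerId: number | undefined,
  guarded: boolean
): string[] {
  // Guard suggested in the review: never match on a missing id.
  if (guarded && crashedWorkerId == null) return []
  const ids: string[] = []
  for (const [taskId, entry] of inFlight) {
    if (entry.workerId === crashedWorkerId) ids.push(taskId)
  }
  return ids
}

const inFlight = new Map<string, InFlightEntry>([
  ['t1', { workerId: 7 }],
  ['t2', { workerId: undefined }],
  ['t3', { workerId: undefined }]
])
// Without the guard, undefined === undefined selects unrelated tasks.
const unguarded = matchingTaskIds(inFlight, undefined, false) // ['t2', 't3']
// With the guard, nothing is selected.
const guardedIds = matchingTaskIds(inFlight, undefined, true) // []
```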
…ling
- Add crashHandled flag to WorkerInfo to prevent double crash handling
- Extend exit handler crash detection to thread workers (not just cluster)
- Add defensive guard to updatePromiseResponseWorkerId for invalid indices
- Decrement executing/increment failed counters on crash rejection
- Tighten stolen task upper bounds in test assertions
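A write-once flag of the kind this commit adds can be sketched as below. `CrashInfo` and `handleCrashOnce` are hypothetical names, and the real handler does far more work; the sketch only shows how the flag turns a second invocation (for example `'exit'` after `'error'`) into a no-op.

```typescript
// Hypothetical subset of WorkerInfo: only the write-once flag is modelled.
interface CrashInfo { crashHandled: boolean }

let crashHandlingRuns = 0

// Called from both the 'error' and the 'exit' handler; only the first call acts.
function handleCrashOnce (info: CrashInfo): boolean {
  if (info.crashHandled) return false
  info.crashHandled = true // set before any other work so re-entry is a no-op
  crashHandlingRuns++
  return true
}

const workerInfo: CrashInfo = { crashHandled: false }
const fromErrorHandler = handleCrashOnce(workerInfo) // true
const fromExitHandler = handleCrashOnce(workerInfo) // false
```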
Strip narrative drift on six protected methods on AbstractPool: buildWorkerCrashError, destroyWorkerNode, handleWorkerNodeCrash, hasInFlightTaskForWorkerId, rejectInFlightTaskPromisesByRef, safeEmitPoolError. Specifically:
- Drop redundant '@internal' tag (sibling protected methods do not carry it; visibility already conveys intent).
- Replace numbered list in handleWorkerNodeCrash JSDoc with a single declarative sentence.
- Remove parenthetical qualifiers ('stable reference', 'captured pre-await') from @param annotations.
- Drop ALL-CAPS emphasis ('AFTER').
- Collapse blank line after first JSDoc sentence to match sibling density.
No behavior change.
Signed-off-by: Jérôme Benoit <[email protected]>
…ghtTaskPromises
The 'ByRef' suffix has no sibling in the codebase; the established By-suffix convention is 'ByWorkerId' (e.g. getWorkerNodeKeyByWorkerId). Since there is no sibling 'rejectInFlightTaskPromisesById' to disambiguate against, the qualifier adds no value. Touches 4 source occurrences and 5 test occurrences. The method was introduced by this in-review PR and is not yet released.
Signed-off-by: Jérôme Benoit <[email protected]>
Replace the three-line narrative comment explaining EventEmitter listener-order semantics with a single declarative sentence stating the invariant. The mechanism (registration order determines firing order) is Node-EE common knowledge and need not be restated; only the project-specific invariant (pool once-handlers must precede user handlers) needs to be encoded inline. Signed-off-by: Jérôme Benoit <[email protected]>
Six existing WorkerInfo flags (backPressure, backPressureStealing, continuousStealing, queuedTaskAbortion, stealing, stolen) use the '<Name> flag.\nThis flag is set to `true` when [condition].' pattern. The two new flags (crashHandled, terminating) used a divergent 'Set to `true` ...' / 'Never reset.' pattern. Aligned for naming coherence. Signed-off-by: Jérôme Benoit <[email protected]>
All five callers of buildWorkerCrashError are internal to AbstractPool; no subclass overrides or extends it. Sibling factory buildTasksQueueOptions is also private. Tighten the visibility to match. Signed-off-by: Jérôme Benoit <[email protected]>
Pure formatting helpers with no `this` dependency live in `src/pools/utils.ts` per existing convention (16 sibling exports). `formatExitDetail` was the only module-scope helper in `abstract-pool.ts` — relocated for structural consistency. Signed-off-by: Jérôme Benoit <[email protected]>
Two call sites built nearly-identical WorkerCrashError instances for unexpected worker exits — same options shape, message differs only by the ' during teardown' qualifier. Extracted to a single private factory `makeUnexpectedExitError(context, exitCode, signal, workerId)` discriminated by a 'lifecycle' | 'teardown' tag. Signed-off-by: Jérôme Benoit <[email protected]>
WorkerCrashErrorOptions JSDoc duplicated the field-level semantics (taskId, exitCode, signal) already documented on the WorkerCrashError class JSDoc and in the canonical docs/api.md discrimination contract. Trimmed to a one-line description matching WorkerTerminationErrorOptions for symmetry. AGENTS.md: no duplication. Signed-off-by: Jérôme Benoit <[email protected]>
Three distinct sources of warnings, each fixed at root:
1. Build TS2345 narrowing failure on `teardownCause` capture.
The `let teardownCause` mutation by the once-handlers prevents
TypeScript from preserving the `!= null` narrow across the
arrow-function closure in the `errorFactory` ternary.
Capture into a `const cause` after `captureEnabled = false` so
narrowing propagates through the closure.
2. Typedoc broken-link warnings on three JSDoc comments.
`{@link PoolEvents.error}` (enum member, non-navigable target)
and `{@link AbstractPool.rejectRemainingQueuedTaskPromises}`
(private method, excluded from docs) resolved but pointed at
undocumented symbols. Replaced with backtick literals.
3. Typedoc warning on `*Options` interfaces referenced by public
constructor signatures but `@internal` (intentionally unexported
for TS4063 declaration-emit compliance). Added to typedoc's
`intentionallyNotExported` allow-list.
Quality gates after fix:
- `pnpm lint --max-warnings=0` exit 0
- `pnpm build` exit 0, zero TS warnings
- `pnpm build:prod` exit 0
- `pnpm typedoc` exit 0, zero warnings
- `pnpm test` 352 passed / 1 skipped
Signed-off-by: Jérôme Benoit <[email protected]>
The pool's once-'exit' handler at createAndSetupWorkerNode calls
workerNode.terminate() after detecting an exit. Inside doTerminate(),
a fresh once('exit') listener was registered to wait for exit — but
the event has already fired, so waitWorkerExit never resolved and
the WORKER_TERMINATION_GRACE_MS (5000 ms) timer was hit on every
crash/destroy path, retaining the WorkerNode in memory for 5s per
worker.
Capture worker exit state in a private 'exited' flag set by a
once('exit') listener registered in the WorkerNode constructor (fires
first per EventEmitter FIFO order). Fast-path doTerminate() to
immediate cleanup when the flag is set, skipping the wait/grace race
and the redundant disconnect()/terminate()/kill() calls on a worker
that is already dead.
Regression test T11e asserts the fast-path completes in < 250 ms
(20× margin vs the 5000 ms slow-path).
Signed-off-by: Jérôme Benoit <[email protected]>
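The listener-ordering problem described in this commit message can be illustrated with a small sketch. `FakeWorker` and `NodeWrapper` are hypothetical stand-ins (not Node's `EventEmitter` or the pool's `WorkerNode`), and the real `doTerminate()` is asynchronous; the sketch only shows why a flag captured at construction time allows a fast path when the exit event has already fired.

```typescript
// Minimal once-only emitter standing in for a worker (hypothetical).
class FakeWorker {
  private listeners: Array<() => void> = []

  once (listener: () => void): void {
    this.listeners.push(listener)
  }

  emitExit (): void {
    const current = this.listeners
    this.listeners = []
    for (const listener of current) listener() // fired in registration order
  }
}

class NodeWrapper {
  private exited = false
  private readonly worker: FakeWorker

  constructor (worker: FakeWorker) {
    this.worker = worker
    // Registered first, so it runs before any later exit listener.
    worker.once(() => { this.exited = true })
  }

  // Returns 'fast' when the worker is already dead, 'wait' when a listener
  // had to be registered to wait for a future exit.
  terminate (): 'fast' | 'wait' {
    if (this.exited) return 'fast'
    this.worker.once(() => {}) // would resolve the wait in a real implementation
    return 'wait'
  }
}

const liveNode = new NodeWrapper(new FakeWorker())
const beforeExit = liveNode.terminate() // 'wait'

const deadWorker = new FakeWorker()
const deadNode = new NodeWrapper(deadWorker)
deadWorker.emitExit()
const afterExit = deadNode.terminate() // 'fast'
```

Without the flag, the second case would register a listener for an event that never fires again and fall through to the grace timer.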
 * including SIGKILL/SIGSEGV/OOM-killer). Clean exits (`exitCode === 0`)
 * always replenish regardless of this option. In-flight task promises
 * bound to a crashed worker always reject with `WorkerCrashError`
 * regardless of this option.
The 02951ad change to include CJS artifacts (`lib/**/*.{mjs,cjs}`) was a regression: tests load `.mjs` artifacts only — the `.cjs` mirrors are never executed and report 0% coverage by construction, dragging the global coverage below the 80% threshold (49.4% lines) and breaking the CI 'Node.js 22.x on ubuntu-latest' job which runs `pnpm test:coverage`. Restore the narrower include from 51e25ef, which scopes coverage measurement to the artifacts the test suite actually exercises. Local `pnpm test:coverage` after revert: 90.57% lines, 84.64% branches, 90.71% statements, 93.26% functions. Signed-off-by: Jérôme Benoit <[email protected]>
Summary
`pool.destroy()` and unexpected worker exits left in-flight task promises in `promiseResponseMap` permanently unsettled. On Node.js 24+ this manifests as exit code 13 (`ERR_UNSETTLED_TOP_LEVEL_AWAIT`) when the event loop drains.
This PR settles every in-flight and queued task promise on every voluntary termination and unexpected worker exit, across thread and cluster pools.
Breaking changes
- `pool.execute()` callers without an attached `.catch` now observe rejections on:
  - `pool.destroy()` reaching `tasksFinishedTimeout` while tasks are in-flight → `WorkerTerminationError`.
  - … `process.exit(N)`) → `WorkerCrashError`.
- `PoolEvents.error` payload on the crash path is exclusively a typed `WorkerCrashError` (was: raw `Error`). Discriminate via `error.name`; the original throw is exposed via `error.cause`.
- `pool.destroy()` is idempotent: concurrent calls share the same in-flight promise instead of throwing.
- `ExitHandler<Worker>` widened from `(exitCode: number) => void` to `(exitCode: number | null, signal?: NodeJS.Signals | null) => void` to surface cluster `'exit'` semantics (external signal kills, OOM). Under `strictFunctionTypes`, user handlers explicitly typed `(code: number) => void` no longer assign; widen the parameter to `number | null` and optionally accept `signal`.
Users who already `await pool.execute()` in a `try`/`catch` need no change.
New public API (`src/index.ts`)
- `WorkerCrashError`: `{ cause, exitCode, signal, taskId, workerId }`
- `WorkerTerminationError`: `{ cause, taskId, workerId }`
Discriminate via `error.name === '…'` (dual-package CJS/ESM safe). `instanceof` works only within a single bundle realm.
Implementation
Crash detection (every worker type)
- `WorkerInfo.terminating` set synchronously before voluntary teardown so the once-`'exit'` handler skips crash detection.
- `'exit'` handler captures `(exitCode, signal)` and gates crash detection on `!terminating && !destroying && !crashHandled && workerNodeKey !== -1 && abnormalExit`. Both thread and cluster crashes detected uniformly.
- `(exitCode === 0 || restartWorkerOnError === true)`: clean exits replenish even with `restartWorkerOnError: false`.
- `WorkerInfo.crashHandled` write-once flag short-circuits the symmetric `'error'`/`'exit'` re-entry race.
In-flight settlement
- `rejectInFlightTaskPromisesByRef(workerNode, workerId, errorFactory)`: single helper, captures stable references before `terminate()` (use-after-remove safety), per-task error construction, returns the first rejection so callers route the emit.
- `try`/`finally` around `wait + reject + kill + terminate`: cleanup runs even on synchronous helper throws (helper throws are surfaced via `PoolEvents.error`).
Event emission
- `PoolEvents.error` per worker crash, payload `WorkerCrashError`.
- `PoolEvents.error` per voluntary destruction with at least one in-flight task, payload `WorkerTerminationError`.
- `safeEmitPoolError` skips emission when no listener is registered (Node throws on `'error'` with zero listeners) and swallows listener throws so cleanup cannot be aborted.
Test coverage
22 test files, 323 passed, 5 skipped, 0 failed. Regression suite at `tests/pools/crash-recovery.test.mjs`:
- `worker.kill()` (Windows)
- `process.exit(N)` mid-task
- … (`it.skip`, F4 race-window)
- `pool.destroy()` with hung in-flight task / idle worker (no spurious cross-worker rejection)
- `destroyWorkerNode` with in-flight
- `crashHandled` write-once + direct mutation-killer for HOLE #2
- `pool.destroy()` calls
- `enableTasksQueue: false` crash
Tests use `{ retry: 0, timeout: 10_000 }` and `.catch`-collection (no `process.on('unhandledRejection')`).
Notes
Out of crash-recovery scope:
- `tests/pools/{thread,cluster}/fixed.test.mjs` per-worker bounds (`executed`, `stolen`, `stolenTasks`) widened to ranges. Work-stealing-on-idle redistributes tasks non-deterministically across nodes; the total `executedTasks` assertion remains exact.
Validation
- `pnpm format`: canonical
- `pnpm lint --max-warnings=0`: exit 0
- `pnpm build`: exit 0
- `pnpm test`: 22/22 files, 323 passed, 5 skipped, 0 failed
- `npx commitlint --from=upstream/master`: 0 errors
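For callers affected by the breaking changes listed in this description, a migration might look roughly like the sketch below. It is written under stated assumptions: the two error classes are hypothetical local stand-ins that only mirror the `name` and `exitCode` fields described above (real code would import them from the package), `classify` is an illustrative helper, and `signal` is typed as `string` here instead of `NodeJS.Signals` to keep the example free of Node type dependencies.

```typescript
// Hypothetical local stand-ins mirroring the described error shapes.
class WorkerCrashError extends Error {
  exitCode: number | null

  constructor (message: string, exitCode: number | null) {
    super(message)
    this.name = 'WorkerCrashError'
    this.exitCode = exitCode
  }
}

class WorkerTerminationError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'WorkerTerminationError'
  }
}

// Discriminate on error.name rather than instanceof, so the check also works
// when the error class comes from a different copy (CJS vs ESM) of the package.
function classify (error: unknown): 'crash' | 'termination' | 'other' {
  const errorName =
    typeof error === 'object' && error !== null && 'name' in error
      ? (error as { name: unknown }).name
      : undefined
  if (errorName === 'WorkerCrashError') return 'crash'
  if (errorName === 'WorkerTerminationError') return 'termination'
  return 'other'
}

// Widened exit handler shape: exitCode may be null when a signal killed the worker.
type WidenedExitHandler = (exitCode: number | null, signal?: string | null) => void

const exitLog: string[] = []
const onExit: WidenedExitHandler = (exitCode, signal) => {
  exitLog.push(
    exitCode != null ? `code ${exitCode}` : `signal ${signal ?? 'unknown'}`
  )
}
onExit(1)
onExit(null, 'SIGKILL')
```

A handler written against the old `(exitCode: number) => void` signature would have to add the `null` branch shown in `onExit` before it type-checks under `strictFunctionTypes`.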