@@ -14,7 +14,11 @@ in other languages.
1414The ` AsyncLocalStorage ` and ` AsyncResource ` classes are part of the
1515` async_hooks ` module:
1616
17- ``` js
17+ ``` mjs
18+ import async_hooks from ' async_hooks' ;
19+ ```
20+
21+ ``` cjs
1822const async_hooks = require (' async_hooks' );
1923```
2024
@@ -40,7 +44,39 @@ The following example uses `AsyncLocalStorage` to build a simple logger
4044that assigns IDs to incoming HTTP requests and includes them in messages
4145logged within each request.
4246
43- ``` js
47+ ``` mjs
48+ import http from ' http' ;
49+ import { AsyncLocalStorage } from ' async_hooks' ;
50+
51+ const asyncLocalStorage = new AsyncLocalStorage ();
52+
53+ function logWithId (msg ) {
54+ const id = asyncLocalStorage .getStore ();
55+ console .log (` ${ id !== undefined ? id : ' -' } :` , msg);
56+ }
57+
58+ let idSeq = 0 ;
59+ http .createServer ((req , res ) => {
60+ asyncLocalStorage .run (idSeq++ , () => {
61+ logWithId (' start' );
62+ // Imagine any chain of async operations here
63+ setImmediate (() => {
64+ logWithId (' finish' );
65+ res .end ();
66+ });
67+ });
68+ }).listen (8080 );
69+
70+ http .get (' http://localhost:8080' );
71+ http .get (' http://localhost:8080' );
72+ // Prints:
73+ // 0: start
74+ // 1: start
75+ // 0: finish
76+ // 1: finish
77+ ```
78+
79+ ``` cjs
4480const http = require (' http' );
4581const { AsyncLocalStorage } = require (' async_hooks' );
4682
@@ -299,7 +335,35 @@ The `init` hook will trigger when an `AsyncResource` is instantiated.
299335
300336The following is an overview of the ` AsyncResource ` API.
301337
302- ``` js
338+ ``` mjs
339+ import { AsyncResource , executionAsyncId } from ' async_hooks' ;
340+
341+ // AsyncResource() is meant to be extended. Instantiating a
342+ // new AsyncResource() also triggers init. If triggerAsyncId is omitted then
343+ // async_hook.executionAsyncId() is used.
344+ const asyncResource = new AsyncResource (
345+ type, { triggerAsyncId: executionAsyncId (), requireManualDestroy: false }
346+ );
347+
348+ // Run a function in the execution context of the resource. This will
349+ // * establish the context of the resource
350+ // * trigger the AsyncHooks before callbacks
351+ // * call the provided function `fn` with the supplied arguments
352+ // * trigger the AsyncHooks after callbacks
353+ // * restore the original execution context
354+ asyncResource .runInAsyncScope (fn, thisArg, ... args);
355+
356+ // Call AsyncHooks destroy callbacks.
357+ asyncResource .emitDestroy ();
358+
359+ // Return the unique ID assigned to the AsyncResource instance.
360+ asyncResource .asyncId ();
361+
362+ // Return the trigger ID for the AsyncResource instance.
363+ asyncResource .triggerAsyncId ();
364+ ```
365+
366+ ``` cjs
303367const { AsyncResource , executionAsyncId } = require (' async_hooks' );
304368
305369// AsyncResource() is meant to be extended. Instantiating a
@@ -446,7 +510,14 @@ database connection pools, can follow a similar model.
446510Assuming that the task is adding two numbers, using a file named
447511` task_processor.js ` with the following content:
448512
449- ``` js
513+ ``` mjs
514+ import { parentPort } from ' worker_threads' ;
515+ parentPort .on (' message' , (task ) => {
516+ parentPort .postMessage (task .a + task .b );
517+ });
518+ ```
519+
520+ ``` cjs
450521const { parentPort } = require (' worker_threads' );
451522parentPort .on (' message' , (task ) => {
452523 parentPort .postMessage (task .a + task .b );
@@ -455,7 +526,95 @@ parentPort.on('message', (task) => {
455526
456527a Worker pool around it could use the following structure:
457528
458- ``` js
529+ ``` mjs
530+ import { AsyncResource } from ' async_hooks' ;
531+ import { EventEmitter } from ' events' ;
532+ import path from ' path' ;
533+ import { Worker } from ' worker_threads' ;
534+
535+ const kTaskInfo = Symbol (' kTaskInfo' );
536+ const kWorkerFreedEvent = Symbol (' kWorkerFreedEvent' );
537+
538+ class WorkerPoolTaskInfo extends AsyncResource {
539+ constructor (callback ) {
540+ super (' WorkerPoolTaskInfo' );
541+ this .callback = callback;
542+ }
543+
544+ done (err , result ) {
545+ this .runInAsyncScope (this .callback , null , err, result);
546+ this .emitDestroy (); // `TaskInfo`s are used only once.
547+ }
548+ }
549+
550+ export default class WorkerPool extends EventEmitter {
551+ constructor (numThreads ) {
552+ super ();
553+ this .numThreads = numThreads;
554+ this .workers = [];
555+ this .freeWorkers = [];
556+ this .tasks = [];
557+
558+ for (let i = 0 ; i < numThreads; i++ )
559+ this .addNewWorker ();
560+
561+ // Any time the kWorkerFreedEvent is emitted, dispatch
562+ // the next task pending in the queue, if any.
563+ this .on (kWorkerFreedEvent, () => {
564+ if (this .tasks .length > 0 ) {
565+ const { task , callback } = this .tasks .shift ();
566+ this .runTask (task, callback);
567+ }
568+ });
569+ }
570+
571+ addNewWorker () {
572+ const worker = new Worker (new URL (' task_processer.js' , import .meta.url));
573+ worker .on (' message' , (result ) => {
574+ // In case of success: Call the callback that was passed to `runTask`,
575+ // remove the `TaskInfo` associated with the Worker, and mark it as free
576+ // again.
577+ worker[kTaskInfo].done (null , result);
578+ worker[kTaskInfo] = null ;
579+ this .freeWorkers .push (worker);
580+ this .emit (kWorkerFreedEvent);
581+ });
582+ worker .on (' error' , (err ) => {
583+ // In case of an uncaught exception: Call the callback that was passed to
584+ // `runTask` with the error.
585+ if (worker[kTaskInfo])
586+ worker[kTaskInfo].done (err, null );
587+ else
588+ this .emit (' error' , err);
589+ // Remove the worker from the list and start a new Worker to replace the
590+ // current one.
591+ this .workers .splice (this .workers .indexOf (worker), 1 );
592+ this .addNewWorker ();
593+ });
594+ this .workers .push (worker);
595+ this .freeWorkers .push (worker);
596+ this .emit (kWorkerFreedEvent);
597+ }
598+
599+ runTask (task , callback ) {
600+ if (this .freeWorkers .length === 0 ) {
601+ // No free threads, wait until a worker thread becomes free.
602+ this .tasks .push ({ task, callback });
603+ return ;
604+ }
605+
606+ const worker = this .freeWorkers .pop ();
607+ worker[kTaskInfo] = new WorkerPoolTaskInfo (callback);
608+ worker .postMessage (task);
609+ }
610+
611+ close () {
612+ for (const worker of this .workers ) worker .terminate ();
613+ }
614+ }
615+ ` ` `
616+
617+ ` ` ` cjs
459618const { AsyncResource } = require (' async_hooks' );
460619const { EventEmitter } = require (' events' );
461620const path = require (' path' );
@@ -553,7 +712,23 @@ were scheduled.
553712
554713This pool could be used as follows:
555714
556- ``` js
715+ ` ` ` mjs
716+ import WorkerPool from ' ./worker_pool.js' ;
717+ import os from ' os' ;
718+
719+ const pool = new WorkerPool (os .cpus ().length );
720+
721+ let finished = 0 ;
722+ for (let i = 0 ; i < 10 ; i++ ) {
723+ pool .runTask ({ a: 42 , b: 100 }, (err , result ) => {
724+ console .log (i, err, result);
725+ if (++ finished === 10 )
726+ pool .close ();
727+ });
728+ }
729+ ` ` `
730+
731+ ` ` ` cjs
557732const WorkerPool = require (' ./worker_pool.js' );
558733const os = require (' os' );
559734
@@ -579,7 +754,22 @@ The following example shows how to use the `AsyncResource` class to properly
579754associate an event listener with the correct execution context. The same
580755approach can be applied to a [` Stream` ][] or a similar event-driven class.
581756
582- ``` js
757+ ` ` ` mjs
758+ import { createServer } from ' http' ;
759+ import { AsyncResource , executionAsyncId } from ' async_hooks' ;
760+
761+ const server = createServer ((req , res ) => {
762+ req .on (' close' , AsyncResource .bind (() => {
763+ // Execution context is bound to the current outer scope.
764+ }));
765+ req .on (' close' , () => {
766+ // Execution context is bound to the scope that caused 'close' to emit.
767+ });
768+ res .end ();
769+ }).listen (3000 );
770+ ` ` `
771+
772+ ` ` ` cjs
583773const { createServer } = require (' http' );
584774const { AsyncResource , executionAsyncId } = require (' async_hooks' );
585775
@@ -593,6 +783,7 @@ const server = createServer((req, res) => {
593783 res .end ();
594784}).listen (3000 );
595785` ` `
786+
596787[` AsyncResource` ]: #async_context_class_asyncresource
597788[` EventEmitter` ]: events.md#events_class_eventemitter
598789[` Stream` ]: stream.md#stream_stream
0 commit comments