Node.js Worker Threads – על קצה המזלג

כידוע, Async ו Await הם לא פתרונות קסם, ולעיתים, אנו נדרשים לבצע מטלה שעלולה "לתקוע" את הEvent Loop, כמו חישוב מורכב , ליטרציה כבדה של מערכים ועוד.
אם יש לכם פונקציה שעושה חישוב מורכב, להוסיף בתחילתה async לא ייפתור את הבעיה והיא עדיין תתקע את הEvent Loop.
להזכירכם , NODE עובד בצורה של Event Loop שהוא Single Threaded למעשה.

(ליתר דיוק, הוא דווקא כן עושה שימוש בTHREADS , דהיינו הlibuv מייצר POOL של 4 THREADS כברירת מחדל מה שמאפשר לEVENT LOOP להעביר "משימות כבדות" לPOOL בצורה אוטומטית, בעיקר משימות שקשורות למערכת הקבצים, הצפנות, דחיסות וכו. אבל זה כבר נושא אחר).
בכל מיקרה, זאת האחריות שלנו כמפתחים לכתוב קוד שעושה שימוש בEvent Loop בצורה הנכונה ביותר.

נו, אז מה הפיתרון?

פיתרון אחד לבעיה שאומנם לא מתאים תמיד מבחינת ארכיטקטורה נכונה לכל המקרים, אבל לעיתים דווקא כן יכול להתאים הוא שימוש בWorker Threads, הדגשים הם:

  • לכל THREAD יש Process אחד ואת הEVENT LOOP שלו. – מה שרץ בWorker Thread אחד לא משפיע על השני מבחינת חסימה (None Blocking)

  • ניתן לחלוק זיכרון (לדוגמה באמצעות SharedArrayBuffer) וניתן להעביר מידע לThread שלנו.

  • לכל THREAD יש את הINSTANCE של הV8 וlubuv שלו (ISOLATED, וכן, הם צורכים משאבים)
  • מומלץ בעיקר למשימות שדורשות משאבי CPU

בואו נתחיל ונכתוב את הקוד הבא בParent שלנו (יכול להיות גם בApp.js שלכם):

//Parent Code:
const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js');

בשורה 3 אנו טוענים לWORKER את הקובץ שאנחנו רוצים שהWORKER יריץ, במקרה שלנו worker.js.

נוסיף לקובץ את השורות הבאות:

//Subscribing for message on our Parent.
worker.on('message', message => console.log(message))

//Sending Message to our Worker
worker.postMessage('Hello');

בשורה 2, אנו "נרשמים" (Subscribe) להודעה מהWORKER שלנו, משמע כאשר הWORKER שלנו ישלח לנו הודעה במקרה שלנו הPARENT יריץ console.log(message).

בשורה 5, אנו שולחים לWORKER שלנו message עם הערך 'Hello', עד כאן, לא קרה בעצם יותר מדי, יצרנו WORKER ועשינו Subscribe לMessage ממנו, שלחנו לו Message עם הערך Hello. אבל הParent שלנו עדיין לא ידפיס לconsole את הmessage והסיבה לכך היא שהWorker שלנו כרגע לא שולח לParent
Message בחזרה.

בworker.js נכתוב את הקוד הבא:

const { parentPort } = require('worker_threads');
parentPort.on('message', message => 
    parentPort.postMessage({ hello: message })
);

בשורה 2 אנו עושים Subscribe מהWorker שלנו לParent, דהיינו, כאשר הWorker שלנו יקבל Message הוא יריץ את הפקודה
parentPort.postMessage({ hello: message });
מה שישלח למעשה לParent שלנו .

עד כאן למעשה יצרנו סינכרון מלא בין הParent שלנו לבין הWorker Thread שלו. הנה הקוד המלא:

//Parent Code:
const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js');

//Subscribing for message on our Parent.
worker.on('message', message => console.log(message));

//Sending Message to our Worker
worker.postMessage('Hello');
const { parentPort } = require('worker_threads');
parentPort.on('message', message => 
    parentPort.postMessage({ hello: message });
)

אני ממליץ כמעט תמיד ליצור קובץ נפרד לThread שלנו (במקרה של הדוגמא כאן הוא .worker.js).
אבל במידה ולמקרה הספציפי, קובץ נפרד פחות מתאים נוכל להשתמש בטכניקה הבאה של שימוש בisMainThread כדי להכניס את כל הקוד שלנו לקובץ אחד:

const {Worker, isMainThread} = require('worker_threads');

//האם אנו רצים מה
//Parent?
if(isMainThread) {
//יוצרים Worker 
// חדש ושולחים אליו את הקובץ הנוכחי
 const worker = new Worker(__filename);
} else {
  //קוד זה רץ ב
  //Worker
 console.log('Hi from your worker!');
}

במקרה הנל אנו עושים שימוש בisMainThread שמאפשר לנו לדעת האם אנחנו רצים כרגע מתוך הWorker Thread או מתוך הParent שלנו ולעשות הפרדה בין הCode שירוץ.


איך מעבירים מידע ראשוני לWorker שלנו?

כדי להעביר DATA לWorker , נוכל לעשות שימוש באפשרות workerData, לדוגמה:

const { worker, inMainThread, workerData} = require('worker_threads');

if(isMainThread) {
 const worker = new Worker(__filename, { workerData: 'Hi!' })
} else {
 console.log(workerData); // will print 'Hi!'
}

יש עוד דברים מעניין שניתן לעשות בהם שימוש כמו SHARE_ENV ,eval, ממליץ לכם לתת הצצה בתיעוד ולגלות דברים מעניינים.

אופציה נוספת שאני חושב ששווה לתת דגש עליה היא resourceLimit:

const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js', { resourceLimits: {maxOldGenerationSizeMb: 10} })

במקרה הנל אנו מגבילים את הגודל של הHEAP (זיכרון) ל10MB. אם הWorker יגיע למגבלה הזאת הוא יושמד עם שגיאה.

אילו Events קיימים בWorker?

  • message , לדוגמא parentPort.postMessage()
  • exit – כשהworker שלנו הפסיק.
  • online – כאשר הWorker שלנו התחיל להריץ את הקוד שלו
  • error – כאשר נזרקה מהworker שלנו uncaught exception.

הערה לגבי השימוש בpostMessage:


בדוגמאות הנל עשינו שימוש ב postMessage כדי לשלוח מידע/הודעות בין הPARENT לThread. חשוב מאוד לדעת שכאשר אנו עושים שימוש בPostMessage, הדאטא שאנו מעבירים בעצם "משוכפל" – Cloned עם כל החסרונות / ייתרונות בכך, דהיינו, שיכפול של דאטא מורכב יכול לעלות לנו בהרבה כח CPU, ככל שהדאטא יותר מורכב/מסובך/עמוק יותר כך הוא יידרוש יותר כח מחשוב.

שלא נדבר על הRAM הכפול בעצם שאנו צורכים כדי לשמור את הדאטא הנוסף הזה בזיכרון.
מבחינת ארכיטקטורה, מאוד חשוב לשים לב לזה. כדי ללמוד יותר על הנושא ניתן להריץ חיפוש בגוגל עם המונח is postmessage slow, וגם להכיר את האפשרות של TransferList ועוד. (אולי בהזדמנות אכתוב על זה פוסט נפרד).

מצאתם טעות? הערות? שאלות? הסתדרתם? נתקעתם? כתבו לי בתגובות!