getCount.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// Default loop bound: deliberately huge so the task behaves as a long-running job.
const DEFAULT_ITERATIONS = 1000000000 * 10;

/**
 * Simulates a long-running, CPU-bound task by counting up and logging each step.
 *
 * @param {string} name - Label used in the start/finish log lines.
 * @param {number} [iterations=DEFAULT_ITERATIONS] - Number of loop iterations to run.
 * @returns {number} The final counter value (equal to `iterations` when it is a
 *   non-negative integer).
 */
const slowFunction = (name, iterations = DEFAULT_ITERATIONS) => {
  let counter = 0;
  console.log(`starting ${name}`);
  while (counter < iterations) {
    console.log(`counter ${counter}`);
    counter++;
  }
  console.log(`finish ${name}`);
  return counter;
};

// IPC protocol: the parent sends "START <id>"; once the work is done this
// process answers with "FINISH <id>".
process.on('message', (message) => {
  // Ignore anything that is not a string command instead of crashing on
  // `message.split`.
  if (typeof message !== 'string') {
    return;
  }

  const [command, name] = message.split(' ');
  if (command === 'START') {
    slowFunction(name);
    process.send(`FINISH ${name}`);
  }
});
index.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
const express = require("express");
const { addLongTask, pauseQueue, resumeQueue } = require('./tasker.js');

const app = express();

// Debug route: logs the parsed query string (and its `firstname` field).
app.get('/example', (req, res) => {
  console.log(req.query);
  console.log(req.query.firstname);
  res.send("Query String printed to console");
});

// Enqueues a new long-running task. The response is sent as soon as the task
// has been added; the awaited promise only settles once the queue has drained.
app.get('/task', async (req, res) => {
  try {
    await addLongTask(
      (pid) => {
        // NOTE(review): Content-Type says JSON but the body is plain text;
        // left unchanged so existing clients keep receiving the same bytes.
        res.setHeader('Content-Type', 'application/json');
        res.writeHead(200);
        res.end(`you've added process with id: ${pid}`);
      },
      (pid) => {
        console.log(`running pid is ${pid}`);
      }
    );
  } catch (err) {
    console.error('Long task failed', err);
    // Only answer here if the "added" callback never got to respond.
    if (!res.headersSent) {
      res.status(500).json({ message: 'failed to add task' });
    }
  }
});

// Resumes the currently executing (paused) process.
app.get('/resume', async (req, res) => {
  console.log('Going to resume process');
  try {
    await resumeQueue();
    // Always answer, otherwise the HTTP request hangs until the client times out.
    res.json({ message: 'queue resumed' });
  } catch (err) {
    console.error('Failed to resume queue', err);
    res.status(500).json({ message: 'failed to resume queue' });
  }
});

// Pauses the currently executing process, which stalls the whole queue.
// TODO: support killing a task by id:
//   - pause the queue, find the process with the requested id, kill it, resume;
//   - with no id given, kill the currently running process;
//   - with an id given, check the running process first, then search the queue.
app.get('/pause', async (req, res) => {
  console.log('Going to pause executing process');
  try {
    await pauseQueue();
    res.json({ message: 'queue paused' });
  } catch (err) {
    console.error('Failed to pause queue', err);
    res.status(500).json({ message: 'failed to pause queue' });
  }
});

// Simple liveness route.
app.get('/hello', (req, res) => {
  console.log('Returning /hello results');
  res.setHeader('Content-Type', 'application/json');
  res.writeHead(200);
  res.end(`${new Date().toISOString()} {"message":"hello"}`);
});

app.listen(8080);
tasker.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
const { fork } = require('child_process');
const Queue = require('./queue.js');

// True while addLongTask's drain loop is working through the queue.
let running = false;
// The queue entry ({ pid, process }) whose child is executing right now, if any.
let currentExecuting;

/**
 * Starts one queued task and waits until its child reports back or dies.
 *
 * @param {{ pid: number, process: import('child_process').ChildProcess }} task
 * @returns {Promise<{ status: string, pid: string }>} Resolves with the parsed
 *   "<STATUS> <pid>" reply, or with status "EXIT" when the child exits before
 *   replying (e.g. after being killed). Rejects when the child emits 'error'.
 */
const execute = (task) => {
  // Renamed locally so the global `process` is not shadowed.
  const { pid, process: child } = task;

  return new Promise((resolve, reject) => {
    const removeListeners = () => {
      child.off('message', onMessage);
      child.off('exit', onExit);
      child.off('error', onError);
    };

    function onMessage(message) {
      removeListeners();
      const [status, reportedPid] = String(message).split(' ');
      console.log(`${status} on process ${reportedPid}`);
      // Close the IPC channel so the finished child can exit instead of
      // lingering forever on its 'message' listener.
      if (child.connected) {
        child.disconnect();
      }
      resolve({ status, pid: reportedPid });
    }

    // Without this, killing the running child would leave the promise pending
    // and stall every task still waiting in the queue.
    function onExit(code, signal) {
      removeListeners();
      console.log(`process ${pid} exited before finishing (code ${code}, signal ${signal})`);
      resolve({ status: 'EXIT', pid: String(pid) });
    }

    function onError(err) {
      removeListeners();
      reject(err);
    }

    child.on('message', onMessage);
    child.on('exit', onExit);
    child.on('error', onError);
    child.send(`START ${pid}`);
  });
};

/**
 * Pauses the currently executing child with SIGSTOP. Because tasks run one at
 * a time, pausing it stalls the whole queue. Does nothing when no task is
 * executing. (SIGSTOP/SIGCONT are POSIX signals.)
 *
 * @returns {Promise<void>}
 */
const pauseQueue = async () => {
  if (!currentExecuting) {
    console.log('No process is executing; nothing to pause');
    return;
  }
  const { pid, process: child } = currentExecuting;
  console.log(`Pausing process ${pid}`);
  child.kill('SIGSTOP');
};

/**
 * Resumes the currently executing child with SIGCONT. Does nothing when no
 * task is executing.
 *
 * @returns {Promise<void>}
 */
const resumeQueue = async () => {
  if (!currentExecuting) {
    console.log('No process is executing; nothing to resume');
    return;
  }
  const { pid, process: child } = currentExecuting;
  console.log(`Resuming process ${pid}`);
  child.kill('SIGCONT');
};

/**
 * Kills the task with the given pid, whether it is executing or still queued.
 *
 * @param {number|string} pidToKill - Child pid; numeric strings are accepted.
 * @returns {Promise<boolean>} True when a matching task was found and killed.
 */
const kill = async (pidToKill) => {
  const targetPid = Number(pidToKill);

  // Freeze the running task first so the queue cannot advance while we search.
  await pauseQueue();

  if (currentExecuting && currentExecuting.pid === targetPid) {
    currentExecuting.process.kill('SIGKILL');
    console.log(`killed the process ${targetPid}`);
    // No resume needed: the 'exit' event lets the drain loop move on.
    return true;
  }

  // Rotate through the queue once using only dequeue/enqueue: every entry is
  // put back in its original order except the one being killed.
  let killed = false;
  const queuedCount = Queue.count();
  for (let i = 0; i < queuedCount; i++) {
    const item = Queue.dequeue();
    if (!killed && item.pid === targetPid) {
      item.process.kill('SIGKILL');
      console.log(`killed the process ${targetPid}`);
      killed = true;
    } else {
      Queue.enqueue(item);
    }
  }

  await resumeQueue();
  return killed;
};

/**
 * Forks a new worker, enqueues it, and drains the queue if nobody else is
 * already doing so.
 *
 * @param {(pid: number) => void} addedCB - Called once the task is enqueued.
 * @param {(pid: number) => void} runningCB - Called when a task starts executing.
 * @returns {Promise<number>} The pid of the forked child. When this call is
 *   the one draining the queue, the promise settles only after the queue is empty.
 */
const addLongTask = async (addedCB, runningCB) => {
  const child = fork(`${__dirname}/getCount`);
  console.log(`enqueued child process ${child.pid}`);
  Queue.enqueue({ pid: child.pid, process: child });
  addedCB(child.pid);

  // Another call is already draining the queue; it will pick this task up.
  if (running) {
    console.log(`Sorry process ${child.pid}! \nThere are processes running, please wait your turn...`);
    return child.pid;
  }

  console.log(`Nothing is running and there are ${Queue.count()} tasks on the queue`);
  running = true;
  try {
    while (!Queue.isEmpty()) {
      const task = Queue.dequeue();
      currentExecuting = task;
      runningCB(task.pid);
      try {
        await execute(task);
      } catch (err) {
        // One broken child must not block the tasks queued behind it.
        console.error(`process ${task.pid} failed`, err);
      }
    }
  } finally {
    currentExecuting = undefined;
    running = false;
  }

  return child.pid;
};

exports.pauseQueue = pauseQueue;
exports.resumeQueue = resumeQueue;
exports.kill = kill;
exports.addLongTask = addLongTask;
queue.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
let _queue = []; exports.isEmpty = () => { return (_queue.length == 0); } exports.enqueue = (item) => { _queue.push(item); } exports.dequeue = () => { return _queue.shift(); } exports.count = () => { return _queue.length; } exports.print = () => { if (!isEmpty()) { for (let i = 0; i < _queue.length; i++) { _queue[i].toString(); } } } |