Using setInterval() with a .map method in array and returning as a promise

“Simplicity is a great virtue but it requires hard work to achieve it and education to appreciate it. And to make matters worse: complexity sells better.” – Edsger W. Dijkstra

The accepted “lightweight” solution is nearly 20,000 lines of code and depends on both CoffeeScript and Lua. What if you could trade all of that for just 50 lines of JavaScript?

Let’s say we have a job that takes an unpredictable amount of time to compute a result –

async function job(x) {
  // job consumes some time
  await sleep(rand(5000))
  // job computes a result
  return x * 10
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(job))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

This runs all twelve (12) jobs at once. If these were requests to a remote server, some connections could be rejected because you are flooding it with too much simultaneous traffic. By modeling a Pool of threads, we control how many jobs run concurrently. (A “thread” here is a slot in the pool, not an operating-system thread.) –

// my pool with four threads
const pool = new Pool(4)

async function jobQueued(x) {
  // wait for pool thread
  const close = await pool.open()
  // run the job and close the thread upon completion
  return job(x).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
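One case the example above does not cover is a job that rejects: .then(close) only runs close on success, so a failed job would keep its thread open. The sketch below shows the difference in isolation. It assumes a hypothetical failingJob and a stand-in makeClose helper that only records whether close ran; neither is part of the original code.

```javascript
// Hypothetical job that always fails, standing in for a rejected request.
const failingJob = () => Promise.reject(new Error("connection refused"))

// Stand-in for the close function returned by pool.open():
// it records that it ran and passes its argument through.
const makeClose = () => {
  const close = x => { close.called = true; return x }
  close.called = false
  return close
}

async function compareRelease() {
  const viaThen = makeClose()
  const viaFinally = makeClose()
  // .then(close) is skipped when the job rejects
  await failingJob().then(viaThen).catch(() => {})
  // .finally(close) runs whether the job resolves or rejects
  await failingJob().finally(viaFinally).catch(() => {})
  return { thenRan: viaThen.called, finallyRan: viaFinally.called }
}

compareRelease().then(console.log)
// { thenRan: false, finallyRan: true }
```

If failed jobs should free their thread too, writing job(x).finally(close) in jobQueued is one way to do it, since finally passes the job's result or error through unchanged.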

Functions should be small and do just one thing. This makes it easier to write individual features and promotes a higher degree of reusability, allowing you to combine several simple features into more sophisticated ones. Above you already saw rand and sleep –

const rand = x =>
  Math.random() * x

const sleep = ms =>
  new Promise(r => setTimeout(r, ms))

If we want to throttle each job, we can specialize sleep to ensure a minimum runtime –

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

async function jobQueued(x) {
  const close = await pool.open()
  // ensure job takes at least 3 seconds before freeing thread
  return throttle(job(x), 3000).then(close)
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]
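To see what throttle does to timing, here is a small sketch that measures a fast and a slow promise. The timed and demo helpers and the specific durations are made up for illustration; sleep and throttle are the definitions from above.

```javascript
const sleep = ms => new Promise(r => setTimeout(r, ms))
const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

// Measure how long a promise takes to settle, in milliseconds.
const timed = async p => {
  const start = Date.now()
  const value = await p
  return { value, elapsed: Date.now() - start }
}

async function demo() {
  // a fast job (about 10 ms) is held back to roughly 200 ms
  const fast = await timed(throttle(sleep(10).then(_ => "fast"), 200))
  // a slow job (about 400 ms) is not delayed any further
  const slow = await timed(throttle(sleep(400).then(_ => "slow"), 200))
  return { fast, slow }
}

demo().then(console.log)
```

The fast job should report an elapsed time near 200 ms and the slow one near 400 ms: throttle sets a minimum duration, not a maximum.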

We can add some console.log messages to ensure things are running properly. And we will add a random sleep at the beginning of the job to show that the tasks can queue in any order without affecting the order of the result –

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(job(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(console.log, console.error)
console.log output (“open” and “closed” mark a pool thread being taken and released):
queueing 12
   sending 12 open
queueing 9
   sending 9 open
queueing 8
   sending 8 open
queueing 4
   sending 4 open
queueing 10
queueing 6
queueing 7
queueing 2
queueing 11
      received 120 closed
   sending 11 open
queueing 3
queueing 5
queueing 1
      received 80 closed
   sending 1 open
      received 90 closed
   sending 5 open
      received 110 closed
   sending 3 open
      received 40 closed
   sending 2 open
      received 10 closed
   sending 7 open
      received 50 closed
   sending 6 open
      received 20 closed
   sending 10 open
      received 30 closed
      received 70 closed
      received 60 closed
      received 100 closed
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120]

Above, our pool was initialized with size=4, so up to four jobs run concurrently. Once sending has appeared four times, every thread is busy, so a received must appear (a job completing and closing its thread) before the next sending. queueing can happen at any time. You may also notice that Pool starts waiting jobs in last-in-first-out (LIFO) order, because they are kept on a stack, yet the order of the final result still matches the input.
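The LIFO behavior can be read straight off the log: jobs 10, 6, 7, 2 and 11 were waiting when the first thread closed, and 11 was sent next. A minimal sketch with a plain array, where the variable names are only for illustration:

```javascript
// Jobs waiting when the first thread closed, in the order the log queued them.
const waiting = [10, 6, 7, 2, 11]

// LIFO: pop takes the most recently queued job, which is what the log shows.
const lifo = [...waiting]
const nextLifo = lifo.pop()
console.log(nextLifo) // 11

// FIFO alternative: shift takes the oldest waiting job instead.
const fifo = [...waiting]
const nextFifo = fifo.shift()
console.log(nextFifo) // 10
```

If first-in-first-out matters for your workload (under LIFO, an early job can wait a long time while newer ones keep arriving), taking waiters with shift instead of pop should give that order.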

Moving on with our implementation, like our other functions, we can write thread in a simple way –

const effect = f => x =>
  (f(x), x)

const thread = close =>
  [new Promise(r => { close = effect(r) }), close]

function main () {
  const [t, close] = thread()
  console.log("please wait...")
  setTimeout(close, 3000)
  return t.then(_ => "some result")
}

main().then(console.log, console.error)
please wait...
(3 seconds later)
some result
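The one-line thread is compact because it reuses its close parameter as a local variable. As a reading aid, here is a longer form that should behave the same way. The name threadExpanded is made up for this sketch and is not part of the original code.

```javascript
// Expanded form of thread. The Promise executor runs synchronously,
// so close is already assigned by the time the array is returned.
const threadExpanded = () => {
  let close
  const promise = new Promise(resolve => {
    // resolve the promise, then hand the same value back to the caller
    close = x => { resolve(x); return x }
  })
  return [promise, close]
}

const [t, close] = threadExpanded()
console.log(close("done")) // prints: done (close returns its argument)
t.then(console.log)        // prints: done (the promise resolves with it)
```

Because close returns its argument, it can sit in a .then chain, as in job(x).then(close), without swallowing the job's result.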

And now we can use thread to write more sophisticated features like Pool –

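class Pool {
  constructor (size = 4) {
    // pool: threads currently open; stack: callers waiting for a thread
    Object.assign(this, { pool: new Set, stack: [], size })
  }
  open () {
    // hand out a thread now if one is free, otherwise wait on the stack
    return this.pool.size < this.size
      ? this.deferNow()
      : this.deferStacked()
  }
  deferNow () {
    const [t, close] = thread()
    // when this thread closes, remove it and wake the most recent waiter
    const p = t
      .then(_ => this.pool.delete(p))
      .then(_ => this.stack.length && this.stack.pop().close())
    this.pool.add(p)
    return close
  }
  deferStacked () {
    const [t, close] = thread()
    // wait until a closing thread calls close, then open a thread for real
    this.stack.push({ close })
    return t.then(_ => this.deferNow())
  }
}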
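To check that the pool really caps concurrency, one option is to count how many jobs are between open and close at any moment. The sketch below copies effect, thread and Pool from above and adds a hypothetical tracked job with active and peak counters, none of which are in the original. The pool size of 2 and the 20 ms sleep are arbitrary.

```javascript
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))

class Pool {
  constructor (size = 4) {
    Object.assign(this, { pool: new Set, stack: [], size })
  }
  open () {
    return this.pool.size < this.size
      ? this.deferNow()
      : this.deferStacked()
  }
  deferNow () {
    const [t, close] = thread()
    const p = t
      .then(_ => this.pool.delete(p))
      .then(_ => this.stack.length && this.stack.pop().close())
    this.pool.add(p)
    return close
  }
  deferStacked () {
    const [t, close] = thread()
    this.stack.push({ close })
    return t.then(_ => this.deferNow())
  }
}

const pool = new Pool(2)
let active = 0 // jobs currently holding a thread
let peak = 0   // highest value active ever reached

async function tracked(x) {
  const close = await pool.open()
  active++
  peak = Math.max(peak, active)
  await sleep(20)
  active--
  // close passes its argument through, so this returns x * 10
  return close(x * 10)
}

const done = Promise.all([1,2,3,4,5,6,7,8].map(tracked))
  .then(results => ({ peak, results }))

done.then(r => console.log(JSON.stringify(r)))
// {"peak":2,"results":[10,20,30,40,50,60,70,80]}
```

Here every call to open happens up front, before any thread closes, which is the same situation as the Promise.all examples in this post.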

And just like that your program is complete. In the functioning demo below, I condensed the definitions so we can see them all at once. Run the program to verify the result in your own browser –

class Pool {
  constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
  open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
  deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return close }
  deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const throttle = (p, ms) => Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

const myJob = x => sleep(rand(5000)).then(_ => x * 10)
const pool = new Pool(4)

async function jobQueued(x) {
  await sleep(rand(5000))
  console.log("queueing", x)
  const close = await pool.open()
  console.log("  sending", x)
  const result = await throttle(myJob(x), 3000).then(close)
  console.log("    received", result)
  return result
}

Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(jobQueued))
  .then(JSON.stringify)
  .then(console.log, console.error)

Hopefully you learned something fun about JavaScript! If you enjoyed this, try expanding on Pool features. Maybe add a simple timeout function that ensures a job completes within a certain amount of time. Or maybe add a retry function that re-runs a job if it produces an error or times out. To see Pool applied to another problem, see this Q&A. If you have any questions, I’m happy to assist 😀
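As a starting point for those two exercises, here is one possible sketch of timeout and retry. The signatures, the default of three attempts and the flaky example job are all assumptions made for illustration; treat it as one way to begin, not a finished design.

```javascript
const sleep = ms => new Promise(r => setTimeout(r, ms))

// Reject if p has not settled within ms milliseconds.
// Note: this does not cancel the underlying work, it only stops waiting.
const timeout = (p, ms) =>
  new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`timed out after ${ms} ms`)),
      ms
    )
    Promise.resolve(p)
      .then(resolve, reject)
      .finally(() => clearTimeout(timer))
  })

// Call f up to `attempts` times and return the first success.
// If every attempt fails, rethrow the last error.
const retry = async (f, attempts = 3) => {
  let lastError
  for (let i = 0; i < attempts; i++) {
    try { return await f() }
    catch (err) { lastError = err }
  }
  throw lastError
}

// Hypothetical job that fails twice before succeeding.
let calls = 0
const flaky = async () => {
  calls++
  if (calls < 3) throw new Error("try again")
  return "ok after " + calls + " calls"
}

retry(flaky).then(console.log)
// ok after 3 calls

timeout(sleep(500), 50).catch(e => console.log(e.message))
// timed out after 50 ms
```

Both helpers compose with the rest of the post: retry takes a function so that each attempt starts a fresh job, for example retry(() => timeout(job(x), 3000)).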
