Home Reference Source

core/utils/promise-queue.js

import Promise from 'es6-promise';
import { noop, wrapInPromise } from './misc';

/**
 * @private
 */
export class PromiseQueue {
  constructor(maxPendingPromises = 1, maxQueuedPromises = Infinity) {
    this.pendingPromises = 0;
    this.maxPendingPromises = maxPendingPromises;
    this.maxQueuedPromises = maxQueuedPromises;
    this.queue = [];
  }

  enqueue(promiseGenerator) {
    return new Promise((resolve, reject, notify) => {
      if (this.queue.length >= this.maxQueuedPromises) {
        reject(new Error('Queue limit reached'));
        return;
      }

      this.queue.push({
        promiseGenerator: promiseGenerator,
        resolve: resolve,
        reject: reject,
        notify: notify || noop
      });

      this._dequeue();
    });
  }

  getPendingLength() {
    return this.pendingPromises;
  }

  getQueueLength() {
    return this.queue.length;
  }

  _dequeue() {
    if (this.pendingPromises >= this.maxPendingPromises) {
      return false;
    }

    // Remove from queue
    const item = this.queue.shift();
    if (!item) {
      return false;
    }

    try {
      this.pendingPromises += 1;
      wrapInPromise(item.promiseGenerator())
        // Forward all stuff
        .then((value) => {
          // It is not pending now
          this.pendingPromises -= 1;
          // It should pass values
          item.resolve(value);
          this._dequeue();
        }, (err) => {
          // It is not pending now
          this.pendingPromises -= 1;
          // It should not mask errors
          item.reject(err);
          this._dequeue();
        }, (message) => {
          // It should pass notifications
          item.notify(message);
        });
    } catch (err) {
      this.pendingPromises -= 1;
      item.reject(err);
      this._dequeue();
    }

    return true;
  }
}