Home Reference Source

packages/remote-protocol/src/Session.js

/**
 * Copyright 2017 Moshe Simantov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import EventEmitter from 'events';
import { randomBytes } from 'crypto';
import { Action, RequestAction } from './actions';

const kStream = Symbol('stream');
const kRequestTimeout = Symbol('requestTimeout');
const kNextReqId = Symbol('nextReqId');
const kRequestsMap = Symbol('requestsMap');
const kRequestsTimer = Symbol('requestsTimer');
const kEndTimeout = Symbol('endedTimeout');
const kEndTimer = Symbol('isEnded');

// Default timeout for request
const REQUEST_DEFAULT_TIMEOUT = 20e3; // 20 seconds

// How often check for timeout requests
const REQUEST_TIMEOUT_INTERVAL = 100; // 100ms

// If there are open requests, how much time to wait before closing the connection
const SESSION_END_DEFAULT_TIMEOUT = 200; // 200 milliseconds

const REQUEST_ID_BYTES = 4;
const MAX_OPEN_REQUESTS = 256 ** REQUEST_ID_BYTES;

function generateRequestId() {
  return randomBytes(REQUEST_ID_BYTES).readUIntBE(0, REQUEST_ID_BYTES);
}

/**
 * @example
 * import { Session } from 'remote-protocol';
 *
 * const session = new Session(objectStream);
 */
export default class Session extends EventEmitter {
  /**
   * Create a new session with an object-stream to the other peer.
   *
   * @param {Stream} objectStream An object duplex stream
   * @param {object} [opts] Options
   * @param {null|number} [opts.timeout=20e3] Default timeout for requests in milliseconds. If
   * null is given, request will not timeout.
   * @param {null|number} [opts.endTimeout=200] Default timeout to end the session in
   * milliseconds after the other ended the stream.
   */
  constructor(objectStream, opts = {}) {
    if (
      typeof objectStream !== 'object' ||
      typeof objectStream.on !== 'function' ||
      typeof objectStream.write !== 'function'
    ) {
      throw new TypeError(`Invalid stream argument: ${objectStream}`);
    }

    super();

    // for promises that listen for close event
    this.setMaxListeners(0);

    const { timeout, endTimeout } = opts;
    if (timeout !== undefined) {
      if (timeout !== null && typeof timeout !== 'number') {
        throw new TypeError(`Invalid "timeout" option: ${timeout}`);
      }

      /**
       * @type {null|number}
       * @private
       */
      this[kRequestTimeout] = timeout;
    } else {
      this[kRequestTimeout] = REQUEST_DEFAULT_TIMEOUT;
    }

    if (endTimeout !== undefined) {
      if (typeof endTimeout !== 'number') {
        throw new TypeError(`Invalid "endTimeout" option: ${endTimeout}`);
      }

      /**
       * @type {null|number}
       * @private
       */
      this[kEndTimeout] = endTimeout;
    } else {
      this[kEndTimeout] = SESSION_END_DEFAULT_TIMEOUT;
    }

    /**
     * @type {Stream}
     * @private
     */
    this[kStream] = objectStream;

    /**
     * @type {number}
     * @private
     */
    this[kNextReqId] = generateRequestId();

    /**
     * @type {Map}
     * @private
     */
    this[kRequestsMap] = new Map();

    /**
     * @type {null|TimeoutID}
     * @private
     */
    this[kRequestsTimer] = null;

    this[kStream].on('data', action => {
      if (!(action instanceof Action)) {
        this.destroy(
          new TypeError(`Unrecognized action received: ${typeof action}`)
        );
        return;
      }

      try {
        Promise.resolve(action.exec(this)).catch(err => {
          if (!this[kStream]) return;

          this.destroy(err);
        });
      } catch (err) {
        this.destroy(err);
      }
    });

    this[kStream].on('end', () => {
      if (!this[kStream]) return;

      const requests = this[kRequestsMap];
      this[kRequestsMap] = null;

      requests.forEach(({ onRejected }) => {
        onRejected(new Error('Session ended before request completed'));
      });

      this.emit('end');

      if (this.isEnded) {
        this[kStream] = null;
        this.destroy();
        return;
      }

      if (this[kEndTimeout]) {
        /**
         * @type {TimeoutID}
         * @private
         */
        this[kEndTimer] = setTimeout(() => {
          this[kEndTimer] = true;
          this.destroy();
        }, this[kEndTimeout]);
      }
    });

    this[kStream].on('finish', () => {
      if (!this[kStream]) return;

      this.emit('finish');

      if (!this[kRequestsMap]) {
        this[kStream] = null;
        this.destroy();
      }
    });

    this[kStream].on('close', () => {
      if (!this[kStream]) return;

      this[kStream] = null;
      this.destroy();
    });

    this[kStream].on('error', err => {
      if (!this[kStream]) return;

      this[kStream] = null;
      this.destroy(err);
    });
  }

  /**
   * True if the session is open.
   * @type {boolean}
   */
  get isOpen() {
    return !!this[kStream];
  }

  /**
   * True if the session is no longer readable.
   * @type {boolean}
   */
  get isEnded() {
    return !this[kStream] || this[kStream].ended;
  }

  /**
   * The number of requests sent and waiting for response.
   * @type {number}
   */
  get openRequests() {
    if (!this[kRequestsMap]) return 0;

    return this[kRequestsMap].size;
  }

  /**
   * Return session constructor name.
   *
   * @override
   * @return {string}
   */
  toString() {
    return this.constructor.name;
  }

  /**
   * Fetch an arbitrary value.
   * If the given value is instance of {@link Action}, fetch it. Otherwise return it as is.
   *
   * @param {Action|*} value The given value
   * @return {*}
   */
  fetch(value) {
    if (value instanceof Action) {
      return value.fetch(this);
    }

    return value;
  }

  /**
   * Convert the given value to a value that can be send ove to the other peer.
   * If the value can be send as-is, return the same value (default implementation).
   *
   * @param {*} value The value to dispatch
   * @return {*}
   */
  // eslint-disable-next-line class-methods-use-this
  dispatch(value) {
    return value;
  }

  /**
   * Send an action for execution to the other peer.
   *
   * @param {Action} action The action to send
   * @return {Session}
   */
  send(action) {
    if (!(action instanceof Action)) {
      throw new TypeError(
        `Expect first argument to be instance of Action: ${action}`
      );
    }

    if (!this[kStream]) {
      throw new Error(`${this} already destroy`);
    }

    this[kStream].write(action);
    return this;
  }

  /**
   * Request from the other peer to fetch the given action and send back it's response value.
   *
   * @param {Action} action the given action to fetch by the peer
   * @param {object} [opts] Options
   * @param {number|null} [opts.timeout] Override the default timeout for this request (See {@link constructor}).
   * @param {function} onFulfilled a function that will be called with the response value on the
   * first arguments as-is (ie. without `Promise.resolve`).
   * @param {function} onRejected a function that will be called with an error at the first
   * argument if the fetching throws an error or the request timeout as reached.
   * @return {Session}
   */
  request(action, ...args) {
    let opts;
    let onFulfilled;
    let onRejected;

    if (args.length < 3) {
      opts = {};
      onFulfilled = args[0];
      onRejected = args[1];
    } else {
      opts = args[0];
      onFulfilled = args[1];
      onRejected = args[2];

      if (typeof opts !== 'object') {
        throw new TypeError(`Expect "options" to be an object: ${typeof opts}`);
      }
    }

    if (!(action instanceof Action)) {
      throw new TypeError(
        `Expect "action" to be instance of Action: ${action}`
      );
    }

    if (typeof onFulfilled !== 'function') {
      throw new TypeError(
        `Expect "onFulfilled" to be a function: ${typeof onFulfilled}`
      );
    }

    if (typeof onRejected !== 'function') {
      throw new TypeError(
        `Expect "onRejected" to be a function: ${typeof onRejected}`
      );
    }

    if (!this[kStream]) {
      throw new Error(`${this} already destroyed`);
    }

    if (!this[kRequestsMap] || this.isEnded) {
      throw new Error('Session already ended');
    }

    let { timeout } = opts;
    if (timeout !== undefined) {
      if (timeout !== null && typeof timeout !== 'number') {
        throw new TypeError(`Invalid "timeout" option: ${timeout}`);
      }
    } else {
      timeout = this[kRequestTimeout];
    }

    let waitUntil;
    if (timeout) {
      waitUntil = Date.now() + timeout;
    } else {
      waitUntil = null;
    }

    const reqId = this.fetchRequestId();

    this[kRequestsMap].set(reqId, {
      waitUntil,
      onFulfilled,
      onRejected,
    });

    this.send(new RequestAction(reqId, action));

    if (!this[kRequestsTimer]) {
      this.requestsTimeoutInterval();
    }

    return this;
  }

  /**
   * End this session stream.
   * @return {Session}
   */
  end() {
    if (this[kStream]) {
      this[kStream].end();
    }

    return this;
  }

  /**
   * Destroy this session and it's stream.
   *
   * @emits {error} If an error given, emit and "error" event as well.
   * @emits {close} When the session destroyed
   * @param {Error} [err] Optional error
   * @return {void}
   */
  destroy(err) {
    if (this[kRequestsMap] === undefined) return;

    if (this[kStream]) {
      const stream = this[kStream];
      delete this[kStream];

      stream.destroy();
    }

    if (this[kRequestsTimer]) {
      clearTimeout(this[kRequestsTimer]);
    }

    if (this[kEndTimer]) {
      clearTimeout(this[kEndTimer]);
    }

    const requests = this[kRequestsMap];

    delete this[kRequestTimeout];
    delete this[kNextReqId];
    delete this[kRequestsMap];
    delete this[kRequestsTimer];
    delete this[kEndTimer];
    delete this[kStream];

    if (requests && requests.size) {
      requests.forEach(({ onRejected }) => {
        onRejected(err || new Error('Session closed before request completed'));
      });
    } else if (err) {
      this.emit('error', err);
    }

    this.emit('close');
    this.removeAllListeners();
  }

  // Actions methods

  /**
   * @ignore
   *
   * @param {number} reqId
   * @param {*|undefined} value
   * @param {*|undefined} rejected
   * @return {void}
   */
  response(reqId, value, rejected) {
    if (!this[kRequestsMap]) {
      throw new Error("Can't handle response, session already ended");
    }

    const req = this[kRequestsMap].get(reqId);
    if (!req) {
      throw new Error(`Response already sent: ${reqId}`);
    }

    this[kRequestsMap].delete(reqId);
    const { onFulfilled, onRejected } = req;

    if (!rejected) {
      onFulfilled(value);
    } else {
      onRejected(value);
    }
  }

  // Internal methods

  /**
   * @private
   * @return {void}
   */
  requestsTimeoutInterval() {
    this[kRequestsTimer] = null;
    if (!this[kRequestsMap]) return;

    const now = Date.now();
    const requests = [];
    let isDone = true;

    this[kRequestsMap].forEach(({ waitUntil }, key) => {
      if (!waitUntil) return;

      if (waitUntil > now) {
        if (isDone) isDone = false;
        return;
      }

      requests.push(key);
    });

    if (requests.length) {
      process.nextTick(() => {
        requests.forEach(reqId => {
          this.response(reqId, new Error('Request timeout'), true);
        });
      });
    }

    if (isDone) return;

    this[kRequestsTimer] = setTimeout(
      () => this.requestsTimeoutInterval(),
      REQUEST_TIMEOUT_INTERVAL
    );

    if (this[kRequestsTimer].unref) this[kRequestsTimer].unref();
  }

  /**
   * @private
   * @return {number}
   */
  fetchRequestId() {
    if (!this[kRequestsMap]) {
      throw new Error("Can't fetch request id, session already ended");
    }

    let reqId;

    if (this[kRequestsMap].size >= MAX_OPEN_REQUESTS) {
      throw new Error(
        `Maximum open requests number reached: ${MAX_OPEN_REQUESTS}`
      );
    }

    do {
      reqId = this[kNextReqId];
      this[kNextReqId] += 1;

      if (this[kNextReqId] >= MAX_OPEN_REQUESTS) {
        return 0;
      }
    } while (this[kRequestsMap].has(reqId));

    return reqId;
  }
}