const { assert } = require('solclient-eskit');
const { MessageConsumer } = require('./message-consumer');
const { MessageConsumerEventName } = require('./message-consumer-event-names');

/**
 * Tracks a set of Guaranteed Messaging consumer flows and keeps that bookkeeping in sync
 * with each flow's lifecycle events.
 *
 * Three views are maintained:
 *  - `_allFlows`: flows passed to {@link ConsumerFlows#add} that are not currently
 *    reconnecting.
 *  - `_reconnectingFlows`: flows in RECONNECTING state. These need special treatment during
 *    SESSION_DOWN: they need to be notified, but not waited on by the session FSM.
 *    Flows in `_reconnectingFlows` are not in `_allFlows`.
 *  - `_flowsById`: flows that have reported UP (or RECONNECTED), keyed by flow id.
 *
 * @private
 */
class ConsumerFlows {
  constructor() {
    this._allFlows = new Set();
    this._reconnectingFlows = new Set();
    // Flows by id (UP)
    this._flowsById = {};
  }

  /**
   * Starts tracking a consumer flow and subscribes to its lifecycle events so the internal
   * sets follow the flow's state. Adding a flow that is already tracked (as a regular or a
   * reconnecting flow) is a no-op.
   *
   * @param {MessageConsumer} flow The consumer flow to track
   * @returns {MessageConsumer} The flow that was passed in
   */
  add(flow) {
    assert(flow instanceof MessageConsumer, 'Flow was not a consumer');
    // A tracked flow already has its listeners registered. Registering again would duplicate
    // them, and re-adding a reconnecting flow to _allFlows would break the invariant that
    // reconnecting flows are not in _allFlows.
    if (this._allFlows.has(flow) || this._reconnectingFlows.has(flow)) {
      return flow;
    }

    const flowUp = () => {
      // Don't worry about flow collisions -- the router assigned the new flow to the same ID
      // so it won't address the old flow. Retain the old flow in _allFlows and make sure we
      // dispose the right flow.
      this._flowsById[flow.flowId] = flow;
    };
    const flowDisposed = () => {
      const flowId = flow.flowId;
      this._allFlows.delete(flow);
      this._reconnectingFlows.delete(flow);
      const flowById = this._flowsById[flowId];
      // If this test fails, the router reassigned the flow ID, and we were only keeping the
      // _allFlows set reference for blanket cleanup.
      if (flowById === flow) {
        delete this._flowsById[flowId];
      }
      // The listeners registered below are not removed here: the flow is being disposed.
    };
    const flowReconnecting = () => {
      // Drop the flow from every view first, then track it only as reconnecting.
      flowDisposed();
      this._reconnectingFlows.add(flow);
    };
    const flowReconnected = () => {
      // Back to a regular flow: index it by id again and move it to _allFlows.
      flowUp();
      this._allFlows.add(flow);
      this._reconnectingFlows.delete(flow);
    };
    const flowDown = () => {
      // NOTE(review): a flow that goes DOWN while reconnecting ends up in neither set --
      // confirm that this is the intended behavior.
      this._reconnectingFlows.delete(flow);
    };

    flow._on(MessageConsumerEventName.UP, flowUp);
    flow._on(MessageConsumerEventName.RECONNECTED, flowReconnected);
    flow._on(MessageConsumerEventName.DISPOSED, flowDisposed);
    flow._on(MessageConsumerEventName.RECONNECTING, flowReconnecting);
    flow._on(MessageConsumerEventName.DOWN, flowDown);
    flow._on(MessageConsumerEventName.DOWN_ERROR, flowDown);

    this._allFlows.add(flow);
    return flow;
  }

  /**
   * @returns {MessageConsumer[]} A snapshot of the tracked flows that are not reconnecting
   */
  get flows() {
    return Array.from(this._allFlows);
  }

  /**
   * @returns {MessageConsumer[]} A snapshot of the flows currently tracked as reconnecting
   */
  get reconnectingFlows() {
    return Array.from(this._reconnectingFlows);
  }

  /**
   * Looks up a flow by the id it was indexed under when it came UP.
   *
   * @param {*} flowId The flow id to look up
   * @returns {MessageConsumer|undefined} The matching flow, or undefined if there is none
   */
  getFlowById(flowId) {
    // Own-property check so that ids colliding with Object.prototype members
    // (e.g. 'constructor', 'toString') do not resolve to inherited values.
    if (!Object.prototype.hasOwnProperty.call(this._flowsById, flowId)) {
      return undefined;
    }
    return this._flowsById[flowId];
  }

  /**
   * Calls dispose() on every flow in `_allFlows`. Flows tracked as reconnecting are not
   * in that set and are therefore not disposed here.
   */
  disposeAll() {
    this._allFlows.forEach(flow => flow.dispose());
  }

}

module.exports.ConsumerFlows = ConsumerFlows;
