const { MessageConsumerEventName } = require('./message-consumer-event-names');

/**
 * Builds a qualified, human-readable label for a message consumer event name.
 *
 * @param {*} x Event name value; passed unchanged to `MessageConsumerEventName.describe`.
 * @returns {string} The text `MessageConsumerEventName.` followed by the description
 *   returned by `MessageConsumerEventName.describe(x)`.
 */
function formatEventName(x) {
  const description = MessageConsumerEventName.describe(x);
  return `MessageConsumerEventName.${description}`;
}

/**
 * Buffers messages in a local first-in/first-out queue and hands them to the emitter's
 * direct listener(s) while dispatching is enabled and at least one direct listener is
 * registered.
 */
class MessageDispatcher {
  /**
   * @param {Object} [options]
   * @param {Object} options.emitter Emitter used for delivery. This class calls its
   *   `setOnFirstDirectListener`, `directListenerCount`, `emitDirect`, `formatErrorEvent`
   *   and `emit` methods.
   * @param {boolean} [options.autoAck] When truthy, each message is acknowledged after a
   *   dispatch in which the listener did not throw.
   * @param {Object} options.logger Object exposing `LOG_TRACE`, `LOG_DEBUG` and `LOG_WARN`.
   */
  constructor({ emitter, autoAck, logger } = {}) {
    Object.assign(this, {
      emitter,
      queue:    [],
      dispatch: true,
      formatEventName,
      logger,
    });
    // Select the per-message dispatch strategy once, up front.
    this._dispatchOne = autoAck ? this._dispatchOneAutoAck : this._dispatchOneBare;
    this.emitter.setOnFirstDirectListener(this._onFirstMessageListener.bind(this));
    // Although a listener may not be available, we set this to true so that we detect a
    // transition to false when we attempt to dispatch the first message, which will
    // generate a log that dispatching is stopped due to a missing listener.
    this._availableListener = true;
  }

  /** Enables dispatching and immediately attempts to deliver any queued messages. */
  start() {
    this.dispatch = true;
    this._flush();
  }

  /** Disables dispatching; messages pushed afterwards stay queued until `start()`. */
  stop() {
    this.dispatch = false;
  }

  /** @returns {number} Number of messages currently waiting in the local queue. */
  get length() {
    return this.queue.length;
  }

  /**
   * Appends a message to the queue and, if dispatching is enabled, attempts delivery.
   *
   * @param {Object} message Message to enqueue; `getGuaranteedMessageId()` is used for logging.
   */
  push(message) {
    const { LOG_TRACE } = this.logger;
    this.queue.push(message);

    if (this.dispatch) {
      this._flush();
    } else {
      LOG_TRACE(`Dispatch disabled, message ${message.getGuaranteedMessageId()} queued locally`);
    }
  }

  /**
   * Callback registered with the emitter via `setOnFirstDirectListener`. Logs the
   * transition back to "listener available" (if one was previously recorded as missing)
   * and attempts to deliver queued messages.
   */
  _onFirstMessageListener() {
    const { LOG_DEBUG } = this.logger;
    if (!this._availableListener) {
      LOG_DEBUG(`Message listener available for dispatcher, ${this.queue.length} messages queued`);
      LOG_DEBUG(`Dispatcher started and connected: ${this.dispatch ? 'true' : 'false'}`);
      this._availableListener = true;
    }
    this._flush();
  }

  /**
   * Delivers queued messages in order for as long as dispatching is enabled and a direct
   * listener is registered. If messages remain only because no listener is registered,
   * logs that once and records the missing-listener state.
   */
  _flush() {
    const { LOG_DEBUG } = this.logger;
    // Re-check the dispatch flag and listener count for every element,
    // to handle when #stop is called from a message handler.
    while (this.queue.length && this.dispatch && (this.emitter.directListenerCount() > 0)) {
      this._dispatchOne(this.queue.shift());
    }

    const stalledWithoutListener = this.queue.length && this.dispatch &&
      (this.emitter.directListenerCount() === 0);
    // _availableListener guards against repeating the log on every flush attempt.
    if (stalledWithoutListener && this._availableListener) {
      LOG_DEBUG('Message dispatching stopped: No message listener registered');
      this._availableListener = false;
    }
  }

  /**
   * Dispatches one message and acknowledges it afterwards, unless the listener threw or
   * the application already acknowledged the message itself.
   *
   * @param {Object} message Message to dispatch; its `isAcknowledged` property and
   *   `acknowledge()` method are used here.
   */
  _dispatchOneAutoAck(message) {
    const { LOG_WARN } = this.logger;

    // Listener exceptions are captured inside _dispatchOneBare and returned, so that an
    // exception thrown by message.acknowledge() below still propagates normally.
    const caught = this._dispatchOneBare(message);
    if (caught) {
      LOG_WARN(`Suppressing message acknowledgement for message ${message.getGuaranteedMessageId()
               } because client threw exception from listener`, caught);
      return;
    }

    // Did the user manually ack for some reason?
    if (message.isAcknowledged) {
      LOG_WARN(`Consumer configured to auto-acknowledge messages, but message ${
               message.getGuaranteedMessageId()} was application acknowledged`);
      return;
    }

    message.acknowledge(); // No, so ack the message
  }

  /**
   * Dispatches one message through `emitter.emitDirect` without acknowledging it.
   *
   * @param {Object} message Message to dispatch.
   * @returns {*} `undefined` when the listener returned normally; otherwise the value
   *   produced by `emitter.formatErrorEvent` for the thrown exception, which has also
   *   been emitted as an 'error' event.
   */
  _dispatchOneBare(message) {
    const { LOG_WARN } = this.logger;
    let caught;
    // Requires the emitter's direct option to be MessageConsumerEventName.MESSAGE.
    // The listener count lives on the emitter; this class has no `listenerCount` member.
    if (this.emitter.directListenerCount() === 0) {
      LOG_WARN(`No listeners to dispatch message ${message.getGuaranteedMessageId()}`);
    }
    // Since _dispatchOneBare uses emitDirect, this behaviour is not affected by the presence
    // of an 'error' handler.
    try {
      this.emitter.emitDirect(message);
    } catch (ex) {
      // User code threw an exception
      caught = this.emitter.formatErrorEvent(ex, MessageConsumerEventName.MESSAGE, message);
      // Also propagating to the common error handler
      this.emitter.emit('error', caught);
    }
    return caught;
  }

}

// Expose MessageDispatcher as this module's only export.
module.exports = { MessageDispatcher };
