const SMFLib = require('solclient-smf');
const { CapabilityType } = require('solclient-session');
const { ConsumerFSM } = require('./consumer-fsm');
const { ConsumerFSMEvent } = require('./consumer-fsm-event');
const { ConsumerFSMEventNames } = require('./consumer-fsm-event-names');
const { ErrorResponseSubcodeMapper,
  ErrorSubcode,
  OperationError } = require('solclient-error');
const { Flow, FlowOperation } = require('solclient-flow');
const { MessageConsumerEvent } = require('./message-consumer-event');
const { MessageConsumerEventName } = require('./message-consumer-event-names');
const { MessageConsumerProperties } = require('./message-consumer-properties');
const { MessageConsumerPropertiesValidator } = require('./message-consumer-properties-validator');
const { Queue, Topic } = require('solclient-destination');
const { QueueAccessType, QueuePermissions, QueueDiscardBehavior } = require('solclient-queue');


function formatEventName(eventName) {
  return `MessageConsumerEventName.${MessageConsumerEventName.describe(eventName)}`;
}

let localCounter = 0;
function getConsumerLocalName() {
  return `ConsumerFSM ${localCounter++}`;
}

/**
 * @classdesc
 * <b>This class is not exposed for construction by API users.</b>
 * A Message Consumer is created by calling {@link solace.Session#createMessageConsumer}.
 *
 * A MessageConsumer controls Guaranteed Message delivery to this client.
 *
 * Consumer characteristics and behavior are defined by {@link solace.MessageConsumerProperties}.
 * The properties can also be supplied as a simple key-value {Object}. The queue descriptor,
 * {@link solace.MessageConsumerProperties#queueDescriptor} must be specified to identify the
 * Guaranteed Message Queue or Guaranteed Message Topic Endpoint on the Solace Message Router.
 *
 * The MessageConsumer object is an EventEmitter, and will emit events to which the
 * application may choose to subscribe, such as the connection to the Solace Message Router
 * going up or down.
 *
 * If a registered listener for an emitted event throws an exception, this is caught and emitted as
 * an 'error'.
 *
 * @fires solace.MessageConsumerEventName#ACTIVE
 * @fires solace.MessageConsumerEventName#CONNECT_FAILED_ERROR
 * @fires solace.MessageConsumerEventName#DISPOSED
 * @fires solace.MessageConsumerEventName#DOWN
 * @fires solace.MessageConsumerEventName#DOWN_ERROR
 * @fires solace.MessageConsumerEventName#GM_DISABLED
 * @fires solace.MessageConsumerEventName#INACTIVE
 * @fires solace.MessageConsumerEventName#MESSAGE
 * @fires solace.MessageConsumerEventName#UP
 * @fires solace.MessageConsumerEventName#SUBSCRIPTION_OK
 * @fires solace.MessageConsumerEventName#SUBSCRIPTION_ERROR
 *
 *
 * @hideconstructor
 * @extends solace.Flow
 * @memberof solace
 */
class MessageConsumer extends Flow {
  constructor({ properties, sessionInterfaceFactory } = {}) {
    const applyProperties = new MessageConsumerProperties(properties);
    MessageConsumerPropertiesValidator.validate(applyProperties.browser ?
      'QueueBrowserProperties' : 'MessageConsumerProperties',
                                                applyProperties, properties);
    super(applyProperties, sessionInterfaceFactory, {
      direct: MessageConsumerEventName.MESSAGE,
      emits:  MessageConsumerEventName.values,
      formatEventName,
    });

    const superFormatter = this.logger.formatter;
    this.logger.formatter = (...args) => superFormatter('[message-consumer]', ...args);

    this._active = undefined;
    this._fsm = this._makeFSM();
    this.endpointErrorId = undefined;

    this._on(MessageConsumerEventName.ACTIVE, () => this._onFlowActive(true));
    this._on(MessageConsumerEventName.INACTIVE, () => this._onFlowActive(false));
    this._on(MessageConsumerEventName.DOWN_ERROR, this._onFlowDisconnected.bind(this));
    this._on(MessageConsumerEventName.UP, this._onFlowUp.bind(this));
    this._fsm.start(); // Subscriber flows self-manage, so they start immediately
  }

  _makeFSM() {
    const properties = this._properties;
    const name = `${getConsumerLocalName()}`;
    return new ConsumerFSM({
      name,
      consumer:         this,
      sessionInterface: this._sessionInterface,
      properties,
    });
  }

  /**
   * Begins delivery of messages to this consumer. This method opens the protocol window
   * to the Solace Message Router so further messages can be received.
   *
   * A newly created consumer is in started state.
   *
   * If the consumer was already started, this method has no effect.
   *
   * A consumer is stopped by calling {@link solace.MessageConsumer.stop}
   *
   * @throws {solace.OperationError}
   * * if the Message Consumer is disposed.
   *   subcode = {@link solace.ErrorSubcode.INVALID_OPERATION}
   * * if the Message Consumer is disconnected.
   *   subcode = {@link solace.ErrorSubcode.INVALID_OPERATION}
   */
  start() {
    this._operationCheck(FlowOperation.START);
    this._fsm.requestStartDispatchUser();
  }

  /**
   * Stops messages from being delivered to this consumer from the Solace Message Router.
   * Messages may continue to be prefetched by the API and queued internally
   * until {@link solace.MessageConsumer#start} is called.
   *
   * If the consumer was already stopped, this method has no effect.
   *
   * @throws {solace.OperationError}
   * * if the Message Consumer is disconnected.
   *   subcode = {@link solace.ErrorSubcode.INVALID_OPERATION}
   */
  stop() {
    this._operationCheck(FlowOperation.STOP);
    this._fsm.requestStopDispatchUser();
  }

  /**
   * Connects the consumer immediately. The application should add event listeners (see
   * {@link solace.MessageConsumerEventName}). If there is no listener added for
   * {@link solace.MessageConsumerEventName#event:MESSAGE} then up to a window
   * {@link solace.MessageConsumerProperties.windowSize} of messages can be queued internally.
   * to the {@link solace.MessageConsumer} before calling this method.
   *
   * @throws {solace.OperationError}
   *  * if consumer is not supported by router for this client.
   *  subcode = {@link solace.ErrorSubcode.INVALID_OPERATION}
   *
   */
  connect() {
    if ((this._sessionInterface.getCapability(CapabilityType.GUARANTEED_MESSAGE_CONSUME)
        !== null) &&
        (!this._sessionInterface.isCapable(CapabilityType.GUARANTEED_MESSAGE_CONSUME))) {
      throw new OperationError('Consumer is not supported by router for this client',
              ErrorSubcode.INVALID_OPERATION, null);
    }
    super.connect();
    this.processFSMEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.FLOW_OPEN }));
  }

  /**
   * Initiates an orderly disconnection of the Message Consumer. The API will send any pending
   * client acknowledgements on the Message Consumer, then send an unbind request.
   * Any messages subsequently
   * received are discarded silently. When the unbind message is acknowledged, the application
   * receives a {@link solace.MessageConsumerEventName#event:DOWN} event if it has set a listener
   * for that event.
   *
   * @throws {solace.OperationError}
   * * if the Message Consumer is disconnected.
   *   subcode = {@link solace.ErrorSubcode.INVALID_OPERATION}
   */
  disconnect() {
    super.disconnect();
    this.processFSMEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.FLOW_CLOSE }));
  }

  /**
   * Returns the destination that should be used to publish messages that this consumer
   * will receive.
   * * For topic endpoints, this is the topic to which the topic endpoint is subscribed.
   * * For queues, this is the associated queue destination.
   *
   * The destination returned can
   * be used to set the ReplyTo field in a message, or otherwise communicated
   * to partners that need to send messages to this Message Consumer. This is especially useful
   * for temporary endpoints (Queues and Topic Endpoints), as the destination
   * is unknown before the endpoint is created.
   *
   * This method will succeed after {@link solace.MessageConsumerEventName#event:UP} for temporaries
   * with generated destinations.
   *
   * @throws {solace.OperationError}
   * * if the {@link solace.MessageConsumer} is disconnected and the destination is temporary.
   *
   * @returns {solace.Destination} The publishing destination that delivers to this consumer.
   */
  getDestination() {
    const destination = this._fsm.getDestination();
    if (destination instanceof Queue) {
      return new Queue(destination);
    }
    return new Topic(destination);
  }

  // Application has disconnected the session, we must continue to orderly shut down
  // unbinding Message Consumers,
  // but Message Consumers that are up merely treat this as a 'down' event and
  // transition to awaitingSessionUp
  _disconnectSession() {
    super._disconnectSession();
    this.processFSMEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.SESSION_DISCONNECT }));
  }

  _operationCheck(operation) {
    super._operationCheck(operation);
    switch (operation) {
      case FlowOperation.GET_DESTINATION:
        if (this._isDisconnected()) {
          throw new OperationError('Cannot get destination of a disconnected flow',
                                   ErrorSubcode.INVALID_OPERATION);
        }
        break;
      default:
    }
  }

  // ----

  /**
   * @param {any} messageId The message ID to ack
   * @internal
   */
  applicationAck(messageId) {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE(`Adding application ack for ${messageId}`);
    this._fsm.applicationAck(messageId);
  }

  getDisposedEvent() { // eslint-disable-line class-methods-use-this
    return MessageConsumerEventName.DISPOSED;
  }

  /**
   * @param {solace.Message} message The data message to handle
   * @internal
   */
  handleDataMessage(message) {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE('Handling data message');
    message.setMessageConsumer(this);
    this._fsm.acceptMessage(message);
  }

  /**
   * @override
   * @param {solace.AdProtocolMessage} message The control message to handle
   * @internal
   */
  handleUncorrelatedControlMessage(message) {
    const { LOG_INFO, LOG_DEBUG, LOG_TRACE } = this.logger;
    LOG_INFO('Handling uncorrelated control message');
    const msgType = message.msgType;
    const { SMFAdProtocolMessageType } = SMFLib;
    switch (msgType) {
      case SMFAdProtocolMessageType.UNBIND: {
        const responseCode = message.smfHeader.pm_respcode;
        const description = message.smfHeader.pm_respstr;
        const errorSubcode = ErrorResponseSubcodeMapper.getADErrorSubcode(responseCode,
                                                                          description);
        LOG_TRACE(`Handling uncorrelated UNBIND. endpointErrorId: ${message.getEndpointErrorId()}`);
        if (message.getEndpointErrorId() !== undefined) {
          this.endpointErrorId = message.getEndpointErrorId();
        }
        this.processFSMEvent(new ConsumerFSMEvent({
          name: ConsumerFSMEventNames.FLOW_UNBOUND,
        },
          new OperationError(description, errorSubcode, responseCode)
        ));
      }
        break;
      case SMFAdProtocolMessageType.FLOWCHANGEUPDATE:
        this.processFSMEvent(new ConsumerFSMEvent({
          name: ConsumerFSMEventNames.FLOW_ACTIVE_IND,
        }, {
          active: message.getActiveFlow(),
        }));
        break;
      default:
        LOG_DEBUG('Dropping unhandled AD control message: ',
                  SMFAdProtocolMessageType.describe(msgType));
    }
  }

  /**
   * Creates and returns copy of the properties for this MessageConsumer.
   *
   * If the object was constructed using an {@link solace.AbstractQueueDescriptor},
   * and the queue descriptor was subsequently connected to an endpoint, the
   * `MessageConsumerProperties` returned will include a {@link solace.QueueDescriptor}
   * that contains the resolved name.
   *
   * A new copy of the properties object is returned each time this property is accessed.
   * The returned object cannot be polled for mutations such as the one described above.
   *
   * @returns {solace.MessageConsumerProperties} The properties associated with this object.
   */
  getProperties() {
    return super.getProperties();
  }

  /**
   * Resets the router state contained in the consumer, e.g. on VRN change
   *
   * @memberof MessageConsumer
   * @internal
   */
  onVRNChanged() {
    this.processFSMEvent(new ConsumerFSMEvent({
      name: ConsumerFSMEventNames.VIRTUALROUTER_NAME_CHANGED,
    }));
  }

  /**
   * After the MessageConsumer has connected to an endpoint
   * ({@link solace.MessageConsumerEventName#UP}), accesstype represents
   *  the access type for the endpoint to which this Message Consumer is bound.
   * @name solace.MessageConsumer.accessType
   * @type {solace.QueueAccessType}
   */
  get accessType() {
    return this._accessType;
  }
  /**
   * @param {solace.QueueAccessType} value The value to set
   * @internal
   */
  set accessType(value) {
    this._accessType = value;
  }

  /**
   * Whether the consumer is active. If active indications for the consumer are not
   * enabled, this will return undefined.
   * @type {?Boolean}
   * @internal
   */
  get active() {
    return this._active;
  }
  /**
   * @param {Boolean} value The value to set
   * @internal
   */
  set active(value) {
    if (value !== this._active) {
      this._emit(value ? MessageConsumerEventName.ACTIVE : MessageConsumerEventName.INACTIVE);
    }
    this._active = value;
  }

  /**
   * After the MessageConsumer has connected as indicated by the event
   * {@link solace.MessageConsumerEventName#event:UP}, queueDiscardBehavior represents
   * the discard behavior flags for the endpoint to which this Message Consumer is bound.
   * @name solace.MessageConsumer.queueDiscardBehaviour
   * @type {solace.QueueDiscardBehavior}
   */
  get queueDiscardBehavior() {
    return this._queueDiscardBehavior;
  }
  /**
   * @param {solace.QueueDiscardBehavior} value The value to set
   * @internal
   */
  set queueDiscardBehavior(value) {
    this._queueDiscardBehavior = value;
  }

  /**
   * After the MessageConsumer has connected as indicated by the event
   * {@link solace.MessageConsumerEventName#event:UP}
   * respectsTTL is `true` when the endpoint respects Time To Live on messages
   * and 'false' otherwise.
   * @name solace.MessageConsumer.respectsTTL
   * @type {Boolean}
   */
  get respectsTTL() {
    return this._respectsTTL;
  }
  /**
   * @param {Boolean} value The value to set
   * @internal
   */
  set respectsTTL(value) {
    this._respectsTTL = value;
  }

  /**
   * Gets the flow ID for this consumer. This number will change between reconnects
   * and is purely informational.
   * @type {Long}
   * @internal
   */
  get flowId() {
    return this._flowId;
  }
  /**
   * @param {Long} value The value to set
   * @internal
   */
  set flowId(value) {
    this._flowId = value;
  }

  /**
   * After the MessageConsumer has connected as indicated by the event
   * {@link solace.MessageConsumerEventName#event:UP}, this property represents
   * permissions granted by the router to this user on this Message Consumer
   * @name solace.MessageConsumer.permissions
   * @type {solace.QueuePermissions}
   */
  get permissions() {
    return this._permissions || 0;
  }
  /**
   * @param {Number} value The value to set
   * @internal
   */
  set permissions(value) {
    this._permissions = value;
  }

  _onFlowActive(isActive) {
    const { LOG_DEBUG } = this.logger;
    LOG_DEBUG(`Flow (flowId = ${this._flowId}) became ${isActive ? 'active' : 'inactive'}`);
    this._active = isActive;
  }

  _onFlowDisconnected(error) {
    const { LOG_INFO } = this.logger;
    LOG_INFO(`${this} disconnected: ${error}.message`);
  }

  _disposeFSM() {
    const { LOG_INFO } = this.logger;
    LOG_INFO('Disposing FSM');
    this.processFSMEvent(new ConsumerFSMEvent({ name: ConsumerFSMEventNames.DISPOSE }));
  }

  _onFlowUp() {
    const { LOG_INFO } = this.logger;
    LOG_INFO(`Flow is up: flowId = ${this._flowId}`);
  }

  [util_inspect_custom]() {
    return Object.assign(super[util_inspect_custom](), {
      'destination':          this._destination,
      'accessType':           QueueAccessType.describe(this.accessType),
      'permissions':          QueuePermissions.describe(this.permissions),
      'respectsTTL':          this.respectsTTL,
      'active':               this.wantFlowChangeNotify ? this.active : '(indications disabled)',
      'wantFlowChangeNotify': this.wantFlowChangeNotify,
      'queueDiscardBehavior': QueueDiscardBehavior.describe(this.queueDiscardBehavior),
      'maxWindowSize':        this._fsm.maxWindowSize,
    });
  }

  toString() {
    return util_inspect(this);
  }

  _isDisconnected() {
    return this._fsm.isDisconnected();
  }

  /**
   * Subscribe the queue to a topic, always requesting confirmation from the router.
   *
   * {@link solace.MessageConsumerEventName.SUBSCRIPTION_OK} is generated when subscription is
   * added successfully; otherwise, session event
   * {@link solace.MessageConsumerEventName.SUBSCRIPTION_ERROR} is generated.
   *
   * When the application receives the event
   * {@link solace.MessageConsumerEventName.SUBSCRIPTION_ERROR}, it
   * can obtain the failed topic subscription by calling
   * {@link solace.MessageConsumerEvent#reason}.
   * The returned string is in the format of "Topic: <failed topic subscription>".
   *
   * @param {solace.Destination} topic The topic destination subscription to add.
   * @param {Object} correlationKey If specified, this value is
   *                                echoed in the messageConsumer event within
   *                                {@link MessageConsumerEvent}.
   * @param {Number} requestTimeout The request timeout period (in milliseconds). If specified, this
   *                                value overwrites readTimeoutInMsecs property in
   *                                {@link SessionProperties}.
   *
   * @throws {solace.OperationError}
   * * if the session is disposed or disconnected,
   *   or the consumer is inactive, down, disconnected, or disposed.
   *   Or if the consumer is bound to a topic endpoint instead of a queue.
   *   Subcode: {@link solace.ErrorSubcode.INVALID_OPERATION}.
   * * if the parameters have an invalid type.
   *   Subcode: {@link solace.ErrorSubcode.PARAMETER_INVALID_TYPE}.
   * * if the parameters have an invalid value.
   *   Subcode: {@link solace.ErrorSubcode.PARAMETER_OUT_OF_RANGE}.
   * * if the topic has invalid syntax.
   *   Subcode: {@link solace.ErrorSubcode.INVALID_TOPIC_SYNTAX}.
   * * if there's no space in the transport to send the request.
   *   Subcode: {@link solace.ErrorSubcode.INSUFFICIENT_SPACE}.  See:
   *   {@link solace.SessionEventCode#event:CAN_ACCEPT_DATA}.
   * * if the topic is a shared subscription and the peer router does not support Shared
   *   Subscriptions.
   *   Subcode: {@link solace.ErrorSubcode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
   * * if the topic is a shared subscription and the client does not allowed Shared
   *   Subscriptions.
   *   Subcode: {@link solace.ErrorSubcode.SHARED_SUBSCRIPTIONS_NOT_ALLOWED}.
   */
  addSubscription(topic, correlationKey, requestTimeout) {
    const callback = (success, subCode, respCode, respText) => {
      if (success) {
        const event = new MessageConsumerEvent(
          MessageConsumerEventName.SUBSCRIPTION_OK,
          respText,
          respCode,
          subCode,
          correlationKey,
          `Topic: ${topic.getName()}`
        );
        this._emit(MessageConsumerEventName.SUBSCRIPTION_OK, event);
      } else {
        const error = new MessageConsumerEvent(
          MessageConsumerEventName.SUBSCRIPTION_ERROR,
          respText,
          respCode,
          subCode,
          correlationKey,
          `Topic: ${topic.getName()}`
        );
        this._emit(MessageConsumerEventName.SUBSCRIPTION_ERROR, error);
      }
    };

    this._sessionInterface.updateQueueSubscription(
      topic,
      this._fsm.getDestination(),
      true,
      this,
      callback,
      requestTimeout);
  }
  /**
   * Unsubscribe the queue from a topic, requesting confirmation from the router.
   *
   * {@link solace.MessageConsumerEventName.SUBSCRIPTION_OK} is generated when subscription is
   * removed successfully; otherwise, session event
   * {@link solace.MessageConsumerEventName.SUBSCRIPTION_ERROR} is generated.
   *
   * When the application receives the message consumer event
   * {@link solace.MessageConsumerEventName.SUBSCRIPTION_ERROR}, it
   * can obtain the failed topic subscription by calling
   * {@link solace.MessageConsumerEvent#reason}. The returned
   * string is in the format "Topic: <failed topic subscription>".
   *
   * @param {solace.Destination} topic The topic destination subscription to remove.
   * @param {Object} correlationKey If <code>null</code> or undefined, a Correlation Key is not set
   *                                in the confirmation session event.
   * @param {Number} requestTimeout The request timeout period (in milliseconds). If specified, this
   *                                value overwrites readTimeoutInMsecs property in
   *                                {@link SessionProperties}.
   *
   * @throws {solace.OperationError}
   * * if the session is disposed or disconnected,
   *   or the consumer is inactive, down, disconnected, or disposed.
   *   Or if the consumer is bound to a topic endpoint instead of a queue.
   *   Subcode: {@link solace.ErrorSubcode.INVALID_OPERATION}.
   * * if the parameters have an invalid type.
   *   Subcode: {@link solace.ErrorSubcode.PARAMETER_INVALID_TYPE}.
   * * if the parameters have an invalid value.
   *   Subcode: {@link solace.ErrorSubcode.PARAMETER_OUT_OF_RANGE}.
   * * if the topic has invalid syntax.
   *   Subcode: {@link solace.ErrorSubcode.INVALID_TOPIC_SYNTAX}.
   * * if there's no space in the transport to send the request.
   *   Subcode: {@link solace.ErrorSubcode.INSUFFICIENT_SPACE}.  See:
   *   {@link solace.SessionEventCode#event:CAN_ACCEPT_DATA}.
   * * if the topic is a shared subscription and the peer router does not support Shared
   *   Subscriptions.
   *   Subcode: {@link solace.ErrorSubcode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
   * * if the topic is a shared subscription and the client does not allowed Shared
   *   Subscriptions.
   *   Subcode: {@link solace.ErrorSubcode.SHARED_SUBSCRIPTIONS_NOT_ALLOWED}.
   */
  removeSubscription(topic, correlationKey, requestTimeout) {
    const callback = (success, subCode, respCode, respText) => {
      if (success) {
        const event = new MessageConsumerEvent(
          MessageConsumerEventName.SUBSCRIPTION_OK,
          respText,
          respCode,
          subCode,
          correlationKey,
          `Topic: ${topic.getName()}`
        );
        this._emit(MessageConsumerEventName.SUBSCRIPTION_OK, event);
      } else {
        const error = new MessageConsumerEvent(
          MessageConsumerEventName.SUBSCRIPTION_ERROR,
          respText,
          respCode,
          subCode,
          correlationKey,
          `Topic: ${topic.getName()}`
        );
        this._emit(MessageConsumerEventName.SUBSCRIPTION_ERROR, error);
      }
    };

    this._sessionInterface.updateQueueSubscription(
      topic,
      this._fsm.getDestination(),
      false,
      this,
      callback,
      requestTimeout);
  }
}

module.exports.MessageConsumer = MessageConsumer;
