const SMFLib = require('solclient-smf');
// No idea why the usual import idiom does not work.
//const { CapabilityType } = require('solclient-session');
const SolclientSession = require('solclient-session');
const { ErrorResponseSubcodeMapper,
        ErrorSubcode,
        OperationError } = require('solclient-error');
const { LogFormatter } = require('solclient-log');
const { Long } = require('solclient-convert');
const { MessageIds } = require('./message-ids');
const { MessagePublisherAcknowledgeMode } = require('./message-publisher-acknowledge-modes');
const { MessagePublisherEventName } = require('./message-publisher-event-names');
const { PrivateFlowEventName } = require('solclient-flow');
const { PublisherFSMEvent } = require('./publisher-fsm-event');
const { PublisherFSMEventNames } = require('./publisher-fsm-event-names');
const { PublisherStateNames } = require('./publisher-state-names');
const { State, StateMachine } = require('solclient-fsm');
const { StatType } = require('solclient-stats');
const { TransportReturnCode } = require('solclient-transport');

const ConnectReason = {
  INIT:     0, // never connected: message renumbering required
  RESUME:   1, // connected before
  FAILOVER: 2, // after a DR failover: message renumbering and message republish event required
};

class PublisherFSM extends StateMachine {
  constructor({ publisher, name, sessionInterface, properties } = {}) {
    super({ name });
    /** @type {MessagePublisher} */
    const fsm = this;
    const currentState = () => {
      const state = fsm.getCurrentState();
      return state ? state.getName() : '<not running>';
    };
    this.logger = new LogFormatter((...args) => [
      `[session=${sessionInterface.sessionIdHex}]`,
      `[message-publisher-fsm=${publisher.flowIdDec}]`,
      `[${currentState()}]`,
      ...args,
    ]);
    this.log = this.logger.wrap(this.log, this);
    const {
      LOG_TRACE,
      LOG_DEBUG,
      LOG_INFO,
      LOG_WARN,
    } = this.logger;

    Object.assign(this, {
      _publisher:                 publisher,
      _acknowledgeMode:           properties.acknowledgeMode,
      _acknowledgeTimeoutInMsecs: properties.acknowledgeTimeoutInMsecs,
      _sessionInterface:          sessionInterface,
      _windowSize:                properties.windowSize,
      _stateEvents:               [],
    });
    // The publisher is not even constructed when publisherProperties.enabled is false
    // so we could just set _guaranteedEnabled to true, but lets be thorough as at
    // some point we may want to have sendADMessage() be responsible for throwing all
    // errors. As it is, when publisher properties enabled is false, the session-fsm
    // throws an error and when the message-spool is shutdown (close-flow received) the
    // publisher-fsm thtows and error.
    this._guaranteedEnabled = properties.enabled;
    this._sendWindow = properties.windowSize;
    this._resetConnectedInfo();
    this._notifiedWindowClosed = false;
    // We need a boolean to track the transport flow
    // controlled state. prepareAdMessageAndSend is called by the
    // session to prepare a message for publish and we
    // need to return whether or not we are flow controlled
    // without invoking the FSM or calling methods on the
    // FSM.
    // We consider all down states and all resending states equal
    // to flow-controlled as in all states me must queue the message
    // to the unacked list and not attempt to send directly.
    // This is strictly a performance issue.
    this._transportFlowControlled = true;

    this.initial(function onInitial() {
      return this.transitionTo(fsm.PublisherUnbound,
                               (context) => {
                                 LOG_TRACE(`Starting ${context.getStateMachine().getName()}`);
                               }
      );
    });

    fsm.unhandledEventReaction(function onUnhandledEvent(event) {
      switch (event.getName()) {
        case PublisherFSMEventNames.FLOW_UNBOUND:
          // the router has closed the flow, likely due to operator
          // shutdown on the message spool. Throw on any attempt
          // to publish
          this._guaranteedEnabled = false;
          this._publisher.emit(MessagePublisherEventName.GUARANTEED_MESSAGING_DOWN);
          return this.transitionTo(
            fsm.PublisherUnbound,
            (context) => {
              LOG_TRACE(`Received close publisher for ${context.getStateMachine().getName()}`);
            });
        case PublisherFSMEventNames.DISPOSE:
          LOG_TRACE('Received dispose request');
          break;
        case PublisherFSMEventNames.TRANSPORT_FULL:
          LOG_TRACE('Received TRANSPORT_FULL');
          break;
        case PublisherFSMEventNames.CAN_SEND:
          // This is ok because the session just sends the publisher CAN_SEND whenever received
          // from transport, even if the publisher is not in use, or hasn't caused the flow control
          LOG_TRACE('Received CAN_SEND when not flow controlled');
          break;
        default:
          LOG_TRACE(`Ignoring event ${event.getName()}`);
      }
      return this;
    });

    fsm.PublisherUnbound = new State({
      name:          PublisherStateNames.UNBOUND,
      parentContext: fsm,
    }, {
      emitDownAndBindWaiting() {
        LOG_TRACE('Emit down and bind waiting');
        publisher.emit(MessagePublisherEventName.DOWN);
        publisher.emit(PrivateFlowEventName.BIND_WAITING);
      },
    })
      .entry(function onEntry() {
        this.emitDownAndBindWaiting();
        fsm._connectRetryCount = properties.connectRetryCount;
      })
      .reaction(PublisherFSMEventNames.FLOW_UNBOUND, function onFlowUnbind() {
        return this.internalTransition();
      })
      .reaction(PublisherFSMEventNames.SESSION_UP, function onSessionUp() {
        return this.transitionTo(fsm.PublisherOpenFlowSent);
      });

    fsm.PublisherOpenFlowSent = new State({
      name:          PublisherStateNames.OPENFLOWSENT,
      parentContext: fsm,
    }, {
      emitOpenFlowFailedError(details) {
        publisher.emit(MessagePublisherEventName.CONNECT_FAILED_ERROR, details);
      },
      /**
       * @param {AdProtocolMessage} adpMsg An OPENFLOW response.
       * @description Handle an incoming Guaranteed Messaging Protocol Message.
       * @returns {?} The result of processing an event, or null if no event was dispatched.
       * @private
       */
      handleOpenFlowResponse(adpMsg) {
        const smfRespHeader = adpMsg.smfHeader;
        const respCode = smfRespHeader.pm_respcode;
        /*
         * Assured Control Protocol messages are received on publisher and consumer flows. The
         * message types for each are unique, so we can determine whether it is a publisher or
         * consumer by message type.  A specific publisher or consumer is found by the flowId, or
         * by the correlation tag in the case of OPEN-FLOW (publisher) or BIND (consumer) responses.
         */
        if (adpMsg.msgType !== SMFLib.SMFAdProtocolMessageType.OPENPUBFLOW) {
          return fsm.processEvent(new PublisherFSMEvent(
              { name: PublisherFSMEventNames.FLOW_FAILED },
              {
                returnCode:  respCode,
                description: `Unexpected response: ${SMFLib.SMFAdProtocolMessageType.describe(adpMsg.msgType)}`,
              }
          ));
        }

        /*
         * The response code will indicate whether we create a PUB_FLOW_UP (200 OK) event
         * or a PUB_FLOW_FAIL (any other response) event, or treat this as an invalid
         * message (received a OPEN-PUB-FLOW request).
         */

        if (respCode === null) {
          // Drop message and increment stats
          publisher.incStat(StatType.RX_DISCARD_SMF_UNKNOWN_ELEMENT);
          LOG_DEBUG(`Drop Open-Publisher-Flow Request message on sessionId 0x${
                    sessionInterface.sessionIdHex}`);
          return null;
        }

        if (respCode !== 200) {
          const respStr = smfRespHeader.pm_respstr;
          const mappedSubcode = ErrorResponseSubcodeMapper.getADErrorSubcode(respCode, respStr);
          return fsm.processEvent(new PublisherFSMEvent(
            { name: PublisherFSMEventNames.FLOW_FAILED },
            {
              subcode:     mappedSubcode,
              returnCode:  respCode,
              description: respStr,
            })
          );
        }

        LOG_TRACE('Handling OPENPUBFLOW message');

        // typical response: { lastmsgidacked window flowid flowname publisher_id }

        const lastMsgIDAcked = adpMsg.getLastMsgIdAcked();
        const window = adpMsg.getWindow();
        const flowId = adpMsg.getFlowId();
        const flowName = adpMsg.getFlowName();
        const publisherId = adpMsg.getPublisherId();

        LOG_DEBUG(`OPENPUBFLOW response attributes: lastMsgIDAcked=${lastMsgIDAcked} window=${window} flowId=${flowId} flowName=${flowName} publisherId=${publisherId}`);

        LOG_TRACE(`Local before handling response: ${fsm._messageIds}`);

        if (window === undefined) {
          return fsm.processEvent(
            new PublisherFSMEvent({ name: PublisherFSMEventNames.FLOW_FAILED },
                                  { description: 'Window parameter not found' })
          );
        }
        if (window > this._windowSize) {
          return fsm.processEvent(
            new PublisherFSMEvent({ name: PublisherFSMEventNames.FLOW_FAILED },
                                  { description: 'Invalid window negotiation' })
          );
        }
        // reduce sendWindow by the size of unAckedList but do not reduce below zero
        fsm._sendWindow = window - fsm._unackedList.length;
        if (fsm._sendWindow < 0) fsm._sendWindow = 0;

        // update publisher info before renumbering
        Object.assign(fsm._publisher, {
          name: flowName,
          flowId,
          publisherId,
        });
        // we may have been disabled by a previous closeFlow messsage, now that
        // we know hte message spool is enabled again, set _guaranteedEnabled back
        // to true
        fsm._guaranteedEnabled = true;

        if ((fsm._connectReason === ConnectReason.INIT) ||
          (fsm._connectReason === ConnectReason.FAILOVER)) {
          // reset 'lastSent' before renumbering
          fsm._messageIds.setLastSent(lastMsgIDAcked);

          LOG_DEBUG(`Renumbering unacked/unsent messages: fsm._messageIds=${fsm._messageIds}, lastMsgIDAcked=${lastMsgIDAcked}, type=${fsm._connectReason}`);
          if (fsm._connectReason === ConnectReason.FAILOVER) {
            publisher.emit(MessagePublisherEventName.FLOW_NAME_CHANGED, {
              messages: [...fsm._unackedList],
              count:    fsm._unackedList.length,
            });
          }
          fsm._connectReason = ConnectReason.RESUME;
          fsm._unackedList.forEach((message) => {
            const oldId = message.getGuaranteedMessageId();
            fsm._renumber(message);
            LOG_TRACE(`Renumbering message ID: from ${oldId} to ${message.getGuaranteedMessageId()}`);
            fsm._messageIds.setLastSent(message.getGuaranteedMessageId());
          });
        } else {
          fsm._unackedList.forEach((message) => {
            message.setFlowId(flowId);
            message.setPublisherId(publisher.publisherId);
            LOG_TRACE(`Set FlowId to ${flowId} in msg# ${message.getGuaranteedMessageId()}`);
          });
        }
        // Either way, make sure none of the messages is above the size limit:
        const payloadSizeLimit = fsm._sessionInterface.getCapability(SolclientSession.CapabilityType.MAX_GUARANTEED_MSG_SIZE).getValue();
        fsm._unackedList.forEach((message) => {
          if (payloadSizeLimit < message._memoized_payload.length) {
            LOG_WARN(`Message size ${message._memoized_payload.length} above broker limit ${payloadSizeLimit}`);
          }
        });


        if (fsm._unackedList.length) {
          fsm._handleAck(lastMsgIDAcked, false, adpMsg, true);
          // the starting point for retransmitting.  If lastMsgIdAcked doesn't
          // ack anything this does not get updates, causing us to possible send
          // messages out of order, or not start sending at all
          fsm._firstUnackedToSend = fsm._unackedList[0];
        } else {
          fsm._messageIds.lastAcked = Long.fromValue(lastMsgIDAcked);
        }
        LOG_TRACE(`Local after applying lastMsgIDAcked: ${fsm._messageIds}`);

        return fsm.processEvent(
          new PublisherFSMEvent({ name: PublisherFSMEventNames.FLOW_UP }));
      },
      /**
       * @returns {?} The result of processing an BIND_TIMEOUT event
       * @private
       */
      handleOpenFlowTimeout() {
        LOG_INFO('Open publisher connection timeout');
        return fsm.processEvent(
          new PublisherFSMEvent({ name: PublisherFSMEventNames.BIND_TIMEOUT }));
      },
      handleUnknownFlowName() {
        LOG_INFO('Flow name unknown, republish required');
        // Don't send the FLOW_NAME_CHANGED message yet -- it specifically indicates
        // that duplicate messages should be expected. Wait until the flow is successfully
        // connected and messages are being renumbered.
        fsm._resetConnectedInfo(true);
        return this.externalTransitionTo(fsm.PublisherOpenFlowSent);
      },
      /**
       * Send a Publisher Open Flow Request.
       * @private
       */
      sendOpenFlow() {
        const correlationTag = sessionInterface.getCorrelationTag();
        LOG_TRACE(`sendOpenFlow correlationTag: ${correlationTag}`);
        const openPubFlowMsg = SMFLib.AdProtocolMessage.getOpenMessagePublisher(
          fsm._messageIds.lastAcked,
          fsm._messageIds.lastSent,
          properties.windowSize,
          fsm._publisher._flowName,
          correlationTag
        );
        LOG_TRACE('sendOpenFlow openPubFlowMsg constituents:');
        LOG_TRACE(`fsm._messageIds.lastAcked:${fsm._messageIds.lastAcked} fsm._messageIds.lastSent:${fsm._messageIds.lastSent} properties.windowSize:${properties.windowSize} fsm._publisher._flowName:${fsm._publisher._flowName} `);
        sessionInterface.sendControl(openPubFlowMsg);
        sessionInterface.enqueueRequest(correlationTag,
                                        () => this.handleOpenFlowTimeout(),
                                        properties.connectTimeoutInMsecs,
                                        null,
                                        rxMsgObj => this.handleOpenFlowResponse(rxMsgObj));
        LOG_TRACE('Sent open publisher connection');
      },
    })
      .entry(function onEntry() {
        try {
          this.sendOpenFlow();
        } catch (e) {
          LOG_WARN(`Exception during bind attempt: ${e}`);
          fsm.processEvent(new PublisherFSMEvent({ name: PublisherFSMEventNames.SESSION_DOWN }));
        }
      })
      .reaction(PublisherFSMEventNames.FLOW_CLOSE, function onFlowClose() {
        return this.transitionTo(fsm.PublisherCloseFlowSent);
      })
      .reaction(PublisherFSMEventNames.FLOW_UP, function onFlowUp() {
        return this.transitionTo(fsm.PublisherUp);
      })
      .reaction(PublisherFSMEventNames.SESSION_DOWN, function onSessionDown() {
        return this.transitionTo(fsm.PublisherUnbound);
      })
      .reaction(PublisherFSMEventNames.BIND_TIMEOUT, function onOpenFlowTimeout() {
        if (fsm._connectRetryCount > 0) {
          fsm._connectRetryCount--;
          return this.externalTransitionTo(fsm.PublisherOpenFlowSent);
        }
        this.emitOpenFlowFailedError({
          subcode:     ErrorSubcode.TIMEOUT,
          description: 'Open publisher connection failed due to timeout',
        });
        return this.transitionTo(fsm.PublisherUnbound);
      })
      .reaction(PublisherFSMEventNames.FLOW_FAILED, function onFlowFailed(pEvent) {
        const { subcode, returnCode, description } = pEvent;
        LOG_TRACE(`FLOW_FAILED in PublisherOpenFlowSent state: subcode: ${subcode}, returnCode: ${returnCode}, description: ${description}`);
        switch (pEvent.subcode) {
          case ErrorSubcode.UNKNOWN_FLOW_NAME:
            // DR or long HA failover
            return this.handleUnknownFlowName();
          // case ErrorSubcode.GM_NOT_READY: Fail the session
          default:
            this.emitOpenFlowFailedError({
              event: pEvent,
              subcode,
              returnCode,
              description,
            });
            // Otherwise, the flow is invalid
            fsm._resetConnectedInfo();
        }
        return this.transitionTo(fsm.PublisherUnbound);
      });

    fsm.PublisherCloseFlowSent = new State({
      name:          PublisherStateNames.CLOSEFLOWSENT,
      parentContext: fsm,
    }, {
      handleCloseFlowResponse(response) {
        const smfRespHeader = response.smfHeader;
        const respCode = smfRespHeader.pm_respcode;

        if (response.msgType !== SMFLib.SMFAdProtocolMessageType.CLOSEPUBFLOW) {
          return fsm.processEvent(new PublisherFSMEvent(
            { name: PublisherFSMEventNames.FLOW_FAILED },
            {
              returnCode:  respCode,
              description: `Unexpected response: ${SMFLib.SMFAdProtocolMessageType.describe(response.msgType)}`,
            }));
        }

        if (respCode === null) {
          // Drop message  and increment stats
          publisher.incStat(StatType.RX_DISCARD_SMF_UNKNOWN_ELEMENT);
          LOG_DEBUG(`Drop Close-Publisher-Flow Request message on sessionId 0x${
                    sessionInterface.sessionIdHex}`);
          return null;
        }

        if (respCode !== 200) {
          fsm.processEvent(
            new PublisherFSMEvent({ name: PublisherFSMEventNames.FLOW_FAILED },
                                  {
                                    returnCode:  respCode,
                                    description: smfRespHeader.pm_respstr,
                                  }));
        }

        return fsm.processEvent(
          new PublisherFSMEvent({ name: PublisherFSMEventNames.FLOW_UNBOUND }));
      },

      handleCloseFlowTimeout() {
        LOG_INFO('Close publisher connection timeout.');
        return fsm.processEvent(
          new PublisherFSMEvent({ name: PublisherFSMEventNames.UNBIND_TIMEOUT }));
      },

      sendCloseFlow() {
        const correlationTag = sessionInterface.getCorrelationTag();
        const closePubFlowMsg = SMFLib.AdProtocolMessage.getCloseMessagePublisher(
          fsm._publisher.flowId,
          correlationTag
        );
        sessionInterface.sendControl(closePubFlowMsg);
        sessionInterface.enqueueRequest(correlationTag,
                                        () => this.handleCloseFlowTimeout(),
                                        properties.connectTimeoutInMsecs,
                                        null,
                                        rxMsgObj => this.handleCloseFlowResponse(rxMsgObj));
        LOG_TRACE('Sent close publisher connection');
      },
    })
      .entry(function onEntry() {
        this.sendCloseFlow();
        return this;
      })
      .reaction(PublisherFSMEventNames.ACK, function onAck(event) {
        fsm._handleAckEvent(event);
        return this.internalTransition();
      })
      .reaction(PublisherFSMEventNames.FLOW_UNBOUND, function onFlowUnbound() {
        return this.transitionTo(fsm.PublisherUnbound);
      })
      .reaction(PublisherFSMEventNames.FLOW_FAILED, function onCloseFlowFailed(/*pEvent*/) {
        this.transitionTo(fsm.PublisherUnbound);
      })
      .reaction(PublisherFSMEventNames.UNBIND_TIMEOUT, function onCloseFlowTimeout() {
        return this.transitionTo(fsm.PublisherCloseFlowSent);
      });

    fsm.PublisherUp = new State({
      name:          PublisherStateNames.UP,
      parentContext: fsm,
    }, {
      emitFlowUp() {
        publisher.emit(MessagePublisherEventName.UP);
      },
    })
      .initial(function initial() {
        return this.transitionTo(
          fsm._unackedList.length
            ? fsm.PublisherRetransmitting
            : fsm.PublisherDataXfer
        );
      })
      .entry(function onEntry() {
        // The state isn't changed on entry, so don't emit yet.
        LOG_DEBUG('Flow is UP');
        fsm._scheduleStateEvents(fsm.PublisherUp, () => this.emitFlowUp());
        return this;
      })
      .reaction(PublisherFSMEventNames.ACK, function onAck(event) {
        LOG_DEBUG('Ack received');
        fsm._handleAckEvent(event);
        return this.internalTransition();
      })
      .reaction(PublisherFSMEventNames.ACK_TIMEOUT, function onAckTimeout() {
        // the starting point when we get the CAN_SEND
        fsm._firstUnackedToSend = fsm._unackedList[0];
        return this.transitionTo(fsm.PublisherRetransmitting);
      })
        .reaction(PublisherFSMEventNames.FLOW_CLOSE, function onFlowClose() {
          return this.transitionTo(fsm.PublisherCloseFlowSent);
        })
        .reaction(PublisherFSMEventNames.SESSION_DOWN, function onSessionDown() {
          return this.transitionTo(fsm.PublisherUnbound);
        })
        .reaction(PublisherFSMEventNames.TRANSPORT_FULL, function onWindowClosed() {
          return this.internalTransition();
        });

    fsm.PublisherDataXfer = new State({
      name:          PublisherStateNames.DATA_XFER,
      parentContext: fsm.PublisherUp,
    })
      .entry(() => {
        // publisher is up and capable of sending GM messages directly from the application
        fsm._transportFlowControlled = false;
        fsm._scheduleStateEvents(fsm.PublisherDataXfer, () => fsm._maybeEmitCanSend());
      })
      .reaction(PublisherFSMEventNames.TRANSPORT_FULL, function onTransportFull() {
        return this.transitionTo(fsm.PublisherFlowControlled);
      })
      .exit(() => {
        // publisher cannot send messages to transport and must queue in unAckedList until
        // re-entering PublisherDataXfer
        // set a FSM boolean that is checked in prepareADMessageAndSend() we
        // need to avoid FSM interactions on the fast path so resort to
        // this boolean.
        fsm._transportFlowControlled = true;
      });

    fsm.PublisherFlowControlled = new State({
      name:          PublisherStateNames.FLOW_CONTROLLED,
      parentContext: fsm.PublisherUp,
    })
      .reaction(PublisherFSMEventNames.TRANSPORT_FULL, function onTransportFull() {
        // Unusual event, as only the FSM can send messages when we're in flow-controlled state
        LOG_INFO('Attempt to send while flow controlled');
        // Fall out and do the action for PublisherUp
        return this.internalTransition();
      })
      .reaction(PublisherFSMEventNames.CAN_SEND, function onCanSend() {
        //
        // start sending from tune unAcked list.
        return this.transitionTo(fsm.PublisherRetransmitting);
      });

    fsm.PublisherRetransmitting = new State({
      name:          PublisherStateNames.RETRANSMITTING,
      parentContext: fsm.PublisherUp,
    }, {
      retransmit() {
        try {
          fsm._resendFromUnacked();
        } catch (ex) {
          // Resend failed:
          if (ex instanceof OperationError && ex.subcode === ErrorSubcode.INSUFFICIENT_SPACE) {
            LOG_DEBUG('Publisher resendFromUnacked blocked due to insufficient space, wait for CAN_SEND');
            fsm.processEvent(
              new PublisherFSMEvent({ name: PublisherFSMEventNames.TRANSPORT_FULL }));
          } else {
            // send failed.  Fail the publisher
            LOG_INFO(`Publisher resendFromUnacked failed: ${ex}`);
            fsm.processEvent(
              new PublisherFSMEvent({ name: PublisherFSMEventNames.FLOW_FAILED }));
          }
        }
      },
    })
      .entry(function onEntry() {
        this.retransmit();
      })
      .reaction(PublisherFSMEventNames.RESEND_COMPLETE, function onResendComplete() {
        return this.transitionTo(fsm.PublisherDataXfer);
      })
      .reaction(PublisherFSMEventNames.TRANSPORT_FULL, function onTransportFull() {
        // Transport flow controlled while resending/recovering from flow control. Go
        // to flow controlled state and wait for CAN_SEND
        LOG_DEBUG(`Transport full while retransmitting, unacked remaining: ${fsm._unackedList.length}`);
        // Fall out and do the action for PublisherUp
        return this.transitionTo(fsm.PublisherFlowControlled);
      });
  }

  isDisconnected() {
    if (!this.getCurrentState()) return true;
    return !!this.getActiveState(PublisherStateNames.UNBOUND);
  }

  /**

   * Prepare and send a Guaranteed Message. This method updates FSM variables including
   * lastSendMessage.
   *
   * @private
   * @param {Message} dataMsg The message to prepare
   * @returns {TransportReturnCode} return the status from the transport send
   */
  prepareAdMessageAndSend(dataMsg) {
    if (!this._guaranteedEnabled) {
      throw new OperationError('Session does not provide Guaranteed Message Publish capability',
        ErrorSubcode.GM_UNAVAILABLE,
        'close flow received from message-router');
    }
    if (this._sendWindow <= 0) {
      this._publisher.incStat(StatType.TX_WINDOW_CLOSED);
      this._notifiedWindowClosed = true;
      throw new OperationError(
        'Guaranteed Message Window Closed',
        ErrorSubcode.INSUFFICIENT_SPACE
      );
    }


    const unackedList = this._unackedList;
    const {
      LOG_TRACE,
      LOG_DEBUG,
      LOG_INFO,
      LOG_WARN,
    } = this.logger;

    if (dataMsg._payload_is_memoized) {
      dataMsg._payload_is_memoized = false;
      dataMsg._memoized_csumm = undefined;
      dataMsg._memoized_payload = undefined;
    }
    const dupMsg = dataMsg.clone();
    const payloadSize = SMFLib.Codec.Encode.adaptMessageToSmf_payloadMemoize(dupMsg);
    var payloadSizeLimit = 0;
    try {
      payloadSizeLimit = this._sessionInterface.getCapability(SolclientSession.CapabilityType.MAX_GUARANTEED_MSG_SIZE).getValue();
      LOG_TRACE(`Payload size limit: ${payloadSizeLimit}`);
    } catch (e) {
      LOG_INFO('Can\'t pre-check payload size, broker not connected yet?');
      LOG_TRACE(e.stack);
    }
    if ((0 < payloadSizeLimit) && (payloadSize > payloadSizeLimit)) {
      throw new OperationError(
        `Encoded payload size (${payloadSize}) exceeds broker size limit (MAX_GUARANTEED_MSG_SIZE, ${payloadSizeLimit})`,
        ErrorSubcode.MESSAGE_TOO_LARGE
      );
    }

    --this._sendWindow;
    this._renumber(dupMsg);
    this._cloneNumbers(dupMsg, dataMsg);

    unackedList.push(dupMsg);
    // Update the messgeIds, lastSend/next values only on a successful send or enqueue,
    // from this point on we will return OK from this send method.
    const msgId = dupMsg.getGuaranteedMessageId();
    this._messageIds.setLastSent(msgId);
    LOG_TRACE(`Prepare and send AD message ID = ${msgId}, 
      unackedListSize = ${unackedList.length}, sendWindow = ${this._sendWindow}`);
    // Note that the transport sender can be flow controlled at the transport
    // level, which means the message should not be sent. So simply return
    // We also consider set-up/down-states as _transportFlowControlled. We must be up
    // and not retransmitting to
    // send directly from application space.
    if (this._transportFlowControlled) {
      // we may receive acknowledgements while transport flow controlled which can
      // cause our firstUnAckedToSend to become undefined, if this is the first message
      // queued in that case, set firstUnAckedToSend
      if (this._firstUnackedToSend === undefined) {
        this._firstUnackedToSend = dupMsg;
      }
      return TransportReturnCode.OK;
    }
    // We use the session sendToTransport directly which may throw or  otherwise
    // return an error. If so, catch the eror and remove the message from the unackedlist
    // before rethrowing the error.
    let returnCode;
    try {
      returnCode = this._sessionInterface.sendToTransport(dupMsg);
      if (returnCode !== TransportReturnCode.OK) {
        if (returnCode === TransportReturnCode.NO_SPACE) {
          returnCode = TransportReturnCode.OK;
          this._firstUnackedToSend = dupMsg;  // the starting point when we get the CAN_SEND
          this.processEvent(new PublisherFSMEvent({ name: PublisherFSMEventNames.TRANSPORT_FULL }));
        } else {
          //
          LOG_DEBUG(`prepareAdMessageAndSend: saw returnCode = ${returnCode}`);
        }
      } else {
        // The message has successfully been sent once. Set the redelivered flag in case we need to
        // resend it later.
        dupMsg.setRedelivered(true);
      }
      // TBD: Should we start AckTimer when flow controlled?
      this._startAckTimer();
    } catch (ex) {
      if (ex instanceof OperationError) {
        LOG_DEBUG(`prepareAdMessageAndSend: caught OperationError: ${ErrorSubcode.describe(ex.subcode)} - ${ex.message}`);
        // OperationErrors are encoding or other errors caused by the field contents the
        // application has set on the message.  So we throw the error back at the
        // application after undoing the queueing operations.
        unackedList.pop();
        this._messageIds.setLastSent(dupMsg.getGuaranteedPreviousMessageId());
        ++this._sendWindow;
        throw (ex);
      } else {
        LOG_DEBUG(`prepareAdMessageAndSend: caught ${ex.message}`);
        throw ex;
      }
    }
    return TransportReturnCode.OK;
  }

  _handleAckEvent(event) {
    this._publisher.incStat(StatType.TX_ACKS_RXED);
    this._handleAck(event.ack || event.nack, !!event.nack, event.ctrlMessage);
  }

  _handleAck(id, nack, ctrlMessage = undefined, openFlow = false) {
    const {
      _messageIds: messageIds,
      _unackedList: unackedList,
    } = this;
    const { LOG_DEBUG, LOG_INFO } = this.logger;

    if (messageIds.lastAcked.gte(id)) {
      if (openFlow) {
        LOG_DEBUG(`Implicit acks up to date: remote ack for ${id}, local ids ${this._messageIds}`);
      } else {
        LOG_INFO(`Dropping ack: remote ack for ${id}, local ids ${this._messageIds}`);
      }
      return;
    }

    const reportAcked = [];

    // Assumption: the unacked message list is in increasing order of message ID.
    // The assured message ID should be automatically generated, immutable and
    // monotonically increasing.

    // While the acked ID is greater than an element at the beginning of the unacked list...
    while (unackedList.length &&
           id.gte(unackedList[0].getGuaranteedMessageId())) {
      // That unacked message is now acked. Shift it off and append to list of acked.
      reportAcked.push(unackedList.shift());
    }

    // if we have removed the firstUnacked, reset it to the beginning of the list
    if (unackedList.indexOf(this._firstUnackedToSend) === -1) {
      this._firstUnackedToSend = unackedList[0];
    }


    // Recover some window space from the acked IDs.
    this._sendWindow += reportAcked.length;
    LOG_DEBUG('Send window size is now', this._sendWindow);


    // The last acked ID is now the one we received.
    // Don't let an exception in event dispatching prevent this from being set -- do it now.
    messageIds.lastAcked = id;

    // If we're NACKing, it's only the last message.
    const reportNacked = nack ? reportAcked.pop() : null;

    // Any ACKs?
    // reportAcked is constant from here on
    const numAcked = reportAcked.length;
    if (numAcked) {
      if (this._acknowledgeMode === MessagePublisherAcknowledgeMode.PER_MESSAGE) {
        LOG_DEBUG(`Ack received: lastAckedMsgId=${id}, numAckedMsgs=${numAcked}, numUnackedMsgs=${unackedList.length}`);
        for (let i = 0; i < numAcked; ++i) {
          this._publisher.emit(MessagePublisherEventName.ACKNOWLEDGED_MESSAGE, reportAcked[i]);
        }
      } else {
        const lastAckedMessage = reportAcked[numAcked - 1];
        LOG_DEBUG(`Acking single message with ID ${lastAckedMessage.getGuaranteedMessageId()} from router ack on ${id}`);
        this._publisher.emit(MessagePublisherEventName.ACKNOWLEDGED_MESSAGE, lastAckedMessage);
      }
    }

    // Terminating NACK?
    if (reportNacked) {
      LOG_DEBUG(`Nacking single message with ID ${reportNacked.getGuaranteedMessageId()} from router ack on ${id}`);
      this._publisher.emit(MessagePublisherEventName.REJECTED_MESSAGE, reportNacked, ctrlMessage);
    }

    LOG_DEBUG('Unacked messages remaining: ', unackedList.length);

    if (unackedList.length) {
      // There are more messages to be acked. Reset the ack timer.
      this._resetAckTimer();
    } else {
      this._clearAckTimer();
    }
    //
    // send can-send to applicatino if necessary.
    this._maybeEmitCanSend();
  }

  _maybeEmitCanSend() {
    const { LOG_TRACE } = this.logger;
    if (!this._notifiedWindowClosed) return;
    if (this._sendWindow === 0) {
      // Should log this since it is called AFTER the state change
      LOG_TRACE('Suppressing CAN_SEND with zero window available');
      return;
    }
    this._notifiedWindowClosed = false; // Set before emitting in case we re-enter
    this._publisher.emit(MessagePublisherEventName.CAN_SEND);
  }

  _resendFromUnacked() {
    const { LOG_ERROR, LOG_INFO, LOG_DEBUG } = this.logger;
    // We choose to start the ack timer after the message resend. The resend could
    // take longer than the ack timeout in poor conditions.
    //
    // If any message is successfully resent, we need to restart the ack timer,
    // even if we are throwing.
    const list = this._unackedList;
    let resendIndex = list.indexOf(this._firstUnackedToSend);
    if (resendIndex === -1) {
      // first Unacked may be null if all have been resent alreadygrunt -
      // in which case the list  should be empty
      if (this._firstUnackedToSend) {
        LOG_ERROR(`Could not find first Unacked Messages in unacked message list: msgId = ${this._firstUnackedToSend.getGuaranteedMessageId}`);
      }
      if (list.length === 0) {
        // Nothing to resend: return to DataXfer
        LOG_DEBUG(`Nothing to resend: ${this._messageIds.toString()}`);
        this.processEvent(
          new PublisherFSMEvent({ name: PublisherFSMEventNames.RESEND_COMPLETE })
        );
      }
      return;
    }
    LOG_DEBUG(`Resending unacked messages from ${resendIndex} to ${list.length - 1}: `,
              list.map(m => m.getGuaranteedMessageId().toString()));
    while (resendIndex < list.length) {
      if (list[resendIndex].getPublisherId() !== this._publisher.publisherId) {
        LOG_ERROR(`Resending on invalid publisherId '${list[resendIndex].getPublisherId()}'when it should be '${this._publisher.publisherId}'`);
      }
      const returnCode = this._sessionInterface.sendData(list[resendIndex]);
      if (returnCode === TransportReturnCode.NO_SPACE) {
        // the starting point when we get the CAN_SEND from the transport
        this._firstUnackedToSend = list[resendIndex];
        LOG_INFO('Publisher sendMessage blocked due to insufficient space, wait for CAN_SEND');
        this.processEvent(
          new PublisherFSMEvent({ name: PublisherFSMEventNames.TRANSPORT_FULL }));
        return;
      }
      if (returnCode !== TransportReturnCode.OK) {
        // session-FSM is already processing the error
        return;
      }
      // The message has successfullly been sent once. Set the redelivered flag in case we need to
      // resend it later
      list[resendIndex].setRedelivered(true);
      resendIndex++;
      this._startAckTimer();
    }
    // Resend successful: return to DataXfer
    LOG_DEBUG(`Resend complete: ${this._messageIds.toString()}`);
    this.processEvent(
      new PublisherFSMEvent({ name: PublisherFSMEventNames.RESEND_COMPLETE })
    );
  }

  _resetConnectedInfo(failover = false) {
    const { LOG_DEBUG } = this.logger;
    LOG_DEBUG('Resetting connected flow info');

    if (this._ackTimer) this._clearAckTimer();

    Object.assign(this, {
      _messageIds: new MessageIds(),
    });

    Object.assign(this._publisher, {
      publisherId: undefined,
      flowId:      undefined,
      flowName:    null,
    });

    if (failover) {
      this._connectReason = ConnectReason.FAILOVER;
    } else {
      // Full reset
      this._unackedList = [];
      this._connectReason = ConnectReason.INIT;
    }
  }

  _clearAckTimer() {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE(`Clear ack timer ${this._ackTimer ? this._ackTimer : 'undefined or null or zero'}`);
    if (!this._ackTimer) return;
    clearTimeout(this._ackTimer);
    this._ackTimer = null;
  }

  _emitStateEvents() {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE('Emitting deferred state events');
    while (this._stateEvents.length) {
      const pair = this._stateEvents.shift();
      const state = pair[0];
      const event = pair[1];
      // If the state requesting this event is still active...
      if (this.getActiveState(state.getName())) {
        // then do its action
        event.apply(state);
      }
    }
  }

  /**
   * @private
   */
  _handleAckTimeout() {
    const { LOG_TRACE } = this.logger;
    LOG_TRACE('Ack Timeout');
    this._ackTimer = null;
    this._publisher.incStat(StatType.TX_ACK_TIMEOUT);
    this.processEvent(new PublisherFSMEvent({ name: PublisherFSMEventNames.ACK_TIMEOUT }));
  }

  /**
   * If the remote flow changes, any remote state applied to the unacked messages needs to
   * be reapplied.
   *
   * @param {solace.Message} message The message to renumber.
   * @private
   */
  _renumber(message) {
    const messageIds = this._messageIds;
    const current = messageIds.next;
    message.setGuaranteedPreviousMessageId(messageIds.lastSent);
    message.setGuaranteedMessageId(current);

    const publisher = this._publisher;
    message.setFlowId(publisher.flowId);
    message.setPublisherId(publisher.publisherId);
  }

  _cloneNumbers(fromMsg, toMsg) {
    toMsg.setGuaranteedPreviousMessageId(fromMsg.getGuaranteedPreviousMessageId());
    toMsg.setGuaranteedMessageId(fromMsg.getGuaranteedMessageId());
    toMsg.setFlowId(fromMsg.getFlowId());
    toMsg.setPublisherId(fromMsg.getPublisherId());
  }

  _resetAckTimer() {
    this._clearAckTimer();
    this._startAckTimer();
  }

  _scheduleStateEvents(state, event) {
    this._stateEvents.push([state, event]);
    this._setPostEventAction(() => this._emitStateEvents(), 'Emit state events');
  }

  _setPostEventAction(action, desc = 'No action') {
    const { LOG_DEBUG, LOG_WARN } = this.logger;
    if (this._postEventAction && this._postEventAction.desc === desc) {
      LOG_DEBUG('Keeping same post event action');
      return;
    }

    if (this._postEventAction && this._postEventAction.desc) {
      LOG_WARN(`Replacing post event action ${this._postEventAction.desc} with ${desc}`);
    }
    this._postEventAction = { action: action || (() => {}), desc };
    this.setPostEventAction(() => {
      this._postEventAction.action();
      this._postEventAction = null;
    });
  }

  /**
   * @private
   */
  _startAckTimer() {
    // const { LOG_TRACE } = this.logger;
    // LOG_TRACE(`Start ack timer ${this._ackTimer ? this._ackTimer : 'undefined or null or zero'
    //           }: ${this._acknowledgeTimeoutInMsecs} ms`);
    if (this._ackTimer) return;
    this._ackTimer = setTimeout(() => this._handleAckTimeout(),
                                this._acknowledgeTimeoutInMsecs);
  }

}

module.exports.PublisherFSM = PublisherFSM;
