const MessageLib = require('solclient-message');
const SessionLib = require('solclient-session');
const SMFLib = require('solclient-smf');
const StatType = require('solclient-stats');
const {
  ErrorSubcode,
  OperationError,
} = require('solclient-error');
const {
  LOG_TRACE,
  LOG_DEBUG,
  LOG_INFO,
  LOG_WARN,
} = require('solclient-log');
const {
  SDTField,
  SDTFieldType,
  SDTStreamContainer,
} = require('solclient-sdt');
const { CacheCBInfo } = require('./cache-cb-info');
const { CacheContext } = require('./cache-context');
const { CacheGetResult } = require('./cache-get-result');
const { CacheLiveDataAction } = require('./cache-live-data-actions');
const { CacheRequest } = require('./cache-request');
const { CacheRequestResult } = require('./cache-request-result');
const { CacheRequestType } = require('./cache-request-types');
const { CacheReturnCode } = require('./cache-return-codes');
const { CacheReturnSubcode } = require('./cache-return-subcodes');
const { CacheSessionProperties } = require('./cache-session-properties');
const { CacheSessionSubscribeInfo } = require('./cache-session-subscribe-info');
const { Destination,
        Topic } = require('solclient-destination');

// Prefix used to recognise correlation IDs that belong to cache requests; a received
// message whose correlation ID starts with it is treated as a cache reply.
const { CACHE_REQUEST_PREFIX } = CacheContext;

// Callback that does nothing. Internally spawned child requests are created with it
// as their callback.
const noOp = () => undefined;

/**
 * @classdesc
 * <b>This class is not exposed for construction by API users. Users should obtain an instance from
 * {@link solace.Session#createCacheSession}</b>
 * <p>
 * A session for performing cache requests.
 *
 * Applications must use {@link solace.Session#createCacheSession} to construct this class.
 *
 * The supplied {@link solace.CacheSessionProperties} will be copied. Subsequent modifications
 * to the passed properties will not modify the session. The properties may be reused.
 *
 * @hideconstructor
 * @memberof solace
 */
class CacheSession {
  /*
   * @constructor
   * @private
   *
   * @param {solace.CacheSessionProperties} props The properties for the cache session.
   * @param {solace.Session} session The Session on which the CacheSession will issue
   *  cache requests.
   * @param {Object} privateInterface Private methods exposed to this CacheSession.
   *
   * @throws {solace.OperationError}
   *  * if the parameters have an invalid type or value;
   *    subcode {@link ErrorSubcode.PARAMETER_INVALID_TYPE}.
   */
  constructor(props, session, privateInterface) {
    CacheSession._validateProps(props);
    const properties = new CacheSessionProperties(props.cacheName,
                                                  props.maxAgeSec,
                                                  props.maxMessages,
                                                  props.timeoutMsec);
    Object.assign(this,
                  {
                    _outstandingRequests:          {},
                    _outstandingIDs:               {},
                    _disposed:                     false,
                    _nextMessageCallbackInfo:      null,
                    _nextSessionEventCallbackInfo: null,
                    _properties:                   properties,
                    _session:                      session,
                    _sessionIF:                    privateInterface,
                  });
    this._connectToSession(session);
  }

  /**
   * @param {solace.Session} session The session to connect
   * @private
   */
  _connectToSession(session) {
    this._nextSessionEventCallbackInfo = session.getEventCBInfo();
    this._nextMessageCallbackInfo = session.getMessageCBInfo();
    session.setMessageCBInfo(new SessionLib.MessageRxCBInfo((_, message) => {
      this._handleMessage(message);
    }, this));
    session.setEventCBInfo(this._createCompoundEventCB(this._nextSessionEventCallbackInfo));
  }

  /**
   * Takes the session's existing event callback and sets it as the next delegate in a
   * chain of responsibility. The next delegate could be the client application, or another
   * cache session.
   * @param {SessionEventCBInfo} nextDelegate The next event receiver in the chain
   * @returns {SessionEventCBInfo} An event receiver that delegates to the supplied receiver
   * @private
   */
  _createCompoundEventCB(nextDelegate) {
    return new SessionLib.SessionEventCBInfo((session, sessionEvent, userObject, rfuObject) => {
      this._handleSessionEvent(nextDelegate, session, sessionEvent, userObject, rfuObject);
    }, null);
  }

  /**
   * Modified chain of responsibility. Handles the event, then passes the event to the next
   * delegate.
   * @param {SessionEventCBInfo} nextDelegate The next delegate to notify
   * @param {solace.Session} session The associated session
   * @param {SessionEvent} sessionEvent The session event to dispatch
   * @private
   */
  _handleSessionEvent(nextDelegate, session, sessionEvent /*, userObject, rfuObject */) {
    const passEvent = this._processSessionEvent(session, sessionEvent);
    if (!passEvent) {
      return;
    }
    const cbUserObject = nextDelegate.userObject;
    if (!cbUserObject) {
      nextDelegate.sessionEventCBFunction(session, sessionEvent);
    } else {
      nextDelegate.sessionEventCBFunction(session, sessionEvent, cbUserObject);
    }
  }

  /**
   * @param {Message} message The message to forward
   * @private
   */
  _sendToNextDelegate(message) {
    const cbUserObject = this._nextMessageCallbackInfo.userObject;
    if (!cbUserObject) {
      this._nextMessageCallbackInfo.messageRxCBFunction(this._session, message);
    } else {
      this._nextMessageCallbackInfo.messageRxCBFunction(this._session, message, cbUserObject);
    }
  }

  /**
   * @param {solace.Session} session The associated session
   * @param {SessionEvent} event The event to process
   * @returns {Boolean} true if event should pass to next delegate
   * @private
   */
  _processSessionEvent(session, event) {
    switch (event.sessionEventCode) {
      case SessionLib.SessionEventCode.SUBSCRIPTION_ERROR:
      case SessionLib.SessionEventCode.SUBSCRIPTION_OK:
        return this._checkSubscriptionStatus(event);
      case SessionLib.SessionEventCode.DOWN_ERROR:
        this.dispose();
        return true;
      default:
        LOG_TRACE(`Unhandled session event: ${event.sessionEventCode}`);
        return true;
    }
  }

  /**
   * @param {SessionEvent} event The event to check
   * @returns {Boolean} `true` if event should pass to next delegate
   * @private
   */
  _checkSubscriptionStatus(event) {
    // Incremental checks for whether this is our subscription.
    if ((event.correlationKey === null || event.correlationKey === undefined) ||
        (!(event.correlationKey instanceof CacheSessionSubscribeInfo)) ||
        (event.correlationKey.cacheSession !== this)) {
      return true;
    }
    const request = this._getOutstandingRequest(event.correlationKey.correlationID);
    if (!request) {
      LOG_WARN(`No request found for subscription success on ${event.correlationKey.topic}`);
      return true;
    }
    if (event.sessionEventCode === SessionLib.SessionEventCode.SUBSCRIPTION_OK) {
      this._handleSubscriptionSuccess(request, event.correlationKey.topic);
      return false;
    }
    this._handleSubscriptionError(request, event);
    return false;
  }

  /**
   * @param {CacheRequest} requestIn The cache on which to register success
   * @param {Topic} topic The topic on which to issue the request.
   * @private
   */
  _handleSubscriptionSuccess(requestIn /*, topic */) {
    // Null out this field and check completion status.
    const request = requestIn;
    request.subscriptionWaiting = null;
    this._startCacheRequest(request);
  }

  /**
   * @param {CacheRequest} request The cache request on which to register failure
   * @private
   */
  _handleSubscriptionError(request/*, event */) {
    this._terminateRequest(request, CacheReturnCode.FAIL, CacheReturnSubcode.SUBSCRIPTION_ERROR);
  }

  /**
   * @param {CacheRequest} request The cache request on which to check completion
   * @private
   */
  _checkRequestCompletion(request) {
    if (request.childRequests.length) {
      // Not finished with spawned child requests.
      LOG_DEBUG(`Awaiting termination of ${request.childRequests.length} children`);
      return;
    }
    if (request.subscriptionWaiting) {
      // Waiting for confirmation of subscription to a topic.
      LOG_DEBUG('Awaiting subscription');
      return;
    }
    if (request.timeoutHandle !== null && (!request.replyReceived)) {
      LOG_DEBUG('Awaiting timeout');
      return;
    }
    if (request.parentRequest) {
      // We're no longer waiting for a child to complete.
      const parent = request.parentRequest;
      request.cancel();
      this._unregisterRequest(request);
      this._checkRequestCompletion(parent);
      return;
    }

    // This is a parent request and it is done.
    let code;
    let subcode;
    if (request.isSuspect) {
      code = CacheReturnCode.INCOMPLETE;
      subcode = CacheReturnSubcode.SUSPECT_DATA;
    } else if (request.dataReceived) {
      code = CacheReturnCode.OK;
      if (request.liveDataFulfilled) {
        subcode = CacheReturnSubcode.LIVE_DATA_FULFILL;
      } else {
        subcode = CacheReturnSubcode.REQUEST_COMPLETE;
      }
    } else if (request.replyReceived) {
      code = CacheReturnCode.INCOMPLETE;
      subcode = CacheReturnSubcode.NO_DATA;
    } else {
      throw new Error('Sanity: should never happen');
    }
    this._terminateRequest(request, code, subcode);
  }

  /**
   * @param {CacheRequest} parentRequest The parent node for the new request
   * @param {CacheGetResult} cacheGetResult The result of this request
   * @private
   */
  _sendSeeOther(parentRequest, cacheGetResult) {
    const clusterName = cacheGetResult.clusterNameStream.getNext().getValue();
    // var root = parentRequest.getRootRequest();
    LOG_DEBUG(`See Other for ${clusterName}. Sending child request`);
    const childRequest = new CacheRequest(this,
                                          CacheRequestType.GET_MSG_REQUEST,
                                          parentRequest.requestID,
                                          new CacheCBInfo(noOp, null),
                                          parentRequest.liveDataAction,
                                          parentRequest.topic,
                                          clusterName);
    // Add this request to its parent
    parentRequest.addChild(childRequest);
    // Start the request
    this._registerRequest(childRequest);
    childRequest.startRequestTimeout(CacheSession._handleCacheRequestTimeout,
                                     this._properties.timeoutMsec);
    this._startCacheRequest(childRequest, // request to send
                            null, // no session ID
                            null, // no specific instance target
                            true); // don't return other clusters
  }

  /**
   * @param {CacheRequest} parentRequest The parent node for the new request
   * @param {CacheGetResult} cacheGetResult The result of this request
   * @private
   */
  _sendGetNext(parentRequest, cacheGetResult) {
    LOG_DEBUG('Cache result has more, sending GET_NEXT_MSG_REQUEST as child');
    // This is how to trigger "Invalid Session":
    //require("child_process").execSync('sleep 11');
    const nextRequest = new CacheRequest(this,
                                         CacheRequestType.GET_NEXT_MSG_REQUEST,
                                         parentRequest.requestID,
                                         new CacheCBInfo(noOp, null),
                                         parentRequest.liveDataAction,
                                         parentRequest.topic,
                                         parentRequest.cacheName);
    // Set up parent-child relationship
    parentRequest.addChild(nextRequest);
    // Start the request
    this._registerRequest(nextRequest);
    nextRequest.startRequestTimeout(CacheSession._handleCacheRequestTimeout,
                                    this._properties.timeoutMsec);
    this._startCacheRequest(nextRequest, // request to send
                            cacheGetResult.sessionID, // supplied session ID
                            cacheGetResult.replyTo); // supplied cache instance target
  }

  /**
   * @param {Message} message The message to inspect and handle
   * @private
   */
  _handleMessage(message) {
    // Determine if the message is associated with one of this session's requests
    const correlationID = message.getCorrelationId();
    const request = correlationID === null || correlationID === undefined
      ? null
      : this._outstandingRequests[correlationID];

    // This could be live data on a relevant topic.  Check that.
    if (!request) {
      if (this._relevantLiveData(message)) {
        this._sendToNextDelegate(message);
      }
      return;
    }

    // It's ours!
    LOG_DEBUG(`Processing reply to ${request}`);
    // bug 36404: We have a response. Cancel the timeout for this request.
    request.clearRequestTimeout();
    const streamField = message.getSdtContainer();
    const stream = streamField && streamField.getValue();
    if (!stream) {
      LOG_INFO(
        `Invalid message format for cache response: no SDT container (${
        streamField}) or stream (${stream})`);
      this._terminateRequest(request, CacheReturnCode.FAIL, CacheReturnSubcode.ERROR_RESPONSE);
    }
    // The session will never see this reply, so we need to update the stats
    // from here.
    this._incStat(StatType.RX_REPLY_MSG_RECVED);
    request.replyReceived = true;

    // If the request was fulfilled by live data, discard the reply and stop processing
    // the response.
    if (request.getRootRequest().liveDataFulfilled) {
      this._incStat(StatType.CACHE_REQUEST_FULFILL_DISCARD_RESPONSE);
      this._checkRequestCompletion(request);
      return;
    }

    if (!stream) {
      LOG_INFO('Invalid cache response did not fulfill request. Skipping response processing');
      return;
    }

    try {
      // Fill the result object with values from the result stream.
      const result = new CacheGetResult();
      result.readFromStream(stream);
      result.replyTo = message.getReplyTo();
      if (result.responseString) {
        LOG_DEBUG(`Cluster response: ${result.responseString}`);
      }
        if (result.responseCode === 7 || result.responseString == "Invalid Session") {
          LOG_INFO(`Cluster response indicates invalid session: ${result.responseString} code: ${result.responseCode}`);
          this._terminateRequest(request, CacheReturnCode.FAIL, CacheReturnSubcode.INVALID_SESSION);
        }
      // Apply the isSuspect flag. Don't clear it if the result is already suspect.
      request.isSuspect = request.isSuspect || result.isSuspect;
      // Get any inner messages.
      const messages = CacheSession._decodeMessageStream(request, result);
      // Update statistics.
      this._incStat(StatType.RX_CACHE_MSG, messages.length);
      // If we have more results to come, send a get next request.
      if (result.hasMore) {
        this._sendGetNext(request, result);
      }
      // If we have more clusters to visit, send requests to those clusters.
      if (result.clusterNameStream) {
        LOG_DEBUG('Receiving cluster stream');
        while (result.clusterNameStream.hasNext()) {
          this._sendSeeOther(request, result);
        }
      }
      // Forward any retrieved messages.
      if (messages) {
        messages.forEach((m) => {
          this._sendToNextDelegate(m);
        });
      }
      this._checkRequestCompletion(request);
    } catch (exception) {
      LOG_INFO(`Invalid message format for cache response: ${exception.stack}`);
      this._terminateRequest(request, CacheReturnCode.FAIL, CacheReturnSubcode.ERROR_RESPONSE);
    }
  }

  /**
   * @param {Message} message The message to inspect and handle
   * @returns {Boolean} `true` if the next delegate should handle this message
   * @private
   */
  _relevantLiveData(message) {
    // If the next message processor does not belong to a cache
    // session, and this is a CRQ message, suppress it.
    if (message.getCorrelationId() &&
        message.getCorrelationId().startsWith(CACHE_REQUEST_PREFIX) &&
        !(this._nextMessageCallbackInfo.userObject instanceof CacheSession)) {
      LOG_WARN('DROP: Dropping CRQ reply due to no remaining Cache Session processors on message ' +
               'callback chain');
      this._incStat(StatType.RX_REPLY_MSG_DISCARD);
      return false;
    }
    // Otherwise, assume the message should flow through.
    // If all _performLiveDataAction calls return true, we return true;
    return Object.keys(this._outstandingRequests).every(checkCID =>
      this._performLiveDataAction(this._outstandingRequests[checkCID], message));
  }

  /**
   * @param {CacheRequest} requestIn The cache request for which to perform the action
   * @param {Message} message The message to process on the request
   * @returns {Boolean} true if message should flow through after action.
   * @private
   */
  _performLiveDataAction(requestIn, message) {
    const request = requestIn;
    request.dataReceived = true;
    switch (request.liveDataAction) {
      case CacheLiveDataAction.QUEUE:
        request.queuedLiveData.push(message);
        return false;

      case CacheLiveDataAction.FULFILL:
        if (!request.liveDataFulfilled) {
          this._fulfillRequest(request);
        }
        return true;

      default:
        return true;
    }
  }

  /**
   * @param {CacheRequest} requestIn The request to fulfill
   * @private
   */
  _fulfillRequest(requestIn) {
    const request = requestIn;
    request.liveDataFulfilled = true;
    this._trackCompletionStats(CacheReturnCode.OK, CacheReturnSubcode.LIVE_DATA_FULFILL);
    // We have more work to do here -- we need to return the live data first.
    // Schedule the notification for later.
    setTimeout(() => {
      CacheSession._notifyCallback(request,
                                   CacheReturnCode.OK,
                                   CacheReturnSubcode.LIVE_DATA_FULFILL,
                                   request.getTopic(),
                                   null);
    }, 0);
  }

  /**
   * Disposes the session.  No cache requests will be sent by this CacheSession after it is
   * _disposed.
   *
   * Any subsequent operations on the session will throw {OperationError}.
   *
   * Any pending operations will immediately terminate, returning
   *   * {@link solace.CacheRequestResult}
   *     * #returnCode === {@link solace.CacheReturnCode.INCOMPLETE}
   *     * #subcode === {@link solace.CacheReturnSubcode.CACHE_SESSION_DISPOSED}
   * @throws {solace.OperationError} if the CacheSession is already _disposed.
   */
  dispose() {
    const toTerminate = Object.keys(this._outstandingRequests)
      .map(correlationID => this._outstandingRequests[correlationID])
      .filter(request => request instanceof CacheRequest);
    toTerminate.forEach((request) => {
      this._terminateRequest(request,
                             CacheReturnCode.INCOMPLETE,
                             CacheReturnSubcode.CACHE_SESSION_DISPOSED);
    });
    this._outstandingRequests = [];
    // Restore original listeners
    this._session.setEventCBInfo(this._nextSessionEventCallbackInfo);
    this._session.setMessageCBInfo(this._nextMessageCallbackInfo);
    // Set _disposed
    this._disposed = true;
  }

  /**
   * Gets the cache session properties.
   *
   * @returns {solace.CacheSessionProperties} The properties for the session.
   * @throws {solace.OperationError} if the CacheSession is disposed.
   */
  getProperties() {
    return this._properties;
  }

  /**
   * Issues an asynchronous cache request. The result of the request will be returned via the
   * listener. Messages returned as a result of issuing the request will be returned to the
   * application via the {@link solace.MessageRxCBInfo} associated with this
   * {@link solace.CacheSession}'s {@link solace.Session}
   *
   * @param {Number} requestID The application-assigned ID number for the request.
   * @param {solace.Destination} topic The topic destination for which the cache request will be
   *    made.
   * @param {Boolean} subscribe If true, the session will subscribe to the given {Topic}, if it is
   * not already subscribed, before performing the cache request.
   * @param {solace.CacheLiveDataAction} liveDataAction The action to perform when the
   *    {@link solace.CacheSession} receives live data on the given topic.
   * @param {solace.CacheCBInfo} cbInfo Callback info for the cache request.
   *
   * @throws {solace.OperationError} In the following cases:
   * * If the CacheSession is disposed.
   *    Subcode: {@link solace.ErrorSubcode.INVALID_OPERATION}
   * * If one or more parameters were invalid.
   *    Subcode: {@link solace.ErrorSubcode.PARAMETER_INVALID_TYPE}
   * * If the supplied topic and live data action cannot be combined.
   *    Subcode: {@link solace.ErrorSubcode.PARAMETER_CONFLICT}
   * * If the supplied topic or live data action cannot be used given the current outstanding
   *    requests.
   *    Subcode: {@link solace.ErrorSubcode.PARAMETER_CONFLICT}
   */
  sendCacheRequest(requestID, topic, subscribe, liveDataAction, cbInfo) {
    if (arguments.length !== 5) {
      throw new OperationError(`sendCacheRequest() invoked with an illegal argument count of ${
        arguments.length}`);
    }
    if (typeof subscribe !== 'boolean') {
      throw new OperationError(
        `Invalid subscribe flag argument, should be a boolean but was ${typeof subscribe}`);
    }
    if (typeof requestID !== 'number' || Number.isNaN(requestID)) {
      throw new OperationError('Invalid requestID', ErrorSubcode.PARAMETER_INVALID_TYPE, null);
    }
    if (this._outstandingIDs[requestID]) {
      throw new OperationError('Request already in progress with this requestID');
    }
    if (!(topic instanceof Destination)) {
      throw new OperationError('Invalid topic', ErrorSubcode.PARAMETER_INVALID_TYPE, (typeof topic));
    }
    topic.validate();
    if (!(liveDataAction === CacheLiveDataAction.FLOW_THRU || liveDataAction ===
        CacheLiveDataAction.FULFILL || liveDataAction === CacheLiveDataAction.QUEUE)) {
      throw new OperationError('Invalid live data action', ErrorSubcode.PARAMETER_OUT_OF_RANGE);
    }
    if (topic.isWildcarded() && liveDataAction !== CacheLiveDataAction.FLOW_THRU) {
      throw new OperationError('Wildcarded topic not supported for this live data action',
        ErrorSubcode.PARAMETER_CONFLICT);
    }
    if (!(cbInfo instanceof CacheCBInfo)) {
      throw new OperationError('Callback info was not an instance of CacheCBInfo');
    }
    if (this._disposed) {
      CacheSession._notifyCallbackError(cbInfo, requestID, CacheReturnCode.FAIL,
                                        CacheReturnSubcode.CACHE_SESSION_DISPOSED, topic,
                                        'Cache request failed: the cache session is disposed.');
      return;
    }
    if (this._session._disposed) {
      CacheSession._notifyCallbackError(cbInfo, requestID, CacheReturnCode.FAIL,
                                        CacheReturnSubcode.INVALID_SESSION, topic,
                                        'Cache request failed: the session is disposed.');
      return;
    }

    const request = new CacheRequest(this,
      CacheRequestType.GET_MSG_REQUEST,
      requestID,
      cbInfo,
      liveDataAction,
      topic,
      this._properties.cacheName);

    const matchingRequestKeys = Object.keys(this._outstandingRequests).filter(key =>
      this._outstandingRequests[key].topic.getName() === topic.getName());

    if (matchingRequestKeys.length) {
      // Topic name matches mean a conflict unless both requests are FLOW_THRU
      const conflictKeys = (
        liveDataAction !== CacheLiveDataAction.FLOW_THRU
          ? matchingRequestKeys
          : matchingRequestKeys.filter(k =>
            this._outstandingRequests[k].liveDataAction !== CacheLiveDataAction.FLOW_THRU));
      if (conflictKeys.length) {
        const conflictRequest = this._outstandingRequests[conflictKeys[0]];
        LOG_WARN(`Existing request ${conflictRequest} conflicts. Rejecting request ${request}`);
         // Register this request so that it is not dismissed as an orphan.
        this._registerRequest(request);
        this._terminateRequest(request,
                               CacheReturnCode.FAIL,
                               CacheReturnSubcode.REQUEST_ALREADY_IN_PROGRESS);
        return;
      }
    }

    this._registerRequest(request);
    request.startRequestTimeout(CacheSession._handleCacheRequestTimeout,
                                this._properties.timeoutMsec);
    if (subscribe) {
      const waitingForSubscribeInfo = new CacheSessionSubscribeInfo(request.correlationID, topic,
        this);
      request._subscriptionWaiting = waitingForSubscribeInfo;
      this._session.subscribe(topic, true, waitingForSubscribeInfo);
      return;
    }
    this._startCacheRequest(request);
  }

  /**
   * @param {solace.Session} session The session associated with the request
   * @param {SessionEvent} sessionEvent The session event
   * @param {CacheRequest} userObject The user-specified context object
   * @private
   */
  _handleCacheRequestFailed(session, sessionEvent, userObject /*, rfuObject */) {
    this._terminateRequest(userObject.getRequestID(),
                           CacheReturnCode.FAIL,
                           CacheReturnSubcode.ERROR_RESPONSE);
  }

  /**
   * @param {CacheRequest} request The new request to register
   * @private
   */
  _registerRequest(request) {
    this._outstandingRequests[request.correlationID] = request;
    if (!request.parentRequest) {
      this._outstandingIDs[request.requestID] = request;
    }
  }

  /**
   * @param {Number} correlationID The ID of an existing request
   * @returns {CacheRequest} The existing request with the given ID
   * @private
   */
  _getOutstandingRequest(correlationID) {
    return this._outstandingRequests[correlationID];
  }

  /**
   *
   * @param {CacheRequest} request The request to start
   * @param {?Number} sessionID Only when returned from a cache instance as part of a
   * previous message
   * @param {?Topic} destination Only when required by a cache reply
   * @param {?Boolean} suppressClusters True when other clusters should be excluded from
   * the response, as in a request resulting from a "see other"
   * @private
   */
  _startCacheRequest(request, sessionID, destination, suppressClusters) {
    // Builds the cache request message for `request` and sends it on the session.
    // A send failure terminates the request with FAIL / ERROR_RESPONSE.
    const message = new MessageLib.Message();

    // Prepare message: the correlation ID is what lets _handleMessage match the reply.
    message.setCorrelationId(request.correlationID);
    if (destination) {
      // An explicit destination was supplied by the caller.
      message.setDestination(destination);
    } else {
      // Default destination: the configured cache prefix followed by the cache name.
      message.setDestination(
        Topic.createFromName(this._properties.cachePrefix + request.cacheName));
    }
    // Replies are addressed to the session's P2P inbox topic.
    message.setReplyTo(Topic.createFromName(this._session.getSessionProperties().p2pInboxInUse));
    // Deliver-to-one is set only for GET_MSG_REQUEST messages.
    message.setDeliverToOne(request.cacheMessageType === CacheRequestType.GET_MSG_REQUEST);

    // Prepare stream container. Fields are appended in a fixed sequence; do not
    // reorder them (presumably the cache reads them positionally — TODO confirm).
    const stream = new SDTStreamContainer();
    stream.addField(SDTFieldType.UINT32, request.cacheMessageType);
    stream.addField(SDTFieldType.UINT32, CacheRequest.VERSION);
    stream.addField(SDTFieldType.STRING, request.topic.getName());
    stream.addField(SDTFieldType.UINT32, CacheRequest.REPLY_SIZE_LIMIT);

    // The session ID field is present only when a numeric ID was supplied.
    if (typeof sessionID === 'number') {
      LOG_DEBUG(`Including session ID: ${sessionID}`);
      stream.addField(SDTFieldType.UINT32, sessionID);
    }

    stream.addField(SDTFieldType.UINT32, this._properties.maxMessages);
    stream.addField(SDTFieldType.UINT32, this._properties.maxAgeSec);
    if (request.cacheMessageType === CacheRequestType.GET_MSG_REQUEST) {
      // Other clusters are included only if configured and not suppressed by the caller.
      stream.addField(SDTFieldType.BOOL,
                      this._properties.includeOtherClusters && (!suppressClusters));
    }

    stream.addField(SDTFieldType.BOOL, false); // includeTimestamps, 6.17.1
    if (request.cacheMessageType === CacheRequestType.GET_MSG_REQUEST) {
      // The timeout is sent in whole seconds, rounded from the configured milliseconds.
      stream.addField(SDTFieldType.UINT32, Math.round(this._properties.timeoutMsec / 1000));
    }

    // Load stream container
    message.setSdtContainer(SDTField.create(SDTFieldType.STREAM, stream));
    try {
      LOG_DEBUG(`Sending ${request}`);
      this._session.send(message);

      if (!request.parentRequest) { // Don't count child requests in CACHE_REQUEST_SENT total
        this._incStat(StatType.CACHE_REQUEST_SENT);
      }
    } catch (e) {
      // Anything thrown inside the try block fails the request; the error is passed along.
      LOG_INFO(`Failed to send request: ${e.message}`);
      this._terminateRequest(request, CacheReturnCode.FAIL, CacheReturnSubcode.ERROR_RESPONSE, e);
    }
  }

  /**
   * Increments a stat.
   * @param  {String} statType The stat to increment
   * @param  {Number} value    The amount by which to increment the state
   * @private
   */
  _incStat(statType, value) {
    if (!this._session) {
      LOG_DEBUG("Can't log stat: session is disposed");
      return;
    }
    if (!this._sessionIF) {
      LOG_INFO("Can't log stat: session statistics not available");
      return;
    }
    this._sessionIF.incStat(statType, value);
  }

  /**
   * Unregisters the given request.
   * @param  {CacheRequest} request The request to unreguster
   * @private
   */
  _unregisterRequest(request) {
    delete this._outstandingRequests[request.correlationID];
    delete this._outstandingIDs[request.requestID];
  }

  /**
   * @param {CacheReturnCode} returnCode The return code for this completion
   * @param {CacheReturnSubcode} subcode The subcode for this completion
   * @private
   */
  _trackCompletionStats(returnCode, subcode) {
    switch (returnCode) {
      case CacheReturnCode.OK:
        this._incStat(StatType.CACHE_REQUEST_OK_RESPONSE);
        if (subcode === CacheReturnSubcode.LIVE_DATA_FULFILL) {
          this._incStat(StatType.CACHE_REQUEST_LIVE_DATA_FULFILL);
        }
        break;
      case CacheReturnCode.INCOMPLETE:
        this._incStat(StatType.CACHE_REQUEST_INCOMPLETE_RESPONSE);
        break;
      case CacheReturnCode.FAIL:
        this._incStat(StatType.CACHE_REQUEST_FAIL_RESPONSE);
        break;
      default:
        throw new Error('Sanity: no return code supplied');
    }
  }

  /**
   * @param {CacheRequest} requestIn The request to terminate
   * @param {CacheReturnCode} returnCode The return code for this operation
   * @param {CacheReturnSubcode} subcode The subcode for this operation
   * @param {Error} error Any error associated with this operation
   * @private
   */
  _terminateRequest(requestIn, returnCode, subcode, error) {
    // Termination always applies to the root of the request tree.
    const request = requestIn.getRootRequest();
    if (!this._outstandingRequests[request.correlationID]) {
      // Request is unknown or was previously terminated
      return;
    }
    const topic = request.getTopic();
    if (!topic) {
      LOG_WARN(`No topic provided for ${request}`);
    }
    // Release any live data that was queued on the request.
    request.queuedLiveData.forEach(data => this._sendToNextDelegate(data));

    // Unregister before callback so that the client application can treat the request ID
    // as "freed" and reuse it.
    // This cleanup runs even when there is nobody to notify. Otherwise a request without
    // a usable callback would stay registered forever: its ID could never be reused and
    // its queued live data would never be delivered.
    request.cancel();
    this._unregisterRequest(request);

    if (request.liveDataFulfilled) {
      // Stats and notification have already been done on fulfill.
      return;
    }
    this._trackCompletionStats(returnCode, subcode);

    const cbInfo = request.cbInfo;
    if (!cbInfo) {
      LOG_WARN(`No callback info provided for ${request}. Cannot notify`);
      return;
    }
    if (!cbInfo.getCallback()) {
      LOG_WARN(`No callback provided for ${request}. Cannot notify`);
      return;
    }
    CacheSession._notifyCallback(request, returnCode, subcode, topic, error);
  }

  /**
   * @param {CacheRequest} requestIn The request with a result to decode
   * @param {CacheGetResult} result The result to decode
   * @returns {Array.<Message>} The messages contained in the result
   * @private
   */
  static _decodeMessageStream(requestIn, result) {
    if (!result.messageStream) {
      return [];
    }

    LOG_DEBUG('Receiving messages');
    const messages = [];
    const request = requestIn;
    while (result.messageStream.hasNext()) {
      request.dataReceived = true;
      const data = result.messageStream.getNext().getValue();
      const innerMessage = SMFLib.Codec.Decode.decodeCompoundMessage(data, 0);
      if (!innerMessage) {
        continue;
      }
      const cacheStatus = result.isSuspect
        ? MessageLib.MessageCacheStatus.SUSPECT
        : MessageLib.MessageCacheStatus.CACHED;
      innerMessage._setCacheStatus(cacheStatus);
      innerMessage._setCacheRequestID(request.requestID);
      messages.push(innerMessage);
    }
    LOG_DEBUG(`${messages.length} cached messages received`);
    return messages;
  }

  /**
   * Closure call context; `this` will be redefined
   * @param {CacheRequest} cacheRequest The request that timed out
   * @private
   */
  static _handleCacheRequestTimeout(cacheRequest) {
    const context = cacheRequest.cacheSession;
    if (!context._getOutstandingRequest(cacheRequest.correlationID)) {
      LOG_INFO(`Timeout for ${cacheRequest} was not unregistered. Ignoring`);
      // already completed
      return;
    }

    // bug 36404: Cache request timeout is to be interpreted as timeout per session request-reply,
    // not timeout per cache request-reply.
    // Implementation: Timeouts on parent requests are cancelled when a child request is spawned.
    // Timeouts on child requests cause the root request to fail.
    LOG_INFO(`Request ${cacheRequest} timed out`);
    context._terminateRequest(cacheRequest.getRootRequest(), CacheReturnCode.INCOMPLETE,
                              CacheReturnSubcode.REQUEST_TIMEOUT);
  }


  /**
   * @param {CacheRequest} request The request that is notifying
   * @param {CacheReturnCode} returnCode The return code for the notification
   * @param {CacheReturnSubcode} subcode The subcode for the notification
   * @param {Destination} topic The topic associated with the notification
   * @param {Error} error Any error associated with the notification
   * @private
   */
  static _notifyCallback(request, returnCode, subcode, topic, error) {
    // The request carries both the callback info and the application's request ID.
    const { cbInfo, requestID } = request;
    const notify = cbInfo.getCallback();
    const outcome = new CacheRequestResult(returnCode, subcode, topic, error);
    // Callback receives: request ID, result object, user object from the callback info.
    notify(requestID, outcome, cbInfo.getUserObject());
  }

  /**
   * @param {solace.CacheCBInfo} cbInfo Callback info for the cache request.
   * @param {Number} requestID The application-assigned ID number for the request
   * @param {CacheReturnCode} returnCode The return code for the notification
   * @param {CacheReturnSubcode} subcode The subcode for the notification
   * @param {Destination} topic The topic associated with the notification
   * @param {Error} error Any error associated with the notification
   * @private
   */
  static _notifyCallbackError(cbInfo, requestID, returnCode, subcode, topic, error) {
    // Same notification shape as _notifyCallback, but driven by explicit callback info and
    // request ID instead of a request object.
    const notify = cbInfo.getCallback();
    const outcome = new CacheRequestResult(returnCode, subcode, topic, error);
    notify(requestID, outcome, cbInfo.getUserObject());
  }

  /**
   * Validates the cache session properties.
   * @param {solace.CacheSessionProperties} props The properties to validate
   * @private
   */
  static _validateProps(props) {
    if ((typeof (props.cacheName) !== 'string')) {
      throw new OperationError('Invalid parameter type for cacheName', ErrorSubcode.PARAMETER_INVALID_TYPE);
    }
    // Create from name will throw if the cacheName is invalid.  This performs more checking than
    // the legacy API did, but I think it is all justified as we shouldn't be able to use anything
    // that would be rejected here as a cacheName.
    if (Topic.createFromName(props.cacheName).isWildcarded()) {
      throw new OperationError(
        `Invalid cacheName '${props.cacheName}'. The cacheName cannot be wildcarded`,
        ErrorSubcode.PARAMETER_OUT_OF_RANGE);
    }
    if ((typeof (props.maxAgeSec) !== 'number')) {
      throw new OperationError('Invalid parameter type for maxAgeSec', ErrorSubcode.PARAMETER_INVALID_TYPE);
    }
    if (props.maxAgeSec < 0) {
      throw new OperationError('Invalid value for maxAgeSec; must be >= 0', ErrorSubcode.PARAMETER_OUT_OF_RANGE);
    }
    if ((typeof (props.maxMessages) !== 'number')) {
      throw new OperationError('Invalid parameter type for maxMessages', ErrorSubcode.PARAMETER_INVALID_TYPE);
    }
    if (props.maxMessages < 0) {
      throw new OperationError('Invalid value for maxMessages; must be >= 0', ErrorSubcode.PARAMETER_OUT_OF_RANGE);
    }
    if ((typeof (props.timeoutMsec) !== 'number')) {
      throw new OperationError('Invalid parameter type for timeoutMsec', ErrorSubcode.PARAMETER_INVALID_TYPE);
    }
    if (props.timeoutMsec < 3000) {
      throw new OperationError('Invalid value for timeoutMsec; must be >= 3000', ErrorSubcode.PARAMETER_OUT_OF_RANGE);
    }
  }

}

// Expose the class as the named `CacheSession` property of this CommonJS module's exports.
module.exports.CacheSession = CacheSession;
