Source: lib/client.js

// Copyright (c) 2017 Intel Corporation. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const rclnodejs = require('./native_loader.js');
const DistroUtils = require('./distro.js');
const Entity = require('./entity.js');
const {
  TypeValidationError,
  TimeoutError,
  AbortError,
} = require('./errors.js');
const debug = require('debug')('rclnodejs:client');

// Polyfill for AbortSignal.any() for Node.js <= 20.3.0
// AbortSignal.any() was added in Node.js 20.3.0
// See https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/any_static
if (!AbortSignal.any) {
  AbortSignal.any = function (signals) {
    // Filter out null/undefined values and validate inputs
    const validSignals = Array.isArray(signals)
      ? signals.filter((signal) => signal != null)
      : [];

    // If no valid signals, return a never-aborting signal
    if (validSignals.length === 0) {
      return new AbortController().signal;
    }

    const controller = new AbortController();
    const listeners = [];

    // Cleanup function to remove all event listeners
    const cleanup = () => {
      listeners.forEach(({ signal, listener }) => {
        signal.removeEventListener('abort', listener);
      });
    };

    for (const signal of validSignals) {
      if (signal.aborted) {
        cleanup();
        controller.abort(signal.reason);
        return controller.signal;
      }

      const listener = () => {
        cleanup();
        controller.abort(signal.reason);
      };

      signal.addEventListener('abort', listener);
      listeners.push({ signal, listener });
    }

    return controller.signal;
  };
}

/**
 * @class - Class representing a Client in ROS
 * @hideconstructor
 */

class Client extends Entity {
  constructor(handle, nodeHandle, serviceName, typeClass, options) {
    super(handle, typeClass, options);
    this._nodeHandle = nodeHandle;
    this._serviceName = serviceName;
    this._sequenceNumberToCallbackMap = new Map();
  }

  /**
   * This callback is called when a response is sent back from service
   * @callback ResponseCallback
   * @param {Object} response - The response sent from the service
   * @see [Client.sendRequest]{@link Client#sendRequest}
   * @see [Node.createService]{@link Node#createService}
   * @see {@link Client}
   * @see {@link Service}
   */

  /**
   * Send the request and will be notified asynchronously if receiving the response.
   * @param {object} request - The request to be submitted.
   * @param {ResponseCallback} callback - Thc callback function for receiving the server response.
   * @return {undefined}
   * @see {@link ResponseCallback}
   */
  sendRequest(request, callback) {
    if (typeof callback !== 'function') {
      throw new TypeValidationError('callback', callback, 'function', {
        entityType: 'service',
        entityName: this._serviceName,
      });
    }

    let requestToSend =
      request instanceof this._typeClass.Request
        ? request
        : new this._typeClass.Request(request);

    let rawRequest = requestToSend.serialize();
    let sequenceNumber = rclnodejs.sendRequest(this._handle, rawRequest);
    debug(`Client has sent a ${this._serviceName} request.`);
    this._sequenceNumberToCallbackMap.set(sequenceNumber, callback);
  }

  /**
   * Send the request and return a Promise that resolves with the response.
   * @param {object} request - The request to be submitted.
   * @param {object} [options] - Optional parameters for the request.
   * @param {number} [options.timeout] - Timeout in milliseconds for the request.
   * @param {AbortSignal} [options.signal] - AbortSignal to cancel the request.
   * @return {Promise<object>} Promise that resolves with the service response.
   * @throws {module:rclnodejs.TimeoutError} If the request times out (when options.timeout is exceeded).
   * @throws {module:rclnodejs.AbortError} If the request is manually aborted (via options.signal).
   * @throws {Error} If the request fails for other reasons.
   */
  sendRequestAsync(request, options = {}) {
    return new Promise((resolve, reject) => {
      let sequenceNumber = null;
      let isResolved = false;
      let isTimeout = false;

      const cleanup = () => {
        if (sequenceNumber !== null) {
          this._sequenceNumberToCallbackMap.delete(sequenceNumber);
        }
        isResolved = true;
      };

      let effectiveSignal = options.signal;

      if (options.timeout !== undefined && options.timeout >= 0) {
        const timeoutSignal = AbortSignal.timeout(options.timeout);

        timeoutSignal.addEventListener('abort', () => {
          isTimeout = true;
        });

        if (options.signal) {
          effectiveSignal = AbortSignal.any([options.signal, timeoutSignal]);
        } else {
          effectiveSignal = timeoutSignal;
        }
      }

      if (effectiveSignal) {
        if (effectiveSignal.aborted) {
          const error = isTimeout
            ? new TimeoutError('Service request', options.timeout, {
                entityType: 'service',
                entityName: this._serviceName,
              })
            : new AbortError('Service request', undefined, {
                entityType: 'service',
                entityName: this._serviceName,
              });
          reject(error);
          return;
        }

        effectiveSignal.addEventListener('abort', () => {
          if (!isResolved) {
            cleanup();
            const error = isTimeout
              ? new TimeoutError('Service request', options.timeout, {
                  entityType: 'service',
                  entityName: this._serviceName,
                })
              : new AbortError('Service request', undefined, {
                  entityType: 'service',
                  entityName: this._serviceName,
                });
            reject(error);
          }
        });
      }

      try {
        let requestToSend =
          request instanceof this._typeClass.Request
            ? request
            : new this._typeClass.Request(request);

        let rawRequest = requestToSend.serialize();
        sequenceNumber = rclnodejs.sendRequest(this._handle, rawRequest);

        debug(`Client has sent a ${this._serviceName} request (async).`);

        this._sequenceNumberToCallbackMap.set(sequenceNumber, (response) => {
          if (!isResolved) {
            cleanup();
            resolve(response);
          }
        });
      } catch (error) {
        cleanup();
        reject(error);
      }
    });
  }

  processResponse(sequenceNumber, response) {
    if (this._sequenceNumberToCallbackMap.has(sequenceNumber)) {
      debug(`Client has received ${this._serviceName} response from service.`);
      let callback = this._sequenceNumberToCallbackMap.get(sequenceNumber);
      this._sequenceNumberToCallbackMap.delete(sequenceNumber);
      callback(response.toPlainObject(this.typedArrayEnabled));
    } else {
      debug(
        `Client has received an unexpected ${this._serviceName} with sequence number ${sequenceNumber}.`
      );
    }
  }

  /**
   * Checks if the service is available.
   * @return {boolean} true if the service is available.
   */
  isServiceServerAvailable() {
    return rclnodejs.serviceServerIsAvailable(this._nodeHandle, this.handle);
  }

  /**
   * Wait until the service server is available or a timeout is reached. This
   * function polls for the service state so it may not return as soon as the
   * service is available.
   * @param {number} timeout The maximum amount of time to wait for, if timeout
   * is `undefined` or `< 0`, this will wait indefinitely.
   * @return {Promise<boolean>} true if the service is available.
   */
  async waitForService(timeout = undefined) {
    let deadline = Infinity;
    if (timeout !== undefined && timeout >= 0) {
      deadline = Date.now() + timeout;
    }
    let waitMs = 5;
    let serviceAvailable = this.isServiceServerAvailable();
    while (!serviceAvailable && Date.now() < deadline) {
      waitMs *= 2;
      waitMs = Math.min(waitMs, 1000);
      if (timeout !== undefined && timeout >= -1) {
        waitMs = Math.min(waitMs, deadline - Date.now());
      }
      await new Promise((resolve) => setTimeout(resolve, waitMs));
      serviceAvailable = this.isServiceServerAvailable();
    }
    return serviceAvailable;
  }

  static createClient(nodeHandle, serviceName, typeClass, options) {
    let type = typeClass.type();
    let handle = rclnodejs.createClient(
      nodeHandle,
      serviceName,
      type.interfaceName,
      type.pkgName,
      options.qos
    );
    return new Client(handle, nodeHandle, serviceName, typeClass, options);
  }

  /**
   * @type {string}
   */
  get serviceName() {
    return rclnodejs.getClientServiceName(this._handle);
  }

  /**
   * Configure introspection.
   * @param {Clock} clock - Clock to use for service event timestamps
   * @param {QoS} qos - QoSProfile for the service event publisher
   * @param {ServiceIntrospectionState} introspectionState - State to set introspection to
   */
  configureIntrospection(clock, qos, introspectionState) {
    if (DistroUtils.getDistroId() <= DistroUtils.getDistroId('humble')) {
      console.warn(
        'Service introspection is not supported by this version of ROS 2'
      );
      return;
    }

    let type = this.typeClass.type();
    rclnodejs.configureClientIntrospection(
      this.handle,
      this._nodeHandle,
      clock.handle,
      type.interfaceName,
      type.pkgName,
      qos,
      introspectionState
    );
  }

  /**
   * Get the logger name for this client.
   * @returns {string} The logger name for this client.
   */
  get loggerName() {
    return rclnodejs.getNodeLoggerName(this._nodeHandle);
  }
}

module.exports = Client;