• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15import {Status} from '@pigweed/pw_status';
16import {Message} from 'google-protobuf';
17
18import WaitQueue = require('wait-queue');
19
20import {PendingCalls, Rpc} from './rpc_classes';
21
/**
 * Signature of the user-supplied handlers stored by `Call` (`onNext`,
 * `onCompleted` and `onError`).
 *
 * Each handler is invoked with a single argument: the response message for
 * `onNext`, or a `Status` for `onCompleted` / `onError`. The return value is
 * not used by `Call`.
 */
export type Callback = (a: any) => any;
23
/**
 * Error describing an RPC that failed with a given status.
 *
 * The message has the form `<method name> failed with error <STATUS NAME>`,
 * followed by a short explanation for the statuses that have one
 * (`NOT_FOUND` and `DATA_LOSS`).
 *
 * The class is not exported, so code outside this module should identify it
 * through `name === 'RpcError'` rather than `instanceof`.
 */
class RpcError extends Error {
  /** The RPC that failed. */
  readonly rpc: Rpc;
  /** The status the RPC failed with. */
  readonly status: Status;

  /**
   * @param rpc The RPC that failed; its method name is used in the message.
   * @param status The status the RPC failed with.
   */
  constructor(rpc: Rpc, status: Status) {
    let message = '';
    if (status === Status.NOT_FOUND) {
      message = ': the RPC server does not support this RPC';
    } else if (status === Status.DATA_LOSS) {
      message = ': an error occurred while decoding the RPC payload';
    }

    super(`${rpc.method.name} failed with error ${Status[status]}${message}`);

    // Keep `instanceof RpcError` working even when the compiler down-levels
    // classes (subclassing Error loses the prototype on ES5 targets).
    Object.setPrototypeOf(this, new.target.prototype);
    // Without this the error is reported as a plain "Error".
    this.name = 'RpcError';

    this.rpc = rpc;
    this.status = status;
  }
}
39
/**
 * Error raised when no response arrives for an RPC within the allowed time.
 *
 * The class is not exported, so code outside this module should identify it
 * through `name === 'RpcTimeout'` rather than `instanceof`.
 */
class RpcTimeout extends Error {
  /** The RPC that was being waited on. */
  readonly rpc: Rpc;
  /** How long the caller waited, in milliseconds. */
  readonly timeoutMs: number;

  /**
   * @param rpc The RPC that timed out; its method name is used in the message.
   * @param timeoutMs The timeout that elapsed, in milliseconds.
   */
  constructor(rpc: Rpc, timeoutMs: number) {
    super(`${rpc.method.name} timed out after ${timeoutMs} ms`);

    // Keep `instanceof RpcTimeout` working even when the compiler down-levels
    // classes (subclassing Error loses the prototype on ES5 targets).
    Object.setPrototypeOf(this, new.target.prototype);
    // Without this the error is reported as a plain "Error".
    this.name = 'RpcTimeout';

    this.rpc = rpc;
    this.timeoutMs = timeoutMs;
  }
}
50
/**
 * Represents an in-progress or completed RPC call.
 *
 * A call collects the responses delivered through `handleResponse`, records
 * how the call ended (`status` on normal completion, `error` on failure or
 * cancellation) and forwards each event to the callbacks supplied at
 * construction time.
 */
export class Call {
  // Responses ordered by arrival time. Undefined signifies stream completion.
  private readonly responseQueue = new WaitQueue<Message | undefined>();
  /** Every response received so far, in arrival order. */
  protected responses: Message[] = [];

  private readonly rpcs: PendingCalls;
  private readonly rpc: Rpc;

  private readonly onNext: Callback;
  private readonly onCompleted: Callback;
  private readonly onError: Callback;

  /** Set once the call completes normally. */
  status?: Status;
  /** Set once the call fails or is cancelled. */
  error?: Status;
  /** The most recent `Error` thrown by one of the user callbacks, if any. */
  callbackException?: Error;

  /**
   * @param rpcs Tracker used to send requests, stream messages and
   *    cancellations for this call.
   * @param rpc The RPC this call invokes.
   * @param onNext Invoked with each response as it arrives.
   * @param onCompleted Invoked with the final status on normal completion.
   * @param onError Invoked with the error status when the call fails.
   */
  constructor(
    rpcs: PendingCalls,
    rpc: Rpc,
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback
  ) {
    this.rpcs = rpcs;
    this.rpc = rpc;

    this.onNext = onNext;
    this.onCompleted = onCompleted;
    this.onError = onError;
  }

  /**
   * Calls the RPC. This must be called immediately after construction.
   *
   * If `sendRequest` reports an earlier call for the same RPC that has not
   * completed yet, that call is terminated with `Status.CANCELLED`.
   *
   * @param request Optional request message to send.
   * @param ignoreErrors Forwarded unchanged to `PendingCalls.sendRequest`.
   */
  invoke(request?: Message, ignoreErrors = false): void {
    const previous = this.rpcs.sendRequest(
      this.rpc,
      this,
      ignoreErrors,
      request
    );

    if (previous !== undefined && !previous.completed) {
      previous.handleError(Status.CANCELLED);
    }
  }

  /** Whether the call has finished, either with a status or with an error. */
  get completed(): boolean {
    return this.status !== undefined || this.error !== undefined;
  }

  /**
   * Runs a user callback, making sure nothing it throws escapes into the
   * response-handling path. A thrown `Error` is logged and remembered in
   * `callbackException`; any other thrown value is only logged.
   */
  private invokeCallback(func: () => unknown): void {
    try {
      func();
    } catch (err: unknown) {
      if (err instanceof Error) {
        console.error(
          `An exception was raised while invoking a callback: ${err}`
        );
        this.callbackException = err;
      } else {
        // Only reached for non-Error values; Errors are reported above.
        console.error(`Unexpected item thrown while invoking callback: ${err}`);
      }
    }
  }

  /** Records a response, queues it for `getResponses` and calls `onNext`. */
  handleResponse(response: Message): void {
    this.responses.push(response);
    this.responseQueue.push(response);
    this.invokeCallback(() => this.onNext(response));
  }

  /** Marks the call as completed with `status` and calls `onCompleted`. */
  handleCompletion(status: Status): void {
    this.status = status;
    // Undefined marks the end of the stream for queue consumers.
    this.responseQueue.push(undefined);
    this.invokeCallback(() => this.onCompleted(status));
  }

  /** Marks the call as failed with `error` and calls `onError`. */
  handleError(error: Status): void {
    this.error = error;
    // Undefined marks the end of the stream for queue consumers.
    this.responseQueue.push(undefined);
    this.invokeCallback(() => this.onError(error));
  }

  /**
   * Waits for the next queue entry, rejecting with `RpcTimeout` if nothing
   * arrives within `timeoutMs` milliseconds.
   *
   * The queue wait cannot be aborted, so after a timeout it stays pending;
   * when it eventually receives an entry, that entry is put back at the front
   * of the queue so no response is lost.
   */
  private queuePopWithTimeout(
    timeoutMs: number
  ): Promise<Message | undefined> {
    return new Promise((resolve, reject) => {
      let timeoutExpired = false;
      const timeoutWatcher = setTimeout(() => {
        timeoutExpired = true;
        reject(new RpcTimeout(this.rpc, timeoutMs));
      }, timeoutMs);

      this.responseQueue.shift().then(
        response => {
          if (timeoutExpired) {
            // The caller already saw the timeout; return the entry for the
            // next reader.
            this.responseQueue.unshift(response);
            return;
          }
          clearTimeout(timeoutWatcher);
          resolve(response);
        },
        (err: unknown) => {
          // Surface queue failures to the caller instead of dropping them.
          clearTimeout(timeoutWatcher);
          reject(err);
        }
      );
    });
  }

  /**
   * Yields responses up to the specified count as they are added.
   *
   * Throws an error as soon as it is received even if there are still
   * responses in the queue.
   *
   * Usage
   * ```
   * for await (const response of call.getResponses(5)) {
   *  console.log(response);
   * }
   * ```
   *
   * @param count The number of responses to read before returning.
   *    If no value is specified, getResponses will block until the stream
   *    either ends or hits an error.
   * @param timeoutMs The number of milliseconds to wait for each response
   *    before throwing an error. If omitted, waits indefinitely.
   */
  async *getResponses(
    count?: number,
    timeoutMs?: number
  ): AsyncGenerator<Message> {
    this.checkErrors();

    if (this.completed && this.responseQueue.length === 0) {
      return;
    }

    let remaining = count ?? Number.POSITIVE_INFINITY;
    while (remaining > 0) {
      const response =
        timeoutMs === undefined
          ? await this.responseQueue.shift()
          : await this.queuePopWithTimeout(timeoutMs);
      this.checkErrors();
      if (response === undefined) {
        // End-of-stream sentinel.
        return;
      }
      yield response;
      remaining -= 1;
    }
  }

  /**
   * Cancels the call.
   *
   * @returns `false` if the call had already completed; otherwise the result
   *    of `PendingCalls.sendCancel`.
   */
  cancel(): boolean {
    if (this.completed) {
      return false;
    }

    this.error = Status.CANCELLED;
    // Wake any consumer blocked in getResponses(); it will then observe the
    // error. handleError() pushes the same sentinel when it ends a call.
    this.responseQueue.push(undefined);
    return this.rpcs.sendCancel(this.rpc);
  }

  /**
   * Throws the stored callback exception, or an `RpcError` if the call ended
   * with an error. Does nothing otherwise.
   */
  private checkErrors(): void {
    if (this.callbackException !== undefined) {
      throw this.callbackException;
    }
    if (this.error !== undefined) {
      throw new RpcError(this.rpc, this.error);
    }
  }

  /**
   * Waits for a call that is expected to produce exactly one response.
   *
   * @param timeoutMs Optional per-response timeout in milliseconds.
   * @returns The completion status and the single response.
   * @throws If the call failed, timed out, has no status once the wait ends,
   *    or did not produce exactly one response.
   */
  protected async unaryWait(timeoutMs?: number): Promise<[Status, Message]> {
    for await (const _response of this.getResponses(1, timeoutMs)) {
      // Nothing to do: handleResponse() already stored it in this.responses.
    }
    if (this.status === undefined) {
      throw Error('Unexpected undefined status at end of stream');
    }
    if (this.responses.length !== 1) {
      throw Error(`Unexpected number of responses: ${this.responses.length}`);
    }
    return [this.status, this.responses[0]];
  }

  /**
   * Waits for the response stream to end.
   *
   * @param timeoutMs Optional per-response timeout in milliseconds.
   * @returns The completion status and every response received.
   * @throws If the call failed, timed out, or has no status once the stream
   *    ends.
   */
  protected async streamWait(timeoutMs?: number): Promise<[Status, Message[]]> {
    for await (const _response of this.getResponses(undefined, timeoutMs)) {
      // Nothing to do: handleResponse() already stored it in this.responses.
    }
    if (this.status === undefined) {
      throw Error('Unexpected undefined status at end of stream');
    }
    return [this.status, this.responses];
  }

  /**
   * Sends one client-stream message.
   *
   * @throws The stored callback exception or an `RpcError` if the call has
   *    failed, and an `RpcError` with `FAILED_PRECONDITION` if the call has
   *    already completed.
   */
  protected sendClientStream(request: Message) {
    this.checkErrors();
    if (this.status !== undefined) {
      throw new RpcError(this.rpc, Status.FAILED_PRECONDITION);
    }
    this.rpcs.sendClientStream(this.rpc, request);
  }

  /**
   * Sends any remaining `requests`, then signals the end of the client
   * stream unless the call has already completed.
   */
  protected finishClientStream(requests: Message[]) {
    for (const request of requests) {
      this.sendClientStream(request);
    }

    if (!this.completed) {
      this.rpcs.sendClientStreamEnd(this.rpc);
    }
  }
}
253
/** Call state for a unary RPC. */
export class UnaryCall extends Call {
  /**
   * Waits for the server's reply.
   *
   * @param timeoutMs Optional timeout in milliseconds, forwarded to
   *    `unaryWait`.
   * @returns The status and response produced by `unaryWait`.
   */
  async complete(timeoutMs?: number): Promise<[Status, Message]> {
    const outcome = await this.unaryWait(timeoutMs);
    return outcome;
  }
}
261
/** Call state for a client streaming RPC. */
export class ClientStreamingCall extends Call {
  /** The most recent server message, or `undefined` if none has arrived. */
  get response(): Message | undefined {
    const received = this.responses.length;
    if (received > 0) {
      return this.responses[received - 1];
    }
    return undefined;
  }

  /** Sends one client message on the stream. */
  send(request: Message) {
    this.sendClientStream(request);
  }

  /**
   * Closes the client stream, then waits until the RPC finishes.
   *
   * @param requests Messages to send before the stream is closed.
   * @param timeoutMs Optional timeout in milliseconds, forwarded to
   *    `streamWait`.
   * @returns The status and responses produced by `streamWait`.
   */
  async finishAndWait(
    requests: Message[] = [],
    timeoutMs?: number
  ): Promise<[Status, Message[]]> {
    this.finishClientStream(requests);
    const outcome = await this.streamWait(timeoutMs);
    return outcome;
  }
}
285
/** Call state for a server streaming RPC. */
export class ServerStreamingCall extends Call {
  /**
   * Waits for the response stream to end.
   *
   * @param timeoutMs Optional timeout in milliseconds, forwarded to
   *    `streamWait`.
   * @returns The promise produced by `streamWait`.
   */
  complete(timeoutMs?: number): Promise<[Status, Message[]]> {
    const pending = this.streamWait(timeoutMs);
    return pending;
  }
}
292
/** Call state for a bidirectional streaming RPC. */
export class BidirectionalStreamingCall extends Call {
  /** Sends one client message on the stream. */
  send(request: Message) {
    this.sendClientStream(request);
  }

  /**
   * Closes the client stream, then waits until the RPC finishes.
   *
   * @param requests Messages to send before the stream is closed.
   * @param timeoutMs Optional timeout in milliseconds, forwarded to
   *    `streamWait`.
   * @returns The status and responses produced by `streamWait`.
   */
  async finishAndWait(
    requests: Message[] = [],
    timeoutMs?: number
  ): Promise<[Status, Message[]]> {
    this.finishClientStream(requests);
    const outcome = await this.streamWait(timeoutMs);
    return outcome;
  }
}
309