// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import {Message} from 'google-protobuf';
import {Status} from '@pigweed/pw_status';

import {Call} from './call';
import {Channel, Method, Service} from './descriptors';
import * as packets from './packets';

/**
 * Data class for a pending RPC call.
 *
 * Bundles the channel, service, and method that together identify one RPC,
 * and exposes their ids as a tuple or as a single string key.
 */
export class Rpc {
  /**
   * @param channel Channel the RPC is associated with.
   * @param service Service that owns the method.
   * @param method Method being called.
   */
  constructor(
    readonly channel: Channel,
    readonly service: Service,
    readonly method: Method
  ) {}

  /** Returns the `[channel id, service id, method id]` tuple. */
  get idSet(): [number, number, number] {
    return [this.channel.id, this.service.id, this.method.id];
  }

  /**
   * Returns a string that uniquely identifies the channel, service, and
   * method by joining their ids with dots. Suitable as a map key for the Rpc.
   *
   * For example: "12346789.23452345.12341234"
   */
  get idString(): string {
    return this.idSet.join('.');
  }

  /** Human-readable form: "<service name>.<method name> on channel <id>". */
  toString(): string {
    return `${this.service.name}.${this.method.name} on channel ${this.channel.id}`;
  }
}
56
/**
 * Tracks pending RPCs and encodes outgoing RPC packets.
 *
 * Calls are stored in `pending`, keyed by `Rpc.idString`, so at most one call
 * is tracked per channel/service/method combination.
 */
export class PendingCalls {
  pending: Map<string, Call> = new Map();

  /**
   * Starts the provided RPC and returns the encoded packet to send.
   *
   * @param rpc RPC to start.
   * @param request Request payload to encode.
   * @param call Call object to register for the RPC.
   * @returns The encoded request packet; the caller is responsible for
   *     sending it.
   */
  request(rpc: Rpc, request: Message, call: Call): Uint8Array {
    // open() already logs the start of the RPC at debug level.
    this.open(rpc, call);
    return packets.encodeRequest(rpc.idSet, request);
  }

  /**
   * Registers the call, encodes the request, and sends it on the RPC's
   * channel.
   *
   * The call is registered before the packet is sent, so it stays in
   * `pending` even if sending fails.
   *
   * @param rpc RPC to start.
   * @param call Call object to register for the RPC.
   * @param ignoreError When true, errors thrown by the channel's send() are
   *     logged at debug level and suppressed instead of being rethrown.
   * @param request Optional request payload.
   * @returns The call previously registered for this RPC, if any.
   */
  sendRequest(
    rpc: Rpc,
    call: Call,
    ignoreError: boolean,
    request?: Message
  ): Call | undefined {
    const previous = this.open(rpc, call);
    const packet = packets.encodeRequest(rpc.idSet, request);
    try {
      rpc.channel.send(packet);
    } catch (error) {
      if (!ignoreError) {
        throw error;
      }
      console.debug(`Ignoring error sending request for ${rpc}:`, error);
    }
    return previous;
  }

  /**
   * Creates a call for an RPC, but does not invoke it.
   *
   * open() can be used to receive streaming responses to an RPC that was not
   * invoked by this client. For example, a server may stream logs with a
   * server streaming RPC prior to any clients invoking it.
   *
   * @returns The call previously registered for this RPC, if any. The new
   *     call replaces it.
   */
  open(rpc: Rpc, call: Call): Call | undefined {
    console.debug(`Starting ${rpc}`);
    const previous = this.pending.get(rpc.idString);
    this.pending.set(rpc.idString, call);
    return previous;
  }

  /**
   * Sends a client stream message for a pending RPC.
   *
   * @throws Error if the RPC is not pending.
   */
  sendClientStream(rpc: Rpc, message: Message) {
    if (this.getPending(rpc) === undefined) {
      throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
    }
    rpc.channel.send(packets.encodeClientStream(rpc.idSet, message));
  }

  /**
   * Sends the client-stream-end packet for a pending RPC.
   *
   * @throws Error if the RPC is not pending.
   */
  sendClientStreamEnd(rpc: Rpc) {
    if (this.getPending(rpc) === undefined) {
      throw new Error(
        `Attempt to send client stream end for inactive RPC: ${rpc}`
      );
    }
    rpc.channel.send(packets.encodeClientStreamEnd(rpc.idSet));
  }

  /**
   * Cancels the RPC by removing it from `pending`.
   *
   * Removing an RPC that is not pending is a no-op; it does not throw.
   *
   * @returns The CANCEL packet to send, or undefined when the method is both
   *     client streaming and server streaming.
   */
  cancel(rpc: Rpc): Uint8Array | undefined {
    console.debug(`Cancelling ${rpc}`);
    this.pending.delete(rpc.idString);
    // NOTE(review): no CANCEL packet is produced for bidirectional-streaming
    // methods. Confirm this is the intended protocol behavior.
    if (rpc.method.clientStreaming && rpc.method.serverStreaming) {
      return undefined;
    }
    return packets.encodeCancel(rpc.idSet);
  }

  /**
   * Calls cancel and sends the cancel packet, if any, to the channel.
   *
   * @returns false only if cancel() throws; true otherwise. Because cancel()
   *     does not throw for an RPC that is not pending, true does not imply
   *     that the RPC was actually pending.
   */
  sendCancel(rpc: Rpc): boolean {
    let packet: Uint8Array | undefined;
    try {
      packet = this.cancel(rpc);
    } catch (err) {
      return false;
    }

    if (packet !== undefined) {
      rpc.channel.send(packet);
    }
    return true;
  }

  /**
   * Gets the pending RPC's call. If status is set, clears the RPC.
   *
   * @param rpc RPC to look up.
   * @param status When provided (any value other than undefined, including a
   *     falsy one such as 0), the RPC is removed from `pending` after lookup.
   * @returns The registered call, or undefined if the RPC is not pending.
   */
  getPending(rpc: Rpc, status?: Status): Call | undefined {
    const call = this.pending.get(rpc.idString);
    if (status !== undefined) {
      this.pending.delete(rpc.idString);
    }
    return call;
  }
}
151