• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/** Provides a pw_rpc client for TypeScript. */
16
17import {ProtoCollection} from '@pigweed/pw_protobuf_compiler';
18import {Status} from '@pigweed/pw_status';
19import {Message} from 'google-protobuf';
20import {
21  PacketType,
22  RpcPacket,
23} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb';
24
25import {Channel, Service} from './descriptors';
26import {MethodStub, methodStubFactory} from './method';
27import * as packets from './packets';
28import {PendingCalls, Rpc} from './rpc_classes';
29
/**
 * Client-side handle for a single RPC service on one channel.
 *
 * A stub is created for every method the service declares; the stubs are
 * indexed by method name so they can be looked up with `method()`.
 */
export class ServiceClient {
  private service: Service;
  private methods: MethodStub[] = [];
  private methodsByName = new Map<string, MethodStub>();

  /**
   * @param {Client} client Client whose `rpcs` is handed to every stub.
   * @param {Channel} channel Channel handed to every stub.
   * @param {Service} service Service whose methods are wrapped in stubs.
   */
  constructor(client: Client, channel: Channel, service: Service) {
    this.service = service;
    for (const descriptor of service.methods.values()) {
      const methodStub = methodStubFactory(client.rpcs, channel, descriptor);
      this.methods.push(methodStub);
      this.methodsByName.set(descriptor.name, methodStub);
    }
  }

  /** The id of the wrapped service. */
  get id(): number {
    return this.service.id;
  }

  /**
   * Looks up a method stub by the method's name.
   *
   * @param {string} methodName Name the method was registered under.
   * @return {MethodStub | undefined} The stub, or undefined when this service
   *     has no method with that name.
   */
  method(methodName: string): MethodStub | undefined {
    return this.methodsByName.get(methodName);
  }
}
56
/**
 * Client-side handle for one RPC channel.
 *
 * Holds a ServiceClient for each service passed to the constructor, indexed by
 * the service's name.
 */
export class ChannelClient {
  readonly channel: Channel;
  private services = new Map<string, ServiceClient>();

  /**
   * @param {Client} client Client forwarded to every ServiceClient.
   * @param {Channel} channel Channel this object represents.
   * @param {Service[]} services Services to expose on this channel.
   */
  constructor(client: Client, channel: Channel, services: Service[]) {
    this.channel = channel;
    for (const descriptor of services) {
      const perService = new ServiceClient(client, this.channel, descriptor);
      this.services.set(descriptor.name, perService);
    }
  }

  /**
   * Looks up a service client by the service's full name.
   *
   * Example:
   * `service = client.channel().service('the.package.FooService');`
   *
   * @return {ServiceClient | undefined} undefined when no service with that
   *     name was registered on this channel.
   */
  service(serviceName: string): ServiceClient | undefined {
    return this.services.get(serviceName);
  }

  /**
   * Looks up a method stub by its full name, i.e. the service's full name and
   * the method name joined by a dot.
   *
   * Example:
   * `method = client.channel().methodStub('the.package.AService.AMethod');`
   *
   * @return {MethodStub | undefined} undefined (after logging an error) when
   *     the name has no usable service part or no such method is known.
   */
  methodStub(name: string): MethodStub | undefined {
    // Everything before the last dot names the service; the rest is the method.
    const separator = name.lastIndexOf('.');
    if (separator > 0) {
      const serviceName = name.slice(0, separator);
      const methodName = name.slice(separator + 1);
      const stub = this.service(serviceName)?.method(methodName);
      if (stub !== undefined) {
        return stub;
      }
      console.error(`Method not found: ${name}`);
      return undefined;
    }
    // No dot at all, or a leading dot with an empty service name.
    console.error(`Malformed method name: ${name}`);
    return undefined;
  }
}
104
/**
 * Client side of pw_rpc: owns the channels, the known services and the set of
 * pending calls, and routes incoming packets to the matching pending call.
 *
 * RPCs are invoked through a MethodStub. These can be found by name via
 * methodStub(string name).
 *
 * ```
 * method = client.channel(1).methodStub('the.package.FooService.SomeMethod')
 * call = method.invoke(request);
 * ```
 */
export class Client {
  private channelsById = new Map<number, ChannelClient>();
  readonly rpcs: PendingCalls;
  readonly services = new Map<number, Service>();

  /**
   * @param {Channel[]} channels Channels this client may use. Each one gets a
   *     ChannelClient that exposes every service in `services`.
   * @param {Service[]} services Services reachable through this client; they
   *     are indexed by service id.
   */
  constructor(channels: Channel[], services: Service[]) {
    this.rpcs = new PendingCalls();
    for (const service of services) {
      this.services.set(service.id, service);
    }

    for (const channel of channels) {
      this.channelsById.set(
        channel.id,
        new ChannelClient(this, channel, services)
      );
    }
  }

  /**
   * Creates a client from a set of Channels and a library of Protos.
   *
   * @param {Channel[]} channels List of possible channels to use.
   * @param {ProtoCollection} protoSet ProtoCollection containing protos
   *     defining RPC services and methods.
   * @return {Client} A client holding one Service per service descriptor
   *     found in the proto set's file descriptors.
   */
  static fromProtoSet(channels: Channel[], protoSet: ProtoCollection): Client {
    const services: Service[] = [];
    for (const fileDescriptor of protoSet.fileDescriptorSet.getFileList()) {
      const packageName = fileDescriptor.getPackage()!;
      for (const serviceDescriptor of fileDescriptor.getServiceList()) {
        services.push(new Service(serviceDescriptor, protoSet, packageName));
      }
    }

    return new Client(channels, services);
  }

  /**
   * Finds the channel with the provided id. Returns undefined if there are no
   * channels or no channel with a matching id.
   *
   * @param {number?} id If no id is specified, returns the first channel.
   */
  channel(id?: number): ChannelClient | undefined {
    if (id === undefined) {
      return this.channelsById.values().next().value;
    }
    return this.channelsById.get(id);
  }

  /**
   * Creates a new RPC object holding channel, method, and service info.
   * Returns undefined if the service or method does not exist.
   */
  private rpc(
    packet: RpcPacket,
    channelClient: ChannelClient
  ): Rpc | undefined {
    const service = this.services.get(packet.getServiceId());
    if (service == undefined) {
      return undefined;
    }
    const method = service.methods.get(packet.getMethodId());
    if (method == undefined) {
      return undefined;
    }
    return new Rpc(channelClient.channel, service, method);
  }

  /**
   * Reads the status carried by a packet.
   *
   * SERVER_STREAM packets yield undefined; every other packet type yields the
   * packet's status field.
   */
  private decodeStatus(rpc: Rpc, packet: RpcPacket): Status | undefined {
    if (packet.getType() === PacketType.SERVER_STREAM) {
      return;
    }
    return packet.getStatus();
  }

  /**
   * Decodes the response payload of a packet with the method's response type.
   *
   * Returns undefined without decoding for SERVER_ERROR packets and for
   * RESPONSE packets of server-streaming methods. Decoding errors raised by
   * `packets.decodePayload` propagate to the caller.
   */
  private decodePayload(rpc: Rpc, packet: RpcPacket): Message | undefined {
    if (packet.getType() === PacketType.SERVER_ERROR) {
      return undefined;
    }

    if (
      packet.getType() === PacketType.RESPONSE &&
      rpc.method.serverStreaming
    ) {
      return undefined;
    }

    const payload = packet.getPayload_asU8();
    return packets.decodePayload(payload, rpc.method.responseType);
  }

  /**
   * Sends a client error for `packet` on the channel it arrived on.
   *
   * Nothing is sent when `packet` is a SERVER_ERROR: an error is never
   * answered with another error.
   */
  private sendClientError(
    client: ChannelClient,
    packet: RpcPacket,
    error: Status
  ) {
    if (packet.getType() === PacketType.SERVER_ERROR) {
      return;
    }
    client.channel.send(packets.encodeClientError(packet, error));
  }

  /**
   * Processes an incoming packet.
   *
   * @param {Uint8Array} rawPacketData binary data for a pw_rpc packet.
   * @return {Status} The status of processing the packet.
   *    - OK: the packet was processed by the client. This includes packets
   *      that were answered with a client error or discarded because the
   *      service/method is unknown, the packet type is unexpected, or no
   *      matching call is pending.
   *    - DATA_LOSS: the packet could not be decoded
   *    - INVALID_ARGUMENT: the packet is for a server, not a client
   *    - NOT_FOUND: the packet's channel ID is not known to this client
   * @throws {Error} If a SERVER_ERROR packet for a pending call carries an OK
   *     status or no status.
   */
  processPacket(rawPacketData: Uint8Array): Status {
    let packet;
    try {
      packet = packets.decode(rawPacketData);
    } catch (err) {
      console.warn(`Failed to decode packet: ${err}`);
      console.debug(`Raw packet: ${rawPacketData}`);
      return Status.DATA_LOSS;
    }

    if (packets.forServer(packet)) {
      return Status.INVALID_ARGUMENT;
    }

    const channelClient = this.channelsById.get(packet.getChannelId());
    if (channelClient == undefined) {
      console.warn(`Unrecognized channel ID: ${packet.getChannelId()}`);
      return Status.NOT_FOUND;
    }

    const rpc = this.rpc(packet, channelClient);
    if (rpc == undefined) {
      this.sendClientError(channelClient, packet, Status.NOT_FOUND);
      console.warn('rpc service/method not found');
      return Status.OK;
    }

    if (
      packet.getType() !== PacketType.RESPONSE &&
      packet.getType() !== PacketType.SERVER_STREAM &&
      packet.getType() !== PacketType.SERVER_ERROR
    ) {
      console.error(`${rpc}: Unexpected packet type ${packet.getType()}`);
      console.debug(`Packet: ${packet}`);
      return Status.OK;
    }

    let status = this.decodeStatus(rpc, packet);
    let payload;
    try {
      payload = this.decodePayload(rpc, packet);
    } catch (error) {
      // Report the failure while the packet still has its original type; once
      // it is relabelled as SERVER_ERROR below, no further client error is
      // sent for it.
      this.sendClientError(channelClient, packet, Status.DATA_LOSS);
      console.warn(`Failed to decode response: ${error}`);
      console.debug(`Raw payload: ${packet.getPayload()}`);

      // Make this an error packet so the error handler is called.
      packet.setType(PacketType.SERVER_ERROR);
      status = Status.DATA_LOSS;
    }

    const call = this.rpcs.getPending(rpc, status);
    if (call === undefined) {
      this.sendClientError(channelClient, packet, Status.FAILED_PRECONDITION);
      console.debug(`Discarding response for ${rpc}, which is not pending`);
      return Status.OK;
    }

    if (packet.getType() === PacketType.SERVER_ERROR) {
      if (status === Status.OK) {
        throw new Error('Unexpected OK status on SERVER_ERROR');
      }
      if (status === undefined) {
        throw new Error('Missing status on SERVER_ERROR');
      }
      console.warn(`${rpc}: invocation failed with status: ${Status[status]}`);
      call.handleError(status);
      return Status.OK;
    }

    // Deliver the payload (if any) before signalling completion (if any).
    if (payload !== undefined) {
      call.handleResponse(payload);
    }
    if (status !== undefined) {
      call.handleCompletion(status);
    }
    return Status.OK;
  }
}
308