1// Copyright 2021 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15/** Provides a pw_rpc client for TypeScript. */ 16 17import {ProtoCollection} from '@pigweed/pw_protobuf_compiler'; 18import {Status} from '@pigweed/pw_status'; 19import {Message} from 'google-protobuf'; 20import { 21 PacketType, 22 RpcPacket, 23} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb'; 24 25import {Channel, Service} from './descriptors'; 26import {MethodStub, methodStubFactory} from './method'; 27import * as packets from './packets'; 28import {PendingCalls, Rpc} from './rpc_classes'; 29 30/** 31 * Object for managing RPC service and contained methods. 32 */ 33export class ServiceClient { 34 private service: Service; 35 private methods: MethodStub[] = []; 36 private methodsByName = new Map<string, MethodStub>(); 37 38 constructor(client: Client, channel: Channel, service: Service) { 39 this.service = service; 40 const methods = service.methods; 41 methods.forEach(method => { 42 const stub = methodStubFactory(client.rpcs, channel, method); 43 this.methods.push(stub); 44 this.methodsByName.set(method.name, stub); 45 }); 46 } 47 48 method(methodName: string): MethodStub | undefined { 49 return this.methodsByName.get(methodName); 50 } 51 52 get id(): number { 53 return this.service.id; 54 } 55} 56 57/** 58 * Object for managing RPC channel and contained services. 59 */ 60export class ChannelClient { 61 readonly channel: Channel; 62 private services = new Map<string, ServiceClient>(); 63 64 constructor(client: Client, channel: Channel, services: Service[]) { 65 this.channel = channel; 66 services.forEach(service => { 67 const serviceClient = new ServiceClient(client, this.channel, service); 68 this.services.set(service.name, serviceClient); 69 }); 70 } 71 72 /** 73 * Find a service client via its full name. 74 * 75 * For example: 76 * `service = client.channel().service('the.package.FooService');` 77 */ 78 service(serviceName: string): ServiceClient | undefined { 79 return this.services.get(serviceName); 80 } 81 82 /** 83 * Find a method stub via its full name. 84 * 85 * For example: 86 * `method = client.channel().methodStub('the.package.AService.AMethod');` 87 */ 88 methodStub(name: string): MethodStub | undefined { 89 const index = name.lastIndexOf('.'); 90 if (index <= 0) { 91 console.error(`Malformed method name: ${name}`); 92 return undefined; 93 } 94 const serviceName = name.slice(0, index); 95 const methodName = name.slice(index + 1); 96 const method = this.service(serviceName)?.method(methodName); 97 if (method === undefined) { 98 console.error(`Method not found: ${name}`); 99 return undefined; 100 } 101 return method; 102 } 103} 104 105/** 106 * RPCs are invoked through a MethodStub. These can be found by name via 107 * methodStub(string name). 108 * 109 * ``` 110 * method = client.channel(1).methodStub('the.package.FooService.SomeMethod') 111 * call = method.invoke(request); 112 * ``` 113 */ 114export class Client { 115 private channelsById = new Map<number, ChannelClient>(); 116 readonly rpcs: PendingCalls; 117 readonly services = new Map<number, Service>(); 118 119 constructor(channels: Channel[], services: Service[]) { 120 this.rpcs = new PendingCalls(); 121 services.forEach(service => { 122 this.services.set(service.id, service); 123 }); 124 125 channels.forEach(channel => { 126 this.channelsById.set( 127 channel.id, 128 new ChannelClient(this, channel, services) 129 ); 130 }); 131 } 132 133 /** 134 * Creates a client from a set of Channels and a library of Protos. 135 * 136 * @param {Channel[]} channels List of possible channels to use. 137 * @param {ProtoCollection} protoSet ProtoCollection containing protos 138 * defining RPC services 139 * and methods. 140 */ 141 static fromProtoSet(channels: Channel[], protoSet: ProtoCollection): Client { 142 let services: Service[] = []; 143 const descriptors = protoSet.fileDescriptorSet.getFileList(); 144 descriptors.forEach(fileDescriptor => { 145 const packageName = fileDescriptor.getPackage()!; 146 fileDescriptor.getServiceList().forEach(serviceDescriptor => { 147 services = services.concat( 148 new Service(serviceDescriptor, protoSet, packageName) 149 ); 150 }); 151 }); 152 153 return new Client(channels, services); 154 } 155 156 /** 157 * Finds the channel with the provided id. Returns undefined if there are no 158 * channels or no channel with a matching id. 159 * 160 * @param {number?} id If no id is specified, returns the first channel. 161 */ 162 channel(id?: number): ChannelClient | undefined { 163 if (id === undefined) { 164 return this.channelsById.values().next().value; 165 } 166 return this.channelsById.get(id); 167 } 168 169 /** 170 * Creates a new RPC object holding channel, method, and service info. 171 * Returns undefined if the service or method does not exist. 172 */ 173 private rpc( 174 packet: RpcPacket, 175 channelClient: ChannelClient 176 ): Rpc | undefined { 177 const service = this.services.get(packet.getServiceId()); 178 if (service == undefined) { 179 return undefined; 180 } 181 const method = service.methods.get(packet.getMethodId()); 182 if (method == undefined) { 183 return undefined; 184 } 185 return new Rpc(channelClient.channel, service, method); 186 } 187 188 private decodeStatus(rpc: Rpc, packet: RpcPacket): Status | undefined { 189 if (packet.getType() === PacketType.SERVER_STREAM) { 190 return; 191 } 192 return packet.getStatus(); 193 } 194 195 private decodePayload(rpc: Rpc, packet: RpcPacket): Message | undefined { 196 if (packet.getType() === PacketType.SERVER_ERROR) { 197 return undefined; 198 } 199 200 if ( 201 packet.getType() === PacketType.RESPONSE && 202 rpc.method.serverStreaming 203 ) { 204 return undefined; 205 } 206 207 const payload = packet.getPayload_asU8(); 208 return packets.decodePayload(payload, rpc.method.responseType); 209 } 210 211 private sendClientError( 212 client: ChannelClient, 213 packet: RpcPacket, 214 error: Status 215 ) { 216 client.channel.send(packets.encodeClientError(packet, error)); 217 } 218 219 /** 220 * Processes an incoming packet. 221 * 222 * @param {Uint8Array} rawPacketData binary data for a pw_rpc packet. 223 * @return {Status} The status of processing the packet. 224 * - OK: the packet was processed by the client 225 * - DATA_LOSS: the packet could not be decoded 226 * - INVALID_ARGUMENT: the packet is for a server, not a client 227 * - NOT_FOUND: the packet's channel ID is not known to this client 228 */ 229 processPacket(rawPacketData: Uint8Array): Status { 230 let packet; 231 try { 232 packet = packets.decode(rawPacketData); 233 } catch (err) { 234 console.warn(`Failed to decode packet: ${err}`); 235 console.debug(`Raw packet: ${rawPacketData}`); 236 return Status.DATA_LOSS; 237 } 238 239 if (packets.forServer(packet)) { 240 return Status.INVALID_ARGUMENT; 241 } 242 243 const channelClient = this.channelsById.get(packet.getChannelId()); 244 if (channelClient == undefined) { 245 console.warn(`Unrecognized channel ID: ${packet.getChannelId()}`); 246 return Status.NOT_FOUND; 247 } 248 249 const rpc = this.rpc(packet, channelClient); 250 if (rpc == undefined) { 251 this.sendClientError(channelClient, packet, Status.NOT_FOUND); 252 console.warn('rpc service/method not found'); 253 return Status.OK; 254 } 255 256 if ( 257 packet.getType() !== PacketType.RESPONSE && 258 packet.getType() !== PacketType.SERVER_STREAM && 259 packet.getType() !== PacketType.SERVER_ERROR 260 ) { 261 console.error(`${rpc}: Unexpected packet type ${packet.getType()}`); 262 console.debug(`Packet: ${packet}`); 263 return Status.OK; 264 } 265 266 let status = this.decodeStatus(rpc, packet); 267 let payload; 268 try { 269 payload = this.decodePayload(rpc, packet); 270 } catch (error) { 271 this.sendClientError(channelClient, packet, Status.DATA_LOSS); 272 console.warn(`Failed to decode response: ${error}`); 273 console.debug(`Raw payload: ${packet.getPayload()}`); 274 275 // Make this an error packet so the error handler is called. 276 packet.setType(PacketType.SERVER_ERROR); 277 status = Status.DATA_LOSS; 278 } 279 280 const call = this.rpcs.getPending(rpc, status); 281 if (call === undefined) { 282 this.sendClientError(channelClient, packet, Status.FAILED_PRECONDITION); 283 console.debug(`Discarding response for ${rpc}, which is not pending`); 284 return Status.OK; 285 } 286 287 if (packet.getType() === PacketType.SERVER_ERROR) { 288 if (status === Status.OK) { 289 throw 'Unexpected OK status on SERVER_ERROR'; 290 } 291 if (status === undefined) { 292 throw 'Missing status on SERVER_ERROR'; 293 } 294 console.warn(`${rpc}: invocation failed with status: ${Status[status]}`); 295 call.handleError(status); 296 return Status.OK; 297 } 298 299 if (payload !== undefined) { 300 call.handleResponse(payload); 301 } 302 if (status !== undefined) { 303 call.handleCompletion(status); 304 } 305 return Status.OK; 306 } 307} 308