// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import {Message} from 'google-protobuf';
import {Status} from '@pigweed/pw_status';

import {Call} from './call';
import {Channel, Method, Service} from './descriptors';
import * as packets from './packets';

/**
 * Data class for a pending RPC call.
 *
 * Bundles the channel, service, and method that together identify one RPC.
 */
export class Rpc {
  constructor(
    readonly channel: Channel,
    readonly service: Service,
    readonly method: Method
  ) {}

  /** Returns the [channel id, service id, method id] tuple. */
  get idSet(): [number, number, number] {
    return [this.channel.id, this.service.id, this.method.id];
  }

  /**
   * Returns a string sequence to uniquely identify channel, service, and
   * method. This can be used to hash the Rpc.
   *
   * For example: "12346789.23452345.12341234"
   */
  get idString(): string {
    return `${this.channel.id}.${this.service.id}.${this.method.id}`;
  }

  /** Human-readable form: "<service name>.<method name> on channel <id>". */
  toString(): string {
    return (
      `${this.service.name}.${this.method.name} on channel ` +
      `${this.channel.id}`
    );
  }
}

/** Tracks pending RPCs and encodes outgoing RPC packets. */
export class PendingCalls {
  /** Calls currently registered, keyed by `Rpc.idString`. */
  pending: Map<string, Call> = new Map();

  /**
   * Starts the provided RPC and returns the encoded packet to send.
   *
   * Registers `call` as the pending call for `rpc` (replacing any call already
   * registered under the same id) and encodes the request. Nothing is sent by
   * this method.
   *
   * @param rpc The RPC to start.
   * @param request The request message to encode.
   * @param call The call object to register for this RPC.
   * @returns The encoded request packet.
   */
  request(rpc: Rpc, request: Message, call: Call): Uint8Array {
    // open() already logs the start of the RPC, so no extra log line here.
    this.open(rpc, call);
    return packets.encodeRequest(rpc.idSet, request);
  }

  /**
   * Registers the call, encodes the request, and sends the resulting packet
   * to the RPC's channel.
   *
   * The call is registered before the send is attempted, so it stays
   * registered even if the send throws.
   *
   * @param rpc The RPC to start.
   * @param call The call object to register for this RPC.
   * @param ignoreError When true, an error thrown by the channel's send() is
   *     discarded instead of being rethrown.
   * @param request Optional request message to encode.
   * @returns The call previously registered for this RPC, if any.
   * @throws Whatever the channel's send() throws, unless `ignoreError` is set.
   */
  sendRequest(
    rpc: Rpc,
    call: Call,
    ignoreError: boolean,
    request?: Message
  ): Call | undefined {
    const previous = this.open(rpc, call);
    const packet = packets.encodeRequest(rpc.idSet, request);
    try {
      rpc.channel.send(packet);
    } catch (error) {
      // Best-effort mode: the caller explicitly asked to ignore send failures.
      if (!ignoreError) {
        throw error;
      }
    }
    return previous;
  }

  /**
   * Creates a call for an RPC, but does not invoke it.
   *
   * open() can be used to receive streaming responses to an RPC that was not
   * invoked by this client. For example, a server may stream logs with a
   * server streaming RPC prior to any clients invoking it.
   *
   * @param rpc The RPC to register the call under.
   * @param call The call object to register.
   * @returns The call that was previously registered for this RPC, if any.
   */
  open(rpc: Rpc, call: Call): Call | undefined {
    console.debug(`Starting ${rpc}`);
    const previous = this.pending.get(rpc.idString);
    this.pending.set(rpc.idString, call);
    return previous;
  }

  /**
   * Encodes a client stream packet carrying `message` and sends it to the
   * RPC's channel.
   *
   * @throws Error if no call is registered for `rpc`.
   */
  sendClientStream(rpc: Rpc, message: Message): void {
    if (this.getPending(rpc) === undefined) {
      throw new Error(`Attempt to send client stream for inactive RPC: ${rpc}`);
    }
    rpc.channel.send(packets.encodeClientStream(rpc.idSet, message));
  }

  /**
   * Encodes a client stream end packet and sends it to the RPC's channel.
   *
   * @throws Error if no call is registered for `rpc`.
   */
  sendClientStreamEnd(rpc: Rpc): void {
    if (this.getPending(rpc) === undefined) {
      throw new Error(
        `Attempt to send client stream end for inactive RPC: ${rpc}`
      );
    }
    rpc.channel.send(packets.encodeClientStreamEnd(rpc.idSet));
  }

  /**
   * Cancels the RPC. Returns the CANCEL packet to send.
   *
   * The RPC is removed from the pending map whether or not it was registered.
   *
   * @returns The encoded cancel packet, or undefined when the method is both
   *     client streaming and server streaming.
   */
  cancel(rpc: Rpc): Uint8Array | undefined {
    console.debug(`Cancelling ${rpc}`);
    this.pending.delete(rpc.idString);
    // NOTE(review): no packet is produced for bidirectional-streaming methods;
    // confirm this is the intended set of methods to skip.
    if (rpc.method.clientStreaming && rpc.method.serverStreaming) {
      return undefined;
    }
    return packets.encodeCancel(rpc.idSet);
  }

  /**
   * Calls cancel and sends the cancel packet, if any, to the channel.
   *
   * @returns false if cancel() threw; true otherwise.
   * @throws Whatever the channel's send() throws.
   */
  sendCancel(rpc: Rpc): boolean {
    let packet: Uint8Array | undefined;
    try {
      packet = this.cancel(rpc);
    } catch (err) {
      // NOTE(review): cancel() does not throw for an RPC that is not pending,
      // so true is returned in that case as well; confirm this is intended.
      return false;
    }

    if (packet !== undefined) {
      rpc.channel.send(packet);
    }
    return true;
  }

  /**
   * Gets the pending RPC's call. If status is set, clears the RPC.
   *
   * @param rpc The RPC to look up.
   * @param status When provided, the RPC is removed from the pending map.
   * @returns The call registered for `rpc`, or undefined if there is none.
   */
  getPending(rpc: Rpc, status?: Status): Call | undefined {
    const call = this.pending.get(rpc.idString);
    // Compare against undefined explicitly so that any provided status,
    // including a falsy value, still clears the RPC.
    if (status !== undefined) {
      this.pending.delete(rpc.idString);
    }
    return call;
  }
}