// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import {Status} from '@pigweed/pw_status';
import {Message} from 'google-protobuf';

import WaitQueue = require('wait-queue');

import {PendingCalls, Rpc} from './rpc_classes';

/** Signature shared by the onNext / onCompleted / onError user callbacks. */
export type Callback = (a: any) => any;

/** Error thrown when an RPC terminates with an error status. */
class RpcError extends Error {
  status: Status;

  constructor(rpc: Rpc, status: Status) {
    // Add a human-readable hint for the two statuses that have one.
    let message = '';
    if (status === Status.NOT_FOUND) {
      message = ': the RPC server does not support this RPC';
    } else if (status === Status.DATA_LOSS) {
      message = ': an error occurred while decoding the RPC payload';
    }

    super(`${rpc.method.name} failed with error ${Status[status]}${message}`);
    this.status = status;
  }
}

/** Error thrown when no response arrives within the requested timeout. */
class RpcTimeout extends Error {
  readonly rpc: Rpc;
  readonly timeoutMs: number;

  constructor(rpc: Rpc, timeoutMs: number) {
    super(`${rpc.method.name} timed out after ${timeoutMs} ms`);
    this.rpc = rpc;
    this.timeoutMs = timeoutMs;
  }
}

/** Represent an in-progress or completed RPC call. */
export class Call {
  // Responses ordered by arrival time. Undefined signifies stream completion.
  private responseQueue = new WaitQueue<Message | undefined>();
  // Every response received so far, in arrival order.
  protected responses: Message[] = [];

  private rpcs: PendingCalls;
  private rpc: Rpc;

  private onNext: Callback;
  private onCompleted: Callback;
  private onError: Callback;

  // Set by handleCompletion().
  status?: Status;
  // Set by handleError() or cancel().
  error?: Status;
  // Last Error thrown by a user callback; rethrown by checkErrors().
  callbackException?: Error;

  constructor(
    rpcs: PendingCalls,
    rpc: Rpc,
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback
  ) {
    this.rpcs = rpcs;
    this.rpc = rpc;

    this.onNext = onNext;
    this.onCompleted = onCompleted;
    this.onError = onError;
  }

  /**
   * Calls the RPC. This must be called immediately after construction.
   *
   * If sending the request returns a previous call that has not completed,
   * that call is terminated with Status.CANCELLED.
   *
   * @param request Optional request message forwarded to sendRequest().
   * @param ignoreErrors Forwarded to sendRequest().
   */
  invoke(request?: Message, ignoreErrors = false): void {
    const previous = this.rpcs.sendRequest(
      this.rpc,
      this,
      ignoreErrors,
      request
    );

    if (previous !== undefined && !previous.completed) {
      previous.handleError(Status.CANCELLED);
    }
  }

  /** True once either a completion status or an error has been recorded. */
  get completed(): boolean {
    return this.status !== undefined || this.error !== undefined;
  }

  /**
   * Runs a user callback, containing anything it throws.
   *
   * A thrown Error is logged and stored in callbackException so that
   * checkErrors() can rethrow it later. Any other thrown value is only logged.
   */
  private invokeCallback(func: () => unknown): void {
    try {
      func();
    } catch (err: unknown) {
      if (err instanceof Error) {
        console.error(
          `An exception was raised while invoking a callback: ${err}`
        );
        this.callbackException = err;
      } else {
        // Only reached for non-Error throwables (strings, numbers, ...).
        console.error(`Unexpected item thrown while invoking callback: ${err}`);
      }
    }
  }

  /** Records a response, queues it for readers, and invokes onNext. */
  handleResponse(response: Message): void {
    this.responses.push(response);
    this.responseQueue.push(response);
    this.invokeCallback(() => this.onNext(response));
  }

  /** Records the final status, marks end of stream, and invokes onCompleted. */
  handleCompletion(status: Status): void {
    this.status = status;
    // `undefined` is the end-of-stream sentinel for queue readers.
    this.responseQueue.push(undefined);
    this.invokeCallback(() => this.onCompleted(status));
  }

  /** Records an error status, marks end of stream, and invokes onError. */
  handleError(error: Status): void {
    this.error = error;
    // Wake queue readers so they can observe the error via checkErrors().
    this.responseQueue.push(undefined);
    this.invokeCallback(() => this.onError(error));
  }

  /**
   * Takes the next item from the response queue, rejecting with RpcTimeout if
   * nothing arrives within `timeoutMs` milliseconds.
   *
   * The pending queue read cannot be withdrawn once started, so an item that
   * arrives after the timeout fired is put back at the front of the queue for
   * the next reader.
   */
  private queuePopWithTimeout(
    timeoutMs: number
  ): Promise<Message | undefined> {
    return new Promise<Message | undefined>((resolve, reject) => {
      let timeoutExpired = false;
      const timeoutWatcher = setTimeout(() => {
        timeoutExpired = true;
        reject(new RpcTimeout(this.rpc, timeoutMs));
      }, timeoutMs);

      this.responseQueue.shift().then(
        response => {
          if (timeoutExpired) {
            // Already rejected; return the item so it is not lost.
            this.responseQueue.unshift(response);
            return;
          }
          clearTimeout(timeoutWatcher);
          resolve(response);
        },
        (err: unknown) => {
          // Surface queue failures to the caller and disarm the timer.
          clearTimeout(timeoutWatcher);
          reject(err);
        }
      );
    });
  }

  /**
   * Yields responses up to the specified count as they are added.
   *
   * Throws an error as soon as it is received even if there are still
   * responses in the queue.
   *
   * Usage
   * ```
   * for await (const response of call.getResponses(5)) {
   *   console.log(response);
   * }
   * ```
   *
   * @param count The number of responses to read before returning.
   *   If no value is specified, getResponses will block until the stream
   *   either ends or hits an error.
   * @param timeoutMs The number of milliseconds to wait for a response
   *   before throwing an error. If omitted, waits indefinitely.
   */
  async *getResponses(
    count?: number,
    timeoutMs?: number
  ): AsyncGenerator<Message> {
    this.checkErrors();

    // Nothing left to read: the call is finished and the queue is drained.
    if (this.completed && this.responseQueue.length === 0) {
      return;
    }

    let remaining = count ?? Number.POSITIVE_INFINITY;
    while (remaining > 0) {
      const response =
        timeoutMs === undefined
          ? await this.responseQueue.shift()
          : await this.queuePopWithTimeout(timeoutMs);
      this.checkErrors();
      if (response === undefined) {
        // End-of-stream sentinel.
        return;
      }
      yield response;
      remaining -= 1;
    }
  }

  /**
   * Cancels the call if it has not already completed.
   *
   * @returns false if the call was already completed; otherwise the result of
   *   sending the cancellation.
   */
  cancel(): boolean {
    if (this.completed) {
      return false;
    }

    // NOTE(review): unlike handleError(), this does not push the end-of-stream
    // sentinel, so a getResponses() reader waiting without a timeout is not
    // woken by cancel() itself. Confirm whether that is intended.
    this.error = Status.CANCELLED;
    return this.rpcs.sendCancel(this.rpc);
  }

  /** Throws a stored callback exception first, else any recorded RPC error. */
  private checkErrors(): void {
    if (this.callbackException !== undefined) {
      throw this.callbackException;
    }
    if (this.error !== undefined) {
      throw new RpcError(this.rpc, this.error);
    }
  }

  /**
   * Waits for a single response and returns it with the final status.
   *
   * Throws if no status has been recorded afterwards, or if the number of
   * recorded responses is not exactly one.
   */
  protected async unaryWait(timeoutMs?: number): Promise<[Status, Message]> {
    // Drain the generator; responses are accumulated in this.responses.
    for await (const _response of this.getResponses(1, timeoutMs)) {
      // Intentionally empty.
    }
    if (this.status === undefined) {
      throw new Error('Unexpected undefined status at end of stream');
    }
    if (this.responses.length !== 1) {
      throw new Error(
        `Unexpected number of responses: ${this.responses.length}`
      );
    }
    return [this.status, this.responses[0]];
  }

  /**
   * Waits for the stream to end and returns the final status together with
   * all recorded responses.
   *
   * Throws if no status has been recorded once the stream ends.
   */
  protected async streamWait(timeoutMs?: number): Promise<[Status, Message[]]> {
    // Drain the generator; responses are accumulated in this.responses.
    for await (const _response of this.getResponses(undefined, timeoutMs)) {
      // Intentionally empty.
    }
    if (this.status === undefined) {
      throw new Error('Unexpected undefined status at end of stream');
    }
    return [this.status, this.responses];
  }

  /**
   * Sends one client-stream message.
   *
   * Throws any pending error, or FAILED_PRECONDITION if the call already has a
   * completion status.
   */
  protected sendClientStream(request: Message): void {
    this.checkErrors();
    if (this.status !== undefined) {
      throw new RpcError(this.rpc, Status.FAILED_PRECONDITION);
    }
    this.rpcs.sendClientStream(this.rpc, request);
  }

  /**
   * Sends the given messages, then signals end of the client stream unless the
   * call has already completed.
   */
  protected finishClientStream(requests: Message[]): void {
    for (const request of requests) {
      this.sendClientStream(request);
    }

    if (!this.completed) {
      this.rpcs.sendClientStreamEnd(this.rpc);
    }
  }
}

/** Tracks the state of a unary RPC call. */
export class UnaryCall extends Call {
  /** Awaits the server response. */
  async complete(timeoutMs?: number): Promise<[Status, Message]> {
    return await this.unaryWait(timeoutMs);
  }
}

/** Tracks the state of a client streaming RPC call. */
export class ClientStreamingCall extends Call {
  /** Gets the last server message, if it exists. */
  get response(): Message | undefined {
    return this.responses.length > 0
      ? this.responses[this.responses.length - 1]
      : undefined;
  }

  /** Sends a message from the client. */
  send(request: Message): void {
    this.sendClientStream(request);
  }

  /** Ends the client stream and waits for the RPC to complete. */
  async finishAndWait(
    requests: Message[] = [],
    timeoutMs?: number
  ): Promise<[Status, Message[]]> {
    this.finishClientStream(requests);
    return await this.streamWait(timeoutMs);
  }
}

/** Tracks the state of a server streaming RPC call. */
export class ServerStreamingCall extends Call {
  /** Awaits the end of the server stream. */
  complete(timeoutMs?: number): Promise<[Status, Message[]]> {
    return this.streamWait(timeoutMs);
  }
}

/** Tracks the state of a bidirectional streaming RPC call. */
export class BidirectionalStreamingCall extends Call {
  /** Sends a message from the client. */
  send(request: Message): void {
    this.sendClientStream(request);
  }

  /** Ends the client stream and waits for the RPC to complete. */
  async finishAndWait(
    requests: Message[] = [],
    timeoutMs?: number
  ): Promise<[Status, Message[]]> {
    this.finishClientStream(requests);
    return await this.streamWait(timeoutMs);
  }
}