1// Copyright 2021 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15import {Status} from '@pigweed/pw_status'; 16import {Message} from 'google-protobuf'; 17 18import { 19 BidirectionalStreamingCall, 20 Call, 21 Callback, 22 ClientStreamingCall, 23 ServerStreamingCall, 24 UnaryCall, 25} from './call'; 26import {Channel, Method, MethodType, Service} from './descriptors'; 27import {PendingCalls, Rpc} from './rpc_classes'; 28 29export function methodStubFactory( 30 rpcs: PendingCalls, 31 channel: Channel, 32 method: Method 33): MethodStub { 34 switch (method.type) { 35 case MethodType.BIDIRECTIONAL_STREAMING: 36 return new BidirectionalStreamingMethodStub(rpcs, channel, method); 37 case MethodType.CLIENT_STREAMING: 38 return new ClientStreamingMethodStub(rpcs, channel, method); 39 case MethodType.SERVER_STREAMING: 40 return new ServerStreamingMethodStub(rpcs, channel, method); 41 case MethodType.UNARY: 42 return new UnaryMethodStub(rpcs, channel, method); 43 } 44} 45 46export abstract class MethodStub { 47 readonly method: Method; 48 readonly rpcs: PendingCalls; 49 readonly rpc: Rpc; 50 private channel: Channel; 51 52 constructor(rpcs: PendingCalls, channel: Channel, method: Method) { 53 this.method = method; 54 this.rpcs = rpcs; 55 this.channel = channel; 56 this.rpc = new Rpc(channel, method.service, method); 57 } 58 59 get id(): number { 60 return this.method.id; 61 } 62} 63 64export class UnaryMethodStub extends MethodStub { 65 invoke( 66 request: Message, 67 onNext: Callback = () => {}, 68 onCompleted: Callback = () => {}, 69 onError: Callback = () => {} 70 ): UnaryCall { 71 const call = new UnaryCall( 72 this.rpcs, 73 this.rpc, 74 onNext, 75 onCompleted, 76 onError 77 ); 78 call.invoke(request); 79 return call; 80 } 81 82 open( 83 request: Message, 84 onNext: Callback = () => {}, 85 onCompleted: Callback = () => {}, 86 onError: Callback = () => {} 87 ): UnaryCall { 88 const call = new UnaryCall( 89 this.rpcs, 90 this.rpc, 91 onNext, 92 onCompleted, 93 onError 94 ); 95 call.invoke(request, true); 96 return call; 97 } 98 99 async call(request: Message, timeout?: number): Promise<[Status, Message]> { 100 return await this.invoke(request).complete(timeout); 101 } 102} 103 104export class ServerStreamingMethodStub extends MethodStub { 105 invoke( 106 request?: Message, 107 onNext: Callback = () => {}, 108 onCompleted: Callback = () => {}, 109 onError: Callback = () => {} 110 ): ServerStreamingCall { 111 const call = new ServerStreamingCall( 112 this.rpcs, 113 this.rpc, 114 onNext, 115 onCompleted, 116 onError 117 ); 118 call.invoke(request); 119 return call; 120 } 121 122 open( 123 request: Message, 124 onNext: Callback = () => {}, 125 onCompleted: Callback = () => {}, 126 onError: Callback = () => {} 127 ): UnaryCall { 128 const call = new UnaryCall( 129 this.rpcs, 130 this.rpc, 131 onNext, 132 onCompleted, 133 onError 134 ); 135 call.invoke(request, true); 136 return call; 137 } 138 139 call(request?: Message, timeout?: number): Promise<[Status, Message[]]> { 140 return this.invoke(request).complete(timeout); 141 } 142} 143 144export class ClientStreamingMethodStub extends MethodStub { 145 invoke( 146 onNext: Callback = () => {}, 147 onCompleted: Callback = () => {}, 148 onError: Callback = () => {} 149 ): ClientStreamingCall { 150 const call = new ClientStreamingCall( 151 this.rpcs, 152 this.rpc, 153 onNext, 154 onCompleted, 155 onError 156 ); 157 call.invoke(); 158 return call; 159 } 160 161 open( 162 onNext: Callback = () => {}, 163 onCompleted: Callback = () => {}, 164 onError: Callback = () => {} 165 ): ClientStreamingCall { 166 const call = new ClientStreamingCall( 167 this.rpcs, 168 this.rpc, 169 onNext, 170 onCompleted, 171 onError 172 ); 173 call.invoke(undefined, true); 174 return call; 175 } 176 177 async call(requests: Array<Message> = [], timeout?: number) { 178 return this.invoke().finishAndWait(requests, timeout); 179 } 180} 181 182export class BidirectionalStreamingMethodStub extends MethodStub { 183 invoke( 184 onNext: Callback = () => {}, 185 onCompleted: Callback = () => {}, 186 onError: Callback = () => {} 187 ): BidirectionalStreamingCall { 188 const call = new BidirectionalStreamingCall( 189 this.rpcs, 190 this.rpc, 191 onNext, 192 onCompleted, 193 onError 194 ); 195 call.invoke(); 196 return call; 197 } 198 199 open( 200 onNext: Callback = () => {}, 201 onCompleted: Callback = () => {}, 202 onError: Callback = () => {} 203 ): BidirectionalStreamingCall { 204 const call = new BidirectionalStreamingCall( 205 this.rpcs, 206 this.rpc, 207 onNext, 208 onCompleted, 209 onError 210 ); 211 call.invoke(undefined, true); 212 return call; 213 } 214 215 async call(requests: Array<Message> = [], timeout?: number) { 216 return this.invoke().finishAndWait(requests, timeout); 217 } 218} 219