• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "ProtoRPC.h"
20
21#if GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS
22#import <Protobuf/GPBProtocolBuffers.h>
23#else
24#import <GPBProtocolBuffers.h>
25#endif
26#import <GRPCClient/GRPCCall.h>
27#import <RxLibrary/GRXWriteable.h>
28#import <RxLibrary/GRXWriter+Transformations.h>
29
/**
 * Response handler for unary calls. It remembers the last proto message received and, when the
 * call closes, passes that message together with the closing error to a single block.
 */
@implementation GRPCUnaryResponseHandler {
  // Block invoked from -didCloseWithTrailingMetadata:error:. May be nil if the creator passed nil.
  void (^_responseHandler)(id, NSError *);
  // Queue returned from -dispatchQueue; defaults to the main queue (see the initializer).
  dispatch_queue_t _responseDispatchQueue;

  // Last message delivered through -didReceiveProtoMessage:; cleared when the call closes.
  GPBMessage *_message;
}

/**
 * Creates a handler that reports the outcome of a unary call through \a handler.
 *
 * @param handler Block called with the stored message and the error when the call closes.
 * @param dispatchQueue Queue reported by -dispatchQueue. When nil, the main queue is used.
 * @return The initialized handler, or nil if [super init] fails.
 */
- (nullable instancetype)initWithResponseHandler:(void (^)(id, NSError *))handler
                           responseDispatchQueue:(dispatch_queue_t)dispatchQueue {
  if ((self = [super init])) {
    _responseHandler = handler;
    if (dispatchQueue == nil) {
      _responseDispatchQueue = dispatch_get_main_queue();
    } else {
      _responseDispatchQueue = dispatchQueue;
    }
  }
  return self;
}

// Implements GRPCProtoResponseHandler
- (dispatch_queue_t)dispatchQueue {
  return _responseDispatchQueue;
}

// Keeps an immutable snapshot of the response headers.
- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
  _responseHeaders = [initialMetadata copy];
}

// Stores the message until the call closes; a later message replaces an earlier one.
- (void)didReceiveProtoMessage:(GPBMessage *)message {
  _message = message;
}

/**
 * Records the trailers, then hands the stored message (nil if none arrived) and \a error to the
 * response block.
 */
- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  _responseTrailers = [trailingMetadata copy];
  GPBMessage *message = _message;
  _message = nil;
  // Invoking a nil block crashes, so tolerate a handler that was created without a block.
  void (^responseHandler)(id, NSError *) = _responseHandler;
  if (responseHandler != nil) {
    responseHandler(message, error);
  }
}

// Intentional no-op since flow control is N/A in a unary call
- (void)didWriteMessage {
}

@end
75
/**
 * A single-request call built on top of GRPCStreamingProtoCall: it owns one streaming call and
 * one request message, and -start drives the streaming call through its whole write sequence.
 */
@implementation GRPCUnaryProtoCall {
  // Streaming call that performs the actual work.
  GRPCStreamingProtoCall *_streamingCall;
  // Private copy of the request message, written once in -start.
  GPBMessage *_requestMessage;
}

/**
 * Creates a unary call.
 *
 * Asserts (in builds with assertions enabled) that \a message and \a responseClass are non-nil,
 * and returns nil when either is missing. The remaining arguments are forwarded unchanged to the
 * underlying GRPCStreamingProtoCall initializer.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                               message:(GPBMessage *)message
                       responseHandler:(id<GRPCProtoResponseHandler>)handler
                           callOptions:(GRPCCallOptions *)callOptions
                         responseClass:(Class)responseClass {
  NSAssert(message != nil, @"message cannot be empty.");
  NSAssert(responseClass != nil, @"responseClass cannot be empty.");
  if (!message || !responseClass) {
    return nil;
  }

  self = [super init];
  if (!self) {
    return nil;
  }

  _streamingCall = [[GRPCStreamingProtoCall alloc] initWithRequestOptions:requestOptions
                                                          responseHandler:handler
                                                              callOptions:callOptions
                                                            responseClass:responseClass];
  _requestMessage = [message copy];
  return self;
}

// Starts the streaming call, asks for one response, writes the request and closes the write side.
- (void)start {
  GRPCStreamingProtoCall *streamingCall = _streamingCall;
  [streamingCall start];
  [streamingCall receiveNextMessage];
  [streamingCall writeMessage:_requestMessage];
  [streamingCall finish];
}

// Forwards cancellation to the streaming call.
- (void)cancel {
  [_streamingCall cancel];
}

@end
113
// Private conformance: the object registers itself as the response handler of its GRPCCall2.
@interface GRPCStreamingProtoCall () <GRPCResponseHandler>

@end

/**
 * Proto-aware wrapper around GRPCCall2. Outgoing GPBMessages are serialized before being written,
 * incoming data is parsed into the configured response class, and every callback to the user's
 * handler is submitted asynchronously to a private serial queue.
 *
 * After initialization, _call and _handler are read and written inside @synchronized(self).
 */
@implementation GRPCStreamingProtoCall {
  GRPCRequestOptions *_requestOptions;
  // User-supplied handler; set to nil once a close/cancel callback has been taken for delivery.
  id<GRPCProtoResponseHandler> _handler;
  GRPCCallOptions *_callOptions;
  // Class that receives +parseFromData:error: for incoming data.
  Class _responseClass;

  // Underlying call; set to nil by -cancel, -finish, on close and on a parse failure.
  GRPCCall2 *_call;
  // Serial queue targeting the handler's dispatchQueue; all handler callbacks go through it.
  dispatch_queue_t _dispatchQueue;
}

/**
 * Creates a streaming proto call.
 *
 * @param requestOptions Must have a non-empty host and path and a safety value no greater than
 *        GRPCCallSafetyCacheableRequest; otherwise nil is returned.
 * @param handler Receives the call's events. Must not be nil; otherwise nil is returned.
 * @param callOptions Copied and passed to the underlying GRPCCall2.
 * @param responseClass Class used to parse incoming data in -didReceiveData:.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                       responseHandler:(id<GRPCProtoResponseHandler>)handler
                           callOptions:(GRPCCallOptions *)callOptions
                         responseClass:(Class)responseClass {
  NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0 &&
               requestOptions.safety <= GRPCCallSafetyCacheableRequest,
           @"Invalid requestOptions.");
  NSAssert(handler != nil, @"handler cannot be empty.");
  if (requestOptions.host.length == 0 || requestOptions.path.length == 0 ||
      requestOptions.safety > GRPCCallSafetyCacheableRequest) {
    return nil;
  }
  if (handler == nil) {
    return nil;
  }

  if ((self = [super init])) {
    _requestOptions = [requestOptions copy];
    _handler = handler;
    _callOptions = [callOptions copy];
    _responseClass = responseClass;

    // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above.
    // @available needs the newer toolchain, so the guard must select SDKs at or above
    // iOS 11 / macOS 10.13; a macro that is undefined for the current platform evaluates to 0
    // and therefore does not satisfy a ">=" comparison.
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
    if (@available(iOS 8.0, macOS 10.10, *)) {
      _dispatchQueue = dispatch_queue_create(
          NULL,
          dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
    } else {
#else
    {
#endif
      _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    }
    // Blocks submitted to the private queue are ultimately executed on the handler's queue.
    dispatch_set_target_queue(_dispatchQueue, handler.dispatchQueue);

    _call = [[GRPCCall2 alloc] initWithRequestOptions:_requestOptions
                                      responseHandler:self
                                          callOptions:_callOptions];
  }
  return self;
}

// Starts the underlying call. The call is messaged outside the lock.
- (void)start {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
  }
  [copiedCall start];
}

/**
 * Cancels the call. If the handler implements -didCloseWithTrailingMetadata:error:, it is
 * asynchronously told that the call closed with GRPCErrorCodeCancelled; either way the handler
 * reference is dropped so later events are not delivered to it.
 */
- (void)cancel {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    // NOTE(review): -finish also clears _call, so a cancel issued after -finish never reaches the
    // underlying GRPCCall2 (copiedCall is nil) - confirm this is intended.
    copiedCall = _call;
    _call = nil;
    if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          // Take and clear the handler; if another close path already took it, this is nil and
          // the message below is a no-op.
          copiedHandler = self->_handler;
          self->_handler = nil;
        }
        [copiedHandler didCloseWithTrailingMetadata:nil
                                              error:[NSError errorWithDomain:kGRPCErrorDomain
                                                                        code:GRPCErrorCodeCancelled
                                                                    userInfo:@{
                                                                      NSLocalizedDescriptionKey :
                                                                          @"Canceled by app"
                                                                    }]];
      });
    } else {
      _handler = nil;
    }
  }
  [copiedCall cancel];
}

/**
 * Serializes \a message and writes it to the underlying call. Anything that is not a GPBMessage
 * trips the assertion (when assertions are enabled) and is otherwise logged and dropped.
 */
- (void)writeMessage:(GPBMessage *)message {
  NSAssert([message isKindOfClass:[GPBMessage class]], @"Parameter message must be a GPBMessage");
  if (![message isKindOfClass:[GPBMessage class]]) {
    NSLog(@"Failed to send a message that is non-proto.");
    return;
  }

  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
  }
  [copiedCall writeData:[message data]];
}

// Finishes the underlying call and drops this object's reference to it.
- (void)finish {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
    _call = nil;
  }
  [copiedCall finish];
}

// Convenience for -receiveNextMessages: with a count of one.
- (void)receiveNextMessage {
  [self receiveNextMessages:1];
}

// Forwards the request for \a numberOfMessages more messages to the underlying call.
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  GRPCCall2 *copiedCall;
  @synchronized(self) {
    copiedCall = _call;
  }
  [copiedCall receiveNextMessages:numberOfMessages];
}

#pragma mark - GRPCResponseHandler

// Every callback below follows the same shape: decide under the lock whether the handler
// implements the selector, then re-read the handler inside the dispatched block so that a handler
// cleared in the meantime receives nothing.

// Forwards non-nil initial metadata to the handler, if it implements the callback.
- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
  @synchronized(self) {
    if (initialMetadata != nil &&
        [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
        }
        [copiedHandler didReceiveInitialMetadata:initialMetadata];
      });
    }
  }
}

/**
 * Parses \a data with the response class. A parsed message is forwarded to the handler; a parse
 * failure closes the handler with an ErrorForBadProto error and cancels the underlying call.
 * Nil data is ignored.
 */
- (void)didReceiveData:(id)data {
  if (data == nil) return;

  NSError *error = nil;
  GPBMessage *parsed = [_responseClass parseFromData:data error:&error];
  @synchronized(self) {
    if (parsed && [_handler respondsToSelector:@selector(didReceiveProtoMessage:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
        }
        [copiedHandler didReceiveProtoMessage:parsed];
      });
    } else if (!parsed && [_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:
                                                                                        error:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          // The parse failure ends the call from the handler's point of view.
          copiedHandler = self->_handler;
          self->_handler = nil;
        }
        [copiedHandler
            didCloseWithTrailingMetadata:nil
                                   error:ErrorForBadProto(data, self->_responseClass, error)];
      });
      [_call cancel];
      _call = nil;
    }
  }
}

// Delivers the close event to the handler (once), then drops the underlying call.
- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  @synchronized(self) {
    if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
          self->_handler = nil;
        }
        [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
      });
    }
    _call = nil;
  }
}

// Translates the data-level write notification into the handler's -didWriteMessage.
- (void)didWriteData {
  @synchronized(self) {
    if ([_handler respondsToSelector:@selector(didWriteMessage)]) {
      dispatch_async(_dispatchQueue, ^{
        id<GRPCProtoResponseHandler> copiedHandler = nil;
        @synchronized(self) {
          copiedHandler = self->_handler;
        }
        [copiedHandler didWriteMessage];
      });
    }
  }
}

// Queue reported to the underlying call as this response handler's dispatch queue.
- (dispatch_queue_t)dispatchQueue {
  return _dispatchQueue;
}

@end
322
/**
 * Generate an NSError object that represents a failure in parsing a proto class.
 *
 * @param proto The value that could not be parsed; stored under @"Received value" when non-nil.
 * @param expectedClass The class the value was expected to parse into; stored under
 *        @"Expected class" when non-nil.
 * @param parsingError The underlying parser error; stored under NSUnderlyingErrorKey when
 *        non-nil.
 * @return An error in domain @"io.grpc" with code 13.
 */
NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsingError) {
  NSMutableDictionary *info = [NSMutableDictionary dictionaryWithDictionary:@{
    NSLocalizedDescriptionKey : @"Unable to parse response from the server",
    NSLocalizedRecoverySuggestionErrorKey :
        @"If this RPC is idempotent, retry "
        @"with exponential backoff. Otherwise, query the server status before "
        @"retrying.",
  }];
  // A dictionary literal raises on nil values, so the optional entries are added individually;
  // reporting a parse failure must not itself throw when one of the arguments is missing.
  if (parsingError != nil) {
    info[NSUnderlyingErrorKey] = parsingError;
  }
  if (expectedClass != nil) {
    info[@"Expected class"] = expectedClass;
  }
  if (proto != nil) {
    info[@"Received value"] = proto;
  }
  // TODO(jcanizales): Use kGRPCErrorDomain and GRPCErrorCodeInternal when they're public.
  return [NSError errorWithDomain:@"io.grpc" code:13 userInfo:info];
}
340