1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21
22 #include <cstring>
23 #include <map>
24 #include <memory>
25
26 #include <grpc/impl/codegen/compression_types.h>
27 #include <grpc/impl/codegen/grpc_types.h>
28 #include <grpcpp/impl/codegen/byte_buffer.h>
29 #include <grpcpp/impl/codegen/call.h>
30 #include <grpcpp/impl/codegen/call_hook.h>
31 #include <grpcpp/impl/codegen/call_op_set_interface.h>
32 #include <grpcpp/impl/codegen/client_context_impl.h>
33 #include <grpcpp/impl/codegen/completion_queue_impl.h>
34 #include <grpcpp/impl/codegen/completion_queue_tag.h>
35 #include <grpcpp/impl/codegen/config.h>
36 #include <grpcpp/impl/codegen/core_codegen_interface.h>
37 #include <grpcpp/impl/codegen/intercepted_channel.h>
38 #include <grpcpp/impl/codegen/interceptor_common.h>
39 #include <grpcpp/impl/codegen/serialization_traits.h>
40 #include <grpcpp/impl/codegen/slice.h>
41 #include <grpcpp/impl/codegen/string_ref.h>
42
43 namespace grpc {
44
45 extern CoreCodegenInterface* g_core_codegen_interface;
46
47 namespace internal {
48 class Call;
49 class CallHook;
50
51 // TODO(yangg) if the map is changed before we send, the pointers will be a
52 // mess. Make sure it does not happen.
FillMetadataArray(const std::multimap<std::string,std::string> & metadata,size_t * metadata_count,const std::string & optional_error_details)53 inline grpc_metadata* FillMetadataArray(
54 const std::multimap<std::string, std::string>& metadata,
55 size_t* metadata_count, const std::string& optional_error_details) {
56 *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
57 if (*metadata_count == 0) {
58 return nullptr;
59 }
60 grpc_metadata* metadata_array =
61 (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
62 (*metadata_count) * sizeof(grpc_metadata)));
63 size_t i = 0;
64 for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
65 metadata_array[i].key = SliceReferencingString(iter->first);
66 metadata_array[i].value = SliceReferencingString(iter->second);
67 }
68 if (!optional_error_details.empty()) {
69 metadata_array[i].key =
70 g_core_codegen_interface->grpc_slice_from_static_buffer(
71 kBinaryErrorDetailsKey, sizeof(kBinaryErrorDetailsKey) - 1);
72 metadata_array[i].value = SliceReferencingString(optional_error_details);
73 }
74 return metadata_array;
75 }
76 } // namespace internal
77
78 /// Per-message write options.
79 class WriteOptions {
80 public:
WriteOptions()81 WriteOptions() : flags_(0), last_message_(false) {}
WriteOptions(const WriteOptions & other)82 WriteOptions(const WriteOptions& other)
83 : flags_(other.flags_), last_message_(other.last_message_) {}
84
85 /// Default assignment operator
86 WriteOptions& operator=(const WriteOptions& other) = default;
87
88 /// Clear all flags.
Clear()89 inline void Clear() { flags_ = 0; }
90
91 /// Returns raw flags bitset.
flags()92 inline uint32_t flags() const { return flags_; }
93
94 /// Sets flag for the disabling of compression for the next message write.
95 ///
96 /// \sa GRPC_WRITE_NO_COMPRESS
set_no_compression()97 inline WriteOptions& set_no_compression() {
98 SetBit(GRPC_WRITE_NO_COMPRESS);
99 return *this;
100 }
101
102 /// Clears flag for the disabling of compression for the next message write.
103 ///
104 /// \sa GRPC_WRITE_NO_COMPRESS
clear_no_compression()105 inline WriteOptions& clear_no_compression() {
106 ClearBit(GRPC_WRITE_NO_COMPRESS);
107 return *this;
108 }
109
110 /// Get value for the flag indicating whether compression for the next
111 /// message write is forcefully disabled.
112 ///
113 /// \sa GRPC_WRITE_NO_COMPRESS
get_no_compression()114 inline bool get_no_compression() const {
115 return GetBit(GRPC_WRITE_NO_COMPRESS);
116 }
117
118 /// Sets flag indicating that the write may be buffered and need not go out on
119 /// the wire immediately.
120 ///
121 /// \sa GRPC_WRITE_BUFFER_HINT
set_buffer_hint()122 inline WriteOptions& set_buffer_hint() {
123 SetBit(GRPC_WRITE_BUFFER_HINT);
124 return *this;
125 }
126
127 /// Clears flag indicating that the write may be buffered and need not go out
128 /// on the wire immediately.
129 ///
130 /// \sa GRPC_WRITE_BUFFER_HINT
clear_buffer_hint()131 inline WriteOptions& clear_buffer_hint() {
132 ClearBit(GRPC_WRITE_BUFFER_HINT);
133 return *this;
134 }
135
136 /// Get value for the flag indicating that the write may be buffered and need
137 /// not go out on the wire immediately.
138 ///
139 /// \sa GRPC_WRITE_BUFFER_HINT
get_buffer_hint()140 inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
141
142 /// corked bit: aliases set_buffer_hint currently, with the intent that
143 /// set_buffer_hint will be removed in the future
set_corked()144 inline WriteOptions& set_corked() {
145 SetBit(GRPC_WRITE_BUFFER_HINT);
146 return *this;
147 }
148
clear_corked()149 inline WriteOptions& clear_corked() {
150 ClearBit(GRPC_WRITE_BUFFER_HINT);
151 return *this;
152 }
153
is_corked()154 inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
155
156 /// last-message bit: indicates this is the last message in a stream
157 /// client-side: makes Write the equivalent of performing Write, WritesDone
158 /// in a single step
159 /// server-side: hold the Write until the service handler returns (sync api)
160 /// or until Finish is called (async api)
set_last_message()161 inline WriteOptions& set_last_message() {
162 last_message_ = true;
163 return *this;
164 }
165
166 /// Clears flag indicating that this is the last message in a stream,
167 /// disabling coalescing.
clear_last_message()168 inline WriteOptions& clear_last_message() {
169 last_message_ = false;
170 return *this;
171 }
172
173 /// Guarantee that all bytes have been written to the socket before completing
174 /// this write (usually writes are completed when they pass flow control).
set_write_through()175 inline WriteOptions& set_write_through() {
176 SetBit(GRPC_WRITE_THROUGH);
177 return *this;
178 }
179
is_write_through()180 inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
181
182 /// Get value for the flag indicating that this is the last message, and
183 /// should be coalesced with trailing metadata.
184 ///
185 /// \sa GRPC_WRITE_LAST_MESSAGE
is_last_message()186 bool is_last_message() const { return last_message_; }
187
188 private:
SetBit(const uint32_t mask)189 void SetBit(const uint32_t mask) { flags_ |= mask; }
190
ClearBit(const uint32_t mask)191 void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
192
GetBit(const uint32_t mask)193 bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
194
195 uint32_t flags_;
196 bool last_message_;
197 };
198
199 namespace internal {
200
201 /// Default argument for CallOpSet. The Unused parameter is unused by
202 /// the class, but can be used for generating multiple names for the
203 /// same thing.
204 template <int Unused>
205 class CallNoOp {
206 protected:
AddOp(grpc_op *,size_t *)207 void AddOp(grpc_op* /*ops*/, size_t* /*nops*/) {}
FinishOp(bool *)208 void FinishOp(bool* /*status*/) {}
SetInterceptionHookPoint(InterceptorBatchMethodsImpl *)209 void SetInterceptionHookPoint(
210 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)211 void SetFinishInterceptionHookPoint(
212 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
SetHijackingState(InterceptorBatchMethodsImpl *)213 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
214 }
215 };
216
217 class CallOpSendInitialMetadata {
218 public:
CallOpSendInitialMetadata()219 CallOpSendInitialMetadata() : send_(false) {
220 maybe_compression_level_.is_set = false;
221 }
222
SendInitialMetadata(std::multimap<std::string,std::string> * metadata,uint32_t flags)223 void SendInitialMetadata(std::multimap<std::string, std::string>* metadata,
224 uint32_t flags) {
225 maybe_compression_level_.is_set = false;
226 send_ = true;
227 flags_ = flags;
228 metadata_map_ = metadata;
229 }
230
set_compression_level(grpc_compression_level level)231 void set_compression_level(grpc_compression_level level) {
232 maybe_compression_level_.is_set = true;
233 maybe_compression_level_.level = level;
234 }
235
236 protected:
AddOp(grpc_op * ops,size_t * nops)237 void AddOp(grpc_op* ops, size_t* nops) {
238 if (!send_ || hijacked_) return;
239 grpc_op* op = &ops[(*nops)++];
240 op->op = GRPC_OP_SEND_INITIAL_METADATA;
241 op->flags = flags_;
242 op->reserved = NULL;
243 initial_metadata_ =
244 FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
245 op->data.send_initial_metadata.count = initial_metadata_count_;
246 op->data.send_initial_metadata.metadata = initial_metadata_;
247 op->data.send_initial_metadata.maybe_compression_level.is_set =
248 maybe_compression_level_.is_set;
249 if (maybe_compression_level_.is_set) {
250 op->data.send_initial_metadata.maybe_compression_level.level =
251 maybe_compression_level_.level;
252 }
253 }
FinishOp(bool *)254 void FinishOp(bool* /*status*/) {
255 if (!send_ || hijacked_) return;
256 g_core_codegen_interface->gpr_free(initial_metadata_);
257 send_ = false;
258 }
259
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)260 void SetInterceptionHookPoint(
261 InterceptorBatchMethodsImpl* interceptor_methods) {
262 if (!send_) return;
263 interceptor_methods->AddInterceptionHookPoint(
264 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA);
265 interceptor_methods->SetSendInitialMetadata(metadata_map_);
266 }
267
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)268 void SetFinishInterceptionHookPoint(
269 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
270
SetHijackingState(InterceptorBatchMethodsImpl *)271 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
272 hijacked_ = true;
273 }
274
275 bool hijacked_ = false;
276 bool send_;
277 uint32_t flags_;
278 size_t initial_metadata_count_;
279 std::multimap<std::string, std::string>* metadata_map_;
280 grpc_metadata* initial_metadata_;
281 struct {
282 bool is_set;
283 grpc_compression_level level;
284 } maybe_compression_level_;
285 };
286
287 class CallOpSendMessage {
288 public:
CallOpSendMessage()289 CallOpSendMessage() : send_buf_() {}
290
291 /// Send \a message using \a options for the write. The \a options are cleared
292 /// after use.
293 template <class M>
294 Status SendMessage(const M& message,
295 WriteOptions options) GRPC_MUST_USE_RESULT;
296
297 template <class M>
298 Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
299
300 /// Send \a message using \a options for the write. The \a options are cleared
301 /// after use. This form of SendMessage allows gRPC to reference \a message
302 /// beyond the lifetime of SendMessage.
303 template <class M>
304 Status SendMessagePtr(const M* message,
305 WriteOptions options) GRPC_MUST_USE_RESULT;
306
307 /// This form of SendMessage allows gRPC to reference \a message beyond the
308 /// lifetime of SendMessage.
309 template <class M>
310 Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
311
312 protected:
AddOp(grpc_op * ops,size_t * nops)313 void AddOp(grpc_op* ops, size_t* nops) {
314 if (msg_ == nullptr && !send_buf_.Valid()) return;
315 if (hijacked_) {
316 serializer_ = nullptr;
317 return;
318 }
319 if (msg_ != nullptr) {
320 GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
321 }
322 serializer_ = nullptr;
323 grpc_op* op = &ops[(*nops)++];
324 op->op = GRPC_OP_SEND_MESSAGE;
325 op->flags = write_options_.flags();
326 op->reserved = NULL;
327 op->data.send_message.send_message = send_buf_.c_buffer();
328 // Flags are per-message: clear them after use.
329 write_options_.Clear();
330 }
FinishOp(bool * status)331 void FinishOp(bool* status) {
332 if (msg_ == nullptr && !send_buf_.Valid()) return;
333 if (hijacked_ && failed_send_) {
334 // Hijacking interceptor failed this Op
335 *status = false;
336 } else if (!*status) {
337 // This Op was passed down to core and the Op failed
338 failed_send_ = true;
339 }
340 }
341
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)342 void SetInterceptionHookPoint(
343 InterceptorBatchMethodsImpl* interceptor_methods) {
344 if (msg_ == nullptr && !send_buf_.Valid()) return;
345 interceptor_methods->AddInterceptionHookPoint(
346 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
347 interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
348 serializer_);
349 }
350
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)351 void SetFinishInterceptionHookPoint(
352 InterceptorBatchMethodsImpl* interceptor_methods) {
353 if (msg_ != nullptr || send_buf_.Valid()) {
354 interceptor_methods->AddInterceptionHookPoint(
355 experimental::InterceptionHookPoints::POST_SEND_MESSAGE);
356 }
357 send_buf_.Clear();
358 msg_ = nullptr;
359 // The contents of the SendMessage value that was previously set
360 // has had its references stolen by core's operations
361 interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
362 nullptr);
363 }
364
SetHijackingState(InterceptorBatchMethodsImpl *)365 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
366 hijacked_ = true;
367 }
368
369 private:
370 const void* msg_ = nullptr; // The original non-serialized message
371 bool hijacked_ = false;
372 bool failed_send_ = false;
373 ByteBuffer send_buf_;
374 WriteOptions write_options_;
375 std::function<Status(const void*)> serializer_;
376 };
377
378 template <class M>
SendMessage(const M & message,WriteOptions options)379 Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
380 write_options_ = options;
381 serializer_ = [this](const void* message) {
382 bool own_buf;
383 send_buf_.Clear();
384 // TODO(vjpai): Remove the void below when possible
385 // The void in the template parameter below should not be needed
386 // (since it should be implicit) but is needed due to an observed
387 // difference in behavior between clang and gcc for certain internal users
388 Status result = SerializationTraits<M, void>::Serialize(
389 *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
390 if (!own_buf) {
391 send_buf_.Duplicate();
392 }
393 return result;
394 };
395 // Serialize immediately only if we do not have access to the message pointer
396 if (msg_ == nullptr) {
397 Status result = serializer_(&message);
398 serializer_ = nullptr;
399 return result;
400 }
401 return Status();
402 }
403
404 template <class M>
SendMessage(const M & message)405 Status CallOpSendMessage::SendMessage(const M& message) {
406 return SendMessage(message, WriteOptions());
407 }
408
409 template <class M>
SendMessagePtr(const M * message,WriteOptions options)410 Status CallOpSendMessage::SendMessagePtr(const M* message,
411 WriteOptions options) {
412 msg_ = message;
413 return SendMessage(*message, options);
414 }
415
416 template <class M>
SendMessagePtr(const M * message)417 Status CallOpSendMessage::SendMessagePtr(const M* message) {
418 msg_ = message;
419 return SendMessage(*message, WriteOptions());
420 }
421
422 template <class R>
423 class CallOpRecvMessage {
424 public:
RecvMessage(R * message)425 void RecvMessage(R* message) { message_ = message; }
426
427 // Do not change status if no message is received.
AllowNoMessage()428 void AllowNoMessage() { allow_not_getting_message_ = true; }
429
430 bool got_message = false;
431
432 protected:
AddOp(grpc_op * ops,size_t * nops)433 void AddOp(grpc_op* ops, size_t* nops) {
434 if (message_ == nullptr || hijacked_) return;
435 grpc_op* op = &ops[(*nops)++];
436 op->op = GRPC_OP_RECV_MESSAGE;
437 op->flags = 0;
438 op->reserved = NULL;
439 op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
440 }
441
FinishOp(bool * status)442 void FinishOp(bool* status) {
443 if (message_ == nullptr) return;
444 if (recv_buf_.Valid()) {
445 if (*status) {
446 got_message = *status =
447 SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
448 .ok();
449 recv_buf_.Release();
450 } else {
451 got_message = false;
452 recv_buf_.Clear();
453 }
454 } else if (hijacked_) {
455 if (hijacked_recv_message_failed_) {
456 FinishOpRecvMessageFailureHandler(status);
457 } else {
458 // The op was hijacked and it was successful. There is no further action
459 // to be performed since the message is already in its non-serialized
460 // form.
461 }
462 } else {
463 FinishOpRecvMessageFailureHandler(status);
464 }
465 }
466
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)467 void SetInterceptionHookPoint(
468 InterceptorBatchMethodsImpl* interceptor_methods) {
469 if (message_ == nullptr) return;
470 interceptor_methods->SetRecvMessage(message_,
471 &hijacked_recv_message_failed_);
472 }
473
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)474 void SetFinishInterceptionHookPoint(
475 InterceptorBatchMethodsImpl* interceptor_methods) {
476 if (message_ == nullptr) return;
477 interceptor_methods->AddInterceptionHookPoint(
478 experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
479 if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
480 }
SetHijackingState(InterceptorBatchMethodsImpl * interceptor_methods)481 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
482 hijacked_ = true;
483 if (message_ == nullptr) return;
484 interceptor_methods->AddInterceptionHookPoint(
485 experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
486 got_message = true;
487 }
488
489 private:
490 // Sets got_message and \a status for a failed recv message op
FinishOpRecvMessageFailureHandler(bool * status)491 void FinishOpRecvMessageFailureHandler(bool* status) {
492 got_message = false;
493 if (!allow_not_getting_message_) {
494 *status = false;
495 }
496 }
497
498 R* message_ = nullptr;
499 ByteBuffer recv_buf_;
500 bool allow_not_getting_message_ = false;
501 bool hijacked_ = false;
502 bool hijacked_recv_message_failed_ = false;
503 };
504
505 class DeserializeFunc {
506 public:
507 virtual Status Deserialize(ByteBuffer* buf) = 0;
~DeserializeFunc()508 virtual ~DeserializeFunc() {}
509 };
510
511 template <class R>
512 class DeserializeFuncType final : public DeserializeFunc {
513 public:
DeserializeFuncType(R * message)514 DeserializeFuncType(R* message) : message_(message) {}
Deserialize(ByteBuffer * buf)515 Status Deserialize(ByteBuffer* buf) override {
516 return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
517 }
518
~DeserializeFuncType()519 ~DeserializeFuncType() override {}
520
521 private:
522 R* message_; // Not a managed pointer because management is external to this
523 };
524
525 class CallOpGenericRecvMessage {
526 public:
527 template <class R>
RecvMessage(R * message)528 void RecvMessage(R* message) {
529 // Use an explicit base class pointer to avoid resolution error in the
530 // following unique_ptr::reset for some old implementations.
531 DeserializeFunc* func = new DeserializeFuncType<R>(message);
532 deserialize_.reset(func);
533 message_ = message;
534 }
535
536 // Do not change status if no message is received.
AllowNoMessage()537 void AllowNoMessage() { allow_not_getting_message_ = true; }
538
539 bool got_message = false;
540
541 protected:
AddOp(grpc_op * ops,size_t * nops)542 void AddOp(grpc_op* ops, size_t* nops) {
543 if (!deserialize_ || hijacked_) return;
544 grpc_op* op = &ops[(*nops)++];
545 op->op = GRPC_OP_RECV_MESSAGE;
546 op->flags = 0;
547 op->reserved = NULL;
548 op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
549 }
550
FinishOp(bool * status)551 void FinishOp(bool* status) {
552 if (!deserialize_) return;
553 if (recv_buf_.Valid()) {
554 if (*status) {
555 got_message = true;
556 *status = deserialize_->Deserialize(&recv_buf_).ok();
557 recv_buf_.Release();
558 } else {
559 got_message = false;
560 recv_buf_.Clear();
561 }
562 } else if (hijacked_) {
563 if (hijacked_recv_message_failed_) {
564 FinishOpRecvMessageFailureHandler(status);
565 } else {
566 // The op was hijacked and it was successful. There is no further action
567 // to be performed since the message is already in its non-serialized
568 // form.
569 }
570 } else {
571 got_message = false;
572 if (!allow_not_getting_message_) {
573 *status = false;
574 }
575 }
576 }
577
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)578 void SetInterceptionHookPoint(
579 InterceptorBatchMethodsImpl* interceptor_methods) {
580 if (!deserialize_) return;
581 interceptor_methods->SetRecvMessage(message_,
582 &hijacked_recv_message_failed_);
583 }
584
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)585 void SetFinishInterceptionHookPoint(
586 InterceptorBatchMethodsImpl* interceptor_methods) {
587 if (!deserialize_) return;
588 interceptor_methods->AddInterceptionHookPoint(
589 experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
590 if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
591 deserialize_.reset();
592 }
SetHijackingState(InterceptorBatchMethodsImpl * interceptor_methods)593 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
594 hijacked_ = true;
595 if (!deserialize_) return;
596 interceptor_methods->AddInterceptionHookPoint(
597 experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
598 got_message = true;
599 }
600
601 private:
602 // Sets got_message and \a status for a failed recv message op
FinishOpRecvMessageFailureHandler(bool * status)603 void FinishOpRecvMessageFailureHandler(bool* status) {
604 got_message = false;
605 if (!allow_not_getting_message_) {
606 *status = false;
607 }
608 }
609
610 void* message_ = nullptr;
611 std::unique_ptr<DeserializeFunc> deserialize_;
612 ByteBuffer recv_buf_;
613 bool allow_not_getting_message_ = false;
614 bool hijacked_ = false;
615 bool hijacked_recv_message_failed_ = false;
616 };
617
618 class CallOpClientSendClose {
619 public:
CallOpClientSendClose()620 CallOpClientSendClose() : send_(false) {}
621
ClientSendClose()622 void ClientSendClose() { send_ = true; }
623
624 protected:
AddOp(grpc_op * ops,size_t * nops)625 void AddOp(grpc_op* ops, size_t* nops) {
626 if (!send_ || hijacked_) return;
627 grpc_op* op = &ops[(*nops)++];
628 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
629 op->flags = 0;
630 op->reserved = NULL;
631 }
FinishOp(bool *)632 void FinishOp(bool* /*status*/) { send_ = false; }
633
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)634 void SetInterceptionHookPoint(
635 InterceptorBatchMethodsImpl* interceptor_methods) {
636 if (!send_) return;
637 interceptor_methods->AddInterceptionHookPoint(
638 experimental::InterceptionHookPoints::PRE_SEND_CLOSE);
639 }
640
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)641 void SetFinishInterceptionHookPoint(
642 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
643
SetHijackingState(InterceptorBatchMethodsImpl *)644 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
645 hijacked_ = true;
646 }
647
648 private:
649 bool hijacked_ = false;
650 bool send_;
651 };
652
653 class CallOpServerSendStatus {
654 public:
CallOpServerSendStatus()655 CallOpServerSendStatus() : send_status_available_(false) {}
656
ServerSendStatus(std::multimap<std::string,std::string> * trailing_metadata,const Status & status)657 void ServerSendStatus(
658 std::multimap<std::string, std::string>* trailing_metadata,
659 const Status& status) {
660 send_error_details_ = status.error_details();
661 metadata_map_ = trailing_metadata;
662 send_status_available_ = true;
663 send_status_code_ = static_cast<grpc_status_code>(status.error_code());
664 send_error_message_ = status.error_message();
665 }
666
667 protected:
AddOp(grpc_op * ops,size_t * nops)668 void AddOp(grpc_op* ops, size_t* nops) {
669 if (!send_status_available_ || hijacked_) return;
670 trailing_metadata_ = FillMetadataArray(
671 *metadata_map_, &trailing_metadata_count_, send_error_details_);
672 grpc_op* op = &ops[(*nops)++];
673 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
674 op->data.send_status_from_server.trailing_metadata_count =
675 trailing_metadata_count_;
676 op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
677 op->data.send_status_from_server.status = send_status_code_;
678 error_message_slice_ = SliceReferencingString(send_error_message_);
679 op->data.send_status_from_server.status_details =
680 send_error_message_.empty() ? nullptr : &error_message_slice_;
681 op->flags = 0;
682 op->reserved = NULL;
683 }
684
FinishOp(bool *)685 void FinishOp(bool* /*status*/) {
686 if (!send_status_available_ || hijacked_) return;
687 g_core_codegen_interface->gpr_free(trailing_metadata_);
688 send_status_available_ = false;
689 }
690
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)691 void SetInterceptionHookPoint(
692 InterceptorBatchMethodsImpl* interceptor_methods) {
693 if (!send_status_available_) return;
694 interceptor_methods->AddInterceptionHookPoint(
695 experimental::InterceptionHookPoints::PRE_SEND_STATUS);
696 interceptor_methods->SetSendTrailingMetadata(metadata_map_);
697 interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
698 &send_error_message_);
699 }
700
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)701 void SetFinishInterceptionHookPoint(
702 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
703
SetHijackingState(InterceptorBatchMethodsImpl *)704 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
705 hijacked_ = true;
706 }
707
708 private:
709 bool hijacked_ = false;
710 bool send_status_available_;
711 grpc_status_code send_status_code_;
712 std::string send_error_details_;
713 std::string send_error_message_;
714 size_t trailing_metadata_count_;
715 std::multimap<std::string, std::string>* metadata_map_;
716 grpc_metadata* trailing_metadata_;
717 grpc_slice error_message_slice_;
718 };
719
720 class CallOpRecvInitialMetadata {
721 public:
CallOpRecvInitialMetadata()722 CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
723
RecvInitialMetadata(::grpc_impl::ClientContext * context)724 void RecvInitialMetadata(::grpc_impl::ClientContext* context) {
725 context->initial_metadata_received_ = true;
726 metadata_map_ = &context->recv_initial_metadata_;
727 }
728
729 protected:
AddOp(grpc_op * ops,size_t * nops)730 void AddOp(grpc_op* ops, size_t* nops) {
731 if (metadata_map_ == nullptr || hijacked_) return;
732 grpc_op* op = &ops[(*nops)++];
733 op->op = GRPC_OP_RECV_INITIAL_METADATA;
734 op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
735 op->flags = 0;
736 op->reserved = NULL;
737 }
738
FinishOp(bool *)739 void FinishOp(bool* /*status*/) {
740 if (metadata_map_ == nullptr || hijacked_) return;
741 }
742
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)743 void SetInterceptionHookPoint(
744 InterceptorBatchMethodsImpl* interceptor_methods) {
745 interceptor_methods->SetRecvInitialMetadata(metadata_map_);
746 }
747
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)748 void SetFinishInterceptionHookPoint(
749 InterceptorBatchMethodsImpl* interceptor_methods) {
750 if (metadata_map_ == nullptr) return;
751 interceptor_methods->AddInterceptionHookPoint(
752 experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
753 metadata_map_ = nullptr;
754 }
755
SetHijackingState(InterceptorBatchMethodsImpl * interceptor_methods)756 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
757 hijacked_ = true;
758 if (metadata_map_ == nullptr) return;
759 interceptor_methods->AddInterceptionHookPoint(
760 experimental::InterceptionHookPoints::PRE_RECV_INITIAL_METADATA);
761 }
762
763 private:
764 bool hijacked_ = false;
765 MetadataMap* metadata_map_;
766 };
767
768 class CallOpClientRecvStatus {
769 public:
CallOpClientRecvStatus()770 CallOpClientRecvStatus()
771 : recv_status_(nullptr), debug_error_string_(nullptr) {}
772
ClientRecvStatus(::grpc_impl::ClientContext * context,Status * status)773 void ClientRecvStatus(::grpc_impl::ClientContext* context, Status* status) {
774 client_context_ = context;
775 metadata_map_ = &client_context_->trailing_metadata_;
776 recv_status_ = status;
777 error_message_ = g_core_codegen_interface->grpc_empty_slice();
778 }
779
780 protected:
AddOp(grpc_op * ops,size_t * nops)781 void AddOp(grpc_op* ops, size_t* nops) {
782 if (recv_status_ == nullptr || hijacked_) return;
783 grpc_op* op = &ops[(*nops)++];
784 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
785 op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
786 op->data.recv_status_on_client.status = &status_code_;
787 op->data.recv_status_on_client.status_details = &error_message_;
788 op->data.recv_status_on_client.error_string = &debug_error_string_;
789 op->flags = 0;
790 op->reserved = NULL;
791 }
792
FinishOp(bool *)793 void FinishOp(bool* /*status*/) {
794 if (recv_status_ == nullptr || hijacked_) return;
795 if (static_cast<StatusCode>(status_code_) == StatusCode::OK) {
796 *recv_status_ = Status();
797 GPR_CODEGEN_DEBUG_ASSERT(debug_error_string_ == nullptr);
798 } else {
799 *recv_status_ =
800 Status(static_cast<StatusCode>(status_code_),
801 GRPC_SLICE_IS_EMPTY(error_message_)
802 ? std::string()
803 : std::string(GRPC_SLICE_START_PTR(error_message_),
804 GRPC_SLICE_END_PTR(error_message_)),
805 metadata_map_->GetBinaryErrorDetails());
806 if (debug_error_string_ != nullptr) {
807 client_context_->set_debug_error_string(debug_error_string_);
808 g_core_codegen_interface->gpr_free((void*)debug_error_string_);
809 }
810 }
811 // TODO(soheil): Find callers that set debug string even for status OK,
812 // and fix them.
813 g_core_codegen_interface->grpc_slice_unref(error_message_);
814 }
815
SetInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)816 void SetInterceptionHookPoint(
817 InterceptorBatchMethodsImpl* interceptor_methods) {
818 interceptor_methods->SetRecvStatus(recv_status_);
819 interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
820 }
821
SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl * interceptor_methods)822 void SetFinishInterceptionHookPoint(
823 InterceptorBatchMethodsImpl* interceptor_methods) {
824 if (recv_status_ == nullptr) return;
825 interceptor_methods->AddInterceptionHookPoint(
826 experimental::InterceptionHookPoints::POST_RECV_STATUS);
827 recv_status_ = nullptr;
828 }
829
SetHijackingState(InterceptorBatchMethodsImpl * interceptor_methods)830 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
831 hijacked_ = true;
832 if (recv_status_ == nullptr) return;
833 interceptor_methods->AddInterceptionHookPoint(
834 experimental::InterceptionHookPoints::PRE_RECV_STATUS);
835 }
836
837 private:
838 bool hijacked_ = false;
839 ::grpc_impl::ClientContext* client_context_;
840 MetadataMap* metadata_map_;
841 Status* recv_status_;
842 const char* debug_error_string_;
843 grpc_status_code status_code_;
844 grpc_slice error_message_;
845 };
846
847 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
848 class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
849 class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
850 class CallOpSet;
851
852 /// Primary implementation of CallOpSetInterface.
853 /// Since we cannot use variadic templates, we declare slots up to
854 /// the maximum count of ops we'll need in a set. We leverage the
855 /// empty base class optimization to slim this class (especially
856 /// when there are many unused slots used). To avoid duplicate base classes,
857 /// the template parameter for CallNoOp is varied by argument position.
858 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
859 class CallOpSet : public CallOpSetInterface,
860 public Op1,
861 public Op2,
862 public Op3,
863 public Op4,
864 public Op5,
865 public Op6 {
866 public:
CallOpSet()867 CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
868 // The copy constructor and assignment operator reset the value of
869 // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
870 // since those are only meaningful on a specific object, not across objects.
CallOpSet(const CallOpSet & other)871 CallOpSet(const CallOpSet& other)
872 : core_cq_tag_(this),
873 return_tag_(this),
874 call_(other.call_),
875 done_intercepting_(false),
876 interceptor_methods_(InterceptorBatchMethodsImpl()) {}
877
878 CallOpSet& operator=(const CallOpSet& other) {
879 core_cq_tag_ = this;
880 return_tag_ = this;
881 call_ = other.call_;
882 done_intercepting_ = false;
883 interceptor_methods_ = InterceptorBatchMethodsImpl();
884 return *this;
885 }
886
FillOps(Call * call)887 void FillOps(Call* call) override {
888 done_intercepting_ = false;
889 g_core_codegen_interface->grpc_call_ref(call->call());
890 call_ =
891 *call; // It's fine to create a copy of call since it's just pointers
892
893 if (RunInterceptors()) {
894 ContinueFillOpsAfterInterception();
895 } else {
896 // After the interceptors are run, ContinueFillOpsAfterInterception will
897 // be run
898 }
899 }
900
FinalizeResult(void ** tag,bool * status)901 bool FinalizeResult(void** tag, bool* status) override {
902 if (done_intercepting_) {
903 // Complete the avalanching since we are done with this batch of ops
904 call_.cq()->CompleteAvalanching();
905 // We have already finished intercepting and filling in the results. This
906 // round trip from the core needed to be made because interceptors were
907 // run
908 *tag = return_tag_;
909 *status = saved_status_;
910 g_core_codegen_interface->grpc_call_unref(call_.call());
911 return true;
912 }
913
914 this->Op1::FinishOp(status);
915 this->Op2::FinishOp(status);
916 this->Op3::FinishOp(status);
917 this->Op4::FinishOp(status);
918 this->Op5::FinishOp(status);
919 this->Op6::FinishOp(status);
920 saved_status_ = *status;
921 if (RunInterceptorsPostRecv()) {
922 *tag = return_tag_;
923 g_core_codegen_interface->grpc_call_unref(call_.call());
924 return true;
925 }
926 // Interceptors are going to be run, so we can't return the tag just yet.
927 // After the interceptors are run, ContinueFinalizeResultAfterInterception
928 return false;
929 }
930
set_output_tag(void * return_tag)931 void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
932
core_cq_tag()933 void* core_cq_tag() override { return core_cq_tag_; }
934
935 /// set_core_cq_tag is used to provide a different core CQ tag than "this".
936 /// This is used for callback-based tags, where the core tag is the core
937 /// callback function. It does not change the use or behavior of any other
938 /// function (such as FinalizeResult)
set_core_cq_tag(void * core_cq_tag)939 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
940
941 // This will be called while interceptors are run if the RPC is a hijacked
942 // RPC. This should set hijacking state for each of the ops.
SetHijackingState()943 void SetHijackingState() override {
944 this->Op1::SetHijackingState(&interceptor_methods_);
945 this->Op2::SetHijackingState(&interceptor_methods_);
946 this->Op3::SetHijackingState(&interceptor_methods_);
947 this->Op4::SetHijackingState(&interceptor_methods_);
948 this->Op5::SetHijackingState(&interceptor_methods_);
949 this->Op6::SetHijackingState(&interceptor_methods_);
950 }
951
952 // Should be called after interceptors are done running
ContinueFillOpsAfterInterception()953 void ContinueFillOpsAfterInterception() override {
954 static const size_t MAX_OPS = 6;
955 grpc_op ops[MAX_OPS];
956 size_t nops = 0;
957 this->Op1::AddOp(ops, &nops);
958 this->Op2::AddOp(ops, &nops);
959 this->Op3::AddOp(ops, &nops);
960 this->Op4::AddOp(ops, &nops);
961 this->Op5::AddOp(ops, &nops);
962 this->Op6::AddOp(ops, &nops);
963
964 grpc_call_error err = g_core_codegen_interface->grpc_call_start_batch(
965 call_.call(), ops, nops, core_cq_tag(), nullptr);
966
967 if (err != GRPC_CALL_OK) {
968 // A failure here indicates an API misuse; for example, doing a Write
969 // while another Write is already pending on the same RPC or invoking
970 // WritesDone multiple times
971 // gpr_log(GPR_ERROR, "API misuse of type %s observed",
972 // g_core_codegen_interface->grpc_call_error_to_string(err));
973 GPR_CODEGEN_ASSERT(false);
974 }
975 }
976
977 // Should be called after interceptors are done running on the finalize result
978 // path
ContinueFinalizeResultAfterInterception()979 void ContinueFinalizeResultAfterInterception() override {
980 done_intercepting_ = true;
981 // The following call_start_batch is internally-generated so no need for an
982 // explanatory log on failure.
983 GPR_CODEGEN_ASSERT(g_core_codegen_interface->grpc_call_start_batch(
984 call_.call(), nullptr, 0, core_cq_tag(), nullptr) ==
985 GRPC_CALL_OK);
986 }
987
988 private:
989 // Returns true if no interceptors need to be run
RunInterceptors()990 bool RunInterceptors() {
991 interceptor_methods_.ClearState();
992 interceptor_methods_.SetCallOpSetInterface(this);
993 interceptor_methods_.SetCall(&call_);
994 this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
995 this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
996 this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
997 this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
998 this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
999 this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
1000 if (interceptor_methods_.InterceptorsListEmpty()) {
1001 return true;
1002 }
1003 // This call will go through interceptors and would need to
1004 // schedule new batches, so delay completion queue shutdown
1005 call_.cq()->RegisterAvalanching();
1006 return interceptor_methods_.RunInterceptors();
1007 }
1008 // Returns true if no interceptors need to be run
RunInterceptorsPostRecv()1009 bool RunInterceptorsPostRecv() {
1010 // Call and OpSet had already been set on the set state.
1011 // SetReverse also clears previously set hook points
1012 interceptor_methods_.SetReverse();
1013 this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
1014 this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
1015 this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
1016 this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
1017 this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
1018 this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
1019 return interceptor_methods_.RunInterceptors();
1020 }
1021
1022 void* core_cq_tag_;
1023 void* return_tag_;
1024 Call call_;
1025 bool done_intercepting_ = false;
1026 InterceptorBatchMethodsImpl interceptor_methods_;
1027 bool saved_status_;
1028 };
1029
1030 } // namespace internal
1031 } // namespace grpc
1032
1033 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
1034