1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H 20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H 21 22 #include <assert.h> 23 #include <grpcpp/impl/codegen/call.h> 24 #include <grpcpp/impl/codegen/channel_interface.h> 25 #include <grpcpp/impl/codegen/client_context.h> 26 #include <grpcpp/impl/codegen/server_context.h> 27 #include <grpcpp/impl/codegen/service_type.h> 28 #include <grpcpp/impl/codegen/status.h> 29 30 namespace grpc { 31 32 class CompletionQueue; 33 extern CoreCodegenInterface* g_core_codegen_interface; 34 35 /// An interface relevant for async client side unary RPCs (which send 36 /// one request message to a server and receive one response message). 37 template <class R> 38 class ClientAsyncResponseReaderInterface { 39 public: ~ClientAsyncResponseReaderInterface()40 virtual ~ClientAsyncResponseReaderInterface() {} 41 42 /// Start the call that was set up by the constructor, but only if the 43 /// constructor was invoked through the "Prepare" API which doesn't actually 44 /// start the call 45 virtual void StartCall() = 0; 46 47 /// Request notification of the reading of initial metadata. Completion 48 /// will be notified by \a tag on the associated completion queue. 49 /// This call is optional, but if it is used, it cannot be used concurrently 50 /// with or after the \a Finish method. 51 /// 52 /// \param[in] tag Tag identifying this request. 53 virtual void ReadInitialMetadata(void* tag) = 0; 54 55 /// Request to receive the server's response \a msg and final \a status for 56 /// the call, and to notify \a tag on this call's completion queue when 57 /// finished. 58 /// 59 /// This function will return when either: 60 /// - when the server's response message and status have been received. 61 /// - when the server has returned a non-OK status (no message expected in 62 /// this case). 63 /// - when the call failed for some reason and the library generated a 64 /// non-OK status. 65 /// 66 /// \param[in] tag Tag identifying this request. 67 /// \param[out] status To be updated with the operation status. 68 /// \param[out] msg To be filled in with the server's response message. 69 virtual void Finish(R* msg, Status* status, void* tag) = 0; 70 }; 71 72 namespace internal { 73 template <class R> 74 class ClientAsyncResponseReaderFactory { 75 public: 76 /// Start a call and write the request out if \a start is set. 77 /// \a tag will be notified on \a cq when the call has been started (i.e. 78 /// intitial metadata sent) and \a request has been written out. 79 /// If \a start is not set, the actual call must be initiated by StartCall 80 /// Note that \a context will be used to fill in custom initial metadata 81 /// used to send to the server when starting the call. 82 template <class W> Create(ChannelInterface * channel,CompletionQueue * cq,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request,bool start)83 static ClientAsyncResponseReader<R>* Create( 84 ChannelInterface* channel, CompletionQueue* cq, 85 const ::grpc::internal::RpcMethod& method, ClientContext* context, 86 const W& request, bool start) { 87 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 88 return new (g_core_codegen_interface->grpc_call_arena_alloc( 89 call.call(), sizeof(ClientAsyncResponseReader<R>))) 90 ClientAsyncResponseReader<R>(call, context, request, start); 91 } 92 }; 93 } // namespace internal 94 95 /// Async API for client-side unary RPCs, where the message response 96 /// received from the server is of type \a R. 97 template <class R> 98 class ClientAsyncResponseReader final 99 : public ClientAsyncResponseReaderInterface<R> { 100 public: 101 // always allocated against a call arena, no memory free required delete(void * ptr,std::size_t size)102 static void operator delete(void* ptr, std::size_t size) { 103 assert(size == sizeof(ClientAsyncResponseReader)); 104 } 105 106 // This operator should never be called as the memory should be freed as part 107 // of the arena destruction. It only exists to provide a matching operator 108 // delete to the operator new so that some compilers will not complain (see 109 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 110 // there are no tests catching the compiler warning. delete(void *,void *)111 static void operator delete(void*, void*) { assert(0); } 112 StartCall()113 void StartCall() override { 114 assert(!started_); 115 started_ = true; 116 StartCallInternal(); 117 } 118 119 /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for 120 /// semantics. 121 /// 122 /// Side effect: 123 /// - the \a ClientContext associated with this call is updated with 124 /// possible initial and trailing metadata sent from the server. ReadInitialMetadata(void * tag)125 void ReadInitialMetadata(void* tag) override { 126 assert(started_); 127 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 128 129 single_buf.set_output_tag(tag); 130 single_buf.RecvInitialMetadata(context_); 131 call_.PerformOps(&single_buf); 132 initial_metadata_read_ = true; 133 } 134 135 /// See \a ClientAysncResponseReaderInterface::Finish for semantics. 136 /// 137 /// Side effect: 138 /// - the \a ClientContext associated with this call is updated with 139 /// possible initial and trailing metadata sent from the server. Finish(R * msg,Status * status,void * tag)140 void Finish(R* msg, Status* status, void* tag) override { 141 assert(started_); 142 if (initial_metadata_read_) { 143 finish_buf.set_output_tag(tag); 144 finish_buf.RecvMessage(msg); 145 finish_buf.AllowNoMessage(); 146 finish_buf.ClientRecvStatus(context_, status); 147 call_.PerformOps(&finish_buf); 148 } else { 149 single_buf.set_output_tag(tag); 150 single_buf.RecvInitialMetadata(context_); 151 single_buf.RecvMessage(msg); 152 single_buf.AllowNoMessage(); 153 single_buf.ClientRecvStatus(context_, status); 154 call_.PerformOps(&single_buf); 155 } 156 } 157 158 private: 159 friend class internal::ClientAsyncResponseReaderFactory<R>; 160 ClientContext* const context_; 161 ::grpc::internal::Call call_; 162 bool started_; 163 bool initial_metadata_read_ = false; 164 165 template <class W> ClientAsyncResponseReader(::grpc::internal::Call call,ClientContext * context,const W & request,bool start)166 ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context, 167 const W& request, bool start) 168 : context_(context), call_(call), started_(start) { 169 // Bind the metadata at time of StartCallInternal but set up the rest here 170 // TODO(ctiller): don't assert 171 GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok()); 172 single_buf.ClientSendClose(); 173 if (start) StartCallInternal(); 174 } 175 StartCallInternal()176 void StartCallInternal() { 177 single_buf.SendInitialMetadata(context_->send_initial_metadata_, 178 context_->initial_metadata_flags()); 179 } 180 181 // disable operator new 182 static void* operator new(std::size_t size); new(std::size_t size,void * p)183 static void* operator new(std::size_t size, void* p) { return p; } 184 185 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 186 ::grpc::internal::CallOpSendMessage, 187 ::grpc::internal::CallOpClientSendClose, 188 ::grpc::internal::CallOpRecvInitialMetadata, 189 ::grpc::internal::CallOpRecvMessage<R>, 190 ::grpc::internal::CallOpClientRecvStatus> 191 single_buf; 192 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>, 193 ::grpc::internal::CallOpClientRecvStatus> 194 finish_buf; 195 }; 196 197 /// Async server-side API for handling unary calls, where the single 198 /// response message sent to the client is of type \a W. 199 template <class W> 200 class ServerAsyncResponseWriter final 201 : public internal::ServerAsyncStreamingInterface { 202 public: ServerAsyncResponseWriter(ServerContext * ctx)203 explicit ServerAsyncResponseWriter(ServerContext* ctx) 204 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 205 206 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 207 /// 208 /// Side effect: 209 /// The initial metadata that will be sent to the client from this op will 210 /// be taken from the \a ServerContext associated with the call. 211 /// 212 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)213 void SendInitialMetadata(void* tag) override { 214 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 215 216 meta_buf_.set_output_tag(tag); 217 meta_buf_.SendInitialMetadata(ctx_->initial_metadata_, 218 ctx_->initial_metadata_flags()); 219 if (ctx_->compression_level_set()) { 220 meta_buf_.set_compression_level(ctx_->compression_level()); 221 } 222 ctx_->sent_initial_metadata_ = true; 223 call_.PerformOps(&meta_buf_); 224 } 225 226 /// Indicate that the stream is to be finished and request notification 227 /// when the server has sent the appropriate signals to the client to 228 /// end the call. Should not be used concurrently with other operations. 229 /// 230 /// \param[in] tag Tag identifying this request. 231 /// \param[in] status To be sent to the client as the result of the call. 232 /// \param[in] msg Message to be sent to the client. 233 /// 234 /// Side effect: 235 /// - also sends initial metadata if not already sent (using the 236 /// \a ServerContext associated with this call). 237 /// 238 /// Note: if \a status has a non-OK code, then \a msg will not be sent, 239 /// and the client will receive only the status with possible trailing 240 /// metadata. Finish(const W & msg,const Status & status,void * tag)241 void Finish(const W& msg, const Status& status, void* tag) { 242 finish_buf_.set_output_tag(tag); 243 if (!ctx_->sent_initial_metadata_) { 244 finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, 245 ctx_->initial_metadata_flags()); 246 if (ctx_->compression_level_set()) { 247 finish_buf_.set_compression_level(ctx_->compression_level()); 248 } 249 ctx_->sent_initial_metadata_ = true; 250 } 251 // The response is dropped if the status is not OK. 252 if (status.ok()) { 253 finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, 254 finish_buf_.SendMessage(msg)); 255 } else { 256 finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); 257 } 258 call_.PerformOps(&finish_buf_); 259 } 260 261 /// Indicate that the stream is to be finished with a non-OK status, 262 /// and request notification for when the server has finished sending the 263 /// appropriate signals to the client to end the call. 264 /// Should not be used concurrently with other operations. 265 /// 266 /// \param[in] tag Tag identifying this request. 267 /// \param[in] status To be sent to the client as the result of the call. 268 /// - Note: \a status must have a non-OK code. 269 /// 270 /// Side effect: 271 /// - also sends initial metadata if not already sent (using the 272 /// \a ServerContext associated with this call). FinishWithError(const Status & status,void * tag)273 void FinishWithError(const Status& status, void* tag) { 274 GPR_CODEGEN_ASSERT(!status.ok()); 275 finish_buf_.set_output_tag(tag); 276 if (!ctx_->sent_initial_metadata_) { 277 finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, 278 ctx_->initial_metadata_flags()); 279 if (ctx_->compression_level_set()) { 280 finish_buf_.set_compression_level(ctx_->compression_level()); 281 } 282 ctx_->sent_initial_metadata_ = true; 283 } 284 finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); 285 call_.PerformOps(&finish_buf_); 286 } 287 288 private: BindCall(::grpc::internal::Call * call)289 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 290 291 ::grpc::internal::Call call_; 292 ServerContext* ctx_; 293 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 294 meta_buf_; 295 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 296 ::grpc::internal::CallOpSendMessage, 297 ::grpc::internal::CallOpServerSendStatus> 298 finish_buf_; 299 }; 300 301 } // namespace grpc 302 303 namespace std { 304 template <class R> 305 class default_delete<grpc::ClientAsyncResponseReader<R>> { 306 public: operator()307 void operator()(void* p) {} 308 }; 309 template <class R> 310 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { 311 public: operator()312 void operator()(void* p) {} 313 }; 314 } // namespace std 315 316 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H 317