//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "test/cpp/end2end/test_service_impl.h"

#include <grpcpp/alarm.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>
#include <gtest/gtest.h>

#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/core/util/crash.h"
#include "src/core/util/notification.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/util/string_ref_helper.h"

using std::chrono::system_clock;

namespace grpc {
namespace testing {
namespace internal {

// When echo_deadline is requested, the deadline observed in the ServerContext
// is echoed back in the response, in seconds.
void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request,
                       EchoResponse* response) {
  if (request->has_param() && request->param().echo_deadline()) {
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    if (context->deadline() != system_clock::time_point::max()) {
      Timepoint2Timespec(context->deadline(), &deadline);
    }
    response->mutable_param()->set_request_deadline(deadline.tv_sec);
  }
}
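
// Usage sketch (hypothetical client-side snippet, not part of this service):
// a test that wants the deadline echoed back can set the request param and
// read it out of the response, roughly like this:
//
//   EchoRequest request;
//   request.mutable_param()->set_echo_deadline(true);
//   // ... invoke Echo(); on success, response.param().request_deadline()
//   // holds the server-observed deadline in seconds (or the tv_sec of
//   // gpr_inf_future() when the client set no deadline).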

void CheckServerAuthContext(const ServerContextBase* context,
                            const std::string& expected_transport_security_type,
                            const std::string& expected_client_identity) {
  std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
  std::vector<grpc::string_ref> tst =
      auth_ctx->FindPropertyValues("transport_security_type");
  EXPECT_EQ(1u, tst.size());
  EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
  if (expected_client_identity.empty()) {
    EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
    EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
    EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
  } else {
    auto identity = auth_ctx->GetPeerIdentity();
    EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
    EXPECT_EQ(1u, identity.size());
    EXPECT_EQ(expected_client_identity, identity[0]);
  }
}

// Returns the number of pairs in metadata that exactly match the given
// key-value pair. Returns 0 if the pair wasn't found.
int MetadataMatchCount(
    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
    const std::string& key, const std::string& value) {
  int count = 0;
  for (const auto& metadatum : metadata) {
    if (ToString(metadatum.first) == key &&
        ToString(metadatum.second) == value) {
      count++;
    }
  }
  return count;
}
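
// Usage sketch: CheckClientInitialMetadata below relies on this helper to
// verify that a client-supplied header arrived exactly once, using the
// constants defined for that test:
//
//   EXPECT_EQ(internal::MetadataMatchCount(ctx->client_metadata(),
//                                          kCheckClientInitialMetadataKey,
//                                          kCheckClientInitialMetadataVal),
//             1);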

int GetIntValueFromMetadataHelper(
    const char* key,
    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
    int default_value) {
  if (metadata.find(key) != metadata.end()) {
    std::istringstream iss(ToString(metadata.find(key)->second));
    iss >> default_value;
    LOG(INFO) << key << " : " << default_value;
  }

  return default_value;
}

int GetIntValueFromMetadata(
    const char* key,
    const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
    int default_value) {
  return GetIntValueFromMetadataHelper(key, metadata, default_value);
}
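
// Example (hypothetical client-side snippet): tests drive the behaviors below
// by attaching stringified integers to the request metadata, which this
// helper parses back out on the server, e.g.:
//
//   grpc::ClientContext ctx;
//   ctx.AddMetadata(kServerTryCancelRequest,
//                   std::to_string(CANCEL_DURING_PROCESSING));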

void ServerTryCancel(ServerContext* context) {
  EXPECT_FALSE(context->IsCancelled());
  context->TryCancel();
  LOG(INFO) << "Server called TryCancel() to cancel the request";
  // Now wait until it's really canceled
  while (!context->IsCancelled()) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1000, GPR_TIMESPAN)));
  }
}

void ServerTryCancelNonblocking(CallbackServerContext* context) {
  EXPECT_FALSE(context->IsCancelled());
  context->TryCancel();
  LOG(INFO) << "Server called TryCancelNonblocking() to cancel the request";
}

}  // namespace internal

ServerUnaryReactor* CallbackTestServiceImpl::Echo(
    CallbackServerContext* context, const EchoRequest* request,
    EchoResponse* response) {
  class Reactor : public grpc::ServerUnaryReactor {
   public:
    Reactor(CallbackTestServiceImpl* service, CallbackServerContext* ctx,
            const EchoRequest* request, EchoResponse* response)
        : service_(service), ctx_(ctx), req_(request), resp_(response) {
      // It should be safe to call IsCancelled here, even though we don't know
      // the result. Call it asynchronously to see if we trigger any data races.
      // Join it in OnDone (technically that could be blocking but shouldn't be
      // for very long).
      async_cancel_check_ = std::thread([this] { (void)ctx_->IsCancelled(); });

      started_ = true;

      if (request->has_param() &&
          request->param().server_notify_client_when_started()) {
        service->signaller_.SignalClientThatRpcStarted();
        // Block on the "wait to continue" decision in a different thread since
        // we can't tie up an EM thread with blocking events. We can join it in
        // OnDone since it would definitely be done by then.
        rpc_wait_thread_ = std::thread([this] {
          service_->signaller_.ServerWaitToContinue();
          StartRpc();
        });
      } else {
        StartRpc();
      }
    }

    void StartRpc() {
      if (req_->has_param() && req_->param().server_sleep_us() > 0) {
        // Set an alarm for that much time
        alarm_.Set(
            gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_micros(req_->param().server_sleep_us() *
                                                  grpc_test_slowdown_factor(),
                                              GPR_TIMESPAN)),
            [this](bool ok) { NonDelayed(ok); });
        return;
      }
      NonDelayed(true);
    }
    void OnSendInitialMetadataDone(bool ok) override {
      EXPECT_TRUE(ok);
      initial_metadata_sent_ = true;
    }
    void OnCancel() override {
      EXPECT_TRUE(started_);
      EXPECT_TRUE(ctx_->IsCancelled());
      on_cancel_invoked_ = true;
      std::lock_guard<std::mutex> l(cancel_mu_);
      cancel_cv_.notify_one();
    }
    void OnDone() override {
      if (req_->has_param() && req_->param().echo_metadata_initially()) {
        EXPECT_TRUE(initial_metadata_sent_);
      }
      EXPECT_EQ(ctx_->IsCancelled(), on_cancel_invoked_);
      // Validate that finishing with a non-OK status doesn't cause cancellation
      if (req_->has_param() && req_->param().has_expected_error()) {
        EXPECT_FALSE(on_cancel_invoked_);
      }
      async_cancel_check_.join();
      if (rpc_wait_thread_.joinable()) {
        rpc_wait_thread_.join();
      }
      if (finish_when_cancelled_.joinable()) {
        finish_when_cancelled_.join();
      }
      delete this;
    }

   private:
    void NonDelayed(bool ok) {
      if (!ok) {
        EXPECT_TRUE(ctx_->IsCancelled());
        Finish(Status::CANCELLED);
        return;
      }
      if (req_->has_param() && req_->param().server_die()) {
        LOG(ERROR) << "The request should not reach the application handler.";
        CHECK(0);
      }
      if (req_->has_param() && req_->param().has_expected_error()) {
        const auto& error = req_->param().expected_error();
        Finish(Status(static_cast<StatusCode>(error.code()),
                      error.error_message(), error.binary_error_details()));
        return;
      }
      int server_try_cancel = internal::GetIntValueFromMetadata(
          kServerTryCancelRequest, ctx_->client_metadata(), DO_NOT_CANCEL);
      if (server_try_cancel != DO_NOT_CANCEL) {
        // Since this is a unary RPC, the 'request' message has already been
        // read from the client by the time this handler runs, so the
        // individual server_try_cancel scenarios don't add much here. Simply
        // cancel the RPC whenever server_try_cancel is not DO_NOT_CANCEL.
        EXPECT_FALSE(ctx_->IsCancelled());
        ctx_->TryCancel();
        LOG(INFO) << "Server called TryCancel() to cancel the request";
        FinishWhenCancelledAsync();
        return;
      }
      resp_->set_message(req_->message());
      internal::MaybeEchoDeadline(ctx_, req_, resp_);
      if (service_->host_) {
        resp_->mutable_param()->set_host(*service_->host_);
      } else if (req_->has_param() &&
                 req_->param().echo_host_from_authority_header()) {
        auto authority = ctx_->ExperimentalGetAuthority();
        std::string authority_str(authority.data(), authority.size());
        resp_->mutable_param()->set_host(std::move(authority_str));
      }
      if (req_->has_param() && req_->param().client_cancel_after_us()) {
        {
          std::unique_lock<std::mutex> lock(service_->mu_);
          service_->signal_client_ = true;
        }
        FinishWhenCancelledAsync();
        return;
      } else if (req_->has_param() && req_->param().server_cancel_after_us()) {
        alarm_.Set(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                gpr_time_from_micros(
                                    req_->param().server_cancel_after_us() *
                                        grpc_test_slowdown_factor(),
                                    GPR_TIMESPAN)),
                   [this](bool) { Finish(Status::CANCELLED); });
        return;
      } else if (!req_->has_param() || !req_->param().skip_cancelled_check()) {
        EXPECT_FALSE(ctx_->IsCancelled());
      }

      if (req_->has_param() && req_->param().echo_metadata_initially()) {
        const std::multimap<grpc::string_ref, grpc::string_ref>&
            client_metadata = ctx_->client_metadata();
        for (const auto& metadatum : client_metadata) {
          ctx_->AddInitialMetadata(ToString(metadatum.first),
                                   ToString(metadatum.second));
        }
        StartSendInitialMetadata();
      }

      if (req_->has_param() && req_->param().echo_metadata()) {
        const std::multimap<grpc::string_ref, grpc::string_ref>&
            client_metadata = ctx_->client_metadata();
        for (const auto& metadatum : client_metadata) {
          ctx_->AddTrailingMetadata(ToString(metadatum.first),
                                    ToString(metadatum.second));
        }
        // Terminate the RPC with an error and debug info in the trailer.
        if (req_->param().debug_info().stack_entries_size() ||
            !req_->param().debug_info().detail().empty()) {
          std::string serialized_debug_info =
              req_->param().debug_info().SerializeAsString();
          ctx_->AddTrailingMetadata(kDebugInfoTrailerKey,
                                    serialized_debug_info);
          Finish(Status::CANCELLED);
          return;
        }
      }
      if (req_->has_param() &&
          (!req_->param().expected_client_identity().empty() ||
           req_->param().check_auth_context())) {
        internal::CheckServerAuthContext(
            ctx_, req_->param().expected_transport_security_type(),
            req_->param().expected_client_identity());
      }
      if (req_->has_param() && req_->param().response_message_length() > 0) {
        resp_->set_message(
            std::string(req_->param().response_message_length(), '\0'));
      }
      if (req_->has_param() && req_->param().echo_peer()) {
        resp_->mutable_param()->set_peer(ctx_->peer());
      }
      Finish(Status::OK);
    }
    void FinishWhenCancelledAsync() {
      finish_when_cancelled_ = std::thread([this] {
        std::unique_lock<std::mutex> l(cancel_mu_);
        cancel_cv_.wait(l, [this] { return ctx_->IsCancelled(); });
        Finish(Status::CANCELLED);
      });
    }

    CallbackTestServiceImpl* const service_;
    CallbackServerContext* const ctx_;
    const EchoRequest* const req_;
    EchoResponse* const resp_;
    Alarm alarm_;
    std::mutex cancel_mu_;
    std::condition_variable cancel_cv_;
    bool initial_metadata_sent_ = false;
    bool started_ = false;
    bool on_cancel_invoked_ = false;
    std::thread async_cancel_check_;
    std::thread rpc_wait_thread_;
    std::thread finish_when_cancelled_;
  };

  return new Reactor(this, context, request, response);
}

ServerUnaryReactor* CallbackTestServiceImpl::CheckClientInitialMetadata(
    CallbackServerContext* context, const SimpleRequest*, SimpleResponse*) {
  class Reactor : public grpc::ServerUnaryReactor {
   public:
    explicit Reactor(CallbackServerContext* ctx) {
      EXPECT_EQ(internal::MetadataMatchCount(ctx->client_metadata(),
                                             kCheckClientInitialMetadataKey,
                                             kCheckClientInitialMetadataVal),
                1);
      EXPECT_EQ(ctx->client_metadata().count(kCheckClientInitialMetadataKey),
                1u);
      Finish(Status::OK);
    }
    void OnDone() override { delete this; }
  };

  return new Reactor(context);
}
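
// Client-side sketch (hypothetical, not part of this file): for the checks
// above to pass, the test client is expected to attach the header exactly
// once before issuing the RPC, e.g.:
//
//   grpc::ClientContext ctx;
//   ctx.AddMetadata(kCheckClientInitialMetadataKey,
//                   kCheckClientInitialMetadataVal);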

ServerReadReactor<EchoRequest>* CallbackTestServiceImpl::RequestStream(
    CallbackServerContext* context, EchoResponse* response) {
  // If 'server_try_cancel' is set in the request metadata, the server cancels
  // the RPC by calling ServerContext::TryCancel() at a point chosen by the
  // value:
  //   CANCEL_BEFORE_PROCESSING: the RPC is cancelled before the server reads
  //   any message from the client.
  //   CANCEL_DURING_PROCESSING: the RPC is cancelled while the server is
  //   reading messages from the client.
  //   CANCEL_AFTER_PROCESSING: the RPC is cancelled after the server has read
  //   all the messages from the client.
  int server_try_cancel = internal::GetIntValueFromMetadata(
      kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
  if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
    internal::ServerTryCancelNonblocking(context);
    // No need to provide a reactor since the RPC is cancelled.
    return nullptr;
  }

  class Reactor : public grpc::ServerReadReactor<EchoRequest> {
   public:
    Reactor(CallbackServerContext* ctx, EchoResponse* response,
            int server_try_cancel)
        : ctx_(ctx),
          response_(response),
          server_try_cancel_(server_try_cancel) {
      EXPECT_NE(server_try_cancel, CANCEL_BEFORE_PROCESSING);
      response->set_message("");

      if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
        ctx->TryCancel();
        // Don't wait for it here
      }
      StartRead(&request_);
      setup_done_ = true;
    }
    void OnDone() override { delete this; }
    void OnCancel() override {
      EXPECT_TRUE(setup_done_);
      EXPECT_TRUE(ctx_->IsCancelled());
      FinishOnce(Status::CANCELLED);
    }
    void OnReadDone(bool ok) override {
      if (ok) {
        response_->mutable_message()->append(request_.message());
        num_msgs_read_++;
        StartRead(&request_);
      } else {
        LOG(INFO) << "Read: " << num_msgs_read_ << " messages";

        if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
          // Let OnCancel handle this
          return;
        }
        if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
          internal::ServerTryCancelNonblocking(ctx_);
          return;
        }
        FinishOnce(Status::OK);
      }
    }

   private:
    void FinishOnce(const Status& s) {
      std::lock_guard<std::mutex> l(finish_mu_);
      if (!finished_) {
        Finish(s);
        finished_ = true;
      }
    }

    CallbackServerContext* const ctx_;
    EchoResponse* const response_;
    EchoRequest request_;
    int num_msgs_read_{0};
    int server_try_cancel_;
    std::mutex finish_mu_;
    bool finished_{false};
    bool setup_done_{false};
  };

  return new Reactor(context, response, server_try_cancel);
}

// Returns 'kServerDefaultResponseStreamsToSend' response messages by default;
// the count can be overridden via the 'kServerResponseStreamsToSend' metadata
// key.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
ServerWriteReactor<EchoResponse>* CallbackTestServiceImpl::ResponseStream(
    CallbackServerContext* context, const EchoRequest* request) {
  // If 'server_try_cancel' is set in the request metadata, the server cancels
  // the RPC by calling ServerContext::TryCancel() at a point chosen by the
  // value:
  //   CANCEL_BEFORE_PROCESSING: the RPC is cancelled before the server writes
  //   any response to the client.
  //   CANCEL_DURING_PROCESSING: the RPC is cancelled while the server is
  //   writing responses to the client.
  //   CANCEL_AFTER_PROCESSING: the RPC is cancelled after the server has
  //   written all its responses.
  int server_try_cancel = internal::GetIntValueFromMetadata(
      kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
  if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
    internal::ServerTryCancelNonblocking(context);
  }

  class Reactor : public grpc::ServerWriteReactor<EchoResponse> {
   public:
    Reactor(CallbackServerContext* ctx, const EchoRequest* request,
            int server_try_cancel)
        : ctx_(ctx), request_(request), server_try_cancel_(server_try_cancel) {
      server_coalescing_api_ = internal::GetIntValueFromMetadata(
          kServerUseCoalescingApi, ctx->client_metadata(), 0);
      server_responses_to_send_ = internal::GetIntValueFromMetadata(
          kServerResponseStreamsToSend, ctx->client_metadata(),
          kServerDefaultResponseStreamsToSend);
      if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
        ctx->TryCancel();
      }
      if (server_try_cancel_ != CANCEL_BEFORE_PROCESSING) {
        if (num_msgs_sent_ < server_responses_to_send_) {
          NextWrite();
        }
      }
      setup_done_ = true;
    }
    void OnDone() override { delete this; }
    void OnCancel() override {
      EXPECT_TRUE(setup_done_);
      EXPECT_TRUE(ctx_->IsCancelled());
      FinishOnce(Status::CANCELLED);
    }
    void OnWriteDone(bool /*ok*/) override {
      if (num_msgs_sent_ < server_responses_to_send_) {
        NextWrite();
      } else if (server_coalescing_api_ != 0) {
        // Finish was already called just after the WriteLast
      } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
        // Let OnCancel handle this
      } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
        internal::ServerTryCancelNonblocking(ctx_);
      } else {
        FinishOnce(Status::OK);
      }
    }

   private:
    void FinishOnce(const Status& s) {
      std::lock_guard<std::mutex> l(finish_mu_);
      if (!finished_) {
        Finish(s);
        finished_ = true;
      }
    }

    void NextWrite() {
      response_.set_message(request_->message() +
                            std::to_string(num_msgs_sent_));
      if (num_msgs_sent_ == server_responses_to_send_ - 1 &&
          server_coalescing_api_ != 0) {
        {
          std::lock_guard<std::mutex> l(finish_mu_);
          if (!finished_) {
            num_msgs_sent_++;
            StartWriteLast(&response_, WriteOptions());
          }
        }
        // If we use WriteLast, we shouldn't wait before attempting Finish
        FinishOnce(Status::OK);
      } else {
        std::lock_guard<std::mutex> l(finish_mu_);
        if (!finished_) {
          num_msgs_sent_++;
          StartWrite(&response_);
        }
      }
    }
    CallbackServerContext* const ctx_;
    const EchoRequest* const request_;
    EchoResponse response_;
    int num_msgs_sent_{0};
    int server_try_cancel_;
    int server_coalescing_api_;
    int server_responses_to_send_;
    std::mutex finish_mu_;
    bool finished_{false};
    bool setup_done_{false};
  };
  return new Reactor(context, request, server_try_cancel);
}

ServerBidiReactor<EchoRequest, EchoResponse>*
CallbackTestServiceImpl::BidiStream(CallbackServerContext* context) {
  class Reactor : public grpc::ServerBidiReactor<EchoRequest, EchoResponse> {
   public:
    explicit Reactor(CallbackServerContext* ctx) : ctx_(ctx) {
      // If 'server_try_cancel' is set in the request metadata, the server
      // cancels the RPC by calling ServerContext::TryCancel() at a point
      // chosen by the value:
      //   CANCEL_BEFORE_PROCESSING: the RPC is cancelled before the server
      //   reads any message from the client.
      //   CANCEL_DURING_PROCESSING: the RPC is cancelled while the server is
      //   reading messages from the client.
      //   CANCEL_AFTER_PROCESSING: the RPC is cancelled after the server has
      //   read all the messages from the client.
      server_try_cancel_ = internal::GetIntValueFromMetadata(
          kServerTryCancelRequest, ctx->client_metadata(), DO_NOT_CANCEL);
      server_write_last_ = internal::GetIntValueFromMetadata(
          kServerFinishAfterNReads, ctx->client_metadata(), 0);
      client_try_cancel_ = static_cast<bool>(internal::GetIntValueFromMetadata(
          kClientTryCancelRequest, ctx->client_metadata(), 0));
      if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
        internal::ServerTryCancelNonblocking(ctx);
      } else {
        if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
          ctx->TryCancel();
        }
        StartRead(&request_);
      }
      setup_done_ = true;
    }
    void OnDone() override {
      {
        // Use the same lock as Finish to make sure that OnDone isn't inlined.
        std::lock_guard<std::mutex> l(finish_mu_);
        EXPECT_TRUE(finished_);
        finish_thread_.join();
      }
      delete this;
    }
    void OnCancel() override {
      cancel_notification_.Notify();
      EXPECT_TRUE(setup_done_);
      EXPECT_TRUE(ctx_->IsCancelled());
      FinishOnce(Status::CANCELLED);
    }
    void OnReadDone(bool ok) override {
      if (ok) {
        num_msgs_read_++;
        response_.set_message(request_.message());
        std::lock_guard<std::mutex> l(finish_mu_);
        if (!finished_) {
          if (num_msgs_read_ == server_write_last_) {
            StartWriteLast(&response_, WriteOptions());
            // If we use WriteLast, we shouldn't wait before attempting Finish
          } else {
            StartWrite(&response_);
            return;
          }
        }
      } else if (client_try_cancel_) {
        cancel_notification_.WaitForNotificationWithTimeout(absl::Seconds(10));
        EXPECT_TRUE(ctx_->IsCancelled());
      }

      if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
        // Let OnCancel handle this
      } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
        internal::ServerTryCancelNonblocking(ctx_);
      } else {
        FinishOnce(Status::OK);
      }
    }
    void OnWriteDone(bool /*ok*/) override {
      std::lock_guard<std::mutex> l(finish_mu_);
      if (!finished_) {
        StartRead(&request_);
      }
    }

   private:
    void FinishOnce(const Status& s) {
      std::lock_guard<std::mutex> l(finish_mu_);
      if (!finished_) {
        finished_ = true;
        // Finish asynchronously to make sure that there are no deadlocks.
        finish_thread_ = std::thread([this, s] {
          std::lock_guard<std::mutex> l(finish_mu_);
          Finish(s);
        });
      }
    }

    CallbackServerContext* const ctx_;
    EchoRequest request_;
    EchoResponse response_;
    int num_msgs_read_{0};
    int server_try_cancel_;
    int server_write_last_;
    std::mutex finish_mu_;
    bool finished_{false};
    bool setup_done_{false};
    std::thread finish_thread_;
    bool client_try_cancel_ = false;
    grpc_core::Notification cancel_notification_;
  };

  return new Reactor(context);
}

}  // namespace testing
}  // namespace grpc