• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/cpp/end2end/test_service_impl.h"
20 
21 #include <grpc/support/log.h>
22 #include <grpcpp/alarm.h>
23 #include <grpcpp/security/credentials.h>
24 #include <grpcpp/server_context.h>
25 #include <gtest/gtest.h>
26 
27 #include <string>
28 #include <thread>
29 
30 #include "src/proto/grpc/testing/echo.grpc.pb.h"
31 #include "test/cpp/util/string_ref_helper.h"
32 
33 using std::chrono::system_clock;
34 
35 namespace grpc {
36 namespace testing {
37 namespace internal {
38 
39 // When echo_deadline is requested, deadline seen in the ServerContext is set in
40 // the response in seconds.
MaybeEchoDeadline(experimental::ServerContextBase * context,const EchoRequest * request,EchoResponse * response)41 void MaybeEchoDeadline(experimental::ServerContextBase* context,
42                        const EchoRequest* request, EchoResponse* response) {
43   if (request->has_param() && request->param().echo_deadline()) {
44     gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
45     if (context->deadline() != system_clock::time_point::max()) {
46       Timepoint2Timespec(context->deadline(), &deadline);
47     }
48     response->mutable_param()->set_request_deadline(deadline.tv_sec);
49   }
50 }
51 
CheckServerAuthContext(const experimental::ServerContextBase * context,const std::string & expected_transport_security_type,const std::string & expected_client_identity)52 void CheckServerAuthContext(const experimental::ServerContextBase* context,
53                             const std::string& expected_transport_security_type,
54                             const std::string& expected_client_identity) {
55   std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
56   std::vector<grpc::string_ref> tst =
57       auth_ctx->FindPropertyValues("transport_security_type");
58   EXPECT_EQ(1u, tst.size());
59   EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
60   if (expected_client_identity.empty()) {
61     EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
62     EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
63     EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
64   } else {
65     auto identity = auth_ctx->GetPeerIdentity();
66     EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
67     EXPECT_EQ(1u, identity.size());
68     EXPECT_EQ(expected_client_identity, identity[0]);
69   }
70 }
71 
72 // Returns the number of pairs in metadata that exactly match the given
73 // key-value pair. Returns -1 if the pair wasn't found.
MetadataMatchCount(const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,const std::string & key,const std::string & value)74 int MetadataMatchCount(
75     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
76     const std::string& key, const std::string& value) {
77   int count = 0;
78   for (const auto& metadatum : metadata) {
79     if (ToString(metadatum.first) == key &&
80         ToString(metadatum.second) == value) {
81       count++;
82     }
83   }
84   return count;
85 }
86 
GetIntValueFromMetadataHelper(const char * key,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,int default_value)87 int GetIntValueFromMetadataHelper(
88     const char* key,
89     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
90     int default_value) {
91   if (metadata.find(key) != metadata.end()) {
92     std::istringstream iss(ToString(metadata.find(key)->second));
93     iss >> default_value;
94     gpr_log(GPR_INFO, "%s : %d", key, default_value);
95   }
96 
97   return default_value;
98 }
99 
GetIntValueFromMetadata(const char * key,const std::multimap<grpc::string_ref,grpc::string_ref> & metadata,int default_value)100 int GetIntValueFromMetadata(
101     const char* key,
102     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
103     int default_value) {
104   return GetIntValueFromMetadataHelper(key, metadata, default_value);
105 }
106 
ServerTryCancel(ServerContext * context)107 void ServerTryCancel(ServerContext* context) {
108   EXPECT_FALSE(context->IsCancelled());
109   context->TryCancel();
110   gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
111   // Now wait until it's really canceled
112   while (!context->IsCancelled()) {
113     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
114                                  gpr_time_from_micros(1000, GPR_TIMESPAN)));
115   }
116 }
117 
ServerTryCancelNonblocking(experimental::CallbackServerContext * context)118 void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) {
119   EXPECT_FALSE(context->IsCancelled());
120   context->TryCancel();
121   gpr_log(GPR_INFO,
122           "Server called TryCancelNonblocking() to cancel the request");
123 }
124 
125 }  // namespace internal
126 
Echo(experimental::CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)127 experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
128     experimental::CallbackServerContext* context, const EchoRequest* request,
129     EchoResponse* response) {
130   class Reactor : public ::grpc::experimental::ServerUnaryReactor {
131    public:
132     Reactor(CallbackTestServiceImpl* service,
133             experimental::CallbackServerContext* ctx,
134             const EchoRequest* request, EchoResponse* response)
135         : service_(service), ctx_(ctx), req_(request), resp_(response) {
136       // It should be safe to call IsCancelled here, even though we don't know
137       // the result. Call it asynchronously to see if we trigger any data races.
138       // Join it in OnDone (technically that could be blocking but shouldn't be
139       // for very long).
140       async_cancel_check_ = std::thread([this] { (void)ctx_->IsCancelled(); });
141 
142       started_ = true;
143 
144       if (request->has_param() &&
145           request->param().server_notify_client_when_started()) {
146         service->signaller_.SignalClientThatRpcStarted();
147         // Block on the "wait to continue" decision in a different thread since
148         // we can't tie up an EM thread with blocking events. We can join it in
149         // OnDone since it would definitely be done by then.
150         rpc_wait_thread_ = std::thread([this] {
151           service_->signaller_.ServerWaitToContinue();
152           StartRpc();
153         });
154       } else {
155         StartRpc();
156       }
157     }
158 
159     void StartRpc() {
160       if (req_->has_param() && req_->param().server_sleep_us() > 0) {
161         // Set an alarm for that much time
162         alarm_.experimental().Set(
163             gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
164                          gpr_time_from_micros(req_->param().server_sleep_us(),
165                                               GPR_TIMESPAN)),
166             [this](bool ok) { NonDelayed(ok); });
167         return;
168       }
169       NonDelayed(true);
170     }
171     void OnSendInitialMetadataDone(bool ok) override {
172       EXPECT_TRUE(ok);
173       initial_metadata_sent_ = true;
174     }
175     void OnCancel() override {
176       EXPECT_TRUE(started_);
177       EXPECT_TRUE(ctx_->IsCancelled());
178       on_cancel_invoked_ = true;
179       std::lock_guard<std::mutex> l(cancel_mu_);
180       cancel_cv_.notify_one();
181     }
182     void OnDone() override {
183       if (req_->has_param() && req_->param().echo_metadata_initially()) {
184         EXPECT_TRUE(initial_metadata_sent_);
185       }
186       EXPECT_EQ(ctx_->IsCancelled(), on_cancel_invoked_);
187       // Validate that finishing with a non-OK status doesn't cause cancellation
188       if (req_->has_param() && req_->param().has_expected_error()) {
189         EXPECT_FALSE(on_cancel_invoked_);
190       }
191       async_cancel_check_.join();
192       if (rpc_wait_thread_.joinable()) {
193         rpc_wait_thread_.join();
194       }
195       if (finish_when_cancelled_.joinable()) {
196         finish_when_cancelled_.join();
197       }
198       delete this;
199     }
200 
201    private:
202     void NonDelayed(bool ok) {
203       if (!ok) {
204         EXPECT_TRUE(ctx_->IsCancelled());
205         Finish(Status::CANCELLED);
206         return;
207       }
208       if (req_->has_param() && req_->param().server_die()) {
209         gpr_log(GPR_ERROR, "The request should not reach application handler.");
210         GPR_ASSERT(0);
211       }
212       if (req_->has_param() && req_->param().has_expected_error()) {
213         const auto& error = req_->param().expected_error();
214         Finish(Status(static_cast<StatusCode>(error.code()),
215                       error.error_message(), error.binary_error_details()));
216         return;
217       }
218       int server_try_cancel = internal::GetIntValueFromMetadata(
219           kServerTryCancelRequest, ctx_->client_metadata(), DO_NOT_CANCEL);
220       if (server_try_cancel != DO_NOT_CANCEL) {
221         // Since this is a unary RPC, by the time this server handler is called,
222         // the 'request' message is already read from the client. So the
223         // scenarios in server_try_cancel don't make much sense. Just cancel the
224         // RPC as long as server_try_cancel is not DO_NOT_CANCEL
225         EXPECT_FALSE(ctx_->IsCancelled());
226         ctx_->TryCancel();
227         gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
228         FinishWhenCancelledAsync();
229         return;
230       }
231       resp_->set_message(req_->message());
232       internal::MaybeEchoDeadline(ctx_, req_, resp_);
233       if (service_->host_) {
234         resp_->mutable_param()->set_host(*service_->host_);
235       }
236       if (req_->has_param() && req_->param().client_cancel_after_us()) {
237         {
238           std::unique_lock<std::mutex> lock(service_->mu_);
239           service_->signal_client_ = true;
240         }
241         FinishWhenCancelledAsync();
242         return;
243       } else if (req_->has_param() && req_->param().server_cancel_after_us()) {
244         alarm_.experimental().Set(
245             gpr_time_add(
246                 gpr_now(GPR_CLOCK_REALTIME),
247                 gpr_time_from_micros(req_->param().server_cancel_after_us(),
248                                      GPR_TIMESPAN)),
249             [this](bool) { Finish(Status::CANCELLED); });
250         return;
251       } else if (!req_->has_param() || !req_->param().skip_cancelled_check()) {
252         EXPECT_FALSE(ctx_->IsCancelled());
253       }
254 
255       if (req_->has_param() && req_->param().echo_metadata_initially()) {
256         const std::multimap<grpc::string_ref, grpc::string_ref>&
257             client_metadata = ctx_->client_metadata();
258         for (const auto& metadatum : client_metadata) {
259           ctx_->AddInitialMetadata(ToString(metadatum.first),
260                                    ToString(metadatum.second));
261         }
262         StartSendInitialMetadata();
263       }
264 
265       if (req_->has_param() && req_->param().echo_metadata()) {
266         const std::multimap<grpc::string_ref, grpc::string_ref>&
267             client_metadata = ctx_->client_metadata();
268         for (const auto& metadatum : client_metadata) {
269           ctx_->AddTrailingMetadata(ToString(metadatum.first),
270                                     ToString(metadatum.second));
271         }
272         // Terminate rpc with error and debug info in trailer.
273         if (req_->param().debug_info().stack_entries_size() ||
274             !req_->param().debug_info().detail().empty()) {
275           std::string serialized_debug_info =
276               req_->param().debug_info().SerializeAsString();
277           ctx_->AddTrailingMetadata(kDebugInfoTrailerKey,
278                                     serialized_debug_info);
279           Finish(Status::CANCELLED);
280           return;
281         }
282       }
283       if (req_->has_param() &&
284           (req_->param().expected_client_identity().length() > 0 ||
285            req_->param().check_auth_context())) {
286         internal::CheckServerAuthContext(
287             ctx_, req_->param().expected_transport_security_type(),
288             req_->param().expected_client_identity());
289       }
290       if (req_->has_param() && req_->param().response_message_length() > 0) {
291         resp_->set_message(
292             std::string(req_->param().response_message_length(), '\0'));
293       }
294       if (req_->has_param() && req_->param().echo_peer()) {
295         resp_->mutable_param()->set_peer(ctx_->peer());
296       }
297       Finish(Status::OK);
298     }
299     void FinishWhenCancelledAsync() {
300       finish_when_cancelled_ = std::thread([this] {
301         std::unique_lock<std::mutex> l(cancel_mu_);
302         cancel_cv_.wait(l, [this] { return ctx_->IsCancelled(); });
303         Finish(Status::CANCELLED);
304       });
305     }
306 
307     CallbackTestServiceImpl* const service_;
308     experimental::CallbackServerContext* const ctx_;
309     const EchoRequest* const req_;
310     EchoResponse* const resp_;
311     Alarm alarm_;
312     std::mutex cancel_mu_;
313     std::condition_variable cancel_cv_;
314     bool initial_metadata_sent_ = false;
315     bool started_ = false;
316     bool on_cancel_invoked_ = false;
317     std::thread async_cancel_check_;
318     std::thread rpc_wait_thread_;
319     std::thread finish_when_cancelled_;
320   };
321 
322   return new Reactor(this, context, request, response);
323 }
324 
325 experimental::ServerUnaryReactor*
CheckClientInitialMetadata(experimental::CallbackServerContext * context,const SimpleRequest *,SimpleResponse *)326 CallbackTestServiceImpl::CheckClientInitialMetadata(
327     experimental::CallbackServerContext* context, const SimpleRequest*,
328     SimpleResponse*) {
329   class Reactor : public ::grpc::experimental::ServerUnaryReactor {
330    public:
331     explicit Reactor(experimental::CallbackServerContext* ctx) {
332       EXPECT_EQ(internal::MetadataMatchCount(ctx->client_metadata(),
333                                              kCheckClientInitialMetadataKey,
334                                              kCheckClientInitialMetadataVal),
335                 1);
336       EXPECT_EQ(ctx->client_metadata().count(kCheckClientInitialMetadataKey),
337                 1u);
338       Finish(Status::OK);
339     }
340     void OnDone() override { delete this; }
341   };
342 
343   return new Reactor(context);
344 }
345 
346 experimental::ServerReadReactor<EchoRequest>*
RequestStream(experimental::CallbackServerContext * context,EchoResponse * response)347 CallbackTestServiceImpl::RequestStream(
348     experimental::CallbackServerContext* context, EchoResponse* response) {
349   // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
350   // the server by calling ServerContext::TryCancel() depending on the
351   // value:
352   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
353   //   reads any message from the client CANCEL_DURING_PROCESSING: The RPC
354   //   is cancelled while the server is reading messages from the client
355   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
356   //   all the messages from the client
357   int server_try_cancel = internal::GetIntValueFromMetadata(
358       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
359   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
360     internal::ServerTryCancelNonblocking(context);
361     // Don't need to provide a reactor since the RPC is canceled
362     return nullptr;
363   }
364 
365   class Reactor : public ::grpc::experimental::ServerReadReactor<EchoRequest> {
366    public:
367     Reactor(experimental::CallbackServerContext* ctx, EchoResponse* response,
368             int server_try_cancel)
369         : ctx_(ctx),
370           response_(response),
371           server_try_cancel_(server_try_cancel) {
372       EXPECT_NE(server_try_cancel, CANCEL_BEFORE_PROCESSING);
373       response->set_message("");
374 
375       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
376         ctx->TryCancel();
377         // Don't wait for it here
378       }
379       StartRead(&request_);
380       setup_done_ = true;
381     }
382     void OnDone() override { delete this; }
383     void OnCancel() override {
384       EXPECT_TRUE(setup_done_);
385       EXPECT_TRUE(ctx_->IsCancelled());
386       FinishOnce(Status::CANCELLED);
387     }
388     void OnReadDone(bool ok) override {
389       if (ok) {
390         response_->mutable_message()->append(request_.message());
391         num_msgs_read_++;
392         StartRead(&request_);
393       } else {
394         gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_);
395 
396         if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
397           // Let OnCancel recover this
398           return;
399         }
400         if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
401           internal::ServerTryCancelNonblocking(ctx_);
402           return;
403         }
404         FinishOnce(Status::OK);
405       }
406     }
407 
408    private:
409     void FinishOnce(const Status& s) {
410       std::lock_guard<std::mutex> l(finish_mu_);
411       if (!finished_) {
412         Finish(s);
413         finished_ = true;
414       }
415     }
416 
417     experimental::CallbackServerContext* const ctx_;
418     EchoResponse* const response_;
419     EchoRequest request_;
420     int num_msgs_read_{0};
421     int server_try_cancel_;
422     std::mutex finish_mu_;
423     bool finished_{false};
424     bool setup_done_{false};
425   };
426 
427   return new Reactor(context, response, server_try_cancel);
428 }
429 
430 // Return 'kNumResponseStreamMsgs' messages.
431 // TODO(yangg) make it generic by adding a parameter into EchoRequest
432 experimental::ServerWriteReactor<EchoResponse>*
ResponseStream(experimental::CallbackServerContext * context,const EchoRequest * request)433 CallbackTestServiceImpl::ResponseStream(
434     experimental::CallbackServerContext* context, const EchoRequest* request) {
435   // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
436   // the server by calling ServerContext::TryCancel() depending on the
437   // value:
438   //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
439   //   reads any message from the client CANCEL_DURING_PROCESSING: The RPC
440   //   is cancelled while the server is reading messages from the client
441   //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
442   //   all the messages from the client
443   int server_try_cancel = internal::GetIntValueFromMetadata(
444       kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
445   if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
446     internal::ServerTryCancelNonblocking(context);
447   }
448 
449   class Reactor
450       : public ::grpc::experimental::ServerWriteReactor<EchoResponse> {
451    public:
452     Reactor(experimental::CallbackServerContext* ctx,
453             const EchoRequest* request, int server_try_cancel)
454         : ctx_(ctx), request_(request), server_try_cancel_(server_try_cancel) {
455       server_coalescing_api_ = internal::GetIntValueFromMetadata(
456           kServerUseCoalescingApi, ctx->client_metadata(), 0);
457       server_responses_to_send_ = internal::GetIntValueFromMetadata(
458           kServerResponseStreamsToSend, ctx->client_metadata(),
459           kServerDefaultResponseStreamsToSend);
460       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
461         ctx->TryCancel();
462       }
463       if (server_try_cancel_ != CANCEL_BEFORE_PROCESSING) {
464         if (num_msgs_sent_ < server_responses_to_send_) {
465           NextWrite();
466         }
467       }
468       setup_done_ = true;
469     }
470     void OnDone() override { delete this; }
471     void OnCancel() override {
472       EXPECT_TRUE(setup_done_);
473       EXPECT_TRUE(ctx_->IsCancelled());
474       FinishOnce(Status::CANCELLED);
475     }
476     void OnWriteDone(bool /*ok*/) override {
477       if (num_msgs_sent_ < server_responses_to_send_) {
478         NextWrite();
479       } else if (server_coalescing_api_ != 0) {
480         // We would have already done Finish just after the WriteLast
481       } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
482         // Let OnCancel recover this
483       } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
484         internal::ServerTryCancelNonblocking(ctx_);
485       } else {
486         FinishOnce(Status::OK);
487       }
488     }
489 
490    private:
491     void FinishOnce(const Status& s) {
492       std::lock_guard<std::mutex> l(finish_mu_);
493       if (!finished_) {
494         Finish(s);
495         finished_ = true;
496       }
497     }
498 
499     void NextWrite() {
500       response_.set_message(request_->message() +
501                             std::to_string(num_msgs_sent_));
502       if (num_msgs_sent_ == server_responses_to_send_ - 1 &&
503           server_coalescing_api_ != 0) {
504         {
505           std::lock_guard<std::mutex> l(finish_mu_);
506           if (!finished_) {
507             num_msgs_sent_++;
508             StartWriteLast(&response_, WriteOptions());
509           }
510         }
511         // If we use WriteLast, we shouldn't wait before attempting Finish
512         FinishOnce(Status::OK);
513       } else {
514         std::lock_guard<std::mutex> l(finish_mu_);
515         if (!finished_) {
516           num_msgs_sent_++;
517           StartWrite(&response_);
518         }
519       }
520     }
521     experimental::CallbackServerContext* const ctx_;
522     const EchoRequest* const request_;
523     EchoResponse response_;
524     int num_msgs_sent_{0};
525     int server_try_cancel_;
526     int server_coalescing_api_;
527     int server_responses_to_send_;
528     std::mutex finish_mu_;
529     bool finished_{false};
530     bool setup_done_{false};
531   };
532   return new Reactor(context, request, server_try_cancel);
533 }
534 
535 experimental::ServerBidiReactor<EchoRequest, EchoResponse>*
BidiStream(experimental::CallbackServerContext * context)536 CallbackTestServiceImpl::BidiStream(
537     experimental::CallbackServerContext* context) {
538   class Reactor : public ::grpc::experimental::ServerBidiReactor<EchoRequest,
539                                                                  EchoResponse> {
540    public:
541     explicit Reactor(experimental::CallbackServerContext* ctx) : ctx_(ctx) {
542       // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
543       // the server by calling ServerContext::TryCancel() depending on the
544       // value:
545       //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
546       //   reads any message from the client CANCEL_DURING_PROCESSING: The RPC
547       //   is cancelled while the server is reading messages from the client
548       //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
549       //   all the messages from the client
550       server_try_cancel_ = internal::GetIntValueFromMetadata(
551           kServerTryCancelRequest, ctx->client_metadata(), DO_NOT_CANCEL);
552       server_write_last_ = internal::GetIntValueFromMetadata(
553           kServerFinishAfterNReads, ctx->client_metadata(), 0);
554       if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
555         internal::ServerTryCancelNonblocking(ctx);
556       } else {
557         if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
558           ctx->TryCancel();
559         }
560         StartRead(&request_);
561       }
562       setup_done_ = true;
563     }
564     void OnDone() override {
565       {
566         // Use the same lock as finish to make sure that OnDone isn't inlined.
567         std::lock_guard<std::mutex> l(finish_mu_);
568         EXPECT_TRUE(finished_);
569         finish_thread_.join();
570       }
571       delete this;
572     }
573     void OnCancel() override {
574       EXPECT_TRUE(setup_done_);
575       EXPECT_TRUE(ctx_->IsCancelled());
576       FinishOnce(Status::CANCELLED);
577     }
578     void OnReadDone(bool ok) override {
579       if (ok) {
580         num_msgs_read_++;
581         response_.set_message(request_.message());
582         std::lock_guard<std::mutex> l(finish_mu_);
583         if (!finished_) {
584           if (num_msgs_read_ == server_write_last_) {
585             StartWriteLast(&response_, WriteOptions());
586             // If we use WriteLast, we shouldn't wait before attempting Finish
587           } else {
588             StartWrite(&response_);
589             return;
590           }
591         }
592       }
593 
594       if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
595         // Let OnCancel handle this
596       } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
597         internal::ServerTryCancelNonblocking(ctx_);
598       } else {
599         FinishOnce(Status::OK);
600       }
601     }
602     void OnWriteDone(bool /*ok*/) override {
603       std::lock_guard<std::mutex> l(finish_mu_);
604       if (!finished_) {
605         StartRead(&request_);
606       }
607     }
608 
609    private:
610     void FinishOnce(const Status& s) {
611       std::lock_guard<std::mutex> l(finish_mu_);
612       if (!finished_) {
613         finished_ = true;
614         // Finish asynchronously to make sure that there are no deadlocks.
615         finish_thread_ = std::thread([this, s] {
616           std::lock_guard<std::mutex> l(finish_mu_);
617           Finish(s);
618         });
619       }
620     }
621 
622     experimental::CallbackServerContext* const ctx_;
623     EchoRequest request_;
624     EchoResponse response_;
625     int num_msgs_read_{0};
626     int server_try_cancel_;
627     int server_write_last_;
628     std::mutex finish_mu_;
629     bool finished_{false};
630     bool setup_done_{false};
631     std::thread finish_thread_;
632   };
633 
634   return new Reactor(context);
635 }
636 
637 }  // namespace testing
638 }  // namespace grpc
639