• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/work_serializer.h"
22 
23 namespace grpc_core {
24 
25 DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
26 
27 struct CallbackWrapper {
CallbackWrappergrpc_core::CallbackWrapper28   CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
29       : callback(std::move(cb)), location(loc) {}
30 
31   MultiProducerSingleConsumerQueue::Node mpscq_node;
32   const std::function<void()> callback;
33   const DebugLocation location;
34 };
35 
36 class WorkSerializer::WorkSerializerImpl : public Orphanable {
37  public:
38   void Run(std::function<void()> callback,
39            const grpc_core::DebugLocation& location);
40 
41   void Orphan() override;
42 
43  private:
44   void DrainQueue();
45 
46   // An initial size of 1 keeps track of whether the work serializer has been
47   // orphaned.
48   Atomic<size_t> size_{1};
49   MultiProducerSingleConsumerQueue queue_;
50 };
51 
Run(std::function<void ()> callback,const grpc_core::DebugLocation & location)52 void WorkSerializer::WorkSerializerImpl::Run(
53     std::function<void()> callback, const grpc_core::DebugLocation& location) {
54   if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
55     gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
56             this, location.file(), location.line());
57   }
58   const size_t prev_size = size_.FetchAdd(1);
59   // The work serializer should not have been orphaned.
60   GPR_DEBUG_ASSERT(prev_size > 0);
61   if (prev_size == 1) {
62     // There is no other closure executing right now on this work serializer.
63     // Execute this closure immediately.
64     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
65       gpr_log(GPR_INFO, "  Executing immediately");
66     }
67     callback();
68     // Loan this thread to the work serializer thread and drain the queue.
69     DrainQueue();
70   } else {
71     CallbackWrapper* cb_wrapper =
72         new CallbackWrapper(std::move(callback), location);
73     // There already are closures executing on this work serializer. Simply add
74     // this closure to the queue.
75     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
76       gpr_log(GPR_INFO, "  Scheduling on queue : item %p", cb_wrapper);
77     }
78     queue_.Push(&cb_wrapper->mpscq_node);
79   }
80 }
81 
Orphan()82 void WorkSerializer::WorkSerializerImpl::Orphan() {
83   if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
84     gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
85   }
86   size_t prev_size = size_.FetchSub(1);
87   if (prev_size == 1) {
88     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
89       gpr_log(GPR_INFO, "  Destroying");
90     }
91     delete this;
92   }
93 }
94 
95 // The thread that calls this loans itself to the work serializer so as to
96 // execute all the scheduled callback. This is called from within
97 // WorkSerializer::Run() after executing a callback immediately, and hence size_
98 // is at least 1.
DrainQueue()99 void WorkSerializer::WorkSerializerImpl::DrainQueue() {
100   while (true) {
101     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
102       gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
103     }
104     size_t prev_size = size_.FetchSub(1);
105     GPR_DEBUG_ASSERT(prev_size >= 1);
106     // It is possible that while draining the queue, one of the callbacks ended
107     // up orphaning the work serializer. In that case, delete the object.
108     if (prev_size == 1) {
109       if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
110         gpr_log(GPR_INFO, "  Queue Drained. Destroying");
111       }
112       delete this;
113       return;
114     }
115     if (prev_size == 2) {
116       if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
117         gpr_log(GPR_INFO, "  Queue Drained");
118       }
119       return;
120     }
121     // There is at least one callback on the queue. Pop the callback from the
122     // queue and execute it.
123     CallbackWrapper* cb_wrapper = nullptr;
124     bool empty_unused;
125     while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
126                 queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
127       // This can happen either due to a race condition within the mpscq
128       // implementation or because of a race with Run()
129       if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
130         gpr_log(GPR_INFO, "  Queue returned nullptr, trying again");
131       }
132     }
133     if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
134       gpr_log(GPR_INFO, "  Running item %p : callback scheduled at [%s:%d]",
135               cb_wrapper, cb_wrapper->location.file(),
136               cb_wrapper->location.line());
137     }
138     cb_wrapper->callback();
139     delete cb_wrapper;
140   }
141 }
142 
143 // WorkSerializer
144 
WorkSerializer()145 WorkSerializer::WorkSerializer()
146     : impl_(MakeOrphanable<WorkSerializerImpl>()) {}
147 
~WorkSerializer()148 WorkSerializer::~WorkSerializer() {}
149 
Run(std::function<void ()> callback,const grpc_core::DebugLocation & location)150 void WorkSerializer::Run(std::function<void()> callback,
151                          const grpc_core::DebugLocation& location) {
152   impl_->Run(std::move(callback), location);
153 }
154 
155 }  // namespace grpc_core
156