1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/work_serializer.h"
22
23 namespace grpc_core {
24
25 DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer");
26
27 struct CallbackWrapper {
CallbackWrappergrpc_core::CallbackWrapper28 CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc)
29 : callback(std::move(cb)), location(loc) {}
30
31 MultiProducerSingleConsumerQueue::Node mpscq_node;
32 const std::function<void()> callback;
33 const DebugLocation location;
34 };
35
36 class WorkSerializer::WorkSerializerImpl : public Orphanable {
37 public:
38 void Run(std::function<void()> callback,
39 const grpc_core::DebugLocation& location);
40
41 void Orphan() override;
42
43 private:
44 void DrainQueue();
45
46 // An initial size of 1 keeps track of whether the work serializer has been
47 // orphaned.
48 Atomic<size_t> size_{1};
49 MultiProducerSingleConsumerQueue queue_;
50 };
51
Run(std::function<void ()> callback,const grpc_core::DebugLocation & location)52 void WorkSerializer::WorkSerializerImpl::Run(
53 std::function<void()> callback, const grpc_core::DebugLocation& location) {
54 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
55 gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
56 this, location.file(), location.line());
57 }
58 const size_t prev_size = size_.FetchAdd(1);
59 // The work serializer should not have been orphaned.
60 GPR_DEBUG_ASSERT(prev_size > 0);
61 if (prev_size == 1) {
62 // There is no other closure executing right now on this work serializer.
63 // Execute this closure immediately.
64 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
65 gpr_log(GPR_INFO, " Executing immediately");
66 }
67 callback();
68 // Loan this thread to the work serializer thread and drain the queue.
69 DrainQueue();
70 } else {
71 CallbackWrapper* cb_wrapper =
72 new CallbackWrapper(std::move(callback), location);
73 // There already are closures executing on this work serializer. Simply add
74 // this closure to the queue.
75 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
76 gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper);
77 }
78 queue_.Push(&cb_wrapper->mpscq_node);
79 }
80 }
81
Orphan()82 void WorkSerializer::WorkSerializerImpl::Orphan() {
83 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
84 gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
85 }
86 size_t prev_size = size_.FetchSub(1);
87 if (prev_size == 1) {
88 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
89 gpr_log(GPR_INFO, " Destroying");
90 }
91 delete this;
92 }
93 }
94
95 // The thread that calls this loans itself to the work serializer so as to
96 // execute all the scheduled callback. This is called from within
97 // WorkSerializer::Run() after executing a callback immediately, and hence size_
98 // is at least 1.
DrainQueue()99 void WorkSerializer::WorkSerializerImpl::DrainQueue() {
100 while (true) {
101 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
102 gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
103 }
104 size_t prev_size = size_.FetchSub(1);
105 GPR_DEBUG_ASSERT(prev_size >= 1);
106 // It is possible that while draining the queue, one of the callbacks ended
107 // up orphaning the work serializer. In that case, delete the object.
108 if (prev_size == 1) {
109 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
110 gpr_log(GPR_INFO, " Queue Drained. Destroying");
111 }
112 delete this;
113 return;
114 }
115 if (prev_size == 2) {
116 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
117 gpr_log(GPR_INFO, " Queue Drained");
118 }
119 return;
120 }
121 // There is at least one callback on the queue. Pop the callback from the
122 // queue and execute it.
123 CallbackWrapper* cb_wrapper = nullptr;
124 bool empty_unused;
125 while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>(
126 queue_.PopAndCheckEnd(&empty_unused))) == nullptr) {
127 // This can happen either due to a race condition within the mpscq
128 // implementation or because of a race with Run()
129 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
130 gpr_log(GPR_INFO, " Queue returned nullptr, trying again");
131 }
132 }
133 if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
134 gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]",
135 cb_wrapper, cb_wrapper->location.file(),
136 cb_wrapper->location.line());
137 }
138 cb_wrapper->callback();
139 delete cb_wrapper;
140 }
141 }
142
143 // WorkSerializer
144
WorkSerializer()145 WorkSerializer::WorkSerializer()
146 : impl_(MakeOrphanable<WorkSerializerImpl>()) {}
147
~WorkSerializer()148 WorkSerializer::~WorkSerializer() {}
149
Run(std::function<void ()> callback,const grpc_core::DebugLocation & location)150 void WorkSerializer::Run(std::function<void()> callback,
151 const grpc_core::DebugLocation& location) {
152 impl_->Run(std::move(callback), location);
153 }
154
155 } // namespace grpc_core
156