1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/ext/transport/chttp2/transport/stream_lists.h"
20
21 #include <grpc/support/port_platform.h>
22
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "src/core/ext/transport/chttp2/transport/internal.h"
26 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
27 #include "src/core/lib/debug/trace.h"
28 #include "src/core/lib/experiments/experiments.h"
29 #include "src/core/util/bitset.h"
30
stream_list_id_string(grpc_chttp2_stream_list_id id)31 static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) {
32 switch (id) {
33 case GRPC_CHTTP2_LIST_WRITABLE:
34 return "writable";
35 case GRPC_CHTTP2_LIST_WRITING:
36 return "writing";
37 case GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT:
38 return "stalled_by_transport";
39 case GRPC_CHTTP2_LIST_STALLED_BY_STREAM:
40 return "stalled_by_stream";
41 case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY:
42 return "waiting_for_concurrency";
43 case STREAM_LIST_COUNT:
44 GPR_UNREACHABLE_CODE(return "unknown");
45 }
46 GPR_UNREACHABLE_CODE(return "unknown");
47 }
48
49 // core list management
50
stream_list_empty(grpc_chttp2_transport * t,grpc_chttp2_stream_list_id id)51 static bool stream_list_empty(grpc_chttp2_transport* t,
52 grpc_chttp2_stream_list_id id) {
53 return t->lists[id].head == nullptr;
54 }
55
stream_list_pop(grpc_chttp2_transport * t,grpc_chttp2_stream ** stream,grpc_chttp2_stream_list_id id)56 static bool stream_list_pop(grpc_chttp2_transport* t,
57 grpc_chttp2_stream** stream,
58 grpc_chttp2_stream_list_id id) {
59 grpc_chttp2_stream* s = t->lists[id].head;
60 if (s) {
61 grpc_chttp2_stream* new_head = s->links[id].next;
62 CHECK(s->included.is_set(id));
63 if (new_head) {
64 t->lists[id].head = new_head;
65 new_head->links[id].prev = nullptr;
66 } else {
67 t->lists[id].head = nullptr;
68 t->lists[id].tail = nullptr;
69 }
70 s->included.clear(id);
71 }
72 *stream = s;
73 if (s && GRPC_TRACE_FLAG_ENABLED(http2_stream_state)) {
74 LOG(INFO) << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
75 << "]: pop from " << stream_list_id_string(id);
76 }
77 return s != nullptr;
78 }
79
stream_list_remove(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)80 static void stream_list_remove(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
81 grpc_chttp2_stream_list_id id) {
82 CHECK(s->included.is_set(id));
83 s->included.clear(id);
84 if (s->links[id].prev) {
85 s->links[id].prev->links[id].next = s->links[id].next;
86 } else {
87 CHECK(t->lists[id].head == s);
88 t->lists[id].head = s->links[id].next;
89 }
90 if (s->links[id].next) {
91 s->links[id].next->links[id].prev = s->links[id].prev;
92 } else {
93 t->lists[id].tail = s->links[id].prev;
94 }
95 GRPC_TRACE_LOG(http2_stream_state, INFO)
96 << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
97 << "]: remove from " << stream_list_id_string(id);
98 }
99
stream_list_maybe_remove(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)100 static bool stream_list_maybe_remove(grpc_chttp2_transport* t,
101 grpc_chttp2_stream* s,
102 grpc_chttp2_stream_list_id id) {
103 if (s->included.is_set(id)) {
104 stream_list_remove(t, s, id);
105 return true;
106 } else {
107 return false;
108 }
109 }
110
stream_list_add_tail(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)111 static void stream_list_add_tail(grpc_chttp2_transport* t,
112 grpc_chttp2_stream* s,
113 grpc_chttp2_stream_list_id id) {
114 grpc_chttp2_stream* old_tail;
115 CHECK(!s->included.is_set(id));
116 old_tail = t->lists[id].tail;
117 s->links[id].next = nullptr;
118 s->links[id].prev = old_tail;
119 if (old_tail) {
120 old_tail->links[id].next = s;
121 } else {
122 t->lists[id].head = s;
123 }
124 t->lists[id].tail = s;
125 s->included.set(id);
126 GRPC_TRACE_LOG(http2_stream_state, INFO)
127 << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
128 << "]: add to " << stream_list_id_string(id);
129 }
130
stream_list_add_head(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)131 static void stream_list_add_head(grpc_chttp2_transport* t,
132 grpc_chttp2_stream* s,
133 grpc_chttp2_stream_list_id id) {
134 grpc_chttp2_stream* old_head;
135 CHECK(!s->included.is_set(id));
136 old_head = t->lists[id].head;
137 s->links[id].next = old_head;
138 s->links[id].prev = nullptr;
139 if (old_head) {
140 old_head->links[id].prev = s;
141 } else {
142 t->lists[id].tail = s;
143 }
144 t->lists[id].head = s;
145 s->included.set(id);
146 GRPC_TRACE_LOG(http2_stream_state, INFO)
147 << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
148 << "]: add to " << stream_list_id_string(id);
149 }
150
stream_list_add(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)151 static bool stream_list_add(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
152 grpc_chttp2_stream_list_id id) {
153 if (s->included.is_set(id)) {
154 return false;
155 }
156 stream_list_add_tail(t, s, id);
157 return true;
158 }
159
stream_list_prepend(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)160 static bool stream_list_prepend(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
161 grpc_chttp2_stream_list_id id) {
162 if (s->included.is_set(id)) {
163 return false;
164 }
165 stream_list_add_head(t, s, id);
166 return true;
167 }
168
169 // wrappers for specializations
170
grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)171 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
172 grpc_chttp2_stream* s) {
173 CHECK_NE(s->id, 0u);
174 if (grpc_core::IsPrioritizeFinishedRequestsEnabled() &&
175 s->send_trailing_metadata != nullptr) {
176 return stream_list_prepend(t, s, GRPC_CHTTP2_LIST_WRITABLE);
177 }
178 return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
179 }
180
grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)181 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
182 grpc_chttp2_stream** s) {
183 return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
184 }
185
grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)186 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
187 grpc_chttp2_stream* s) {
188 return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WRITABLE);
189 }
190
grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)191 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
192 grpc_chttp2_stream* s) {
193 return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
194 }
195
grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport * t)196 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t) {
197 return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
198 }
199
grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)200 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
201 grpc_chttp2_stream** s) {
202 return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
203 }
204
grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport * t,grpc_chttp2_stream * s)205 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
206 grpc_chttp2_stream* s) {
207 stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
208 }
209
grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)210 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
211 grpc_chttp2_stream** s) {
212 return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
213 }
214
grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport * t,grpc_chttp2_stream * s)215 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
216 grpc_chttp2_stream* s) {
217 stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
218 }
219
grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport * t,grpc_chttp2_stream * s)220 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
221 grpc_chttp2_stream* s) {
222 if (grpc_core::IsPrioritizeFinishedRequestsEnabled() &&
223 s->send_trailing_metadata != nullptr) {
224 stream_list_prepend(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
225 } else {
226 stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
227 }
228 }
229
grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)230 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
231 grpc_chttp2_stream** s) {
232 return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
233 }
234
grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport * t,grpc_chttp2_stream * s)235 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
236 grpc_chttp2_stream* s) {
237 stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
238 }
239
grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)240 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
241 grpc_chttp2_stream* s) {
242 stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
243 }
244
grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)245 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
246 grpc_chttp2_stream** s) {
247 return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
248 }
249
grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)250 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
251 grpc_chttp2_stream* s) {
252 return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
253 }
254