• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/ext/transport/chttp2/transport/stream_lists.h"
20 
21 #include <grpc/support/port_platform.h>
22 
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "src/core/ext/transport/chttp2/transport/internal.h"
26 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
27 #include "src/core/lib/debug/trace.h"
28 #include "src/core/lib/experiments/experiments.h"
29 #include "src/core/util/bitset.h"
30 
stream_list_id_string(grpc_chttp2_stream_list_id id)31 static const char* stream_list_id_string(grpc_chttp2_stream_list_id id) {
32   switch (id) {
33     case GRPC_CHTTP2_LIST_WRITABLE:
34       return "writable";
35     case GRPC_CHTTP2_LIST_WRITING:
36       return "writing";
37     case GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT:
38       return "stalled_by_transport";
39     case GRPC_CHTTP2_LIST_STALLED_BY_STREAM:
40       return "stalled_by_stream";
41     case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY:
42       return "waiting_for_concurrency";
43     case STREAM_LIST_COUNT:
44       GPR_UNREACHABLE_CODE(return "unknown");
45   }
46   GPR_UNREACHABLE_CODE(return "unknown");
47 }
48 
49 // core list management
50 
stream_list_empty(grpc_chttp2_transport * t,grpc_chttp2_stream_list_id id)51 static bool stream_list_empty(grpc_chttp2_transport* t,
52                               grpc_chttp2_stream_list_id id) {
53   return t->lists[id].head == nullptr;
54 }
55 
stream_list_pop(grpc_chttp2_transport * t,grpc_chttp2_stream ** stream,grpc_chttp2_stream_list_id id)56 static bool stream_list_pop(grpc_chttp2_transport* t,
57                             grpc_chttp2_stream** stream,
58                             grpc_chttp2_stream_list_id id) {
59   grpc_chttp2_stream* s = t->lists[id].head;
60   if (s) {
61     grpc_chttp2_stream* new_head = s->links[id].next;
62     CHECK(s->included.is_set(id));
63     if (new_head) {
64       t->lists[id].head = new_head;
65       new_head->links[id].prev = nullptr;
66     } else {
67       t->lists[id].head = nullptr;
68       t->lists[id].tail = nullptr;
69     }
70     s->included.clear(id);
71   }
72   *stream = s;
73   if (s && GRPC_TRACE_FLAG_ENABLED(http2_stream_state)) {
74     LOG(INFO) << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
75               << "]: pop from " << stream_list_id_string(id);
76   }
77   return s != nullptr;
78 }
79 
stream_list_remove(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)80 static void stream_list_remove(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
81                                grpc_chttp2_stream_list_id id) {
82   CHECK(s->included.is_set(id));
83   s->included.clear(id);
84   if (s->links[id].prev) {
85     s->links[id].prev->links[id].next = s->links[id].next;
86   } else {
87     CHECK(t->lists[id].head == s);
88     t->lists[id].head = s->links[id].next;
89   }
90   if (s->links[id].next) {
91     s->links[id].next->links[id].prev = s->links[id].prev;
92   } else {
93     t->lists[id].tail = s->links[id].prev;
94   }
95   GRPC_TRACE_LOG(http2_stream_state, INFO)
96       << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
97       << "]: remove from " << stream_list_id_string(id);
98 }
99 
stream_list_maybe_remove(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)100 static bool stream_list_maybe_remove(grpc_chttp2_transport* t,
101                                      grpc_chttp2_stream* s,
102                                      grpc_chttp2_stream_list_id id) {
103   if (s->included.is_set(id)) {
104     stream_list_remove(t, s, id);
105     return true;
106   } else {
107     return false;
108   }
109 }
110 
stream_list_add_tail(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)111 static void stream_list_add_tail(grpc_chttp2_transport* t,
112                                  grpc_chttp2_stream* s,
113                                  grpc_chttp2_stream_list_id id) {
114   grpc_chttp2_stream* old_tail;
115   CHECK(!s->included.is_set(id));
116   old_tail = t->lists[id].tail;
117   s->links[id].next = nullptr;
118   s->links[id].prev = old_tail;
119   if (old_tail) {
120     old_tail->links[id].next = s;
121   } else {
122     t->lists[id].head = s;
123   }
124   t->lists[id].tail = s;
125   s->included.set(id);
126   GRPC_TRACE_LOG(http2_stream_state, INFO)
127       << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
128       << "]: add to " << stream_list_id_string(id);
129 }
130 
stream_list_add_head(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)131 static void stream_list_add_head(grpc_chttp2_transport* t,
132                                  grpc_chttp2_stream* s,
133                                  grpc_chttp2_stream_list_id id) {
134   grpc_chttp2_stream* old_head;
135   CHECK(!s->included.is_set(id));
136   old_head = t->lists[id].head;
137   s->links[id].next = old_head;
138   s->links[id].prev = nullptr;
139   if (old_head) {
140     old_head->links[id].prev = s;
141   } else {
142     t->lists[id].tail = s;
143   }
144   t->lists[id].head = s;
145   s->included.set(id);
146   GRPC_TRACE_LOG(http2_stream_state, INFO)
147       << t << "[" << s->id << "][" << (t->is_client ? "cli" : "svr")
148       << "]: add to " << stream_list_id_string(id);
149 }
150 
stream_list_add(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)151 static bool stream_list_add(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
152                             grpc_chttp2_stream_list_id id) {
153   if (s->included.is_set(id)) {
154     return false;
155   }
156   stream_list_add_tail(t, s, id);
157   return true;
158 }
159 
stream_list_prepend(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_stream_list_id id)160 static bool stream_list_prepend(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
161                                 grpc_chttp2_stream_list_id id) {
162   if (s->included.is_set(id)) {
163     return false;
164   }
165   stream_list_add_head(t, s, id);
166   return true;
167 }
168 
169 // wrappers for specializations
170 
grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)171 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
172                                           grpc_chttp2_stream* s) {
173   CHECK_NE(s->id, 0u);
174   if (grpc_core::IsPrioritizeFinishedRequestsEnabled() &&
175       s->send_trailing_metadata != nullptr) {
176     return stream_list_prepend(t, s, GRPC_CHTTP2_LIST_WRITABLE);
177   }
178   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
179 }
180 
grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)181 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
182                                           grpc_chttp2_stream** s) {
183   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
184 }
185 
grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)186 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
187                                              grpc_chttp2_stream* s) {
188   return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WRITABLE);
189 }
190 
grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)191 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
192                                          grpc_chttp2_stream* s) {
193   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
194 }
195 
grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport * t)196 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t) {
197   return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
198 }
199 
grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)200 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
201                                          grpc_chttp2_stream** s) {
202   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
203 }
204 
grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport * t,grpc_chttp2_stream * s)205 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
206                                                   grpc_chttp2_stream* s) {
207   stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
208 }
209 
grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)210 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
211                                                   grpc_chttp2_stream** s) {
212   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
213 }
214 
grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport * t,grpc_chttp2_stream * s)215 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
216                                                      grpc_chttp2_stream* s) {
217   stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
218 }
219 
grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport * t,grpc_chttp2_stream * s)220 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
221                                                grpc_chttp2_stream* s) {
222   if (grpc_core::IsPrioritizeFinishedRequestsEnabled() &&
223       s->send_trailing_metadata != nullptr) {
224     stream_list_prepend(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
225   } else {
226     stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
227   }
228 }
229 
grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)230 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
231                                                grpc_chttp2_stream** s) {
232   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
233 }
234 
grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport * t,grpc_chttp2_stream * s)235 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
236                                                   grpc_chttp2_stream* s) {
237   stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
238 }
239 
grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)240 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
241                                             grpc_chttp2_stream* s) {
242   stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
243 }
244 
grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport * t,grpc_chttp2_stream ** s)245 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
246                                             grpc_chttp2_stream** s) {
247   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
248 }
249 
grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s)250 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
251                                                grpc_chttp2_stream* s) {
252   return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
253 }
254