1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22
23 #include <string.h>
24
25 #include "absl/strings/str_format.h"
26
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include "src/core/ext/transport/chttp2/transport/internal.h"
30 #include "src/core/lib/gpr/string.h"
31 #include "src/core/lib/gprpp/memory.h"
32 #include "src/core/lib/slice/slice_internal.h"
33 #include "src/core/lib/slice/slice_string_helpers.h"
34 #include "src/core/lib/transport/transport.h"
35
~grpc_chttp2_data_parser()36 grpc_chttp2_data_parser::~grpc_chttp2_data_parser() {
37 if (parsing_frame != nullptr) {
38 GRPC_ERROR_UNREF(parsing_frame->Finished(
39 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
40 }
41 GRPC_ERROR_UNREF(error);
42 }
43
grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *,uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)44 grpc_error* grpc_chttp2_data_parser_begin_frame(
45 grpc_chttp2_data_parser* /*parser*/, uint8_t flags, uint32_t stream_id,
46 grpc_chttp2_stream* s) {
47 if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
48 return grpc_error_set_int(
49 GRPC_ERROR_CREATE_FROM_COPIED_STRING(
50 absl::StrFormat("unsupported data flags: 0x%02x", flags).c_str()),
51 GRPC_ERROR_INT_STREAM_ID, static_cast<intptr_t>(stream_id));
52 }
53
54 if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
55 s->received_last_frame = true;
56 s->eos_received = true;
57 } else {
58 s->received_last_frame = false;
59 }
60
61 return GRPC_ERROR_NONE;
62 }
63
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_transport_one_way_stats * stats,grpc_slice_buffer * outbuf)64 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
65 uint32_t write_bytes, int is_eof,
66 grpc_transport_one_way_stats* stats,
67 grpc_slice_buffer* outbuf) {
68 grpc_slice hdr;
69 uint8_t* p;
70 static const size_t header_size = 9;
71
72 hdr = GRPC_SLICE_MALLOC(header_size);
73 p = GRPC_SLICE_START_PTR(hdr);
74 GPR_ASSERT(write_bytes < (1 << 24));
75 *p++ = static_cast<uint8_t>(write_bytes >> 16);
76 *p++ = static_cast<uint8_t>(write_bytes >> 8);
77 *p++ = static_cast<uint8_t>(write_bytes);
78 *p++ = GRPC_CHTTP2_FRAME_DATA;
79 *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
80 *p++ = static_cast<uint8_t>(id >> 24);
81 *p++ = static_cast<uint8_t>(id >> 16);
82 *p++ = static_cast<uint8_t>(id >> 8);
83 *p++ = static_cast<uint8_t>(id);
84 grpc_slice_buffer_add(outbuf, hdr);
85
86 grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
87
88 stats->framing_bytes += header_size;
89 stats->data_bytes += write_bytes;
90 }
91
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_data_parser * p,grpc_chttp2_stream * s,grpc_slice_buffer * slices,grpc_slice * slice_out,grpc_core::OrphanablePtr<grpc_core::ByteStream> * stream_out)92 grpc_error* grpc_deframe_unprocessed_incoming_frames(
93 grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
94 grpc_slice_buffer* slices, grpc_slice* slice_out,
95 grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
96 grpc_error* error = GRPC_ERROR_NONE;
97 grpc_chttp2_transport* t = s->t;
98
99 while (slices->count > 0) {
100 uint8_t* beg = nullptr;
101 uint8_t* end = nullptr;
102 uint8_t* cur = nullptr;
103
104 grpc_slice* slice = grpc_slice_buffer_peek_first(slices);
105 beg = GRPC_SLICE_START_PTR(*slice);
106 end = GRPC_SLICE_END_PTR(*slice);
107 cur = beg;
108 uint32_t message_flags;
109
110 if (cur == end) {
111 grpc_slice_buffer_remove_first(slices);
112 continue;
113 }
114
115 switch (p->state) {
116 case GRPC_CHTTP2_DATA_ERROR:
117 p->state = GRPC_CHTTP2_DATA_ERROR;
118 grpc_slice_buffer_remove_first(slices);
119 return GRPC_ERROR_REF(p->error);
120 case GRPC_CHTTP2_DATA_FH_0:
121 s->stats.incoming.framing_bytes++;
122 p->frame_type = *cur;
123 switch (p->frame_type) {
124 case 0:
125 p->is_frame_compressed = false; /* GPR_FALSE */
126 break;
127 case 1:
128 p->is_frame_compressed = true; /* GPR_TRUE */
129 break;
130 default:
131 p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
132 absl::StrFormat("Bad GRPC frame type 0x%02x", p->frame_type)
133 .c_str());
134 p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
135 static_cast<intptr_t>(s->id));
136 p->error = grpc_error_set_str(
137 p->error, GRPC_ERROR_STR_RAW_BYTES,
138 grpc_slice_from_moved_string(grpc_core::UniquePtr<char>(
139 grpc_dump_slice(*slice, GPR_DUMP_HEX | GPR_DUMP_ASCII))));
140 p->error =
141 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
142 p->state = GRPC_CHTTP2_DATA_ERROR;
143 grpc_slice_buffer_remove_first(slices);
144 return GRPC_ERROR_REF(p->error);
145 }
146 if (++cur == end) {
147 p->state = GRPC_CHTTP2_DATA_FH_1;
148 grpc_slice_buffer_remove_first(slices);
149 continue;
150 }
151 /* fallthrough */
152 case GRPC_CHTTP2_DATA_FH_1:
153 s->stats.incoming.framing_bytes++;
154 p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
155 if (++cur == end) {
156 p->state = GRPC_CHTTP2_DATA_FH_2;
157 grpc_slice_buffer_remove_first(slices);
158 continue;
159 }
160 /* fallthrough */
161 case GRPC_CHTTP2_DATA_FH_2:
162 s->stats.incoming.framing_bytes++;
163 p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
164 if (++cur == end) {
165 p->state = GRPC_CHTTP2_DATA_FH_3;
166 grpc_slice_buffer_remove_first(slices);
167 continue;
168 }
169 /* fallthrough */
170 case GRPC_CHTTP2_DATA_FH_3:
171 s->stats.incoming.framing_bytes++;
172 p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
173 if (++cur == end) {
174 p->state = GRPC_CHTTP2_DATA_FH_4;
175 grpc_slice_buffer_remove_first(slices);
176 continue;
177 }
178 /* fallthrough */
179 case GRPC_CHTTP2_DATA_FH_4:
180 s->stats.incoming.framing_bytes++;
181 GPR_ASSERT(stream_out != nullptr);
182 GPR_ASSERT(p->parsing_frame == nullptr);
183 p->frame_size |= (static_cast<uint32_t>(*cur));
184 if (t->channelz_socket != nullptr) {
185 t->channelz_socket->RecordMessageReceived();
186 }
187 p->state = GRPC_CHTTP2_DATA_FRAME;
188 ++cur;
189 message_flags = 0;
190 if (p->is_frame_compressed) {
191 message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
192 }
193 p->parsing_frame = new grpc_core::Chttp2IncomingByteStream(
194 t, s, p->frame_size, message_flags);
195 stream_out->reset(p->parsing_frame);
196 if (p->parsing_frame->remaining_bytes() == 0) {
197 GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
198 p->parsing_frame = nullptr;
199 p->state = GRPC_CHTTP2_DATA_FH_0;
200 }
201 s->pending_byte_stream = true;
202 if (cur != end) {
203 grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
204 static_cast<size_t>(end - beg));
205 } else {
206 grpc_slice_buffer_remove_first(slices);
207 }
208 return GRPC_ERROR_NONE;
209 case GRPC_CHTTP2_DATA_FRAME: {
210 GPR_ASSERT(p->parsing_frame != nullptr);
211 GPR_ASSERT(slice_out != nullptr);
212 if (cur == end) {
213 grpc_slice_buffer_remove_first(slices);
214 continue;
215 }
216 uint32_t remaining = static_cast<uint32_t>(end - cur);
217 if (remaining == p->frame_size) {
218 s->stats.incoming.data_bytes += remaining;
219 if (GRPC_ERROR_NONE !=
220 (error = p->parsing_frame->Push(
221 grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
222 static_cast<size_t>(end - beg)),
223 slice_out))) {
224 grpc_slice_buffer_remove_first(slices);
225 return error;
226 }
227 if (GRPC_ERROR_NONE !=
228 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
229 grpc_slice_buffer_remove_first(slices);
230 return error;
231 }
232 p->parsing_frame = nullptr;
233 p->state = GRPC_CHTTP2_DATA_FH_0;
234 grpc_slice_buffer_remove_first(slices);
235 return GRPC_ERROR_NONE;
236 } else if (remaining < p->frame_size) {
237 s->stats.incoming.data_bytes += remaining;
238 if (GRPC_ERROR_NONE !=
239 (error = p->parsing_frame->Push(
240 grpc_slice_sub(*slice, static_cast<size_t>(cur - beg),
241 static_cast<size_t>(end - beg)),
242 slice_out))) {
243 return error;
244 }
245 p->frame_size -= remaining;
246 grpc_slice_buffer_remove_first(slices);
247 return GRPC_ERROR_NONE;
248 } else {
249 GPR_ASSERT(remaining > p->frame_size);
250 s->stats.incoming.data_bytes += p->frame_size;
251 if (GRPC_ERROR_NONE !=
252 p->parsing_frame->Push(
253 grpc_slice_sub(
254 *slice, static_cast<size_t>(cur - beg),
255 static_cast<size_t>(cur + p->frame_size - beg)),
256 slice_out)) {
257 grpc_slice_buffer_remove_first(slices);
258 return error;
259 }
260 if (GRPC_ERROR_NONE !=
261 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
262 grpc_slice_buffer_remove_first(slices);
263 return error;
264 }
265 p->parsing_frame = nullptr;
266 p->state = GRPC_CHTTP2_DATA_FH_0;
267 cur += p->frame_size;
268 grpc_slice_buffer_sub_first(slices, static_cast<size_t>(cur - beg),
269 static_cast<size_t>(end - beg));
270 return GRPC_ERROR_NONE;
271 }
272 }
273 }
274 }
275 return GRPC_ERROR_NONE;
276 }
277
grpc_chttp2_data_parser_parse(void *,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)278 grpc_error* grpc_chttp2_data_parser_parse(void* /*parser*/,
279 grpc_chttp2_transport* t,
280 grpc_chttp2_stream* s,
281 const grpc_slice& slice,
282 int is_last) {
283 if (!s->pending_byte_stream) {
284 grpc_slice_ref_internal(slice);
285 grpc_slice_buffer_add(&s->frame_storage, slice);
286 grpc_chttp2_maybe_complete_recv_message(t, s);
287 } else if (s->on_next) {
288 GPR_ASSERT(s->frame_storage.length == 0);
289 grpc_slice_ref_internal(slice);
290 grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
291 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_NONE);
292 s->on_next = nullptr;
293 s->unprocessed_incoming_frames_decompressed = false;
294 } else {
295 grpc_slice_ref_internal(slice);
296 grpc_slice_buffer_add(&s->frame_storage, slice);
297 }
298
299 if (is_last && s->received_last_frame) {
300 grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
301 }
302
303 return GRPC_ERROR_NONE;
304 }
305