1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/internal.h"
29 #include "src/core/lib/gpr/string.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/slice/slice_string_helpers.h"
33 #include "src/core/lib/transport/transport.h"
34
grpc_chttp2_data_parser_init(grpc_chttp2_data_parser * parser)35 grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) {
36 parser->state = GRPC_CHTTP2_DATA_FH_0;
37 parser->parsing_frame = nullptr;
38 return GRPC_ERROR_NONE;
39 }
40
grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser * parser)41 void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) {
42 if (parser->parsing_frame != nullptr) {
43 GRPC_ERROR_UNREF(parser->parsing_frame->Finished(
44 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
45 }
46 GRPC_ERROR_UNREF(parser->error);
47 }
48
grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser * parser,uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)49 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
50 uint8_t flags,
51 uint32_t stream_id,
52 grpc_chttp2_stream* s) {
53 if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
54 char* msg;
55 gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
56 grpc_error* err = grpc_error_set_int(
57 GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID,
58 static_cast<intptr_t>(stream_id));
59 gpr_free(msg);
60 return err;
61 }
62
63 if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
64 s->received_last_frame = true;
65 } else {
66 s->received_last_frame = false;
67 }
68
69 return GRPC_ERROR_NONE;
70 }
71
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_transport_one_way_stats * stats,grpc_slice_buffer * outbuf)72 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
73 uint32_t write_bytes, int is_eof,
74 grpc_transport_one_way_stats* stats,
75 grpc_slice_buffer* outbuf) {
76 grpc_slice hdr;
77 uint8_t* p;
78 static const size_t header_size = 9;
79
80 hdr = GRPC_SLICE_MALLOC(header_size);
81 p = GRPC_SLICE_START_PTR(hdr);
82 GPR_ASSERT(write_bytes < (1 << 24));
83 *p++ = static_cast<uint8_t>(write_bytes >> 16);
84 *p++ = static_cast<uint8_t>(write_bytes >> 8);
85 *p++ = static_cast<uint8_t>(write_bytes);
86 *p++ = GRPC_CHTTP2_FRAME_DATA;
87 *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
88 *p++ = static_cast<uint8_t>(id >> 24);
89 *p++ = static_cast<uint8_t>(id >> 16);
90 *p++ = static_cast<uint8_t>(id >> 8);
91 *p++ = static_cast<uint8_t>(id);
92 grpc_slice_buffer_add(outbuf, hdr);
93
94 grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
95
96 stats->framing_bytes += header_size;
97 stats->data_bytes += write_bytes;
98 }
99
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_data_parser * p,grpc_chttp2_stream * s,grpc_slice_buffer * slices,grpc_slice * slice_out,grpc_core::OrphanablePtr<grpc_core::ByteStream> * stream_out)100 grpc_error* grpc_deframe_unprocessed_incoming_frames(
101 grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
102 grpc_slice_buffer* slices, grpc_slice* slice_out,
103 grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
104 grpc_error* error = GRPC_ERROR_NONE;
105 grpc_chttp2_transport* t = s->t;
106
107 while (slices->count > 0) {
108 uint8_t* beg = nullptr;
109 uint8_t* end = nullptr;
110 uint8_t* cur = nullptr;
111
112 grpc_slice slice = grpc_slice_buffer_take_first(slices);
113
114 beg = GRPC_SLICE_START_PTR(slice);
115 end = GRPC_SLICE_END_PTR(slice);
116 cur = beg;
117 uint32_t message_flags;
118 char* msg;
119
120 if (cur == end) {
121 grpc_slice_unref_internal(slice);
122 continue;
123 }
124
125 switch (p->state) {
126 case GRPC_CHTTP2_DATA_ERROR:
127 p->state = GRPC_CHTTP2_DATA_ERROR;
128 grpc_slice_unref_internal(slice);
129 return GRPC_ERROR_REF(p->error);
130 case GRPC_CHTTP2_DATA_FH_0:
131 s->stats.incoming.framing_bytes++;
132 p->frame_type = *cur;
133 switch (p->frame_type) {
134 case 0:
135 p->is_frame_compressed = false; /* GPR_FALSE */
136 break;
137 case 1:
138 p->is_frame_compressed = true; /* GPR_TRUE */
139 break;
140 default:
141 gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
142 p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
143 p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
144 static_cast<intptr_t>(s->id));
145 gpr_free(msg);
146 msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
147 p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
148 grpc_slice_from_copied_string(msg));
149 gpr_free(msg);
150 p->error =
151 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
152 p->state = GRPC_CHTTP2_DATA_ERROR;
153 grpc_slice_unref_internal(slice);
154 return GRPC_ERROR_REF(p->error);
155 }
156 if (++cur == end) {
157 p->state = GRPC_CHTTP2_DATA_FH_1;
158 grpc_slice_unref_internal(slice);
159 continue;
160 }
161 /* fallthrough */
162 case GRPC_CHTTP2_DATA_FH_1:
163 s->stats.incoming.framing_bytes++;
164 p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
165 if (++cur == end) {
166 p->state = GRPC_CHTTP2_DATA_FH_2;
167 grpc_slice_unref_internal(slice);
168 continue;
169 }
170 /* fallthrough */
171 case GRPC_CHTTP2_DATA_FH_2:
172 s->stats.incoming.framing_bytes++;
173 p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
174 if (++cur == end) {
175 p->state = GRPC_CHTTP2_DATA_FH_3;
176 grpc_slice_unref_internal(slice);
177 continue;
178 }
179 /* fallthrough */
180 case GRPC_CHTTP2_DATA_FH_3:
181 s->stats.incoming.framing_bytes++;
182 p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
183 if (++cur == end) {
184 p->state = GRPC_CHTTP2_DATA_FH_4;
185 grpc_slice_unref_internal(slice);
186 continue;
187 }
188 /* fallthrough */
189 case GRPC_CHTTP2_DATA_FH_4:
190 s->stats.incoming.framing_bytes++;
191 GPR_ASSERT(stream_out != nullptr);
192 GPR_ASSERT(p->parsing_frame == nullptr);
193 p->frame_size |= (static_cast<uint32_t>(*cur));
194 p->state = GRPC_CHTTP2_DATA_FRAME;
195 ++cur;
196 message_flags = 0;
197 if (p->is_frame_compressed) {
198 message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
199 }
200 p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
201 t, s, p->frame_size, message_flags);
202 stream_out->reset(p->parsing_frame);
203 if (p->parsing_frame->remaining_bytes() == 0) {
204 GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
205 p->parsing_frame = nullptr;
206 p->state = GRPC_CHTTP2_DATA_FH_0;
207 }
208 s->pending_byte_stream = true;
209
210 if (cur != end) {
211 grpc_slice_buffer_undo_take_first(
212 slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
213 static_cast<size_t>(end - beg)));
214 }
215 grpc_slice_unref_internal(slice);
216 return GRPC_ERROR_NONE;
217 case GRPC_CHTTP2_DATA_FRAME: {
218 GPR_ASSERT(p->parsing_frame != nullptr);
219 GPR_ASSERT(slice_out != nullptr);
220 if (cur == end) {
221 grpc_slice_unref_internal(slice);
222 continue;
223 }
224 uint32_t remaining = static_cast<uint32_t>(end - cur);
225 if (remaining == p->frame_size) {
226 s->stats.incoming.data_bytes += remaining;
227 if (GRPC_ERROR_NONE !=
228 (error = p->parsing_frame->Push(
229 grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
230 static_cast<size_t>(end - beg)),
231 slice_out))) {
232 grpc_slice_unref_internal(slice);
233 return error;
234 }
235 if (GRPC_ERROR_NONE !=
236 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
237 grpc_slice_unref_internal(slice);
238 return error;
239 }
240 p->parsing_frame = nullptr;
241 p->state = GRPC_CHTTP2_DATA_FH_0;
242 grpc_slice_unref_internal(slice);
243 return GRPC_ERROR_NONE;
244 } else if (remaining < p->frame_size) {
245 s->stats.incoming.data_bytes += remaining;
246 if (GRPC_ERROR_NONE !=
247 (error = p->parsing_frame->Push(
248 grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
249 static_cast<size_t>(end - beg)),
250 slice_out))) {
251 return error;
252 }
253 p->frame_size -= remaining;
254 grpc_slice_unref_internal(slice);
255 return GRPC_ERROR_NONE;
256 } else {
257 GPR_ASSERT(remaining > p->frame_size);
258 s->stats.incoming.data_bytes += p->frame_size;
259 if (GRPC_ERROR_NONE !=
260 p->parsing_frame->Push(
261 grpc_slice_sub(
262 slice, static_cast<size_t>(cur - beg),
263 static_cast<size_t>(cur + p->frame_size - beg)),
264 slice_out)) {
265 grpc_slice_unref_internal(slice);
266 return error;
267 }
268 if (GRPC_ERROR_NONE !=
269 (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
270 grpc_slice_unref_internal(slice);
271 return error;
272 }
273 p->parsing_frame = nullptr;
274 p->state = GRPC_CHTTP2_DATA_FH_0;
275 cur += p->frame_size;
276 grpc_slice_buffer_undo_take_first(
277 slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
278 static_cast<size_t>(end - beg)));
279 grpc_slice_unref_internal(slice);
280 return GRPC_ERROR_NONE;
281 }
282 }
283 }
284 }
285
286 return GRPC_ERROR_NONE;
287 }
288
grpc_chttp2_data_parser_parse(void * parser,grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_slice slice,int is_last)289 grpc_error* grpc_chttp2_data_parser_parse(void* parser,
290 grpc_chttp2_transport* t,
291 grpc_chttp2_stream* s,
292 grpc_slice slice, int is_last) {
293 if (!s->pending_byte_stream) {
294 grpc_slice_ref_internal(slice);
295 grpc_slice_buffer_add(&s->frame_storage, slice);
296 grpc_chttp2_maybe_complete_recv_message(t, s);
297 } else if (s->on_next) {
298 GPR_ASSERT(s->frame_storage.length == 0);
299 grpc_slice_ref_internal(slice);
300 grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
301 GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
302 s->on_next = nullptr;
303 s->unprocessed_incoming_frames_decompressed = false;
304 } else {
305 grpc_slice_ref_internal(slice);
306 grpc_slice_buffer_add(&s->frame_storage, slice);
307 }
308
309 if (is_last && s->received_last_frame) {
310 grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
311 }
312
313 return GRPC_ERROR_NONE;
314 }
315