• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 #include "src/core/ext/transport/chttp2/transport/internal.h"
29 #include "src/core/lib/gpr/string.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/slice/slice_internal.h"
32 #include "src/core/lib/slice/slice_string_helpers.h"
33 #include "src/core/lib/transport/transport.h"
34 
grpc_chttp2_data_parser_init(grpc_chttp2_data_parser * parser)35 grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) {
36   parser->state = GRPC_CHTTP2_DATA_FH_0;
37   parser->parsing_frame = nullptr;
38   return GRPC_ERROR_NONE;
39 }
40 
grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser * parser)41 void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) {
42   if (parser->parsing_frame != nullptr) {
43     GRPC_ERROR_UNREF(parser->parsing_frame->Finished(
44         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
45   }
46   GRPC_ERROR_UNREF(parser->error);
47 }
48 
grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser * parser,uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)49 grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser,
50                                                 uint8_t flags,
51                                                 uint32_t stream_id,
52                                                 grpc_chttp2_stream* s) {
53   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
54     char* msg;
55     gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
56     grpc_error* err = grpc_error_set_int(
57         GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg), GRPC_ERROR_INT_STREAM_ID,
58         static_cast<intptr_t>(stream_id));
59     gpr_free(msg);
60     return err;
61   }
62 
63   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
64     s->received_last_frame = true;
65   } else {
66     s->received_last_frame = false;
67   }
68 
69   return GRPC_ERROR_NONE;
70 }
71 
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_transport_one_way_stats * stats,grpc_slice_buffer * outbuf)72 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
73                              uint32_t write_bytes, int is_eof,
74                              grpc_transport_one_way_stats* stats,
75                              grpc_slice_buffer* outbuf) {
76   grpc_slice hdr;
77   uint8_t* p;
78   static const size_t header_size = 9;
79 
80   hdr = GRPC_SLICE_MALLOC(header_size);
81   p = GRPC_SLICE_START_PTR(hdr);
82   GPR_ASSERT(write_bytes < (1 << 24));
83   *p++ = static_cast<uint8_t>(write_bytes >> 16);
84   *p++ = static_cast<uint8_t>(write_bytes >> 8);
85   *p++ = static_cast<uint8_t>(write_bytes);
86   *p++ = GRPC_CHTTP2_FRAME_DATA;
87   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
88   *p++ = static_cast<uint8_t>(id >> 24);
89   *p++ = static_cast<uint8_t>(id >> 16);
90   *p++ = static_cast<uint8_t>(id >> 8);
91   *p++ = static_cast<uint8_t>(id);
92   grpc_slice_buffer_add(outbuf, hdr);
93 
94   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
95 
96   stats->framing_bytes += header_size;
97   stats->data_bytes += write_bytes;
98 }
99 
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_data_parser * p,grpc_chttp2_stream * s,grpc_slice_buffer * slices,grpc_slice * slice_out,grpc_core::OrphanablePtr<grpc_core::ByteStream> * stream_out)100 grpc_error* grpc_deframe_unprocessed_incoming_frames(
101     grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
102     grpc_slice_buffer* slices, grpc_slice* slice_out,
103     grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
104   grpc_error* error = GRPC_ERROR_NONE;
105   grpc_chttp2_transport* t = s->t;
106 
107   while (slices->count > 0) {
108     uint8_t* beg = nullptr;
109     uint8_t* end = nullptr;
110     uint8_t* cur = nullptr;
111 
112     grpc_slice slice = grpc_slice_buffer_take_first(slices);
113 
114     beg = GRPC_SLICE_START_PTR(slice);
115     end = GRPC_SLICE_END_PTR(slice);
116     cur = beg;
117     uint32_t message_flags;
118     char* msg;
119 
120     if (cur == end) {
121       grpc_slice_unref_internal(slice);
122       continue;
123     }
124 
125     switch (p->state) {
126       case GRPC_CHTTP2_DATA_ERROR:
127         p->state = GRPC_CHTTP2_DATA_ERROR;
128         grpc_slice_unref_internal(slice);
129         return GRPC_ERROR_REF(p->error);
130       case GRPC_CHTTP2_DATA_FH_0:
131         s->stats.incoming.framing_bytes++;
132         p->frame_type = *cur;
133         switch (p->frame_type) {
134           case 0:
135             p->is_frame_compressed = false; /* GPR_FALSE */
136             break;
137           case 1:
138             p->is_frame_compressed = true; /* GPR_TRUE */
139             break;
140           default:
141             gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
142             p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
143             p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
144                                           static_cast<intptr_t>(s->id));
145             gpr_free(msg);
146             msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
147             p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
148                                           grpc_slice_from_copied_string(msg));
149             gpr_free(msg);
150             p->error =
151                 grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
152             p->state = GRPC_CHTTP2_DATA_ERROR;
153             grpc_slice_unref_internal(slice);
154             return GRPC_ERROR_REF(p->error);
155         }
156         if (++cur == end) {
157           p->state = GRPC_CHTTP2_DATA_FH_1;
158           grpc_slice_unref_internal(slice);
159           continue;
160         }
161       /* fallthrough */
162       case GRPC_CHTTP2_DATA_FH_1:
163         s->stats.incoming.framing_bytes++;
164         p->frame_size = (static_cast<uint32_t>(*cur)) << 24;
165         if (++cur == end) {
166           p->state = GRPC_CHTTP2_DATA_FH_2;
167           grpc_slice_unref_internal(slice);
168           continue;
169         }
170       /* fallthrough */
171       case GRPC_CHTTP2_DATA_FH_2:
172         s->stats.incoming.framing_bytes++;
173         p->frame_size |= (static_cast<uint32_t>(*cur)) << 16;
174         if (++cur == end) {
175           p->state = GRPC_CHTTP2_DATA_FH_3;
176           grpc_slice_unref_internal(slice);
177           continue;
178         }
179       /* fallthrough */
180       case GRPC_CHTTP2_DATA_FH_3:
181         s->stats.incoming.framing_bytes++;
182         p->frame_size |= (static_cast<uint32_t>(*cur)) << 8;
183         if (++cur == end) {
184           p->state = GRPC_CHTTP2_DATA_FH_4;
185           grpc_slice_unref_internal(slice);
186           continue;
187         }
188       /* fallthrough */
189       case GRPC_CHTTP2_DATA_FH_4:
190         s->stats.incoming.framing_bytes++;
191         GPR_ASSERT(stream_out != nullptr);
192         GPR_ASSERT(p->parsing_frame == nullptr);
193         p->frame_size |= (static_cast<uint32_t>(*cur));
194         p->state = GRPC_CHTTP2_DATA_FRAME;
195         ++cur;
196         message_flags = 0;
197         if (p->is_frame_compressed) {
198           message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
199         }
200         p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
201             t, s, p->frame_size, message_flags);
202         stream_out->reset(p->parsing_frame);
203         if (p->parsing_frame->remaining_bytes() == 0) {
204           GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
205           p->parsing_frame = nullptr;
206           p->state = GRPC_CHTTP2_DATA_FH_0;
207         }
208         s->pending_byte_stream = true;
209 
210         if (cur != end) {
211           grpc_slice_buffer_undo_take_first(
212               slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
213                                      static_cast<size_t>(end - beg)));
214         }
215         grpc_slice_unref_internal(slice);
216         return GRPC_ERROR_NONE;
217       case GRPC_CHTTP2_DATA_FRAME: {
218         GPR_ASSERT(p->parsing_frame != nullptr);
219         GPR_ASSERT(slice_out != nullptr);
220         if (cur == end) {
221           grpc_slice_unref_internal(slice);
222           continue;
223         }
224         uint32_t remaining = static_cast<uint32_t>(end - cur);
225         if (remaining == p->frame_size) {
226           s->stats.incoming.data_bytes += remaining;
227           if (GRPC_ERROR_NONE !=
228               (error = p->parsing_frame->Push(
229                    grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
230                                   static_cast<size_t>(end - beg)),
231                    slice_out))) {
232             grpc_slice_unref_internal(slice);
233             return error;
234           }
235           if (GRPC_ERROR_NONE !=
236               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
237             grpc_slice_unref_internal(slice);
238             return error;
239           }
240           p->parsing_frame = nullptr;
241           p->state = GRPC_CHTTP2_DATA_FH_0;
242           grpc_slice_unref_internal(slice);
243           return GRPC_ERROR_NONE;
244         } else if (remaining < p->frame_size) {
245           s->stats.incoming.data_bytes += remaining;
246           if (GRPC_ERROR_NONE !=
247               (error = p->parsing_frame->Push(
248                    grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
249                                   static_cast<size_t>(end - beg)),
250                    slice_out))) {
251             return error;
252           }
253           p->frame_size -= remaining;
254           grpc_slice_unref_internal(slice);
255           return GRPC_ERROR_NONE;
256         } else {
257           GPR_ASSERT(remaining > p->frame_size);
258           s->stats.incoming.data_bytes += p->frame_size;
259           if (GRPC_ERROR_NONE !=
260               p->parsing_frame->Push(
261                   grpc_slice_sub(
262                       slice, static_cast<size_t>(cur - beg),
263                       static_cast<size_t>(cur + p->frame_size - beg)),
264                   slice_out)) {
265             grpc_slice_unref_internal(slice);
266             return error;
267           }
268           if (GRPC_ERROR_NONE !=
269               (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
270             grpc_slice_unref_internal(slice);
271             return error;
272           }
273           p->parsing_frame = nullptr;
274           p->state = GRPC_CHTTP2_DATA_FH_0;
275           cur += p->frame_size;
276           grpc_slice_buffer_undo_take_first(
277               slices, grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
278                                      static_cast<size_t>(end - beg)));
279           grpc_slice_unref_internal(slice);
280           return GRPC_ERROR_NONE;
281         }
282       }
283     }
284   }
285 
286   return GRPC_ERROR_NONE;
287 }
288 
grpc_chttp2_data_parser_parse(void * parser,grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_slice slice,int is_last)289 grpc_error* grpc_chttp2_data_parser_parse(void* parser,
290                                           grpc_chttp2_transport* t,
291                                           grpc_chttp2_stream* s,
292                                           grpc_slice slice, int is_last) {
293   if (!s->pending_byte_stream) {
294     grpc_slice_ref_internal(slice);
295     grpc_slice_buffer_add(&s->frame_storage, slice);
296     grpc_chttp2_maybe_complete_recv_message(t, s);
297   } else if (s->on_next) {
298     GPR_ASSERT(s->frame_storage.length == 0);
299     grpc_slice_ref_internal(slice);
300     grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
301     GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
302     s->on_next = nullptr;
303     s->unprocessed_incoming_frames_decompressed = false;
304   } else {
305     grpc_slice_ref_internal(slice);
306     grpc_slice_buffer_add(&s->frame_storage, slice);
307   }
308 
309   if (is_last && s->received_last_frame) {
310     grpc_chttp2_mark_stream_closed(t, s, true, false, GRPC_ERROR_NONE);
311   }
312 
313   return GRPC_ERROR_NONE;
314 }
315