• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
20 
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/port_platform.h>
23 #include <stdlib.h>
24 
25 #include "absl/log/check.h"
26 #include "absl/status/status.h"
27 #include "absl/strings/str_format.h"
28 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
29 #include "src/core/ext/transport/chttp2/transport/internal.h"
30 #include "src/core/lib/experiments/experiments.h"
31 #include "src/core/lib/slice/slice.h"
32 #include "src/core/lib/slice/slice_buffer.h"
33 #include "src/core/lib/transport/transport.h"
34 #include "src/core/util/status_helper.h"
35 
grpc_chttp2_data_parser_begin_frame(uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)36 absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags,
37                                                  uint32_t stream_id,
38                                                  grpc_chttp2_stream* s) {
39   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
40     return absl::InternalError(absl::StrFormat(
41         "unsupported data flags: 0x%02x stream: %d", flags, stream_id));
42   }
43 
44   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
45     s->received_last_frame = true;
46     s->eos_received = true;
47   } else {
48     s->received_last_frame = false;
49   }
50 
51   return absl::OkStatus();
52 }
53 
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_core::CallTracerInterface * call_tracer,grpc_slice_buffer * outbuf)54 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
55                              uint32_t write_bytes, int is_eof,
56                              grpc_core::CallTracerInterface* call_tracer,
57                              grpc_slice_buffer* outbuf) {
58   grpc_slice hdr;
59   uint8_t* p;
60   static const size_t header_size = 9;
61 
62   hdr = GRPC_SLICE_MALLOC(header_size);
63   p = GRPC_SLICE_START_PTR(hdr);
64   CHECK(write_bytes < (1 << 24));
65   *p++ = static_cast<uint8_t>(write_bytes >> 16);
66   *p++ = static_cast<uint8_t>(write_bytes >> 8);
67   *p++ = static_cast<uint8_t>(write_bytes);
68   *p++ = GRPC_CHTTP2_FRAME_DATA;
69   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
70   *p++ = static_cast<uint8_t>(id >> 24);
71   *p++ = static_cast<uint8_t>(id >> 16);
72   *p++ = static_cast<uint8_t>(id >> 8);
73   *p++ = static_cast<uint8_t>(id);
74   grpc_slice_buffer_add(outbuf, hdr);
75 
76   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
77 
78   call_tracer->RecordOutgoingBytes({header_size, 0, 0});
79 }
80 
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_stream * s,int64_t * min_progress_size,grpc_core::SliceBuffer * stream_out,uint32_t * message_flags)81 grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(
82     grpc_chttp2_stream* s, int64_t* min_progress_size,
83     grpc_core::SliceBuffer* stream_out, uint32_t* message_flags) {
84   grpc_slice_buffer* slices = &s->frame_storage;
85   grpc_error_handle error;
86 
87   if (slices->length < GRPC_HEADER_SIZE_IN_BYTES) {
88     if (min_progress_size != nullptr) {
89       *min_progress_size = GRPC_HEADER_SIZE_IN_BYTES - slices->length;
90     }
91     return grpc_core::Pending{};
92   }
93 
94   uint8_t header[GRPC_HEADER_SIZE_IN_BYTES];
95   grpc_slice_buffer_copy_first_into_buffer(slices, GRPC_HEADER_SIZE_IN_BYTES,
96                                            header);
97 
98   switch (header[0]) {
99     case 0:
100       if (message_flags != nullptr) *message_flags = 0;
101       break;
102     case 1:
103       if (message_flags != nullptr) {
104         *message_flags = GRPC_WRITE_INTERNAL_COMPRESS;
105       }
106       break;
107     default:
108       error = GRPC_ERROR_CREATE(
109           absl::StrFormat("Bad GRPC frame type 0x%02x", header[0]));
110       error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kStreamId,
111                                  static_cast<intptr_t>(s->id));
112       return error;
113   }
114 
115   size_t length = (static_cast<uint32_t>(header[1]) << 24) |
116                   (static_cast<uint32_t>(header[2]) << 16) |
117                   (static_cast<uint32_t>(header[3]) << 8) |
118                   static_cast<uint32_t>(header[4]);
119 
120   if (slices->length < length + GRPC_HEADER_SIZE_IN_BYTES) {
121     if (min_progress_size != nullptr) {
122       *min_progress_size = length + GRPC_HEADER_SIZE_IN_BYTES - slices->length;
123     }
124     return grpc_core::Pending{};
125   }
126 
127   if (min_progress_size != nullptr) *min_progress_size = 0;
128 
129   if (stream_out != nullptr) {
130     s->call_tracer_wrapper.RecordIncomingBytes(
131         {GRPC_HEADER_SIZE_IN_BYTES, length, 0});
132     grpc_slice_buffer_move_first_into_buffer(slices, GRPC_HEADER_SIZE_IN_BYTES,
133                                              header);
134     grpc_slice_buffer_move_first(slices, length, stream_out->c_slice_buffer());
135   }
136 
137   return absl::OkStatus();
138 }
139 
grpc_chttp2_data_parser_parse(void *,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)140 grpc_error_handle grpc_chttp2_data_parser_parse(void* /*parser*/,
141                                                 grpc_chttp2_transport* t,
142                                                 grpc_chttp2_stream* s,
143                                                 const grpc_slice& slice,
144                                                 int is_last) {
145   grpc_core::CSliceRef(slice);
146   grpc_slice_buffer_add(&s->frame_storage, slice);
147   grpc_chttp2_maybe_complete_recv_message(t, s);
148 
149   if (is_last && s->received_last_frame) {
150     grpc_chttp2_mark_stream_closed(
151         t, s, true, false,
152         t->is_client
153             ? GRPC_ERROR_CREATE("Data frame with END_STREAM flag received")
154             : absl::OkStatus());
155   }
156 
157   return absl::OkStatus();
158 }
159