1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
20
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/port_platform.h>
23 #include <stdlib.h>
24
25 #include "absl/log/check.h"
26 #include "absl/status/status.h"
27 #include "absl/strings/str_format.h"
28 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
29 #include "src/core/ext/transport/chttp2/transport/internal.h"
30 #include "src/core/lib/experiments/experiments.h"
31 #include "src/core/lib/slice/slice.h"
32 #include "src/core/lib/slice/slice_buffer.h"
33 #include "src/core/lib/transport/transport.h"
34 #include "src/core/util/status_helper.h"
35
grpc_chttp2_data_parser_begin_frame(uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)36 absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags,
37 uint32_t stream_id,
38 grpc_chttp2_stream* s) {
39 if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
40 return absl::InternalError(absl::StrFormat(
41 "unsupported data flags: 0x%02x stream: %d", flags, stream_id));
42 }
43
44 if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
45 s->received_last_frame = true;
46 s->eos_received = true;
47 } else {
48 s->received_last_frame = false;
49 }
50
51 return absl::OkStatus();
52 }
53
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_core::CallTracerInterface * call_tracer,grpc_slice_buffer * outbuf)54 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
55 uint32_t write_bytes, int is_eof,
56 grpc_core::CallTracerInterface* call_tracer,
57 grpc_slice_buffer* outbuf) {
58 grpc_slice hdr;
59 uint8_t* p;
60 static const size_t header_size = 9;
61
62 hdr = GRPC_SLICE_MALLOC(header_size);
63 p = GRPC_SLICE_START_PTR(hdr);
64 CHECK(write_bytes < (1 << 24));
65 *p++ = static_cast<uint8_t>(write_bytes >> 16);
66 *p++ = static_cast<uint8_t>(write_bytes >> 8);
67 *p++ = static_cast<uint8_t>(write_bytes);
68 *p++ = GRPC_CHTTP2_FRAME_DATA;
69 *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
70 *p++ = static_cast<uint8_t>(id >> 24);
71 *p++ = static_cast<uint8_t>(id >> 16);
72 *p++ = static_cast<uint8_t>(id >> 8);
73 *p++ = static_cast<uint8_t>(id);
74 grpc_slice_buffer_add(outbuf, hdr);
75
76 grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
77
78 call_tracer->RecordOutgoingBytes({header_size, 0, 0});
79 }
80
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_stream * s,int64_t * min_progress_size,grpc_core::SliceBuffer * stream_out,uint32_t * message_flags)81 grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(
82 grpc_chttp2_stream* s, int64_t* min_progress_size,
83 grpc_core::SliceBuffer* stream_out, uint32_t* message_flags) {
84 grpc_slice_buffer* slices = &s->frame_storage;
85 grpc_error_handle error;
86
87 if (slices->length < GRPC_HEADER_SIZE_IN_BYTES) {
88 if (min_progress_size != nullptr) {
89 *min_progress_size = GRPC_HEADER_SIZE_IN_BYTES - slices->length;
90 }
91 return grpc_core::Pending{};
92 }
93
94 uint8_t header[GRPC_HEADER_SIZE_IN_BYTES];
95 grpc_slice_buffer_copy_first_into_buffer(slices, GRPC_HEADER_SIZE_IN_BYTES,
96 header);
97
98 switch (header[0]) {
99 case 0:
100 if (message_flags != nullptr) *message_flags = 0;
101 break;
102 case 1:
103 if (message_flags != nullptr) {
104 *message_flags = GRPC_WRITE_INTERNAL_COMPRESS;
105 }
106 break;
107 default:
108 error = GRPC_ERROR_CREATE(
109 absl::StrFormat("Bad GRPC frame type 0x%02x", header[0]));
110 error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kStreamId,
111 static_cast<intptr_t>(s->id));
112 return error;
113 }
114
115 size_t length = (static_cast<uint32_t>(header[1]) << 24) |
116 (static_cast<uint32_t>(header[2]) << 16) |
117 (static_cast<uint32_t>(header[3]) << 8) |
118 static_cast<uint32_t>(header[4]);
119
120 if (slices->length < length + GRPC_HEADER_SIZE_IN_BYTES) {
121 if (min_progress_size != nullptr) {
122 *min_progress_size = length + GRPC_HEADER_SIZE_IN_BYTES - slices->length;
123 }
124 return grpc_core::Pending{};
125 }
126
127 if (min_progress_size != nullptr) *min_progress_size = 0;
128
129 if (stream_out != nullptr) {
130 s->call_tracer_wrapper.RecordIncomingBytes(
131 {GRPC_HEADER_SIZE_IN_BYTES, length, 0});
132 grpc_slice_buffer_move_first_into_buffer(slices, GRPC_HEADER_SIZE_IN_BYTES,
133 header);
134 grpc_slice_buffer_move_first(slices, length, stream_out->c_slice_buffer());
135 }
136
137 return absl::OkStatus();
138 }
139
grpc_chttp2_data_parser_parse(void *,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)140 grpc_error_handle grpc_chttp2_data_parser_parse(void* /*parser*/,
141 grpc_chttp2_transport* t,
142 grpc_chttp2_stream* s,
143 const grpc_slice& slice,
144 int is_last) {
145 grpc_core::CSliceRef(slice);
146 grpc_slice_buffer_add(&s->frame_storage, slice);
147 grpc_chttp2_maybe_complete_recv_message(t, s);
148
149 if (is_last && s->received_last_frame) {
150 grpc_chttp2_mark_stream_closed(
151 t, s, true, false,
152 t->is_client
153 ? GRPC_ERROR_CREATE("Data frame with END_STREAM flag received")
154 : absl::OkStatus());
155 }
156
157 return absl::OkStatus();
158 }
159