// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "test/core/call/batch_builder.h"

#include <grpc/byte_buffer_reader.h>

#include "src/core/lib/compression/message_compress.h"

namespace grpc_core {

ByteBufferFromSlice(Slice slice)23 ByteBufferUniquePtr ByteBufferFromSlice(Slice slice) {
24   return ByteBufferUniquePtr(
25       grpc_raw_byte_buffer_create(const_cast<grpc_slice*>(&slice.c_slice()), 1),
26       grpc_byte_buffer_destroy);
27 }
28 
FindInMetadataArray(const grpc_metadata_array & md,absl::string_view key)29 absl::optional<std::string> FindInMetadataArray(const grpc_metadata_array& md,
30                                                 absl::string_view key) {
31   for (size_t i = 0; i < md.count; i++) {
32     if (key == StringViewFromSlice(md.metadata[i].key)) {
33       return std::string(StringViewFromSlice(md.metadata[i].value));
34     }
35   }
36   return absl::nullopt;
37 }
38 
Get(absl::string_view key) const39 absl::optional<std::string> IncomingMetadata::Get(absl::string_view key) const {
40   return FindInMetadataArray(*metadata_, key);
41 }
42 
MakeOp()43 grpc_op IncomingMetadata::MakeOp() {
44   grpc_op op;
45   memset(&op, 0, sizeof(op));
46   op.op = GRPC_OP_RECV_INITIAL_METADATA;
47   op.data.recv_initial_metadata.recv_initial_metadata = metadata_.get();
48   return op;
49 }
50 
GetSuccessfulStateString()51 std::string IncomingMetadata::GetSuccessfulStateString() {
52   std::string out = "incoming_metadata: {";
53   for (size_t i = 0; i < metadata_->count; i++) {
54     absl::StrAppend(&out, StringViewFromSlice(metadata_->metadata[i].key), ":",
55                     StringViewFromSlice(metadata_->metadata[i].value), ",");
56   }
57   return out + "}";
58 }
59 
payload() const60 std::string IncomingMessage::payload() const {
61   Slice out;
62   if (payload_->data.raw.compression > GRPC_COMPRESS_NONE) {
63     grpc_slice_buffer decompressed_buffer;
64     grpc_slice_buffer_init(&decompressed_buffer);
65     CHECK(grpc_msg_decompress(payload_->data.raw.compression,
66                               &payload_->data.raw.slice_buffer,
67                               &decompressed_buffer));
68     grpc_byte_buffer* rbb = grpc_raw_byte_buffer_create(
69         decompressed_buffer.slices, decompressed_buffer.count);
70     grpc_byte_buffer_reader reader;
71     CHECK(grpc_byte_buffer_reader_init(&reader, rbb));
72     out = Slice(grpc_byte_buffer_reader_readall(&reader));
73     grpc_byte_buffer_reader_destroy(&reader);
74     grpc_byte_buffer_destroy(rbb);
75     grpc_slice_buffer_destroy(&decompressed_buffer);
76   } else {
77     grpc_byte_buffer_reader reader;
78     CHECK(grpc_byte_buffer_reader_init(&reader, payload_));
79     out = Slice(grpc_byte_buffer_reader_readall(&reader));
80     grpc_byte_buffer_reader_destroy(&reader);
81   }
82   return std::string(out.begin(), out.end());
83 }
84 
MakeOp()85 grpc_op IncomingMessage::MakeOp() {
86   grpc_op op;
87   memset(&op, 0, sizeof(op));
88   op.op = GRPC_OP_RECV_MESSAGE;
89   op.data.recv_message.recv_message = &payload_;
90   return op;
91 }
92 
GetTrailingMetadata(absl::string_view key) const93 absl::optional<std::string> IncomingStatusOnClient::GetTrailingMetadata(
94     absl::string_view key) const {
95   return FindInMetadataArray(data_->trailing_metadata, key);
96 }
97 
GetSuccessfulStateString()98 std::string IncomingStatusOnClient::GetSuccessfulStateString() {
99   std::string out = absl::StrCat(
100       "status_on_client: status=", data_->status,
101       " msg=", data_->status_details.as_string_view(), " trailing_metadata={");
102   for (size_t i = 0; i < data_->trailing_metadata.count; i++) {
103     absl::StrAppend(
104         &out, StringViewFromSlice(data_->trailing_metadata.metadata[i].key),
105         ": ", StringViewFromSlice(data_->trailing_metadata.metadata[i].value),
106         ",");
107   }
108   return out + "}";
109 }
110 
GetSuccessfulStateString()111 std::string IncomingMessage::GetSuccessfulStateString() {
112   if (payload_ == nullptr) return "message: empty";
113   return absl::StrCat("message: ", payload().size(), "b uncompressed");
114 }
115 
MakeOp()116 grpc_op IncomingStatusOnClient::MakeOp() {
117   grpc_op op;
118   memset(&op, 0, sizeof(op));
119   op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
120   op.data.recv_status_on_client.trailing_metadata = &data_->trailing_metadata;
121   op.data.recv_status_on_client.status = &data_->status;
122   op.data.recv_status_on_client.status_details =
123       const_cast<grpc_slice*>(&data_->status_details.c_slice());
124   op.data.recv_status_on_client.error_string = &data_->error_string;
125   return op;
126 }
127 
MakeOp()128 grpc_op IncomingCloseOnServer::MakeOp() {
129   grpc_op op;
130   memset(&op, 0, sizeof(op));
131   op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
132   op.data.recv_close_on_server.cancelled = &cancelled_;
133   return op;
134 }
135 
SendInitialMetadata(std::initializer_list<std::pair<absl::string_view,absl::string_view>> md,uint32_t flags,absl::optional<grpc_compression_level> compression_level)136 BatchBuilder& BatchBuilder::SendInitialMetadata(
137     std::initializer_list<std::pair<absl::string_view, absl::string_view>> md,
138     uint32_t flags, absl::optional<grpc_compression_level> compression_level) {
139   auto& v = Make<std::vector<grpc_metadata>>();
140   for (const auto& p : md) {
141     grpc_metadata m;
142     m.key = Make<Slice>(Slice::FromCopiedString(p.first)).c_slice();
143     m.value = Make<Slice>(Slice::FromCopiedString(p.second)).c_slice();
144     v.push_back(m);
145   }
146   grpc_op op;
147   memset(&op, 0, sizeof(op));
148   op.op = GRPC_OP_SEND_INITIAL_METADATA;
149   op.flags = flags;
150   op.data.send_initial_metadata.count = v.size();
151   op.data.send_initial_metadata.metadata = v.data();
152   if (compression_level.has_value()) {
153     op.data.send_initial_metadata.maybe_compression_level.is_set = 1;
154     op.data.send_initial_metadata.maybe_compression_level.level =
155         compression_level.value();
156   }
157   ops_.push_back(op);
158   return *this;
159 }
160 
SendMessage(Slice payload,uint32_t flags)161 BatchBuilder& BatchBuilder::SendMessage(Slice payload, uint32_t flags) {
162   grpc_op op;
163   memset(&op, 0, sizeof(op));
164   op.op = GRPC_OP_SEND_MESSAGE;
165   op.data.send_message.send_message =
166       Make<ByteBufferUniquePtr>(ByteBufferFromSlice(std::move(payload))).get();
167   op.flags = flags;
168   ops_.push_back(op);
169   return *this;
170 }
171 
SendCloseFromClient()172 BatchBuilder& BatchBuilder::SendCloseFromClient() {
173   grpc_op op;
174   memset(&op, 0, sizeof(op));
175   op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
176   ops_.push_back(op);
177   return *this;
178 }
179 
SendStatusFromServer(grpc_status_code status,absl::string_view message,std::initializer_list<std::pair<absl::string_view,absl::string_view>> md)180 BatchBuilder& BatchBuilder::SendStatusFromServer(
181     grpc_status_code status, absl::string_view message,
182     std::initializer_list<std::pair<absl::string_view, absl::string_view>> md) {
183   auto& v = Make<std::vector<grpc_metadata>>();
184   for (const auto& p : md) {
185     grpc_metadata m;
186     m.key = Make<Slice>(Slice::FromCopiedString(p.first)).c_slice();
187     m.value = Make<Slice>(Slice::FromCopiedString(p.second)).c_slice();
188     v.push_back(m);
189   }
190   grpc_op op;
191   memset(&op, 0, sizeof(op));
192   op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
193   op.data.send_status_from_server.trailing_metadata_count = v.size();
194   op.data.send_status_from_server.trailing_metadata = v.data();
195   op.data.send_status_from_server.status = status;
196   op.data.send_status_from_server.status_details = &Make<grpc_slice>(
197       Make<Slice>(Slice::FromCopiedString(message)).c_slice());
198   ops_.push_back(op);
199   return *this;
200 }
201 
~BatchBuilder()202 BatchBuilder::~BatchBuilder() {
203   grpc_call_error err = grpc_call_start_batch(call_, ops_.data(), ops_.size(),
204                                               CqVerifier::tag(tag_), nullptr);
205   EXPECT_EQ(err, GRPC_CALL_OK) << grpc_call_error_to_string(err);
206 }
207 
}  // namespace grpc_core