1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef GRPCXX_CHANNEL_FILTER_H
20 #define GRPCXX_CHANNEL_FILTER_H
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpcpp/impl/codegen/config.h>
25
26 #include <functional>
27 #include <vector>
28
29 #include "src/core/lib/channel/channel_stack.h"
30 #include "src/core/lib/surface/channel_init.h"
31 #include "src/core/lib/transport/metadata_batch.h"
32
33 /// An interface to define filters.
34 ///
35 /// To define a filter, implement a subclass of each of \c CallData and
36 /// \c ChannelData. Then register the filter using something like this:
37 /// \code{.cpp}
38 /// RegisterChannelFilter<MyChannelDataSubclass, MyCallDataSubclass>(
39 /// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
40 /// \endcode
41
42 namespace grpc {
43
44 /// A C++ wrapper for the \c grpc_metadata_batch struct.
45 class MetadataBatch {
46 public:
47 /// Borrows a pointer to \a batch, but does NOT take ownership.
48 /// The caller must ensure that \a batch continues to exist for as
49 /// long as the MetadataBatch object does.
MetadataBatch(grpc_metadata_batch * batch)50 explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {}
51
batch()52 grpc_metadata_batch* batch() const { return batch_; }
53
54 /// Adds metadata and returns the newly allocated storage.
55 /// The caller takes ownership of the result, which must exist for the
56 /// lifetime of the gRPC call.
57 grpc_linked_mdelem* AddMetadata(const string& key, const string& value);
58
59 class const_iterator : public std::iterator<std::bidirectional_iterator_tag,
60 const grpc_mdelem> {
61 public:
62 const grpc_mdelem& operator*() const { return elem_->md; }
63 const grpc_mdelem operator->() const { return elem_->md; }
64
65 const_iterator& operator++() {
66 elem_ = elem_->next;
67 return *this;
68 }
69 const_iterator operator++(int) {
70 const_iterator tmp(*this);
71 operator++();
72 return tmp;
73 }
74 const_iterator& operator--() {
75 elem_ = elem_->prev;
76 return *this;
77 }
78 const_iterator operator--(int) {
79 const_iterator tmp(*this);
80 operator--();
81 return tmp;
82 }
83
84 bool operator==(const const_iterator& other) const {
85 return elem_ == other.elem_;
86 }
87 bool operator!=(const const_iterator& other) const {
88 return elem_ != other.elem_;
89 }
90
91 private:
92 friend class MetadataBatch;
const_iterator(grpc_linked_mdelem * elem)93 explicit const_iterator(grpc_linked_mdelem* elem) : elem_(elem) {}
94
95 grpc_linked_mdelem* elem_;
96 };
97
begin()98 const_iterator begin() const { return const_iterator(batch_->list.head); }
end()99 const_iterator end() const { return const_iterator(nullptr); }
100
101 private:
102 grpc_metadata_batch* batch_; // Not owned.
103 };
104
105 /// A C++ wrapper for the \c grpc_transport_op struct.
106 class TransportOp {
107 public:
108 /// Borrows a pointer to \a op, but does NOT take ownership.
109 /// The caller must ensure that \a op continues to exist for as
110 /// long as the TransportOp object does.
TransportOp(grpc_transport_op * op)111 explicit TransportOp(grpc_transport_op* op) : op_(op) {}
112
op()113 grpc_transport_op* op() const { return op_; }
114
115 // TODO(roth): Add a C++ wrapper for grpc_error?
disconnect_with_error()116 grpc_error* disconnect_with_error() const {
117 return op_->disconnect_with_error;
118 }
send_goaway()119 bool send_goaway() const { return op_->goaway_error != GRPC_ERROR_NONE; }
120
121 // TODO(roth): Add methods for additional fields as needed.
122
123 private:
124 grpc_transport_op* op_; // Not owned.
125 };
126
127 /// A C++ wrapper for the \c grpc_transport_stream_op_batch struct.
128 class TransportStreamOpBatch {
129 public:
130 /// Borrows a pointer to \a op, but does NOT take ownership.
131 /// The caller must ensure that \a op continues to exist for as
132 /// long as the TransportStreamOpBatch object does.
TransportStreamOpBatch(grpc_transport_stream_op_batch * op)133 explicit TransportStreamOpBatch(grpc_transport_stream_op_batch* op)
134 : op_(op),
135 send_initial_metadata_(
136 op->send_initial_metadata
137 ? op->payload->send_initial_metadata.send_initial_metadata
138 : nullptr),
139 send_trailing_metadata_(
140 op->send_trailing_metadata
141 ? op->payload->send_trailing_metadata.send_trailing_metadata
142 : nullptr),
143 recv_initial_metadata_(
144 op->recv_initial_metadata
145 ? op->payload->recv_initial_metadata.recv_initial_metadata
146 : nullptr),
147 recv_trailing_metadata_(
148 op->recv_trailing_metadata
149 ? op->payload->recv_trailing_metadata.recv_trailing_metadata
150 : nullptr) {}
151
op()152 grpc_transport_stream_op_batch* op() const { return op_; }
153
on_complete()154 grpc_closure* on_complete() const { return op_->on_complete; }
set_on_complete(grpc_closure * closure)155 void set_on_complete(grpc_closure* closure) { op_->on_complete = closure; }
156
send_initial_metadata()157 MetadataBatch* send_initial_metadata() {
158 return op_->send_initial_metadata ? &send_initial_metadata_ : nullptr;
159 }
send_trailing_metadata()160 MetadataBatch* send_trailing_metadata() {
161 return op_->send_trailing_metadata ? &send_trailing_metadata_ : nullptr;
162 }
recv_initial_metadata()163 MetadataBatch* recv_initial_metadata() {
164 return op_->recv_initial_metadata ? &recv_initial_metadata_ : nullptr;
165 }
recv_trailing_metadata()166 MetadataBatch* recv_trailing_metadata() {
167 return op_->recv_trailing_metadata ? &recv_trailing_metadata_ : nullptr;
168 }
169
send_initial_metadata_flags()170 uint32_t* send_initial_metadata_flags() const {
171 return op_->send_initial_metadata ? &op_->payload->send_initial_metadata
172 .send_initial_metadata_flags
173 : nullptr;
174 }
175
recv_initial_metadata_ready()176 grpc_closure* recv_initial_metadata_ready() const {
177 return op_->recv_initial_metadata
178 ? op_->payload->recv_initial_metadata.recv_initial_metadata_ready
179 : nullptr;
180 }
set_recv_initial_metadata_ready(grpc_closure * closure)181 void set_recv_initial_metadata_ready(grpc_closure* closure) {
182 op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure;
183 }
184
send_message()185 grpc_core::OrphanablePtr<grpc_core::ByteStream>* send_message() const {
186 return op_->send_message ? &op_->payload->send_message.send_message
187 : nullptr;
188 }
set_send_message(grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message)189 void set_send_message(
190 grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message) {
191 op_->send_message = true;
192 op_->payload->send_message.send_message = std::move(send_message);
193 }
194
recv_message()195 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message() const {
196 return op_->recv_message ? op_->payload->recv_message.recv_message
197 : nullptr;
198 }
set_recv_message(grpc_core::OrphanablePtr<grpc_core::ByteStream> * recv_message)199 void set_recv_message(
200 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message) {
201 op_->recv_message = true;
202 op_->payload->recv_message.recv_message = recv_message;
203 }
204
get_census_context()205 census_context* get_census_context() const {
206 return static_cast<census_context*>(
207 op_->payload->context[GRPC_CONTEXT_TRACING].value);
208 }
209
get_peer_string()210 const gpr_atm* get_peer_string() const {
211 if (op_->send_initial_metadata &&
212 op_->payload->send_initial_metadata.peer_string != nullptr) {
213 return op_->payload->send_initial_metadata.peer_string;
214 } else if (op_->recv_initial_metadata &&
215 op_->payload->recv_initial_metadata.peer_string != nullptr) {
216 return op_->payload->recv_initial_metadata.peer_string;
217 } else {
218 return nullptr;
219 }
220 }
221
222 private:
223 grpc_transport_stream_op_batch* op_; // Not owned.
224 MetadataBatch send_initial_metadata_;
225 MetadataBatch send_trailing_metadata_;
226 MetadataBatch recv_initial_metadata_;
227 MetadataBatch recv_trailing_metadata_;
228 };
229
230 /// Represents channel data.
231 class ChannelData {
232 public:
ChannelData()233 ChannelData() {}
~ChannelData()234 virtual ~ChannelData() {}
235
236 // TODO(roth): Come up with a more C++-like API for the channel element.
237
238 /// Initializes the channel data.
Init(grpc_channel_element * elem,grpc_channel_element_args * args)239 virtual grpc_error* Init(grpc_channel_element* elem,
240 grpc_channel_element_args* args) {
241 return GRPC_ERROR_NONE;
242 }
243
244 // Called before destruction.
Destroy(grpc_channel_element * elem)245 virtual void Destroy(grpc_channel_element* elem) {}
246
247 virtual void StartTransportOp(grpc_channel_element* elem, TransportOp* op);
248
249 virtual void GetInfo(grpc_channel_element* elem,
250 const grpc_channel_info* channel_info);
251 };
252
253 /// Represents call data.
254 class CallData {
255 public:
CallData()256 CallData() {}
~CallData()257 virtual ~CallData() {}
258
259 // TODO(roth): Come up with a more C++-like API for the call element.
260
261 /// Initializes the call data.
Init(grpc_call_element * elem,const grpc_call_element_args * args)262 virtual grpc_error* Init(grpc_call_element* elem,
263 const grpc_call_element_args* args) {
264 return GRPC_ERROR_NONE;
265 }
266
267 // Called before destruction.
Destroy(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)268 virtual void Destroy(grpc_call_element* elem,
269 const grpc_call_final_info* final_info,
270 grpc_closure* then_call_closure) {}
271
272 /// Starts a new stream operation.
273 virtual void StartTransportStreamOpBatch(grpc_call_element* elem,
274 TransportStreamOpBatch* op);
275
276 /// Sets a pollset or pollset set.
277 virtual void SetPollsetOrPollsetSet(grpc_call_element* elem,
278 grpc_polling_entity* pollent);
279 };
280
281 namespace internal {
282
283 // Defines static members for passing to C core.
284 // Members of this class correspond to the members of the C
285 // grpc_channel_filter struct.
286 template <typename ChannelDataType, typename CallDataType>
287 class ChannelFilter final {
288 public:
289 static const size_t channel_data_size = sizeof(ChannelDataType);
290
InitChannelElement(grpc_channel_element * elem,grpc_channel_element_args * args)291 static grpc_error* InitChannelElement(grpc_channel_element* elem,
292 grpc_channel_element_args* args) {
293 // Construct the object in the already-allocated memory.
294 ChannelDataType* channel_data = new (elem->channel_data) ChannelDataType();
295 return channel_data->Init(elem, args);
296 }
297
DestroyChannelElement(grpc_channel_element * elem)298 static void DestroyChannelElement(grpc_channel_element* elem) {
299 ChannelDataType* channel_data =
300 static_cast<ChannelDataType*>(elem->channel_data);
301 channel_data->Destroy(elem);
302 channel_data->~ChannelDataType();
303 }
304
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)305 static void StartTransportOp(grpc_channel_element* elem,
306 grpc_transport_op* op) {
307 ChannelDataType* channel_data =
308 static_cast<ChannelDataType*>(elem->channel_data);
309 TransportOp op_wrapper(op);
310 channel_data->StartTransportOp(elem, &op_wrapper);
311 }
312
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * channel_info)313 static void GetChannelInfo(grpc_channel_element* elem,
314 const grpc_channel_info* channel_info) {
315 ChannelDataType* channel_data =
316 static_cast<ChannelDataType*>(elem->channel_data);
317 channel_data->GetInfo(elem, channel_info);
318 }
319
320 static const size_t call_data_size = sizeof(CallDataType);
321
InitCallElement(grpc_call_element * elem,const grpc_call_element_args * args)322 static grpc_error* InitCallElement(grpc_call_element* elem,
323 const grpc_call_element_args* args) {
324 // Construct the object in the already-allocated memory.
325 CallDataType* call_data = new (elem->call_data) CallDataType();
326 return call_data->Init(elem, args);
327 }
328
DestroyCallElement(grpc_call_element * elem,const grpc_call_final_info * final_info,grpc_closure * then_call_closure)329 static void DestroyCallElement(grpc_call_element* elem,
330 const grpc_call_final_info* final_info,
331 grpc_closure* then_call_closure) {
332 CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
333 call_data->Destroy(elem, final_info, then_call_closure);
334 call_data->~CallDataType();
335 }
336
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)337 static void StartTransportStreamOpBatch(grpc_call_element* elem,
338 grpc_transport_stream_op_batch* op) {
339 CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
340 TransportStreamOpBatch op_wrapper(op);
341 call_data->StartTransportStreamOpBatch(elem, &op_wrapper);
342 }
343
SetPollsetOrPollsetSet(grpc_call_element * elem,grpc_polling_entity * pollent)344 static void SetPollsetOrPollsetSet(grpc_call_element* elem,
345 grpc_polling_entity* pollent) {
346 CallDataType* call_data = static_cast<CallDataType*>(elem->call_data);
347 call_data->SetPollsetOrPollsetSet(elem, pollent);
348 }
349 };
350
351 struct FilterRecord {
352 grpc_channel_stack_type stack_type;
353 int priority;
354 std::function<bool(const grpc_channel_args&)> include_filter;
355 grpc_channel_filter filter;
356 };
357 extern std::vector<FilterRecord>* channel_filters;
358
359 void ChannelFilterPluginInit();
360 void ChannelFilterPluginShutdown();
361
362 } // namespace internal
363
364 /// Registers a new filter.
365 /// Must be called by only one thread at a time.
366 /// The \a include_filter argument specifies a function that will be called
367 /// to determine at run-time whether or not to add the filter. If the
368 /// value is nullptr, the filter will be added unconditionally.
369 template <typename ChannelDataType, typename CallDataType>
RegisterChannelFilter(const char * name,grpc_channel_stack_type stack_type,int priority,std::function<bool (const grpc_channel_args &)> include_filter)370 void RegisterChannelFilter(
371 const char* name, grpc_channel_stack_type stack_type, int priority,
372 std::function<bool(const grpc_channel_args&)> include_filter) {
373 // If we haven't been called before, initialize channel_filters and
374 // call grpc_register_plugin().
375 if (internal::channel_filters == nullptr) {
376 grpc_register_plugin(internal::ChannelFilterPluginInit,
377 internal::ChannelFilterPluginShutdown);
378 internal::channel_filters = new std::vector<internal::FilterRecord>();
379 }
380 // Add an entry to channel_filters. The filter will be added when the
381 // C-core initialization code calls ChannelFilterPluginInit().
382 typedef internal::ChannelFilter<ChannelDataType, CallDataType> FilterType;
383 internal::FilterRecord filter_record = {
384 stack_type,
385 priority,
386 include_filter,
387 {FilterType::StartTransportStreamOpBatch, FilterType::StartTransportOp,
388 FilterType::call_data_size, FilterType::InitCallElement,
389 FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement,
390 FilterType::channel_data_size, FilterType::InitChannelElement,
391 FilterType::DestroyChannelElement, FilterType::GetChannelInfo, name}};
392 internal::channel_filters->push_back(filter_record);
393 }
394
395 } // namespace grpc
396
397 #endif // GRPCXX_CHANNEL_FILTER_H
398