• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 Google Inc. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef FLATBUFFERS_GRPC_H_
18 #define FLATBUFFERS_GRPC_H_
19 
20 // Helper functionality to glue FlatBuffers and GRPC.
21 
22 #include "flatbuffers/flatbuffers.h"
23 #include "grpcpp/support/byte_buffer.h"
24 #include "grpcpp/support/slice.h"
25 
26 namespace flatbuffers {
27 namespace grpc {
28 
29 // Message is a typed wrapper around a buffer that manages the underlying
30 // `grpc_slice` and also provides flatbuffers-specific helpers such as `Verify`
31 // and `GetRoot`. Since it is backed by a `grpc_slice`, the underlying buffer
32 // is refcounted and ownership is be managed automatically.
33 template<class T> class Message {
34  public:
Message()35   Message() {}
36 
Message(::grpc::Slice slice)37   Message(::grpc::Slice slice) : slice_(slice) {}
38 
39   Message &operator=(const Message &other) = delete;
40 
41   Message(Message &&other) = default;
42 
43   Message(const Message &other) = delete;
44 
45   Message &operator=(Message &&other) = default;
46 
mutable_data()47   const uint8_t *mutable_data() const { return slice_.begin(); }
48 
data()49   const uint8_t *data() const { return slice_.begin(); }
50 
size()51   size_t size() const { return slice_.size(); }
52 
Verify()53   bool Verify() const {
54     Verifier verifier(data(), size());
55     return verifier.VerifyBuffer<T>(nullptr);
56   }
57 
GetMutableRoot()58   T *GetMutableRoot() { return flatbuffers::GetMutableRoot<T>(mutable_data()); }
59 
GetRoot()60   const T *GetRoot() const { return flatbuffers::GetRoot<T>(data()); }
61 
62   // This is only intended for serializer use, or if you know what you're doing
BorrowSlice()63   const ::grpc::Slice &BorrowSlice() const { return slice_; }
64 
65  private:
66   ::grpc::Slice slice_;
67 };
68 
69 class MessageBuilder;
70 
71 // SliceAllocator is a gRPC-specific allocator that uses the `grpc_slice`
72 // refcounted slices to manage memory ownership. This makes it easy and
73 // efficient to transfer buffers to gRPC.
74 class SliceAllocator : public Allocator {
75  public:
SliceAllocator()76   SliceAllocator() {}
77 
78   SliceAllocator(const SliceAllocator &other) = delete;
79   SliceAllocator &operator=(const SliceAllocator &other) = delete;
80 
SliceAllocator(SliceAllocator && other)81   SliceAllocator(SliceAllocator &&other) {
82     // default-construct and swap idiom
83     swap(other);
84   }
85 
86   SliceAllocator &operator=(SliceAllocator &&other) {
87     // move-construct and swap idiom
88     SliceAllocator temp(std::move(other));
89     swap(temp);
90     return *this;
91   }
92 
swap(SliceAllocator & other)93   void swap(SliceAllocator &other) {
94     using std::swap;
95     swap(slice_, other.slice_);
96   }
97 
~SliceAllocator()98   virtual ~SliceAllocator() {}
99 
allocate(size_t size)100   virtual uint8_t *allocate(size_t size) override {
101     FLATBUFFERS_ASSERT(slice_.size() == 0);
102     slice_ = ::grpc::Slice(size);
103     return const_cast<uint8_t *>(slice_.begin());
104   }
105 
deallocate(uint8_t * p,size_t size)106   virtual void deallocate(uint8_t *p, size_t size) override {
107     FLATBUFFERS_ASSERT(p == slice_.begin());
108     FLATBUFFERS_ASSERT(size == slice_.size());
109     slice_ = ::grpc::Slice();
110   }
111 
reallocate_downward(uint8_t * old_p,size_t old_size,size_t new_size,size_t in_use_back,size_t in_use_front)112   virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
113                                        size_t new_size, size_t in_use_back,
114                                        size_t in_use_front) override {
115     FLATBUFFERS_ASSERT(old_p == slice_.begin());
116     FLATBUFFERS_ASSERT(old_size == slice_.size());
117     FLATBUFFERS_ASSERT(new_size > old_size);
118     ::grpc::Slice old_slice = slice_;
119     ::grpc::Slice new_slice = ::grpc::Slice(new_size);
120     uint8_t *new_p = const_cast<uint8_t *>(new_slice.begin());
121     memcpy_downward(old_p, old_size, new_p, new_size, in_use_back,
122                     in_use_front);
123     slice_ = new_slice;
124     return new_p;
125   }
126 
127  private:
get_slice(uint8_t * p,size_t size)128   ::grpc::Slice &get_slice(uint8_t *p, size_t size) {
129     FLATBUFFERS_ASSERT(p == slice_.begin());
130     FLATBUFFERS_ASSERT(size == slice_.size());
131     return slice_;
132   }
133 
134   ::grpc::Slice slice_;
135 
136   friend class MessageBuilder;
137 };
138 
139 // SliceAllocatorMember is a hack to ensure that the MessageBuilder's
140 // slice_allocator_ member is constructed before the FlatBufferBuilder, since
141 // the allocator is used in the FlatBufferBuilder ctor.
142 namespace detail {
143 struct SliceAllocatorMember {
144   SliceAllocator slice_allocator_;
145 };
146 }  // namespace detail
147 
148 // MessageBuilder is a gRPC-specific FlatBufferBuilder that uses SliceAllocator
149 // to allocate gRPC buffers.
150 class MessageBuilder : private detail::SliceAllocatorMember,
151                        public FlatBufferBuilder {
152  public:
153   explicit MessageBuilder(uoffset_t initial_size = 1024)
154       : FlatBufferBuilder(initial_size, &slice_allocator_, false) {}
155 
156   MessageBuilder(const MessageBuilder &other) = delete;
157   MessageBuilder &operator=(const MessageBuilder &other) = delete;
158 
MessageBuilder(MessageBuilder && other)159   MessageBuilder(MessageBuilder &&other)
160       : FlatBufferBuilder(1024, &slice_allocator_, false) {
161     // Default construct and swap idiom.
162     Swap(other);
163   }
164 
165   /// Create a MessageBuilder from a FlatBufferBuilder.
166   explicit MessageBuilder(FlatBufferBuilder &&src,
167                           void (*dealloc)(void *,
168                                           size_t) = &DefaultAllocator::dealloc)
169       : FlatBufferBuilder(1024, &slice_allocator_, false) {
170     src.Swap(*this);
171     src.SwapBufAllocator(*this);
172     if (buf_.capacity()) {
173       uint8_t *buf = buf_.scratch_data();  // pointer to memory
174       size_t capacity = buf_.capacity();   // size of memory
175       slice_allocator_.slice_ = ::grpc::Slice(buf, capacity, dealloc);
176     } else {
177       slice_allocator_.slice_ = ::grpc::Slice();
178     }
179   }
180 
181   /// Move-assign a FlatBufferBuilder to a MessageBuilder.
182   /// Only FlatBufferBuilder with default allocator (basically, nullptr) is
183   /// supported.
184   MessageBuilder &operator=(FlatBufferBuilder &&src) {
185     // Move construct a temporary and swap
186     MessageBuilder temp(std::move(src));
187     Swap(temp);
188     return *this;
189   }
190 
191   MessageBuilder &operator=(MessageBuilder &&other) {
192     // Move construct a temporary and swap
193     MessageBuilder temp(std::move(other));
194     Swap(temp);
195     return *this;
196   }
197 
Swap(MessageBuilder & other)198   void Swap(MessageBuilder &other) {
199     slice_allocator_.swap(other.slice_allocator_);
200     FlatBufferBuilder::Swap(other);
201     // After swapping the FlatBufferBuilder, we swap back the allocator, which
202     // restores the original allocator back in place. This is necessary because
203     // MessageBuilder's allocator is its own member (SliceAllocatorMember). The
204     // allocator passed to FlatBufferBuilder::vector_downward must point to this
205     // member.
206     buf_.swap_allocator(other.buf_);
207   }
208 
209   // Releases the ownership of the buffer pointer.
210   // Returns the size, offset, and the original grpc_slice that
211   // allocated the buffer. Also see grpc_slice_unref().
ReleaseRaw(size_t & size,size_t & offset,::grpc::Slice & slice)212   uint8_t *ReleaseRaw(size_t &size, size_t &offset, ::grpc::Slice &slice) {
213     uint8_t *buf = FlatBufferBuilder::ReleaseRaw(size, offset);
214     slice = slice_allocator_.slice_;
215     slice_allocator_.slice_ = ::grpc::Slice();
216     return buf;
217   }
218 
~MessageBuilder()219   ~MessageBuilder() {}
220 
221   // GetMessage extracts the subslice of the buffer corresponding to the
222   // flatbuffers-encoded region and wraps it in a `Message<T>` to handle buffer
223   // ownership.
GetMessage()224   template<class T> Message<T> GetMessage() {
225     auto buf_data = buf_.scratch_data();  // pointer to memory
226     auto buf_size = buf_.capacity();      // size of memory
227     auto msg_data = buf_.data();          // pointer to msg
228     auto msg_size = buf_.size();          // size of msg
229     // Do some sanity checks on data/size
230     FLATBUFFERS_ASSERT(msg_data);
231     FLATBUFFERS_ASSERT(msg_size);
232     FLATBUFFERS_ASSERT(msg_data >= buf_data);
233     FLATBUFFERS_ASSERT(msg_data + msg_size <= buf_data + buf_size);
234     // Calculate offsets from the buffer start
235     auto begin = msg_data - buf_data;
236     auto end = begin + msg_size;
237     // Get the slice we are working with (no refcount change)
238     ::grpc::Slice slice = slice_allocator_.get_slice(buf_data, buf_size);
239     // Extract a subslice of the existing slice (increment refcount)
240     ::grpc::Slice subslice = slice.sub(begin, end);
241     // Wrap the subslice in a `Message<T>`, but don't increment refcount
242     Message<T> msg(subslice);
243     return msg;
244   }
245 
ReleaseMessage()246   template<class T> Message<T> ReleaseMessage() {
247     Message<T> msg = GetMessage<T>();
248     Reset();
249     return msg;
250   }
251 
252  private:
253   // SliceAllocator slice_allocator_;  // part of SliceAllocatorMember
254 };
255 
256 }  // namespace grpc
257 }  // namespace flatbuffers
258 
259 namespace grpc {
260 
261 template<class T> class SerializationTraits<flatbuffers::grpc::Message<T>> {
262  public:
Serialize(const flatbuffers::grpc::Message<T> & msg,ByteBuffer * buffer,bool * own_buffer)263   static grpc::Status Serialize(const flatbuffers::grpc::Message<T> &msg,
264                                 ByteBuffer *buffer, bool *own_buffer) {
265     // Package the single slice into a `ByteBuffer`,
266     // incrementing the refcount in the process.
267     *buffer = ByteBuffer(&msg.BorrowSlice(), 1);
268     *own_buffer = true;
269     return grpc::Status::OK;
270   }
271 
272   // Deserialize by pulling the
Deserialize(ByteBuffer * buf,flatbuffers::grpc::Message<T> * msg)273   static grpc::Status Deserialize(ByteBuffer *buf,
274                                   flatbuffers::grpc::Message<T> *msg) {
275     Slice slice;
276     if (!buf->TrySingleSlice(&slice).ok()) {
277       if (!buf->DumpToSingleSlice(&slice).ok()) {
278         buf->Clear();
279         return ::grpc::Status(::grpc::StatusCode::INTERNAL, "No payload");
280       }
281     }
282     *msg = flatbuffers::grpc::Message<T>(slice);
283     buf->Clear();
284 #if FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
285     return ::grpc::Status::OK;
286 #else
287     if (msg->Verify()) {
288       return ::grpc::Status::OK;
289     } else {
290       return ::grpc::Status(::grpc::StatusCode::INTERNAL,
291                             "Message verification failed");
292     }
293 #endif
294   }
295 };
296 
297 }  // namespace grpc
298 
299 #endif  // FLATBUFFERS_GRPC_H_
300