1 /* 2 * Copyright 2014 Google Inc. All rights reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef FLATBUFFERS_GRPC_H_ 18 #define FLATBUFFERS_GRPC_H_ 19 20 // Helper functionality to glue FlatBuffers and GRPC. 21 22 #include "flatbuffers/flatbuffers.h" 23 #include "grpcpp/support/byte_buffer.h" 24 #include "grpcpp/support/slice.h" 25 26 namespace flatbuffers { 27 namespace grpc { 28 29 // Message is a typed wrapper around a buffer that manages the underlying 30 // `grpc_slice` and also provides flatbuffers-specific helpers such as `Verify` 31 // and `GetRoot`. Since it is backed by a `grpc_slice`, the underlying buffer 32 // is refcounted and ownership is managed automatically. 
33 template<class T> class Message { 34 public: Message()35 Message() {} 36 Message(::grpc::Slice slice)37 Message(::grpc::Slice slice) : slice_(slice) {} 38 39 Message &operator=(const Message &other) = delete; 40 41 Message(Message &&other) = default; 42 43 Message(const Message &other) = delete; 44 45 Message &operator=(Message &&other) = default; 46 mutable_data()47 const uint8_t *mutable_data() const { return slice_.begin(); } 48 data()49 const uint8_t *data() const { return slice_.begin(); } 50 size()51 size_t size() const { return slice_.size(); } 52 Verify()53 bool Verify() const { 54 Verifier verifier(data(), size()); 55 return verifier.VerifyBuffer<T>(nullptr); 56 } 57 GetMutableRoot()58 T *GetMutableRoot() { return flatbuffers::GetMutableRoot<T>(mutable_data()); } 59 GetRoot()60 const T *GetRoot() const { return flatbuffers::GetRoot<T>(data()); } 61 62 // This is only intended for serializer use, or if you know what you're doing BorrowSlice()63 const ::grpc::Slice &BorrowSlice() const { return slice_; } 64 65 private: 66 ::grpc::Slice slice_; 67 }; 68 69 class MessageBuilder; 70 71 // SliceAllocator is a gRPC-specific allocator that uses the `grpc_slice` 72 // refcounted slices to manage memory ownership. This makes it easy and 73 // efficient to transfer buffers to gRPC. 
74 class SliceAllocator : public Allocator { 75 public: SliceAllocator()76 SliceAllocator() {} 77 78 SliceAllocator(const SliceAllocator &other) = delete; 79 SliceAllocator &operator=(const SliceAllocator &other) = delete; 80 SliceAllocator(SliceAllocator && other)81 SliceAllocator(SliceAllocator &&other) { 82 // default-construct and swap idiom 83 swap(other); 84 } 85 86 SliceAllocator &operator=(SliceAllocator &&other) { 87 // move-construct and swap idiom 88 SliceAllocator temp(std::move(other)); 89 swap(temp); 90 return *this; 91 } 92 swap(SliceAllocator & other)93 void swap(SliceAllocator &other) { 94 using std::swap; 95 swap(slice_, other.slice_); 96 } 97 ~SliceAllocator()98 virtual ~SliceAllocator() {} 99 allocate(size_t size)100 virtual uint8_t *allocate(size_t size) override { 101 FLATBUFFERS_ASSERT(slice_.size() == 0); 102 slice_ = ::grpc::Slice(size); 103 return const_cast<uint8_t *>(slice_.begin()); 104 } 105 deallocate(uint8_t * p,size_t size)106 virtual void deallocate(uint8_t *p, size_t size) override { 107 FLATBUFFERS_ASSERT(p == slice_.begin()); 108 FLATBUFFERS_ASSERT(size == slice_.size()); 109 slice_ = ::grpc::Slice(); 110 } 111 reallocate_downward(uint8_t * old_p,size_t old_size,size_t new_size,size_t in_use_back,size_t in_use_front)112 virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size, 113 size_t new_size, size_t in_use_back, 114 size_t in_use_front) override { 115 FLATBUFFERS_ASSERT(old_p == slice_.begin()); 116 FLATBUFFERS_ASSERT(old_size == slice_.size()); 117 FLATBUFFERS_ASSERT(new_size > old_size); 118 ::grpc::Slice old_slice = slice_; 119 ::grpc::Slice new_slice = ::grpc::Slice(new_size); 120 uint8_t *new_p = const_cast<uint8_t *>(new_slice.begin()); 121 memcpy_downward(old_p, old_size, new_p, new_size, in_use_back, 122 in_use_front); 123 slice_ = new_slice; 124 return new_p; 125 } 126 127 private: get_slice(uint8_t * p,size_t size)128 ::grpc::Slice &get_slice(uint8_t *p, size_t size) { 129 FLATBUFFERS_ASSERT(p == 
slice_.begin()); 130 FLATBUFFERS_ASSERT(size == slice_.size()); 131 return slice_; 132 } 133 134 ::grpc::Slice slice_; 135 136 friend class MessageBuilder; 137 }; 138 139 // SliceAllocatorMember is a hack to ensure that the MessageBuilder's 140 // slice_allocator_ member is constructed before the FlatBufferBuilder, since 141 // the allocator is used in the FlatBufferBuilder ctor. 142 namespace detail { 143 struct SliceAllocatorMember { 144 SliceAllocator slice_allocator_; 145 }; 146 } // namespace detail 147 148 // MessageBuilder is a gRPC-specific FlatBufferBuilder that uses SliceAllocator 149 // to allocate gRPC buffers. 150 class MessageBuilder : private detail::SliceAllocatorMember, 151 public FlatBufferBuilder { 152 public: 153 explicit MessageBuilder(uoffset_t initial_size = 1024) 154 : FlatBufferBuilder(initial_size, &slice_allocator_, false) {} 155 156 MessageBuilder(const MessageBuilder &other) = delete; 157 MessageBuilder &operator=(const MessageBuilder &other) = delete; 158 MessageBuilder(MessageBuilder && other)159 MessageBuilder(MessageBuilder &&other) 160 : FlatBufferBuilder(1024, &slice_allocator_, false) { 161 // Default construct and swap idiom. 162 Swap(other); 163 } 164 165 /// Create a MessageBuilder from a FlatBufferBuilder. 166 explicit MessageBuilder(FlatBufferBuilder &&src, 167 void (*dealloc)(void *, 168 size_t) = &DefaultAllocator::dealloc) 169 : FlatBufferBuilder(1024, &slice_allocator_, false) { 170 src.Swap(*this); 171 src.SwapBufAllocator(*this); 172 if (buf_.capacity()) { 173 uint8_t *buf = buf_.scratch_data(); // pointer to memory 174 size_t capacity = buf_.capacity(); // size of memory 175 slice_allocator_.slice_ = ::grpc::Slice(buf, capacity, dealloc); 176 } else { 177 slice_allocator_.slice_ = ::grpc::Slice(); 178 } 179 } 180 181 /// Move-assign a FlatBufferBuilder to a MessageBuilder. 182 /// Only FlatBufferBuilder with default allocator (basically, nullptr) is 183 /// supported. 
184 MessageBuilder &operator=(FlatBufferBuilder &&src) { 185 // Move construct a temporary and swap 186 MessageBuilder temp(std::move(src)); 187 Swap(temp); 188 return *this; 189 } 190 191 MessageBuilder &operator=(MessageBuilder &&other) { 192 // Move construct a temporary and swap 193 MessageBuilder temp(std::move(other)); 194 Swap(temp); 195 return *this; 196 } 197 Swap(MessageBuilder & other)198 void Swap(MessageBuilder &other) { 199 slice_allocator_.swap(other.slice_allocator_); 200 FlatBufferBuilder::Swap(other); 201 // After swapping the FlatBufferBuilder, we swap back the allocator, which 202 // restores the original allocator back in place. This is necessary because 203 // MessageBuilder's allocator is its own member (SliceAllocatorMember). The 204 // allocator passed to FlatBufferBuilder::vector_downward must point to this 205 // member. 206 buf_.swap_allocator(other.buf_); 207 } 208 209 // Releases the ownership of the buffer pointer. 210 // Returns the size, offset, and the original grpc_slice that 211 // allocated the buffer. Also see grpc_slice_unref(). ReleaseRaw(size_t & size,size_t & offset,::grpc::Slice & slice)212 uint8_t *ReleaseRaw(size_t &size, size_t &offset, ::grpc::Slice &slice) { 213 uint8_t *buf = FlatBufferBuilder::ReleaseRaw(size, offset); 214 slice = slice_allocator_.slice_; 215 slice_allocator_.slice_ = ::grpc::Slice(); 216 return buf; 217 } 218 ~MessageBuilder()219 ~MessageBuilder() {} 220 221 // GetMessage extracts the subslice of the buffer corresponding to the 222 // flatbuffers-encoded region and wraps it in a `Message<T>` to handle buffer 223 // ownership. 
GetMessage()224 template<class T> Message<T> GetMessage() { 225 auto buf_data = buf_.scratch_data(); // pointer to memory 226 auto buf_size = buf_.capacity(); // size of memory 227 auto msg_data = buf_.data(); // pointer to msg 228 auto msg_size = buf_.size(); // size of msg 229 // Do some sanity checks on data/size 230 FLATBUFFERS_ASSERT(msg_data); 231 FLATBUFFERS_ASSERT(msg_size); 232 FLATBUFFERS_ASSERT(msg_data >= buf_data); 233 FLATBUFFERS_ASSERT(msg_data + msg_size <= buf_data + buf_size); 234 // Calculate offsets from the buffer start 235 auto begin = msg_data - buf_data; 236 auto end = begin + msg_size; 237 // Get the slice we are working with (no refcount change) 238 ::grpc::Slice slice = slice_allocator_.get_slice(buf_data, buf_size); 239 // Extract a subslice of the existing slice (increment refcount) 240 ::grpc::Slice subslice = slice.sub(begin, end); 241 // Wrap the subslice in a `Message<T>`, but don't increment refcount 242 Message<T> msg(subslice); 243 return msg; 244 } 245 ReleaseMessage()246 template<class T> Message<T> ReleaseMessage() { 247 Message<T> msg = GetMessage<T>(); 248 Reset(); 249 return msg; 250 } 251 252 private: 253 // SliceAllocator slice_allocator_; // part of SliceAllocatorMember 254 }; 255 256 } // namespace grpc 257 } // namespace flatbuffers 258 259 namespace grpc { 260 261 template<class T> class SerializationTraits<flatbuffers::grpc::Message<T>> { 262 public: Serialize(const flatbuffers::grpc::Message<T> & msg,ByteBuffer * buffer,bool * own_buffer)263 static grpc::Status Serialize(const flatbuffers::grpc::Message<T> &msg, 264 ByteBuffer *buffer, bool *own_buffer) { 265 // Package the single slice into a `ByteBuffer`, 266 // incrementing the refcount in the process. 
267 *buffer = ByteBuffer(&msg.BorrowSlice(), 1); 268 *own_buffer = true; 269 return grpc::Status::OK; 270 } 271 272 // Deserialize by pulling the Deserialize(ByteBuffer * buf,flatbuffers::grpc::Message<T> * msg)273 static grpc::Status Deserialize(ByteBuffer *buf, 274 flatbuffers::grpc::Message<T> *msg) { 275 Slice slice; 276 if (!buf->TrySingleSlice(&slice).ok()) { 277 if (!buf->DumpToSingleSlice(&slice).ok()) { 278 buf->Clear(); 279 return ::grpc::Status(::grpc::StatusCode::INTERNAL, "No payload"); 280 } 281 } 282 *msg = flatbuffers::grpc::Message<T>(slice); 283 buf->Clear(); 284 #if FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION 285 return ::grpc::Status::OK; 286 #else 287 if (msg->Verify()) { 288 return ::grpc::Status::OK; 289 } else { 290 return ::grpc::Status(::grpc::StatusCode::INTERNAL, 291 "Message verification failed"); 292 } 293 #endif 294 } 295 }; 296 297 } // namespace grpc 298 299 #endif // FLATBUFFERS_GRPC_H_ 300