1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H 20 #define GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H 21 22 #include <type_traits> 23 24 #include <grpc/impl/codegen/grpc_types.h> 25 #include <grpc/impl/codegen/slice.h> 26 #include <grpcpp/impl/codegen/byte_buffer.h> 27 #include <grpcpp/impl/codegen/config_protobuf.h> 28 #include <grpcpp/impl/codegen/core_codegen_interface.h> 29 #include <grpcpp/impl/codegen/serialization_traits.h> 30 #include <grpcpp/impl/codegen/status.h> 31 32 /// This header provides an object that writes bytes directly into a 33 /// grpc::ByteBuffer, via the ZeroCopyOutputStream interface 34 35 namespace grpc { 36 37 extern CoreCodegenInterface* g_core_codegen_interface; 38 39 // Forward declaration for testing use only 40 namespace internal { 41 class ProtoBufferWriterPeer; 42 } // namespace internal 43 44 const int kProtoBufferWriterMaxBufferLength = 1024 * 1024; 45 46 /// This is a specialization of the protobuf class ZeroCopyOutputStream. 47 /// The principle is to give the proto layer one buffer of bytes at a time 48 /// that it can use to serialize the next portion of the message, with the 49 /// option to "backup" if more buffer is given than required at the last buffer. 50 /// 51 /// Read more about ZeroCopyOutputStream interface here: 52 /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream 53 class ProtoBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { 54 public: 55 /// Constructor for this derived class 56 /// 57 /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created 58 /// \param block_size How big are the chunks to allocate at a time 59 /// \param total_size How many total bytes are required for this proto ProtoBufferWriter(ByteBuffer * byte_buffer,int block_size,int total_size)60 ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size) 61 : block_size_(block_size), 62 total_size_(total_size), 63 byte_count_(0), 64 have_backup_(false) { 65 GPR_CODEGEN_ASSERT(!byte_buffer->Valid()); 66 /// Create an empty raw byte buffer and look at its underlying slice buffer 67 grpc_byte_buffer* bp = 68 g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0); 69 byte_buffer->set_buffer(bp); 70 slice_buffer_ = &bp->data.raw.slice_buffer; 71 } 72 ~ProtoBufferWriter()73 ~ProtoBufferWriter() { 74 if (have_backup_) { 75 g_core_codegen_interface->grpc_slice_unref(backup_slice_); 76 } 77 } 78 79 /// Give the proto library the next buffer of bytes and its size. It is 80 /// safe for the caller to write from data[0, size - 1]. Next(void ** data,int * size)81 bool Next(void** data, int* size) override { 82 // Protobuf should not ask for more memory than total_size_. 83 GPR_CODEGEN_ASSERT(byte_count_ < total_size_); 84 // 1. Use the remaining backup slice if we have one 85 // 2. Otherwise allocate a slice, up to the remaining length needed 86 // or our maximum allocation size 87 // 3. Provide the slice start and size available 88 // 4. Add the slice being returned to the slice buffer 89 size_t remain = total_size_ - byte_count_; 90 if (have_backup_) { 91 /// If we have a backup slice, we should use it first 92 slice_ = backup_slice_; 93 have_backup_ = false; 94 if (GRPC_SLICE_LENGTH(slice_) > remain) { 95 GRPC_SLICE_SET_LENGTH(slice_, remain); 96 } 97 } else { 98 // When less than a whole block is needed, only allocate that much. 99 // But make sure the allocated slice is not inlined. 100 size_t allocate_length = 101 remain > static_cast<size_t>(block_size_) ? block_size_ : remain; 102 slice_ = g_core_codegen_interface->grpc_slice_malloc( 103 allocate_length > GRPC_SLICE_INLINED_SIZE 104 ? allocate_length 105 : GRPC_SLICE_INLINED_SIZE + 1); 106 } 107 *data = GRPC_SLICE_START_PTR(slice_); 108 // On win x64, int is only 32bit 109 GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); 110 byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); 111 g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); 112 return true; 113 } 114 115 /// Backup by \a count bytes because Next returned more bytes than needed 116 /// (only used in the last buffer). \a count must be less than or equal too 117 /// the last buffer returned from next. BackUp(int count)118 void BackUp(int count) override { 119 /// 1. Remove the partially-used last slice from the slice buffer 120 /// 2. Split it into the needed (if any) and unneeded part 121 /// 3. Add the needed part back to the slice buffer 122 /// 4. Mark that we still have the remaining part (for later use/unref) 123 GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_))); 124 g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); 125 if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { 126 backup_slice_ = slice_; 127 } else { 128 backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( 129 &slice_, GRPC_SLICE_LENGTH(slice_) - count); 130 g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); 131 } 132 // It's dangerous to keep an inlined grpc_slice as the backup slice, since 133 // on a following Next() call, a reference will be returned to this slice 134 // via GRPC_SLICE_START_PTR, which will not be an address held by 135 // slice_buffer_. 136 have_backup_ = backup_slice_.refcount != NULL; 137 byte_count_ -= count; 138 } 139 140 /// Returns the total number of bytes written since this object was created. ByteCount()141 grpc::protobuf::int64 ByteCount() const override { return byte_count_; } 142 143 // These protected members are needed to support internal optimizations. 144 // they expose internal bits of grpc core that are NOT stable. If you have 145 // a use case needs to use one of these functions, please send an email to 146 // https://groups.google.com/forum/#!forum/grpc-io. 147 protected: slice_buffer()148 grpc_slice_buffer* slice_buffer() { return slice_buffer_; } set_byte_count(int64_t byte_count)149 void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; } 150 151 private: 152 // friend for testing purposes only 153 friend class internal::ProtoBufferWriterPeer; 154 const int block_size_; ///< size to alloc for each new \a grpc_slice needed 155 const int total_size_; ///< byte size of proto being serialized 156 int64_t byte_count_; ///< bytes written since this object was created 157 grpc_slice_buffer* 158 slice_buffer_; ///< internal buffer of slices holding the serialized data 159 bool have_backup_; ///< if we are holding a backup slice or not 160 grpc_slice backup_slice_; ///< holds space we can still write to, if the 161 ///< caller has called BackUp 162 grpc_slice slice_; ///< current slice passed back to the caller 163 }; 164 165 } // namespace grpc 166 167 #endif // GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H 168