1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H 20 #define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <grpc/slice_buffer.h> 25 #include "src/core/lib/gprpp/orphanable.h" 26 #include "src/core/lib/iomgr/closure.h" 27 28 /** Internal bit flag for grpc_begin_message's \a flags signaling the use of 29 * compression for the message. (Does not apply for stream compression.) */ 30 #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u) 31 /** Internal bit flag for determining whether the message was compressed and had 32 * to be decompressed by the message_decompress filter. (Does not apply for 33 * stream compression.) */ 34 #define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u) 35 /** Mask of all valid internal flags. */ 36 #define GRPC_WRITE_INTERNAL_USED_MASK \ 37 (GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED) 38 39 namespace grpc_core { 40 41 class ByteStream : public Orphanable { 42 public: ~ByteStream()43 ~ByteStream() override {} 44 45 // Returns true if the bytes are available immediately (in which case 46 // on_complete will not be called), or false if the bytes will be available 47 // asynchronously (in which case on_complete will be called when they 48 // are available). Should not be called if there is no data left on the 49 // stream. 50 // 51 // max_size_hint can be set as a hint as to the maximum number 52 // of bytes that would be acceptable to read. 53 virtual bool Next(size_t max_size_hint, grpc_closure* on_complete) = 0; 54 55 // Returns the next slice in the byte stream when it is available, as 56 // indicated by Next(). 57 // 58 // Once a slice is returned into *slice, it is owned by the caller. 59 virtual grpc_error* Pull(grpc_slice* slice) = 0; 60 61 // Shuts down the byte stream. 62 // 63 // If there is a pending call to on_complete from Next(), it will be 64 // invoked with the error passed to Shutdown(). 65 // 66 // The next call to Pull() (if any) will return the error passed to 67 // Shutdown(). 68 virtual void Shutdown(grpc_error* error) = 0; 69 length()70 uint32_t length() const { return length_; } flags()71 uint32_t flags() const { return flags_; } 72 set_flags(uint32_t flags)73 void set_flags(uint32_t flags) { flags_ = flags; } 74 75 protected: ByteStream(uint32_t length,uint32_t flags)76 ByteStream(uint32_t length, uint32_t flags) 77 : length_(length), flags_(flags) {} 78 79 private: 80 const uint32_t length_; 81 uint32_t flags_; 82 }; 83 84 // 85 // SliceBufferByteStream 86 // 87 // A ByteStream that wraps a slice buffer. 88 // 89 90 class SliceBufferByteStream : public ByteStream { 91 public: 92 // Removes all slices in slice_buffer, leaving it empty. 93 SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); 94 95 ~SliceBufferByteStream() override; 96 97 void Orphan() override; 98 99 bool Next(size_t max_size_hint, grpc_closure* on_complete) override; 100 grpc_error* Pull(grpc_slice* slice) override; 101 void Shutdown(grpc_error* error) override; 102 103 private: 104 grpc_error* shutdown_error_ = GRPC_ERROR_NONE; 105 grpc_slice_buffer backing_buffer_; 106 }; 107 108 // 109 // CachingByteStream 110 // 111 // A ByteStream that that wraps an underlying byte stream but caches 112 // the resulting slices in a slice buffer. If an initial attempt fails 113 // without fully draining the underlying stream, a new caching stream 114 // can be created from the same underlying cache, in which case it will 115 // return whatever is in the backing buffer before continuing to read the 116 // underlying stream. 117 // 118 // NOTE: No synchronization is done, so it is not safe to have multiple 119 // CachingByteStreams simultaneously drawing from the same underlying 120 // ByteStreamCache at the same time. 121 // 122 123 class ByteStreamCache { 124 public: 125 class CachingByteStream : public ByteStream { 126 public: 127 explicit CachingByteStream(ByteStreamCache* cache); 128 129 ~CachingByteStream() override; 130 131 void Orphan() override; 132 133 bool Next(size_t max_size_hint, grpc_closure* on_complete) override; 134 grpc_error* Pull(grpc_slice* slice) override; 135 void Shutdown(grpc_error* error) override; 136 137 // Resets the byte stream to the start of the underlying stream. 138 void Reset(); 139 140 private: 141 ByteStreamCache* cache_; 142 size_t cursor_ = 0; 143 size_t offset_ = 0; 144 grpc_error* shutdown_error_ = GRPC_ERROR_NONE; 145 }; 146 147 explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream); 148 149 ~ByteStreamCache(); 150 151 // Must not be destroyed while still in use by a CachingByteStream. 152 void Destroy(); 153 cache_buffer()154 grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } 155 156 private: 157 OrphanablePtr<ByteStream> underlying_stream_; 158 uint32_t length_; 159 uint32_t flags_; 160 grpc_slice_buffer cache_buffer_; 161 }; 162 163 } // namespace grpc_core 164 165 #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ 166